Skip to content

Commit

Permalink
flb_chunk_trace: coordinate the stopping and starting of the pipeline…
Browse files Browse the repository at this point in the history
… thread.

Use a pthread cond to coordinate the startup and then shutdown of the
pipeline thread, avoiding deadlocks and unfreed resources.

Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan authored and edsiper committed Oct 29, 2023
1 parent daffba9 commit 7ae6750
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions src/flb_chunk_trace.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,15 @@ static void trace_chunk_context_destroy(struct flb_chunk_trace_context *ctxt)
}

flb_sds_destroy(ctxt->trace_prefix);
pthread_cond_signal(&ctxt->wait);
flb_sds_destroy(ctxt->output_name);
flb_trace("stop the pipeline");
flb_stop(ctxt->flb);
flb_trace("signaling pipeline thread to stop");
pthread_mutex_lock(&ctxt->lock);
pthread_cond_signal(&ctxt->wait);
pthread_mutex_unlock(&ctxt->lock);
flb_trace("joining pipeline thread...");
pthread_join(ctxt->thread, NULL);
flb_destroy(ctxt->flb);
flb_free(ctxt);
}
Expand All @@ -144,12 +151,13 @@ static void *pipeline_thread(void *arg)
struct mk_list *head = NULL;
struct flb_kv *prop = NULL;

flb_trace("[pipeline_thead]: waiting for start lock");
pthread_mutex_lock(&ctx->lock);
flb_trace("[pipeline_thead]: waited for start lock");

ctx->flb = flb_create();
if (ctx->flb == NULL) {
flb_errno();
pthread_mutex_unlock(&ctx->lock);
return NULL;
}

Expand Down Expand Up @@ -196,10 +204,16 @@ static void *pipeline_thread(void *arg)
ctx->output = (void *)output;
ctx->input = (void *)input;

flb_trace("[pipeline_thead]: start pipeline in thread");
flb_start(ctx->flb);
pthread_cond_signal(&ctx->wait);
pthread_mutex_unlock(&ctx->lock);
flb_trace("[pipeline_thead]: wait for exit of pipeline thread");
pthread_mutex_lock(&ctx->lock);
pthread_cond_wait(&ctx->wait, &ctx->lock);
pthread_mutex_unlock(&ctx->lock);

flb_trace("[pipeline_thead]: exit trace pipeline thread.");
return NULL;
error_output:
flb_output_instance_destroy(output);
Expand All @@ -210,6 +224,7 @@ static void *pipeline_thread(void *arg)
flb_input_instance_destroy(input);
error_flb:
flb_destroy(ctx->flb);
flb_trace("[pipeline_thead]: error: exit trace pipeline thread.");
return NULL;
}

Expand Down Expand Up @@ -261,8 +276,14 @@ struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input,

pthread_mutex_init(&ctx->lock, NULL);
pthread_cond_init(&ctx->wait, NULL);

flb_trace("wait for pipeline to start");
pthread_mutex_lock(&ctx->lock);
flb_trace("waiting for pipeline to start");
pthread_create(&ctx->thread, NULL, pipeline_thread, ctx);
pthread_cond_wait(&ctx->wait, &ctx->lock);
pthread_mutex_unlock(&ctx->lock);
flb_trace("waited for pipeline to start");

ctx->trace_prefix = flb_sds_create(trace_prefix);

Expand All @@ -281,7 +302,6 @@ struct flb_chunk_trace *flb_chunk_trace_new(struct flb_input_chunk *chunk)
struct flb_chunk_trace *trace = NULL;
struct flb_input_instance *f_ins = (struct flb_input_instance *)chunk->in;

flb_error("new chunk trace");
pthread_mutex_lock(&f_ins->chunk_trace_lock);

if (flb_chunk_trace_to_be_destroyed(f_ins->chunk_trace_ctxt) == FLB_TRUE) {
Expand Down Expand Up @@ -316,7 +336,6 @@ struct flb_chunk_trace *flb_chunk_trace_new(struct flb_input_chunk *chunk)

void flb_chunk_trace_destroy(struct flb_chunk_trace *trace)
{
flb_error("destroy input chunk trace");
pthread_mutex_lock(&trace->ic->in->chunk_trace_lock);
flb_chunk_trace_sub(trace->ctxt);

Expand Down

0 comments on commit 7ae6750

Please sign in to comment.