diff --git a/src/flb_chunk_trace.c b/src/flb_chunk_trace.c index a28ed805424..2eebe4d92a4 100644 --- a/src/flb_chunk_trace.c +++ b/src/flb_chunk_trace.c @@ -118,8 +118,15 @@ static void trace_chunk_context_destroy(struct flb_chunk_trace_context *ctxt) } flb_sds_destroy(ctxt->trace_prefix); - pthread_cond_signal(&ctxt->wait); + flb_sds_destroy(ctxt->output_name); + flb_trace("stop the pipeline"); flb_stop(ctxt->flb); + flb_trace("signaling pipeline thread to stop"); + pthread_mutex_lock(&ctxt->lock); + pthread_cond_signal(&ctxt->wait); + pthread_mutex_unlock(&ctxt->lock); + flb_trace("joining pipeline thread..."); + pthread_join(ctxt->thread, NULL); flb_destroy(ctxt->flb); flb_free(ctxt); } @@ -144,12 +151,13 @@ static void *pipeline_thread(void *arg) struct mk_list *head = NULL; struct flb_kv *prop = NULL; + flb_trace("[pipeline_thead]: waiting for start lock"); pthread_mutex_lock(&ctx->lock); + flb_trace("[pipeline_thead]: waited for start lock"); ctx->flb = flb_create(); if (ctx->flb == NULL) { flb_errno(); - pthread_mutex_unlock(&ctx->lock); return NULL; } @@ -196,10 +204,16 @@ static void *pipeline_thread(void *arg) ctx->output = (void *)output; ctx->input = (void *)input; + flb_trace("[pipeline_thead]: start pipeline in thread"); flb_start(ctx->flb); pthread_cond_signal(&ctx->wait); + pthread_mutex_unlock(&ctx->lock); + flb_trace("[pipeline_thead]: wait for exit of pipeline thread"); + pthread_mutex_lock(&ctx->lock); pthread_cond_wait(&ctx->wait, &ctx->lock); + pthread_mutex_unlock(&ctx->lock); + flb_trace("[pipeline_thead]: exit trace pipeline thread."); return NULL; error_output: flb_output_instance_destroy(output); @@ -210,6 +224,7 @@ static void *pipeline_thread(void *arg) flb_input_instance_destroy(input); error_flb: flb_destroy(ctx->flb); + flb_trace("[pipeline_thead]: error: exit trace pipeline thread."); return NULL; } @@ -261,8 +276,14 @@ struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input, pthread_mutex_init(&ctx->lock, NULL); pthread_cond_init(&ctx->wait, NULL); + + flb_trace("wait for pipeline to start"); + pthread_mutex_lock(&ctx->lock); + flb_trace("waiting for pipeline to start"); pthread_create(&ctx->thread, NULL, pipeline_thread, ctx); pthread_cond_wait(&ctx->wait, &ctx->lock); + pthread_mutex_unlock(&ctx->lock); + flb_trace("waited for pipeline to start"); ctx->trace_prefix = flb_sds_create(trace_prefix); @@ -281,7 +302,6 @@ struct flb_chunk_trace *flb_chunk_trace_new(struct flb_input_chunk *chunk) struct flb_chunk_trace *trace = NULL; struct flb_input_instance *f_ins = (struct flb_input_instance *)chunk->in; - flb_error("new chunk trace"); pthread_mutex_lock(&f_ins->chunk_trace_lock); if (flb_chunk_trace_to_be_destroyed(f_ins->chunk_trace_ctxt) == FLB_TRUE) { @@ -316,7 +336,6 @@ struct flb_chunk_trace *flb_chunk_trace_new(struct flb_input_chunk *chunk) void flb_chunk_trace_destroy(struct flb_chunk_trace *trace) { - flb_error("destroy input chunk trace"); pthread_mutex_lock(&trace->ic->in->chunk_trace_lock); flb_chunk_trace_sub(trace->ctxt);