diff --git a/plugins/processor_labels/labels.c b/plugins/processor_labels/labels.c index a0afd8314d4..4da5180bc02 100644 --- a/plugins/processor_labels/labels.c +++ b/plugins/processor_labels/labels.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -43,6 +44,16 @@ typedef int (*label_transformer)(struct cmt_metric *, cfl_sds_t *value); +struct label_kv { + cfl_sds_t key; + cfl_sds_t val; + /* + * record accessor is only set if a '$' string exists in the + * given value string, otherwise the string is copied directly into 'val' + */ + struct flb_record_accessor *ra; + struct cfl_list _head; +}; struct internal_processor_context { struct mk_list *update_list; struct mk_list *insert_list; @@ -262,11 +273,11 @@ static int process_label_modification_kvlist_setting( struct mk_list *source_list, struct cfl_list *destination_list) { - struct cfl_kv *processed_pair; struct flb_config_map_val *source_entry; struct mk_list *iterator; struct flb_slist_entry *value; struct flb_slist_entry *key; + struct label_kv *kv_node; if (source_list == NULL || mk_list_is_empty(source_list) == 0) { @@ -290,29 +301,82 @@ static int process_label_modification_kvlist_setting( value = mk_list_entry_last(source_entry->val.list, struct flb_slist_entry, _head); - processed_pair = cfl_kv_item_create(destination_list, - key->str, - value->str); + kv_node = flb_malloc(sizeof(struct label_kv)); + if (kv_node == NULL) { + flb_errno(); + return -1; + } - if (processed_pair == NULL) { + /* only initialize record accessor if a pattern is found */ + if (strchr(value->str, '$') != NULL) { + kv_node->ra = flb_ra_create(value->str, FLB_FALSE); + if (kv_node->ra == NULL) { + flb_plg_error(plugin_instance, + "could not create record accessor for '%s'", + value->str); + return -1; + } + } + else { + kv_node->ra = NULL; + } + + kv_node->key = cfl_sds_create(key->str); + if (kv_node->key == NULL) { + flb_ra_destroy(kv_node->ra); + flb_free(kv_node); flb_plg_error(plugin_instance, - "could not append label %s=%s\n", - key->str, - value->str); + "could not create label key '%s'", + key->str); + return -1; + } + kv_node->val = cfl_sds_create(value->str); + if (kv_node->val == NULL) { + cfl_sds_destroy(kv_node->key); + flb_ra_destroy(kv_node->ra); + flb_free(kv_node); + flb_plg_error(plugin_instance, + "could not create label value '%s'", + value->str); return -1; } + + cfl_list_add(&kv_node->_head, destination_list); } return 0; } +static void destroy_label_kv_list(struct cfl_list *list) +{ + struct cfl_list *tmp; + struct cfl_list *iterator; + struct label_kv *kv_node; + + cfl_list_foreach_safe(iterator, tmp, list) { + kv_node = cfl_list_entry(iterator, struct label_kv, _head); + + cfl_sds_destroy(kv_node->key); + cfl_sds_destroy(kv_node->val); + + if (kv_node->ra != NULL) { + flb_ra_destroy(kv_node->ra); + } + + cfl_list_del(&kv_node->_head); + flb_free(kv_node); + } +} + static void destroy_context(struct internal_processor_context *context) { if (context != NULL) { cfl_kv_release(&context->update_labels); cfl_kv_release(&context->insert_labels); - cfl_kv_release(&context->upsert_labels); + + destroy_label_kv_list(&context->upsert_labels); + flb_slist_destroy(&context->delete_labels); flb_slist_destroy(&context->hash_labels); @@ -333,9 +397,15 @@ static struct internal_processor_context * context->instance = processor_instance; context->config = config; - cfl_kv_init(&context->update_labels); - cfl_kv_init(&context->insert_labels); - cfl_kv_init(&context->upsert_labels); + // WIP / FIXME + // cfl_kv_init(&context->update_labels); + // cfl_kv_init(&context->insert_labels); + // cfl_kv_init(&context->upsert_labels); + + cfl_list_init(&context->update_labels); + cfl_list_init(&context->insert_labels); + cfl_list_init(&context->upsert_labels); + flb_slist_create(&context->delete_labels); flb_slist_create(&context->hash_labels); @@ -1553,14 +1623,29 @@ static int insert_labels(struct cmt *metrics_context, } static int upsert_labels(struct cmt *metrics_context, + char *tag, int tag_len, struct cfl_list *labels) { - struct cfl_list *iterator; int result; - struct cfl_kv *pair; + struct cfl_list *iterator; + struct label_kv *pair; + flb_sds_t value; + msgpack_object o = {0}; cfl_list_foreach(iterator, labels) { - pair = cfl_list_entry(iterator, struct cfl_kv, _head); + pair = cfl_list_entry(iterator, struct label_kv, _head); + + if (pair->ra != NULL) { + /* get the value using a record accessor pattern */ + value = flb_ra_translate(pair->ra, tag, tag_len, o, NULL); + if (value == NULL) { + return FLB_FALSE; + } + } + else { + /* use the pre-defined string */ + value = pair->val; + } result = metrics_context_contains_dynamic_label(metrics_context, pair->key); @@ -1568,23 +1653,33 @@ static int upsert_labels(struct cmt *metrics_context, if (result == FLB_TRUE) { result = metrics_context_upsert_dynamic_label(metrics_context, pair->key, - pair->val); + value); if (result == FLB_FALSE) { + if (pair->ra != NULL) { + flb_sds_destroy(value); + } return FLB_FALSE; } } else { result = metrics_context_upsert_static_label(metrics_context, pair->key, - pair->val); + value); if (result == FLB_FALSE) { + if (pair->ra != NULL) { + flb_sds_destroy(value); + } return FLB_FALSE; } } } + if (pair->ra != NULL) { + flb_sds_destroy(value); + } + return FLB_PROCESSOR_SUCCESS; } @@ -1729,7 +1824,7 @@ static int cb_process_metrics(struct flb_processor_instance *processor_instance, } if (result == FLB_PROCESSOR_SUCCESS) { - result = upsert_labels(out_cmt, + result = upsert_labels(out_cmt, (char *) tag, tag_len, &processor_context->upsert_labels); }