From daad13e172985a0c2d8782c85a31de9b2dee911d Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 16 Jan 2025 21:39:08 -0600 Subject: [PATCH] processor_labels: add support for record accessor Signed-off-by: Eduardo Silva --- plugins/processor_labels/labels.c | 305 ++++++++++++++++++++++-------- 1 file changed, 226 insertions(+), 79 deletions(-) diff --git a/plugins/processor_labels/labels.c b/plugins/processor_labels/labels.c index a0afd8314d4..5fd73fc3944 100644 --- a/plugins/processor_labels/labels.c +++ b/plugins/processor_labels/labels.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -43,6 +44,16 @@ typedef int (*label_transformer)(struct cmt_metric *, cfl_sds_t *value); +struct label_kv { + cfl_sds_t key; + cfl_sds_t val; + /* + * record accessor is only set if a '$' string exists in the + * given value string, otherwise the string is copied directly into 'val' + */ + struct flb_record_accessor *ra; + struct cfl_list _head; +}; struct internal_processor_context { struct mk_list *update_list; struct mk_list *insert_list; @@ -262,11 +273,11 @@ static int process_label_modification_kvlist_setting( struct mk_list *source_list, struct cfl_list *destination_list) { - struct cfl_kv *processed_pair; struct flb_config_map_val *source_entry; struct mk_list *iterator; struct flb_slist_entry *value; struct flb_slist_entry *key; + struct label_kv *kv_node; if (source_list == NULL || mk_list_is_empty(source_list) == 0) { @@ -290,29 +301,82 @@ static int process_label_modification_kvlist_setting( value = mk_list_entry_last(source_entry->val.list, struct flb_slist_entry, _head); - processed_pair = cfl_kv_item_create(destination_list, - key->str, - value->str); + kv_node = flb_malloc(sizeof(struct label_kv)); + if (kv_node == NULL) { + flb_errno(); + return -1; + } + + /* only initialize record accessor if a pattern is found */ + if (strchr(value->str, '$') != NULL) { + kv_node->ra = flb_ra_create(value->str, FLB_FALSE); + if (kv_node->ra == NULL) { + flb_plg_error(plugin_instance, + "could not create record accessor for '%s'", + value->str); + return -1; + } + } + else { + kv_node->ra = NULL; + } - if (processed_pair == NULL) { + kv_node->key = cfl_sds_create(key->str); + if (kv_node->key == NULL) { + flb_ra_destroy(kv_node->ra); + flb_free(kv_node); flb_plg_error(plugin_instance, - "could not append label %s=%s\n", - key->str, - value->str); + "could not create label key '%s'", + key->str); + return -1; + } + kv_node->val = cfl_sds_create(value->str); + if (kv_node->val == NULL) { + cfl_sds_destroy(kv_node->key); + flb_ra_destroy(kv_node->ra); + flb_free(kv_node); + flb_plg_error(plugin_instance, + "could not create label value '%s'", + value->str); return -1; } + + cfl_list_add(&kv_node->_head, destination_list); } return 0; } +static void destroy_label_kv_list(struct cfl_list *list) +{ + struct cfl_list *tmp; + struct cfl_list *iterator; + struct label_kv *kv_node; + + cfl_list_foreach_safe(iterator, tmp, list) { + kv_node = cfl_list_entry(iterator, struct label_kv, _head); + + cfl_sds_destroy(kv_node->key); + cfl_sds_destroy(kv_node->val); + + if (kv_node->ra != NULL) { + flb_ra_destroy(kv_node->ra); + } + + cfl_list_del(&kv_node->_head); + flb_free(kv_node); + } +} + static void destroy_context(struct internal_processor_context *context) { if (context != NULL) { - cfl_kv_release(&context->update_labels); - cfl_kv_release(&context->insert_labels); - cfl_kv_release(&context->upsert_labels); + + destroy_label_kv_list(&context->update_labels); + destroy_label_kv_list(&context->insert_labels); + destroy_label_kv_list(&context->upsert_labels); + flb_slist_destroy(&context->delete_labels); flb_slist_destroy(&context->hash_labels); @@ -320,70 +384,69 @@ static void destroy_context(struct internal_processor_context *context) } } -static struct internal_processor_context * - create_context(struct flb_processor_instance *processor_instance, - struct flb_config *config) +static struct internal_processor_context *create_context(struct flb_processor_instance *processor_instance, + struct flb_config *config) { - struct internal_processor_context *context; int result; + struct internal_processor_context *context; context = flb_calloc(1, sizeof(struct internal_processor_context)); + if (!context) { + flb_errno(); + return NULL; + } - if (context != NULL) { - context->instance = processor_instance; - context->config = config; - - cfl_kv_init(&context->update_labels); - cfl_kv_init(&context->insert_labels); - cfl_kv_init(&context->upsert_labels); - flb_slist_create(&context->delete_labels); - flb_slist_create(&context->hash_labels); + context->instance = processor_instance; + context->config = config; - result = flb_processor_instance_config_map_set(processor_instance, (void *) context); + cfl_list_init(&context->update_labels); + cfl_list_init(&context->insert_labels); + cfl_list_init(&context->upsert_labels); - if (result == 0) { - result = process_label_modification_kvlist_setting(processor_instance, - "update", - context->update_list, - &context->update_labels); - } + flb_slist_create(&context->delete_labels); + flb_slist_create(&context->hash_labels); - if (result == 0) { - result = process_label_modification_kvlist_setting(processor_instance, - "insert", - context->insert_list, - &context->insert_labels); - } + result = flb_processor_instance_config_map_set(processor_instance, (void *) context); - if (result == 0) { - result = process_label_modification_kvlist_setting(processor_instance, - "upsert", - context->upsert_list, - &context->upsert_labels); - } + if (result == 0) { + result = process_label_modification_kvlist_setting(processor_instance, + "update", + context->update_list, + &context->update_labels); + } - if (result == 0) { - result = process_label_modification_list_setting(processor_instance, - "delete", - context->delete_list, - &context->delete_labels); - } + if (result == 0) { + result = process_label_modification_kvlist_setting(processor_instance, + "insert", + context->insert_list, + &context->insert_labels); + } - if (result == 0) { - result = process_label_modification_list_setting(processor_instance, - "hash", - context->hash_list, - &context->hash_labels); - } + if (result == 0) { + result = process_label_modification_kvlist_setting(processor_instance, + "upsert", + context->upsert_list, + &context->upsert_labels); + } - if (result != 0) { - destroy_context(context); + if (result == 0) { + result = process_label_modification_list_setting(processor_instance, + "delete", + context->delete_list, + &context->delete_labels); + } - context = NULL; - } + if (result == 0) { + result = process_label_modification_list_setting(processor_instance, + "hash", + context->hash_list, + &context->hash_labels); } - else { - flb_errno(); + + if (result != 0) { + destroy_context(context); + + context = NULL; } return context; @@ -1470,25 +1533,64 @@ static int metrics_context_remove_dynamic_label(struct cmt *metrics_context, return FLB_TRUE; } +/* + * Retrieve the value based on a potential record_accessor patern or a direct + * mapping of the value set in the configuration. If the returned buffer + * must be freed by the caller, then 'destroy_buf' will be set to FLB_TRUE, + * otherwise it will be set to FLB_FALSE. + */ +static flb_sds_t get_label_value(struct label_kv *pair, char *tag, int tag_len, int *destroy_buf) +{ + flb_sds_t value; + msgpack_object o = {0}; + + *destroy_buf = FLB_FALSE; + + if (pair->ra != NULL) { + /* get the value using a record accessor pattern */ + value = flb_ra_translate(pair->ra, tag, tag_len, o, NULL); + if (value == NULL) { + return NULL; + } + } + else { + /* use the pre-defined string */ + value = pair->val; + } + + return value; +} + static int update_labels(struct cmt *metrics_context, + char *tag, int tag_len, struct cfl_list *labels) { - struct cfl_list *iterator; int result; - struct cfl_kv *pair; + int destroy_buf = FLB_FALSE; + struct cfl_list *iterator; + struct label_kv *pair; + flb_sds_t value = NULL; cfl_list_foreach(iterator, labels) { - pair = cfl_list_entry(iterator, struct cfl_kv, _head); + pair = cfl_list_entry(iterator, struct label_kv, _head); result = metrics_context_contains_dynamic_label(metrics_context, pair->key); + value = get_label_value(pair, tag, tag_len, &destroy_buf); + if (value == NULL) { + return FLB_FALSE; + } if (result == FLB_TRUE) { result = metrics_context_update_dynamic_label(metrics_context, pair->key, - pair->val); + value); + if (result == FLB_FALSE) { + if (destroy_buf == FLB_TRUE) { + flb_sds_destroy(value); + } return FLB_FALSE; } } @@ -1499,26 +1601,36 @@ static int update_labels(struct cmt *metrics_context, if (result == FLB_TRUE) { result = metrics_context_update_static_label(metrics_context, pair->key, - pair->val); + value); if (result == FLB_FALSE) { + if (destroy_buf == FLB_TRUE) { + flb_sds_destroy(value); + } return FLB_FALSE; } } } + if (destroy_buf == FLB_TRUE) { + flb_sds_destroy(value); + } + return FLB_PROCESSOR_SUCCESS; } static int insert_labels(struct cmt *metrics_context, + char *tag, int tag_len, struct cfl_list *labels) { - struct cfl_list *iterator; int result; - struct cfl_kv *pair; + int destroy_buf = FLB_FALSE; + struct cfl_list *iterator; + struct label_kv *pair; + flb_sds_t value = NULL; cfl_list_foreach(iterator, labels) { - pair = cfl_list_entry(iterator, struct cfl_kv, _head); + pair = cfl_list_entry(iterator, struct label_kv, _head); result = metrics_context_contains_dynamic_label(metrics_context, pair->key); @@ -1527,11 +1639,19 @@ static int insert_labels(struct cmt *metrics_context, continue; } + value = get_label_value(pair, tag, tag_len, &destroy_buf); + if (value == NULL) { + return FLB_FALSE; + } + result = metrics_context_insert_dynamic_label(metrics_context, pair->key, - pair->val); + value); if (result == FLB_FALSE) { + if (destroy_buf == FLB_TRUE) { + flb_sds_destroy(value); + } return FLB_FALSE; } @@ -1541,26 +1661,43 @@ static int insert_labels(struct cmt *metrics_context, if (result == FLB_TRUE) { result = metrics_context_insert_static_label(metrics_context, pair->key, - pair->val); + value); + if (result == FLB_FALSE) { + if (destroy_buf == FLB_TRUE) { + flb_sds_destroy(value); + } return FLB_FALSE; } } } + if (destroy_buf == FLB_TRUE) { + flb_sds_destroy(value); + } + return FLB_PROCESSOR_SUCCESS; } + static int upsert_labels(struct cmt *metrics_context, + char *tag, int tag_len, struct cfl_list *labels) { - struct cfl_list *iterator; int result; - struct cfl_kv *pair; + int destroy_buf = FLB_FALSE; + struct cfl_list *iterator; + struct label_kv *pair; + flb_sds_t value = NULL; cfl_list_foreach(iterator, labels) { - pair = cfl_list_entry(iterator, struct cfl_kv, _head); + pair = cfl_list_entry(iterator, struct label_kv, _head); + + value = get_label_value(pair, tag, tag_len, &destroy_buf); + if (value == NULL) { + return FLB_FALSE; + } result = metrics_context_contains_dynamic_label(metrics_context, pair->key); @@ -1568,23 +1705,33 @@ static int upsert_labels(struct cmt *metrics_context, if (result == FLB_TRUE) { result = metrics_context_upsert_dynamic_label(metrics_context, pair->key, - pair->val); + value); if (result == FLB_FALSE) { + if (destroy_buf == FLB_TRUE) { + flb_sds_destroy(value); + } return FLB_FALSE; } } else { result = metrics_context_upsert_static_label(metrics_context, pair->key, - pair->val); + value); if (result == FLB_FALSE) { + if (destroy_buf == FLB_TRUE) { + flb_sds_destroy(value); + } return FLB_FALSE; } } } + if (destroy_buf == FLB_TRUE) { + flb_sds_destroy(value); + } + return FLB_PROCESSOR_SUCCESS; } @@ -1724,17 +1871,17 @@ static int cb_process_metrics(struct flb_processor_instance *processor_instance, &processor_context->delete_labels); if (result == FLB_PROCESSOR_SUCCESS) { - result = update_labels(out_cmt, + result = update_labels(out_cmt, (char *) tag, tag_len, &processor_context->update_labels); } if (result == FLB_PROCESSOR_SUCCESS) { - result = upsert_labels(out_cmt, + result = upsert_labels(out_cmt, (char *) tag, tag_len, &processor_context->upsert_labels); } if (result == FLB_PROCESSOR_SUCCESS) { - result = insert_labels(out_cmt, + result = insert_labels(out_cmt, (char *) tag, tag_len, &processor_context->insert_labels); }