Skip to content

Commit

Permalink
processor_labels: add support for record accessor in upsert action
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Jan 16, 2025
1 parent 27734ee commit ebc1d3a
Showing 1 changed file with 113 additions and 18 deletions.
131 changes: 113 additions & 18 deletions plugins/processor_labels/labels.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <fluent-bit/flb_processor.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>
#include <fluent-bit/flb_record_accessor.h>

#include <cmetrics/cmetrics.h>
#include <cmetrics/cmt_histogram.h>
Expand All @@ -43,6 +44,16 @@

typedef int (*label_transformer)(struct cmt_metric *, cfl_sds_t *value);

struct label_kv {
cfl_sds_t key;
cfl_sds_t val;
/*
* record accessor is only set if a '$' string exists in the
* given value string, otherwise the string is copied directly into 'val'
*/
struct flb_record_accessor *ra;
struct cfl_list _head;
};
struct internal_processor_context {
struct mk_list *update_list;
struct mk_list *insert_list;
Expand Down Expand Up @@ -262,11 +273,11 @@ static int process_label_modification_kvlist_setting(
struct mk_list *source_list,
struct cfl_list *destination_list)
{
struct cfl_kv *processed_pair;
struct flb_config_map_val *source_entry;
struct mk_list *iterator;
struct flb_slist_entry *value;
struct flb_slist_entry *key;
struct label_kv *kv_node;

if (source_list == NULL ||
mk_list_is_empty(source_list) == 0) {
Expand All @@ -290,29 +301,82 @@ static int process_label_modification_kvlist_setting(
value = mk_list_entry_last(source_entry->val.list,
struct flb_slist_entry, _head);

processed_pair = cfl_kv_item_create(destination_list,
key->str,
value->str);
kv_node = flb_malloc(sizeof(struct label_kv));
if (kv_node == NULL) {
flb_errno();
return -1;
}

if (processed_pair == NULL) {
/* only initialize record accessor if a pattern is found */
if (strchr(value->str, '$') != NULL) {
kv_node->ra = flb_ra_create(value->str, FLB_FALSE);
if (kv_node->ra == NULL) {
flb_plg_error(plugin_instance,
"could not create record accessor for '%s'",
value->str);
return -1;
}
}
else {
kv_node->ra = NULL;
}

kv_node->key = cfl_sds_create(key->str);
if (kv_node->key == NULL) {
flb_ra_destroy(kv_node->ra);
flb_free(kv_node);
flb_plg_error(plugin_instance,
"could not append label %s=%s\n",
key->str,
value->str);
"could not create label key '%s'",
key->str);
return -1;
}

kv_node->val = cfl_sds_create(value->str);
if (kv_node->val == NULL) {
cfl_sds_destroy(kv_node->key);
flb_ra_destroy(kv_node->ra);
flb_free(kv_node);
flb_plg_error(plugin_instance,
"could not create label value '%s'",
value->str);
return -1;
}

cfl_list_add(&kv_node->_head, destination_list);
}

return 0;
}

static void destroy_label_kv_list(struct cfl_list *list)
{
struct cfl_list *tmp;
struct cfl_list *iterator;
struct label_kv *kv_node;

cfl_list_foreach_safe(iterator, tmp, list) {
kv_node = cfl_list_entry(iterator, struct label_kv, _head);

cfl_sds_destroy(kv_node->key);
cfl_sds_destroy(kv_node->val);

if (kv_node->ra != NULL) {
flb_ra_destroy(kv_node->ra);
}

cfl_list_del(&kv_node->_head);
flb_free(kv_node);
}
}

static void destroy_context(struct internal_processor_context *context)
{
if (context != NULL) {
cfl_kv_release(&context->update_labels);
cfl_kv_release(&context->insert_labels);
cfl_kv_release(&context->upsert_labels);

destroy_label_kv_list(&context->upsert_labels);

flb_slist_destroy(&context->delete_labels);
flb_slist_destroy(&context->hash_labels);

Expand All @@ -333,9 +397,15 @@ static struct internal_processor_context *
context->instance = processor_instance;
context->config = config;

cfl_kv_init(&context->update_labels);
cfl_kv_init(&context->insert_labels);
cfl_kv_init(&context->upsert_labels);
// WIP / FIXME
// cfl_kv_init(&context->update_labels);
// cfl_kv_init(&context->insert_labels);
// cfl_kv_init(&context->upsert_labels);

cfl_list_init(&context->update_labels);
cfl_list_init(&context->insert_labels);
cfl_list_init(&context->upsert_labels);

flb_slist_create(&context->delete_labels);
flb_slist_create(&context->hash_labels);

Expand Down Expand Up @@ -1553,38 +1623,63 @@ static int insert_labels(struct cmt *metrics_context,
}

static int upsert_labels(struct cmt *metrics_context,
char *tag, int tag_len,
struct cfl_list *labels)
{
struct cfl_list *iterator;
int result;
struct cfl_kv *pair;
struct cfl_list *iterator;
struct label_kv *pair;
flb_sds_t value;
msgpack_object o = {0};

cfl_list_foreach(iterator, labels) {
pair = cfl_list_entry(iterator, struct cfl_kv, _head);
pair = cfl_list_entry(iterator, struct label_kv, _head);

if (pair->ra != NULL) {
/* get the value using a record accessor pattern */
value = flb_ra_translate(pair->ra, tag, tag_len, o, NULL);
if (value == NULL) {
return FLB_FALSE;
}
}
else {
/* use the pre-defined string */
value = pair->val;
}

result = metrics_context_contains_dynamic_label(metrics_context,
pair->key);

if (result == FLB_TRUE) {
result = metrics_context_upsert_dynamic_label(metrics_context,
pair->key,
pair->val);
value);

if (result == FLB_FALSE) {
if (pair->ra != NULL) {
flb_sds_destroy(value);
}
return FLB_FALSE;
}
}
else {
result = metrics_context_upsert_static_label(metrics_context,
pair->key,
pair->val);
value);

if (result == FLB_FALSE) {
if (pair->ra != NULL) {
flb_sds_destroy(value);
}
return FLB_FALSE;
}
}
}

if (pair->ra != NULL) {
flb_sds_destroy(value);
}

return FLB_PROCESSOR_SUCCESS;
}

Expand Down Expand Up @@ -1729,7 +1824,7 @@ static int cb_process_metrics(struct flb_processor_instance *processor_instance,
}

if (result == FLB_PROCESSOR_SUCCESS) {
result = upsert_labels(out_cmt,
result = upsert_labels(out_cmt, (char *) tag, tag_len,
&processor_context->upsert_labels);
}

Expand Down

0 comments on commit ebc1d3a

Please sign in to comment.