Skip to content

Commit

Permalink
filter_parser: Add parameter to nest parsed fields under
Browse files Browse the repository at this point in the history
  • Loading branch information
RaJiska committed Jan 13, 2025
1 parent 417d129 commit 5414ee9
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 1 deletion.
56 changes: 56 additions & 0 deletions plugins/filter_parser/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,53 @@ static int delete_parsers(struct filter_parser_ctx *ctx)
return c;
}

static int nest_raw_map(struct filter_parser_ctx *ctx,
char **buf,
size_t *size,
const flb_sds_t key)
{
msgpack_sbuffer sbuf;
msgpack_packer pk;
msgpack_unpacked outbuf_result;
msgpack_object obj;
msgpack_object_kv *kv;
const size_t key_len = flb_sds_len(key);
int ret = 0;

msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);

msgpack_unpacked_init(&outbuf_result);
ret = msgpack_unpack_next(&outbuf_result, *buf, *size, NULL);
if (ret != MSGPACK_UNPACK_SUCCESS) {
flb_plg_error(ctx->ins,
"Nest: failed to unpack msgpack data with error code %d",
ret);
msgpack_unpacked_destroy(&outbuf_result);
return -1;
}

/* Create a new map, unpacking map `buf` under the new `key` root key */
obj = outbuf_result.data;
if (obj.type == MSGPACK_OBJECT_MAP) {
msgpack_pack_map(&pk, 1);
msgpack_pack_str(&pk, key_len);
msgpack_pack_str_body(&pk, key, key_len);
msgpack_pack_map(&pk, obj.via.map.size);
for (unsigned x = 0; x < obj.via.map.size; ++x) {
kv = &obj.via.map.ptr[x];
msgpack_pack_object(&pk, kv->key);
msgpack_pack_object(&pk, kv->val);
}
flb_free(*buf);
*buf = sbuf.data;
*size = sbuf.size;
}

msgpack_unpacked_destroy(&outbuf_result);
return 0;
}

static int configure(struct filter_parser_ctx *ctx,
struct flb_filter_instance *f_ins,
struct flb_config *config)
Expand Down Expand Up @@ -223,6 +270,7 @@ static int cb_parser_filter(const void *data, size_t bytes,

if (obj->type == MSGPACK_OBJECT_MAP) {
map_num = obj->via.map.size;

/* Calculate initial array size based on configuration */
append_arr_len = (ctx->reserve_data ? map_num : 0);
if (ctx->preserve_key && !ctx->reserve_data) {
Expand Down Expand Up @@ -301,6 +349,9 @@ static int cb_parser_filter(const void *data, size_t bytes,
}

if (out_buf != NULL && parse_ret >= 0) {
if (ctx->nest_under) {
nest_raw_map(ctx, &out_buf, &out_size, ctx->nest_under);
}
if (append_arr != NULL && append_arr_len > 0) {
char *new_buf = NULL;
int new_size;
Expand Down Expand Up @@ -440,6 +491,11 @@ static struct flb_config_map config_map[] = {
"Keep all other original fields in the parsed result. "
"If false, all other original fields will be removed."
},
{
FLB_CONFIG_MAP_STR, "Nest_Under", NULL,
0, FLB_TRUE, offsetof(struct filter_parser_ctx, nest_under),
"Specify field name to nest parsed records under."
},
{
FLB_CONFIG_MAP_DEPRECATED, "Unescape_key", NULL,
0, FLB_FALSE, 0,
Expand Down
1 change: 1 addition & 0 deletions plugins/filter_parser/filter_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct filter_parser_ctx {
int key_name_len;
int reserve_data;
int preserve_key;
flb_sds_t nest_under;
struct mk_list parsers;
struct flb_filter_instance *ins;
};
Expand Down
79 changes: 78 additions & 1 deletion tests/runtime/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,83 @@ void flb_test_filter_parser_reserve_on_preserve_on()
test_ctx_destroy(ctx);
}

void flb_test_filter_parser_nest_under_on()
{
int ret;
int bytes;
char *p, *output, *expected;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;
int filter_ffd;
struct flb_parser *parser;

struct flb_lib_out_cb cb;
cb.cb = callback_test;
cb.data = NULL;

clear_output();

ctx = flb_create();

/* Configure service */
flb_service_set(ctx, "Flush", FLUSH_INTERVAL, "Grace" "1", "Log_Level", "debug", NULL);

/* Input */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd,
"Tag", "test",
NULL);

/* Parser */
parser = flb_parser_create("json", "json", NULL,
FLB_FALSE,
NULL, NULL, NULL, MK_FALSE, MK_TRUE, FLB_FALSE, FLB_FALSE,
NULL, 0, NULL, ctx->config);
TEST_CHECK(parser != NULL);

/* Filter */
filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
TEST_CHECK(filter_ffd >= 0);
ret = flb_filter_set(ctx, filter_ffd,
"Match", "test",
"Key_Name", "to_parse",
"Nest_Under", "nest_key",
"Parser", "json",
NULL);
TEST_CHECK(ret == 0);

/* Output */
out_ffd = flb_output(ctx, (char *) "lib", &cb);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd,
"Match", "*",
"format", "json",
NULL);

/* Start the engine */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data */
p = "[1,{\"hello\":\"world\",\"some_object\":{\"foo\":\"bar\"},\"to_parse\":\"{\\\"key\\\":\\\"value\\\",\\\"object\\\":{\\\"a\\\":\\\"b\\\"}}\"}]";
bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
TEST_CHECK(bytes == strlen(p));

wait_with_timeout(1500, &output); /* waiting flush and ensuring data flush */
TEST_CHECK_(output != NULL, "Expected output to not be NULL");
if (output != NULL) {
/* check extra data was not preserved */
expected = "{\"nest_key\":{\"key\":\"value\",\"object\":{\"a\":\"b\"}}}";
TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain key one , got '%s'", output);
free(output);
}

flb_stop(ctx);
flb_destroy(ctx);
}

TEST_LIST = {
{"filter_parser_extract_fields", flb_test_filter_parser_extract_fields },
{"filter_parser_reserve_data_off", flb_test_filter_parser_reserve_data_off },
Expand All @@ -1313,6 +1390,6 @@ TEST_LIST = {
{"filter_parser_reserve_off_preserve_on", flb_test_filter_parser_reserve_off_preserve_on},
{"filter_parser_reserve_on_preserve_off", flb_test_filter_parser_reserve_on_preserve_off},
{"filter_parser_reserve_on_preserve_on", flb_test_filter_parser_reserve_on_preserve_on},
{"filter_parser_nest_under_on", flb_test_filter_parser_nest_under_on},
{NULL, NULL}
};

0 comments on commit 5414ee9

Please sign in to comment.