From 870ce45b1c1ea8ad8817539be39f0ae5b0fc526c Mon Sep 17 00:00:00 2001 From: Stewart Webb Date: Sat, 11 Jan 2025 14:46:15 +1100 Subject: [PATCH] in_splunk: add config option for mapping records from specific tokens to specific tags --- plugins/in_splunk/splunk.c | 6 +++ plugins/in_splunk/splunk.h | 4 ++ plugins/in_splunk/splunk_config.c | 43 ++++++++++++++++++++ plugins/in_splunk/splunk_prot.c | 65 ++++++++++++++++++++++++------- 4 files changed, 103 insertions(+), 15 deletions(-) diff --git a/plugins/in_splunk/splunk.c b/plugins/in_splunk/splunk.c index f965b3c1ed1..1b07d615f74 100644 --- a/plugins/in_splunk/splunk.c +++ b/plugins/in_splunk/splunk.c @@ -254,6 +254,12 @@ static struct flb_config_map config_map[] = { "" }, + { + FLB_CONFIG_MAP_SLIST_2, "map_token_to_tag", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_splunk, token_to_tag_mappings), + "Map input records from given Splunk HEC token to given tag. Multiple of these can be set to map different tokens to different tags." + }, + /* EOF */ {0} diff --git a/plugins/in_splunk/splunk.h b/plugins/in_splunk/splunk.h index 5dc4645088c..5d228adef34 100644 --- a/plugins/in_splunk/splunk.h +++ b/plugins/in_splunk/splunk.h @@ -35,6 +35,9 @@ struct flb_splunk_tokens { flb_sds_t header; size_t length; + /* Fluentbit routing tag that records sent in with + this particular token should be given */ + flb_sds_t map_to_tag; struct mk_list _head; }; @@ -52,6 +55,7 @@ struct flb_splunk { size_t ingested_auth_header_len; int store_token_in_metadata; flb_sds_t store_token_key; + struct mk_list *token_to_tag_mappings; struct flb_log_event_encoder log_encoder; diff --git a/plugins/in_splunk/splunk_config.c b/plugins/in_splunk/splunk_config.c index a7c588658ee..ad22bd25be3 100644 --- a/plugins/in_splunk/splunk_config.c +++ b/plugins/in_splunk/splunk_config.c @@ -44,11 +44,19 @@ static int setup_hec_tokens(struct flb_splunk *ctx) const char *raw_token; struct mk_list *head = NULL; struct mk_list *kvs = NULL; + int token_idx = 0; struct flb_split_entry *cur = NULL; flb_sds_t auth_header = NULL; struct flb_splunk_tokens *splunk_token; flb_sds_t credential = NULL; + struct flb_config_map_val *matching_token_mapping; + /* iterators for token to tag mappings */ + struct mk_list *ttm_list_i; + struct flb_config_map_val *ttm_cmval_i; + struct flb_slist_entry *ttm_i_token; + struct flb_slist_entry *ttm_tag; + raw_token = flb_input_get_property("splunk_token", ctx->ins); if (raw_token) { kvs = flb_utils_split(raw_token, ',', -1 ); @@ -93,10 +101,45 @@ static int setup_hec_tokens(struct flb_splunk *ctx) splunk_token->header = auth_header; splunk_token->length = flb_sds_len(auth_header); + if (ctx->token_to_tag_mappings != NULL) { + int mapping_idx = 0; + matching_token_mapping = NULL; + /* search all the configured token_to_tag_mappings to see if the current + token is one that a mapping was specified for */ + flb_config_map_foreach(ttm_list_i, ttm_cmval_i, ctx->token_to_tag_mappings) { + ttm_i_token = mk_list_entry_first(ttm_cmval_i->val.list, + struct flb_slist_entry, + _head); + + if (flb_sds_cmp(ttm_i_token->str, credential, flb_sds_len(credential)) == 0) { + matching_token_mapping = ttm_cmval_i; + break; + } + mapping_idx += 1; + } + if (matching_token_mapping != NULL) { + /* Token is the first arg (list->next), + Tag is the second arg (list->next->next) */ + ttm_tag = container_of(matching_token_mapping->val.list->next->next, + struct flb_slist_entry, + _head); + flb_plg_debug(ctx->ins, "token #%d will map to tag %s", token_idx + 1, ttm_tag->str); + splunk_token->map_to_tag = flb_sds_create(ttm_tag->str); + } + else { + flb_plg_warn(ctx->ins, "token #%d has no tag mapping, records from this token will not re-map to specific tag", token_idx + 1); + splunk_token->map_to_tag = NULL; + } + } + else { + splunk_token->map_to_tag = NULL; + } + flb_sds_destroy(credential); /* Link to parent list */ mk_list_add(&splunk_token->_head, &ctx->auth_tokens); + token_idx++; } } diff --git a/plugins/in_splunk/splunk_prot.c b/plugins/in_splunk/splunk_prot.c index 6647b0b1736..9f70c0f60f8 100644 --- a/plugins/in_splunk/splunk_prot.c +++ b/plugins/in_splunk/splunk_prot.c @@ -483,7 +483,7 @@ static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag, return 0; } -static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request) +static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request, struct flb_splunk_tokens **matched_token_out) { int ret = 0; struct mk_list *head; @@ -517,6 +517,7 @@ static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request * authorization, splunk_token->length) == 0) { flb_sds_destroy(authorization); + *matched_token_out = splunk_token; return SPLUNK_AUTH_SUCCESS; } @@ -716,6 +717,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, off_t diff; flb_sds_t tag; struct mk_http_header *header; + struct flb_splunk_tokens *matched_token = NULL; + flb_sds_t tag_from_token = NULL; if (request->uri.data[0] != '/') { send_response(conn, 400, "error: invalid request\n"); @@ -818,7 +821,7 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, /* Under services/collector endpoints are required for * authentication if provided splunk_token */ - ret = validate_auth_header(ctx, request); + ret = validate_auth_header(ctx, request, &matched_token); if (ret < 0){ send_response(conn, 401, "error: unauthorized\n"); if (ret == SPLUNK_AUTH_MISSING_CRED) { @@ -834,6 +837,12 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, return -1; } + /* Tokens can be configured to map to a particular tag */ + if (matched_token != NULL && matched_token->map_to_tag != NULL) { + tag_from_token = matched_token->map_to_tag; + tag = tag_from_token; + } + /* If the request contains chunked transfer encoded data, decode it */\ if (mk_http_parser_is_content_chunked(&session->parser)) { ret = mk_http_parser_chunked_decode(&session->parser, @@ -845,7 +854,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, flb_plg_error(ctx->ins, "failed to decode chunked data"); send_response(conn, 400, "error: invalid chunked data\n"); - flb_sds_destroy(tag); + /* Free the tag only if it was a temporarily-allocated/calculated one */ + if (tag_from_token == NULL) { + flb_sds_destroy(tag); + } mk_mem_free(uri); return -1; @@ -879,7 +891,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, ret = process_hec_payload(ctx, conn, tag, session, request); if (ret == -2) { - flb_sds_destroy(tag); + /* Free the tag only if it was a temporarily-allocated/calculated one */ + if (tag_from_token == NULL) { + flb_sds_destroy(tag); + } mk_mem_free(uri); if (out_chunked) { @@ -899,7 +914,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, else { send_response(conn, 400, "error: invalid HTTP endpoint\n"); - flb_sds_destroy(tag); + /* Free the tag only if it was a temporarily-allocated/calculated one */ + if (tag_from_token == NULL) { + flb_sds_destroy(tag); + } mk_mem_free(uri); if (out_chunked) { @@ -914,7 +932,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, else { /* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/ - flb_sds_destroy(tag); + /* Free the tag only if it was a temporarily-allocated/calculated one */ + if (tag_from_token == NULL) { + flb_sds_destroy(tag); + } mk_mem_free(uri); if (out_chunked) { @@ -927,7 +948,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, return -1; } - flb_sds_destroy(tag); + /* Free the tag only if it was a temporarily-allocated/calculated one */ + if (tag_from_token == NULL) { + flb_sds_destroy(tag); + } mk_mem_free(uri); if (out_chunked) { @@ -1022,7 +1046,7 @@ static int send_json_message_response_ng(struct flb_http_response *response, return 0; } -static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_request *request) +static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_request *request, struct flb_splunk_tokens **matched_token_out) { struct mk_list *tmp; struct mk_list *head; @@ -1049,6 +1073,7 @@ static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_reque if (strncasecmp(splunk_token->header, auth_header, splunk_token->length) == 0) { + *matched_token_out = splunk_token; return SPLUNK_AUTH_SUCCESS; } } @@ -1147,6 +1172,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request, struct flb_splunk *context; int ret = -1; flb_sds_t tag; + struct flb_splunk_tokens *matched_token = NULL; + flb_sds_t tag_from_token = NULL; context = (struct flb_splunk *) response->stream->user_data; @@ -1176,7 +1203,7 @@ int splunk_prot_handle_ng(struct flb_http_request *request, /* Under services/collector endpoints are required for * authentication if provided splunk_token */ - ret = validate_auth_header_ng(context, request); + ret = validate_auth_header_ng(context, request, &matched_token); if (ret < 0) { send_response_ng(response, 401, "error: unauthorized\n"); @@ -1201,10 +1228,16 @@ int splunk_prot_handle_ng(struct flb_http_request *request, return -1; } - tag = flb_sds_create(context->ins->tag); - - if (tag == NULL) { - return -1; + /* Tokens can be configured to map to a particular tag */ + if (matched_token != NULL && matched_token->map_to_tag != NULL) { + tag_from_token = matched_token->map_to_tag; + tag = tag_from_token; + } + else { + tag = flb_sds_create(context->ins->tag); + if (tag == NULL) { + return -1; + } } if (strcasecmp(request->path, "/services/collector/raw/1.0") == 0 || @@ -1236,7 +1269,9 @@ int splunk_prot_handle_ng(struct flb_http_request *request, send_response_ng(response, 400, "error: invalid HTTP endpoint\n"); ret = -1; } - - flb_sds_destroy(tag); + /* Free the tag only if it was a temporarily-allocated/calculated one */ + if (tag_from_token == NULL) { + flb_sds_destroy(tag); + } return ret; } \ No newline at end of file