Skip to content

Commit

Permalink
in_splunk: add config option for mapping records from specific tokens…
Browse files Browse the repository at this point in the history
… to specific tags
  • Loading branch information
nuclearpidgeon committed Jan 11, 2025
1 parent 14ca011 commit 870ce45
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 15 deletions.
6 changes: 6 additions & 0 deletions plugins/in_splunk/splunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ static struct flb_config_map config_map[] = {
""
},

{
FLB_CONFIG_MAP_SLIST_2, "map_token_to_tag", NULL,
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_splunk, token_to_tag_mappings),
"Map input records from given Splunk HEC token to given tag. Multiple of these can be set to map different tokens to different tags."
},


/* EOF */
{0}
Expand Down
4 changes: 4 additions & 0 deletions plugins/in_splunk/splunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
struct flb_splunk_tokens {
flb_sds_t header;
size_t length;
/* Fluentbit routing tag that records sent in with
this particular token should be given */
flb_sds_t map_to_tag;
struct mk_list _head;
};

Expand All @@ -52,6 +55,7 @@ struct flb_splunk {
size_t ingested_auth_header_len;
int store_token_in_metadata;
flb_sds_t store_token_key;
struct mk_list *token_to_tag_mappings;

struct flb_log_event_encoder log_encoder;

Expand Down
43 changes: 43 additions & 0 deletions plugins/in_splunk/splunk_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,19 @@ static int setup_hec_tokens(struct flb_splunk *ctx)
const char *raw_token;
struct mk_list *head = NULL;
struct mk_list *kvs = NULL;
int token_idx = 0;
struct flb_split_entry *cur = NULL;
flb_sds_t auth_header = NULL;
struct flb_splunk_tokens *splunk_token;
flb_sds_t credential = NULL;

struct flb_config_map_val *matching_token_mapping;
/* iterators for token to tag mappings */
struct mk_list *ttm_list_i;
struct flb_config_map_val *ttm_cmval_i;
struct flb_slist_entry *ttm_i_token;
struct flb_slist_entry *ttm_tag;

raw_token = flb_input_get_property("splunk_token", ctx->ins);
if (raw_token) {
kvs = flb_utils_split(raw_token, ',', -1 );
Expand Down Expand Up @@ -93,10 +101,45 @@ static int setup_hec_tokens(struct flb_splunk *ctx)
splunk_token->header = auth_header;
splunk_token->length = flb_sds_len(auth_header);

if (ctx->token_to_tag_mappings != NULL) {
int mapping_idx = 0;
matching_token_mapping = NULL;
/* search all the configured token_to_tag_mappings to see if the current
token is one that a mapping was specified for */
flb_config_map_foreach(ttm_list_i, ttm_cmval_i, ctx->token_to_tag_mappings) {
ttm_i_token = mk_list_entry_first(ttm_cmval_i->val.list,
struct flb_slist_entry,
_head);

if (flb_sds_cmp(ttm_i_token->str, credential, flb_sds_len(credential)) == 0) {
matching_token_mapping = ttm_cmval_i;
break;
}
mapping_idx += 1;
}
if (matching_token_mapping != NULL) {
/* Token is the first arg (list->next),
Tag is the second arg (list->next->next) */
ttm_tag = container_of(matching_token_mapping->val.list->next->next,
struct flb_slist_entry,
_head);
flb_plg_debug(ctx->ins, "token #%d will map to tag %s", token_idx + 1, ttm_tag->str);
splunk_token->map_to_tag = flb_sds_create(ttm_tag->str);
}
else {
flb_plg_warn(ctx->ins, "token #%d has no tag mapping, records from this token will not re-map to specific tag", token_idx + 1);
splunk_token->map_to_tag = NULL;
}
}
else {
splunk_token->map_to_tag = NULL;
}

flb_sds_destroy(credential);

/* Link to parent list */
mk_list_add(&splunk_token->_head, &ctx->auth_tokens);
token_idx++;
}
}

Expand Down
65 changes: 50 additions & 15 deletions plugins/in_splunk/splunk_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag,
return 0;
}

static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request)
static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request, struct flb_splunk_tokens **matched_token_out)
{
int ret = 0;
struct mk_list *head;
Expand Down Expand Up @@ -517,6 +517,7 @@ static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *
authorization,
splunk_token->length) == 0) {
flb_sds_destroy(authorization);
*matched_token_out = splunk_token;

return SPLUNK_AUTH_SUCCESS;
}
Expand Down Expand Up @@ -716,6 +717,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
off_t diff;
flb_sds_t tag;
struct mk_http_header *header;
struct flb_splunk_tokens *matched_token = NULL;
flb_sds_t tag_from_token = NULL;

if (request->uri.data[0] != '/') {
send_response(conn, 400, "error: invalid request\n");
Expand Down Expand Up @@ -818,7 +821,7 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,

/* Under services/collector endpoints are required for
* authentication if provided splunk_token */
ret = validate_auth_header(ctx, request);
ret = validate_auth_header(ctx, request, &matched_token);
if (ret < 0){
send_response(conn, 401, "error: unauthorized\n");
if (ret == SPLUNK_AUTH_MISSING_CRED) {
Expand All @@ -834,6 +837,12 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
return -1;
}

/* Tokens can be configured to map to a particular tag */
if (matched_token != NULL && matched_token->map_to_tag != NULL) {
tag_from_token = matched_token->map_to_tag;
tag = tag_from_token;
}

/* If the request contains chunked transfer encoded data, decode it */\
if (mk_http_parser_is_content_chunked(&session->parser)) {
ret = mk_http_parser_chunked_decode(&session->parser,
Expand All @@ -845,7 +854,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
flb_plg_error(ctx->ins, "failed to decode chunked data");
send_response(conn, 400, "error: invalid chunked data\n");

flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
mk_mem_free(uri);

return -1;
Expand Down Expand Up @@ -879,7 +891,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,

ret = process_hec_payload(ctx, conn, tag, session, request);
if (ret == -2) {
flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
mk_mem_free(uri);

if (out_chunked) {
Expand All @@ -899,7 +914,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
else {
send_response(conn, 400, "error: invalid HTTP endpoint\n");

flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
mk_mem_free(uri);

if (out_chunked) {
Expand All @@ -914,7 +932,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
else {
/* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/

flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
mk_mem_free(uri);

if (out_chunked) {
Expand All @@ -927,7 +948,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
return -1;
}

flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
mk_mem_free(uri);

if (out_chunked) {
Expand Down Expand Up @@ -1022,7 +1046,7 @@ static int send_json_message_response_ng(struct flb_http_response *response,
return 0;
}

static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_request *request)
static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_request *request, struct flb_splunk_tokens **matched_token_out)
{
struct mk_list *tmp;
struct mk_list *head;
Expand All @@ -1049,6 +1073,7 @@ static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_reque
if (strncasecmp(splunk_token->header,
auth_header,
splunk_token->length) == 0) {
*matched_token_out = splunk_token;
return SPLUNK_AUTH_SUCCESS;
}
}
Expand Down Expand Up @@ -1147,6 +1172,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
struct flb_splunk *context;
int ret = -1;
flb_sds_t tag;
struct flb_splunk_tokens *matched_token = NULL;
flb_sds_t tag_from_token = NULL;

context = (struct flb_splunk *) response->stream->user_data;

Expand Down Expand Up @@ -1176,7 +1203,7 @@ int splunk_prot_handle_ng(struct flb_http_request *request,

/* Under services/collector endpoints are required for
* authentication if provided splunk_token */
ret = validate_auth_header_ng(context, request);
ret = validate_auth_header_ng(context, request, &matched_token);

if (ret < 0) {
send_response_ng(response, 401, "error: unauthorized\n");
Expand All @@ -1201,10 +1228,16 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
return -1;
}

tag = flb_sds_create(context->ins->tag);

if (tag == NULL) {
return -1;
/* Tokens can be configured to map to a particular tag */
if (matched_token != NULL && matched_token->map_to_tag != NULL) {
tag_from_token = matched_token->map_to_tag;
tag = tag_from_token;
}
else {
tag = flb_sds_create(context->ins->tag);
if (tag == NULL) {
return -1;
}
}

if (strcasecmp(request->path, "/services/collector/raw/1.0") == 0 ||
Expand Down Expand Up @@ -1236,7 +1269,9 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
send_response_ng(response, 400, "error: invalid HTTP endpoint\n");
ret = -1;
}

flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
return ret;
}

0 comments on commit 870ce45

Please sign in to comment.