Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_splunk: add ability to map records from specific tokens to specific tags #9831

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/in_splunk/splunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ static struct flb_config_map config_map[] = {
""
},

{
FLB_CONFIG_MAP_SLIST_2, "map_token_to_tag", NULL,
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_splunk, token_to_tag_mappings),
"Map input records from given Splunk HEC token to given tag. Multiple of these can be set to map different tokens to different tags."
},


/* EOF */
{0}
Expand Down
4 changes: 4 additions & 0 deletions plugins/in_splunk/splunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
struct flb_splunk_tokens {
flb_sds_t header;
size_t length;
/* Fluentbit routing tag that records sent in with
this particular token should be given */
flb_sds_t map_to_tag;
struct mk_list _head;
};

Expand All @@ -52,6 +55,7 @@ struct flb_splunk {
size_t ingested_auth_header_len;
int store_token_in_metadata;
flb_sds_t store_token_key;
struct mk_list *token_to_tag_mappings;

struct flb_log_event_encoder log_encoder;

Expand Down
43 changes: 43 additions & 0 deletions plugins/in_splunk/splunk_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,19 @@ static int setup_hec_tokens(struct flb_splunk *ctx)
const char *raw_token;
struct mk_list *head = NULL;
struct mk_list *kvs = NULL;
int token_idx = 0;
struct flb_split_entry *cur = NULL;
flb_sds_t auth_header = NULL;
struct flb_splunk_tokens *splunk_token;
flb_sds_t credential = NULL;

struct flb_config_map_val *matching_token_mapping;
/* iterators for token to tag mappings */
struct mk_list *ttm_list_i;
struct flb_config_map_val *ttm_cmval_i;
struct flb_slist_entry *ttm_i_token;
struct flb_slist_entry *ttm_tag;

raw_token = flb_input_get_property("splunk_token", ctx->ins);
if (raw_token) {
kvs = flb_utils_split(raw_token, ',', -1 );
Expand Down Expand Up @@ -93,10 +101,45 @@ static int setup_hec_tokens(struct flb_splunk *ctx)
splunk_token->header = auth_header;
splunk_token->length = flb_sds_len(auth_header);

if (ctx->token_to_tag_mappings != NULL) {
int mapping_idx = 0;
matching_token_mapping = NULL;
/* search all the configured token_to_tag_mappings to see if the current
token is one that a mapping was specified for */
flb_config_map_foreach(ttm_list_i, ttm_cmval_i, ctx->token_to_tag_mappings) {
ttm_i_token = mk_list_entry_first(ttm_cmval_i->val.list,
struct flb_slist_entry,
_head);

if (flb_sds_cmp(ttm_i_token->str, credential, flb_sds_len(credential)) == 0) {
matching_token_mapping = ttm_cmval_i;
break;
}
mapping_idx += 1;
}
if (matching_token_mapping != NULL) {
/* Token is the first arg (list->next),
Tag is the second arg (list->next->next) */
ttm_tag = container_of(matching_token_mapping->val.list->next->next,
struct flb_slist_entry,
_head);
flb_plg_debug(ctx->ins, "token #%d will map to tag %s", token_idx + 1, ttm_tag->str);
splunk_token->map_to_tag = flb_sds_create(ttm_tag->str);
}
else {
flb_plg_warn(ctx->ins, "token #%d has no tag mapping, records from this token will not re-map to specific tag", token_idx + 1);
splunk_token->map_to_tag = NULL;
}
}
else {
splunk_token->map_to_tag = NULL;
}

flb_sds_destroy(credential);

/* Link to parent list */
mk_list_add(&splunk_token->_head, &ctx->auth_tokens);
token_idx++;
}
}

Expand Down
65 changes: 50 additions & 15 deletions plugins/in_splunk/splunk_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag,
return 0;
}

static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request)
static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request, struct flb_splunk_tokens **matched_token_out)
{
int ret = 0;
struct mk_list *head;
Expand Down Expand Up @@ -517,6 +517,7 @@ static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *
authorization,
splunk_token->length) == 0) {
flb_sds_destroy(authorization);
*matched_token_out = splunk_token;

return SPLUNK_AUTH_SUCCESS;
}
Expand Down Expand Up @@ -716,6 +717,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
off_t diff;
flb_sds_t tag;
struct mk_http_header *header;
struct flb_splunk_tokens *matched_token = NULL;
flb_sds_t tag_from_token = NULL;

if (request->uri.data[0] != '/') {
send_response(conn, 400, "error: invalid request\n");
Expand Down Expand Up @@ -818,7 +821,7 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,

/* Under services/collector endpoints are required for
* authentication if provided splunk_token */
ret = validate_auth_header(ctx, request);
ret = validate_auth_header(ctx, request, &matched_token);
if (ret < 0){
send_response(conn, 401, "error: unauthorized\n");
if (ret == SPLUNK_AUTH_MISSING_CRED) {
Expand All @@ -834,6 +837,12 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
return -1;
}

/* Tokens can be configured to map to a particular tag */
if (matched_token != NULL && matched_token->map_to_tag != NULL) {
tag_from_token = matched_token->map_to_tag;
tag = tag_from_token;
}

/* If the request contains chunked transfer encoded data, decode it */\
if (mk_http_parser_is_content_chunked(&session->parser)) {
ret = mk_http_parser_chunked_decode(&session->parser,
Expand All @@ -845,7 +854,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
flb_plg_error(ctx->ins, "failed to decode chunked data");
send_response(conn, 400, "error: invalid chunked data\n");

flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
mk_mem_free(uri);

return -1;
Expand Down Expand Up @@ -879,7 +891,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,

ret = process_hec_payload(ctx, conn, tag, session, request);
if (ret == -2) {
flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
mk_mem_free(uri);

if (out_chunked) {
Expand All @@ -899,7 +914,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
else {
send_response(conn, 400, "error: invalid HTTP endpoint\n");

flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
mk_mem_free(uri);

if (out_chunked) {
Expand All @@ -914,7 +932,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
else {
/* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/

flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
mk_mem_free(uri);

if (out_chunked) {
Expand All @@ -927,7 +948,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
return -1;
}

flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
mk_mem_free(uri);

if (out_chunked) {
Expand Down Expand Up @@ -1022,7 +1046,7 @@ static int send_json_message_response_ng(struct flb_http_response *response,
return 0;
}

static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_request *request)
static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_request *request, struct flb_splunk_tokens **matched_token_out)
{
struct mk_list *tmp;
struct mk_list *head;
Expand All @@ -1049,6 +1073,7 @@ static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_reque
if (strncasecmp(splunk_token->header,
auth_header,
splunk_token->length) == 0) {
*matched_token_out = splunk_token;
return SPLUNK_AUTH_SUCCESS;
}
}
Expand Down Expand Up @@ -1147,6 +1172,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
struct flb_splunk *context;
int ret = -1;
flb_sds_t tag;
struct flb_splunk_tokens *matched_token = NULL;
flb_sds_t tag_from_token = NULL;

context = (struct flb_splunk *) response->stream->user_data;

Expand Down Expand Up @@ -1176,7 +1203,7 @@ int splunk_prot_handle_ng(struct flb_http_request *request,

/* Under services/collector endpoints are required for
* authentication if provided splunk_token */
ret = validate_auth_header_ng(context, request);
ret = validate_auth_header_ng(context, request, &matched_token);

if (ret < 0) {
send_response_ng(response, 401, "error: unauthorized\n");
Expand All @@ -1201,10 +1228,16 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
return -1;
}

tag = flb_sds_create(context->ins->tag);

if (tag == NULL) {
return -1;
/* Tokens can be configured to map to a particular tag */
if (matched_token != NULL && matched_token->map_to_tag != NULL) {
tag_from_token = matched_token->map_to_tag;
tag = tag_from_token;
}
else {
tag = flb_sds_create(context->ins->tag);
if (tag == NULL) {
return -1;
}
}

if (strcasecmp(request->path, "/services/collector/raw/1.0") == 0 ||
Expand Down Expand Up @@ -1236,7 +1269,9 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
send_response_ng(response, 400, "error: invalid HTTP endpoint\n");
ret = -1;
}

flb_sds_destroy(tag);
/* Free the tag only if it was a temporarily-allocated/calculated one */
if (tag_from_token == NULL) {
flb_sds_destroy(tag);
}
return ret;
}
Loading