From a34b5edfcebe8916a721cd874b32b76197ae8bc5 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Mon, 19 Feb 2024 18:07:14 -0300 Subject: [PATCH] in_http: parse msgpack payloads emitted by out_http. Signed-off-by: Phillip Whelan --- plugins/in_http/http_prot.c | 108 ++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index de10697f523..066c0641453 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -30,6 +30,7 @@ #define HTTP_CONTENT_JSON 0 #define HTTP_CONTENT_URLENCODED 1 +#define HTTP_CONTENT_MSGPACK 2 static inline char hex2nibble(char c) { @@ -512,6 +513,101 @@ static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag, return ret; } +static ssize_t parse_payload_msgpack(struct flb_http *ctx, flb_sds_t tag, + char *payload, size_t size) +{ + int ret = FLB_EVENT_ENCODER_SUCCESS; + struct flb_time tm; + size_t offset = 0; + msgpack_unpacked result; + msgpack_object *record; + msgpack_object *metadata; + msgpack_object *data; + flb_sds_t tag_from_record = NULL; + + + msgpack_unpacked_init(&result); + + while (ret == FLB_EVENT_ENCODER_SUCCESS && + msgpack_unpack_next(&result, payload, size, &offset) == MSGPACK_UNPACK_SUCCESS) { + + if (result.data.type != MSGPACK_OBJECT_ARRAY) { + msgpack_unpacked_destroy(&result); + return -1; + } + + record = &result.data; + metadata = &record->via.array.ptr[0]; + data = &record->via.array.ptr[1]; + + if (ctx->tag_key) { + tag_from_record = tag_key(ctx, data); + } + + ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = flb_time_msgpack_to_time(&tm, &metadata->via.array.ptr[0]); + + if (ret == -1) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = flb_log_event_encoder_set_timestamp( + &ctx->log_encoder, + &tm); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = flb_log_event_encoder_set_body_from_msgpack_object(&ctx->log_encoder, data); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + if (tag_from_record) { + ret = flb_input_log_append(ctx->ins, tag_from_record, + flb_sds_len(tag_from_record), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + else if (tag) { + ret = flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + else { + ret = flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + + if (ret != 0) { + msgpack_unpacked_destroy(&result); + return -1; + } + + flb_log_event_encoder_reset(&ctx->log_encoder); + } + + msgpack_unpacked_destroy(&result); + return 0; +} + static int process_payload(struct flb_http *ctx, struct http_conn *conn, flb_sds_t tag, struct mk_http_session *session, @@ -537,6 +633,11 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, type = HTTP_CONTENT_URLENCODED; } + if (header->val.len == 19 && + strncasecmp(header->val.data, "application/msgpack", 19) == 0) { + type = HTTP_CONTENT_MSGPACK; + } + if (type == -1) { send_response(conn, 400, "error: invalid 'Content-Type'\n"); return -1; @@ -557,6 +658,13 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, return -1; } } + else if (type == HTTP_CONTENT_MSGPACK) { + ret = parse_payload_msgpack(ctx, tag, request->data.data, request->data.len); + if (ret != 0) { + send_response(conn, 400, "error: invalid msgpack payload\n"); + return -1; + } + } return 0; }