Skip to content

Commit

Permalink
in_http: parse msgpack payloads emitted by out_http.
Browse files Browse the repository at this point in the history
Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan committed Feb 19, 2024
1 parent bf76745 commit a34b5ed
Showing 1 changed file with 108 additions and 0 deletions.
108 changes: 108 additions & 0 deletions plugins/in_http/http_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#define HTTP_CONTENT_JSON 0
#define HTTP_CONTENT_URLENCODED 1
#define HTTP_CONTENT_MSGPACK 2

static inline char hex2nibble(char c)
{
Expand Down Expand Up @@ -512,6 +513,101 @@ static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag,
return ret;
}

static ssize_t parse_payload_msgpack(struct flb_http *ctx, flb_sds_t tag,
char *payload, size_t size)
{
int ret = FLB_EVENT_ENCODER_SUCCESS;
struct flb_time tm;
size_t offset = 0;
msgpack_unpacked result;
msgpack_object *record;
msgpack_object *metadata;
msgpack_object *data;
flb_sds_t tag_from_record = NULL;


msgpack_unpacked_init(&result);

while (ret == FLB_EVENT_ENCODER_SUCCESS &&
msgpack_unpack_next(&result, payload, size, &offset) == MSGPACK_UNPACK_SUCCESS) {

if (result.data.type != MSGPACK_OBJECT_ARRAY) {
msgpack_unpacked_destroy(&result);
return -1;
}

record = &result.data;
metadata = &record->via.array.ptr[0];
data = &record->via.array.ptr[1];

if (ctx->tag_key) {
tag_from_record = tag_key(ctx, data);
}

ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_time_msgpack_to_time(&tm, &metadata->via.array.ptr[0]);

if (ret == -1) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_log_event_encoder_set_timestamp(
&ctx->log_encoder,
&tm);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_log_event_encoder_set_body_from_msgpack_object(&ctx->log_encoder, data);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);
return -1;
}

if (tag_from_record) {
ret = flb_input_log_append(ctx->ins, tag_from_record,
flb_sds_len(tag_from_record),
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}
else if (tag) {
ret = flb_input_log_append(ctx->ins, tag, flb_sds_len(tag),
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}
else {
ret = flb_input_log_append(ctx->ins, NULL, 0,
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}

if (ret != 0) {
msgpack_unpacked_destroy(&result);
return -1;
}

flb_log_event_encoder_reset(&ctx->log_encoder);
}

msgpack_unpacked_destroy(&result);
return 0;
}

static int process_payload(struct flb_http *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
Expand All @@ -537,6 +633,11 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
type = HTTP_CONTENT_URLENCODED;
}

if (header->val.len == 19 &&
strncasecmp(header->val.data, "application/msgpack", 19) == 0) {
type = HTTP_CONTENT_MSGPACK;
}

if (type == -1) {
send_response(conn, 400, "error: invalid 'Content-Type'\n");
return -1;
Expand All @@ -557,6 +658,13 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
return -1;
}
}
else if (type == HTTP_CONTENT_MSGPACK) {
ret = parse_payload_msgpack(ctx, tag, request->data.data, request->data.len);
if (ret != 0) {
send_response(conn, 400, "error: invalid msgpack payload\n");
return -1;
}
}

return 0;
}
Expand Down

0 comments on commit a34b5ed

Please sign in to comment.