Skip to content

Commit

Permalink
in_opentelemetry: add a function to store metadata
Browse files Browse the repository at this point in the history
This function is to store fields other than body as metadata.

The fields are defined at
https://opentelemetry.io/docs/specs/otel/logs/data-model/#definitions-used-in-this-document

Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed Jan 20, 2024
1 parent 0f0e64f commit ccdf39b
Showing 1 changed file with 137 additions and 41 deletions.
178 changes: 137 additions & 41 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -387,17 +387,148 @@ static int otlp_pack_any_value(msgpack_packer *mp_pck,
return result;
}

/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition */
static int otel_pack_v1_metadata(msgpack_packer *mp_pck,
struct Opentelemetry__Proto__Logs__V1__LogRecord *log_record,
Opentelemetry__Proto__Resource__V1__Resource *resource,
Opentelemetry__Proto__Common__V1__InstrumentationScope *scope)
{
struct flb_mp_map_header mh;
struct flb_mp_map_header scope_mh;
int ret;
flb_mp_map_header_init(&mh, mp_pck);

flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 17);
msgpack_pack_str_body(mp_pck, "ObservedTimestamp", 17);
msgpack_pack_uint64(mp_pck, log_record->observed_time_unix_nano);

/* Value of 0 indicates unknown or missing timestamp. */
if (log_record->time_unix_nano != 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 9);
msgpack_pack_str_body(mp_pck, "Timestamp", 9);
msgpack_pack_uint64(mp_pck, log_record->time_unix_nano);
}

/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */
if (log_record->severity_number >= 1 && log_record->severity_number <= 24) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 14);
msgpack_pack_str_body(mp_pck, "SeverityNumber", 14);
msgpack_pack_uint64(mp_pck, log_record->severity_number);
}

if (log_record->severity_text != NULL && strlen(log_record->severity_text) > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 12);
msgpack_pack_str_body(mp_pck, "SeverityText", 12);
msgpack_pack_str(mp_pck, strlen(log_record->severity_text));
msgpack_pack_str_body(mp_pck, log_record->severity_text, strlen(log_record->severity_text));
}

if (log_record->n_attributes > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "Attributes", 10);
ret = otel_pack_kvarray(mp_pck,
log_record->attributes,
log_record->n_attributes);
if (ret != 0) {
return ret;
}
}

if (log_record->trace_id.len > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 7);
msgpack_pack_str_body(mp_pck, "TraceId", 7);
ret = otel_pack_bytes(mp_pck, log_record->trace_id);
if (ret != 0) {
return ret;
}
}

if (log_record->span_id.len > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 6);
msgpack_pack_str_body(mp_pck, "SpanId", 6);
ret = otel_pack_bytes(mp_pck, log_record->span_id);
if (ret != 0) {
return ret;
}
}

flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "TraceFlags", 10);
msgpack_pack_uint8(mp_pck, (uint8_t)log_record->flags & 0xff);



if (resource != NULL && resource->n_attributes > 0 && resource->attributes) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 8);
msgpack_pack_str_body(mp_pck, "Resource", 8);

ret = otel_pack_kvarray(mp_pck,
resource->attributes,
resource->n_attributes);
if (ret != 0) {
return ret;
}
}

if (scope != NULL && (scope->name || scope->version || scope->n_attributes > 0)) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 20);
msgpack_pack_str_body(mp_pck, "InstrumentationScope", 20);

flb_mp_map_header_init(&scope_mh, mp_pck);
if (scope->name != NULL && strlen(scope->name) > 0) {
flb_mp_map_header_append(&scope_mh);
msgpack_pack_str(mp_pck, 4);
msgpack_pack_str_body(mp_pck, "Name", 4);
msgpack_pack_str(mp_pck, strlen(scope->name));
msgpack_pack_str_body(mp_pck, scope->name, strlen(scope->name));
}
if (scope->version != NULL && strlen(scope->version) > 0) {
flb_mp_map_header_append(&scope_mh);
msgpack_pack_str(mp_pck, 7);
msgpack_pack_str_body(mp_pck, "Version", 7);
msgpack_pack_str(mp_pck, strlen(scope->version));
msgpack_pack_str_body(mp_pck, scope->version, strlen(scope->version));
}
if (scope->n_attributes > 0 && scope->attributes) {
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "Attributes", 10);
ret = otel_pack_kvarray(mp_pck,
scope->attributes,
scope->n_attributes);
if (ret != 0) {
return ret;
}
}

flb_mp_map_header_end(&scope_mh);
}

flb_mp_map_header_end(&mh);
return 0;
}

static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
uint8_t *in_buf,
size_t in_size)
{
int ret;
msgpack_packer packer;
msgpack_sbuffer buffer;
msgpack_packer meta_packer;
msgpack_sbuffer meta_buffer;
int resource_logs_index;
int scope_log_index;
int log_record_index;
struct flb_mp_map_header mh;

Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs;
Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs;
Expand All @@ -409,6 +540,8 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,

msgpack_sbuffer_init(&buffer);
msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write);
msgpack_sbuffer_init(&meta_buffer);
msgpack_packer_init(&meta_packer, &buffer, msgpack_sbuffer_write);

input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf);
if (input_logs == NULL) {
Expand Down Expand Up @@ -453,51 +586,13 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
flb_mp_map_header_init(&mh, &packer);

/* pack resource */
flb_mp_map_header_append(&mh);
msgpack_pack_str(&packer, 8);
msgpack_pack_str_body(&packer, "resource", 8);
if (resource != NULL) {
msgpack_pack_map(&packer, 1);

msgpack_pack_str(&packer, 10);
msgpack_pack_str_body(&packer, "attributes", 10);
ret = otel_pack_kvarray(
&packer,
resource->attributes,
resource->n_attributes);
}
else {
msgpack_pack_map(&packer, 0);
}

if (ret != 0) {
flb_error("[otel] Failed to convert log resource attributes");
goto binary_payload_to_msgpack_end;
}

/* pack logRecords */
flb_mp_map_header_append(&mh);
msgpack_pack_str(&packer, 10);
msgpack_pack_str_body(&packer, "logRecords", 10);

msgpack_pack_map(&packer, 1);
msgpack_pack_str(&packer, 10);
msgpack_pack_str_body(&packer, "attributes", 10);
ret = otel_pack_kvarray(
&packer,
log_records[log_record_index]->attributes,
log_records[log_record_index]->n_attributes);

ret = otel_pack_v1_metadata(&meta_packer, log_records[log_record_index], resource, scope_log->scope);
if (ret != 0) {
flb_error("[otel] Failed to convert log record attributes");
flb_error("[otel] Failed to convert log record");

ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
}
else {
flb_mp_map_header_end(&mh);
ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(
encoder,
buffer.data,
Expand Down Expand Up @@ -549,6 +644,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,

binary_payload_to_msgpack_end:
msgpack_sbuffer_destroy(&buffer);
msgpack_sbuffer_destroy(&meta_buffer);
if (input_logs) {
opentelemetry__proto__collector__logs__v1__export_logs_service_request__free_unpacked(
input_logs, NULL);
Expand Down

0 comments on commit ccdf39b

Please sign in to comment.