diff --git a/plugins/out_azure_blob/azure_blob.c b/plugins/out_azure_blob/azure_blob.c index c13da85204d..a3ac64b6fe3 100644 --- a/plugins/out_azure_blob/azure_blob.c +++ b/plugins/out_azure_blob/azure_blob.c @@ -51,6 +51,160 @@ struct worker_info { FLB_TLS_DEFINE(struct worker_info, worker_info); +static flb_sds_t cb_azb_msgpack_extract_log_key(void *out_context, const char *data, + uint64_t bytes) +{ + int i; + int records = 0; + int map_size; + int check = FLB_FALSE; + int found = FLB_FALSE; + int log_key_missing = 0; + int ret; + int alloc_error = 0; + struct flb_azure_blob *ctx = out_context; + char *val_buf; + char *key_str = NULL; + size_t key_str_size = 0; + size_t msgpack_size = bytes + bytes / 4; + size_t val_offset = 0; + flb_sds_t out_buf; + msgpack_object map; + msgpack_object key; + msgpack_object val; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + + /* Iterate the original buffer and perform adjustments */ + records = flb_mp_count(data, bytes); + if (records <= 0) { + return NULL; + } + + /* Allocate buffer to store log_key contents */ + val_buf = flb_calloc(1, msgpack_size); + if (val_buf == NULL) { + flb_plg_error(ctx->ins, "Could not allocate enough " + "memory to read record"); + flb_errno(); + return NULL; + } + + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + flb_free(val_buf); + + return NULL; + } + + + while (!alloc_error && + (ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + + /* Get the record/map */ + map = *log_event.body; + + if (map.type != MSGPACK_OBJECT_MAP) { + continue; + } + + map_size = map.via.map.size; + + /* Reset variables for found log_key and correct type */ + found = FLB_FALSE; + check = FLB_FALSE; + + /* Extract log_key from record and append to output buffer */ + for (i = 0; i < map_size; i++) { + key = map.via.map.ptr[i].key; + val = map.via.map.ptr[i].val; + + if (key.type == MSGPACK_OBJECT_BIN) { + key_str = (char *) key.via.bin.ptr; + key_str_size = key.via.bin.size; + check = FLB_TRUE; + } + if (key.type == MSGPACK_OBJECT_STR) { + key_str = (char *) key.via.str.ptr; + key_str_size = key.via.str.size; + check = FLB_TRUE; + } + + if (check == FLB_TRUE) { + if (strncmp(ctx->log_key, key_str, key_str_size) == 0) { + found = FLB_TRUE; + + /* + * Copy contents of value into buffer. Necessary to copy + * strings because flb_msgpack_to_json does not handle nested + * JSON gracefully and double escapes them. + */ + if (val.type == MSGPACK_OBJECT_BIN) { + memcpy(val_buf + val_offset, val.via.bin.ptr, val.via.bin.size); + val_offset += val.via.bin.size; + val_buf[val_offset] = '\n'; + val_offset++; + } + else if (val.type == MSGPACK_OBJECT_STR) { + memcpy(val_buf + val_offset, val.via.str.ptr, val.via.str.size); + val_offset += val.via.str.size; + val_buf[val_offset] = '\n'; + val_offset++; + } + else { + ret = flb_msgpack_to_json(val_buf + val_offset, + msgpack_size - val_offset, &val); + if (ret < 0) { + break; + } + val_offset += ret; + val_buf[val_offset] = '\n'; + val_offset++; + } + /* Exit early once log_key has been found for current record */ + break; + } + } + } + + /* If log_key was not found in the current record, mark log key as missing */ + if (found == FLB_FALSE) { + log_key_missing++; + } + } + + /* Throw error once per chunk if at least one log key was not found */ + if (log_key_missing > 0) { + flb_plg_error(ctx->ins, "Could not find log_key '%s' in %d records", + ctx->log_key, log_key_missing); + } + + flb_log_event_decoder_destroy(&log_decoder); + + /* If nothing was read, destroy buffer */ + if (val_offset == 0) { + flb_free(val_buf); + return NULL; + } + val_buf[val_offset] = '\0'; + + /* Create output buffer to store contents */ + out_buf = flb_sds_create(val_buf); + if (out_buf == NULL) { + flb_plg_error(ctx->ins, "Error creating buffer to store log_key contents."); + flb_errno(); + } + flb_free(val_buf); + + return out_buf; +} + static int azure_blob_format(struct flb_config *config, struct flb_input_instance *ins, void *plugin_context, @@ -63,10 +217,15 @@ static int azure_blob_format(struct flb_config *config, flb_sds_t out_buf; struct flb_azure_blob *ctx = plugin_context; - out_buf = flb_pack_msgpack_to_json_format(data, bytes, - FLB_PACK_JSON_FORMAT_LINES, - FLB_PACK_JSON_DATE_ISO8601, - ctx->date_key); + if (ctx->log_key) { + out_buf = cb_azb_msgpack_extract_log_key(ctx, data, bytes); + } + else { + out_buf = flb_pack_msgpack_to_json_format(data, bytes, + FLB_PACK_JSON_FORMAT_LINES, + FLB_PACK_JSON_DATE_ISO8601, + ctx->date_key); + } if (!out_buf) { return -1; } @@ -593,13 +752,13 @@ static int ensure_container(struct flb_azure_blob *ctx) else if (status == 200) { flb_plg_info(ctx->ins, "container '%s' already exists", ctx->container_name); return FLB_TRUE; - } + } else if (status == 403) { flb_plg_error(ctx->ins, "failed getting container '%s', access denied", ctx->container_name); return FLB_FALSE; } - + flb_plg_error(ctx->ins, "get container request failed, status=%i", status); @@ -1171,6 +1330,14 @@ static struct flb_config_map config_map[] = { "Set the block type: appendblob or blockblob" }, + { + FLB_CONFIG_MAP_STR, "log_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_azure_blob, log_key), + "By default, the whole log record will be sent to blob storage. " + "If you specify a key name with this option, then only the value of " + "that key will be sent" + }, + { FLB_CONFIG_MAP_STR, "compress", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/out_azure_blob/azure_blob.h b/plugins/out_azure_blob/azure_blob.h index 361e348de79..3b396cd88fb 100644 --- a/plugins/out_azure_blob/azure_blob.h +++ b/plugins/out_azure_blob/azure_blob.h @@ -53,6 +53,7 @@ struct flb_azure_blob { flb_sds_t account_name; flb_sds_t container_name; flb_sds_t blob_type; + flb_sds_t log_key; flb_sds_t shared_key; flb_sds_t endpoint; flb_sds_t path;