loki_out: add structured_metadata_map_keys #9530

Open. Wants to merge 1 commit into base: master
15 changes: 15 additions & 0 deletions docker_compose/loki-grafana-structured_metadata_map/README.md
@@ -0,0 +1,15 @@
### Description

This directory has a docker-compose file and its
configuration required to run:

1) A Fluent Bit installation with a dummy input and a Loki output configured for `structured_metadata_map_keys`
2) A Loki installation
3) A Grafana installation with a default Loki datasource

To run this, execute:

$ docker-compose up --force-recreate -d

N.B. The [docker compose file](./docker-compose.yml) contains an `image` entry and a commented-out `build` section.
To build from local source, comment out `image` and uncomment `build`.
@@ -0,0 +1,36 @@
service:
log_level: debug

pipeline:
inputs:
- name: dummy
tag: logs
dummy: |
{
"message": "simple log generated",
"logger": "my.logger",
"level": "INFO",
"hostname": "localhost",
"my_map_of_attributes_1": {
"key_1": "hello, world!",
"key_2": "goodbye, world!"
},
"my_map_of_maps_1": {
"root_key": {
"sub_key_1": "hello, world!",
"sub_key_2": "goodbye, world!"
}
}
}

outputs:
- name: loki
match: logs
host: loki
remove_keys: hostname,my_map_of_attributes_1,my_map_of_maps_1
label_keys: $level,$logger
labels: service_name=test
structured_metadata: $hostname
structured_metadata_map_keys: $my_map_of_attributes_1,$my_map_of_maps_1['root_key']
line_format: key_value
drop_single_key: on
@@ -0,0 +1,20 @@
# config file version
apiVersion: 1

# list of datasources that should be deleted from the database
deleteDatasources:
- name: Loki
orgId: 1

# list of datasources to insert/update depending
# on what's available in the database
datasources:
- name: Loki
type: loki
access: proxy
orgId: 1
url: http://loki:3100
basicAuth: false
isDefault: true
version: 1
editable: false
@@ -0,0 +1,53 @@
auth_enabled: false

server:
http_listen_port: 3100
grpc_listen_port: 9096

common:
instance_addr: 127.0.0.1
path_prefix: /tmp/loki
storage:
filesystem:
chunks_directory: /tmp/loki/chunks
rules_directory: /tmp/loki/rules
replication_factor: 1
ring:
kvstore:
store: inmemory

query_range:
results_cache:
cache:
embedded_cache:
enabled: true
max_size_mb: 100

schema_config:
configs:
- from: 2020-10-24
store: tsdb
object_store: filesystem
schema: v13
index:
prefix: index_
period: 24h

ruler:
alertmanager_url: http://localhost:9093

# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
#
# Statistics help us better understand how Loki is used, and they show us performance
# levels for most users. This helps us prioritize features and documentation.
# For more information on what's sent, look at
# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go
# Refer to the buildReport method to see what goes into a report.
#
# If you would like to disable reporting, uncomment the following lines:
#analytics:
# reporting_enabled: false
limits_config:
allow_structured_metadata: true
volume_enabled: true
@@ -0,0 +1,45 @@
services:
fluentbit:
# Comment out `image` and uncomment `build` to build the fluent-bit image from local source
image: fluent/fluent-bit:latest
Contributor

I'd probably add a build section as well and comment out one or the other just to show we can either compile or use the latest.

Author (@0x006EA1E5, Jan 23, 2025)

like this?:

    build:
      context: ../../
      dockerfile: dockerfiles/Dockerfile
    pull_policy: build

Contributor

no need for pull_policy and I'd just leave it commented out once you've tested it - main thing is it allows people to see how to build vs pull

Author

This seems to be working locally now

# build:
# context: ../../
# dockerfile: dockerfiles/Dockerfile
depends_on:
- loki
container_name: fluentbit
command: /fluent-bit/bin/fluent-bit -c /etc/fluent-bit_loki_out-structured_metadata_map.yaml
ports:
- 2021:2021
networks:
- loki-network
volumes:
- ./config/fluent-bit_loki_out-structured_metadata_map.yaml:/etc/fluent-bit_loki_out-structured_metadata_map.yaml
Contributor

If you mount this to the default location then you can also remove the command override - although that only is true currently if it is the legacy TOML format config.

e.g. this would mean no need to have a command:

      - ./config/fluent-bit_loki_out-structured_metadata_map.conf:/fluent-bit/etc/fluent-bit.conf

Author

Is this preferred? I find the yaml easier to read.

I think I read somewhere that you're going to make the yaml config the default? Would this end up diverging between the 3.x branch and 4.x?

Happy to change this though if that's what you want 😄

Contributor

No, this is fine and yeah we will be moving to YAML by default for 4.0.


grafana:
image: grafana/grafana:11.4.0
depends_on:
- loki
- fluentbit
ports:
- 3000:3000
volumes:
- ./config/grafana/provisioning:/etc/grafana/provisioning
networks:
- loki-network
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin

loki:
image: grafana/loki:2.9.2
command: -config.file=/etc/loki/loki-config.yaml
networks:
- loki-network
ports:
- 3100:3100
volumes:
- ./config/loki-config.yaml:/etc/loki/loki-config.yaml

networks:
loki-network:
driver: bridge
147 changes: 142 additions & 5 deletions plugins/out_loki/loki.c
@@ -298,6 +298,13 @@ static void flb_loki_kv_exit(struct flb_loki *ctx)
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
}
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_map_keys_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
@@ -416,6 +423,93 @@ static void pack_kv(struct flb_loki *ctx,
}
}

/*
* Similar to pack_kv above, except will only use msgpack_objects of type
* MSGPACK_OBJECT_MAP, and will iterate over the keys adding each entry as a
* separate item. Non-string map values are serialised to JSON, as Loki requires
* all values to be strings.
*/
static void pack_maps(struct flb_loki *ctx,
Author

This essentially tries to follow the flow of the preceding pack_kv function.

Do you need a more detailed spec of what exactly this is supposed to be doing?

It should implement the proposal described in the FR #9463

msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map,
struct flb_mp_map_header *mh,
struct mk_list *list)
{
struct mk_list *head;
struct flb_loki_kv *kv;

msgpack_object *start_key;
msgpack_object *out_key;
msgpack_object *out_val;

msgpack_object_map accessed_map;
uint32_t accessed_map_index;
msgpack_object_kv accessed_map_kv;

char *accessed_map_val_json;

mk_list_foreach(head, list) {
/* get the flb_loki_kv for this iteration of the loop */
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* record accessor key/value pair */
if (kv->ra_key != NULL && kv->ra_val == NULL) {

/* try to get the value for the record accessor */
if (flb_ra_get_kv_pair(kv->ra_key, *map, &start_key, &out_key, &out_val)
Member

The flb_ra_* APIs do not use msgpack-c return codes like MSGPACK_UNPACK_CONTINUE; the return values need to be checked and the code adjusted accordingly.

Author

I'm a bit confused as to the meaning of the return code from flb_ra_get_kv_pair. I see from the source it is commented that this should

Returns FLB_TRUE if the pattern matched a kv pair, otherwise it returns FLB_FALSE

Where FLB_TRUE is 1 and FLB_FALSE is 0

However, reading the code, if this doesn't hit the FLB_FALSE code path, it simply returns whatever the following call to flb_ra_key_value_get returns, and flb_ra_key_value_get seems to return 0 for okay and -1 for not okay.

It looks like we are getting return codes as follows:

  • Valid RA, normal happy path - this will delegate to flb_ra_key_value_get and return 0
  • Syntactically invalid RA key, such as $$ - this will return FLB_FALSE (also 0) via get_ra_parser.
    • This PR then proceeds assuming things are okay, checks out_val, and finally logs No valid map data found for key $ (test case flb_test_structured_metadata_map_invalid_ra_key)
  • Syntactically valid RA key, such as $missing_map which doesn't reference anything - this will delegate to flb_ra_key_value_get and return -1 (test case structured_metadata_map_single_missing_map)

It seems like the main option here is to test for not equal to -1, and then check out_val.

Otherwise, we have to change how flb_ra_get_kv_pair works to agree with its comment, which I assume would break some important things elsewhere?

Author

@edsiper Any thoughts on this? I believe checking != -1 is the best we can do here.

!= -1) {

/*
* we require the value to be a map, or it doesn't make sense as
* this is adding a map's key / values
*/
if (out_val->type != MSGPACK_OBJECT_MAP || out_val->via.map.size <= 0) {
flb_plg_debug(ctx->ins, "No valid map data found for key %s",
kv->ra_key->pattern);
}
else {
accessed_map = out_val->via.map;

/* for each entry in the accessed map... */
for (accessed_map_index = 0; accessed_map_index < accessed_map.size;
accessed_map_index++) {

/* get the entry */
accessed_map_kv = accessed_map.ptr[accessed_map_index];

/* Pack the key and value */
flb_mp_map_header_append(mh);

pack_label_key(mp_pck, (char*) accessed_map_kv.key.via.str.ptr,
accessed_map_kv.key.via.str.size);

/* If the value is a string, just pack it... */
if (accessed_map_kv.val.type == MSGPACK_OBJECT_STR) {
msgpack_pack_str_with_body(mp_pck,
accessed_map_kv.val.via.str.ptr,
accessed_map_kv.val.via.str.size);
}
/*
* ...otherwise convert value to JSON string, as Loki always
* requires a string value
*/
else {
accessed_map_val_json = flb_msgpack_to_json_str(1024,
&accessed_map_kv.val);
if (accessed_map_val_json) {
msgpack_pack_str_with_body(mp_pck, accessed_map_val_json,
strlen(accessed_map_val_json));
flb_free(accessed_map_val_json);
}
}
}
}
}
}
}
}
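
The return-code discussion in the review thread above can be condensed into a defensive check. The following is a self-contained sketch, not the plugin's code: `lookup_stub` is a hypothetical stand-in that mimics the three outcomes described in the thread (0 with a value, 0 for an unparsable key, -1 for a missing key), and `struct value` is a simplified replacement for `msgpack_object`. It is an assumption of this sketch that the unparsable-key case leaves the output pointer untouched. A result is treated as usable only if the call did not return -1, the output pointer was set, and the value is a non-empty map.

```c
#include <stddef.h>
#include <string.h>

enum value_type { VALUE_STR, VALUE_MAP };

/* Simplified stand-in for msgpack_object (illustration only) */
struct value {
    enum value_type type;
    size_t map_size;   /* only meaningful for VALUE_MAP */
};

static struct value attrs_map = { VALUE_MAP, 2 };
static struct value level_str = { VALUE_STR, 0 };

/*
 * Hypothetical lookup mimicking the outcomes described in the thread:
 *  - unparsable pattern ("$$"): returns 0 and (assumed) leaves *out untouched
 *  - known key: returns 0 and sets *out
 *  - unknown key: returns -1
 */
static int lookup_stub(const char *pattern, struct value **out)
{
    if (strcmp(pattern, "$$") == 0) {
        return 0;
    }
    if (strcmp(pattern, "$my_map_of_attributes_1") == 0) {
        *out = &attrs_map;
        return 0;
    }
    if (strcmp(pattern, "$level") == 0) {
        *out = &level_str;
        return 0;
    }
    return -1;
}

/* Returns 1 if the pattern resolves to a non-empty map, 0 otherwise */
static int resolves_to_map(const char *pattern)
{
    struct value *out = NULL;   /* initialise so "not set" is detectable */

    if (lookup_stub(pattern, &out) == -1) {
        return 0;               /* key not found */
    }
    if (out == NULL) {
        return 0;               /* call "succeeded" but produced no value */
    }
    return out->type == VALUE_MAP && out->map_size > 0;
}
```

Initialising the output pointer to `NULL` before the call is what makes the second check meaningful; without it, the "returned 0 but set nothing" case would read an indeterminate pointer.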

static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
@@ -424,7 +518,17 @@ static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
struct flb_mp_map_header mh;
/* Initialize dynamic map header */
flb_mp_map_header_init(&mh, mp_pck);
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
if (ctx->structured_metadata_map_keys) {
pack_maps(ctx, mp_pck, tag, tag_len, map, &mh,
&ctx->structured_metadata_map_keys_list);
}
/*
* explicit structured_metadata entries override
Author

I believe this works as expected.

The intention is, if we try to add an entry to the structured_metadata with the same key twice (first from the new structured_metadata_map_keys, then from the existing structured_metadata), then the second entry should "win" and overwrite the first.

I am assuming msgpack works this way. But maybe I actually need to do some kind of explicit check, and only add a new entry where there is not already one for a given key?

* structured_metadata_map_keys entries
*/
if (ctx->structured_metadata) {
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
}
flb_mp_map_header_end(&mh);
return 0;
}
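
On the override question raised in the comment above: a msgpack packer writes map entries sequentially, so packing the same key twice leaves two entries in the serialised map rather than replacing the first, and which one survives is then up to the consumer. If "last entry wins" needs to be guaranteed before the payload leaves Fluent Bit, one option is to deduplicate before packing. The sketch below illustrates that idea with a small fixed-size table; `kv_table`, `kv_put`, and `kv_get` are hypothetical names for illustration and do not exist in Fluent Bit.

```c
#include <stddef.h>
#include <string.h>

#define KV_MAX 32

struct kv_pair {
    const char *key;
    const char *val;
};

/* Hypothetical staging table, filled before anything is packed */
struct kv_table {
    struct kv_pair items[KV_MAX];
    size_t count;
};

/*
 * Insert key/val, overwriting the value if the key is already present.
 * Returns 0 on success, -1 if the table is full.
 */
static int kv_put(struct kv_table *t, const char *key, const char *val)
{
    size_t i;

    for (i = 0; i < t->count; i++) {
        if (strcmp(t->items[i].key, key) == 0) {
            t->items[i].val = val;   /* later entry wins */
            return 0;
        }
    }
    if (t->count >= KV_MAX) {
        return -1;
    }
    t->items[t->count].key = key;
    t->items[t->count].val = val;
    t->count++;
    return 0;
}

/* Returns the stored value for key, or NULL if the key is absent */
static const char *kv_get(const struct kv_table *t, const char *key)
{
    size_t i;

    for (i = 0; i < t->count; i++) {
        if (strcmp(t->items[i].key, key) == 0) {
            return t->items[i].val;
        }
    }
    return NULL;
}
```

With this approach, entries derived from `structured_metadata_map_keys` would be staged first, explicit `structured_metadata` entries second, and the table packed once at the end, so each key appears exactly once in the output.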
@@ -788,6 +892,7 @@ static int parse_labels(struct flb_loki *ctx)

flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);
flb_loki_kv_init(&ctx->structured_metadata_map_keys_list);

if (ctx->structured_metadata) {
ret = parse_kv(ctx, ctx->structured_metadata, &ctx->structured_metadata_list, &ra_used);
@@ -796,6 +901,28 @@ static int parse_labels(struct flb_loki *ctx)
}
}

/* Append structured metadata map keys set in the configuration */
if (ctx->structured_metadata_map_keys) {
mk_list_foreach(head, ctx->structured_metadata_map_keys) {
entry = mk_list_entry(head, struct flb_slist_entry, _head);
if (entry->str[0] != '$') {
flb_plg_error(ctx->ins,
"invalid structured metadata map key, the name must start "
"with '$'");
return -1;
}

ret = flb_loki_kv_append(ctx, &ctx->structured_metadata_map_keys_list,
entry->str, NULL);
if (ret == -1) {
return -1;
}
else if (ret > 0) {
ra_used++;
}
}
}

if (ctx->labels) {
ret = parse_kv(ctx, ctx->labels, &ctx->labels_list, &ra_used);
if (ret == -1) {
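
The validation added in this hunk rejects any configured entry that does not start with `$`. As a rough standalone illustration, assuming the option value has already been split into individual entries (the plugin receives it as a comma-separated list via `FLB_CONFIG_MAP_CLIST`), the check amounts to the following. `all_keys_are_accessors` is a hypothetical helper, not a Fluent Bit function.

```c
#include <stddef.h>

/*
 * Hypothetical helper: returns 1 if every entry looks like a record
 * accessor pattern (starts with '$'), 0 otherwise. An empty list passes.
 */
static int all_keys_are_accessors(const char *const *keys, size_t n)
{
    size_t i;

    for (i = 0; i < n; i++) {
        if (keys[i] == NULL || keys[i][0] != '$') {
            return 0;
        }
    }
    return 1;
}
```

Note that this only checks the prefix; as the review thread on `flb_ra_get_kv_pair` points out, a pattern such as `$$` passes this test and still fails to resolve to anything at runtime.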
@@ -971,6 +1098,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
ctx->ins = ins;
flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);
flb_loki_kv_init(&ctx->structured_metadata_map_keys_list);

/* Register context with plugin instance */
flb_output_set_context(ins, ctx);
@@ -1539,12 +1667,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ||
ctx->structured_metadata_map_keys ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL);
}
}
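
The new array-size expression relies on C operator precedence: `||` binds more tightly than `?:`, so `a || b ? 3 : 2` parses as `(a || b) ? 3 : 2`. A minimal standalone sketch of that rule follows; `entry_size` and its two pointer parameters are hypothetical stand-ins for the `ctx` fields and are not part of the plugin.

```c
#include <stddef.h>

/*
 * Hypothetical helper (not in loki.c): number of elements in one Loki
 * "values" entry. Two elements are [timestamp, line]; a third element
 * carries structured metadata when either option is configured.
 */
static int entry_size(const void *structured_metadata,
                      const void *structured_metadata_map_keys)
{
    /* '||' binds tighter than '?:', so this is (a || b) ? 3 : 2 */
    return structured_metadata || structured_metadata_map_keys ? 3 : 2;
}
```

The expression behaves as intended without parentheses, although adding them around the `||` operands would make the intent easier to read at a glance.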
@@ -1575,12 +1704,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
msgpack_pack_str_body(&mp_pck, "values", 6);
msgpack_pack_array(&mp_pck, 1);

msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ||
ctx->structured_metadata_map_keys ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body);
}
}
@@ -1905,6 +2035,13 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata),
"optional structured metadata fields for API requests."
},

{
FLB_CONFIG_MAP_CLIST, "structured_metadata_map_keys", NULL,
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata_map_keys),
"optional structured metadata fields, as derived dynamically from configured maps "
"keys, for API requests."
},

{
FLB_CONFIG_MAP_BOOL, "auto_kubernetes_labels", "false",