filter_aws: Adds resource entity to PLE calls in cloudwatch logs plugin for dataplane and host logs #7

Open
wants to merge 13 commits into
base: 1.9.10

@nathalapooja nathalapooja commented Dec 30, 2024

  • Modified aws filter plugin to extract additional resource entity attributes
  • Modified cloudwatch logs output plugin to add resource entity in PLE calls

Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
        dataplane-log.conf: |
          [INPUT]
            Name                systemd
            Tag                 dataplane.systemd.*
            Systemd_Filter      _SYSTEMD_UNIT=docker.service
            Systemd_Filter      _SYSTEMD_UNIT=containerd.service
            Systemd_Filter      _SYSTEMD_UNIT=kubelet.service
            DB                  /var/fluent-bit/state/systemd.db
            Path                /var/log/journal
            Read_From_Tail      ${READ_FROM_TAIL}

          [INPUT]
            Name                tail
            Tag                 dataplane.tail.*
            Path                /var/log/containers/aws-node*, /var/log/containers/kube-proxy*
            multiline.parser    docker, cri
            DB                  /var/fluent-bit/state/flb_dataplane_tail.db
            Mem_Buf_Limit       50MB
            Skip_Long_Lines     On
            Refresh_Interval    10
            Rotate_Wait         30
            storage.type        filesystem
            Read_from_Head      ${READ_FROM_HEAD}

          [FILTER]
            Name                modify
            Match               dataplane.systemd.*
            Rename              _HOSTNAME                   hostname
            Rename              _SYSTEMD_UNIT               systemd_unit
            Rename              MESSAGE                     message
            Remove_regex        ^((?!hostname|systemd_unit|message).)*$

          [FILTER]
            Name                aws
            Match               dataplane.*
            imds_version        v2
            enable_entity       true
            entity_type         resource

          [OUTPUT]
            Name                cloudwatch_logs
            Match               dataplane.*
            region              ${AWS_REGION}
            log_group_name      /aws/containerinsights/${CLUSTER_NAME}/dataplane
            log_stream_prefix   ${HOST_NAME}-
            auto_create_group   true
            extra_user_agent    container-insights
            entity_type         resource
            add_entity          true
        host-log.conf: |
          [INPUT]
            Name                tail
            Tag                 host.dmesg
            Path                /var/log/dmesg
            Key                 message
            DB                  /var/fluent-bit/state/flb_dmesg.db
            Mem_Buf_Limit       5MB
            Skip_Long_Lines     On
            Refresh_Interval    10
            Read_from_Head      ${READ_FROM_HEAD}

          [INPUT]
            Name                tail
            Tag                 host.messages
            Path                /var/log/messages
            Parser              syslog
            DB                  /var/fluent-bit/state/flb_messages.db
            Mem_Buf_Limit       5MB
            Skip_Long_Lines     On
            Refresh_Interval    10
            Read_from_Head      ${READ_FROM_HEAD}

          [INPUT]
            Name                tail
            Tag                 host.secure
            Path                /var/log/secure
            Parser              syslog
            DB                  /var/fluent-bit/state/flb_secure.db
            Mem_Buf_Limit       5MB
            Skip_Long_Lines     On
            Refresh_Interval    10
            Read_from_Head      ${READ_FROM_HEAD}

          [FILTER]
            Name                aws
            Match               host.*
            imds_version        v2
            enable_entity       true
            entity_type         resource

          [OUTPUT]
            Name                cloudwatch_logs
            Match               host.*
            region              ${AWS_REGION}
            log_group_name      /aws/containerinsights/${CLUSTER_NAME}/host
            log_stream_prefix   ${HOST_NAME}.
            auto_create_group   true
            extra_user_agent    container-insights
            entity_type         resource
            add_entity          true
  • Debug log output from testing the change
    For Dataplane logs on EKS cluster
[2024/12/27 15:49:01] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] entity platform is added eks
[2024/12/27 15:49:01] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] stream->entity->root_filter_count 2
[2024/12/27 15:49:01] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] entity_add_resource_key_attributes is called
[2024/12/27 15:49:01] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] stream entity resource platform eks
[2024/12/27 15:49:01] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] setting platform to eks eks
[2024/12/27 15:49:01] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] entity {"logGroupName":"/aws/containerinsights/compass-ga2/dataplane","logStreamName":"ip-192-168-2-9.ec2.internal-dataplane.systemd.kubelet.service","entity":{"keyAttributes":{"Type":"Resource","ResourceType":"AWS::EKS::Cluster","Identifier":"compass-ga2"}},"logEvents":[
[2024/12/27 15:49:01] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] cloudwatch:PutLogEvents: events=1, payload=900 bytes
[2024/12/27 15:49:01] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] Sending log events to log stream ip-192-168-2-9.ec2.internal-dataplane.systemd.kubelet.service
[2024/12/27 15:49:01] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] data buf {"logGroupName":"/aws/containerinsights/compass-ga2/dataplane","logStreamName":"ip-192-168-2-9.ec2.internal-dataplane.systemd.kubelet.service","entity":{"keyAttributes":{"Type":"Resource","ResourceType":"AWS::EKS::Cluster","Identifier":"compass-ga2"}},"logEvents":[{"timestamp":1735314537930,"message":"{\"systemd_unit\":\"kubelet.service\",\"hostname\":\"ip-192-168-2-9.ec2.internal\",\"message\":\"E1227 15:48:57.930646    2487 pod_workers.go:965] \\\"Error syncing pod, skipping\\\" err=\\\"failed to \\\\\\\"StartContainer\\\\\\\" for \\\\\\\"otc-container\\\\\\\" with ImagePullBackOff: \\\\\\\"Back-off pulling image \\\\\\\\\\\\\\\"public.ecr.aws/cloudwatch-agent/cloudwatch-agent:1.300051.0b992\\\\\\\\\\\\\\\"\\\\\\\"\\\" pod=\\\"amazon-cloudwatch/cloudwatch-agent-9sgrn\\\" podUID=884d1655-a56a-4201-a5c5-a2934b4eee57\",\"az\":\"us-east-1d\",\"ec2_instance_id\":\"i-0cc95093249392a3f\"}"}]}
[2024/12/27 15:49:01] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] PutLogEvents http status=200
Screenshot 2024-12-27 at 10 52 43 AM

For Host logs in EKS cluster

[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.2] stream->entity->root_filter_count 2
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] entity platform is added eks
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.2] entity cluster name is added compass-ga2
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.2] entity platform is added eks
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.2] stream->entity->root_filter_count 2
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] stream->entity->root_filter_count 2
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.2] entity_add_resource_key_attributes is called
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.2] stream entity resource platform eks
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.2] setting platform to eks eks
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.2] entity {"logGroupName":"/aws/containerinsights/compass-ga2/host","logStreamName":"ip-192-168-2-9.ec2.internal.host.messages","entity":{"keyAttributes":{"Type":"Resource","ResourceType":"AWS::EKS::Cluster","Identifier":"compass-ga2"}},"logEvents":[
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.2] cloudwatch:PutLogEvents: events=2, payload=1206 bytes
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.2] Sending log events to log stream ip-192-168-2-9.ec2.internal.host.messages
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.2] data buf {"logGroupName":"/aws/containerinsights/compass-ga2/host","logStreamName":"ip-192-168-2-9.ec2.internal.host.messages","entity":{"keyAttributes":{"Type":"Resource","ResourceType":"AWS::EKS::Cluster","Identifier":"compass-ga2"}},"logEvents":[{"timestamp":1735314876000,"message":"{\"host\":\"ip-192-168-2-9\",\"ident\":\"kubelet\",\"message\":\"I1227 15:54:36.929262    2487 scope.go:115] \\\"RemoveContainer\\\" containerID=\\\"c2a79d41c2ef5f4dced735138985de3ddf07a9f2c70bd9f4bf80ef4aad5118fd\\\"\",\"az\":\"us-east-1d\",\"ec2_instance_id\":\"i-0cc95093249392a3f\"}"},{"timestamp":1735314876000,"message":"{\"host\":\"ip-192-168-2-9\",\"ident\":\"kubelet\",\"message\":\"E1227 15:54:36.929677    2487 pod_workers.go:965] \\\"Error syncing pod, skipping\\\" err=\\\"failed to \\\\\\\"StartContainer\\\\\\\" for \\\\\\\"aws-guardduty-agent\\\\\\\" with CrashLoopBackOff: \\\\\\\"back-off 5m0s restarting failed container=aws-guardduty-agent pod=aws-guardduty-agent-d8dlq_amazon-guardduty(9516d6bb-3ea3-4ae1-9a00-b602a0ba0ead)\\\\\\\"\\\" pod=\\\"amazon-guardduty/aws-guardduty-agent-d8dlq\\\" podUID=9516d6bb-3ea3-4ae1-9a00-b602a0ba0ead\",\"az\":\"us-east-1d\",\"ec2_instance_id\":\"i-0cc95093249392a3f\"}"}]}
[2024/12/27 15:54:39] [ info] [output:cloudwatch_logs:cloudwatch_logs.1] entity cluster name is added compass-ga2
Screenshot 2024-12-27 at 10 58 28 AM
  • Attached Valgrind output that shows no leaks or memory corruption was found
    For cloudwatch logs output plugin: flb-rt-out_cloudwatch
SUCCESS: All unit tests have passed.
==1052== 
==1052== HEAP SUMMARY:
==1052==     in use at exit: 0 bytes in 0 blocks
==1052==   total heap usage: 2 allocs, 2 frees, 1,168 bytes allocated
==1052== 
==1052== All heap blocks were freed -- no leaks are possible
==1052== 
==1052== For lists of detected and suppressed errors, rerun with: -s
==1052== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@nathalapooja nathalapooja changed the title adds resource entity to PLE calls in cloudwatch logs plugin for dataplane and host logs Adds resource entity to PLE calls in cloudwatch logs plugin for dataplane and host logs Dec 30, 2024
@nathalapooja nathalapooja changed the title Adds resource entity to PLE calls in cloudwatch logs plugin for dataplane and host logs filter_aws: Adds resource entity to PLE calls in cloudwatch logs plugin for dataplane and host logs Dec 30, 2024
zhihonl commented Jan 7, 2025

In the entity, Type should be AWS::Resource instead of Resource for AWS resources like EKS

#define AWS_AUTH_CONFIG_MAP "aws-auth"

/* Kubernetes API server info */
#define FLB_API_HOST "kubernetes.default.svc"

nit: These constants are already defined in kube_conf.h. Would it be cleaner to reuse them instead?

}

/* Create an Upstream context */
ctx->kubernetes_upstream = flb_upstream_create(config,

Is there any scenario where we don't need TLS to communicate with the Kubernetes API server? I see the original code uses a TCP connection instead of TLS by default, so just curious:

Author

Though the default is TCP, the code has the definitions below for the k8s API server info:
/* Kubernetes API server info */
#define FLB_API_HOST "kubernetes.default.svc"
#define FLB_API_PORT 443
#define FLB_API_TLS FLB_TRUE

FLB_API_TLS is true in this case, so we used the FLB_IO_TLS flag when creating the upstream.

if(ctx->cluster == NULL) {
    char* cluster_name = getenv("CLUSTER_NAME");
    if(cluster_name) {
        ctx->cluster = strdup(cluster_name);

nit:

Suggested change
- ctx->cluster = strdup(cluster_name);
+ ctx->cluster = flb_strdup(cluster_name);

Both calls do the same thing; this is just a trivial code-semantics suggestion to follow Fluent Bit patterns.

Author

Will update in revision
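
As a rough illustration of the fallback discussed in this thread (take the cluster name from the environment and keep a private copy), here is a minimal stand-alone sketch. The helper name `dup_nonempty` is hypothetical and not part of Fluent Bit. It uses plain `malloc` so that it compiles outside the plugin tree; inside the plugin, the review suggests `flb_strdup` instead.

```c
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper (not a Fluent Bit API): return a heap copy of
 * `value`, or NULL if `value` is NULL, empty, or the allocation fails.
 * The caller owns the result and must free() it.
 */
static char *dup_nonempty(const char *value)
{
    size_t len;
    char *copy;

    if (value == NULL || value[0] == '\0') {
        return NULL;
    }

    len = strlen(value) + 1;        /* include the terminating NUL */
    copy = malloc(len);
    if (copy != NULL) {
        memcpy(copy, value, len);
    }
    return copy;
}
```

A caller would pass `getenv("CLUSTER_NAME")` and store the result only if it is non-NULL. Copying matters because the storage returned by `getenv` is not owned by the caller and may change later.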

if (ret == -1) {
    flb_plg_warn(ctx->ins, "cannot open %s", FLB_KUBE_TOKEN);
}
flb_plg_info(ctx->ins, " token updated");

nit: This log line is vague. We could just follow the original log line pasted from kube_meta.c.

Suggested change
- flb_plg_info(ctx->ins, " token updated");
+ flb_plg_info(ctx->ins, " token updated", FLB_KUBE_TOKEN);

Author

Will update in revision

}

/* Gather metadata from HTTP Request,
* this could send out HTTP Request either to KUBE Server API or Kubelet

Correct me if I'm wrong: based on the code so far, I think we are only making calls to the Kubernetes API server, not the Kubelet, right?

Author

Yes, just the k8s server API. Updated the comment to reflect that.

static void get_platform(struct flb_filter_aws *ctx)
{
    if (ctx->platform == NULL) {
        char *config_buf = NULL;

Should this be freed at the end of the function?

Author

Good catch. Updated in revision.
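
For reference, one common way to make sure a temporary buffer such as `config_buf` is released on every path is a single cleanup label. The sketch below is stand-alone and only illustrates the pattern: `load_config` is a hypothetical stand-in for whatever fills the buffer in the real code, and the `"aws-auth"` substring check is an invented rule for the example, not the plugin's detection logic.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the code that fills config_buf:
 * returns a heap copy of `text`, or NULL on allocation failure. */
static char *load_config(const char *text)
{
    size_t len = strlen(text) + 1;
    char *buf = malloc(len);

    if (buf != NULL) {
        memcpy(buf, text, len);
    }
    return buf;
}

/*
 * Returns a string literal naming the platform, or NULL on failure.
 * The temporary buffer is released through one exit label, so no
 * return path can leak it.
 */
static const char *detect_platform(const char *text)
{
    const char *platform = NULL;
    char *config_buf = NULL;

    config_buf = load_config(text);
    if (config_buf == NULL) {
        goto cleanup;
    }

    /* Illustrative rule only (assumption, not the plugin's logic) */
    if (strstr(config_buf, "aws-auth") != NULL) {
        platform = "eks";
    }
    else {
        platform = "k8s";
    }

cleanup:
    free(config_buf);   /* free(NULL) is a no-op, so this is always safe */
    return platform;
}
```

The result is a string literal rather than a pointer into `config_buf`, so it stays valid after the buffer is freed.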

}
ctx->cluster = NULL;
ctx->platform = NULL;
if (strncmp(ctx->entity_type , FLB_FILTER_ENTITY_TYPE_RESOURCE, FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN) == 0) {

Thinking of an edge case here: if IMDS retrieval is unsuccessful, this function exits early, so we never end up calling the functions below. Is this understanding correct? If so, we may have to move this Kubernetes-specific logic into its own function and make a single call in the init function so that it does not depend on IMDS.

Author

IMDS retrieval only happens when enable_entity is true and entity_type is service. The logic remains the same as before.


Discussed with @nathalapooja: this is fine because we need IMDS to retrieve the account ID, which is required for sending the entity anyway.

@@ -673,6 +969,26 @@ static int cb_aws_filter(const void *data, size_t bytes,
msgpack_pack_str_body(&tmp_pck,
ctx->account_id, ctx->account_id_len);
}

if (ctx->enable_entity && ctx->cluster != NULL && ctx->platform != NULL && strncmp(ctx->entity_type , FLB_FILTER_ENTITY_TYPE_RESOURCE, FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN) == 0 ) {

As written, we only attach the account ID when the entity type is service, but we want to send the account ID when the entity type is resource as well.

Author

Will update in revision
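
One way to express the requested behaviour is a small predicate that treats both entity types the same for the account ID. This is a stand-alone sketch: the function name is hypothetical, and the literal values "service" and "resource" are assumed to match the `entity_type` settings shown in the example configuration.

```c
#include <string.h>

/*
 * Hypothetical predicate: attach the account ID whenever entities are
 * enabled, an account ID is available, and the entity type is either
 * "service" or "resource".
 */
static int should_attach_account_id(int enable_entity,
                                    const char *entity_type,
                                    const char *account_id)
{
    if (!enable_entity || entity_type == NULL || account_id == NULL) {
        return 0;
    }
    return strcmp(entity_type, "service") == 0 ||
           strcmp(entity_type, "resource") == 0;
}
```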

goto error;
}
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\"Type\":\"Resource\"",0)) {

Discussed with @nathalapooja but we need to distinguish between AWS::Resource and Resource

Author

Will update in revision
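
A minimal sketch of the distinction requested above, under a stated assumption: the platform string "eks" (as seen in the debug logs) marks an AWS resource, and any other platform falls back to the generic label. The thread does not spell out the rule for non-EKS platforms, so that fallback and the function name are hypothetical.

```c
#include <string.h>

/*
 * Hypothetical helper: choose the entity Type label from the platform.
 * Assumption: only "eks" is an AWS resource; everything else, including
 * an unknown (NULL) platform, is reported as a plain "Resource".
 */
static const char *entity_type_label(const char *platform)
{
    if (platform != NULL && strcmp(platform, "eks") == 0) {
        return "AWS::Resource";
    }
    return "Resource";
}
```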

ret = entity_add_key_attributes(ctx,buf,stream,offset);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to initialize Entity KeyAttributes");
"\"entity\":{", 10)) {

I think adding entity to the HTTP payload here can cause an issue when the entity type is service and the service name or account ID is not present. The HTTP payload would end up containing just "entity":{ with nothing filled in, so the entire PLE call would be corrupted.

Author

Will update in revision
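
One possible shape for the fix is to validate the required fields first and write the whole entity fragment in a single step, so that a dangling "entity":{ can never reach the payload. The sketch below is stand-alone and does not use the plugin's `try_to_write`. The function name and the key names `Name` and `AwsAccountId` are assumptions for illustration, and the values are written without JSON escaping.

```c
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: write the complete `"entity":{...},` fragment
 * into out[0..cap) only if every required field is present and the
 * fragment fits. Returns the number of bytes written, or 0 if nothing
 * usable was written. Values are assumed to need no JSON escaping.
 */
static size_t write_service_entity(char *out, size_t cap,
                                   const char *service_name,
                                   const char *account_id)
{
    int n;

    if (service_name == NULL || account_id == NULL) {
        return 0;   /* incomplete entity: emit nothing at all */
    }

    n = snprintf(out, cap,
                 "\"entity\":{\"keyAttributes\":{\"Type\":\"Service\","
                 "\"Name\":\"%s\",\"AwsAccountId\":\"%s\"}},",
                 service_name, account_id);
    if (n < 0 || (size_t) n >= cap) {
        if (cap > 0) {
            out[0] = '\0';   /* truncated: discard the partial fragment */
        }
        return 0;
    }
    return (size_t) n;
}
```

A caller would advance its write offset by the return value and continue with "logEvents" either way, so a missing service name or account ID yields a valid request without an entity instead of a corrupted one.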

stream->entity->attributes = flb_malloc(sizeof(entity_attributes));
if (stream->entity->attributes == NULL) {
return;
if (strncmp(ctx->entity_type , FLB_FILTER_ENTITY_TYPE_SERVICE, FLB_FILTER_ENTITY_TYPE_SERVICE_LEN) == 0) {

Oh nice, this saves some memory when the entity type is resource, since resource entities don't have attributes?
