Skip to content

Commit

Permalink
filter_kubernetes: allow retrieving kubernetes metadata from CloudWat…
Browse files Browse the repository at this point in the history
…ch Agent

Adds Use_Pod_association option. When running in a kubernetes container, setting this option to On vends an `entity` object with pod metadata from CWA to a cloudwatch_logs output with the new `add_entity` option set to true.

(PR #2 in amazon-contributing/upstream-to-fluent-bit)
  • Loading branch information
zhihonl authored and Swapneil Singh committed Oct 22, 2024
1 parent 6b3b67f commit dca1459
Show file tree
Hide file tree
Showing 49 changed files with 2,762 additions and 19 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
*~
_book/
lib/jemalloc
cmake-build-debug/
tests/internal/flb_tests_internal.h
tests/runtime/flb_tests_runtime.h
tests/internal/cmake-build-debug/
tests/runtime/cmake-build-debug/
build/*
include/fluent-bit/flb_info.h
include/fluent-bit/flb_plugins.h
Expand Down
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct flb_hash {
int max_entries;
int total_count;
int cache_ttl;
int force_remove_pointer;
size_t size;
struct mk_list entries;
struct flb_hash_table *table;
Expand All @@ -63,6 +64,8 @@ struct flb_hash {
struct flb_hash *flb_hash_create(int evict_mode, size_t size, int max_entries);
struct flb_hash *flb_hash_create_with_ttl(int cache_ttl, int evict_mode,
size_t size, int max_entries);
struct flb_hash *flb_hash_create_with_ttl_force_destroy(int cache_ttl, int evict_mode,
size_t size, int max_entries);
void flb_hash_destroy(struct flb_hash *ht);

int flb_hash_add(struct flb_hash *ht,
Expand Down
16 changes: 15 additions & 1 deletion plugins/filter_aws/aws.c
Original file line number Diff line number Diff line change
Expand Up @@ -558,14 +558,22 @@ static int cb_aws_filter(const void *data, size_t bytes,
ctx->availability_zone_len);
}

if (ctx->instance_id_include) {
if (ctx->instance_id_include && !ctx->enable_entity) {
msgpack_pack_str(&tmp_pck, FLB_FILTER_AWS_INSTANCE_ID_KEY_LEN);
msgpack_pack_str_body(&tmp_pck,
FLB_FILTER_AWS_INSTANCE_ID_KEY,
FLB_FILTER_AWS_INSTANCE_ID_KEY_LEN);
msgpack_pack_str(&tmp_pck, ctx->instance_id_len);
msgpack_pack_str_body(&tmp_pck,
ctx->instance_id, ctx->instance_id_len);
} else if (ctx->instance_id_include && ctx->enable_entity) {
msgpack_pack_str(&tmp_pck, FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN);
msgpack_pack_str_body(&tmp_pck,
FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY,
FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN);
msgpack_pack_str(&tmp_pck, ctx->instance_id_len);
msgpack_pack_str_body(&tmp_pck,
ctx->instance_id, ctx->instance_id_len);
}

if (ctx->instance_type_include) {
Expand Down Expand Up @@ -740,6 +748,12 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_filter_aws, hostname_include),
"Enable EC2 instance hostname"
},
{
FLB_CONFIG_MAP_BOOL, "enable_entity", "false",
0, FLB_TRUE, offsetof(struct flb_filter_aws, enable_entity),
"Enable entity prefix for fields used for constructing entity."
"This currently only affects instance ID"
},
{0}
};

Expand Down
8 changes: 8 additions & 0 deletions plugins/filter_aws/aws.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
#define FLB_FILTER_AWS_AVAILABILITY_ZONE_KEY_LEN 2
#define FLB_FILTER_AWS_INSTANCE_ID_KEY "ec2_instance_id"
#define FLB_FILTER_AWS_INSTANCE_ID_KEY_LEN 15
#define FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY "aws_entity_ec2_instance_id"
#define FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN 26
#define FLB_FILTER_AWS_INSTANCE_TYPE_KEY "ec2_instance_type"
#define FLB_FILTER_AWS_INSTANCE_TYPE_KEY_LEN 17
#define FLB_FILTER_AWS_PRIVATE_IP_KEY "private_ip"
Expand Down Expand Up @@ -111,6 +113,12 @@ struct flb_filter_aws {
size_t hostname_len;
int hostname_include;

/*
* Enable entity prefix appending. This appends
* 'aws_entity' to relevant keys
*/
int enable_entity;

/* number of new keys added by this plugin */
int new_keys;

Expand Down
37 changes: 37 additions & 0 deletions plugins/filter_kubernetes/kube_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins,
ctx->api_https = FLB_FALSE;
}

if (ctx->use_pod_association) {
ctx->kubernetes_api_host = flb_strdup(FLB_API_HOST);
ctx->kubernetes_api_port = FLB_API_PORT;
}


}
else if (!url) {
ctx->api_host = flb_strdup(FLB_API_HOST);
Expand Down Expand Up @@ -190,6 +196,12 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins,
flb_plg_info(ctx->ins, "https=%i host=%s port=%i",
ctx->api_https, ctx->api_host, ctx->api_port);
}


ctx->pod_hash_table = flb_hash_create_with_ttl_force_destroy(ctx->pod_service_map_ttl,
FLB_HASH_EVICT_OLDER,
FLB_HASH_TABLE_SIZE,
FLB_HASH_TABLE_SIZE);
return ctx;
}

Expand All @@ -203,6 +215,10 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
flb_hash_destroy(ctx->hash_table);
}

if (ctx->pod_hash_table) {
flb_hash_destroy(ctx->pod_hash_table);
}

if (ctx->merge_log == FLB_TRUE) {
flb_free(ctx->unesc_buf);
}
Expand All @@ -211,6 +227,9 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
if (ctx->parser == NULL && ctx->regex) {
flb_regex_destroy(ctx->regex);
}
if (ctx->deploymentRegex) {
flb_regex_destroy(ctx->deploymentRegex);
}

flb_free(ctx->api_host);
flb_free(ctx->token);
Expand All @@ -222,6 +241,24 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
flb_upstream_destroy(ctx->upstream);
}

if(ctx->pod_association_tls) {
flb_tls_destroy(ctx->pod_association_tls);
}

if (ctx->pod_association_upstream) {
flb_upstream_destroy(ctx->pod_association_upstream);
}

if (ctx->kubernetes_upstream) {
flb_upstream_destroy(ctx->kubernetes_upstream);
}
if (ctx->kubernetes_api_host) {
flb_free(ctx->kubernetes_api_host);
}
if (ctx->platform) {
flb_free(ctx->platform);
}

#ifdef FLB_HAVE_TLS
if (ctx->tls) {
flb_tls_destroy(ctx->tls);
Expand Down
72 changes: 72 additions & 0 deletions plugins/filter_kubernetes/kube_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,40 @@
#define FLB_KUBE_TAG_PREFIX "kube.var.log.containers."
#endif

/*
* Maximum attribute length for Entity's KeyAttributes
* values
* https://docs.aws.amazon.com/applicationsignals/latest/APIReference/API_Service.html#:~:text=Maximum%20length%20of%201024.
*/
#define KEY_ATTRIBUTES_MAX_LEN 1024
#define SERVICE_NAME_SOURCE_MAX_LEN 64

/*
* Configmap used for verifying whether if FluentBit is
* on EKS or native Kubernetes
*/
#define KUBE_SYSTEM_NAMESPACE "kube-system"
#define AWS_AUTH_CONFIG_MAP "aws-auth"

/*
* Possible platform values for Kubernetes plugin
*/
#define NATIVE_KUBERNETES_PLATFORM "k8s"
#define EKS_PLATFORM "eks"

struct kube_meta;

struct service_attributes {
char name[KEY_ATTRIBUTES_MAX_LEN];
int name_len;
char environment[KEY_ATTRIBUTES_MAX_LEN];
int environment_len;
char name_source[SERVICE_NAME_SOURCE_MAX_LEN];
int name_source_len;
int fields;

};

/* Filter context */
struct flb_kube {
/* Configuration parameters */
Expand Down Expand Up @@ -119,6 +151,7 @@ struct flb_kube {

/* Regex context to parse records */
struct flb_regex *regex;
struct flb_regex *deploymentRegex;
struct flb_parser *parser;

/* TLS CA certificate file */
Expand Down Expand Up @@ -158,6 +191,45 @@ struct flb_kube {

int kube_meta_cache_ttl;

/* Configuration used for enabling pod to service name mapping*/
int use_pod_association;
char *pod_association_host;
char *pod_association_endpoint;
int pod_association_port;

/*
* TTL is used to check how long should the mapped entry
* remain in the hash table
*/
struct flb_hash *pod_hash_table;
int pod_service_map_ttl;
int pod_service_map_refresh_interval;
flb_sds_t pod_service_preload_cache_path;
struct flb_upstream *pod_association_upstream;
/*
* This connection is used for calling Kubernetes configmaps
* endpoint so pod association can determine the environment.
* Example: EKS or Native Kubernetes.
*/
char *kubernetes_api_host;
int kubernetes_api_port;
struct flb_upstream *kubernetes_upstream;
char *platform;
/*
* This value is used for holding the platform config
* value. Platform will be overriden with this variable
* if it's set
*/
char *set_platform;

//Agent TLS certs
struct flb_tls *pod_association_tls;
char *pod_association_host_server_ca_file;
char *pod_association_host_client_cert_file;
char *pod_association_host_client_key_file;
int pod_association_host_tls_debug;
int pod_association_host_tls_verify;

struct flb_tls *tls;

struct flb_config *config;
Expand Down
Loading

0 comments on commit dca1459

Please sign in to comment.