From 501c63a32687f808bff555b55449b57b648d298d Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Mon, 25 Sep 2023 17:02:22 -0300 Subject: [PATCH 1/7] in_calyptia_fleet: add functions to better handle the different configuration versions. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 105 +++++++++++------- 1 file changed, 66 insertions(+), 39 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index c3b729ce8f1..43777638c62 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -966,55 +966,32 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct } #ifdef FLB_SYSTEM_WINDOWS -#define _mkdir(a, b) mkdir(a) -#else -#define _mkdir(a, b) mkdir(a, b) -#endif +#define link(a, b) CreateHardLinkA(b, a, 0) -/* recursively create directories, based on: - * https://stackoverflow.com/a/2336245 - * who found it at: - * http://nion.modprobe.de/blog/archives/357-Recursive-directory-creation.html - */ -static int __mkdir(const char *dir, int perms) { - char tmp[255]; - char *ptr = NULL; - size_t len; - int ret; +ssize_t readlink(const char *path, char *realpath, size_t srealpath) { + HANDLE hFile; + DWORD ret; - ret = snprintf(tmp, sizeof(tmp),"%s",dir); - if (ret > sizeof(tmp)) { - flb_error("directory too long for __mkdir: %s", dir); + hFile = CreateFile(path, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, NULL); + + if (hFile == INVALID_HANDLE_VALUE) { return -1; } - len = strlen(tmp); - - if (tmp[len - 1] == PATH_SEPARATOR[0]) { - tmp[len - 1] = 0; - } + ret = GetFinalPathNameByHandleA(hFile, realpath, srealpath, VOLUME_NAME_NT); -#ifndef FLB_SYSTEM_WINDOWS - for (ptr = tmp + 1; *ptr; ptr++) { -#else - for (ptr = tmp + 3; *ptr; ptr++) { -#endif - - if (*ptr == PATH_SEPARATOR[0]) { - *ptr = 0; - if (access(tmp, F_OK) != 0) { - ret = _mkdir(tmp, perms); - if (ret != 0) { - return ret; - } - } - *ptr = PATH_SEPARATOR[0]; - } + if (ret < srealpath) { + CloseHandle(hFile); + return -1; } - return _mkdir(tmp, perms); + CloseHandle(hFile); + return ret; } +#endif + static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, struct flb_connection *u_conn, flb_sds_t url, @@ -1102,6 +1079,56 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, return ret; } +#ifdef FLB_SYSTEM_WINDOWS +#define _mkdir(a, b) mkdir(a) +#else +#define _mkdir(a, b) mkdir(a, b) +#endif + +/* recursively create directories, based on: + * https://stackoverflow.com/a/2336245 + * who found it at: + * http://nion.modprobe.de/blog/archives/357-Recursive-directory-creation.html + */ +static int __mkdir(const char *dir, int perms) { + char tmp[255]; + char *ptr = NULL; + size_t len; + int ret; + + ret = snprintf(tmp, sizeof(tmp),"%s",dir); + if (ret > sizeof(tmp)) { + flb_error("directory too long for __mkdir: %s", dir); + return -1; + } + + len = strlen(tmp); + + if (tmp[len - 1] == PATH_SEPARATOR[0]) { + tmp[len - 1] = 0; + } + +#ifndef FLB_SYSTEM_WINDOWS + for (ptr = tmp + 1; *ptr; ptr++) { +#else + for (ptr = tmp + 3; *ptr; ptr++) { +#endif + + if (*ptr == PATH_SEPARATOR[0]) { + *ptr = 0; + if (access(tmp, F_OK) != 0) { + ret = _mkdir(tmp, perms); + if (ret != 0) { + return ret; + } + } + *ptr = PATH_SEPARATOR[0]; + } + } + + return _mkdir(tmp, perms); +} + #ifndef _WIN32 static struct cfl_array *read_glob(const char *path) { From 418899f783730f3f98cd8335f788e526e3e71ba8 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Fri, 6 Oct 2023 15:41:42 -0300 Subject: [PATCH 2/7] fleet: fix return value handling for fwrite. Signed-off-by: Phillip Whelan From 54bad8d7b5724af73b40b7ea5ec50b27fc954be9 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 11 Oct 2023 16:41:59 -0300 Subject: [PATCH 3/7] in_calyptia_fleet: download and load fleet files. Download fleet files to a sub-directory with the timestamp of the relevant configuration and allow it to be loaded by changing to that directory when fluent-bit is reloaded. * Recurse through fleet files and download each one. * Change to the fleet files directory on reload so they can be referred to from the main fleet configuration file. * Delete old fleet file directories when the configuration is deleted. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 43777638c62..d9db1f9dc09 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -648,6 +648,8 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf flb_plg_error(ctx->ins, "unable to change to configuration directory"); } + fleet_cur_chdir(ctx); + pthread_attr_init(&ptha); pthread_attr_setdetachstate(&ptha, PTHREAD_CREATE_DETACHED); pthread_create(&pth, &ptha, do_reload, reload); @@ -967,6 +969,7 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct #ifdef FLB_SYSTEM_WINDOWS #define link(a, b) CreateHardLinkA(b, a, 0) +#define symlink(a, b) CreateSymLinkA(b, a, 0) ssize_t readlink(const char *path, char *realpath, size_t srealpath) { HANDLE hFile; From 11e8dd176ed76955fda2e80c4441373155fc8e17 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 26 Oct 2023 11:38:00 -0300 Subject: [PATCH 4/7] in_calyptia_fleet: improve config_commit/add/rollback. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index d9db1f9dc09..f24be816d07 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -1376,7 +1376,15 @@ static int calyptia_config_delete_old_dir(const char *cfgpath) rmdir(cfg_glob); } + /* attempt to delete the main directory */ + ext = strrchr(cfg_glob, '/'); + if (ext) { + *ext = '\0'; + rmdir(cfg_glob); + } + flb_sds_destroy(cfg_glob); + cfl_array_destroy(files); return FLB_TRUE; } From 68c89c94b2d5602d30dbfdf6f43086b7857c1d66 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 26 Oct 2023 11:38:46 -0300 Subject: [PATCH 5/7] in_calyptia_fleet: finish read_glob for files on windows. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 31 +------------------ 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index f24be816d07..41bd0da1285 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -609,7 +609,7 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf reload = flb_calloc(1, sizeof(struct reload_ctx)); reload->flb = flb; - reload->cfg_path = cfgpath; + reload->cfg_path = flb_sds_create(cfgpath); if (ctx->collect_fd > 0) { flb_input_collector_pause(ctx->collect_fd, ctx->ins); @@ -967,34 +967,6 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct return 0; } -#ifdef FLB_SYSTEM_WINDOWS -#define link(a, b) CreateHardLinkA(b, a, 0) -#define symlink(a, b) CreateSymLinkA(b, a, 0) - -ssize_t readlink(const char *path, char *realpath, size_t srealpath) { - HANDLE hFile; - DWORD ret; - - hFile = CreateFile(path, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, NULL); - - if (hFile == INVALID_HANDLE_VALUE) { - return -1; - } - - ret = GetFinalPathNameByHandleA(hFile, realpath, srealpath, VOLUME_NAME_NT); - - if (ret < srealpath) { - CloseHandle(hFile); - return -1; - } - - CloseHandle(hFile); - return ret; -} - -#endif - static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, struct flb_connection *u_conn, flb_sds_t url, @@ -1727,7 +1699,6 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, } #ifndef FLB_SYSTEM_WINDOWS - flb_sds_destroy(cfgname); cfgnewname = new_fleet_config_filename(ctx); if (execute_reload(ctx, cfgnewname) == FLB_FALSE) { calyptia_config_rollback(ctx, cfgname); From 3550dd59ff61fc51e2acd5b2f9f417b8174a2270 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Mon, 6 Nov 2023 12:36:22 -0300 Subject: [PATCH 6/7] in_calyptia_fleet: fix memory leak from configuration file name. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 41bd0da1285..72bae1f67fe 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -609,7 +609,7 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf reload = flb_calloc(1, sizeof(struct reload_ctx)); reload->flb = flb; - reload->cfg_path = flb_sds_create(cfgpath); + reload->cfg_path = cfgpath; if (ctx->collect_fd > 0) { flb_input_collector_pause(ctx->collect_fd, ctx->ins); From e617c7b7688a20076f9f7caf9abd4d80c023cba2 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Mon, 6 Nov 2023 17:09:56 -0300 Subject: [PATCH 7/7] in_calyptia_fleet: use a single header file to store the base configuration. Use a header.conf file that is refreshed on each invocation of fluent-bit to track the main configuration for the fleet agent. This file is then included by the downloade fleet configuration files. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 236 ++++++++++++------ 1 file changed, 156 insertions(+), 80 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 72bae1f67fe..d3a16d4f2d8 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -34,6 +35,7 @@ #include #include #include +#include #include #include @@ -365,21 +367,17 @@ static int is_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct return ret; } -static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg) +static int is_timestamped_fleet_config_path(struct flb_in_calyptia_fleet_config *ctx, const char *path) { char *fname; char *end; long val; - if (cfg == NULL) { + if (path == NULL) { return FLB_FALSE; } - if (cfg->conf_path_file == NULL) { - return FLB_FALSE; - } - - fname = strrchr(cfg->conf_path_file, PATH_SEPARATOR[0]); + fname = strrchr(path, PATH_SEPARATOR[0]); if (fname == NULL) { return FLB_FALSE; @@ -403,6 +401,19 @@ static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, return FLB_FALSE; } +static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg) +{ + if (cfg == NULL) { + return FLB_FALSE; + } + + if (cfg->conf_path_file == NULL) { + return FLB_FALSE; + } + + return is_timestamped_fleet_config_path(ctx, cfg->conf_path_file); +} + static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg) { if (cfg == NULL) { @@ -657,15 +668,6 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf return FLB_TRUE; } -static char *tls_setting_string(int use_tls) -{ - if (use_tls) { - return "On"; - } - - return "Off"; -} - static msgpack_object *msgpack_lookup_map_key(msgpack_object *obj, const char *keyname) { int idx; @@ -1363,7 +1365,8 @@ static int calyptia_config_delete_old_dir(const char *cfgpath) static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) { - struct cfl_array *inis; + struct cfl_array *confs; + struct cfl_array *tconfs; flb_sds_t glob_files = NULL; int idx; @@ -1381,22 +1384,42 @@ static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) return -1; } - inis = read_glob(glob_files); - if (inis == NULL) { + confs = read_glob(glob_files); + if (confs == NULL) { flb_sds_destroy(glob_files); return -1; } - qsort(inis->entries, inis->entry_count, + tconfs = cfl_array_create(1); + if (tconfs == NULL) { + flb_sds_destroy(glob_files); + cfl_array_destroy(confs); + return -1; + } + + if (cfl_array_resizable(tconfs, FLB_TRUE) != 0) { + flb_sds_destroy(glob_files); + cfl_array_destroy(confs); + return -1; + } + + for (idx = 0; idx < confs->entry_count; idx++) { + if (is_timestamped_fleet_config_path(ctx, confs->entries[idx]->data.as_string) == FLB_TRUE) { + cfl_array_append_string(tconfs, confs->entries[idx]->data.as_string); + } + } + + qsort(tconfs->entries, tconfs->entry_count, sizeof(struct cfl_variant *), cfl_array_qsort_conf_files); - for (idx = 0; idx < (((ssize_t)inis->entry_count) -1 - 3); idx++) { - unlink(inis->entries[idx]->data.as_string); - calyptia_config_delete_old_dir(inis->entries[idx]->data.as_string); + for (idx = 0; idx < (((ssize_t)tconfs->entry_count) -3); idx++) { + unlink(tconfs->entries[idx]->data.as_string); + calyptia_config_delete_old_dir(tconfs->entries[idx]->data.as_string); } - cfl_array_destroy(inis); + cfl_array_destroy(confs); + cfl_array_destroy(tconfs); flb_sds_destroy(glob_files); return 0; @@ -1407,6 +1430,8 @@ static flb_sds_t calyptia_config_get_newest(struct flb_in_calyptia_fleet_config struct cfl_array *inis; flb_sds_t glob_conf_files = NULL; flb_sds_t cfgnewname = NULL; + const char *curconf; + int idx; if (ctx == NULL) { return NULL; @@ -1436,8 +1461,13 @@ static flb_sds_t calyptia_config_get_newest(struct flb_in_calyptia_fleet_config sizeof(struct cfl_variant *), cfl_array_qsort_conf_files); - cfgnewname = flb_sds_create_len(inis->entries[inis->entry_count-1]->data.as_string, - strlen(inis->entries[inis->entry_count-1]->data.as_string)); + for (idx = inis->entry_count-1; idx >= 0; idx--) { + curconf = inis->entries[idx]->data.as_string; + if (is_timestamped_fleet_config_path(ctx, curconf)) { + cfgnewname = flb_sds_create(curconf); + break; + } + } cfl_array_destroy(inis); flb_sds_destroy(glob_conf_files); @@ -1521,12 +1551,6 @@ static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx) unlink(cfgoldname); } - if (exists_cur_fleet_config(ctx) == FLB_TRUE) { - if (rename(cfgcurname, cfgoldname)) { - goto error; - } - } - if (exists_new_fleet_config(ctx) == FLB_TRUE) { if (rename(cfgnewname, cfgcurname)) { goto error; @@ -1614,12 +1638,102 @@ static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx, } #endif +static void fleet_config_get_properties(flb_sds_t *buf, struct mk_list *props) +{ + struct mk_list *head; + struct flb_kv *kv; + + mk_list_foreach(head, props) { + kv = mk_list_entry(head, struct flb_kv, _head); + + if (kv->key != NULL && kv->val != NULL) { + flb_sds_printf(buf, " %s ", kv->key); + flb_sds_cat_safe(buf, kv->val, strlen(kv->val)); + flb_sds_cat_safe(buf, "\n", 1); + } + } +} + +flb_sds_t fleet_config_get(struct flb_in_calyptia_fleet_config *ctx) +{ + flb_sds_t buf; + struct mk_list *head; + struct flb_custom_instance *c_ins; + flb_ctx_t *flb = flb_context_get(); + + + buf = flb_sds_create_size(2048); + + if (!buf) { + return NULL; + } + + /* [INPUT] */ + mk_list_foreach(head, &flb->config->customs) { + c_ins = mk_list_entry(head, struct flb_custom_instance, _head); + if (strcasecmp(c_ins->p->name, "calyptia")) { + continue; + } + flb_sds_printf(&buf, "[CUSTOM]\n"); + flb_sds_printf(&buf, " name %s\n", c_ins->p->name); + + fleet_config_get_properties(&buf, &c_ins->properties); + + if (ctx->fleet_id && flb_config_prop_get("fleet_id", &c_ins->properties) == NULL) { + flb_sds_printf(&buf, " fleet_id %s\n", ctx->fleet_id); + } + } + flb_sds_printf(&buf, "\n"); + + return buf; +} + +static int create_fleet_header(struct flb_in_calyptia_fleet_config *ctx) +{ + flb_sds_t hdrname; + FILE *fp; + flb_sds_t header; + int rc = FLB_FALSE; + + + hdrname = fleet_config_filename(ctx, "header"); + if (hdrname == NULL) { + goto hdrname_error; + } + + header = fleet_config_get(ctx); + if (header == NULL) { + goto header_error; + } + + fp = fopen(hdrname, "w+"); + if (fp == NULL) { + goto file_open_error; + } + + if (fwrite(header, strlen(header), 1, fp) < 1) { + goto file_error; + } + + rc = FLB_TRUE; + +file_error: + fclose(fp); +file_open_error: + flb_sds_destroy(header); +header_error: + flb_sds_destroy(hdrname); +hdrname_error: + return rc; +} + static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_connection *u_conn) { flb_sds_t cfgname; flb_sds_t cfgnewname; flb_sds_t header; + flb_sds_t hdrname; time_t time_last_modified; int ret = -1; @@ -1633,54 +1747,11 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, flb_sds_printf(&ctx->fleet_files_url, "/v1/fleets/%s/files", ctx->fleet_id); } - header = flb_sds_create_size(4096); - - if (ctx->fleet_name == NULL) { - flb_sds_printf(&header, - "[CUSTOM]\n" - " Name calyptia\n" - " api_key %s\n" - " fleet_id %s\n" - " add_label fleet_id %s\n" - " machine_id %s\n" - " fleet.config_dir %s\n" - " calyptia_host %s\n" - " calyptia_port %d\n" - " calyptia_tls %s\n", - ctx->api_key, - ctx->fleet_id, - ctx->fleet_id, - ctx->machine_id, - ctx->config_dir, - ctx->ins->host.name, - ctx->ins->host.port, - tls_setting_string(ctx->ins->use_tls) - ); - } - else { - flb_sds_printf(&header, - "[CUSTOM]\n" - " Name calyptia\n" - " api_key %s\n" - " fleet_name %s\n" - " fleet_id %s\n" - " add_label fleet_id %s\n" - " machine_id %s\n" - " fleet.config_dir %s\n" - " calyptia_host %s\n" - " calyptia_port %d\n" - " calyptia_tls %s\n", - ctx->api_key, - ctx->fleet_name, - ctx->fleet_id, - ctx->fleet_id, - ctx->machine_id, - ctx->config_dir, - ctx->ins->host.name, - ctx->ins->host.port, - tls_setting_string(ctx->ins->use_tls) - ); - } + create_fleet_header(ctx); + + hdrname = fleet_config_filename(ctx, "header"); + header = flb_sds_create_size(32); + flb_sds_printf(&header, "@include %s\n\n", hdrname); /* create the base file. */ ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header, @@ -2121,6 +2192,11 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, /* Set the context */ flb_input_set_context(in, ctx); + /* refresh calyptia settings before attempting to load the fleet + * configuration file. + */ + create_fleet_header(ctx); + /* if we load a new configuration then we will be reloaded anyways */ if (load_fleet_config(ctx) == FLB_TRUE) { return 0;