Skip to content

Commit

Permalink
Merge branch 'main' into iotcored_updates
Browse files Browse the repository at this point in the history
  • Loading branch information
n9wxu authored Jan 16, 2025
2 parents 300e797 + ec9c996 commit 8a1c5a3
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 35 deletions.
2 changes: 2 additions & 0 deletions ggdeploymentd/src/deployment_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@ GglError ggl_deployment_dequeue(GglDeployment **deployment) {
}

void ggl_deployment_release(GglDeployment *deployment) {
GGL_MTX_SCOPE_GUARD(&queue_mtx);

assert(ggl_buffer_eq(
deployment->deployment_id, deployments[queue_index].deployment_id
));
Expand Down
6 changes: 0 additions & 6 deletions ggdeploymentd/src/entry.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@
#include <stdint.h>
#include <stdlib.h>

static void *job_listener_thread(void *ctx) {
(void) ctx;
listen_for_jobs_deployments();
return NULL;
}

GglError run_ggdeploymentd(const char *bin_path) {
GGL_LOGI("Started ggdeploymentd process.");

Expand Down
69 changes: 41 additions & 28 deletions ggdeploymentd/src/iot_jobs_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,19 @@ static uint8_t current_deployment_id_buf[64];
static GglByteVec current_deployment_id;
static int64_t current_job_version;

pthread_mutex_t topic_scratch_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t listener_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t listener_cond = PTHREAD_COND_INITIALIZER;
static bool needs_describe = false;
// aws_iot_mqtt subscription handles
static uint32_t connection_status_handle;
static uint32_t next_job_handle;

static pthread_mutex_t topic_scratch_mutex = PTHREAD_MUTEX_INITIALIZER;
static uint8_t topic_scratch[256];
static uint8_t response_scratch[4096];
static uint8_t subscription_scratch[4096];

// aws_iot_mqtt subscription handles
static uint32_t connection_status_handle;
static uint32_t next_job_handle;
static void listen_for_jobs_deployments(void);

static GglError create_get_next_job_topic(
GglBuffer thing_name, GglBuffer *job_topic
Expand Down Expand Up @@ -234,6 +239,7 @@ static GglError update_job(

static GglError describe_next_job(void *ctx) {
(void) ctx;
GGL_LOGD("Requesting next job information.");
GGL_MTX_SCOPE_GUARD(&topic_scratch_mutex);
GglBuffer topic = GGL_BUF(topic_scratch);
GglError ret = create_get_next_job_topic(thing_name_buf, &topic);
Expand Down Expand Up @@ -274,8 +280,10 @@ static GglError describe_next_job(void *ctx) {
return GGL_ERR_FAILURE;
}
if (execution == NULL) {
GGL_LOGD("No deployment to process.");
return GGL_ERR_OK;
}
GGL_LOGD("Processing execution.");
return process_job_execution(execution->map);
}

Expand Down Expand Up @@ -374,6 +382,7 @@ static GglError next_job_execution_changed_callback(
) {
(void) ctx;
(void) handle;
GGL_LOGD("Received next job execution changed response.");
GglBumpAlloc json_allocator
= ggl_bump_alloc_init(GGL_BUF(subscription_scratch));
GglObject json = GGL_OBJ_NULL();
Expand Down Expand Up @@ -408,35 +417,41 @@ static GglError next_job_execution_changed_callback(
return GGL_ERR_OK;
}

static void *job_listener_thread(void *ctx) {
void *job_listener_thread(void *ctx) {
(void) ctx;
ggl_backoff_indefinite(1, 1000, get_thing_name, NULL);
listen_for_jobs_deployments();

while (true) {
pthread_mutex_lock(&listener_mutex);
pthread_cond_wait(&listener_cond, &listener_mutex);
pthread_mutex_unlock(&listener_mutex);

if ((next_job_handle == 0) || (connection_status_handle == 0)) {
listen_for_jobs_deployments();
}

if (needs_describe) {
ggl_backoff_indefinite(10, 10000, describe_next_job, NULL);
needs_describe = false;
}
}
return NULL;
}

// TODO: replace on_close with mqtt reconnect signal handling
// Since that will significantly reduce chatter
// between jobs listener and iotcored
// when the core device is offline from the network.
static void subscribe_on_close(void *ctx, uint32_t handle) {
(void) ctx;
if (handle != next_job_handle) {
return;
}
(void) handle;
GGL_LOGD("Subscriptions closed. Subscribing again.");
GGL_MTX_SCOPE_GUARD(&listener_mutex);
next_job_handle = 0;
pthread_t ptid_jobs;
pthread_create(&ptid_jobs, NULL, &job_listener_thread, NULL);
pthread_detach(ptid_jobs);
connection_status_handle = 0;
pthread_cond_signal(&listener_cond);
}

static GglError subscribe_to_next_job_topics(void *ctx) {
(void) ctx;
if (next_job_handle != 0) {
return GGL_ERR_OK;
}

GGL_MTX_SCOPE_GUARD(&topic_scratch_mutex);

GglBuffer job_topic = GGL_BUF(topic_scratch);
GglError err
= create_next_job_execution_changed_topic(thing_name_buf, &job_topic);
Expand All @@ -459,16 +474,16 @@ static GglError iot_jobs_on_reconnect(
(void) ctx;
(void) handle;
if (data.boolean) {
ggl_backoff_indefinite(10, 10000, describe_next_job, NULL);
GGL_LOGD("Reconnected to MQTT; requesting new job query publish.");
GGL_MTX_SCOPE_GUARD(&listener_mutex);
needs_describe = true;
pthread_cond_signal(&listener_cond);
}
return GGL_ERR_OK;
}

static GglError subscribe_to_connection_status(void *ctx) {
(void) ctx;
if (connection_status_handle != 0) {
return GGL_ERR_OK;
}
return ggl_subscribe(
GGL_STR("aws_iot_mqtt"),
GGL_STR("connection_status"),
Expand All @@ -482,12 +497,10 @@ static GglError subscribe_to_connection_status(void *ctx) {
}

// Make subscriptions and kick off IoT Jobs Workflow
void listen_for_jobs_deployments(void) {
static void listen_for_jobs_deployments(void) {
// Following "Get the next job" workflow
// https://docs.aws.amazon.com/iot/latest/developerguide/jobs-workflow-device-online.html
next_job_handle = 0;
connection_status_handle = 0;
ggl_backoff_indefinite(1, 1000, get_thing_name, NULL);
GGL_LOGD("Subscribing to IoT Jobs topics.");
ggl_backoff_indefinite(10, 10000, subscribe_to_next_job_topics, NULL);
ggl_backoff_indefinite(10, 10000, subscribe_to_connection_status, NULL);
}
Expand Down
2 changes: 1 addition & 1 deletion ggdeploymentd/src/iot_jobs_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <ggl/error.h>
#include <stdint.h>

void listen_for_jobs_deployments(void);
void *job_listener_thread(void *ctx);

GglError update_current_jobs_deployment(
GglBuffer deployment_id, GglBuffer status
Expand Down

0 comments on commit 8a1c5a3

Please sign in to comment.