Skip to content

Commit

Permalink
Merge branch 'master' into kccain/daos_17094
Browse files Browse the repository at this point in the history
Features: container test_engine_restart
  • Loading branch information
kccain committed Feb 14, 2025
2 parents 72e61bb + 7fa8052 commit afe1f33
Show file tree
Hide file tree
Showing 17 changed files with 621 additions and 487 deletions.
129 changes: 22 additions & 107 deletions src/cart/crt_context.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -1252,7 +1253,7 @@ crt_context_timeout_check(struct crt_context *crt_ctx)
D_ASSERTF(d_list_empty(&rpc_priv->crp_tmp_link_timeout),
"already on timeout list\n");
d_list_add_tail(&rpc_priv->crp_tmp_link_timeout, &timeout_list);
};
}
D_MUTEX_UNLOCK(&crt_ctx->cc_mutex);

/* handle the timeout RPCs */
Expand Down Expand Up @@ -1776,39 +1777,6 @@ crt_context_empty(crt_provider_t provider, int locked)
return rc;
}

static int64_t
crt_exec_progress_cb(struct crt_context *ctx, int64_t timeout)
{
struct crt_prog_cb_priv *cbs_prog;
crt_progress_cb cb_func;
void *cb_args;
size_t cbs_size, i;
int ctx_idx;
int rc;

if (unlikely(crt_plugin_gdata.cpg_inited == 0 || ctx == NULL))
return timeout;

rc = crt_context_idx(ctx, &ctx_idx);
if (unlikely(rc)) {
D_ERROR("crt_context_idx() failed, rc: %d.\n", rc);
return timeout;
}

cbs_size = crt_plugin_gdata.cpg_prog_size[ctx_idx];
cbs_prog = crt_plugin_gdata.cpg_prog_cbs[ctx_idx];

for (i = 0; i < cbs_size; i++) {
cb_func = cbs_prog[i].cpcp_func;
cb_args = cbs_prog[i].cpcp_args;
/* check for and execute progress callbacks here */
if (cb_func != NULL)
timeout = cb_func(ctx, timeout, cb_args);
}

return timeout;
}

int
crt_progress_cond(crt_context_t crt_ctx, int64_t timeout,
crt_progress_cond_cb_t cond_cb, void *arg)
Expand Down Expand Up @@ -1858,7 +1826,8 @@ crt_progress_cond(crt_context_t crt_ctx, int64_t timeout,
/** loop until callback returns non-zero value */
while ((rc = cond_cb(arg)) == 0) {
crt_context_timeout_check(ctx);
timeout = crt_exec_progress_cb(ctx, timeout);
if (ctx->cc_prog_cb != NULL)
timeout = ctx->cc_prog_cb(ctx, timeout, ctx->cc_prog_cb_arg);

if (timeout < 0) {
/**
Expand Down Expand Up @@ -1930,7 +1899,8 @@ crt_progress(crt_context_t crt_ctx, int64_t timeout)
* progress
*/
crt_context_timeout_check(ctx);
timeout = crt_exec_progress_cb(ctx, timeout);
if (ctx->cc_prog_cb != NULL)
timeout = ctx->cc_prog_cb(ctx, timeout, ctx->cc_prog_cb_arg);

if (timeout != 0 && (rc == 0 || rc == -DER_TIMEDOUT)) {
/** call progress once again with the real timeout */
Expand All @@ -1950,93 +1920,38 @@ crt_progress(crt_context_t crt_ctx, int64_t timeout)
int
crt_register_progress_cb(crt_progress_cb func, int ctx_idx, void *args)
{
struct crt_prog_cb_priv *cbs_prog;
size_t i, cbs_size;
int rc = 0;
struct crt_context *ctx;
int rc;

if (ctx_idx >= CRT_SRV_CONTEXT_NUM) {
D_ERROR("ctx_idx %d >= %d\n", ctx_idx, CRT_SRV_CONTEXT_NUM);
D_GOTO(out, rc = -DER_INVAL);
}

D_MUTEX_LOCK(&crt_plugin_gdata.cpg_mutex);

cbs_size = crt_plugin_gdata.cpg_prog_size[ctx_idx];
cbs_prog = crt_plugin_gdata.cpg_prog_cbs[ctx_idx];

for (i = 0; i < cbs_size; i++) {
if (cbs_prog[i].cpcp_func == func &&
cbs_prog[i].cpcp_args == args) {
D_GOTO(out_unlock, rc = -DER_EXIST);
}
}

for (i = 0; i < cbs_size; i++) {
if (cbs_prog[i].cpcp_func == NULL) {
cbs_prog[i].cpcp_args = args;
cbs_prog[i].cpcp_func = func;
D_GOTO(out_unlock, rc = 0);
}
D_GOTO(error, rc = -DER_INVAL);
}

D_FREE(crt_plugin_gdata.cpg_prog_cbs_old[ctx_idx]);

crt_plugin_gdata.cpg_prog_cbs_old[ctx_idx] = cbs_prog;
cbs_size += CRT_CALLBACKS_NUM;

D_ALLOC_ARRAY(cbs_prog, cbs_size);
if (cbs_prog == NULL) {
crt_plugin_gdata.cpg_prog_cbs_old[ctx_idx] = NULL;
D_GOTO(out_unlock, rc = -DER_NOMEM);
ctx = crt_context_lookup(ctx_idx);
if (ctx == NULL) {
D_ERROR("crt_context_lookup(%d) failed.\n", ctx_idx);
D_GOTO(error, rc = -DER_NONEXIST);
}

if (i > 0)
memcpy(cbs_prog, crt_plugin_gdata.cpg_prog_cbs_old[ctx_idx],
i * sizeof(*cbs_prog));
cbs_prog[i].cpcp_args = args;
cbs_prog[i].cpcp_func = func;
D_MUTEX_LOCK(&ctx->cc_mutex);
ctx->cc_prog_cb = func;
ctx->cc_prog_cb_arg = args;
D_MUTEX_UNLOCK(&ctx->cc_mutex);

crt_plugin_gdata.cpg_prog_cbs[ctx_idx] = cbs_prog;
crt_plugin_gdata.cpg_prog_size[ctx_idx] = cbs_size;
return 0;

out_unlock:
D_MUTEX_UNLOCK(&crt_plugin_gdata.cpg_mutex);
out:
error:
return rc;
}

int
crt_unregister_progress_cb(crt_progress_cb func, int ctx_idx, void *args)
{
struct crt_prog_cb_priv *cbs_prog;
size_t i, cbs_size;
int rc = -DER_NONEXIST;

if (ctx_idx >= CRT_SRV_CONTEXT_NUM) {
D_ERROR("ctx_idx %d >= %d\n", ctx_idx, CRT_SRV_CONTEXT_NUM);
D_GOTO(out, rc = -DER_INVAL);
}

D_MUTEX_LOCK(&crt_plugin_gdata.cpg_mutex);
(void)func;
(void)args;

cbs_size = crt_plugin_gdata.cpg_prog_size[ctx_idx];
cbs_prog = crt_plugin_gdata.cpg_prog_cbs[ctx_idx];

for (i = 0; i < cbs_size; i++) {
if (cbs_prog[i].cpcp_func == func &&
cbs_prog[i].cpcp_args == args) {
cbs_prog[i].cpcp_func = NULL;
cbs_prog[i].cpcp_args = NULL;
D_GOTO(out_unlock, rc = 0);
}
}

out_unlock:
D_FREE(crt_plugin_gdata.cpg_prog_cbs_old[ctx_idx]);

D_MUTEX_UNLOCK(&crt_plugin_gdata.cpg_mutex);
out:
return rc;
return crt_register_progress_cb(NULL, ctx_idx, NULL);
}

int
Expand Down
27 changes: 2 additions & 25 deletions src/cart/crt_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,29 +374,16 @@ data_init(int server, crt_init_options_t *opt)
static int
crt_plugin_init(void)
{
struct crt_prog_cb_priv *cbs_prog;
struct crt_event_cb_priv *cbs_event;
size_t cbs_size = CRT_CALLBACKS_NUM;
int i, rc;
int rc;

D_ASSERT(crt_plugin_gdata.cpg_inited == 0);

for (i = 0; i < CRT_SRV_CONTEXT_NUM; i++) {
crt_plugin_gdata.cpg_prog_cbs_old[i] = NULL;
D_ALLOC_ARRAY(cbs_prog, cbs_size);
if (cbs_prog == NULL) {
for (i--; i >= 0; i--)
D_FREE(crt_plugin_gdata.cpg_prog_cbs[i]);
D_GOTO(out, rc = -DER_NOMEM);
}
crt_plugin_gdata.cpg_prog_size[i] = cbs_size;
crt_plugin_gdata.cpg_prog_cbs[i] = cbs_prog;
}

crt_plugin_gdata.cpg_event_cbs_old = NULL;
D_ALLOC_ARRAY(cbs_event, cbs_size);
if (cbs_event == NULL) {
D_GOTO(out_destroy_prog, rc = -DER_NOMEM);
D_GOTO(out, rc = -DER_NOMEM);
}
crt_plugin_gdata.cpg_event_size = cbs_size;
crt_plugin_gdata.cpg_event_cbs = cbs_event;
Expand All @@ -410,27 +397,17 @@ crt_plugin_init(void)

out_destroy_event:
D_FREE(crt_plugin_gdata.cpg_event_cbs);
out_destroy_prog:
for (i = 0; i < CRT_SRV_CONTEXT_NUM; i++)
D_FREE(crt_plugin_gdata.cpg_prog_cbs[i]);
out:
return rc;
}

static void
crt_plugin_fini(void)
{
int i;

D_ASSERT(crt_plugin_gdata.cpg_inited == 1);

crt_plugin_gdata.cpg_inited = 0;

for (i = 0; i < CRT_SRV_CONTEXT_NUM; i++) {
D_FREE(crt_plugin_gdata.cpg_prog_cbs[i]);
D_FREE(crt_plugin_gdata.cpg_prog_cbs_old[i]);
}

D_FREE(crt_plugin_gdata.cpg_event_cbs);
D_FREE(crt_plugin_gdata.cpg_event_cbs_old);

Expand Down
13 changes: 4 additions & 9 deletions src/cart/crt_internal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,6 @@ struct crt_gdata {

extern struct crt_gdata crt_gdata;

/* One progress-callback slot: a callback function and the argument passed to it. */
struct crt_prog_cb_priv {
/* callback function; a NULL value marks the slot as empty */
crt_progress_cb cpcp_func;
/* opaque argument handed to cpcp_func on each invocation */
void *cpcp_args;
};

struct crt_event_cb_priv {
crt_event_cb cecp_func;
void *cecp_args;
Expand Down Expand Up @@ -357,10 +352,6 @@ crt_env_dump(void)

/* structure of global fault tolerance data */
struct crt_plugin_gdata {
/* list of progress callbacks */
size_t cpg_prog_size[CRT_SRV_CONTEXT_NUM];
struct crt_prog_cb_priv *cpg_prog_cbs[CRT_SRV_CONTEXT_NUM];
struct crt_prog_cb_priv *cpg_prog_cbs_old[CRT_SRV_CONTEXT_NUM];
/* list of event notification callbacks */
size_t cpg_event_size;
struct crt_event_cb_priv *cpg_event_cbs;
Expand Down Expand Up @@ -405,6 +396,10 @@ struct crt_context {
crt_rpc_task_t cc_rpc_cb; /** rpc callback */
crt_rpc_task_t cc_iv_resp_cb;

/* progress callback */
void *cc_prog_cb_arg;
crt_progress_cb cc_prog_cb;

/** RPC tracking */
/** in-flight endpoint tracking hash table */
struct d_hash_table cc_epi_table;
Expand Down
3 changes: 2 additions & 1 deletion src/cart/utils/crt_utils.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2019-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -284,7 +285,7 @@ crtu_wait_for_ranks(crt_context_t ctx, crt_group_t *grp,

rc = d_gettime(&t2);
D_ASSERTF(rc == 0, "d_gettime() failed; rc=%d\n", rc);
time_s = d_time2s(d_timediff(t1, t2));
time_s = d_time2s(d_timediff(&t1, &t2));

if (ws.rc != 0 && time_s < total_timeout)
sleep(1);
Expand Down
26 changes: 1 addition & 25 deletions src/client/api/event.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Google LLC
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -27,31 +28,6 @@ static __thread bool ev_thpriv_is_init;
*/
static uint32_t ev_prog_timeout;

/*
 * EQ_WITH_CRT is defined unconditionally here, so the stub block guarded by
 * "#if !defined(EQ_WITH_CRT)" below is never compiled.
 */
#define EQ_WITH_CRT

#if !defined(EQ_WITH_CRT)

/*
 * Stand-in macros for the crt_* calls: each one expands to an expression
 * that evaluates to 0.
 */
#define crt_init(a,b,c) ({0;})
#define crt_finalize() ({0;})
#define crt_context_create(a, b) ({0;})
#define crt_context_destroy(a, b) ({0;})
/*
 * Stand-in for crt_progress_cond(): calls cb(args) once, then, while the
 * result is 0 and timeout is non-zero, sleeps one second and calls it again.
 * A negative timeout keeps looping until cb returns non-zero. Otherwise the
 * loop stops once the remaining timeout drops below 1000000, and 1000000 is
 * subtracted per iteration (presumably microseconds per one-second sleep;
 * TODO confirm).
 * The expression always evaluates to 0; the result of cb is not returned.
 * NOTE: the timeout argument is modified in place and evaluated several
 * times, so it must be an lvalue without side effects.
 */
#define crt_progress_cond(ctx, timeout, cb, args) \
({ \
int __rc = cb(args); \
\
while ((timeout) != 0 && __rc == 0) { \
sleep(1); \
__rc = cb(args); \
if ((timeout) < 0) \
continue; \
if ((timeout) < 1000000) \
break; \
(timeout) -= 1000000; \
} \
0; \
})
#endif

/*
* For the moment, we use a global crt_context_t to create all the RPC requests
* this module uses.
Expand Down
Loading

0 comments on commit afe1f33

Please sign in to comment.