Skip to content

Commit

Permalink
Fixes #1723: Force dataConnectionCount to 1 if set to zero. Changed t…
Browse files Browse the repository at this point in the history
…ype of data_connection_count to int (#1729)
  • Loading branch information
ganeshmurthy authored Jan 30, 2025
1 parent ae494a1 commit 6edb844
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 77 deletions.
8 changes: 0 additions & 8 deletions include/qpid/dispatch/router_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ qd_dispatch_t *qdr_core_dispatch(qdr_core_t *core);
*/
void qdr_process_tick(qdr_core_t *core);

/**
* Return the count of worker threads.
*
* @param core Pointer to the core object returned by qd_core()
* @return count of worker threads
*/
int qdr_core_get_worker_thread_count(const qdr_core_t *core);

/**
* @brief Return the text of the router's virtual application network ID, or 0.
*
Expand Down
2 changes: 1 addition & 1 deletion python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@
"required": false
},
"dataConnectionCount": {
"description": "The number of parallel data connections to carry streaming data between routers. Applies only to interior routers",
"description": "The number of parallel data connections to carry streaming data between routers. Applies only to interior routers and to connectors with a role of 'inter-router'",
"type": "string",
"required": false,
"default": "auto",
Expand Down
37 changes: 1 addition & 36 deletions src/adaptors/amqp/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,6 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl
}


// This is used to calculate inter-router data connection
// count when the user explicitly requests 'auto'
// or lets it default to that.
static int auto_calc_connection_count(qd_dispatch_t *qd)
{
return (qdr_core_get_worker_thread_count(qd_dispatch_router_core(qd)) + 1) / 2;
}


QD_EXPORT qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity)
{
qd_connection_manager_t *cm = qd->connection_manager;
Expand Down Expand Up @@ -309,38 +300,12 @@ QD_EXPORT qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_
}
}

uint32_t connection_count = 0;
//
// If the user asks for automatic setting of the number
// of data connnection count, make one data connection
// for every two worker threads. This is the best ratio
// for high load, as determined by throughput performance
// testing.
//
if (!strcmp("auto", ct->config.data_connection_count)) {
// The user has explicitly requested 'auto'.
connection_count = auto_calc_connection_count(qd);
qd_log(LOG_CONN_MGR, QD_LOG_INFO, "Inter-router data connections calculated at %d ",
connection_count);
} else if (1 == sscanf(ct->config.data_connection_count, "%u", &connection_count)) {
// The user has requested a specific number of connections.
qd_log(LOG_CONN_MGR, QD_LOG_INFO, "Inter-router data connections set to %d ", connection_count);
} else {
// The user has entered a non-numeric value that is not 'auto'.
// This is not a legal value. Default to 'auto' and mention it.
qd_log(LOG_CONN_MGR, QD_LOG_INFO, "Bad value \"%s\" for dataConnectionCount ",
ct->config.data_connection_count);
connection_count = auto_calc_connection_count(qd);
qd_log(LOG_CONN_MGR, QD_LOG_INFO, "Inter-router data connections calculated at %d ",
connection_count);
}

//
// If this connection has a data-connection-group, set up the group members now
//
if (ct->config.has_data_connectors) {
qd_generate_discriminator(ct->group_correlator);
for (uint32_t i = 0; i < connection_count; i++) {
for (int i = 0; i < qd->data_connection_count; i++) {
qd_connector_t *dc = qd_server_connector(qd->server);
if (!dc) {
qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", ct->config.name);
Expand Down
8 changes: 1 addition & 7 deletions src/adaptors/amqp/server_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,9 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config,
if (strcmp(config->role, "inter-router") == 0) {
// For inter-router connections only, the dataConnectionCount defaults to "auto",
// which means it will be determined as a function of the number of worker threads.
config->data_connection_count = strdup(qd->data_connection_count);
// If the user has *not* explicitly set the value "0",
// then we will have some data connections.
if (strcmp(config->data_connection_count, "0")) {
config->has_data_connectors = true;
}
} else {
config->data_connection_count = strdup("0");
config->has_data_connectors = true;
}

set_config_host(config, entity);
Expand Down Expand Up @@ -281,7 +276,6 @@ void qd_server_config_free(qd_server_config_t *cf)
if (cf->log_message) free(cf->log_message);
if (cf->policy_vhost) free(cf->policy_vhost);

free(cf->data_connection_count);
if (cf->conn_props) pn_data_free(cf->conn_props);

memset(cf, 0, sizeof(*cf));
Expand Down
4 changes: 0 additions & 4 deletions src/adaptors/amqp/server_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,6 @@ typedef struct qd_server_config_t {
*/
pn_data_t *conn_props;

/**
* For inter-router roles only. The number of data connections associated with the link.
*/
char *data_connection_count;
bool has_data_connectors;

/**
Expand Down
25 changes: 22 additions & 3 deletions src/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,29 @@ qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity)
strcpy(qd->router_id, mode);
qd_generate_discriminator(qd->router_id + strlen(qd->router_id));
}

qd->thread_count = qd_entity_opt_long(entity, "workerThreads", 4); QD_ERROR_RET();
qd->data_connection_count = qd_entity_opt_string(entity, "dataConnectionCount", "auto"); QD_ERROR_RET();
char *data_conn_count_str = qd_entity_opt_string(entity, "dataConnectionCount", "auto"); QD_ERROR_RET();

if (!strcmp("auto", data_conn_count_str)) {
// The user has explicitly requested 'auto'.
qd->data_connection_count = (qd->thread_count + 1) / 2;
qd_log(LOG_ROUTER, QD_LOG_INFO, "Inter-router data connections calculated at %d ", qd->data_connection_count);
} else if (1 == sscanf(data_conn_count_str, "%u", &qd->data_connection_count)) {
// The user has requested a specific number of connections.
if (qd->data_connection_count == 0) {
// Force data_connection_count to 1 if set to 0.
qd->data_connection_count = 1;
}
qd_log(LOG_ROUTER, QD_LOG_INFO, "Inter-router data connections set to %d ", qd->data_connection_count);
} else {
// The user has entered a non-numeric value that is not 'auto'.
// This is not a legal value. Default to 'auto' and mention it.
qd_log(LOG_ROUTER, QD_LOG_INFO, "Bad value \"%s\" for dataConnectionCount ", data_conn_count_str);
qd->data_connection_count = (qd->thread_count + 1) / 2;
qd_log(LOG_ROUTER, QD_LOG_INFO, "Inter-router data connections calculated at %d ", qd->data_connection_count);
}
free(data_conn_count_str);

qd->timestamps_in_utc = qd_entity_opt_bool(entity, "timestampsInUTC", false); QD_ERROR_RET();
qd->timestamp_format = qd_entity_opt_string(entity, "timestampFormat", 0); QD_ERROR_RET();
qd->metadata = qd_entity_opt_string(entity, "metadata", 0); QD_ERROR_RET();
Expand Down Expand Up @@ -385,7 +405,6 @@ void qd_dispatch_free(qd_dispatch_t *qd)
qd_http_server_free(qd_server_http(qd->server));

free(qd->sasl_config_path);
free(qd->data_connection_count);
free(qd->sasl_config_name);
qd_connection_manager_free(qd->connection_manager);
qd_policy_free(qd->policy);
Expand Down
25 changes: 12 additions & 13 deletions src/dispatch_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,18 @@ struct qd_dispatch_t {
qd_connection_manager_t *connection_manager;
qd_policy_t *policy;
qd_address_treatment_t default_treatment;

int thread_count;
char *sasl_config_path;
char *sasl_config_name;
char *router_area;
char *router_id;
qd_router_mode_t router_mode;
char *van_id;
char *timestamp_format;
char *metadata;
bool timestamps_in_utc;
char *data_connection_count;
bool terminate_tcp_conns;
qd_router_mode_t router_mode;
int thread_count;
uint32_t data_connection_count;
char *sasl_config_path;
char *sasl_config_name;
char *router_area;
char *router_id;
char *van_id;
char *timestamp_format;
char *metadata;
bool timestamps_in_utc;
bool terminate_tcp_conns;
};

qd_dispatch_t *qd_dispatch_get_dispatch(void);
Expand Down
4 changes: 0 additions & 4 deletions src/router_core/router_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -390,10 +390,6 @@ void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode)
free_qdr_node_t(rnode);
}

int qdr_core_get_worker_thread_count(const qdr_core_t *core)
{
return core->worker_thread_count;
}

const char *qdr_core_van_id(const qdr_core_t *core)
{
Expand Down
5 changes: 4 additions & 1 deletion tests/system_tests_one_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -3588,7 +3588,10 @@ def test_60_threads_vs_data_connection_count(self):
expected_result = int((int(worker_threads) + 1) / 2)
msg = "Inter-router data connections calculated at " + str(expected_result)
else:
expected_result = con_count
if con_count == '0':
expected_result = '1'
else:
expected_result = con_count
msg = "Inter-router data connections set to " + str(expected_result)

print("worker threads:", worker_threads, ", requested connections:", con_count, ", expected result:", expected_result, end=" ")
Expand Down

0 comments on commit 6edb844

Please sign in to comment.