Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #1723: Force dataConnectionCount to 1 if set to zero. Changed t… #1729

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions include/qpid/dispatch/router_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ qd_dispatch_t *qdr_core_dispatch(qdr_core_t *core);
*/
void qdr_process_tick(qdr_core_t *core);

/**
* Return the count of worker threads.
*
* @param core Pointer to the core object returned by qd_core()
* @return count of worker threads
*/
int qdr_core_get_worker_thread_count(const qdr_core_t *core);

/**
* @brief Return the text of the router's virtual application network ID, or 0.
*
Expand Down
2 changes: 1 addition & 1 deletion python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@
"required": false
},
"dataConnectionCount": {
"description": "The number of parallel data connections to carry streaming data between routers. Applies only to interior routers",
"description": "The number of parallel data connections to carry streaming data between routers. Applies only to interior routers and to connectors with a role of 'inter-router'",
"type": "string",
"required": false,
"default": "auto",
Expand Down
37 changes: 1 addition & 36 deletions src/adaptors/amqp/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,6 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl
}


// This is used to calculate inter-router data connection
// count when the user explicitly requests 'auto'
// or lets it default to that.
static int auto_calc_connection_count(qd_dispatch_t *qd)
{
return (qdr_core_get_worker_thread_count(qd_dispatch_router_core(qd)) + 1) / 2;
}


QD_EXPORT qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity)
{
qd_connection_manager_t *cm = qd->connection_manager;
Expand Down Expand Up @@ -309,38 +300,12 @@ QD_EXPORT qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_
}
}

uint32_t connection_count = 0;
//
// If the user asks for automatic setting of the number
// of data connnection count, make one data connection
// for every two worker threads. This is the best ratio
// for high load, as determined by throughput performance
// testing.
//
if (!strcmp("auto", ct->config.data_connection_count)) {
// The user has explicitly requested 'auto'.
connection_count = auto_calc_connection_count(qd);
qd_log(LOG_CONN_MGR, QD_LOG_INFO, "Inter-router data connections calculated at %d ",
connection_count);
} else if (1 == sscanf(ct->config.data_connection_count, "%u", &connection_count)) {
// The user has requested a specific number of connections.
qd_log(LOG_CONN_MGR, QD_LOG_INFO, "Inter-router data connections set to %d ", connection_count);
} else {
// The user has entered a non-numeric value that is not 'auto'.
// This is not a legal value. Default to 'auto' and mention it.
qd_log(LOG_CONN_MGR, QD_LOG_INFO, "Bad value \"%s\" for dataConnectionCount ",
ct->config.data_connection_count);
connection_count = auto_calc_connection_count(qd);
qd_log(LOG_CONN_MGR, QD_LOG_INFO, "Inter-router data connections calculated at %d ",
connection_count);
}

//
// If this connection has a data-connection-group, set up the group members now
//
if (ct->config.has_data_connectors) {
qd_generate_discriminator(ct->group_correlator);
for (uint32_t i = 0; i < connection_count; i++) {
for (int i = 0; i < qd->data_connection_count; i++) {
qd_connector_t *dc = qd_server_connector(qd->server);
if (!dc) {
qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", ct->config.name);
Expand Down
8 changes: 1 addition & 7 deletions src/adaptors/amqp/server_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,9 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config,
if (strcmp(config->role, "inter-router") == 0) {
// For inter-router connections only, the dataConnectionCount defaults to "auto",
// which means it will be determined as a function of the number of worker threads.
config->data_connection_count = strdup(qd->data_connection_count);
// If the user has *not* explicitly set the value "0",
// then we will have some data connections.
if (strcmp(config->data_connection_count, "0")) {
config->has_data_connectors = true;
}
} else {
config->data_connection_count = strdup("0");
config->has_data_connectors = true;
}

set_config_host(config, entity);
Expand Down Expand Up @@ -281,7 +276,6 @@ void qd_server_config_free(qd_server_config_t *cf)
if (cf->log_message) free(cf->log_message);
if (cf->policy_vhost) free(cf->policy_vhost);

free(cf->data_connection_count);
if (cf->conn_props) pn_data_free(cf->conn_props);

memset(cf, 0, sizeof(*cf));
Expand Down
4 changes: 0 additions & 4 deletions src/adaptors/amqp/server_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,6 @@ typedef struct qd_server_config_t {
*/
pn_data_t *conn_props;

/**
* For inter-router roles only. The number of data connections associated with the link.
*/
char *data_connection_count;
bool has_data_connectors;

/**
Expand Down
25 changes: 22 additions & 3 deletions src/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,29 @@ qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity)
strcpy(qd->router_id, mode);
qd_generate_discriminator(qd->router_id + strlen(qd->router_id));
}

qd->thread_count = qd_entity_opt_long(entity, "workerThreads", 4); QD_ERROR_RET();
qd->data_connection_count = qd_entity_opt_string(entity, "dataConnectionCount", "auto"); QD_ERROR_RET();
char *data_conn_count_str = qd_entity_opt_string(entity, "dataConnectionCount", "auto"); QD_ERROR_RET();

if (!strcmp("auto", data_conn_count_str)) {
// The user has explicitly requested 'auto'.
qd->data_connection_count = (qd->thread_count + 1) / 2;
qd_log(LOG_ROUTER, QD_LOG_INFO, "Inter-router data connections calculated at %d ", qd->data_connection_count);
} else if (1 == sscanf(data_conn_count_str, "%u", &qd->data_connection_count)) {
// The user has requested a specific number of connections.
if (qd->data_connection_count == 0) {
// Force data_connection_count to 1 if set to 0.
qd->data_connection_count = 1;
}
qd_log(LOG_ROUTER, QD_LOG_INFO, "Inter-router data connections set to %d ", qd->data_connection_count);
} else {
// The user has entered a non-numeric value that is not 'auto'.
// This is not a legal value. Default to 'auto' and mention it.
qd_log(LOG_ROUTER, QD_LOG_INFO, "Bad value \"%s\" for dataConnectionCount ", data_conn_count_str);
qd->data_connection_count = (qd->thread_count + 1) / 2;
qd_log(LOG_ROUTER, QD_LOG_INFO, "Inter-router data connections calculated at %d ", qd->data_connection_count);
}
free(data_conn_count_str);

qd->timestamps_in_utc = qd_entity_opt_bool(entity, "timestampsInUTC", false); QD_ERROR_RET();
qd->timestamp_format = qd_entity_opt_string(entity, "timestampFormat", 0); QD_ERROR_RET();
qd->metadata = qd_entity_opt_string(entity, "metadata", 0); QD_ERROR_RET();
Expand Down Expand Up @@ -385,7 +405,6 @@ void qd_dispatch_free(qd_dispatch_t *qd)
qd_http_server_free(qd_server_http(qd->server));

free(qd->sasl_config_path);
free(qd->data_connection_count);
free(qd->sasl_config_name);
qd_connection_manager_free(qd->connection_manager);
qd_policy_free(qd->policy);
Expand Down
25 changes: 12 additions & 13 deletions src/dispatch_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,18 @@ struct qd_dispatch_t {
qd_connection_manager_t *connection_manager;
qd_policy_t *policy;
qd_address_treatment_t default_treatment;

int thread_count;
char *sasl_config_path;
char *sasl_config_name;
char *router_area;
char *router_id;
qd_router_mode_t router_mode;
char *van_id;
char *timestamp_format;
char *metadata;
bool timestamps_in_utc;
char *data_connection_count;
bool terminate_tcp_conns;
qd_router_mode_t router_mode;
int thread_count;
uint32_t data_connection_count;
char *sasl_config_path;
char *sasl_config_name;
char *router_area;
char *router_id;
char *van_id;
char *timestamp_format;
char *metadata;
bool timestamps_in_utc;
bool terminate_tcp_conns;
};

qd_dispatch_t *qd_dispatch_get_dispatch(void);
Expand Down
4 changes: 0 additions & 4 deletions src/router_core/router_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -390,10 +390,6 @@ void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode)
free_qdr_node_t(rnode);
}

int qdr_core_get_worker_thread_count(const qdr_core_t *core)
{
return core->worker_thread_count;
}

const char *qdr_core_van_id(const qdr_core_t *core)
{
Expand Down
5 changes: 4 additions & 1 deletion tests/system_tests_one_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -3588,7 +3588,10 @@ def test_60_threads_vs_data_connection_count(self):
expected_result = int((int(worker_threads) + 1) / 2)
msg = "Inter-router data connections calculated at " + str(expected_result)
else:
expected_result = con_count
if con_count == '0':
expected_result = '1'
else:
expected_result = con_count
msg = "Inter-router data connections set to " + str(expected_result)

print("worker threads:", worker_threads, ", requested connections:", con_count, ", expected result:", expected_result, end=" ")
Expand Down
Loading