From a57a4667bcda97aa1dd9d6b5ba4be7a9442c735b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jiri=20Dan=C4=9Bk?= Date: Sun, 1 May 2022 00:33:43 +0200 Subject: [PATCH] NO-ISSUE: Add Clang Thread Safety Annotations for `_CT` functions --- .../dispatch/internal/thread_annotations.h | 4 + include/qpid/dispatch/router_core.h | 9 +- src/adaptors/http_common.h | 6 +- src/adaptors/tcp/tcp_adaptor.h | 6 +- src/router_core/address_watch.c | 6 +- src/router_core/address_watch.h | 4 +- src/router_core/agent.c | 18 ++-- src/router_core/agent_address.h | 7 +- src/router_core/agent_config_address.h | 14 ++-- src/router_core/agent_config_auto_link.c | 2 +- src/router_core/agent_config_auto_link.h | 12 +-- src/router_core/agent_connection.c | 17 ++-- src/router_core/agent_connection.h | 8 +- src/router_core/agent_link.h | 6 +- src/router_core/agent_router.h | 6 +- src/router_core/connections.c | 23 +++-- src/router_core/core_client_api.c | 32 ++++--- src/router_core/core_client_api.h | 6 +- src/router_core/core_link_endpoint.c | 15 ++-- src/router_core/delivery.c | 20 ++--- src/router_core/delivery.h | 32 +++---- src/router_core/forwarder.c | 13 +-- src/router_core/management_agent.c | 6 +- .../address_lookup_client.c | 6 +- .../edge_addr_tracking/edge_addr_tracking.c | 16 ++-- .../modules/edge_router/addr_proxy.c | 21 +++-- .../modules/edge_router/connection_manager.c | 8 +- .../modules/heartbeat_edge/heartbeat_edge.c | 6 +- .../heartbeat_server/heartbeat_server.c | 6 +- src/router_core/modules/mobile_sync/mobile.c | 32 +++---- .../streaming_link_scrubber.c | 8 +- .../delivery_tracker.c | 12 +-- .../modules/test_hooks/core_test_hooks.c | 16 ++-- src/router_core/route_control.c | 12 +-- src/router_core/route_control.h | 14 ++-- src/router_core/route_tables.c | 24 +++--- src/router_core/router_core.c | 6 +- src/router_core/router_core_private.h | 83 +++++++++---------- src/router_core/router_core_thread.c | 2 +- src/router_core/transfer.c | 11 +-- src/router_node.c | 2 +- tests/core_timer_test.c | 6 +- tests/run_unit_tests.c | 5 +- 43 files changed, 292 insertions(+), 276 deletions(-) diff --git a/include/qpid/dispatch/internal/thread_annotations.h b/include/qpid/dispatch/internal/thread_annotations.h index 60c7cf2d6..808e92f80 100644 --- a/include/qpid/dispatch/internal/thread_annotations.h +++ b/include/qpid/dispatch/internal/thread_annotations.h @@ -84,3 +84,7 @@ #define TA_RET_CAP(x) THREAD_ANNOTATION(lock_returned(x)) #define TA_SCOPED_CAP THREAD_ANNOTATION(scoped_lockable) #define TA_NO_THREAD_SAFETY_ANALYSIS THREAD_ANNOTATION(no_thread_safety_analysis) + +typedef void const * const core_thread_capability_t TA_CAP("core_thread"); + +extern core_thread_capability_t core_thread_capability; diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 3ea0b8f51..e07c6b3eb 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -46,12 +46,13 @@ ENUM_DECLARE(qd_router_mode); /** * Allocate and start an instance of the router core module. */ -qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id); +qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id) + TA_RET_CAP(core_thread_capability); /** * Stop and deallocate an instance of the router core. */ -void qdr_core_free(qdr_core_t *core); +void qdr_core_free(qdr_core_t *core) TA_REQ(core_thread_capability); /** ****************************************************************************** @@ -363,8 +364,8 @@ void qdr_manage_update(qdr_core_t *core, void *context, qd_router_entity_type_t qdr_query_t *qdr_manage_query(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attribute_names, qd_composed_field_t *body, - uint64_t in_conn); -void qdr_query_add_attribute_names(qdr_query_t *query); + uint64_t in_conn) TA_REQ(core_thread_capability); +void qdr_query_add_attribute_names(qdr_query_t *query) TA_REQ(core_thread_capability); void qdr_query_get_first(qdr_query_t *query, int offset); void qdr_query_get_next(qdr_query_t *query); void qdr_query_free(qdr_query_t *query); diff --git a/src/adaptors/http_common.h b/src/adaptors/http_common.h index 6318a3e44..08e524244 100644 --- a/src/adaptors/http_common.h +++ b/src/adaptors/http_common.h @@ -97,13 +97,13 @@ QD_EXPORT void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl); QD_EXPORT qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl); // Management interfaces for retrieval of HttpRequestInfo entities -void qdra_http_request_info_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); -void qdra_http_request_info_get_next_CT(qdr_core_t *core, qdr_query_t *query); +void qdra_http_request_info_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) TA_REQ(core_thread_capability); +void qdra_http_request_info_get_next_CT(qdr_core_t *core, qdr_query_t *query) TA_REQ(core_thread_capability); void qdra_http_request_info_get_CT(qdr_core_t *core, qd_iterator_t *name, qd_iterator_t *identity, qdr_query_t *query, - const char *qdr_http_request_info_columns[]); + const char *qdr_http_request_info_columns[]) TA_REQ(core_thread_capability); #define QDR_HTTP_REQUEST_INFO_COLUMN_COUNT 11 extern const char *qdr_http_request_info_columns[QDR_HTTP_REQUEST_INFO_COLUMN_COUNT + 1]; diff --git a/src/adaptors/tcp/tcp_adaptor.h b/src/adaptors/tcp/tcp_adaptor.h index f788fefd5..64fb36ed6 100644 --- a/src/adaptors/tcp/tcp_adaptor.h +++ b/src/adaptors/tcp/tcp_adaptor.h @@ -96,13 +96,13 @@ struct qd_tcp_connector_t DEQ_DECLARE(qd_tcp_connector_t, qd_tcp_connector_list_t); ALLOC_DECLARE(qd_tcp_connector_t); -void qdra_tcp_connection_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); -void qdra_tcp_connection_get_next_CT(qdr_core_t *core, qdr_query_t *query); +void qdra_tcp_connection_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) TA_REQ(core_thread_capability); +void qdra_tcp_connection_get_next_CT(qdr_core_t *core, qdr_query_t *query) TA_REQ(core_thread_capability); void qdra_tcp_connection_get_CT(qdr_core_t *core, qd_iterator_t *name, qd_iterator_t *identity, qdr_query_t *query, - const char *qdr_tcp_connection_columns[]); + const char *qdr_tcp_connection_columns[]) TA_REQ(core_thread_capability); #define QDR_TCP_CONNECTION_COLUMN_COUNT 10 extern const char *qdr_tcp_connection_columns[QDR_TCP_CONNECTION_COLUMN_COUNT + 1]; diff --git a/src/router_core/address_watch.c b/src/router_core/address_watch.c index 45e712197..352adac99 100644 --- a/src/router_core/address_watch.c +++ b/src/router_core/address_watch.c @@ -35,9 +35,9 @@ ALLOC_DEFINE(qdr_address_watch_t); static void qdr_watch_invoker(qdr_core_t *core, qdr_general_work_t *work, bool discard); static void qdr_watch_cancel_invoker(qdr_core_t *core, qdr_general_work_t *work, bool discard); -static void qdr_core_watch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_core_unwatch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_address_watch_free_CT(qdr_core_t *core, qdr_address_watch_t *watch); +static void qdr_core_watch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_core_unwatch_address_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_address_watch_free_CT(qdr_core_t *core, qdr_address_watch_t *watch) TA_REQ(core_thread_capability); //================================================================================== // Core Interface Functions diff --git a/src/router_core/address_watch.h b/src/router_core/address_watch.h index 307e38548..72b267074 100644 --- a/src/router_core/address_watch.h +++ b/src/router_core/address_watch.h @@ -34,8 +34,8 @@ DEQ_DECLARE(qdr_address_watch_t, qdr_address_watch_list_t); * @param core Pointer to the router core state * @param addr Pointer to the address record that was modified */ -void qdr_trigger_address_watch_CT(qdr_core_t *core, qdr_address_t *addr); +void qdr_trigger_address_watch_CT(qdr_core_t *core, qdr_address_t *addr) TA_REQ(core_thread_capability); -void qdr_address_watch_shutdown(qdr_core_t *core); +void qdr_address_watch_shutdown(qdr_core_t *core) TA_REQ(core_thread_capability); #endif diff --git a/src/router_core/agent.c b/src/router_core/agent.c index d9d61d65e..405bcd7a6 100644 --- a/src/router_core/agent.c +++ b/src/router_core/agent.c @@ -29,10 +29,10 @@ #include "qpid/dispatch/amqp.h" -static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); ALLOC_DECLARE(qdr_query_t); ALLOC_DEFINE(qdr_query_t); @@ -110,10 +110,10 @@ qdr_query_t *qdr_query(qdr_core_t *core, return query; } -static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count); -static void qdr_agent_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names, const char *qdr_columns[], int column_count); +static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count) TA_REQ(core_thread_capability); +static void qdr_agent_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names, const char *qdr_columns[], int column_count) TA_REQ(core_thread_capability); //================================================================================== // Interface Functions @@ -387,7 +387,7 @@ void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler // In-Thread Functions //================================================================================== -static void qdr_agent_forbidden(qdr_core_t *core, qdr_query_t *query, bool op_query) +static void qdr_agent_forbidden(qdr_core_t *core, qdr_query_t *query, bool op_query) TA_REQ(core_thread_capability) { query->status = QD_AMQP_FORBIDDEN; if (query->body && !op_query) diff --git a/src/router_core/agent_address.h b/src/router_core/agent_address.h index e25ba6a46..5e055efde 100644 --- a/src/router_core/agent_address.h +++ b/src/router_core/agent_address.h @@ -21,15 +21,14 @@ #include "router_core_private.h" -void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); -void qdra_address_get_next_CT(qdr_core_t *core, qdr_query_t *query); +void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) TA_REQ(core_thread_capability); +void qdra_address_get_next_CT(qdr_core_t *core, qdr_query_t *query) TA_REQ(core_thread_capability); void qdra_address_get_CT(qdr_core_t *core, qd_iterator_t *name, qd_iterator_t *identity, qdr_query_t *query, - const char *qdr_address_columns[]); - + const char *qdr_address_columns[]) TA_REQ(core_thread_capability); #define QDR_ADDRESS_COLUMN_COUNT 21 diff --git a/src/router_core/agent_config_address.h b/src/router_core/agent_config_address.h index a4d172613..7d77dde0b 100644 --- a/src/router_core/agent_config_address.h +++ b/src/router_core/agent_config_address.h @@ -21,20 +21,20 @@ #include "router_core_private.h" -void qdra_config_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); -void qdra_config_address_get_next_CT(qdr_core_t *core, qdr_query_t *query); -void qdra_config_address_create_CT(qdr_core_t *core, qd_iterator_t *name, qdr_query_t *query, qd_parsed_field_t *in_body); -void qdra_config_address_update_CT(qdr_core_t *core, qdr_query_t *query, qd_parsed_field_t *in_body); +void qdra_config_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) TA_REQ(core_thread_capability); +void qdra_config_address_get_next_CT(qdr_core_t *core, qdr_query_t *query) TA_REQ(core_thread_capability); +void qdra_config_address_create_CT(qdr_core_t *core, qd_iterator_t *name, qdr_query_t *query, qd_parsed_field_t *in_body) TA_REQ(core_thread_capability); +void qdra_config_address_update_CT(qdr_core_t *core, qdr_query_t *query, qd_parsed_field_t *in_body) TA_REQ(core_thread_capability); void qdra_config_address_delete_CT(qdr_core_t *core, qdr_query_t *query, qd_iterator_t *name, - qd_iterator_t *identity); + qd_iterator_t *identity) TA_REQ(core_thread_capability); void qdra_config_address_get_CT(qdr_core_t *core, qd_iterator_t *name, qd_iterator_t *identity, qdr_query_t *query, - const char *qdr_config_address_columns[]); + const char *qdr_config_address_columns[]) TA_REQ(core_thread_capability); char *qdra_config_address_validate_pattern_CT(qd_parsed_field_t *pattern_field, bool is_prefix, - const char **error); + const char **error) TA_REQ(core_thread_capability); #define QDR_CONFIG_ADDRESS_COLUMN_COUNT 7 extern const char *qdr_config_address_columns[QDR_CONFIG_ADDRESS_COLUMN_COUNT + 1]; diff --git a/src/router_core/agent_config_auto_link.c b/src/router_core/agent_config_auto_link.c index 8fa36fb21..c7d07b8ae 100644 --- a/src/router_core/agent_config_auto_link.c +++ b/src/router_core/agent_config_auto_link.c @@ -195,7 +195,7 @@ static void qdr_manage_advance_config_auto_link_CT(qdr_query_t *query, qdr_auto_ } -void qdra_config_auto_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) +void qdra_config_auto_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) TA_REQ(core_thread_capability) { // // Queries that get this far will always succeed. diff --git a/src/router_core/agent_config_auto_link.h b/src/router_core/agent_config_auto_link.h index 173e9c5ad..e3f69c0ba 100644 --- a/src/router_core/agent_config_auto_link.h +++ b/src/router_core/agent_config_auto_link.h @@ -21,17 +21,17 @@ #include "router_core_private.h" -void qdra_config_auto_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); -void qdra_config_auto_link_get_next_CT(qdr_core_t *core, qdr_query_t *query); -void qdra_config_auto_link_create_CT(qdr_core_t *core, qd_iterator_t *name, qdr_query_t *query, qd_parsed_field_t *in_body); -void qdra_config_auto_link_update_CT(qdr_core_t *core, qdr_query_t *query, qd_parsed_field_t *in_body); +void qdra_config_auto_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) TA_REQ(core_thread_capability); +void qdra_config_auto_link_get_next_CT(qdr_core_t *core, qdr_query_t *query) TA_REQ(core_thread_capability); +void qdra_config_auto_link_create_CT(qdr_core_t *core, qd_iterator_t *name, qdr_query_t *query, qd_parsed_field_t *in_body) TA_REQ(core_thread_capability); +void qdra_config_auto_link_update_CT(qdr_core_t *core, qdr_query_t *query, qd_parsed_field_t *in_body) TA_REQ(core_thread_capability); void qdra_config_auto_link_delete_CT(qdr_core_t *core, qdr_query_t *query, qd_iterator_t *name, - qd_iterator_t *identity); + qd_iterator_t *identity) TA_REQ(core_thread_capability); void qdra_config_auto_link_get_CT(qdr_core_t *core, qd_iterator_t *name, qd_iterator_t *identity, qdr_query_t *query, - const char *qdr_config_auto_link_columns[]); + const char *qdr_config_auto_link_columns[]) TA_REQ(core_thread_capability); #define QDR_CONFIG_AUTO_LINK_COLUMN_COUNT 14 extern const char *qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_COLUMN_COUNT + 1]; diff --git a/src/router_core/agent_connection.c b/src/router_core/agent_connection.c index c91bbfb86..ae702b283 100644 --- a/src/router_core/agent_connection.c +++ b/src/router_core/agent_connection.c @@ -120,10 +120,10 @@ static void qd_get_next_pn_data(pn_data_t **data, const char **d, int *d1) break; } } - } +} -static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t *conn, int col, qd_composed_field_t *body, bool as_map) +static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t *conn, int col, qd_composed_field_t *body, bool as_map) TA_REQ(core_thread_capability) { char id_str[100]; const char *text = 0; @@ -174,7 +174,7 @@ static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t * break; case QDR_CONNECTION_SASL_MECHANISMS: - if (conn->connection_info->sasl_mechanisms) + if (conn->connection_info->sasl_mechanisms) qd_compose_insert_string(body, conn->connection_info->sasl_mechanisms); else qd_compose_insert_null(body); @@ -319,7 +319,7 @@ static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t * } -static void qdr_agent_write_connection_CT(qdr_core_t *core, qdr_query_t *query, qdr_connection_t *conn) +static void qdr_agent_write_connection_CT(qdr_core_t *core, qdr_query_t *query, qdr_connection_t *conn) TA_REQ(core_thread_capability) { qd_composed_field_t *body = query->body; @@ -427,7 +427,7 @@ void qdra_connection_get_next_CT(qdr_core_t *core, qdr_query_t *query) static void qdr_manage_write_connection_map_CT(qdr_core_t *core, qdr_connection_t *conn, qd_composed_field_t *body, - const char *qdr_connection_columns[]) + const char *qdr_connection_columns[]) TA_REQ(core_thread_capability) { qd_compose_start_map(body); @@ -451,7 +451,7 @@ static qdr_connection_t *_find_conn_CT(qdr_core_t *core, uint64_t conn_id) } -static qdr_connection_t *qdr_connection_find_by_identity_CT(qdr_core_t *core, qd_iterator_t *identity) +static qdr_connection_t *qdr_connection_find_by_identity_CT(qdr_core_t *core, qd_iterator_t *identity) TA_REQ(core_thread_capability) { if (!identity) return 0; @@ -518,7 +518,10 @@ static void qdra_connection_set_bad_request(qdr_query_t *query) } -static void qdra_connection_update_set_status(qdr_core_t *core, qdr_query_t *query, qdr_connection_t *conn, qd_parsed_field_t *admin_state) +static void qdra_connection_update_set_status(qdr_core_t *core, + qdr_query_t *query, + qdr_connection_t *conn, + qd_parsed_field_t *admin_state) TA_REQ(core_thread_capability) { if (conn) { qd_iterator_t *admin_status_iter = qd_parse_raw(admin_state); diff --git a/src/router_core/agent_connection.h b/src/router_core/agent_connection.h index dc57dc5e4..a43a5fa00 100644 --- a/src/router_core/agent_connection.h +++ b/src/router_core/agent_connection.h @@ -21,19 +21,19 @@ #include "router_core_private.h" -void qdra_connection_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); -void qdra_connection_get_next_CT(qdr_core_t *core, qdr_query_t *query); +void qdra_connection_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) TA_REQ(core_thread_capability); +void qdra_connection_get_next_CT(qdr_core_t *core, qdr_query_t *query) TA_REQ(core_thread_capability); void qdra_connection_get_CT(qdr_core_t *core, qd_iterator_t *name, qd_iterator_t *identity, qdr_query_t *query, - const char *qdr_connection_columns[]); + const char *qdr_connection_columns[]) TA_REQ(core_thread_capability); void qdra_connection_update_CT(qdr_core_t *core, qd_iterator_t *name, qd_iterator_t *identity, qdr_query_t *query, - qd_parsed_field_t *in_body); + qd_parsed_field_t *in_body) TA_REQ(core_thread_capability); #define QDR_CONNECTION_COLUMN_COUNT 25 extern const char *qdr_connection_columns[QDR_CONNECTION_COLUMN_COUNT + 1]; diff --git a/src/router_core/agent_link.h b/src/router_core/agent_link.h index b0ec1a117..705b60e09 100644 --- a/src/router_core/agent_link.h +++ b/src/router_core/agent_link.h @@ -21,13 +21,13 @@ #include "router_core_private.h" -void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); -void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t *query); +void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) TA_REQ(core_thread_capability); +void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t *query) TA_REQ(core_thread_capability); void qdra_link_update_CT(qdr_core_t *core, qd_iterator_t *name, qd_iterator_t *identity, qdr_query_t *query, - qd_parsed_field_t *in_body); + qd_parsed_field_t *in_body) TA_REQ(core_thread_capability); #define QDR_LINK_COLUMN_COUNT 30 diff --git a/src/router_core/agent_router.h b/src/router_core/agent_router.h index 8b2a9c6c9..7337910b8 100644 --- a/src/router_core/agent_router.h +++ b/src/router_core/agent_router.h @@ -25,8 +25,8 @@ extern const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1]; -void qdra_router_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); -void qdra_router_get_next_CT(qdr_core_t *core, qdr_query_t *query); -void qdra_router_get_next_CT(qdr_core_t *core, qdr_query_t *query); +void qdra_router_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) TA_REQ(core_thread_capability); +void qdra_router_get_next_CT(qdr_core_t *core, qdr_query_t *query) TA_REQ(core_thread_capability); +void qdra_router_get_next_CT(qdr_core_t *core, qdr_query_t *query) TA_REQ(core_thread_capability); #endif diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 093b21cd0..7957d8a24 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -31,13 +31,13 @@ #include #include -static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_processing_complete_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_link_processing_complete_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); static void qdr_link_detach_sent(qdr_link_t *link); static void qdr_link_processing_complete(qdr_core_t *core, qdr_link_t *link); static void qdr_connection_group_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn); @@ -312,6 +312,7 @@ void qdr_close_connection_CT(qdr_core_t *core, qdr_connection_t *conn) static void qdr_core_close_connection_CT(qdr_core_t *core, qdr_action_t *action, bool discard) + TA_REQ(core_thread_capability) { qdr_connection_t *conn = safe_deref_qdr_connection_t(action->args.connection.conn); @@ -1044,6 +1045,7 @@ static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t *link) static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, const char *log_text) + TA_REQ(core_thread_capability) { // // Remove the link from the overall list of links and possibly the streaming @@ -1159,6 +1161,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li static void qdr_link_cleanup_protected_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, const char *label) + TA_REQ(core_thread_capability) { bool do_cleanup = false; @@ -1852,6 +1855,7 @@ static void qdr_attach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn, static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link) + TA_REQ(core_thread_capability) { if (conn->role == QDR_ROLE_INTER_ROUTER) { qdr_del_link_ref(&core->hello_addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); @@ -1895,7 +1899,8 @@ static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd } -static void qdr_attach_link_downlink_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, qdr_terminus_t *source) +static void qdr_attach_link_downlink_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, + qdr_terminus_t *source) TA_REQ(core_thread_capability) { qdr_address_t *addr; qd_iterator_t *iter = qd_iterator_dup(qdr_terminus_get_address(source)); @@ -1916,7 +1921,7 @@ static void qdr_attach_link_downlink_CT(qdr_core_t *core, qdr_connection_t *conn // move dlv to new link. -static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv) +static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv) TA_REQ(core_thread_capability) { // // Remove the delivery from its current link if needed diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c index 262343d76..29d6445b4 100644 --- a/src/router_core/core_client_api.c +++ b/src/router_core/core_client_api.c @@ -87,43 +87,41 @@ struct qdrc_client_t { ALLOC_DECLARE(qdrc_client_t); ALLOC_DEFINE(qdrc_client_t); - static void _send_request_CT(qdrc_client_t *client, - qdrc_client_request_t *req); -static void _flush_send_queue_CT(qdrc_client_t *client); + qdrc_client_request_t *req) TA_REQ(core_thread_capability); +static void _flush_send_queue_CT(qdrc_client_t *client) TA_REQ(core_thread_capability); -static void _state_updated_CT(qdrc_client_t *client); +static void _state_updated_CT(qdrc_client_t *client) TA_REQ(core_thread_capability); static void _sender_second_attach_CT(void *client_context, qdr_terminus_t *remote_source, - qdr_terminus_t *remote_target); + qdr_terminus_t *remote_target) TA_REQ(core_thread_capability); static void _receiver_second_attach_CT(void *client_context, qdr_terminus_t *remote_source, - qdr_terminus_t *remote_target); + qdr_terminus_t *remote_target) TA_REQ(core_thread_capability); static void _sender_flow_CT(void *client_context, int available_credit, - bool drain); + bool drain) TA_REQ(core_thread_capability); static void _sender_update_CT(void *client_context, qdr_delivery_t *delivery, bool settled, - uint64_t disposition); + uint64_t disposition) TA_REQ(core_thread_capability); static void _receiver_transfer_CT(void *client_context, qdr_delivery_t *delivery, - qd_message_t *message); + qd_message_t *message) TA_REQ(core_thread_capability); static void _sender_detached_CT(void *client_context, - qdr_error_t *error); + qdr_error_t *error) TA_REQ(core_thread_capability); static void _receiver_detached_CT(void *client_context, - qdr_error_t *error); -static void _sender_cleanup_CT(void *client_context); -static void _receiver_cleanup_CT(void *client_context); + qdr_error_t *error) TA_REQ(core_thread_capability); +static void _sender_cleanup_CT(void *client_context) TA_REQ(core_thread_capability); +static void _receiver_cleanup_CT(void *client_context) TA_REQ(core_thread_capability); static void _free_request_CT(qdrc_client_t *client, qdrc_client_request_t *req, - const char *error); + const char *error) TA_REQ(core_thread_capability); static qd_message_t *_create_message_CT(qdrc_client_t *client, - qdrc_client_request_t *req); -static void _timer_expired(qdr_core_t *core, void *context); - + qdrc_client_request_t *req) TA_REQ(core_thread_capability); +static void _timer_expired(qdr_core_t *core, void *context) TA_REQ(core_thread_capability); static qdrc_endpoint_desc_t sender_endpoint = { .label = "core client - sender", diff --git a/src/router_core/core_client_api.h b/src/router_core/core_client_api.h index d737fe48e..75b95533a 100644 --- a/src/router_core/core_client_api.h +++ b/src/router_core/core_client_api.h @@ -124,7 +124,7 @@ qdrc_client_t *qdrc_client_CT(qdr_core_t *core, uint32_t credit_window, void *user_context, qdrc_client_on_state_CT_t on_state_cb, - qdrc_client_on_flow_CT_t on_flow_cb); + qdrc_client_on_flow_CT_t on_flow_cb) TA_REQ(core_thread_capability); /** @@ -132,7 +132,7 @@ qdrc_client_t *qdrc_client_CT(qdr_core_t *core, * * @param client - as returned by qdrc_client_CT() */ -void qdrc_client_free_CT(qdrc_client_t *client); +void qdrc_client_free_CT(qdrc_client_t *client) TA_REQ(core_thread_capability); /** @@ -161,6 +161,6 @@ int qdrc_client_request_CT(qdrc_client_t *client, uint32_t timeout, qdrc_client_on_reply_CT_t on_reply_cb, qdrc_client_on_ack_CT_t on_ack_cb, - qdrc_client_request_done_CT_t done_cb); + qdrc_client_request_done_CT_t done_cb) TA_REQ(core_thread_capability); #endif // #define qd_router_core_client_api_h 1 diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c index 8c4e83ebe..07adcb892 100644 --- a/src/router_core/core_link_endpoint.c +++ b/src/router_core/core_link_endpoint.c @@ -37,7 +37,7 @@ ALLOC_DEFINE(qdrc_endpoint_t); void qdrc_endpoint_bind_mobile_address_CT(qdr_core_t *core, const char *address, qdrc_endpoint_desc_t *desc, - void *bind_context) + void *bind_context) TA_REQ(core_thread_capability) { qdr_address_t *addr = 0; qd_iterator_t *iter = qd_iterator_string(address, ITER_VIEW_ADDRESS_HASH); @@ -67,7 +67,7 @@ qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t *core, qdr_terminus_t *source, qdr_terminus_t *target, qdrc_endpoint_desc_t *desc, - void *link_context) + void *link_context) TA_REQ(core_thread_capability) { qdrc_endpoint_t *ep = new_qdrc_endpoint_t(); @@ -92,13 +92,14 @@ qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *ep) } -void qdrc_endpoint_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_terminus_t *source, qdr_terminus_t *target) +void qdrc_endpoint_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_terminus_t *source, + qdr_terminus_t *target) TA_REQ(core_thread_capability) { qdr_link_outbound_second_attach_CT(core, ep->link, source, target); } -void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error) +void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error) TA_REQ(core_thread_capability) { qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE, true); if (ep->link->detach_count == 2) { @@ -107,13 +108,14 @@ void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t } -void qdrc_endpoint_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, bool drain) +void qdrc_endpoint_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, bool drain) TA_REQ(core_thread_capability) { qdr_link_issue_credit_CT(core, ep->link, credit, drain); } void qdrc_endpoint_send_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t *dlv, bool presettled) + TA_REQ(core_thread_capability) { set_safe_ptr_qdr_link_t(ep->link, &dlv->link_sp); dlv->settled = presettled; @@ -151,7 +153,7 @@ qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *end } -void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *dlv, uint64_t disposition) +void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *dlv, uint64_t disposition) TA_REQ(core_thread_capability) { // // Set the new delivery state @@ -214,6 +216,7 @@ void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error, qd_detach_type_t dt) + TA_REQ(core_thread_capability) { if (dt == QD_LOST) { qdrc_endpoint_do_cleanup_CT(core, ep); diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index fec10146a..0c3e34c2e 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -24,15 +24,15 @@ ALLOC_DEFINE(qdr_delivery_t); -static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_delivery_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery); +static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_delivery_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery) TA_REQ(core_thread_capability); static void qdr_delivery_anycast_propagate_CT(qdr_core_t *core, qdr_delivery_t *dlv, - qdr_delivery_t *peer, bool settled); -static void qdr_delivery_anycast_reforward_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer); + qdr_delivery_t *peer, bool settled) TA_REQ(core_thread_capability); +static void qdr_delivery_anycast_reforward_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer) TA_REQ(core_thread_capability); static bool qdr_delivery_set_remote_delivery_state_CT(qdr_delivery_t *dlv, uint64_t dispo, - qd_delivery_state_t *dstate); + qd_delivery_state_t *dstate) TA_REQ(core_thread_capability); void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context) @@ -447,7 +447,7 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delive } -static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery) +static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery) TA_REQ(core_thread_capability) { assert(sys_atomic_get(&delivery->ref_count) == 0); @@ -1000,7 +1000,7 @@ void qdr_delivery_mcast_inbound_update_CT(qdr_core_t *core, qdr_delivery_t *in_d // return: true if in_dlv has been settled // static bool qdr_delivery_mcast_outbound_settled_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, - qdr_delivery_t *out_dlv, bool *moved) + qdr_delivery_t *out_dlv, bool *moved) TA_REQ(core_thread_capability) { bool push = false; *moved = false; @@ -1051,7 +1051,7 @@ static bool qdr_delivery_mcast_outbound_settled_CT(qdr_core_t *core, qdr_deliver // returns true if dlv disposition has been updated // static bool qdr_delivery_mcast_outbound_disposition_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, - qdr_delivery_t *out_dlv, uint64_t new_disp) + qdr_delivery_t *out_dlv, uint64_t new_disp) TA_REQ(core_thread_capability) { // The AMQP 1.0 spec does not define a way to propagate disposition // back to the sender in the case of unsettled multicast. In the diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h index 2f1c324cf..0b1b97ef2 100644 --- a/src/router_core/delivery.h +++ b/src/router_core/delivery.h @@ -155,30 +155,30 @@ qdr_delivery_t *qdr_delivery_continue(qdr_core_t *core, qdr_delivery_t *delivery /* update settlement and/or disposition and schedule I/O processing */ -void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery); -void qdr_delivery_reject_CT(qdr_core_t *core, qdr_delivery_t *delivery, qdr_error_t *error); -void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery); -bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery); +void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery) TA_REQ(core_thread_capability); +void qdr_delivery_reject_CT(qdr_core_t *core, qdr_delivery_t *delivery, qdr_error_t *error) TA_REQ(core_thread_capability); +void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery) TA_REQ(core_thread_capability); +bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery) TA_REQ(core_thread_capability); /* add dlv to links list of updated deliveries and schedule I/O thread processing */ -void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv); +void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv) TA_REQ(core_thread_capability); /* optimized decref for core thread */ -void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery, const char *label); +void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery, const char *label) TA_REQ(core_thread_capability); /* peer delivery list management*/ -void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv); -void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer); +void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv) TA_REQ(core_thread_capability); +void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer) TA_REQ(core_thread_capability); /* peer iterator - warning: not reentrant! */ -qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv); -qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv); +qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv) TA_REQ(core_thread_capability); +qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv) TA_REQ(core_thread_capability); /* schedules all peer deliveries with work for I/O processing */ -void qdr_delivery_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more); +void qdr_delivery_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more) TA_REQ(core_thread_capability); /* update the links counters with respect to its delivery */ -void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery); +void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery) TA_REQ(core_thread_capability); /** * multicast delivery state and settlement management @@ -186,16 +186,16 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delive // remote updated disposition/settlement for incoming delivery void qdr_delivery_mcast_inbound_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, - uint64_t new_disp, bool settled); + uint64_t new_disp, bool settled) TA_REQ(core_thread_capability); // remote update disposition/settlement for outgoing delivery void qdr_delivery_mcast_outbound_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_delivery_t *out_peer, - uint64_t new_disp, bool settled); + uint64_t new_disp, bool settled) TA_REQ(core_thread_capability); // number of unsettled peer (outbound) deliveries for in_dlv -int qdr_delivery_peer_count_CT(const qdr_delivery_t *in_dlv); +int qdr_delivery_peer_count_CT(const qdr_delivery_t *in_dlv) TA_REQ(core_thread_capability); // Return the number of outbound paths that have been invalidated due to releases -int qdr_delivery_invalidated_path_count_CT(const qdr_delivery_t *dlv); +int qdr_delivery_invalidated_path_count_CT(const qdr_delivery_t *dlv) TA_REQ(core_thread_capability); #endif // __delivery_h__ diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 3df8c0fbc..4f452959e 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -70,7 +70,7 @@ static inline qdr_link_t *peer_router_data_link(qdr_core_t *core, // Get an idle anonymous link for a streaming message. This link will come from // either the connection's free link pool or it will be dynamically created on // the given connection. -static inline qdr_link_t *get_outgoing_streaming_link(qdr_core_t *core, qdr_connection_t *base_conn, qdr_link_t *base_link) +static inline qdr_link_t *get_outgoing_streaming_link(qdr_core_t *core, qdr_connection_t *base_conn, qdr_link_t *base_link) TA_REQ(core_thread_capability) { qdr_connection_t *conn; if (!base_conn) return 0; @@ -220,7 +220,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in // Drop all pre-settled deliveries pending on the link's // undelivered list. // -static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link) TA_REQ(link->conn->work_lock) +static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link) TA_REQ(link->conn->work_lock) TA_REQ(core_thread_capability) { qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered); @@ -339,6 +339,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery static void qdr_settle_subscription_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard) + TA_REQ(core_thread_capability) { qdr_delivery_t *in_delivery = action->args.delivery.delivery; @@ -522,7 +523,7 @@ static inline bool qdr_invalidated_link_CT(qdr_delivery_t *in_dlv, qdr_link_t *o /** * Handle forwarding to a subscription */ -static void qdr_forward_to_subscriber_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_delivery_t *in_dlv, qd_message_t *in_msg, bool receive_complete) +static void qdr_forward_to_subscriber_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_delivery_t *in_dlv, qd_message_t *in_msg, bool receive_complete) TA_REQ(core_thread_capability) { qd_message_add_fanout(in_msg); @@ -551,7 +552,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core, qd_message_t *msg, qdr_delivery_t *in_delivery, bool exclude_inprocess, - bool control) + bool control) TA_REQ(core_thread_capability) { bool bypass_valid_origins = addr->forwarder->bypass_valid_origins; int fanout = 0; @@ -716,7 +717,7 @@ int qdr_forward_closest_CT(qdr_core_t *core, qd_message_t *msg, qdr_delivery_t *in_delivery, bool exclude_inprocess, - bool control) + bool control) TA_REQ(core_thread_capability) { const bool receive_complete = qd_message_receive_complete(msg); // @@ -900,7 +901,7 @@ int qdr_forward_balanced_CT(qdr_core_t *core, qd_message_t *msg, qdr_delivery_t *in_delivery, bool exclude_inprocess, - bool control) + bool control) TA_REQ(core_thread_capability) { // // Control messages should never use balanced treatment. diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c index dfcc3e67e..e3eefd23f 100644 --- a/src/router_core/management_agent.c +++ b/src/router_core/management_agent.c @@ -28,6 +28,7 @@ #include "qpid/dispatch/parse.h" #include "qpid/dispatch/router.h" #include "qpid/dispatch/router_core.h" +#include "qpid/dispatch/internal/thread_annotations.h" #include @@ -235,14 +236,13 @@ static void qd_manage_response_handler(void *context, const qd_amqp_error_t *sta free_qd_management_context_t(ctx); } - static void qd_core_agent_query_handler(qdr_core_t *core, qd_router_entity_type_t entity_type, qd_router_operation_type_t operation_type, qd_message_t *msg, int *count, int *offset, - uint64_t in_conn) + uint64_t in_conn) TA_REQ(core_thread_capability) { // // Add the Body. @@ -486,7 +486,7 @@ static bool qd_can_handle_request(qd_parsed_field_t *properties_fld, * */ uint64_t qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id, int unused_cost, - uint64_t in_conn_id, const qd_policy_spec_t *policy_spec, qdr_error_t **error) + uint64_t in_conn_id, const qd_policy_spec_t *policy_spec, qdr_error_t **error) TA_REQ(core_thread_capability) { qdr_core_t *core = (qdr_core_t*) context; qd_iterator_t *app_properties_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES); diff --git a/src/router_core/modules/address_lookup_client/address_lookup_client.c b/src/router_core/modules/address_lookup_client/address_lookup_client.c index 8ee8e369c..290634b22 100644 --- a/src/router_core/modules/address_lookup_client/address_lookup_client.c +++ b/src/router_core/modules/address_lookup_client/address_lookup_client.c @@ -102,7 +102,7 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, bool create_if_not_found, bool accept_dynamic, bool *unavailable, - bool *core_endpoint) + bool *core_endpoint) TA_REQ(core_thread_capability) { qdr_address_t *addr = 0; @@ -216,7 +216,7 @@ static void qdr_link_react_to_first_attach_CT(qdr_core_t *core, qdr_terminus_t *source, // must free when done qdr_terminus_t *target, // must free when done bool unavailable, - bool core_endpoint) + bool core_endpoint) TA_REQ(core_thread_capability) { if (core_endpoint) { qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target); @@ -283,7 +283,7 @@ static void qcm_addr_lookup_CT(void *context, qdr_link_t *link, qd_direction_t dir, qdr_terminus_t *source, - qdr_terminus_t *target) + qdr_terminus_t *target) TA_REQ(core_thread_capability) { qcm_lookup_client_t *client = (qcm_lookup_client_t*) context; bool unavailable; diff --git a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c index 6b76ab4eb..bba0f10ab 100644 --- a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c +++ b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c @@ -171,11 +171,11 @@ static void qdrc_address_endpoint_cleanup(void *link_context) * @brief Determine if deliveries to the specified address coming in on the specified connection * can be successfully forwarded to a valid destination. A destination is valid if it is not * on the same edge-mesh as the delivery's origin. - * - * @param addr - * @param in_conn - * @return true - * @return false + * + * @param addr + * @param in_conn + * @return true + * @return false */ static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *in_conn) { @@ -291,7 +291,7 @@ static void qdrc_check_edge_peers(qdr_core_t *core, qdr_address_t *addr) } -static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr) +static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr) TA_REQ(core_thread_capability) { // We only care about mobile addresses. if (!qdr_address_is_mobile_CT(addr)) @@ -341,7 +341,7 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr } } -static void on_link_event(void *context, qdrc_event_t event, qdr_link_t *link) +static void on_link_event(void *context, qdrc_event_t event, qdr_link_t *link) TA_REQ(core_thread_capability) { switch (event) { case QDRC_EVENT_LINK_EDGE_DATA_ATTACHED : @@ -364,7 +364,7 @@ static void on_link_event(void *context, qdrc_event_t event, qdr_link_t *link) } break; } - + case QDRC_EVENT_LINK_EDGE_DATA_DETACHED : { if (link->edge_context) { diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c index 4d7e228c0..1d9bf21df 100644 --- a/src/router_core/modules/edge_router/addr_proxy.c +++ b/src/router_core/modules/edge_router/addr_proxy.c @@ -95,7 +95,7 @@ static qdr_terminus_t *qdr_terminus_normal(const char *addr) } -static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t *addr) +static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t *addr) TA_REQ(core_thread_capability) { qdr_link_t *edge_inlink = safe_deref_qdr_link_t(addr->edge_inlink_sp); if (edge_inlink == 0) { @@ -111,7 +111,7 @@ static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t } -static void del_inlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) +static void del_inlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) TA_REQ(core_thread_capability) { qdr_link_t *link = safe_deref_qdr_link_t(addr->edge_inlink_sp); if (link) { @@ -122,7 +122,7 @@ static void del_inlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) } -static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t *addr) +static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t *addr) TA_REQ(core_thread_capability) { qdr_link_t *edge_outlink = safe_deref_qdr_link_t(addr->edge_outlink_sp); if (edge_outlink == 0 && DEQ_SIZE(addr->subscriptions) == 0) { @@ -142,7 +142,7 @@ static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_ } -static void del_outlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) +static void del_outlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) TA_REQ(core_thread_capability) { qdr_link_t *link = safe_deref_qdr_link_t(addr->edge_outlink_sp); if (link) { @@ -154,6 +154,7 @@ static void del_outlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) static void proxy_addr_on_inter_edge_connection(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr, qdr_connection_t *conn) + TA_REQ(core_thread_capability) { const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); qdr_terminus_t *term = qdr_terminus_normal(key + 1); @@ -167,6 +168,7 @@ static void proxy_addr_on_inter_edge_connection(qcm_edge_addr_proxy_t *ap, qdr_a static void proxy_addr_on_all_inter_edge_connections(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) + TA_REQ(core_thread_capability) { qdr_edge_peer_t *edge_peer = DEQ_HEAD(ap->core->edge_peers); while (!!edge_peer) { @@ -176,7 +178,7 @@ static void proxy_addr_on_all_inter_edge_connections(qcm_edge_addr_proxy_t *ap, } -static void remove_proxies_for_addr(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) +static void remove_proxies_for_addr(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) TA_REQ(core_thread_capability) { qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks); while (!!ref) { @@ -192,6 +194,7 @@ static void remove_proxies_for_addr(qcm_edge_addr_proxy_t *ap, qdr_address_t *ad static void on_inter_edge_connection_opened(qcm_edge_addr_proxy_t *ap, qdr_connection_t *conn) + TA_REQ(core_thread_capability) { qdr_address_t *addr = DEQ_HEAD(ap->core->addrs); while (!!addr) { @@ -254,7 +257,7 @@ static void on_link_event(void *context, qdrc_event_t event, qdr_link_t *link) } -static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *conn) +static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *conn) TA_REQ(core_thread_capability) { qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) context; @@ -368,7 +371,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c } -static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr) +static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr) TA_REQ(core_thread_capability) { qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) context; @@ -460,7 +463,7 @@ static void on_second_attach(void *link_context, static void on_transfer(void *link_context, qdr_delivery_t *dlv, - qd_message_t *msg) + qd_message_t *msg) TA_REQ(core_thread_capability) { qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) link_context; uint64_t dispo = PN_ACCEPTED; @@ -536,7 +539,7 @@ static void on_cleanup(void *link_context) } -qcm_edge_addr_proxy_t *qcm_edge_addr_proxy(qdr_core_t *core) +qcm_edge_addr_proxy_t *qcm_edge_addr_proxy(qdr_core_t *core) TA_REQ(core_thread_capability) { qcm_edge_addr_proxy_t *ap = NEW(qcm_edge_addr_proxy_t); diff --git a/src/router_core/modules/edge_router/connection_manager.c b/src/router_core/modules/edge_router/connection_manager.c index 14994987b..eccb53e5a 100644 --- a/src/router_core/modules/edge_router/connection_manager.c +++ b/src/router_core/modules/edge_router/connection_manager.c @@ -63,7 +63,7 @@ static qdr_edge_peer_t *qdr_find_edge_peer_CT(qdr_core_t *core, const char *cont } -static void qdr_inter_edge_peer_activate_CT(qdr_core_t *core, qdr_edge_peer_t *edge_peer) +static void qdr_inter_edge_peer_activate_CT(qdr_core_t *core, qdr_edge_peer_t *edge_peer) TA_REQ(core_thread_capability) { edge_peer->router_addr = qdr_add_local_address_CT(core, QD_ITER_HASH_PREFIX_EDGE_SUMMARY, edge_peer->identity, QD_TREATMENT_ANYCAST_CLOSEST); qdr_link_t *link = qdr_create_link_CT(core, edge_peer->primary_conn, QD_LINK_INTER_EDGE, QD_OUTGOING, @@ -72,7 +72,7 @@ static void qdr_inter_edge_peer_activate_CT(qdr_core_t *core, qdr_edge_peer_t *e } -static void qdr_inter_edge_connection_setup_CT(qdr_core_t *core, qdr_connection_t *conn) +static void qdr_inter_edge_connection_setup_CT(qdr_core_t *core, qdr_connection_t *conn) TA_REQ(core_thread_capability) { qdr_edge_peer_t *edge_peer = qdr_find_edge_peer_CT(core, conn->connection_info->container); @@ -105,7 +105,7 @@ static void qdr_inter_edge_connection_setup_CT(qdr_core_t *core, qdr_connection_ } -static void qdr_inter_edge_connection_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn) +static void qdr_inter_edge_connection_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn) TA_REQ(core_thread_capability) { qdr_edge_peer_t *edge_peer = conn->edge_peer; @@ -132,7 +132,7 @@ static void qdr_inter_edge_connection_cleanup_CT(qdr_core_t *core, qdr_connectio } -static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *conn) +static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *conn) TA_REQ(core_thread_capability) { qcm_edge_conn_mgr_t *cm = (qcm_edge_conn_mgr_t*) context; diff --git a/src/router_core/modules/heartbeat_edge/heartbeat_edge.c b/src/router_core/modules/heartbeat_edge/heartbeat_edge.c index 19c0e691b..3768d5c4e 100644 --- a/src/router_core/modules/heartbeat_edge/heartbeat_edge.c +++ b/src/router_core/modules/heartbeat_edge/heartbeat_edge.c @@ -58,7 +58,7 @@ typedef struct qcm_heartbeat_edge_t { */ static void on_second_attach(void *link_context, qdr_terminus_t *remote_source, - qdr_terminus_t *remote_target) + qdr_terminus_t *remote_target) TA_REQ(core_thread_capability) { qcm_heartbeat_edge_t *client = (qcm_heartbeat_edge_t*) link_context; qdr_core_timer_schedule_CT(client->core, client->timer, 1); @@ -127,7 +127,7 @@ static qdrc_endpoint_desc_t descriptor = { // Event Handlers //================================================================================ -static void on_timer(qdr_core_t *core, void *context) +static void on_timer(qdr_core_t *core, void *context) TA_REQ(core_thread_capability) { qcm_heartbeat_edge_t *client = (qcm_heartbeat_edge_t*) context; qdr_core_timer_schedule_CT(client->core, client->timer, 2); @@ -195,7 +195,7 @@ static bool qcm_heartbeat_edge_enable_CT(qdr_core_t *core) } -static void qcm_heartbeat_edge_init_CT(qdr_core_t *core, void **module_context) +static void qcm_heartbeat_edge_init_CT(qdr_core_t *core, void **module_context) TA_REQ(core_thread_capability) { qcm_heartbeat_edge_t *client = NEW(qcm_heartbeat_edge_t); ZERO(client); diff --git a/src/router_core/modules/heartbeat_server/heartbeat_server.c b/src/router_core/modules/heartbeat_server/heartbeat_server.c index 17e8eb95b..d02c9a237 100644 --- a/src/router_core/modules/heartbeat_server/heartbeat_server.c +++ b/src/router_core/modules/heartbeat_server/heartbeat_server.c @@ -130,7 +130,7 @@ static qdrc_endpoint_desc_t _endpoint_handlers = }; -static void on_timer(qdr_core_t *core, void *context) +static void on_timer(qdr_core_t *core, void *context) TA_REQ(core_thread_capability) { qdr_core_timer_schedule_CT(core, _server_state.timer, 2); endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints); @@ -146,13 +146,13 @@ static void on_timer(qdr_core_t *core, void *context) } -static bool _heartbeat_server_enable_CT(qdr_core_t *core) +static bool _heartbeat_server_enable_CT(qdr_core_t *core) TA_REQ(core_thread_capability) { return true; } -static void _heartbeat_server_init_CT(qdr_core_t *core, void **module_context) +static void _heartbeat_server_init_CT(qdr_core_t *core, void **module_context) TA_REQ(core_thread_capability) { _server_state.core = core; diff --git a/src/router_core/modules/mobile_sync/mobile.c b/src/router_core/modules/mobile_sync/mobile.c index 31bdd1e14..c4e837c27 100644 --- a/src/router_core/modules/mobile_sync/mobile.c +++ b/src/router_core/modules/mobile_sync/mobile.c @@ -90,7 +90,7 @@ static void log_unknown_router(qdrm_mobile_sync_t *msync, qd_parsed_field_t *id_ free(r_id); } -static void qcm_mobile_sync_on_router_advanced_CT(qdrm_mobile_sync_t *msync, qdr_node_t *router); +static void qcm_mobile_sync_on_router_advanced_CT(qdrm_mobile_sync_t *msync, qdr_node_t *router) TA_REQ(core_thread_capability); //================================================================================ // Helper Functions @@ -152,7 +152,7 @@ static void qcm_mobile_sync_start_tracking(qdr_address_t *addr) * Decrement the address's ref_count. * Check the address to have it deleted if it is no longer referenced anywhere. */ -static void qcm_mobile_sync_stop_tracking(qdr_core_t *core, qdr_address_t *addr) +static void qcm_mobile_sync_stop_tracking(qdr_core_t *core, qdr_address_t *addr) TA_REQ(core_thread_capability) { BIT_CLEAR(addr->sync_mask, ADDR_SYNC_ADDRESS_MOBILE_TRACKING); if (--addr->ref_count == 0) @@ -260,7 +260,7 @@ static qd_iterator_t *qcm_mobile_sync_parse_addr_descriptor(qd_parsed_field_t *f } -static void qcm_mobile_sync_compose_diff_addr_list_add(qdrm_mobile_sync_t *msync, qd_composed_field_t *field) +static void qcm_mobile_sync_compose_diff_addr_list_add(qdrm_mobile_sync_t *msync, qd_composed_field_t *field) TA_REQ(core_thread_capability) { qd_compose_start_list(field); qdr_address_t *addr = DEQ_HEAD(msync->sync_addrs); @@ -277,7 +277,7 @@ static void qcm_mobile_sync_compose_diff_addr_list_add(qdrm_mobile_sync_t *msync } -static void qcm_mobile_sync_compose_diff_addr_list_del(qdrm_mobile_sync_t *msync, qd_composed_field_t *field) +static void qcm_mobile_sync_compose_diff_addr_list_del(qdrm_mobile_sync_t *msync, qd_composed_field_t *field) TA_REQ(core_thread_capability) { qd_compose_start_list(field); qdr_address_t *addr = DEQ_HEAD(msync->sync_addrs); @@ -295,7 +295,7 @@ static void qcm_mobile_sync_compose_diff_addr_list_del(qdrm_mobile_sync_t *msync } -static qd_message_t *qcm_mobile_sync_compose_differential_mau(qdrm_mobile_sync_t *msync, const char *address) +static qd_message_t *qcm_mobile_sync_compose_differential_mau(qdrm_mobile_sync_t *msync, const char *address) TA_REQ(core_thread_capability) { qd_message_t *msg = qd_message(); qd_composed_field_t *headers = qcm_mobile_sync_message_headers(address, MAU); @@ -448,7 +448,7 @@ static void qcm_mobile_sync_process_addr_attributes_CT(qdrm_mobile_sync_t *msync // Timer Handler //================================================================================ -static void qcm_mobile_sync_on_timer_CT(qdr_core_t *core, void *context) +static void qcm_mobile_sync_on_timer_CT(qdr_core_t *core, void *context) TA_REQ(core_thread_capability) { qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context; @@ -553,7 +553,7 @@ static void qcm_mobile_sync_on_mar_CT(qdrm_mobile_sync_t *msync, qd_parsed_field } -static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field_t *body) +static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field_t *body) TA_REQ(core_thread_capability) { if (!!body && qd_parse_is_map(body)) { qd_parsed_field_t *id_field = qd_parse_value_by_key(body, ID); @@ -774,7 +774,7 @@ static uint64_t qcm_mobile_sync_on_message_CT(void *context, int unused_inter_router_cost, uint64_t unused_conn_id, const qd_policy_spec_t *unused_policy_spec, - qdr_error_t **error) + qdr_error_t **error) TA_REQ(core_thread_capability) { qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context; qd_iterator_t *ap_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES); @@ -830,7 +830,7 @@ static void qcm_mobile_sync_on_became_local_dest_CT(qdrm_mobile_sync_t *msync, q } -static void qcm_mobile_sync_on_no_longer_local_dest_CT(qdrm_mobile_sync_t *msync, qdr_address_t *addr) +static void qcm_mobile_sync_on_no_longer_local_dest_CT(qdrm_mobile_sync_t *msync, qdr_address_t *addr) TA_REQ(core_thread_capability) { if (!qcm_mobile_sync_addr_is_mobile(addr)) return; @@ -861,7 +861,7 @@ static void qcm_mobile_sync_on_no_longer_local_dest_CT(qdrm_mobile_sync_t *msync } -static void qcm_mobile_sync_on_address_local_change_CT(qdrm_mobile_sync_t *msync, qdr_address_t *addr) +static void qcm_mobile_sync_on_address_local_change_CT(qdrm_mobile_sync_t *msync, qdr_address_t *addr) TA_REQ(core_thread_capability) { if (!qcm_mobile_sync_addr_is_mobile(addr)) { return; @@ -886,7 +886,7 @@ static void qcm_mobile_sync_on_address_local_change_CT(qdrm_mobile_sync_t *msync } -static void qcm_mobile_sync_on_router_flush_CT(qdrm_mobile_sync_t *msync, qdr_node_t *router) +static void qcm_mobile_sync_on_router_flush_CT(qdrm_mobile_sync_t *msync, qdr_node_t *router) TA_REQ(core_thread_capability) { router->mobile_seq = 0; qdr_address_t *addr = DEQ_HEAD(msync->core->addrs); @@ -939,7 +939,7 @@ static uint32_t local_dest_count(qdr_address_t *addr) static void qcm_mobile_sync_on_addr_event_CT(void *context, qdrc_event_t event_type, - qdr_address_t *addr) + qdr_address_t *addr) TA_REQ(core_thread_capability) { qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context; @@ -968,7 +968,7 @@ static void qcm_mobile_sync_on_addr_event_CT(void *context, static void qcm_mobile_sync_on_router_event_CT(void *context, qdrc_event_t event_type, - qdr_node_t *router) + qdr_node_t *router) TA_REQ(core_thread_capability) { qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context; @@ -991,13 +991,13 @@ static void qcm_mobile_sync_on_router_event_CT(void *context, // Module Handlers //================================================================================ -static bool qcm_mobile_sync_enable_CT(qdr_core_t *core) +static bool qcm_mobile_sync_enable_CT(qdr_core_t *core) TA_REQ(core_thread_capability) { return core->router_mode == QD_ROUTER_MODE_INTERIOR; } -static void qcm_mobile_sync_init_CT(qdr_core_t *core, void **module_context) +static void qcm_mobile_sync_init_CT(qdr_core_t *core, void **module_context) TA_REQ(core_thread_capability) { qdrm_mobile_sync_t *msync = NEW(qdrm_mobile_sync_t); ZERO(msync); @@ -1041,7 +1041,7 @@ static void qcm_mobile_sync_init_CT(qdr_core_t *core, void **module_context) } -static void qcm_mobile_sync_final_CT(void *module_context) +static void qcm_mobile_sync_final_CT(void *module_context) TA_REQ(core_thread_capability) { qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) module_context; diff --git a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c index 34981d00c..76a1c01e6 100644 --- a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c +++ b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c @@ -55,7 +55,7 @@ struct tracker_t { * Check the size of the connections idle link free pool. If the connection * has accumulated too many unused links start closing them */ -static void idle_link_cleanup(qdr_core_t *core, qdr_connection_t *conn) +static void idle_link_cleanup(qdr_core_t *core, qdr_connection_t *conn) TA_REQ(core_thread_capability) { qdr_link_list_t to_free = DEQ_EMPTY; @@ -97,7 +97,7 @@ static void idle_link_cleanup(qdr_core_t *core, qdr_connection_t *conn) } -static void timer_handler_CT(qdr_core_t *core, void *context) +static void timer_handler_CT(qdr_core_t *core, void *context) TA_REQ(core_thread_capability) { tracker_t *tracker = (tracker_t*) context; qdr_connection_ref_t *first_ref = DEQ_HEAD(core->streaming_connections); @@ -113,7 +113,7 @@ static void timer_handler_CT(qdr_core_t *core, void *context) } -static void qdr_streaming_link_scrubber_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_streaming_link_scrubber_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability) { if (discard) return; @@ -164,7 +164,7 @@ static bool qcm_streaming_link_scrubber_enable_CT(qdr_core_t *core) } -static void qcm_streaming_link_scrubber_init_CT(qdr_core_t *core, void **module_context) +static void qcm_streaming_link_scrubber_init_CT(qdr_core_t *core, void **module_context) TA_REQ(core_thread_capability) { tracker_t *tracker = NEW(tracker_t); ZERO(tracker); diff --git a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c index 25e8386f8..63eed1d34 100644 --- a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c +++ b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c @@ -33,7 +33,7 @@ static int timer_interval = PROD_TIMER_INTERVAL; static int stuck_age = PROD_STUCK_AGE; -static void action_handler_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void action_handler_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); typedef struct tracker_t tracker_t; @@ -44,7 +44,7 @@ struct tracker_t { }; -static void check_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv) +static void check_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv) TA_REQ(core_thread_capability) { // DISPATCH-2036: ignore "infinitely long" streaming messages (like TCP // adaptor deliveries) @@ -69,7 +69,7 @@ static void check_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t } -static void process_link_CT(qdr_core_t *core, qdr_link_t *link) +static void process_link_CT(qdr_core_t *core, qdr_link_t *link) TA_REQ(core_thread_capability) { qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered); while (dlv) { @@ -100,7 +100,7 @@ static void process_link_CT(qdr_core_t *core, qdr_link_t *link) } -static void timer_handler_CT(qdr_core_t *core, void *context) +static void timer_handler_CT(qdr_core_t *core, void *context) TA_REQ(core_thread_capability) { tracker_t *tracker = (tracker_t*) context; qdr_link_t *first_link = DEQ_HEAD(core->open_links); @@ -166,7 +166,7 @@ static bool qdrc_delivery_tracker_enable_CT(qdr_core_t *core) } -static void qdrc_delivery_tracker_init_CT(qdr_core_t *core, void **module_context) +static void qdrc_delivery_tracker_init_CT(qdr_core_t *core, void **module_context) TA_REQ(core_thread_capability) { tracker_t *tracker = NEW(tracker_t); ZERO(tracker); @@ -182,7 +182,7 @@ static void qdrc_delivery_tracker_init_CT(qdr_core_t *core, void **module_contex } -static void qdrc_delivery_tracker_final_CT(void *module_context) +static void qdrc_delivery_tracker_final_CT(void *module_context) TA_REQ(core_thread_capability) { tracker_t *tracker = (tracker_t*) module_context; qdr_core_timer_free_CT(tracker->core, tracker->timer); diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c index 9d88a91fb..eafc5d369 100644 --- a/src/router_core/modules/test_hooks/core_test_hooks.c +++ b/src/router_core/modules/test_hooks/core_test_hooks.c @@ -537,7 +537,7 @@ static void qdrc_test_hooks_core_endpoint_finalize(test_module_t *module) // tests. Any changes here may require updates to those tests. // -static void _do_send(test_client_t *tc); +static void _do_send(test_client_t *tc) TA_REQ(core_thread_capability); struct test_client_t { test_module_t *module; @@ -579,7 +579,7 @@ static void _client_on_done_cb(qdr_core_t *core, qdrc_client_t *client, void *user_context, void *request_context, - const char *error) + const char *error) TA_REQ(core_thread_capability) { // the system_tests_core_client.py looks for the following // log message during the tests @@ -632,7 +632,7 @@ static void _client_on_state_cb(qdr_core_t *core, qdrc_client_t *core_client, static void _client_on_flow_cb(qdr_core_t *core, qdrc_client_t *core_client, void *user_context, int available_credit, - bool drain) + bool drain) TA_REQ(core_thread_capability) { test_client_t *tc = (test_client_t *)user_context; @@ -650,7 +650,7 @@ static void _client_on_flow_cb(qdr_core_t *core, qdrc_client_t *core_client, } } -static void _on_conn_event(void *context, qdrc_event_t type, qdr_connection_t *conn) +static void _on_conn_event(void *context, qdrc_event_t type, qdr_connection_t *conn) TA_REQ(core_thread_capability) { test_client_t *tc = (test_client_t *)context; @@ -710,7 +710,7 @@ static void qdrc_test_client_api_setup(test_module_t *test_module) } -static void qdrc_test_client_api_finalize(test_module_t *test_module) +static void qdrc_test_client_api_finalize(test_module_t *test_module) TA_REQ(core_thread_capability) { test_client_t *tc = test_module->test_client; if (tc) { @@ -784,13 +784,13 @@ static void _handle_crash_request(qdr_core_t *core, qd_message_t *message) } -static bool qdrc_test_hooks_enable_CT(qdr_core_t *core) +static bool qdrc_test_hooks_enable_CT(qdr_core_t *core) TA_REQ(core_thread_capability) { return qd_router_test_hooks_enabled(); } -static void qdrc_test_hooks_init_CT(qdr_core_t *core, void **module_context) +static void qdrc_test_hooks_init_CT(qdr_core_t *core, void **module_context) TA_REQ(core_thread_capability) { test_module_t *test_module = NEW(test_module_t); ZERO(test_module); @@ -801,7 +801,7 @@ static void qdrc_test_hooks_init_CT(qdr_core_t *core, void **module_context) } -static void qdrc_test_hooks_final_CT(void *module_context) +static void qdrc_test_hooks_final_CT(void *module_context) TA_REQ(core_thread_capability) { qdrc_test_hooks_core_endpoint_finalize(module_context); qdrc_test_client_api_finalize(module_context); diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index 1e485a661..fb69ecfbd 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -136,7 +136,7 @@ static void qdr_route_log_CT(qdr_core_t *core, const char *text, const char *nam } -static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr_connection_t *conn) +static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr_connection_t *conn) TA_REQ(core_thread_capability) { const char *key; @@ -176,8 +176,8 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr /** * Attempts re-establishing auto links across the related connections/containers */ -static void qdr_route_attempt_auto_link_CT(qdr_core_t *core, - void *context) +static void qdr_route_attempt_auto_link_CT(qdr_core_t *core, + void *context) TA_REQ(core_thread_capability) { qdr_auto_link_t *al = (qdr_auto_link_t *)context; qdr_connection_ref_t * cref = DEQ_HEAD(al->conn_id->connection_refs); @@ -189,7 +189,7 @@ static void qdr_route_attempt_auto_link_CT(qdr_core_t *core, } -static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr_connection_t *conn) +static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr_connection_t *conn) TA_REQ(core_thread_capability) { qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, conn); @@ -352,7 +352,7 @@ void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *al) qdr_core_delete_auto_link(core, al); } -static void activate_route_connection(qdr_core_t *core, qdr_connection_t *conn, qdr_conn_identifier_t *cid) +static void activate_route_connection(qdr_core_t *core, qdr_connection_t *conn, qdr_conn_identifier_t *cid) TA_REQ(core_thread_capability) { // // Activate all auto-links associated with this remote container. @@ -364,7 +364,7 @@ static void activate_route_connection(qdr_core_t *core, qdr_connection_t *conn, } } -static void deactivate_route_connection(qdr_core_t *core, qdr_connection_t *conn, qdr_conn_identifier_t *cid) +static void deactivate_route_connection(qdr_core_t *core, qdr_connection_t *conn, qdr_conn_identifier_t *cid) TA_REQ(core_thread_capability) { // // Deactivate all auto-links associated with this remote container. diff --git a/src/router_core/route_control.h b/src/router_core/route_control.h index a7ed2ccea..44b33f440 100644 --- a/src/router_core/route_control.h +++ b/src/router_core/route_control.h @@ -27,18 +27,18 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, qd_direction_t dir, qd_parsed_field_t *container_field, qd_parsed_field_t *connection_field, - qd_parsed_field_t *external_addr); + qd_parsed_field_t *external_addr) TA_REQ(core_thread_capability); -void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *auto_link); +void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *auto_link) TA_REQ(core_thread_capability); void qdr_route_connection_opened_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_field_t *container_field, - qdr_field_t *connection_field); + qdr_field_t *connection_field) TA_REQ(core_thread_capability); -void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn); +void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn) TA_REQ(core_thread_capability); -void qdr_route_check_id_for_deletion_CT(qdr_core_t *core, qdr_conn_identifier_t *cid); +void qdr_route_check_id_for_deletion_CT(qdr_core_t *core, qdr_conn_identifier_t *cid) TA_REQ(core_thread_capability); /** * Actions to be performed when an auto link detaches. @@ -48,13 +48,13 @@ void qdr_route_check_id_for_deletion_CT(qdr_core_t *core, qdr_conn_identifier_t * @param core Pointer to the core object returned by qd_core() * @param link qdr_link_t reference. The attach on this link for an auto link was rejected. */ -void qdr_route_auto_link_detached_CT(qdr_core_t *core, qdr_link_t *link); +void qdr_route_auto_link_detached_CT(qdr_core_t *core, qdr_link_t *link) TA_REQ(core_thread_capability); /** * Performs actions that need to be taken when an auto link is closed. * For example, if a timer was setup to reconnect the autolink, it needs to be canceled. * @param link qdr_link_t reference. */ -void qdr_route_auto_link_closed_CT(qdr_core_t *core, qdr_link_t *link); +void qdr_route_auto_link_closed_CT(qdr_core_t *core, qdr_link_t *link) TA_REQ(core_thread_capability); #endif diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index 70792dd2c..32f11c491 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -19,18 +19,18 @@ #include "router_core_private.h" -static void qdr_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_set_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_remove_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_set_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_set_cost_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_flush_destinations_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_mobile_seq_advanced_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_unsubscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_set_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_remove_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_set_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_set_cost_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_flush_destinations_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_mobile_seq_advanced_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_unsubscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); //================================================================================== diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 2dead73df..ee7c52a16 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -45,7 +45,7 @@ const uint64_t QD_DELIVERY_MOVED_TO_NEW_LINK = 999999999; static void qdr_general_handler(void *context); -static void qdr_core_setup_init(qdr_core_t *core) +static void qdr_core_setup_init(qdr_core_t *core) TA_REQ(core_thread_capability) { // // Check the environment variable to see if we should disable the fix for issue #867. @@ -72,7 +72,7 @@ static void qdr_core_setup_init(qdr_core_t *core) qdr_adaptors_init(core); } -qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id) +qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id) TA_NO_THREAD_SAFETY_ANALYSIS { qdr_core_t *core = NEW(qdr_core_t); ZERO(core); @@ -1078,7 +1078,7 @@ static void qdr_post_global_stats_response(qdr_core_t *core, qdr_general_work_t work->stats_handler(work->context, discard); } -static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability) { if (!discard) { qdr_global_stats_t *stats = action->args.stats_request.stats; diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 4d822b293..f0929fb13 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -603,13 +603,13 @@ struct qdr_address_t { DEQ_DECLARE(qdr_address_t, qdr_address_list_t); -qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment, qdr_address_config_t *config); -qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *addr, qd_address_treatment_t treatment); -qdr_address_t *qdr_add_mobile_address_CT(qdr_core_t *core, const char* prefix, const char *addr, qd_address_treatment_t treatment, bool edge); -void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr); +qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment, qdr_address_config_t *config) TA_REQ(core_thread_capability); +qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *addr, qd_address_treatment_t treatment) TA_REQ(core_thread_capability); +qdr_address_t *qdr_add_mobile_address_CT(qdr_core_t *core, const char *prefix, const char *addr, qd_address_treatment_t treatment, bool edge) TA_REQ(core_thread_capability); +void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr) TA_REQ(core_thread_capability); void qdr_core_edge_mesh_id_changed_CT(qdr_core_t *core, qdr_connection_t *conn); -void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link); -void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link); +void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link) TA_REQ(core_thread_capability); +void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link) TA_REQ(core_thread_capability); struct qdr_address_config_t { DEQ_LINKS(qdr_address_config_t); @@ -701,7 +701,7 @@ struct qdr_connection_t { char edge_mesh_id[QD_DISCRIMINATOR_BYTES]; ///< Interior, edge-role only - Identity of the connected mesh }; -void qdr_core_delete_auto_link (qdr_core_t *core, qdr_auto_link_t *al); +void qdr_core_delete_auto_link(qdr_core_t *core, qdr_auto_link_t *al) TA_REQ(core_thread_capability); // Core timer related field/data structures typedef void (*qdr_timer_cb_t)(qdr_core_t *core, void* context); @@ -933,59 +933,59 @@ void *router_core_thread(void *arg); uint64_t qdr_identifier(qdr_core_t* core); uint64_t qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost, uint64_t in_conn_id, const qd_policy_spec_t *policy_spec, qdr_error_t **error); -void qdr_route_table_setup_CT(qdr_core_t *core); +void qdr_route_table_setup_CT(qdr_core_t *core) TA_REQ(core_thread_capability); qdr_agent_t *qdr_agent(qdr_core_t *core); void qdr_agent_setup_subscriptions(qdr_agent_t *agent, qdr_core_t *core); void qdr_agent_free(qdr_agent_t *agent); -void qdr_forwarder_setup_CT(qdr_core_t *core); +void qdr_forwarder_setup_CT(qdr_core_t *core) TA_REQ(core_thread_capability); qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label); void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action); void qdr_action_background_enqueue(qdr_core_t *core, qdr_action_t *action); -void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain); -void qdr_drain_inbound_undelivered_CT(qdr_core_t *core, qdr_link_t *link, qdr_address_t *addr); -void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr); +void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain) TA_REQ(core_thread_capability); +void qdr_drain_inbound_undelivered_CT(qdr_core_t *core, qdr_link_t *link, qdr_address_t *addr) TA_REQ(core_thread_capability); +void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) TA_REQ(core_thread_capability) TA_REQ(core_thread_capability); static inline bool qdr_link_is_streaming_deliveries(qdr_link_t *link) { return IS_ATOMIC_FLAG_SET(&link->streaming_deliveries); } /** * Returns true if the passed in address is a mobile address, false otherwise * If the first character of the address_key (obtained using its hash_handle) is M, the address is mobile. */ -bool qdr_address_is_mobile_CT(qdr_address_t *addr); +bool qdr_address_is_mobile_CT(qdr_address_t *addr) TA_REQ(core_thread_capability); -void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg, qdr_delivery_t *in_dlv); -void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control); -void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query); +void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg, qdr_delivery_t *in_dlv) TA_REQ(core_thread_capability); +void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control) TA_REQ(core_thread_capability); +void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query) TA_REQ(core_thread_capability); -void qdr_post_set_mobile_seq_CT(qdr_core_t *core, int router_maskbit, uint64_t mobile_seq); -void qdr_post_set_my_mobile_seq_CT(qdr_core_t *core, uint64_t mobile_seq); -void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit); +void qdr_post_set_mobile_seq_CT(qdr_core_t *core, int router_maskbit, uint64_t mobile_seq) TA_REQ(core_thread_capability); +void qdr_post_set_my_mobile_seq_CT(qdr_core_t *core, uint64_t mobile_seq) TA_REQ(core_thread_capability); +void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit) TA_REQ(core_thread_capability); -void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work); -void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr); +void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work) TA_REQ(core_thread_capability); +void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr) TA_REQ(core_thread_capability); void qdr_process_addr_attributes_CT(qdr_core_t *core, qdr_address_t *addr); -bool qdr_is_addr_treatment_multicast(qdr_address_t *addr); -qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg); -void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv); -void qdr_connection_free(qdr_connection_t *conn); -void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn); -void qdr_close_connection_CT(qdr_core_t *core, qdr_connection_t *conn); -qdr_link_t *qdr_connection_new_streaming_link_CT(qdr_core_t *core, qdr_connection_t *conn); -qdr_address_config_t *qdr_config_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter); -qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, qd_iterator_t *iter, qdr_address_config_t **addr_config); -qd_address_treatment_t qdr_treatment_for_address_hash_with_default_CT(qdr_core_t *core, qd_iterator_t *iter, qd_address_treatment_t default_treatment, qdr_address_config_t **addr_config); +bool qdr_is_addr_treatment_multicast(qdr_address_t *addr) TA_REQ(core_thread_capability); +qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg) TA_REQ(core_thread_capability); +void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv) TA_REQ(core_thread_capability); +void qdr_connection_free(qdr_connection_t *conn) TA_REQ(core_thread_capability); +void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn) TA_REQ(core_thread_capability); +void qdr_close_connection_CT(qdr_core_t *core, qdr_connection_t *conn) TA_REQ(core_thread_capability); +qdr_link_t *qdr_connection_new_streaming_link_CT(qdr_core_t *core, qdr_connection_t *conn) TA_REQ(core_thread_capability); +qdr_address_config_t *qdr_config_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter) TA_REQ(core_thread_capability); +qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, qd_iterator_t *iter, qdr_address_config_t **addr_config) TA_REQ(core_thread_capability); +qd_address_treatment_t qdr_treatment_for_address_hash_with_default_CT(qdr_core_t *core, qd_iterator_t *iter, qd_address_treatment_t default_treatment, qdr_address_config_t **addr_config) TA_REQ(core_thread_capability); qdr_edge_t *qdr_edge(qdr_core_t *); void qdr_edge_free(qdr_edge_t *); void qdr_edge_connection_opened(qdr_edge_t *edge, qdr_connection_t *conn); void qdr_edge_connection_closed(qdr_edge_t *edge); -void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, bool on_shutdown); -void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, bool on_shutdown) TA_REQ(core_thread_capability); +void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); void qdr_connection_enqueue_work_CT(qdr_core_t *core, qdr_connection_t *conn, - qdr_connection_work_t *work); + qdr_connection_work_t *work) TA_REQ(core_thread_capability); void qdr_link_enqueue_work_CT(qdr_core_t *core, qdr_link_t *conn, - qdr_link_work_t *work); + qdr_link_work_t *work) TA_REQ(core_thread_capability); qdr_link_t *qdr_create_link_CT(qdr_core_t *core, qdr_connection_t *conn, @@ -994,11 +994,11 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, qdr_terminus_t *source, qdr_terminus_t *target, qd_session_class_t ssn_class, - uint8_t priority); + uint8_t priority) TA_REQ(core_thread_capability); -void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close); -void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target); -bool qdr_link_is_idle_CT(const qdr_link_t *link); +void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close) TA_REQ(core_thread_capability); +void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target) TA_REQ(core_thread_capability); +bool qdr_link_is_idle_CT(const qdr_link_t *link) TA_REQ(core_thread_capability); qdr_terminus_t *qdr_terminus_router_control(void); ///< new terminus for router control links qdr_terminus_t *qdr_terminus_router_data(void); ///< new terminus for router links qdr_terminus_t *qdr_terminus_inter_edge(void); ///< new terminus for inter-edge links @@ -1021,8 +1021,7 @@ void qdr_adaptors_finalize(qdr_core_t *core); * @callback Callback function to be invoked when timer fires. * @timer_context Context to be used when firing callback */ -qdr_core_timer_t *qdr_core_timer_CT(qdr_core_t *core, qdr_timer_cb_t callback, void *timer_context); - +qdr_core_timer_t *qdr_core_timer_CT(qdr_core_t *core, qdr_timer_cb_t callback, void *timer_context) TA_REQ(core_thread_capability); /** * Schedules a core timer with a delay. The timer will fire after "delay" seconds @@ -1030,7 +1029,7 @@ qdr_core_timer_t *qdr_core_timer_CT(qdr_core_t *core, qdr_timer_cb_t callback, v * @param timer Timer object that needs to be scheduled. * @param delay The number of seconds to wait before firing the timer */ -void qdr_core_timer_schedule_CT(qdr_core_t *core, qdr_core_timer_t *timer, uint32_t delay); +void qdr_core_timer_schedule_CT(qdr_core_t *core, qdr_core_timer_t *timer, uint32_t delay) TA_REQ(core_thread_capability); /** * Cancels an already scheduled timeer. This does not free the timer. It is the responsibility of the person who diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c index a72b91745..63de80433 100644 --- a/src/router_core/router_core_thread.c +++ b/src/router_core/router_core_thread.c @@ -190,7 +190,7 @@ void qdr_adaptors_finalize(qdr_core_t *core) } -void *router_core_thread(void *arg) +void *router_core_thread(void *arg) TA_REQ(core_thread_capability) { qdr_core_t *core = (qdr_core_t*) arg; qdr_action_list_t action_list = DEQ_EMPTY; diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 44bd55553..ed5efe616 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -28,8 +28,8 @@ // Internal Functions //================================================================================== -static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); +static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); //================================================================================== @@ -417,7 +417,7 @@ void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, const char *addr, bool ex //================================================================================== -static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability) { qdr_link_t *link = safe_deref_qdr_link_t(action->args.connection.link); @@ -525,6 +525,7 @@ static long qdr_addr_path_count_CT(qdr_address_t *addr) static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr, bool more) + TA_REQ(core_thread_capability) { qdr_link_t *dlv_link = qdr_delivery_link(dlv); @@ -729,7 +730,7 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery } -void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability) { if (discard) return; @@ -1007,7 +1008,7 @@ void qdr_drain_inbound_undelivered_CT(qdr_core_t *core, qdr_link_t *link, qdr_ad * Also, check the inlinks to see if there are undelivered messages. If so, drain them to * the forwarder. */ -void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) +void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) TA_REQ(core_thread_capability) { if (qdr_addr_path_count_CT(addr) == 1) { qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks); diff --git a/src/router_node.c b/src/router_node.c index a0cd55181..d99c7c87b 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -2363,7 +2363,7 @@ QD_EXPORT void qd_router_setup_late(qd_dispatch_t *qd) qd_timer_schedule(qd->router->timer, 1000); } -void qd_router_free(qd_router_t *router) +void qd_router_free(qd_router_t *router) TA_NO_THREAD_SAFETY_ANALYSIS { if (!router) return; diff --git a/tests/core_timer_test.c b/tests/core_timer_test.c index a85e4a771..cdf9bf09c 100644 --- a/tests/core_timer_test.c +++ b/tests/core_timer_test.c @@ -23,7 +23,7 @@ #include #include -void qdr_process_tick_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +void qdr_process_tick_CT(qdr_core_t *core, qdr_action_t *action, bool discard) TA_REQ(core_thread_capability); static int results[5]; @@ -33,7 +33,7 @@ static void callback(qdr_core_t *unused, void *context) { } -static char* test_core_timer(void *context) +static char *test_core_timer(void *context) TA_REQ(core_thread_capability) { qdr_core_t *core = NEW(qdr_core_t); ZERO(core); @@ -177,7 +177,7 @@ static char* test_core_timer(void *context) } -int core_timer_tests(void) +int core_timer_tests(void) TA_NO_THREAD_SAFETY_ANALYSIS { int result = 0; char *test_group = "core_timer_tests"; diff --git a/tests/run_unit_tests.c b/tests/run_unit_tests.c index 772d5e039..b3b59c3db 100644 --- a/tests/run_unit_tests.c +++ b/tests/run_unit_tests.c @@ -24,20 +24,19 @@ int tool_tests(void); int timer_tests(void); -int core_timer_tests(void); +int core_timer_tests(void) TA_REQ(core_thread_capability); int alloc_tests(void); int compose_tests(void); int policy_tests(void); int failoverlist_tests(void); int parse_tree_tests(void); int proton_utils_tests(void); -int version_tests(void); int hash_tests(void); int thread_tests(void); int platform_tests(void); -int main(int argc, char** argv) +int main(int argc, char** argv) TA_ACQ(core_thread_capability) TA_NO_THREAD_SAFETY_ANALYSIS { if (argc != 2) { fprintf(stderr, "usage: %s \n", argv[0]);