diff --git a/src/core/rbuscore.c b/src/core/rbuscore.c index 1367e241..48be8131 100644 --- a/src/core/rbuscore.c +++ b/src/core/rbuscore.c @@ -812,7 +812,7 @@ rbusCoreError_t rbus_registerObj(const char * object_name, rbus_callback_t handl server_object_create(&obj, object_name, handler, user_data); //TODO: callback signature translation. rbusMessage uses a significantly wider signature for callbacks. Translate to something simpler. - err = rtConnection_AddListener(g_connection, object_name, RBUS_REGISTER_OBJECT_EXPRESSION_ID, onMessage, obj); + err = rtConnection_AddListenerWithId(g_connection, object_name, RBUS_REGISTER_OBJECT_EXPRESSION_ID, onMessage, obj); if(RT_OK == err) { @@ -957,11 +957,10 @@ rbusCoreError_t rbus_unregisterObj(const char * object_name) RBUSCORELOG_ERROR("object_name is invalid."); return RBUSCORE_ERROR_INVALID_PARAM; } - - err = rtConnection_RemoveListener(g_connection, object_name, RBUS_REGISTER_OBJECT_EXPRESSION_ID); + err = rtConnection_RemoveListenerWithId(g_connection, object_name, RBUS_REGISTER_OBJECT_EXPRESSION_ID); if(RT_OK != err) { - RBUSCORELOG_ERROR("rtConnection_RemoveListener %s failed: Err=%d", object_name, err); + RBUSCORELOG_ERROR("rtConnection_RemoveListenerWithId %s failed: Err=%d", object_name, err); return RBUSCORE_ERROR_GENERAL; } @@ -1782,7 +1781,7 @@ rbusCoreError_t rbus_registerClientDisconnectHandler(rbus_client_disconnect_call lock(); if(!g_advisory_listener_installed) { - rtError err = rtConnection_AddListener(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID, &rtrouted_advisory_callback, g_connection); + rtError err = rtConnection_AddListenerWithId(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID, &rtrouted_advisory_callback, g_connection); if(err == RT_OK) { RBUSCORELOG_DEBUG("Listening for advisory messages"); @@ -1805,7 +1804,7 @@ rbusCoreError_t rbus_unregisterClientDisconnectHandler() lock(); if(g_advisory_listener_installed) { - rtConnection_RemoveListener(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID); + rtConnection_RemoveListenerWithId(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID); g_advisory_listener_installed = false; } unlock(); diff --git a/src/core/rbuscore.h b/src/core/rbuscore.h index 83d73ace..d5c97df8 100644 --- a/src/core/rbuscore.h +++ b/src/core/rbuscore.h @@ -24,7 +24,6 @@ #include "rbuscore_types.h" #include "rbuscore_message.h" #include -#include "rtrouteBase.h" #define MAX_OBJECT_NAME_LENGTH RTMSG_HEADER_MAX_TOPIC_LENGTH #define MAX_METHOD_NAME_LENGTH 64 diff --git a/src/rbus/rbus_message.c b/src/rbus/rbus_message.c index 43fd8bce..4197fa7c 100644 --- a/src/rbus/rbus_message.c +++ b/src/rbus/rbus_message.c @@ -146,11 +146,11 @@ rbusError_t rbusMessage_AddPrivateListener( rtVector_PushBack(handle->messageCallbacks, ctx); snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "%d.%s", subscriptionId, expression); - rtError e = rtConnection_AddListener(myConn, rawDataTopic, subscriptionId, &rtMessage_CallbackHandler, ctx); + rtError e = rtConnection_AddListenerWithId(myConn, rawDataTopic, subscriptionId, &rtMessage_CallbackHandler, ctx); RBUS_MESSAGE_MUTEX_UNLOCK(); if (e != RT_OK) { - RBUSLOG_WARN("rtConnection_AddListener:%s", rtStrError(e)); + RBUSLOG_WARN("rtConnection_AddListenerWithId:%s", rtStrError(e)); return RBUS_ERROR_BUS_ERROR; } @@ -178,11 +178,11 @@ rbusError_t rbusMessage_AddListener( RBUS_MESSAGE_MUTEX_LOCK(); rtVector_PushBack(handle->messageCallbacks, ctx); - rtError e = rtConnection_AddListener(con, expression, subscriptionId, &rtMessage_CallbackHandler, ctx); + rtError e = rtConnection_AddListenerWithId(con, expression, subscriptionId, &rtMessage_CallbackHandler, ctx); RBUS_MESSAGE_MUTEX_UNLOCK(); if (e != RT_OK) { - RBUSLOG_WARN("rtConnection_AddListener:%s", rtStrError(e)); + RBUSLOG_WARN("rtConnection_AddListenerWithId:%s", rtStrError(e)); return RBUS_ERROR_BUS_ERROR; } @@ -207,11 +207,11 @@ rbusError_t rbusMessage_RemovePrivateListener( RBUS_MESSAGE_MUTEX_LOCK(); rtVector_RemoveItemByCompare(handle->messageCallbacks, rawDataTopic, compareContextExpression, cleanupContext); - rtError e = rtConnection_RemoveListener(myConn, rawDataTopic, subscriptionId); + rtError e = rtConnection_RemoveListenerWithId(myConn, rawDataTopic, subscriptionId); RBUS_MESSAGE_MUTEX_UNLOCK(); if (e != RT_OK) { - RBUSLOG_WARN("rtConnection_RemoveListener:%s", rtStrError(e)); + RBUSLOG_WARN("rtConnection_RemoveListenerWithId:%s", rtStrError(e)); return RBUS_ERROR_BUS_ERROR; } @@ -229,11 +229,11 @@ rbusError_t rbusMessage_RemoveListener( RBUS_MESSAGE_MUTEX_LOCK(); rtVector_RemoveItemByCompare(handle->messageCallbacks, expression, compareContextExpression, cleanupContext); - rtError e = rtConnection_RemoveListener(con, expression, subscriptionId); + rtError e = rtConnection_RemoveListenerWithId(con, expression, subscriptionId); RBUS_MESSAGE_MUTEX_UNLOCK(); if (e != RT_OK) { - RBUSLOG_WARN("rtConnection_RemoveListener:%s", rtStrError(e)); + RBUSLOG_WARN("rtConnection_RemoveListenerWithId:%s", rtStrError(e)); return RBUS_ERROR_BUS_ERROR; } @@ -252,7 +252,7 @@ rbusError_t rbusMessage_RemoveAllListeners( { rbusMessageHandlerContext_t* ctx = rtVector_At(handle->messageCallbacks, i); VERIFY_NULL(ctx); - rtError e = rtConnection_RemoveListener(con, ctx->expression, ctx->subscriptionId); + rtError e = rtConnection_RemoveListenerWithId(con, ctx->expression, ctx->subscriptionId); if (e != RT_OK) { RBUSLOG_WARN("rbusMessage_RemoveAllListener %s :%s", ctx->expression, rtStrError(e)); diff --git a/src/rtmessage/rtConnection.c b/src/rtmessage/rtConnection.c index 1a81cdc0..5edfe525 100644 --- a/src/rtmessage/rtConnection.c +++ b/src/rtmessage/rtConnection.c @@ -245,6 +245,13 @@ rtConnection_SendRequestInternal( int32_t timeout, int flags); +static uint32_t +rtConnection_GetNextSubscriptionId() +{ + static uint32_t next_id = 10000; /* Keeping this number high to avoid conflict with the subscription Id added in rbusSubscriptions_addSubscription() which starts with 1 */ + return next_id++; +} + static int GetRunThreadsSync(rtConnection con) { int run_threads; @@ -599,7 +606,7 @@ rtConnection_CreateInternal(rtConnection* con, char const* application_name, cha if (err == RT_OK) { - rtConnection_AddListener(c, c->inbox_name, RTCONNECTION_CREATE_EXPRESSION_ID, onDefaultMessage, c); + rtConnection_AddListenerWithId(c, c->inbox_name, RTCONNECTION_CREATE_EXPRESSION_ID, onDefaultMessage, c); rtConnection_StartThreads(c); *con = c; } @@ -1223,7 +1230,13 @@ rtConnection_SendInternal(rtConnection con, uint8_t const* buff, uint32_t n, cha } rtError -rtConnection_AddListener(rtConnection con, char const* expression, uint32_t expressionId, rtMessageCallback callback, void* closure) +rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCallback callback, void* closure) +{ + return rtConnection_AddListenerWithId(con, expression, rtConnection_GetNextSubscriptionId(), callback, closure); +} + +rtError +rtConnection_AddListenerWithId(rtConnection con, char const* expression, uint32_t expressionId, rtMessageCallback callback, void* closure) { int i; @@ -1264,7 +1277,46 @@ rtConnection_AddListener(rtConnection con, char const* expression, uint32_t expr } rtError -rtConnection_RemoveListener(rtConnection con, char const* expression, uint32_t expressionId) +rtConnection_RemoveListener(rtConnection con, char const* expression) +{ + int i; + int route_id = 0; + + if (!con) + return rtErrorFromErrno(EINVAL); + + pthread_mutex_lock(&con->mutex); + for (i = 0; i < RTMSG_LISTENERS_MAX; ++i) + { + if ((con->listeners[i].in_use) && (0 == strcmp(expression, con->listeners[i].expression))) + { + con->listeners[i].in_use = 0; + route_id = con->listeners[i].subscription_id; + con->listeners[i].subscription_id = 0; + con->listeners[i].closure = NULL; + con->listeners[i].callback = NULL; + free(con->listeners[i].expression); + con->listeners[i].expression = NULL; + break; + } + } + pthread_mutex_unlock(&con->mutex); + + if (i >= RTMSG_LISTENERS_MAX) + return RT_ERROR_INVALID_ARG; + + rtMessage m; + rtMessage_Create(&m); + rtMessage_SetInt32(m, "add", 0); + rtMessage_SetString(m, "topic", expression); + rtMessage_SetInt32(m, "route_id", route_id); + rtConnection_SendMessage(con, m, "_RTROUTED.INBOX.SUBSCRIBE"); + rtMessage_Release(m); + return 0; +} + +rtError +rtConnection_RemoveListenerWithId(rtConnection con, char const* expression, uint32_t expressionId) { int i; int route_id = 0; diff --git a/src/rtmessage/rtConnection.h b/src/rtmessage/rtConnection.h index ae01e1ef..e6cba056 100644 --- a/src/rtmessage/rtConnection.h +++ b/src/rtmessage/rtConnection.h @@ -180,22 +180,44 @@ rtConnection_SendBinaryResponse(rtConnection con, rtMessageHeader const* request * Register a callback for message receipt * @param con * @param topic expression + * @param expression Id * @param callback handler * @param closure * @return error */ rtError -rtConnection_AddListener(rtConnection con, char const* expression, +rtConnection_AddListenerWithId(rtConnection con, char const* expression, uint32_t expressionId, rtMessageCallback callback, void* closure); +/** + * Register a callback for message receipt + * @param con + * @param topic expression + * @param callback handler + * @param closure + * @return error + */ +rtError +rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCallback callback, void* closure); + +/** + * Remove a callback listener + * @param con + * @param topic expression + * @return error + */ +rtError +rtConnection_RemoveListener(rtConnection con, char const* expression); + /** * Remove a callback listener * @param con * @param topic expression + * @param expression Id * @return error */ rtError -rtConnection_RemoveListener(rtConnection con, char const* expression, uint32_t expressionId); +rtConnection_RemoveListenerWithId(rtConnection con, char const* expression, uint32_t expressionId); /** * Add an alias to an existing listener