Skip to content

Commit

Permalink
RDKB-51547 : [On rbusDirect] Implementation of RBUS Publish API to se…
Browse files Browse the repository at this point in the history
…nd Raw Data (#187)

Implemented rtConnection_AddListener() and rtConnection_AddListenerWithUID()


---------

Co-authored-by: Karunakaran A <[email protected]>
  • Loading branch information
gururaajar and karuna2git authored Dec 15, 2023
1 parent 24ed328 commit 1179aa4
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 21 deletions.
11 changes: 5 additions & 6 deletions src/core/rbuscore.c
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ rbusCoreError_t rbus_registerObj(const char * object_name, rbus_callback_t handl
server_object_create(&obj, object_name, handler, user_data);

//TODO: callback signature translation. rbusMessage uses a significantly wider signature for callbacks. Translate to something simpler.
err = rtConnection_AddListener(g_connection, object_name, RBUS_REGISTER_OBJECT_EXPRESSION_ID, onMessage, obj);
err = rtConnection_AddListenerWithId(g_connection, object_name, RBUS_REGISTER_OBJECT_EXPRESSION_ID, onMessage, obj);

if(RT_OK == err)
{
Expand Down Expand Up @@ -957,11 +957,10 @@ rbusCoreError_t rbus_unregisterObj(const char * object_name)
RBUSCORELOG_ERROR("object_name is invalid.");
return RBUSCORE_ERROR_INVALID_PARAM;
}

err = rtConnection_RemoveListener(g_connection, object_name, RBUS_REGISTER_OBJECT_EXPRESSION_ID);
err = rtConnection_RemoveListenerWithId(g_connection, object_name, RBUS_REGISTER_OBJECT_EXPRESSION_ID);
if(RT_OK != err)
{
RBUSCORELOG_ERROR("rtConnection_RemoveListener %s failed: Err=%d", object_name, err);
RBUSCORELOG_ERROR("rtConnection_RemoveListenerWithId %s failed: Err=%d", object_name, err);
return RBUSCORE_ERROR_GENERAL;
}

Expand Down Expand Up @@ -1782,7 +1781,7 @@ rbusCoreError_t rbus_registerClientDisconnectHandler(rbus_client_disconnect_call
lock();
if(!g_advisory_listener_installed)
{
rtError err = rtConnection_AddListener(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID, &rtrouted_advisory_callback, g_connection);
rtError err = rtConnection_AddListenerWithId(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID, &rtrouted_advisory_callback, g_connection);
if(err == RT_OK)
{
RBUSCORELOG_DEBUG("Listening for advisory messages");
Expand All @@ -1805,7 +1804,7 @@ rbusCoreError_t rbus_unregisterClientDisconnectHandler()
lock();
if(g_advisory_listener_installed)
{
rtConnection_RemoveListener(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID);
rtConnection_RemoveListenerWithId(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID);
g_advisory_listener_installed = false;
}
unlock();
Expand Down
1 change: 0 additions & 1 deletion src/core/rbuscore.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "rbuscore_types.h"
#include "rbuscore_message.h"
#include <rtm_discovery_api.h>
#include "rtrouteBase.h"

#define MAX_OBJECT_NAME_LENGTH RTMSG_HEADER_MAX_TOPIC_LENGTH
#define MAX_METHOD_NAME_LENGTH 64
Expand Down
18 changes: 9 additions & 9 deletions src/rbus/rbus_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ rbusError_t rbusMessage_AddPrivateListener(
rtVector_PushBack(handle->messageCallbacks, ctx);

snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "%d.%s", subscriptionId, expression);
rtError e = rtConnection_AddListener(myConn, rawDataTopic, subscriptionId, &rtMessage_CallbackHandler, ctx);
rtError e = rtConnection_AddListenerWithId(myConn, rawDataTopic, subscriptionId, &rtMessage_CallbackHandler, ctx);
RBUS_MESSAGE_MUTEX_UNLOCK();
if (e != RT_OK)
{
RBUSLOG_WARN("rtConnection_AddListener:%s", rtStrError(e));
RBUSLOG_WARN("rtConnection_AddListenerWithId:%s", rtStrError(e));
return RBUS_ERROR_BUS_ERROR;
}

Expand Down Expand Up @@ -178,11 +178,11 @@ rbusError_t rbusMessage_AddListener(
RBUS_MESSAGE_MUTEX_LOCK();
rtVector_PushBack(handle->messageCallbacks, ctx);

rtError e = rtConnection_AddListener(con, expression, subscriptionId, &rtMessage_CallbackHandler, ctx);
rtError e = rtConnection_AddListenerWithId(con, expression, subscriptionId, &rtMessage_CallbackHandler, ctx);
RBUS_MESSAGE_MUTEX_UNLOCK();
if (e != RT_OK)
{
RBUSLOG_WARN("rtConnection_AddListener:%s", rtStrError(e));
RBUSLOG_WARN("rtConnection_AddListenerWithId:%s", rtStrError(e));
return RBUS_ERROR_BUS_ERROR;
}

Expand All @@ -207,11 +207,11 @@ rbusError_t rbusMessage_RemovePrivateListener(
RBUS_MESSAGE_MUTEX_LOCK();
rtVector_RemoveItemByCompare(handle->messageCallbacks, rawDataTopic, compareContextExpression, cleanupContext);

rtError e = rtConnection_RemoveListener(myConn, rawDataTopic, subscriptionId);
rtError e = rtConnection_RemoveListenerWithId(myConn, rawDataTopic, subscriptionId);
RBUS_MESSAGE_MUTEX_UNLOCK();
if (e != RT_OK)
{
RBUSLOG_WARN("rtConnection_RemoveListener:%s", rtStrError(e));
RBUSLOG_WARN("rtConnection_RemoveListenerWithId:%s", rtStrError(e));
return RBUS_ERROR_BUS_ERROR;
}

Expand All @@ -229,11 +229,11 @@ rbusError_t rbusMessage_RemoveListener(
RBUS_MESSAGE_MUTEX_LOCK();
rtVector_RemoveItemByCompare(handle->messageCallbacks, expression, compareContextExpression, cleanupContext);

rtError e = rtConnection_RemoveListener(con, expression, subscriptionId);
rtError e = rtConnection_RemoveListenerWithId(con, expression, subscriptionId);
RBUS_MESSAGE_MUTEX_UNLOCK();
if (e != RT_OK)
{
RBUSLOG_WARN("rtConnection_RemoveListener:%s", rtStrError(e));
RBUSLOG_WARN("rtConnection_RemoveListenerWithId:%s", rtStrError(e));
return RBUS_ERROR_BUS_ERROR;
}

Expand All @@ -252,7 +252,7 @@ rbusError_t rbusMessage_RemoveAllListeners(
{
rbusMessageHandlerContext_t* ctx = rtVector_At(handle->messageCallbacks, i);
VERIFY_NULL(ctx);
rtError e = rtConnection_RemoveListener(con, ctx->expression, ctx->subscriptionId);
rtError e = rtConnection_RemoveListenerWithId(con, ctx->expression, ctx->subscriptionId);
if (e != RT_OK)
{
RBUSLOG_WARN("rbusMessage_RemoveAllListener %s :%s", ctx->expression, rtStrError(e));
Expand Down
58 changes: 55 additions & 3 deletions src/rtmessage/rtConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ rtConnection_SendRequestInternal(
int32_t timeout,
int flags);

static uint32_t
rtConnection_GetNextSubscriptionId()
{
static uint32_t next_id = 10000; /* Keeping this number high to avoid conflict with the subscription Id added in rbusSubscriptions_addSubscription() which starts with 1 */
return next_id++;
}

static int GetRunThreadsSync(rtConnection con)
{
int run_threads;
Expand Down Expand Up @@ -599,7 +606,7 @@ rtConnection_CreateInternal(rtConnection* con, char const* application_name, cha

if (err == RT_OK)
{
rtConnection_AddListener(c, c->inbox_name, RTCONNECTION_CREATE_EXPRESSION_ID, onDefaultMessage, c);
rtConnection_AddListenerWithId(c, c->inbox_name, RTCONNECTION_CREATE_EXPRESSION_ID, onDefaultMessage, c);
rtConnection_StartThreads(c);
*con = c;
}
Expand Down Expand Up @@ -1223,7 +1230,13 @@ rtConnection_SendInternal(rtConnection con, uint8_t const* buff, uint32_t n, cha
}

rtError
rtConnection_AddListener(rtConnection con, char const* expression, uint32_t expressionId, rtMessageCallback callback, void* closure)
rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCallback callback, void* closure)
{
return rtConnection_AddListenerWithId(con, expression, rtConnection_GetNextSubscriptionId(), callback, closure);
}

rtError
rtConnection_AddListenerWithId(rtConnection con, char const* expression, uint32_t expressionId, rtMessageCallback callback, void* closure)
{
int i;

Expand Down Expand Up @@ -1264,7 +1277,46 @@ rtConnection_AddListener(rtConnection con, char const* expression, uint32_t expr
}

rtError
rtConnection_RemoveListener(rtConnection con, char const* expression, uint32_t expressionId)
rtConnection_RemoveListener(rtConnection con, char const* expression)
{
int i;
int route_id = 0;

if (!con)
return rtErrorFromErrno(EINVAL);

pthread_mutex_lock(&con->mutex);
for (i = 0; i < RTMSG_LISTENERS_MAX; ++i)
{
if ((con->listeners[i].in_use) && (0 == strcmp(expression, con->listeners[i].expression)))
{
con->listeners[i].in_use = 0;
route_id = con->listeners[i].subscription_id;
con->listeners[i].subscription_id = 0;
con->listeners[i].closure = NULL;
con->listeners[i].callback = NULL;
free(con->listeners[i].expression);
con->listeners[i].expression = NULL;
break;
}
}
pthread_mutex_unlock(&con->mutex);

if (i >= RTMSG_LISTENERS_MAX)
return RT_ERROR_INVALID_ARG;

rtMessage m;
rtMessage_Create(&m);
rtMessage_SetInt32(m, "add", 0);
rtMessage_SetString(m, "topic", expression);
rtMessage_SetInt32(m, "route_id", route_id);
rtConnection_SendMessage(con, m, "_RTROUTED.INBOX.SUBSCRIBE");
rtMessage_Release(m);
return 0;
}

rtError
rtConnection_RemoveListenerWithId(rtConnection con, char const* expression, uint32_t expressionId)
{
int i;
int route_id = 0;
Expand Down
26 changes: 24 additions & 2 deletions src/rtmessage/rtConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,22 +180,44 @@ rtConnection_SendBinaryResponse(rtConnection con, rtMessageHeader const* request
* Register a callback for message receipt
* @param con
* @param topic expression
* @param expression Id
* @param callback handler
* @param closure
* @return error
*/
rtError
rtConnection_AddListener(rtConnection con, char const* expression,
rtConnection_AddListenerWithId(rtConnection con, char const* expression,
uint32_t expressionId, rtMessageCallback callback, void* closure);

/**
* Register a callback for message receipt
* @param con
* @param topic expression
* @param callback handler
* @param closure
* @return error
*/
rtError
rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCallback callback, void* closure);

/**
* Remove a callback listener
* @param con
* @param topic expression
* @return error
*/
rtError
rtConnection_RemoveListener(rtConnection con, char const* expression);

/**
* Remove a callback listener
* @param con
* @param topic expression
* @param expression Id
* @return error
*/
rtError
rtConnection_RemoveListener(rtConnection con, char const* expression, uint32_t expressionId);
rtConnection_RemoveListenerWithId(rtConnection con, char const* expression, uint32_t expressionId);

/**
* Add an alias to an existing listener
Expand Down

0 comments on commit 1179aa4

Please sign in to comment.