Skip to content

Commit

Permalink
RDKB-51547 : RawData Direct mode with subscription Id (#186)
Browse files Browse the repository at this point in the history
* RDKB-51547 : RawData Direct mode with subscription Id

Reason for change: Removed topic created with subscriptionId for
normal subscription and subscriptionId based topic is only created for
rawdata direct mode subscription
Test Procedure: Test and verified with rbuscli and testapp
Risks: High
Priority: P1
Signed-off-by: Gururaaja ESR <[email protected]>

* RDKB-51547 : RawData Direct mode with subscription Id

Reason for change: Removed topic created with subscriptionId for
normal subscription and subscriptionId based topic is only created for
rawdata direct mode subscription
Test Procedure: Test and verified with rbuscli and testapp
Risks: High
Priority: P1
Signed-off-by: Gururaaja ESR <[email protected]>
  • Loading branch information
gururaajar authored Dec 4, 2023
1 parent 8b39c81 commit 24ed328
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 149 deletions.
102 changes: 93 additions & 9 deletions src/core/rbuscore.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ static pthread_mutex_t g_mutex;
static pthread_mutex_t g_directCliMutex;
static pthread_mutex_t g_directServMutex;
static int g_mutex_init = 0;
static bool g_run_event_client_dispatch = false;
static rtVector g_event_subscriptions_for_client; /*client_subscription_t list. Used by the subscriber to track all active subscriptions. */
static rtVector g_queued_requests; /*list of queued_request */

Expand Down Expand Up @@ -1445,6 +1446,85 @@ rbusCoreError_t rbus_unregisterEvent(const char* object_name, const char * event
return ret;
}

static void master_event_callback(rtMessageHeader const* hdr, uint8_t const* data, uint32_t dataLen, void* closure)
{
/*using namespace rbus_client;*/
rbusMessage msg = NULL;
const char * sender = hdr->reply_topic;
const char * event_name = NULL;
const char * object_name = NULL;
int32_t is_rbus_flag = 1;
rtError err;
size_t subs_len;
size_t i;
(void)closure;

/*Sanitize the incoming data.*/
if(MAX_OBJECT_NAME_LENGTH <= strlen(sender))
{
RBUSCORELOG_ERROR("Object name length exceeds limits.");
return;
}

rbusMessage_FromBytes(&msg, data, dataLen);

rbusMessage_BeginMetaSectionRead(msg);
err = rbusMessage_GetString(msg, &event_name);
err = rbusMessage_GetString(msg, &object_name);
err = rbusMessage_GetInt32(msg, &is_rbus_flag);
rbusMessage_EndMetaSectionRead(msg);
if(RT_OK != err)
{
RBUSCORELOG_ERROR("Event message doesn't contain an event name.");
rbusMessage_Release(msg);
return;
}

if(is_rbus_flag)
{
if(g_master_event_callback)
{
err = g_master_event_callback(sender, event_name, msg, g_master_event_user_data);
if(err != RBUSCORE_ERROR_EVENT_NOT_HANDLED)
{
rbusMessage_Release(msg);
return;
}
}
else
{
RBUSCORELOG_ERROR("Received rbus event but no master callback registered yet.");
}
}

lock();
subs_len = rtVector_Size(g_event_subscriptions_for_client);
for(i = 0; i < subs_len; ++i)
{
client_subscription_t sub = rtVector_At(g_event_subscriptions_for_client, i);

if( strncmp(sub->object, sender, MAX_OBJECT_NAME_LENGTH) == 0 ||
strncmp(sub->object, event_name, MAX_OBJECT_NAME_LENGTH) == 0 ) /* support rbus events being elements : the object name will be the event name */
{
client_event_t evt = rtVector_Find(sub->events, event_name, client_event_compare);

if(evt)
{
unlock();
evt->callback(sender, event_name, msg, evt->data);
rbusMessage_Release(msg);
return;
}
/* support rbus events being elements : keep searching */
}
}
/* If no matching objects exist in records. Create a new entry.*/
unlock();
rbusMessage_Release(msg);
RBUSCORELOG_WARN("Received event %s::%s for which no subscription exists.", sender, event_name);
return;
}

static rbusCoreError_t remove_subscription_callback(const char * object_name, const char * event_name)
{
/*using namespace rbus_client;*/
Expand Down Expand Up @@ -1515,6 +1595,13 @@ static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name,
if(NULL == event_name)
event_name = DEFAULT_EVENT;

if(false == g_run_event_client_dispatch)
{
RBUSCORELOG_DEBUG("Starting event dispatching.");
rtConnection_AddDefaultListener(g_connection, master_event_callback, NULL);
g_run_event_client_dispatch = true;
}

if(g_master_event_callback == NULL)
{
sub = rtVector_Find(g_event_subscriptions_for_client, object_name, client_subscription_compare);
Expand Down Expand Up @@ -1725,25 +1812,24 @@ rbusCoreError_t rbus_unregisterClientDisconnectHandler()
return RBUSCORE_SUCCESS;
}

rbusCoreError_t rbuscore_publishDirectSubscriberEvent(const char * event_name, const char* listener, const void* data, uint32_t dataLength, uint32_t subscriptionId)
rbusCoreError_t rbuscore_publishDirectSubscriberEvent(const char * event_name, const char* listener, const void* data, uint32_t dataLength, uint32_t subscriptionId, bool rawData)
{
rtError err = RT_OK;

directServerLock();
const rtPrivateClientInfo *pPrivCliInfo = _rbuscore_find_server_privateconnection (event_name, listener);
if(pPrivCliInfo)
{
err = rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength, (char*)event_name, subscriptionId);
err = rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength, subscriptionId, rawData);
}
directServerUnlock();
return translate_rt_error(err);
}

rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out, uint32_t subscriptionId)
rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out, uint32_t subscriptionId, bool rawData)
{
/*using namespace rbus_server;*/
rbusCoreError_t ret = RBUSCORE_SUCCESS;
char topic[MAX_OBJECT_NAME_LENGTH] = {0};

if(NULL == event_name)
event_name = DEFAULT_EVENT;
Expand All @@ -1765,15 +1851,12 @@ rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char
uint8_t* data;
uint32_t dataLength;
rbusMessage_ToBytes(out, &data, &dataLength);
rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength, (char*)event_name, subscriptionId);
rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength, subscriptionId, rawData);
}
directServerUnlock();
if (!pPrivCliInfo)
{
lock();
snprintf(topic, MAX_OBJECT_NAME_LENGTH, "%d.%s", subscriptionId ,event_name);
if(topic[strlen(topic) - 1] == '.')
topic[strlen(topic) - 1] = '\0';
server_object_t obj = get_object(object_name);
if(NULL == obj)
{
Expand All @@ -1782,7 +1865,7 @@ rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char
ret = RBUSCORE_ERROR_INVALID_PARAM;
}

if(rbus_sendMessage(out, topic, object_name) != RBUSCORE_SUCCESS)
if(rbus_sendMessage(out, listener, object_name) != RBUSCORE_SUCCESS)
{
RBUSCORELOG_ERROR("Couldn't send event %s::%s to %s.", object_name, event_name, listener);
}
Expand Down Expand Up @@ -2888,6 +2971,7 @@ rbusCoreError_t rbuscore_openPrivateConnectionToProvider(rtConnection *pPrivateC
}
*pPrivateConn = connection;

rtConnection_AddDefaultListener(connection, master_event_callback, NULL);
RBUSCORELOG_DEBUG("pPrivateConn new = %p", connection);
rtMessage_Release(config);
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/rbuscore.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ rbusCoreError_t rbus_registerClientDisconnectHandler(rbus_client_disconnect_call
rbusCoreError_t rbus_unregisterClientDisconnectHandler();

/* Send an event message directly to a specific subscribe(e.g. listener) */
rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out, uint32_t subscriptionId);
rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out, uint32_t subscriptionId, bool rawData);

rbusCoreError_t rbuscore_publishDirectSubscriberEvent(const char * event_name, const char* listener, const void* data, uint32_t dataLength, uint32_t subscriptionId);
rbusCoreError_t rbuscore_publishDirectSubscriberEvent(const char * event_name, const char* listener, const void* data, uint32_t dataLength, uint32_t subscriptionId, bool rawData);

/*------ Convenience functions built on top of base functions above. ------*/

Expand Down
Loading

0 comments on commit 24ed328

Please sign in to comment.