Skip to content

Commit

Permalink
RDKB-000 : RawData Direct mode
Browse files Browse the repository at this point in the history
Reason for change: RawData Direct mode
Test Procedure: Test and verified with rbuscli and testapp
Risks: High
Priority: P1
Signed-off-by: Gururaaja ESR <[email protected]>
  • Loading branch information
Gururaaja E S R authored and Gururaaja E S R committed Oct 24, 2023
1 parent bb53bef commit 3a791ac
Show file tree
Hide file tree
Showing 20 changed files with 672 additions and 292 deletions.
3 changes: 2 additions & 1 deletion include/rbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ typedef enum _rbusError
RBUS_ERROR_INVALID_METHOD, /**< Invalid Method */
RBUS_ERROR_NOSUBSCRIBERS, /**< No subscribers present */
RBUS_ERROR_SUBSCRIPTION_ALREADY_EXIST, /**< The subscription already exists*/
RBUS_ERROR_INVALID_NAMESPACE /**< Invalid namespace as per standard */
RBUS_ERROR_INVALID_NAMESPACE, /**< Invalid namespace as per standard */
RBUS_ERROR_DIRECT_CON_NOT_EXIST /**< Direct connection not exist */
} rbusError_t;


Expand Down
42 changes: 40 additions & 2 deletions include/rbus_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,27 @@ typedef void (*rbusMessageHandler_t)(
rbusMessage_t* message,
void* userData);

/** @fn rbusError_t rbusMessage_AddPrivateListener(
* rbusHandle_t handle,
* char const* expression,
* rbusMessageHandler_t callback,
* void * userData)
* @brief Add a message for private listener created for direct mode.
* @param handle Bus Handle
* @param expression A topic or a topic expression
* @param handler The message callback handler
* @param userData User data to be passed back to the callback handler
* @return RBus error code as defined by rbusError_t.
* Possible errors are: RBUS_ERROR_BUS_ERROR
*/
rbusError_t rbusMessage_AddPrivateListener(
rbusHandle_t handle,
char const* expression,
rbusMessageHandler_t handler,
void* userData,
uint32_t subscriptionId,
uint32_t* listenerId);

/** @fn rbusError_t rbusMessage_AddListener(
* rbusHandle_t handle,
* char const* expression,
Expand All @@ -93,7 +114,23 @@ rbusError_t rbusMessage_AddListener(
rbusHandle_t handle,
char const* expression,
rbusMessageHandler_t handler,
void* userData);
void* userData,
uint32_t* listenerId);

/** @fn rbusError_t rbusMessage_RemovePrivateListener(
* rbusHandle_t handle,
* char const* expression)
* @brief Remove a message for private listener created for direct mode.
* @param handle Bus Handle
* @param expression A topic or a topic expression
* @return RBus error code as defined by rbusError_t.
* Possible errors are: RBUS_ERROR_BUS_ERROR
*/
rbusError_t rbusMessage_RemovePrivateListener(
rbusHandle_t handle,
char const* expression,
uint32_t subscriptionId,
uint32_t listenerId);

/** @fn rbusError_t rbusMessage_RemoveListener(
* rbusHandle_t handle,
Expand All @@ -106,7 +143,8 @@ rbusError_t rbusMessage_AddListener(
*/
rbusError_t rbusMessage_RemoveListener(
rbusHandle_t handle,
char const* expression);
char const* expression,
uint32_t listenerId);

/** @fn rbusError_t rbusMessage_RemoveAllListeners(
* rbusHandle_t handle,
Expand Down
3 changes: 2 additions & 1 deletion sampleapps/message/rbusMessageListener.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ int main()
{
rbusError_t err;
rbusHandle_t rbus;
uint32_t listenerId;

err = rbus_open(&rbus, "rbus_recv");
if (err)
Expand All @@ -49,7 +50,7 @@ int main()
return 1;
}

rbusMessage_AddListener(rbus, "A.B.C", &rbusMessageHandler, NULL);
rbusMessage_AddListener(rbus, "A.B.C", &rbusMessageHandler, NULL, &listenerId);

while (running)
{
Expand Down
132 changes: 28 additions & 104 deletions src/core/rbuscore.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ extern char* __progname;
static void _freePrivateServer(void* p);
static void _rbuscore_directconnection_load_from_cache();
static void _rbuscore_destroy_clientPrivate_connections();
const rtPrivateClientInfo* _rbuscore_find_server_privateconnection(const char *pParameterName, const char *pConsumerName);
/////

void server_method_create(server_method_t* meth, char const* name, rbus_callback_t callback, void* data)
Expand Down Expand Up @@ -326,7 +325,6 @@ static pthread_mutex_t g_mutex;
static pthread_mutex_t g_directCliMutex;
static pthread_mutex_t g_directServMutex;
static int g_mutex_init = 0;
static bool g_run_event_client_dispatch = false;
static rtVector g_event_subscriptions_for_client; /*client_subscription_t list. Used by the subscriber to track all active subscriptions. */
static rtVector g_queued_requests; /*list of queued_request */

Expand Down Expand Up @@ -370,7 +368,7 @@ static int unlock()
return pthread_mutex_unlock(&g_mutex);
}

static rbusCoreError_t send_subscription_request(const char * object_name, const char * event_name, bool activate, const rbusMessage payload, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response);
static rbusCoreError_t send_subscription_request(const char * object_name, const char * event_name, bool activate, const rbusMessage payload, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response, bool rawData);

static void perform_init()
{
Expand Down Expand Up @@ -410,7 +408,7 @@ static void perform_cleanup()
for(i2 = 0; i2 < sz2; i2++)
{
client_event_t event = rtVector_At(sub->events, i2);
send_subscription_request(sub->object, event->name, false, NULL, NULL, 0, false, NULL);
send_subscription_request(sub->object, event->name, false, NULL, NULL, 0, false, NULL, false);
}
}
lock();
Expand Down Expand Up @@ -698,7 +696,7 @@ rtConnection rbus_getConnection()
return g_connection;
}

static rbusCoreError_t send_subscription_request(const char * object_name, const char * event_name, bool activate, const rbusMessage payload, int* providerError, int timeout_ms, bool publishOnSubscribe, rbusMessage *response)
static rbusCoreError_t send_subscription_request(const char * object_name, const char * event_name, bool activate, const rbusMessage payload, int* providerError, int timeout_ms, bool publishOnSubscribe, rbusMessage *response, bool rawData)
{
/* Method definition to add new event subscription:
* method name: METHOD_ADD_EVENT_SUBSCRIPTION / METHOD_REMOVE_EVENT_SUBSCRIPTION.
Expand All @@ -719,6 +717,10 @@ static rbusCoreError_t send_subscription_request(const char * object_name, const
rbusMessage_SetInt32(request, 1); /*for publishOnSubscribe */
else
rbusMessage_SetInt32(request, 0);
if(rawData)
rbusMessage_SetInt32(request, 1); /*for rawDataSubscription */
else
rbusMessage_SetInt32(request, 0);

if(timeout_ms <= 0)
timeout_ms = TIMEOUT_VALUE_FIRE_AND_FORGET;
Expand Down Expand Up @@ -776,6 +778,8 @@ rbusCoreError_t rbus_registerObj(const char * object_name, rbus_callback_t handl
{
rtError err = RT_OK;
server_object_t obj = NULL;
uint32_t Id;
uint32_t *listenerId = &Id;

if(NULL == g_connection)
{
Expand Down Expand Up @@ -808,7 +812,7 @@ rbusCoreError_t rbus_registerObj(const char * object_name, rbus_callback_t handl
server_object_create(&obj, object_name, handler, user_data);

//TODO: callback signature translation. rbusMessage uses a significantly wider signature for callbacks. Translate to something simpler.
err = rtConnection_AddListener(g_connection, object_name, onMessage, obj);
err = rtConnection_AddListener(g_connection, object_name, onMessage, obj, &listenerId, 0);

if(RT_OK == err)
{
Expand Down Expand Up @@ -1428,85 +1432,6 @@ rbusCoreError_t rbus_unregisterEvent(const char* object_name, const char * event
return ret;
}

static void master_event_callback(rtMessageHeader const* hdr, uint8_t const* data, uint32_t dataLen, void* closure)
{
/*using namespace rbus_client;*/
rbusMessage msg = NULL;
const char * sender = hdr->reply_topic;
const char * event_name = NULL;
const char * object_name = NULL;
int32_t is_rbus_flag = 1;
rtError err;
size_t subs_len;
size_t i;
(void)closure;

/*Sanitize the incoming data.*/
if(MAX_OBJECT_NAME_LENGTH <= strlen(sender))
{
RBUSCORELOG_ERROR("Object name length exceeds limits.");
return;
}

rbusMessage_FromBytes(&msg, data, dataLen);

rbusMessage_BeginMetaSectionRead(msg);
err = rbusMessage_GetString(msg, &event_name);
err = rbusMessage_GetString(msg, &object_name);
err = rbusMessage_GetInt32(msg, &is_rbus_flag);
rbusMessage_EndMetaSectionRead(msg);
if(RT_OK != err)
{
RBUSCORELOG_ERROR("Event message doesn't contain an event name.");
rbusMessage_Release(msg);
return;
}

if(is_rbus_flag)
{
if(g_master_event_callback)
{
err = g_master_event_callback(sender, event_name, msg, g_master_event_user_data);
if(err != RBUSCORE_ERROR_EVENT_NOT_HANDLED)
{
rbusMessage_Release(msg);
return;
}
}
else
{
RBUSCORELOG_ERROR("Received rbus event but no master callback registered yet.");
}
}

lock();
subs_len = rtVector_Size(g_event_subscriptions_for_client);
for(i = 0; i < subs_len; ++i)
{
client_subscription_t sub = rtVector_At(g_event_subscriptions_for_client, i);

if( strncmp(sub->object, sender, MAX_OBJECT_NAME_LENGTH) == 0 ||
strncmp(sub->object, event_name, MAX_OBJECT_NAME_LENGTH) == 0 ) /* support rbus events being elements : the object name will be the event name */
{
client_event_t evt = rtVector_Find(sub->events, event_name, client_event_compare);

if(evt)
{
unlock();
evt->callback(sender, event_name, msg, evt->data);
rbusMessage_Release(msg);
return;
}
/* support rbus events being elements : keep searching */
}
}
/* If no matching objects exist in records. Create a new entry.*/
unlock();
rbusMessage_Release(msg);
RBUSCORELOG_WARN("Received event %s::%s for which no subscription exists.", sender, event_name);
return;
}

static rbusCoreError_t remove_subscription_callback(const char * object_name, const char * event_name)
{
/*using namespace rbus_client;*/
Expand Down Expand Up @@ -1539,7 +1464,7 @@ static rbusCoreError_t remove_subscription_callback(const char * object_name, c
return ret;
}

static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response)
static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response, bool rawData)
{
/*using namespace rbus_client;*/
rbusCoreError_t ret = RBUSCORE_SUCCESS;
Expand Down Expand Up @@ -1577,13 +1502,6 @@ static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name,
if(NULL == event_name)
event_name = DEFAULT_EVENT;

if(false == g_run_event_client_dispatch)
{
RBUSCORELOG_DEBUG("Starting event dispatching.");
rtConnection_AddDefaultListener(g_connection, master_event_callback, NULL);
g_run_event_client_dispatch = true;
}

if(g_master_event_callback == NULL)
{
sub = rtVector_Find(g_event_subscriptions_for_client, object_name, client_subscription_compare);
Expand Down Expand Up @@ -1612,7 +1530,7 @@ static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name,

unlock();

if((ret = send_subscription_request(object_name, event_name, true, payload, providerError, timeout, publishOnSubscribe, response)) != RBUSCORE_SUCCESS)
if((ret = send_subscription_request(object_name, event_name, true, payload, providerError, timeout, publishOnSubscribe, response, rawData)) != RBUSCORE_SUCCESS)
{
if(g_master_event_callback == NULL)
{
Expand All @@ -1627,15 +1545,15 @@ static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name,

rbusCoreError_t rbus_subscribeToEvent(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError)
{
return rbus_subscribeToEventInternal(object_name, event_name, callback, payload, user_data, providerError, 0, false, NULL);
return rbus_subscribeToEventInternal(object_name, event_name, callback, payload, user_data, providerError, 0, false, NULL, false);
}

rbusCoreError_t rbus_subscribeToEventTimeout(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response)
rbusCoreError_t rbus_subscribeToEventTimeout(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response, bool rawData)
{
return rbus_subscribeToEventInternal(object_name, event_name, callback, payload, user_data, providerError, timeout, publishOnSubscribe, response);
return rbus_subscribeToEventInternal(object_name, event_name, callback, payload, user_data, providerError, timeout, publishOnSubscribe, response, rawData);
}

rbusCoreError_t rbus_unsubscribeFromEvent(const char * object_name, const char * event_name, const rbusMessage payload)
rbusCoreError_t rbus_unsubscribeFromEvent(const char * object_name, const char * event_name, const rbusMessage payload, bool rawData)
{
rbusCoreError_t ret = RBUSCORE_ERROR_INVALID_PARAM;

Expand All @@ -1658,7 +1576,7 @@ rbusCoreError_t rbus_unsubscribeFromEvent(const char * object_name, const char

if(!g_master_event_callback)
remove_subscription_callback(object_name, event_name);
ret = send_subscription_request(object_name, event_name, false, payload, NULL, 0, false, NULL);
ret = send_subscription_request(object_name, event_name, false, payload, NULL, 0, false, NULL, rawData);
return ret;
}

Expand Down Expand Up @@ -1761,10 +1679,12 @@ rbusCoreError_t rbus_registerMasterEventHandler(rbus_event_callback_t callback,
}
rbusCoreError_t rbus_registerClientDisconnectHandler(rbus_client_disconnect_callback_t callback)
{
uint32_t Id;
uint32_t *listenerId = &Id;
lock();
if(!g_advisory_listener_installed)
{
rtError err = rtConnection_AddListener(g_connection, RTMSG_ADVISORY_TOPIC, &rtrouted_advisory_callback, g_connection);
rtError err = rtConnection_AddListener(g_connection, RTMSG_ADVISORY_TOPIC, &rtrouted_advisory_callback, g_connection, &listenerId, 0);
if(err == RT_OK)
{
RBUSCORELOG_DEBUG("Listening for advisory messages");
Expand Down Expand Up @@ -1794,10 +1714,12 @@ rbusCoreError_t rbus_unregisterClientDisconnectHandler()
return RBUSCORE_SUCCESS;
}

rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out)
rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out, uint32_t subscriptionId)
{
/*using namespace rbus_server;*/
rbusCoreError_t ret = RBUSCORE_SUCCESS;
char topic[MAX_OBJECT_NAME_LENGTH] = {0};

if(NULL == event_name)
event_name = DEFAULT_EVENT;
if(MAX_OBJECT_NAME_LENGTH <= strnlen(object_name, MAX_OBJECT_NAME_LENGTH))
Expand All @@ -1817,10 +1739,13 @@ rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char
uint8_t* data;
uint32_t dataLength;
rbusMessage_ToBytes(out, &data, &dataLength);
rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength);
rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength, false, (char*)event_name, subscriptionId);
}
else
{
snprintf(topic, MAX_OBJECT_NAME_LENGTH, "%d.%s", subscriptionId ,event_name);
if(topic[strlen(topic) - 1] == '.')
topic[strlen(topic) - 1] = '\0';
lock();
server_object_t obj = get_object(object_name);
if(NULL == obj)
Expand All @@ -1830,7 +1755,7 @@ rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char
ret = RBUSCORE_ERROR_INVALID_PARAM;
}

if(rbus_sendMessage(out, listener, object_name) != RBUSCORE_SUCCESS)
if(rbus_sendMessage(out, topic, object_name) != RBUSCORE_SUCCESS)
{
RBUSCORELOG_ERROR("Couldn't send event %s::%s to %s.", object_name, event_name, listener);
}
Expand Down Expand Up @@ -2938,7 +2863,6 @@ rbusCoreError_t rbuscore_openPrivateConnectionToProvider(rtConnection *pPrivateC
}
*pPrivateConn = connection;

rtConnection_AddDefaultListener(connection, master_event_callback, NULL);
RBUSCORELOG_DEBUG("pPrivateConn new = %p", connection);
rtMessage_Release(config);
}
Expand Down
Loading

0 comments on commit 3a791ac

Please sign in to comment.