Skip to content

Commit

Permalink
Merge branch 'Opendirect_rawdata' of github.com:gururaajar/rbus into …
Browse files Browse the repository at this point in the history
…Opendirect_rawdata
  • Loading branch information
Gururaaja E S R authored and Gururaaja E S R committed Nov 21, 2023
2 parents 029c658 + 865de81 commit c2ad9fa
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 93 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ jobs:
export LD_LIBRARY_PATH=$PREFIX/lib
nohup ./bin/rbusTestProvider >/tmp/plog.txt &
./bin/rbusTestConsumer -a
- name: Run multiRbusOpenMethod Unit test
- name: Run multiRbusOpenMethod Unit Test
run: |
cd install/usr
export PREFIX=$PWD
export LD_LIBRARY_PATH=$PREFIX/lib
nohup ./bin/multiRbusOpenMethodProvider >/tmp/log_multiRbusOpenMethodProvider.txt &
./bin/multiRbusOpenMethodConsumer
./bin/multiRbusOpenMethodProvider &
./bin/multiRbusOpenMethodConsumer &
- name: Run multiRbusOpenSubscribe Unit test
run: |
cd install/usr
Expand Down
3 changes: 2 additions & 1 deletion include/rbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -1714,8 +1714,9 @@ rbusError_t rbusEvent_PublishRawData(
provider is responsible in sending the errorcode and error string.
other error like no method/handling issues, internal err
will be taken care by rbus.
consumer side errors returning (RBUS_ERROR_INVALID_HANDLE and RBUS_ERROR_INVALID_INPUT) will not contain outparams for input validation.
* @return RBus error code as defined by rbusError_t.
* Possible values are: RBUS_ERROR_SUCCESS, RBUS_ERROR_BUS_ERROR, RBUS_ERROR_INVALID_INPUT
* Possible values are: RBUS_ERROR_SUCCESS, RBUS_ERROR_BUS_ERROR, RBUS_ERROR_INVALID_INPUT, RBUS_ERROR_TIMEOUT, RBUS_ERROR_INVALID_METHOD, RBUS_ERROR_INVALID_HANDLE
* @ingroup Methods
*/
rbusError_t rbusMethod_Invoke(
Expand Down
41 changes: 35 additions & 6 deletions src/core/rbuscore.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ extern char* __progname;
static void _freePrivateServer(void* p);
static void _rbuscore_directconnection_load_from_cache();
static void _rbuscore_destroy_clientPrivate_connections();
const rtPrivateClientInfo* _rbuscore_find_server_privateconnection(const char *pParameterName, const char *pConsumerName);
/////

void server_method_create(server_method_t* meth, char const* name, rbus_callback_t callback, void* data)
Expand Down Expand Up @@ -810,7 +811,7 @@ rbusCoreError_t rbus_registerObj(const char * object_name, rbus_callback_t handl
server_object_create(&obj, object_name, handler, user_data);

//TODO: callback signature translation. rbusMessage uses a significantly wider signature for callbacks. Translate to something simpler.
err = rtConnection_AddListener(g_connection, object_name, onMessage, obj, RBUS_REGISTER_OBJECT_SUBSCRIPTION_ID);
err = rtConnection_AddListener(g_connection, object_name, RBUS_REGISTER_OBJECT_EXPRESSION_ID, onMessage, obj);

if(RT_OK == err)
{
Expand Down Expand Up @@ -956,7 +957,7 @@ rbusCoreError_t rbus_unregisterObj(const char * object_name)
return RBUSCORE_ERROR_INVALID_PARAM;
}

err = rtConnection_RemoveListenerWithId(g_connection, RBUS_REGISTER_OBJECT_SUBSCRIPTION_ID);
err = rtConnection_RemoveListener(g_connection, RBUS_REGISTER_OBJECT_EXPRESSION_ID);
if(RT_OK != err)
{
RBUSCORELOG_ERROR("rtConnection_RemoveListener %s failed: Err=%d", object_name, err);
Expand Down Expand Up @@ -1216,6 +1217,20 @@ rbusCoreError_t rbus_pullObj(const char * object_name, int timeout_millisecs, rb
return ret;
}

rbusCoreError_t rbus_sendData(const void* data, uint32_t dataLength, const char * topic)
{
rtError ret;

if(NULL == g_connection)
{
RBUSCORELOG_ERROR("Not connected.");
return RBUSCORE_ERROR_INVALID_STATE;
}

ret = rtConnection_SendBinaryDirect(g_connection, data, dataLength, topic, NULL);
return translate_rt_error(ret);
}

static rbusCoreError_t rbus_sendMessage(rbusMessage msg, const char * destination, const char * sender)
{
rtError ret;
Expand Down Expand Up @@ -1680,7 +1695,7 @@ rbusCoreError_t rbus_registerClientDisconnectHandler(rbus_client_disconnect_call
lock();
if(!g_advisory_listener_installed)
{
rtError err = rtConnection_AddListener(g_connection, RTMSG_ADVISORY_TOPIC, &rtrouted_advisory_callback, g_connection, RBUS_ADVISORY_SUBSCRIPTION_ID);
rtError err = rtConnection_AddListener(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID, &rtrouted_advisory_callback, g_connection);
if(err == RT_OK)
{
RBUSCORELOG_DEBUG("Listening for advisory messages");
Expand All @@ -1703,13 +1718,27 @@ rbusCoreError_t rbus_unregisterClientDisconnectHandler()
lock();
if(g_advisory_listener_installed)
{
rtConnection_RemoveListenerWithId(g_connection, RBUS_ADVISORY_SUBSCRIPTION_ID);
rtConnection_RemoveListener(g_connection, RBUS_ADVISORY_EXPRESSION_ID);
g_advisory_listener_installed = false;
}
unlock();
return RBUSCORE_SUCCESS;
}

rbusCoreError_t rbuscore_publishDirectSubscriberEvent(const char * event_name, const char* listener, const void* data, uint32_t dataLength, uint32_t subscriptionId)
{
rtError err = RT_OK;

lock();
const rtPrivateClientInfo *pPrivCliInfo = _rbuscore_find_server_privateconnection (event_name, listener);
if(pPrivCliInfo)
{
err = rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength, (char*)event_name, subscriptionId);
}
unlock();
return translate_rt_error(err);
}

rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out, uint32_t subscriptionId)
{
/*using namespace rbus_server;*/
Expand All @@ -1729,6 +1758,7 @@ rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char
rbusMessage_SetInt32(out, 1);/*is rbus 2.0*/
rbusMessage_EndMetaSectionWrite(out);

lock();
const rtPrivateClientInfo *pPrivCliInfo = _rbuscore_find_server_privateconnection (event_name, listener);
if(pPrivCliInfo)
{
Expand All @@ -1742,7 +1772,6 @@ rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char
snprintf(topic, MAX_OBJECT_NAME_LENGTH, "%d.%s", subscriptionId ,event_name);
if(topic[strlen(topic) - 1] == '.')
topic[strlen(topic) - 1] = '\0';
lock();
server_object_t obj = get_object(object_name);
if(NULL == obj)
{
Expand All @@ -1755,8 +1784,8 @@ rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char
{
RBUSCORELOG_ERROR("Couldn't send event %s::%s to %s.", object_name, event_name, listener);
}
unlock();
}
unlock();
return ret;
}

Expand Down
8 changes: 5 additions & 3 deletions src/core/rbuscore.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
#define MAX_SUPPORTED_METHODS 32
#define MAX_REGISTERED_OBJECTS 64
#define RBUS_OPEN_TELEMETRY_DATA_MAX 512
#define RBUS_REGISTER_OBJECT_SUBSCRIPTION_ID 2
#define RBUS_ADVISORY_SUBSCRIPTION_ID 3
#define RBUS_REGISTER_OBJECT_EXPRESSION_ID 2
#define RBUS_ADVISORY_EXPRESSION_ID 3

void rbus_getOpenTelemetryContext(const char **s, const char **t);
void rbus_setOpenTelemetryContext(const char *s, const char *t);
Expand Down Expand Up @@ -164,6 +164,8 @@ rbusCoreError_t rbus_unregisterClientDisconnectHandler();
/* Send an event message directly to a specific subscribe(e.g. listener) */
rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out, uint32_t subscriptionId);

rbusCoreError_t rbuscore_publishDirectSubscriberEvent(const char * event_name, const char* listener, const void* data, uint32_t dataLength, uint32_t subscriptionId);

/*------ Convenience functions built on top of base functions above. ------*/


Expand Down Expand Up @@ -229,7 +231,7 @@ rbusCoreError_t rbuscore_startPrivateListener(const char* pPrivateConnAddress, c
/* The Provider application to request to remove/close the instance of rtrouted in the given DML */
rbusCoreError_t rbuscore_updatePrivateListener(const char* pConsumerName, const char *pDMLName);

const rtPrivateClientInfo* _rbuscore_find_server_privateconnection(const char *pParameterName, const char *pConsumerName);
rbusCoreError_t rbus_sendData(const void* data, uint32_t dataLength, const char * topic);

#ifdef __cplusplus
}
Expand Down
42 changes: 18 additions & 24 deletions src/rbus/rbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ rbusError_t rbusOpenDirect_SubAdd(rbusHandle_t handle, rtVector eventSubs, char
errorcode = rbusMessage_RemoveListener(handle, rawDataTopic, subInternal->subscriptionId);
if (errorcode != RBUS_ERROR_SUCCESS)
{
RBUSLOG_WARN("rtConnection_RemoveListener failed err:%d", errorcode);
RBUSLOG_WARN("rbusMessage_RemoveListener failed err:%d", errorcode);
}
}
memset(rawDataTopic, '\0', strlen(rawDataTopic));
Expand Down Expand Up @@ -399,7 +399,7 @@ rbusError_t rbusCloseDirect_SubRemove(rbusHandle_t handle, rtVector eventSubs, c
errorcode = rbusMessage_RemovePrivateListener(handle, rawDataTopic, subInternal->subscriptionId);
if (errorcode != RBUS_ERROR_SUCCESS)
{
RBUSLOG_WARN("rtConnection_RemoveListener failed err:%d", errorcode);
RBUSLOG_WARN("rbusMessage_RemovePrivateListener failed err:%d", errorcode);
}
handle->m_connection = handle->m_connectionParent; /* changed the handle m_connection of direct connection to use normal m_connection and used the same to add the rawdatatopic for normal connection*/
memset(rawDataTopic, '\0', strlen(rawDataTopic));
Expand Down Expand Up @@ -5105,7 +5105,7 @@ rbusError_t rbusEvent_SubscribeRawData(
errorcode = rbusMessage_RemoveListener(handle, rawDataTopic, subInternal->subscriptionId);
if (errorcode != RT_OK)
{
RBUSLOG_WARN("rtConnection_RemoveListener:%d", errorcode);
RBUSLOG_WARN("rbusMessage_RemoveListener:%d", errorcode);
}
}
memset(rawDataTopic, '\0', strlen(rawDataTopic));
Expand Down Expand Up @@ -5393,7 +5393,7 @@ rbusError_t rbusEvent_SubscribeExRawData(
errorcode = rbusMessage_RemoveListener(handle, rawDataTopic, subInternal->subscriptionId);
if (errorcode != RT_OK)
{
RBUSLOG_WARN("rtConnection_RemoveListener:%d", errorcode);
RBUSLOG_WARN("rbusMessage_RemoveListener:%d", errorcode);
}
}
memset(rawDataTopic, '\0', strlen(rawDataTopic));
Expand Down Expand Up @@ -5585,7 +5585,7 @@ rbusError_t rbusEvent_UnsubscribeEx(
errorcode = rbusMessage_RemoveListener(handle, topic, subInternal->subscriptionId);
if (errorcode != RBUS_ERROR_SUCCESS)
{
RBUSLOG_WARN("rtConnection_RemoveListener failed err:%d", errorcode);
RBUSLOG_WARN("rbusMessage_RemoveListener failed err:%d", errorcode);
}
}
errorcode = _rbus_event_unsubscribe(handle, subInternal);
Expand Down Expand Up @@ -5653,7 +5653,7 @@ rbusError_t rbusEvent_PublishRawData(
rbusError_t rc = RBUS_ERROR_SUCCESS;
rtListItem listItem;
rbusSubscription_t* subscription;
rtError ret = RT_OK;
rbusCoreError_t err = RBUSCORE_SUCCESS;
char rawDataTopic[RBUS_MAX_NAME_LENGTH] = {0};

VERIFY_NULL(handle);
Expand All @@ -5678,6 +5678,8 @@ rbusError_t rbusEvent_PublishRawData(
{
return RBUS_ERROR_NOSUBSCRIBERS;
}

HANDLE_SUBS_MUTEX_LOCK(handle);
rtList_GetFront(el->subscriptions, &listItem);
while(listItem)
{
Expand All @@ -5689,21 +5691,14 @@ rbusError_t rbusEvent_PublishRawData(
rc = RBUS_ERROR_BUS_ERROR;
rtListItem_GetNext(listItem, &listItem);
}
const rtPrivateClientInfo *pPrivCliInfo = _rbuscore_find_server_privateconnection (subscription->eventName, subscription->listener);
if(pPrivCliInfo && subscription->rawData)
{
ret = rtRouteDirect_SendMessage (pPrivCliInfo, eventData->rawData, eventData->rawDataLen, subscription->eventName, subscription->subscriptionId);
if(RT_OK != ret)
{
rc = RBUS_ERROR_BUS_ERROR;
}
}
err = rbuscore_publishDirectSubscriberEvent(subscription->eventName, subscription->listener, eventData->rawData, eventData->rawDataLen, subscription->subscriptionId);
rc = rbusCoreError_to_rbusError(err);
rtListItem_GetNext(listItem, &listItem);
}
HANDLE_SUBS_MUTEX_UNLOCK(handle);
snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", eventData->name);
ret = rtConnection_SendBinary(handleInfo->m_connection, eventData->rawData, eventData->rawDataLen, rawDataTopic);
if(RT_OK != ret)
rc = RBUS_ERROR_BUS_ERROR;
err = rbus_sendData(eventData->rawData, eventData->rawDataLen, rawDataTopic);
rc = rbusCoreError_to_rbusError(err);
return rc;
}

Expand Down Expand Up @@ -5920,9 +5915,9 @@ rbusError_t rbusMethod_Invoke(
rbusObject_t* outParams)
{
VERIFY_HANDLE(handle);
struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle;
VERIFY_NULL(handle);
VERIFY_NULL(methodName);

struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle;

if (handleInfo->m_handleType != RBUS_HWDL_TYPE_REGULAR)
return RBUS_ERROR_INVALID_HANDLE;
Expand Down Expand Up @@ -5972,15 +5967,14 @@ rbusError_t rbusMethod_InvokeAsync(
int timeout)
{
VERIFY_HANDLE(handle);
VERIFY_NULL(methodName);
VERIFY_NULL(callback);

struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle;
pthread_t pid;
rbusMethodInvokeAsyncData_t* data;
int err = 0;

VERIFY_NULL(handle);
VERIFY_NULL(methodName);
VERIFY_NULL(callback);

if (handleInfo->m_handleType != RBUS_HWDL_TYPE_REGULAR)
return RBUS_ERROR_INVALID_HANDLE;

Expand Down
10 changes: 5 additions & 5 deletions src/rbus/rbus_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ rbusError_t rbusMessage_AddPrivateListener(
rtVector_PushBack(handle->messageCallbacks, ctx);

snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "%d.%s", subscriptionId, expression);
rtError e = rtConnection_AddListener(myConn, rawDataTopic, &rtMessage_CallbackHandler, ctx, subscriptionId);
rtError e = rtConnection_AddListener(myConn, rawDataTopic, subscriptionId, &rtMessage_CallbackHandler, ctx);
RBUS_MESSAGE_MUTEX_UNLOCK();
if (e != RT_OK)
{
Expand Down Expand Up @@ -178,7 +178,7 @@ rbusError_t rbusMessage_AddListener(
RBUS_MESSAGE_MUTEX_LOCK();
rtVector_PushBack(handle->messageCallbacks, ctx);

rtError e = rtConnection_AddListener(con, expression, &rtMessage_CallbackHandler, ctx, subscriptionId);
rtError e = rtConnection_AddListener(con, expression, subscriptionId, &rtMessage_CallbackHandler, ctx);
RBUS_MESSAGE_MUTEX_UNLOCK();
if (e != RT_OK)
{
Expand Down Expand Up @@ -207,7 +207,7 @@ rbusError_t rbusMessage_RemovePrivateListener(
RBUS_MESSAGE_MUTEX_LOCK();
rtVector_RemoveItemByCompare(handle->messageCallbacks, rawDataTopic, compareContextExpression, cleanupContext);

rtError e = rtConnection_RemoveListenerWithId(myConn, subscriptionId);
rtError e = rtConnection_RemoveListener(myConn, subscriptionId);
RBUS_MESSAGE_MUTEX_UNLOCK();
if (e != RT_OK)
{
Expand All @@ -229,7 +229,7 @@ rbusError_t rbusMessage_RemoveListener(
RBUS_MESSAGE_MUTEX_LOCK();
rtVector_RemoveItemByCompare(handle->messageCallbacks, expression, compareContextExpression, cleanupContext);

rtError e = rtConnection_RemoveListenerWithId(con, subscriptionId);
rtError e = rtConnection_RemoveListener(con, subscriptionId);
RBUS_MESSAGE_MUTEX_UNLOCK();
if (e != RT_OK)
{
Expand All @@ -252,7 +252,7 @@ rbusError_t rbusMessage_RemoveAllListeners(
{
rbusMessageHandlerContext_t* ctx = rtVector_At(handle->messageCallbacks, i);
VERIFY_NULL(ctx);
rtError e = rtConnection_RemoveListenerWithId(con, ctx->subscriptionId);
rtError e = rtConnection_RemoveListener(con, ctx->subscriptionId);
if (e != RT_OK)
{
RBUSLOG_WARN("rbusMessage_RemoveAllListener %s :%s", ctx->expression, rtStrError(e));
Expand Down
12 changes: 6 additions & 6 deletions src/rtmessage/rtConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ typedef volatile int atomic_uint_least32_t;
#include <sys/time.h>

#define RTMSG_LISTENERS_MAX 128
#define RTCONNECTION_CREATE_SUBSCRIPTION_ID 1
#define RTCONNECTION_CREATE_EXPRESSION_ID 1
#ifdef RDKC_BUILD
#define RTMSG_SEND_BUFFER_SIZE (1024 * 8)
#else
Expand Down Expand Up @@ -599,7 +599,7 @@ rtConnection_CreateInternal(rtConnection* con, char const* application_name, cha

if (err == RT_OK)
{
rtConnection_AddListener(c, c->inbox_name, onDefaultMessage, c, RTCONNECTION_CREATE_SUBSCRIPTION_ID);
rtConnection_AddListener(c, c->inbox_name, RTCONNECTION_CREATE_EXPRESSION_ID, onDefaultMessage, c);
rtConnection_StartThreads(c);
*con = c;
}
Expand Down Expand Up @@ -1223,7 +1223,7 @@ rtConnection_SendInternal(rtConnection con, uint8_t const* buff, uint32_t n, cha
}

rtError
rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCallback callback, void* closure, uint32_t subscriptionId)
rtConnection_AddListener(rtConnection con, char const* expression, uint32_t expressionId, rtMessageCallback callback, void* closure)
{
int i;

Expand All @@ -1246,7 +1246,7 @@ rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCall
}

con->listeners[i].in_use = 1;
con->listeners[i].subscription_id = subscriptionId;
con->listeners[i].subscription_id = expressionId;
con->listeners[i].closure = closure;
con->listeners[i].callback = callback;
con->listeners[i].expression = strdup(expression);
Expand All @@ -1264,7 +1264,7 @@ rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCall
}

rtError
rtConnection_RemoveListenerWithId(rtConnection con, uint32_t listenerId)
rtConnection_RemoveListener(rtConnection con, uint32_t expressionId)
{
int i;
int route_id = 0;
Expand All @@ -1276,7 +1276,7 @@ rtConnection_RemoveListenerWithId(rtConnection con, uint32_t listenerId)
pthread_mutex_lock(&con->mutex);
for (i = 0; i < RTMSG_LISTENERS_MAX; ++i)
{
if ((con->listeners[i].in_use) && (listenerId == con->listeners[i].subscription_id))
if ((con->listeners[i].in_use) && (expressionId == con->listeners[i].subscription_id))
{
strncpy(expression, con->listeners[i].expression, RTMSG_HEADER_MAX_TOPIC_LENGTH);
con->listeners[i].in_use = 0;
Expand Down
Loading

0 comments on commit c2ad9fa

Please sign in to comment.