diff --git a/include/rbus.h b/include/rbus.h index 49d827a8..0688d80e 100644 --- a/include/rbus.h +++ b/include/rbus.h @@ -127,7 +127,8 @@ typedef enum _rbusError RBUS_ERROR_INVALID_METHOD, /**< Invalid Method */ RBUS_ERROR_NOSUBSCRIBERS, /**< No subscribers present */ RBUS_ERROR_SUBSCRIPTION_ALREADY_EXIST, /**< The subscription already exists*/ - RBUS_ERROR_INVALID_NAMESPACE /**< Invalid namespace as per standard */ + RBUS_ERROR_INVALID_NAMESPACE, /**< Invalid namespace as per standard */ + RBUS_ERROR_DIRECT_CON_NOT_EXIST /**< Direct connection not exist */ } rbusError_t; diff --git a/include/rbus_message.h b/include/rbus_message.h index ad685ad1..95e2d96f 100644 --- a/include/rbus_message.h +++ b/include/rbus_message.h @@ -76,6 +76,26 @@ typedef void (*rbusMessageHandler_t)( rbusMessage_t* message, void* userData); +/** @fn rbusError_t rbusMessage_AddPrivateListener( + * rbusHandle_t handle, + * char const* expression, + * rbusMessageHandler_t callback, + * void * userData) + * @brief Add a message for private listener created for direct mode. + * @param handle Bus Handle + * @param expression A topic or a topic expression + * @param handler The message callback handler + * @param userData User data to be passed back to the callback handler + * @return RBus error code as defined by rbusError_t. + * Possible errors are: RBUS_ERROR_BUS_ERROR + */ +rbusError_t rbusMessage_AddPrivateListener( + rbusHandle_t handle, + char const* expression, + rbusMessageHandler_t handler, + void* userData, + uint32_t subscriptionId); + /** @fn rbusError_t rbusMessage_AddListener( * rbusHandle_t handle, * char const* expression, @@ -93,7 +113,22 @@ rbusError_t rbusMessage_AddListener( rbusHandle_t handle, char const* expression, rbusMessageHandler_t handler, - void* userData); + void* userData, + uint32_t subscriptionId); + +/** @fn rbusError_t rbusMessage_RemovePrivateListener( + * rbusHandle_t handle, + * char const* expression) + * @brief Remove a message for private listener created for direct mode. + * @param handle Bus Handle + * @param expression A topic or a topic expression + * @return RBus error code as defined by rbusError_t. + * Possible errors are: RBUS_ERROR_BUS_ERROR + */ +rbusError_t rbusMessage_RemovePrivateListener( + rbusHandle_t handle, + char const* expression, + uint32_t subscriptionId); /** @fn rbusError_t rbusMessage_RemoveListener( * rbusHandle_t handle, @@ -106,7 +141,8 @@ rbusError_t rbusMessage_AddListener( */ rbusError_t rbusMessage_RemoveListener( rbusHandle_t handle, - char const* expression); + char const* expression, + uint32_t subscriptionId); /** @fn rbusError_t rbusMessage_RemoveAllListeners( * rbusHandle_t handle, diff --git a/sampleapps/message/rbusMessageListener.c b/sampleapps/message/rbusMessageListener.c index 6daa23c5..e7bf3645 100644 --- a/sampleapps/message/rbusMessageListener.c +++ b/sampleapps/message/rbusMessageListener.c @@ -49,7 +49,7 @@ int main() return 1; } - rbusMessage_AddListener(rbus, "A.B.C", &rbusMessageHandler, NULL); + rbusMessage_AddListener(rbus, "A.B.C", &rbusMessageHandler, NULL, 0); while (running) { diff --git a/src/core/rbuscore.c b/src/core/rbuscore.c index 6d1d670a..17f94ecb 100644 --- a/src/core/rbuscore.c +++ b/src/core/rbuscore.c @@ -100,7 +100,6 @@ extern char* __progname; static void _freePrivateServer(void* p); static void _rbuscore_directconnection_load_from_cache(); static void _rbuscore_destroy_clientPrivate_connections(); -const rtPrivateClientInfo* _rbuscore_find_server_privateconnection(const char *pParameterName, const char *pConsumerName); ///// void server_method_create(server_method_t* meth, char const* name, rbus_callback_t callback, void* data) @@ -326,7 +325,6 @@ static pthread_mutex_t g_mutex; static pthread_mutex_t g_directCliMutex; static pthread_mutex_t g_directServMutex; static int g_mutex_init = 0; -static bool g_run_event_client_dispatch = false; static rtVector g_event_subscriptions_for_client; /*client_subscription_t list. Used by the subscriber to track all active subscriptions. */ static rtVector g_queued_requests; /*list of queued_request */ @@ -370,7 +368,7 @@ static int unlock() return pthread_mutex_unlock(&g_mutex); } -static rbusCoreError_t send_subscription_request(const char * object_name, const char * event_name, bool activate, const rbusMessage payload, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response); +static rbusCoreError_t send_subscription_request(const char * object_name, const char * event_name, bool activate, const rbusMessage payload, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response, bool rawData); static void perform_init() { @@ -410,7 +408,7 @@ static void perform_cleanup() for(i2 = 0; i2 < sz2; i2++) { client_event_t event = rtVector_At(sub->events, i2); - send_subscription_request(sub->object, event->name, false, NULL, NULL, 0, false, NULL); + send_subscription_request(sub->object, event->name, false, NULL, NULL, 0, false, NULL, false); } } lock(); @@ -698,7 +696,7 @@ rtConnection rbus_getConnection() return g_connection; } -static rbusCoreError_t send_subscription_request(const char * object_name, const char * event_name, bool activate, const rbusMessage payload, int* providerError, int timeout_ms, bool publishOnSubscribe, rbusMessage *response) +static rbusCoreError_t send_subscription_request(const char * object_name, const char * event_name, bool activate, const rbusMessage payload, int* providerError, int timeout_ms, bool publishOnSubscribe, rbusMessage *response, bool rawData) { /* Method definition to add new event subscription: * method name: METHOD_ADD_EVENT_SUBSCRIPTION / METHOD_REMOVE_EVENT_SUBSCRIPTION. @@ -719,6 +717,10 @@ static rbusCoreError_t send_subscription_request(const char * object_name, const rbusMessage_SetInt32(request, 1); /*for publishOnSubscribe */ else rbusMessage_SetInt32(request, 0); + if(rawData) + rbusMessage_SetInt32(request, 1); /*for rawDataSubscription */ + else + rbusMessage_SetInt32(request, 0); if(timeout_ms <= 0) timeout_ms = TIMEOUT_VALUE_FIRE_AND_FORGET; @@ -808,7 +810,7 @@ rbusCoreError_t rbus_registerObj(const char * object_name, rbus_callback_t handl server_object_create(&obj, object_name, handler, user_data); //TODO: callback signature translation. rbusMessage uses a significantly wider signature for callbacks. Translate to something simpler. - err = rtConnection_AddListener(g_connection, object_name, onMessage, obj); + err = rtConnection_AddListener(g_connection, object_name, onMessage, obj, RBUS_REGISTER_OBJECT_SUBSCRIPTION_ID); if(RT_OK == err) { @@ -954,7 +956,7 @@ rbusCoreError_t rbus_unregisterObj(const char * object_name) return RBUSCORE_ERROR_INVALID_PARAM; } - err = rtConnection_RemoveListener(g_connection, object_name); + err = rtConnection_RemoveListenerWithId(g_connection, RBUS_REGISTER_OBJECT_SUBSCRIPTION_ID); if(RT_OK != err) { RBUSCORELOG_ERROR("rtConnection_RemoveListener %s failed: Err=%d", object_name, err); @@ -1428,85 +1430,6 @@ rbusCoreError_t rbus_unregisterEvent(const char* object_name, const char * event return ret; } -static void master_event_callback(rtMessageHeader const* hdr, uint8_t const* data, uint32_t dataLen, void* closure) -{ - /*using namespace rbus_client;*/ - rbusMessage msg = NULL; - const char * sender = hdr->reply_topic; - const char * event_name = NULL; - const char * object_name = NULL; - int32_t is_rbus_flag = 1; - rtError err; - size_t subs_len; - size_t i; - (void)closure; - - /*Sanitize the incoming data.*/ - if(MAX_OBJECT_NAME_LENGTH <= strlen(sender)) - { - RBUSCORELOG_ERROR("Object name length exceeds limits."); - return; - } - - rbusMessage_FromBytes(&msg, data, dataLen); - - rbusMessage_BeginMetaSectionRead(msg); - err = rbusMessage_GetString(msg, &event_name); - err = rbusMessage_GetString(msg, &object_name); - err = rbusMessage_GetInt32(msg, &is_rbus_flag); - rbusMessage_EndMetaSectionRead(msg); - if(RT_OK != err) - { - RBUSCORELOG_ERROR("Event message doesn't contain an event name."); - rbusMessage_Release(msg); - return; - } - - if(is_rbus_flag) - { - if(g_master_event_callback) - { - err = g_master_event_callback(sender, event_name, msg, g_master_event_user_data); - if(err != RBUSCORE_ERROR_EVENT_NOT_HANDLED) - { - rbusMessage_Release(msg); - return; - } - } - else - { - RBUSCORELOG_ERROR("Received rbus event but no master callback registered yet."); - } - } - - lock(); - subs_len = rtVector_Size(g_event_subscriptions_for_client); - for(i = 0; i < subs_len; ++i) - { - client_subscription_t sub = rtVector_At(g_event_subscriptions_for_client, i); - - if( strncmp(sub->object, sender, MAX_OBJECT_NAME_LENGTH) == 0 || - strncmp(sub->object, event_name, MAX_OBJECT_NAME_LENGTH) == 0 ) /* support rbus events being elements : the object name will be the event name */ - { - client_event_t evt = rtVector_Find(sub->events, event_name, client_event_compare); - - if(evt) - { - unlock(); - evt->callback(sender, event_name, msg, evt->data); - rbusMessage_Release(msg); - return; - } - /* support rbus events being elements : keep searching */ - } - } - /* If no matching objects exist in records. Create a new entry.*/ - unlock(); - rbusMessage_Release(msg); - RBUSCORELOG_WARN("Received event %s::%s for which no subscription exists.", sender, event_name); - return; -} - static rbusCoreError_t remove_subscription_callback(const char * object_name, const char * event_name) { /*using namespace rbus_client;*/ @@ -1539,7 +1462,7 @@ static rbusCoreError_t remove_subscription_callback(const char * object_name, c return ret; } -static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response) +static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response, bool rawData) { /*using namespace rbus_client;*/ rbusCoreError_t ret = RBUSCORE_SUCCESS; @@ -1577,13 +1500,6 @@ static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name, if(NULL == event_name) event_name = DEFAULT_EVENT; - if(false == g_run_event_client_dispatch) - { - RBUSCORELOG_DEBUG("Starting event dispatching."); - rtConnection_AddDefaultListener(g_connection, master_event_callback, NULL); - g_run_event_client_dispatch = true; - } - if(g_master_event_callback == NULL) { sub = rtVector_Find(g_event_subscriptions_for_client, object_name, client_subscription_compare); @@ -1612,7 +1528,7 @@ static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name, unlock(); - if((ret = send_subscription_request(object_name, event_name, true, payload, providerError, timeout, publishOnSubscribe, response)) != RBUSCORE_SUCCESS) + if((ret = send_subscription_request(object_name, event_name, true, payload, providerError, timeout, publishOnSubscribe, response, rawData)) != RBUSCORE_SUCCESS) { if(g_master_event_callback == NULL) { @@ -1627,15 +1543,15 @@ static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name, rbusCoreError_t rbus_subscribeToEvent(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError) { - return rbus_subscribeToEventInternal(object_name, event_name, callback, payload, user_data, providerError, 0, false, NULL); + return rbus_subscribeToEventInternal(object_name, event_name, callback, payload, user_data, providerError, 0, false, NULL, false); } -rbusCoreError_t rbus_subscribeToEventTimeout(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response) +rbusCoreError_t rbus_subscribeToEventTimeout(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError, int timeout, bool publishOnSubscribe, rbusMessage *response, bool rawData) { - return rbus_subscribeToEventInternal(object_name, event_name, callback, payload, user_data, providerError, timeout, publishOnSubscribe, response); + return rbus_subscribeToEventInternal(object_name, event_name, callback, payload, user_data, providerError, timeout, publishOnSubscribe, response, rawData); } -rbusCoreError_t rbus_unsubscribeFromEvent(const char * object_name, const char * event_name, const rbusMessage payload) +rbusCoreError_t rbus_unsubscribeFromEvent(const char * object_name, const char * event_name, const rbusMessage payload, bool rawData) { rbusCoreError_t ret = RBUSCORE_ERROR_INVALID_PARAM; @@ -1658,7 +1574,7 @@ rbusCoreError_t rbus_unsubscribeFromEvent(const char * object_name, const char if(!g_master_event_callback) remove_subscription_callback(object_name, event_name); - ret = send_subscription_request(object_name, event_name, false, payload, NULL, 0, false, NULL); + ret = send_subscription_request(object_name, event_name, false, payload, NULL, 0, false, NULL, rawData); return ret; } @@ -1764,7 +1680,7 @@ rbusCoreError_t rbus_registerClientDisconnectHandler(rbus_client_disconnect_call lock(); if(!g_advisory_listener_installed) { - rtError err = rtConnection_AddListener(g_connection, RTMSG_ADVISORY_TOPIC, &rtrouted_advisory_callback, g_connection); + rtError err = rtConnection_AddListener(g_connection, RTMSG_ADVISORY_TOPIC, &rtrouted_advisory_callback, g_connection, RBUS_ADVISORY_SUBSCRIPTION_ID); if(err == RT_OK) { RBUSCORELOG_DEBUG("Listening for advisory messages"); @@ -1787,17 +1703,19 @@ rbusCoreError_t rbus_unregisterClientDisconnectHandler() lock(); if(g_advisory_listener_installed) { - rtConnection_RemoveListener(g_connection, RTMSG_ADVISORY_TOPIC); + rtConnection_RemoveListenerWithId(g_connection, RBUS_ADVISORY_SUBSCRIPTION_ID); g_advisory_listener_installed = false; } unlock(); return RBUSCORE_SUCCESS; } -rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out) +rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out, uint32_t subscriptionId) { /*using namespace rbus_server;*/ rbusCoreError_t ret = RBUSCORE_SUCCESS; + char topic[MAX_OBJECT_NAME_LENGTH] = {0}; + if(NULL == event_name) event_name = DEFAULT_EVENT; if(MAX_OBJECT_NAME_LENGTH <= strnlen(object_name, MAX_OBJECT_NAME_LENGTH)) @@ -1817,10 +1735,13 @@ rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char uint8_t* data; uint32_t dataLength; rbusMessage_ToBytes(out, &data, &dataLength); - rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength); + rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength, false, (char*)event_name, subscriptionId); } else { + snprintf(topic, MAX_OBJECT_NAME_LENGTH, "%d.%s", subscriptionId ,event_name); + if(topic[strlen(topic) - 1] == '.') + topic[strlen(topic) - 1] = '\0'; lock(); server_object_t obj = get_object(object_name); if(NULL == obj) @@ -1830,7 +1751,7 @@ rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char ret = RBUSCORE_ERROR_INVALID_PARAM; } - if(rbus_sendMessage(out, listener, object_name) != RBUSCORE_SUCCESS) + if(rbus_sendMessage(out, topic, object_name) != RBUSCORE_SUCCESS) { RBUSCORELOG_ERROR("Couldn't send event %s::%s to %s.", object_name, event_name, listener); } @@ -2938,7 +2859,6 @@ rbusCoreError_t rbuscore_openPrivateConnectionToProvider(rtConnection *pPrivateC } *pPrivateConn = connection; - rtConnection_AddDefaultListener(connection, master_event_callback, NULL); RBUSCORELOG_DEBUG("pPrivateConn new = %p", connection); rtMessage_Release(config); } diff --git a/src/core/rbuscore.h b/src/core/rbuscore.h index a625af1a..f22f7de3 100644 --- a/src/core/rbuscore.h +++ b/src/core/rbuscore.h @@ -24,6 +24,7 @@ #include "rbuscore_types.h" #include "rbuscore_message.h" #include +#include "rtrouteBase.h" #define MAX_OBJECT_NAME_LENGTH RTMSG_HEADER_MAX_TOPIC_LENGTH #define MAX_METHOD_NAME_LENGTH 64 @@ -31,6 +32,8 @@ #define MAX_SUPPORTED_METHODS 32 #define MAX_REGISTERED_OBJECTS 64 #define RBUS_OPEN_TELEMETRY_DATA_MAX 512 +#define RBUS_REGISTER_OBJECT_SUBSCRIPTION_ID 2 +#define RBUS_ADVISORY_SUBSCRIPTION_ID 3 void rbus_getOpenTelemetryContext(const char **s, const char **t); void rbus_setOpenTelemetryContext(const char *s, const char *t); @@ -142,10 +145,10 @@ rbusCoreError_t rbus_subscribeToEvent(const char * object_name, const char * ev /* Subscribe to 'event_name' events from 'object_name' object, with the specified timeout. If the timeout is less than or equal to zero, timeout will be set to 1000. * If the object supports only one event, event_name can be NULL. If the event_name is an alias for the object, then object_name can be NULL. The installed callback will be invoked every time * a matching event is received. */ -rbusCoreError_t rbus_subscribeToEventTimeout(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError, int timeout_ms, bool publishOnSubscribe, rbusMessage *response); +rbusCoreError_t rbus_subscribeToEventTimeout(const char * object_name, const char * event_name, rbus_event_callback_t callback, const rbusMessage payload, void * user_data, int* providerError, int timeout_ms, bool publishOnSubscribe, rbusMessage *response, bool rawData); /* Unsubscribe from receiving 'event_name' events from 'object_name' object. If the object supports only one event, event_name can be NULL. */ -rbusCoreError_t rbus_unsubscribeFromEvent(const char * object_name, const char * event_name, const rbusMessage payload); +rbusCoreError_t rbus_unsubscribeFromEvent(const char * object_name, const char * event_name, const rbusMessage payload, bool rawData); /* Register a on-subscribe callback which will be called when any subscriber subscribes to any event. This disables the rbuscore built-in server-side event subscription handling. Used by rbus 2.0*/ @@ -159,7 +162,7 @@ rbusCoreError_t rbus_registerClientDisconnectHandler(rbus_client_disconnect_call rbusCoreError_t rbus_unregisterClientDisconnectHandler(); /* Send an event message directly to a specific subscribe(e.g. listener) */ -rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out); +rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out, uint32_t subscriptionId); /*------ Convenience functions built on top of base functions above. ------*/ @@ -226,6 +229,7 @@ rbusCoreError_t rbuscore_startPrivateListener(const char* pPrivateConnAddress, c /* The Provider application to request to remove/close the instance of rtrouted in the given DML */ rbusCoreError_t rbuscore_updatePrivateListener(const char* pConsumerName, const char *pDMLName); +const rtPrivateClientInfo* _rbuscore_find_server_privateconnection(const char *pParameterName, const char *pConsumerName); #ifdef __cplusplus } diff --git a/src/rbus/rbus.c b/src/rbus/rbus.c index c4135e8b..98c25b9b 100644 --- a/src/rbus/rbus.c +++ b/src/rbus/rbus.c @@ -149,8 +149,10 @@ typedef enum _rbus_legacy_returns { typedef struct _rbusEventSubscriptionInternal { - bool dirty; - rbusEventSubscription_t* sub; + bool dirty; + bool rawData; + rbusEventSubscription_t* sub; + uint32_t subscriptionId; } rbusEventSubscriptionInternal_t; //********************************************************************************// @@ -165,6 +167,8 @@ static int _callback_handler(char const* destination, char const* method, rbusMe static rbusError_t _rbus_event_unsubscribe(rbusHandle_t handle, rbusEventSubscriptionInternal_t* subscription); static rbusError_t _rbus_AsyncSubscribe_remove_subscription(rbusHandle_t handle, rbusEventSubscription_t* subscription); +static void _consumer_event_handler(rbusHandle_t handle, rbusMessage_t* msg, void * userData); +static void _subscribe_rawdata_handler(rbusHandle_t handle, rbusMessage_t* msg, void * userData); //******************************* INTERNAL FUNCTIONS *****************************// static rbusError_t rbusCoreError_to_rbusError(rtError e) @@ -293,7 +297,7 @@ void rbusEventSubscriptionInternal_free(void* p) } static rbusEventSubscriptionInternal_t* rbusEventSubscription_find(rtVector eventSubs, char const* eventName, - rbusFilter_t filter, uint32_t interval, uint32_t duration) + rbusFilter_t filter, uint32_t interval, uint32_t duration, bool rawData) { /*FIXME - convert to map */ size_t i; @@ -302,7 +306,7 @@ static rbusEventSubscriptionInternal_t* rbusEventSubscription_find(rtVector even rbusEventSubscriptionInternal_t* subInternal = (rbusEventSubscriptionInternal_t*)rtVector_At(eventSubs, i); if(subInternal && subInternal->sub && !strcmp(subInternal->sub->eventName, eventName) && !rbusFilter_Compare(subInternal->sub->filter, filter) && (subInternal->sub->interval == interval) && - (subInternal->sub->duration == duration)) + (subInternal->sub->duration == duration) && subInternal->rawData == rawData) { return subInternal; } @@ -310,6 +314,103 @@ static rbusEventSubscriptionInternal_t* rbusEventSubscription_find(rtVector even return NULL; } +static rbusEventSubscriptionInternal_t* rbusEventSubscription_findWithId(rtVector eventSubs, uint32_t subscriptionId) +{ + /*FIXME - convert to map */ + size_t i; + for(i=0; i < rtVector_Size(eventSubs); ++i) + { + rbusEventSubscriptionInternal_t* subInternal = (rbusEventSubscriptionInternal_t*)rtVector_At(eventSubs, i); + if(subInternal && subInternal->sub && (subscriptionId == subInternal->subscriptionId)) + { + return subInternal; + } + } + return NULL; +} + +rbusError_t rbusOpenDirect_SubAdd(rbusHandle_t handle, rtVector eventSubs, char const* eventName) +{ + size_t i; + char rawDataTopic[RBUS_MAX_NAME_LENGTH] = {0}; + rbusEventSubscriptionInternal_t* subInternal = NULL; + rbusError_t errorcode = RBUS_ERROR_SUCCESS; + + for(i=0; i < rtVector_Size(eventSubs); ++i) + { + subInternal = (rbusEventSubscriptionInternal_t*)rtVector_At(eventSubs, i); + if(subInternal && !strcmp(subInternal->sub->eventName, eventName)) + { + if(subInternal->rawData) + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", subInternal->sub->eventName); + else + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "%d.%s", subInternal->subscriptionId, subInternal->sub->eventName); + + errorcode = rbusMessage_RemoveListener(handle, rawDataTopic, subInternal->subscriptionId); + if (errorcode != RBUS_ERROR_SUCCESS) + { + RBUSLOG_WARN("rtConnection_RemoveListener failed err:%d", errorcode); + } + memset(rawDataTopic, '\0', strlen(rawDataTopic)); + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "%s", subInternal->sub->eventName); + if(subInternal->rawData) + { + errorcode = rbusMessage_AddPrivateListener(handle, rawDataTopic, _subscribe_rawdata_handler, (void *)(subInternal->sub), subInternal->subscriptionId); + } + else + { + errorcode = rbusMessage_AddPrivateListener(handle, rawDataTopic, _consumer_event_handler, (void *)&subInternal->subscriptionId, subInternal->subscriptionId); + } + if(errorcode != RBUS_ERROR_SUCCESS) + { + RBUSLOG_ERROR("rbusMessage_AddPrivateListener failed err: %d", errorcode); + } + } + } + return errorcode; +} + +rbusError_t rbusCloseDirect_SubRemove(rbusHandle_t handle, rtVector eventSubs, char const* eventName) +{ + size_t i; + rbusError_t errorcode = RBUS_ERROR_SUCCESS; + rbusEventSubscriptionInternal_t* subInternal = NULL; + char rawDataTopic[RBUS_MAX_NAME_LENGTH] = {0}; + + for(i=0; i < rtVector_Size(eventSubs); ++i) + { + subInternal = (rbusEventSubscriptionInternal_t*)rtVector_At(eventSubs, i); + if(subInternal && !strcmp(subInternal->sub->eventName, eventName)) + { + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "%s", subInternal->sub->eventName); + errorcode = rbusMessage_RemovePrivateListener(handle, rawDataTopic, subInternal->subscriptionId); + if (errorcode != RBUS_ERROR_SUCCESS) + { + RBUSLOG_WARN("rtConnection_RemoveListener failed err:%d", errorcode); + } + handle->m_connection = handle->m_connectionParent; /* changed the handle m_connection of direct connection to use normal m_connection and used the same to add the rawdatatopic for normal connection*/ + memset(rawDataTopic, '\0', strlen(rawDataTopic)); + if(subInternal->rawData) + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", subInternal->sub->eventName); + else + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "%d.%s", subInternal->subscriptionId, subInternal->sub->eventName); + if(subInternal->rawData) + { + errorcode = rbusMessage_AddListener(handle, rawDataTopic, _subscribe_rawdata_handler, (void *)(subInternal->sub), subInternal->subscriptionId); + } + else + { + errorcode = rbusMessage_AddListener(handle, rawDataTopic, _consumer_event_handler, (void *)&subInternal->subscriptionId, subInternal->subscriptionId); + } + if(errorcode != RBUS_ERROR_SUCCESS) + { + RBUSLOG_ERROR("rbusMessage_AddListener failed err: %d", errorcode); + } + } + } + return errorcode; +} + static bool _parse_rbusData_to_value (char const* pBuff, rbusLegacyDataType_t legacyType, rbusValue_t value) { bool rc = false; @@ -954,7 +1055,9 @@ int subscribeHandlerImpl( int32_t componentId, int32_t interval, int32_t duration, - rbusFilter_t filter) + rbusFilter_t filter, + int rawData, + uint64_t** subscriptionId) { int error = RBUS_ERROR_SUCCESS; rbusSubscription_t* subscription = NULL; @@ -971,6 +1074,9 @@ int subscribeHandlerImpl( if(!el) return -1; + if(rawData) + autoPublish = false; + RBUSLOG_INFO("Consumer=%s %s to event=%s", listener, added ? "SUBSCRIBED" : "UNSUBSCRIBED", eventName); HANDLE_SUBS_MUTEX_LOCK(handle); @@ -988,6 +1094,13 @@ int subscribeHandlerImpl( err = el->cbTable.eventSubHandler(handle, action, eventName, filter, interval, &autoPublish); ELM_PRIVATE_UNLOCK(el); + if(rawData && autoPublish) + { + RBUSLOG_DEBUG("%s raw data subscription doesn't allow autoPublish=%d", __FUNCTION__, err); + HANDLE_SUBS_MUTEX_UNLOCK(handle); + return RBUS_ERROR_INVALID_INPUT; + } + if(err != RBUS_ERROR_SUCCESS) { RBUSLOG_DEBUG("%s provider subHandler return err=%d", __FUNCTION__, err); @@ -1010,15 +1123,17 @@ int subscribeHandlerImpl( return RBUS_ERROR_INVALID_OPERATION; } - subscription = rbusSubscriptions_getSubscription(handleInfo->subscriptions, listener, eventName, componentId, filter, interval, duration); + subscription = rbusSubscriptions_getSubscription(handleInfo->subscriptions, listener, eventName, componentId, filter, interval, duration, rawData); if(!subscription) { - subscription = rbusSubscriptions_addSubscription(handleInfo->subscriptions, listener, eventName, componentId, filter, interval, duration, autoPublish, el); + subscription = rbusSubscriptions_addSubscription(handleInfo->subscriptions, listener, eventName, componentId, filter, interval, duration, autoPublish, el, rawData); if(!subscription) { HANDLE_SUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_INVALID_INPUT; // Adding fails because of invalid input } + else + **subscriptionId = subscription->subscriptionId; } else { @@ -1028,7 +1143,7 @@ int subscribeHandlerImpl( } else { - subscription = rbusSubscriptions_getSubscription(handleInfo->subscriptions, listener, eventName, componentId, filter, interval, duration); + subscription = rbusSubscriptions_getSubscription(handleInfo->subscriptions, listener, eventName, componentId, filter, interval, duration, rawData); if(!subscription) { @@ -1040,48 +1155,57 @@ int subscribeHandlerImpl( /* if autoPublish and its a property being subscribed to then update rbusValueChange to handle the property */ - if(el->type == RBUS_ELEMENT_TYPE_PROPERTY && subscription->autoPublish) + if(rawData && el->type != RBUS_ELEMENT_TYPE_EVENT) + { + RBUSLOG_INFO("rawDataSubscription is only allowed for events"); + HANDLE_SUBS_MUTEX_UNLOCK(handle); + return RBUS_ERROR_INVALID_INPUT; + } + else { - rtListItem item; - rtList_GetFront(subscription->instances, &item); - while(item) + if(el->type == RBUS_ELEMENT_TYPE_PROPERTY && subscription->autoPublish) { - elementNode* node; + rtListItem item; + rtList_GetFront(subscription->instances, &item); + while(item) + { + elementNode* node; - rtListItem_GetData(item, (void**)&node); + rtListItem_GetData(item, (void**)&node); - if (subscription->interval) - { - RBUSLOG_INFO("%s: subscription with interval %s event=%s prop=%s", __FUNCTION__, - added ? "Add" : "Remove", subscription->eventName, node->fullName); - if(added) { - if((error = rbusInterval_AddSubscriptionRecord(handle, node, subscription)) != RBUS_ERROR_SUCCESS) - RBUSLOG_ERROR("rbusInterval_AddSubscriptionRecord failed with error : %d\n", error); - break; - } - else + if (subscription->interval) { - rbusInterval_RemoveSubscriptionRecord(handle, node, subscription); - break; - } - } - else if(!elementHasAutoPubSubscriptions(node, subscription)) - { - /* Check if the node has other subscribers or not. If it has other - subs then we don't need to either add or remove it from ValueChange */ - RBUSLOG_INFO("%s: ValueChange %s event=%s prop=%s", __FUNCTION__, - added ? "Add" : "Remove", subscription->eventName, node->fullName); - if(added) - { - rbusValueChange_AddPropertyNode(handle, node); + RBUSLOG_INFO("%s: subscription with interval %s event=%s prop=%s", __FUNCTION__, + added ? "Add" : "Remove", subscription->eventName, node->fullName); + if(added) { + if((error = rbusInterval_AddSubscriptionRecord(handle, node, subscription)) != RBUS_ERROR_SUCCESS) + RBUSLOG_ERROR("rbusInterval_AddSubscriptionRecord failed with error : %d\n", error); + break; + } + else + { + rbusInterval_RemoveSubscriptionRecord(handle, node, subscription); + break; + } } - else + else if(!elementHasAutoPubSubscriptions(node, subscription)) { - rbusValueChange_RemovePropertyNode(handle, node); + /* Check if the node has other subscribers or not. If it has other + subs then we don't need to either add or remove it from ValueChange */ + RBUSLOG_INFO("%s: ValueChange %s event=%s prop=%s", __FUNCTION__, + added ? "Add" : "Remove", subscription->eventName, node->fullName); + if(added) + { + rbusValueChange_AddPropertyNode(handle, node); + } + else + { + rbusValueChange_RemovePropertyNode(handle, node); + } } - } - rtListItem_GetNext(item, &item); + rtListItem_GetNext(item, &item); + } } } @@ -1209,14 +1333,14 @@ static void unregisterTableRow (rbusHandle_t handle, elementNode* rowInstElem) } } //******************************* CALLBACKS *************************************// -static int _event_subscribe_callback_handler(elementNode* el, char const* eventName, char const* listener, int added, int componentId, int interval, int duration, rbusFilter_t filter, void* userData) +static int _event_subscribe_callback_handler(elementNode* el, char const* eventName, char const* listener, int added, int componentId, int interval, int duration, rbusFilter_t filter, void* userData, int rawData, uint64_t* subscriptionId) { rbusHandle_t handle = (rbusHandle_t)userData; rbusCoreError_t err = RBUSCORE_SUCCESS; RBUSLOG_DEBUG("%s: event subscribe callback for [%s] event! and element of type %d", __FUNCTION__, eventName, el->type); - err = subscribeHandlerImpl(handle, added, el, eventName, listener, componentId, interval, duration, filter); + err = subscribeHandlerImpl(handle, added, el, eventName, listener, componentId, interval, duration, filter, rawData, &subscriptionId); return err; } @@ -1227,9 +1351,11 @@ static void _client_disconnect_callback_handler(const char * listener) UnlockMutex(); } -void _subscribe_async_callback_handler(rbusHandle_t handle, rbusEventSubscription_t* subscription, rbusError_t error) +void _subscribe_async_callback_handler(rbusHandle_t handle, rbusEventSubscription_t* subscription, rbusError_t error, uint32_t subscriptionId) { struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle; + char topic[RBUS_MAX_NAME_LENGTH] = {0}; + rbusError_t errorcode = RBUS_ERROR_SUCCESS; subscription->asyncHandler(subscription->handle, subscription, error); if(subscription) @@ -1239,15 +1365,26 @@ void _subscribe_async_callback_handler(rbusHandle_t handle, rbusEventSubscriptio HANDLE_EVENTSUBS_MUTEX_LOCK(handle); rbusEventSubscriptionInternal_t* subInternal = NULL; if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription->eventName, - subscription->filter, subscription->interval, subscription->duration)) != NULL) + subscription->filter, subscription->interval, subscription->duration, false)) != NULL) { rtVector_RemoveItem(handleInfo->eventSubs, subInternal, rbusEventSubscriptionInternal_free); } subInternal = rt_malloc(sizeof(rbusEventSubscriptionInternal_t)); subInternal->sub = subscription; subInternal->dirty = false; + subInternal->subscriptionId = subscriptionId; + subInternal->rawData = false; rtVector_PushBack(handleInfo->eventSubs, subInternal); HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); + snprintf(topic, RBUS_MAX_NAME_LENGTH, "%d.%s", subscriptionId, subscription->eventName); + + errorcode = rbusMessage_AddListener(handle, topic, _consumer_event_handler, (void *)&subInternal->subscriptionId, subInternal->subscriptionId); + if(errorcode != RBUS_ERROR_SUCCESS) + { + RBUSLOG_ERROR("%s: Listener failed err: %d", __FUNCTION__, errorcode); + return; + } + } else { @@ -1316,7 +1453,7 @@ static int _master_event_callback_handler(char const* sender, char const* eventN RBUSLOG_DEBUG("Received master event callback: sender=%s eventName=%s componentId=%d", sender, eventName, componentId); HANDLE_EVENTSUBS_MUTEX_LOCK(handleInfo); - subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration); + subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration, false); if(subInternal) { @@ -2341,10 +2478,12 @@ static void _subscribe_callback_handler (rbusHandle_t handle, rbusMessage reques int has_payload = 0; rbusMessage payload = NULL; int publishOnSubscribe = 0; + int rawData = 0; struct _rbusHandle* handleInfo = handle; int32_t componentId = 0; int32_t interval = 0; int32_t duration = 0; + uint64_t subscriptionId = 0; rbusFilter_t filter = NULL; elementNode* el = NULL; rbusError_t ret = RBUS_ERROR_SUCCESS; @@ -2396,10 +2535,11 @@ static void _subscribe_callback_handler (rbusHandle_t handle, rbusMessage reques } int added = strncmp(method, METHOD_SUBSCRIBE, MAX_METHOD_NAME_LENGTH) == 0 ? 1 : 0; - if(added) - rbusMessage_GetInt32(request, &publishOnSubscribe); + + rbusMessage_GetInt32(request, &publishOnSubscribe); + rbusMessage_GetInt32(request, &rawData); if(ret == RBUS_ERROR_SUCCESS) - ret = _event_subscribe_callback_handler(el, event_name, sender, added, componentId, interval, duration, filter, handle); + ret = _event_subscribe_callback_handler(el, event_name, sender, added, componentId, interval, duration, filter, handle, rawData, &subscriptionId); rbusMessage_SetInt32(*response, ret); if(publishOnSubscribe && ret == RBUS_ERROR_SUCCESS) @@ -2499,6 +2639,7 @@ static void _subscribe_callback_handler (rbusHandle_t handle, rbusMessage reques } rbusMessage_Release(payload); } + rbusMessage_SetInt32(*response, subscriptionId); } } } @@ -2664,6 +2805,30 @@ static int _callback_handler(char const* destination, char const* method, rbusMe return 0; } +static void _consumer_event_handler(rbusHandle_t handle, rbusMessage_t* msg, void * userData) +{ + rbusMessage message = NULL; + rbusEvent_t event = {0}; + rbusEventSubscriptionInternal_t* subInternal = NULL; + rbusEventHandler_t eventHandlerFuncPtr = NULL; + struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle; + rbusFilter_t filter = NULL; + int32_t componentId = 0; + uint32_t interval = 0; + uint32_t duration = 0; + int32_t *subscriptionId = (int32_t *)userData; + + rbusMessage_FromBytes(&message, msg->data, msg->length); + rbusEventData_updateFromMessage(&event, &filter, &interval, &duration, &componentId, message); + subInternal = rbusEventSubscription_findWithId(handleInfo->eventSubs, *subscriptionId); + + if(subInternal) + { + eventHandlerFuncPtr = subInternal->sub->handler; + (eventHandlerFuncPtr)(handle, &event, subInternal->sub); + } +} + /* Handle once per process initialization or deinitialization needed by rbus_open */ @@ -2853,8 +3018,12 @@ rbusError_t rbus_openDirect(rbusHandle_t handle, rbusHandle_t* myDirectHandle, c tmpHandle = rt_calloc(1, sizeof(struct _rbusHandle)); tmpHandle->componentName = strdup(pParameterName); tmpHandle->m_connection = myDirectCon; + tmpHandle->eventSubs = handleInfo->eventSubs; + tmpHandle->m_connectionParent = handleInfo->m_connection; + tmpHandle->messageCallbacks = handleInfo->messageCallbacks; tmpHandle->m_handleType = RBUS_HWDL_TYPE_DIRECT; *myDirectHandle = tmpHandle; + rbusOpenDirect_SubAdd(handle, handle->eventSubs, pParameterName); if (!sDisConnHandler) { rbus_registerClientDisconnectHandler(_client_disconnect_callback_handler); @@ -2873,6 +3042,7 @@ rbusError_t rbus_openDirect(rbusHandle_t handle, rbusHandle_t* myDirectHandle, c ret = RBUS_ERROR_INVALID_INPUT; } + return ret; } @@ -2894,6 +3064,7 @@ rbusError_t rbus_closeDirect(rbusHandle_t handle) } } --sDisConnHandler; + ret = rbusCloseDirect_SubRemove(handle, handleInfo->eventSubs, handleInfo->componentName); rbuscore_closePrivateConnection(handleInfo->componentName); free(handleInfo->componentName); handleInfo->componentName = NULL; @@ -4508,6 +4679,7 @@ static rbusError_t _rbus_event_unsubscribe( { rbusError_t errorcode = RBUS_ERROR_SUCCESS; struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle; + char rawDataTopic[RBUS_MAX_NAME_LENGTH] = {0}; rbusEventSubscription_t* subscription = (rbusEventSubscription_t*)subInternal->sub; RBUSLOG_INFO("%s: %s", __FUNCTION__, subscription->eventName); @@ -4517,7 +4689,28 @@ static rbusError_t _rbus_event_unsubscribe( payload = rbusEvent_CreateSubscribePayload(subscription, handleInfo->componentId); - coreerr = rbus_unsubscribeFromEvent(NULL, subscription->eventName, payload); + if(subInternal->rawData) + { + rtConnection myConn = rbuscore_FindClientPrivateConnection(subInternal->sub->eventName); + if(myConn) + { + errorcode = rbusMessage_RemovePrivateListener(handle, subInternal->sub->eventName, subInternal->subscriptionId); + if (errorcode != RBUS_ERROR_SUCCESS) + { + RBUSLOG_WARN("rtConnection_RemovePrivateListener failed err:%d", errorcode); + } + } + else + { + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", subInternal->sub->eventName); + if(RBUS_ERROR_SUCCESS != rbusMessage_RemoveListener(handle, rawDataTopic, subInternal->subscriptionId)) + { + RBUSLOG_WARN("%s: Remove listener failed err: %d", __FUNCTION__, errorcode); + } + } + } + + coreerr = rbus_unsubscribeFromEvent(NULL, subscription->eventName, payload, subInternal->rawData); if(payload) { @@ -4558,7 +4751,8 @@ static rbusError_t rbusEvent_SubscribeWithRetries( uint32_t duration, int timeout, rbusSubscribeAsyncRespHandler_t async, - bool publishOnSubscribe) + bool publishOnSubscribe, + bool rawData) { rbusCoreError_t coreerr; int providerError = RBUS_ERROR_SUCCESS; @@ -4570,7 +4764,7 @@ static rbusError_t rbusEvent_SubscribeWithRetries( int destNotFoundTimeout; struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle; HANDLE_EVENTSUBS_MUTEX_LOCK(handle); - if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration)) != NULL) + if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration, rawData)) != NULL) { /*Allow only for dirty subscription*/ if (subInternal->dirty) @@ -4629,7 +4823,7 @@ static rbusError_t rbusEvent_SubscribeWithRetries( { RBUSLOG_DEBUG("%s: %s subscribing", __FUNCTION__, eventName); - coreerr = rbus_subscribeToEventTimeout(NULL, sub->eventName, _event_callback_handler, payload, sub, &providerError, destNotFoundTimeout, publishOnSubscribe, &response); + coreerr = rbus_subscribeToEventTimeout(NULL, sub->eventName, _event_callback_handler, payload, sub, &providerError, destNotFoundTimeout, publishOnSubscribe, &response, rawData); if(coreerr == RBUSCORE_ERROR_DESTINATION_UNREACHABLE && destNotFoundTimeout > 0) { @@ -4666,22 +4860,65 @@ static rbusError_t rbusEvent_SubscribeWithRetries( if(coreerr == RBUSCORE_SUCCESS) { int initial_value = 0; + int32_t subscriptionId = 0; + char topic[RBUS_MAX_NAME_LENGTH] = {0}; + rbusError_t errorcode = RBUS_ERROR_SUCCESS; HANDLE_EVENTSUBS_MUTEX_LOCK(handle); - if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration)) != NULL) + if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration, rawData)) != NULL) { rtVector_RemoveItem(handleInfo->eventSubs, subInternal, rbusEventSubscriptionInternal_free); } subInternal = rt_malloc(sizeof(rbusEventSubscriptionInternal_t)); subInternal->sub = sub; subInternal->dirty = false; + subInternal->rawData = rawData; rtVector_PushBack(handleInfo->eventSubs, subInternal); HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); if(publishOnSubscribe) { rbusMessage_GetInt32(response, &initial_value); if(initial_value) + { _master_event_callback_handler(NULL, eventName, response, userData); + } } + rbusMessage_GetInt32(response, &subscriptionId); + subInternal->subscriptionId = subscriptionId; + if(!rawData) + { + rtConnection myConn = rbuscore_FindClientPrivateConnection(eventName); + if (myConn) + { + snprintf(topic, RBUS_MAX_NAME_LENGTH, "%d.%s", subscriptionId, eventName); + if(topic[strlen(topic) - 1] == '.') + topic[strlen(topic) - 1] = '\0'; + errorcode = rbusMessage_RemoveListener(handle, topic, subInternal->subscriptionId); + if (errorcode != RT_OK) + { + RBUSLOG_WARN("rbusMessage_RemoveListener:%d", errorcode); + } + memset(topic, '\0', strlen(topic)); + snprintf(topic, RBUS_MAX_NAME_LENGTH, "%s", eventName); + errorcode = rbusMessage_AddPrivateListener(handle, topic, _consumer_event_handler, (void *)&subInternal->subscriptionId, subInternal->subscriptionId); + if(errorcode != RBUS_ERROR_SUCCESS) + { + RBUSLOG_ERROR("rbusMessage_AddPrivateListener failed err: %d", errorcode); + } + } + else + { + snprintf(topic, RBUS_MAX_NAME_LENGTH, "%d.%s", subscriptionId, eventName); + if(topic[strlen(topic) - 1] == '.') + topic[strlen(topic) - 1] = '\0'; + errorcode = rbusMessage_AddListener(handle, topic, _consumer_event_handler, (void *)&subInternal->subscriptionId, subInternal->subscriptionId); + if(errorcode != RBUS_ERROR_SUCCESS) + { + RBUSLOG_ERROR("%s: Listener failed err: %d", __FUNCTION__, errorcode); + return errorcode; + } + } + } + if(response) rbusMessage_Release(response); RBUSLOG_INFO("%s: %s subscribe retries succeeded", __FUNCTION__, eventName); @@ -4694,7 +4931,7 @@ static rbusError_t rbusEvent_SubscribeWithRetries( RBUSLOG_DEBUG("%s: %s all subscribe retries failed because no provider could be found", __FUNCTION__, eventName); RBUSLOG_WARN("EVENT_SUBSCRIPTION_FAIL_NO_PROVIDER_COMPONENT %s", eventName);/*RDKB-33658-AC7*/ HANDLE_EVENTSUBS_MUTEX_LOCK(handle); - if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration)) != NULL) + if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration, rawData)) != NULL) { subInternal->dirty = true; } @@ -4711,7 +4948,7 @@ static rbusError_t rbusEvent_SubscribeWithRetries( if (providerError == RBUS_ERROR_SUBSCRIPTION_ALREADY_EXIST) { HANDLE_EVENTSUBS_MUTEX_LOCK(handle); - if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration)) != NULL) + if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration, rawData)) != NULL) { rtVector_RemoveItem(handleInfo->eventSubs, subInternal, rbusEventSubscriptionInternal_free); } @@ -4755,7 +4992,7 @@ static void _subscribe_rawdata_handler(rbusHandle_t handle, rbusMessage_t* msg, { HANDLE_EVENTSUBS_MUTEX_LOCK(handle); rbusEventSubscriptionInternal_t *subInternal = rbusEventSubscription_find(handleInfo->eventSubs, - ptmp->eventName, ptmp->filter, ptmp->interval, ptmp->duration); + ptmp->eventName, ptmp->filter, ptmp->interval, ptmp->duration, true); if (subInternal && subInternal->dirty) { errorcode = _rbus_event_unsubscribe(handle, subInternal); @@ -4767,7 +5004,7 @@ static void _subscribe_rawdata_handler(rbusHandle_t handle, rbusMessage_t* msg, else { snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", subInternal->sub->eventName); - if(RBUS_ERROR_SUCCESS != rbusMessage_RemoveListener(handle, rawDataTopic)) + if(RBUS_ERROR_SUCCESS != rbusMessage_RemoveListener(handle, rawDataTopic, subInternal->subscriptionId)) { RBUSLOG_WARN("%s: Remove listener failed err: %d", __FUNCTION__, errorcode); } @@ -4807,17 +5044,17 @@ rbusError_t rbusEvent_SubscribeRawData( RBUSLOG_DEBUG("%s: %s", __FUNCTION__, eventName); - errorcode = rbusEvent_SubscribeWithRetries(handle, eventName, handler, userData, NULL, 0, 0 , timeout, NULL, false); + errorcode = rbusEvent_SubscribeWithRetries(handle, eventName, handler, userData, NULL, 0, 0 , timeout, NULL, false, true); if(errorcode != RBUS_ERROR_SUCCESS) { RBUSLOG_ERROR("%s:Subscribe failed err: %d", __FUNCTION__, errorcode); return errorcode; } - subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, NULL, 0, 0); + subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, NULL, 0, 0, true); snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", subInternal->sub->eventName); errorcode = rbusMessage_AddListener(handle, rawDataTopic, - _subscribe_rawdata_handler, (void *)(subInternal->sub)); + _subscribe_rawdata_handler, (void *)(subInternal->sub), subInternal->subscriptionId); if(errorcode != RBUS_ERROR_SUCCESS) { RBUSLOG_ERROR("%s: Listener failed err: %d", __FUNCTION__, errorcode); @@ -4844,7 +5081,7 @@ rbusError_t rbusEvent_Subscribe( RBUSLOG_DEBUG("%s: %s", __FUNCTION__, eventName); - errorcode = rbusEvent_SubscribeWithRetries(handle, eventName, handler, userData, NULL, 0, 0 , timeout, NULL, false); + errorcode = rbusEvent_SubscribeWithRetries(handle, eventName, handler, userData, NULL, 0, 0 , timeout, NULL, false, false); return errorcode; } @@ -4870,7 +5107,7 @@ rbusError_t rbusEvent_SubscribeAsync( RBUSLOG_DEBUG("%s: %s", __FUNCTION__, eventName); - errorcode = rbusEvent_SubscribeWithRetries(handle, eventName, handler, userData, NULL, 0, 0, timeout, subscribeHandler, false); + errorcode = rbusEvent_SubscribeWithRetries(handle, eventName, handler, userData, NULL, 0, 0, timeout, subscribeHandler, false, false); return errorcode; } @@ -4893,13 +5130,13 @@ rbusError_t rbusEvent_Unsubscribe( /*the use of rtVector is inefficient here. I have to loop through the vector to find the sub by name, then call RemoveItem, which loops through again to find the item by address to destroy */ HANDLE_EVENTSUBS_MUTEX_LOCK(handle); - subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, NULL, 0, 0); + subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, NULL, 0, 0, false); if(subInternal) { rbusMessage payload = rbusEvent_CreateSubscribePayload(subInternal->sub, handleInfo->componentId); - rbusCoreError_t coreerr = rbus_unsubscribeFromEvent(NULL, eventName, payload); + rbusCoreError_t coreerr = rbus_unsubscribeFromEvent(NULL, eventName, payload, subInternal->rawData); if(payload) { @@ -4946,7 +5183,6 @@ rbusError_t rbusEvent_UnsubscribeRawData( char const* eventName) { struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle; - char rawDataTopic[RBUS_MAX_NAME_LENGTH] = {0}; rbusEventSubscriptionInternal_t* subInternal; rbusError_t errorcode = RBUS_ERROR_SUCCESS; @@ -4961,17 +5197,12 @@ rbusError_t rbusEvent_UnsubscribeRawData( /*the use of rtVector is inefficient here. I have to loop through the vector to find the sub by name, then call RemoveItem, which loops through again to find the item by address to destroy */ HANDLE_EVENTSUBS_MUTEX_LOCK(handle); - subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, NULL, 0, 0); + subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, NULL, 0, 0, true); if (subInternal) { errorcode = _rbus_event_unsubscribe(handle, subInternal); if(errorcode != RBUS_ERROR_DESTINATION_NOT_REACHABLE) { - snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", subInternal->sub->eventName); - if(RBUS_ERROR_SUCCESS != rbusMessage_RemoveListener(handle, rawDataTopic)) - { - RBUSLOG_WARN("%s: Remove listener failed err: %d", __FUNCTION__, errorcode); - } rbusEventSubscriptionInternal_free(subInternal); } else @@ -5015,7 +5246,7 @@ rbusError_t rbusEvent_SubscribeEx( //the asyncsubscribe api to handle this. errorcode = rbusEvent_SubscribeWithRetries( handle, subscription[i].eventName, subscription[i].handler, subscription[i].userData, - subscription[i].filter, subscription[i].interval, subscription[i].duration, timeout, NULL, subscription[i].publishOnSubscribe); + subscription[i].filter, subscription[i].interval, subscription[i].duration, timeout, NULL, subscription[i].publishOnSubscribe, false); if(errorcode != RBUS_ERROR_SUCCESS) { /* Treat SubscribeEx like a transaction because @@ -5058,9 +5289,14 @@ rbusError_t rbusEvent_SubscribeExRawData( //For rbusEvent_Subscribe, since it a single subscribe, blocking is fine but for rbusEvent_SubscribeEx, //where we can have multiple, we need to actually run all these in parallel. So we might need to leverage //the asyncsubscribe api to handle this. + if(!subscription[i].handler) + { + errorcode = RBUS_ERROR_INVALID_INPUT; + break; + } errorcode = rbusEvent_SubscribeWithRetries( handle, subscription[i].eventName, subscription[i].handler, subscription[i].userData, - subscription[i].filter, subscription[i].interval, subscription[i].duration, timeout, NULL, subscription[i].publishOnSubscribe); + subscription[i].filter, subscription[i].interval, subscription[i].duration, timeout, NULL, subscription[i].publishOnSubscribe, true); if(errorcode != RBUS_ERROR_SUCCESS) { /* Treat SubscribeEx like a transaction because @@ -5074,13 +5310,37 @@ rbusError_t rbusEvent_SubscribeExRawData( else { HANDLE_EVENTSUBS_MUTEX_LOCK(handle); - subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription[i].eventName, subscription[i].filter, subscription[i].interval, subscription[i].duration); - snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", subscription[i].eventName); - errorcode = rbusMessage_AddListener(handle, rawDataTopic, - _subscribe_rawdata_handler, (void *)(subInternal->sub)); - if(errorcode != RBUS_ERROR_SUCCESS) + rtConnection myConn = rbuscore_FindClientPrivateConnection(subscription[i].eventName); + if(myConn) { - RBUSLOG_ERROR("%s: Listener failed err: %d", __FUNCTION__, errorcode); + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", subscription[i].eventName); + subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription[i].eventName, subscription[i].filter, subscription[i].interval, subscription[i].duration, true); + if(subInternal) + { + errorcode = rbusMessage_RemoveListener(handle, rawDataTopic, subInternal->subscriptionId); + if (errorcode != RT_OK) + { + RBUSLOG_WARN("rtConnection_RemoveListener:%d", errorcode); + } + } + memset(rawDataTopic, '\0', strlen(rawDataTopic)); + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "%s", subscription[i].eventName); + errorcode = rbusMessage_AddPrivateListener(handle, rawDataTopic, _subscribe_rawdata_handler, (void *)(subInternal->sub), subInternal->subscriptionId); + if(errorcode != RBUS_ERROR_SUCCESS) + { + RBUSLOG_ERROR("%s: Listener failed err: %d", __FUNCTION__, errorcode); + } + } + else + { + subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription[i].eventName, subscription[i].filter, subscription[i].interval, subscription[i].duration, true); + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", subscription[i].eventName); + errorcode = rbusMessage_AddListener(handle, rawDataTopic, + _subscribe_rawdata_handler, (void *)(subInternal->sub), subInternal->subscriptionId); + if(errorcode != RBUS_ERROR_SUCCESS) + { + RBUSLOG_ERROR("%s: Listener failed err: %d", __FUNCTION__, errorcode); + } } HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); } @@ -5114,7 +5374,7 @@ rbusError_t rbusEvent_SubscribeExAsync( errorcode = rbusEvent_SubscribeWithRetries( handle, subscription[i].eventName, subscription[i].handler, subscription[i].userData, - subscription[i].filter, subscription[i].interval, subscription[i].duration, timeout, subscribeHandler, false); + subscription[i].filter, subscription[i].interval, subscription[i].duration, timeout, subscribeHandler, false, false); if(errorcode != RBUS_ERROR_SUCCESS) { @@ -5139,7 +5399,6 @@ rbusError_t rbusEvent_UnsubscribeExRawData( int numSubscriptions) { rbusError_t errorcode = RBUS_ERROR_SUCCESS; - char rawDataTopic[RBUS_MAX_NAME_LENGTH] = {0}; struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle; VERIFY_NULL(handle); @@ -5167,17 +5426,12 @@ rbusError_t rbusEvent_UnsubscribeExRawData( /*the use of rtVector is inefficient here. I have to loop through the vector to find the sub by name, then call RemoveItem, which loops through again to find the item by address to destroy */ HANDLE_EVENTSUBS_MUTEX_LOCK(handle); - subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription[i].eventName, subscription[i].filter, subscription[i].interval, subscription[i].duration); + subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription[i].eventName, subscription[i].filter, subscription[i].interval, subscription[i].duration, true); if(subInternal) { errorcode = _rbus_event_unsubscribe(handle, subInternal); if(errorcode != RBUS_ERROR_DESTINATION_NOT_REACHABLE) { - snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", subInternal->sub->eventName); - if(RBUS_ERROR_SUCCESS != rbusMessage_RemoveListener(handle, rawDataTopic)) - { - RBUSLOG_WARN("%s: Remove listener failed err: %d", __FUNCTION__, errorcode); - } rbusEventSubscriptionInternal_free(subInternal); } else @@ -5230,6 +5484,7 @@ rbusError_t rbusEvent_UnsubscribeEx( VERIFY_NULL(handle); VERIFY_NULL(subscription); VERIFY_ZERO(numSubscriptions); + char topic[RBUS_MAX_NAME_LENGTH] = {0}; if (handleInfo->m_handleType != RBUS_HWDL_TYPE_REGULAR) return RBUS_ERROR_INVALID_HANDLE; @@ -5248,9 +5503,15 @@ rbusError_t rbusEvent_UnsubscribeEx( HANDLE_EVENTSUBS_MUTEX_LOCK(handleInfo); rbusEventSubscriptionInternal_t* subInternal = NULL; subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription[i].eventName, - subscription[i].filter, subscription[i].interval, subscription[i].duration); + subscription[i].filter, subscription[i].interval, subscription[i].duration, false); if(subInternal) { + snprintf(topic, RBUS_MAX_NAME_LENGTH, "%d.%s", subInternal->subscriptionId, subInternal->sub->eventName); + errorcode = rbusMessage_RemoveListener(handle, topic, subInternal->subscriptionId); + if (errorcode != RBUS_ERROR_SUCCESS) + { + RBUSLOG_WARN("rtConnection_RemoveListener failed err:%d", errorcode); + } errorcode = _rbus_event_unsubscribe(handle, subInternal); if(errorcode != RBUS_ERROR_DESTINATION_NOT_REACHABLE) { @@ -5294,12 +5555,12 @@ bool rbusEvent_IsSubscriptionExist( { RBUSLOG_INFO("%s: %s", __FUNCTION__, subscription->eventName); subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription[0].eventName, - subscription[0].filter, subscription[0].interval, subscription[0].duration); + subscription[0].filter, subscription[0].interval, subscription[0].duration, false); } else { RBUSLOG_INFO("%s: %s", __FUNCTION__, eventName); - subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, NULL, 0, 0); + subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, NULL, 0, 0, false); } ret = (subInternal ? true : false); @@ -5314,7 +5575,9 @@ rbusError_t rbusEvent_PublishRawData( { struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle; rbusError_t rc = RBUS_ERROR_SUCCESS; - rbusMessage_t msg; + rtListItem listItem; + rbusSubscription_t* subscription; + rtError ret = RT_OK; char rawDataTopic[RBUS_MAX_NAME_LENGTH] = {0}; VERIFY_NULL(handle); @@ -5339,13 +5602,32 @@ rbusError_t rbusEvent_PublishRawData( { return RBUS_ERROR_NOSUBSCRIBERS; } + rtList_GetFront(el->subscriptions, &listItem); + while(listItem) + { + rtListItem_GetData(listItem, (void**)&subscription); + if(!subscription || !subscription->eventName || !subscription->listener) + { + RBUSLOG_INFO("rbusEvent_Publish failed: null subscriber data"); + if(rc == RBUS_ERROR_SUCCESS) + rc = RBUS_ERROR_BUS_ERROR; + rtListItem_GetNext(listItem, &listItem); + } + const rtPrivateClientInfo *pPrivCliInfo = _rbuscore_find_server_privateconnection (subscription->eventName, subscription->listener); + if(pPrivCliInfo && subscription->rawData) + { + ret = rtRouteDirect_SendMessage (pPrivCliInfo, eventData->rawData, eventData->rawDataLen, true, subscription->eventName, subscription->subscriptionId); + if(RT_OK != ret) + { + rc = RBUS_ERROR_BUS_ERROR; + } + } + rtListItem_GetNext(listItem, &listItem); + } snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", eventData->name); - msg.topic = rawDataTopic; - msg.data = (uint8_t const*)eventData->rawData; - msg.length = eventData->rawDataLen; - rc = rbusMessage_Send(handle, &msg, RBUS_MESSAGE_CONFIRM_RECEIPT); - if (rc != RBUS_ERROR_SUCCESS) - RBUSLOG_ERROR("%s: rbusMessage_Send failed with return %d", __FUNCTION__, rc); + ret = rtConnection_SendBinary(handleInfo->m_connection, eventData->rawData, eventData->rawDataLen, rawDataTopic); + if(RT_OK != ret) + rc = RBUS_ERROR_BUS_ERROR; return rc; } @@ -5446,7 +5728,7 @@ rbusError_t rbusEvent_Publish( } } - if(publish) + if(publish && !subscription->rawData) { rbusMessage msg; rbusMessage_Init(&msg); @@ -5456,11 +5738,11 @@ rbusError_t rbusEvent_Publish( RBUSLOG_DEBUG("rbusEvent_Publish: publishing event %s to listener %s", subscription->eventName, subscription->listener); err = rbus_publishSubscriberEvent( - handleInfo->componentName, - subscription->eventName/*use the same eventName the consumer subscribed with; not event instance name eventData->name*/, - subscription->listener, - msg); - + handleInfo->componentName, + subscription->eventName/*use the same eventName the consumer subscribed with; not event instance name eventData->name*/, + subscription->listener, + msg, + subscription->subscriptionId); rbusMessage_Release(msg); if(err != RBUSCORE_SUCCESS) @@ -5469,8 +5751,7 @@ rbusError_t rbusEvent_Publish( errOut = err; RBUSLOG_INFO("rbusEvent_Publish failed: rbus_publishSubscriberEvent return error %d", err); } - } - + } rtListItem_GetNext(listItem, &listItem); } HANDLE_SUBS_MUTEX_UNLOCK(handle); diff --git a/src/rbus/rbus_asyncsubscribe.c b/src/rbus/rbus_asyncsubscribe.c index 6023370d..4eb592a6 100644 --- a/src/rbus/rbus_asyncsubscribe.c +++ b/src/rbus/rbus_asyncsubscribe.c @@ -41,7 +41,7 @@ #define UNLOCK() ERROR_CHECK(pthread_mutex_unlock(&gRetrier->mutexQueue)) /*defined in rbus.c*/ -void _subscribe_async_callback_handler(rbusHandle_t handle, rbusEventSubscription_t* subscription, rbusError_t error); +void _subscribe_async_callback_handler(rbusHandle_t handle, rbusEventSubscription_t* subscription, rbusError_t error, uint32_t subscriptionId); int _event_callback_handler(char const* objectName, char const* eventName, rbusMessage message, void* userData); void rbusEventSubscription_free(void* p); @@ -177,11 +177,15 @@ static void rbusAsyncSubscribeRetrier_SendSubscriptionRequests() rbusCoreError_t coreerr; int elapsed; int providerError; + rbusMessage response; + uint32_t subscriptionId = 0; RBUSLOG_INFO("%s: %s subscribing", __FUNCTION__, item->subscription->eventName); - coreerr = rbus_subscribeToEvent(NULL, item->subscription->eventName, - _event_callback_handler, item->payload, item->subscription, &providerError); + coreerr = rbus_subscribeToEventTimeout(NULL, item->subscription->eventName, + _event_callback_handler, item->payload, item->subscription, &providerError, 0, false, &response, false); + if(response) + rbusMessage_GetInt32(response, (int32_t*)&subscriptionId); rtTime_Now(&now); @@ -248,7 +252,7 @@ static void rbusAsyncSubscribeRetrier_SendSubscriptionRequests() } } - _subscribe_async_callback_handler(item->subscription->handle, item->subscription, responseErr); + _subscribe_async_callback_handler(item->subscription->handle, item->subscription, responseErr, subscriptionId); //store the next item, because we are removing this li item from list LOCK(); item->subscription = NULL; diff --git a/src/rbus/rbus_handle.h b/src/rbus/rbus_handle.h index 69972375..f77245e9 100644 --- a/src/rbus/rbus_handle.h +++ b/src/rbus/rbus_handle.h @@ -65,6 +65,7 @@ struct _rbusHandle rbusHandleType_t m_handleType; pthread_mutex_t handle_eventSubsMutex; pthread_mutex_t handle_subsMutex; + rtConnection m_connectionParent; }; void rbusHandleList_Add(struct _rbusHandle* handle); diff --git a/src/rbus/rbus_intervalsubscription.c b/src/rbus/rbus_intervalsubscription.c index 1ebc1769..2843c865 100644 --- a/src/rbus/rbus_intervalsubscription.c +++ b/src/rbus/rbus_intervalsubscription.c @@ -181,7 +181,8 @@ static void* PublishingThreadFunc(void* rec) handleInfo->componentName, sub->eventName/*use the same eventName the consumer subscribed with; not event instance name eventData->name*/, sub->listener, - msg); + msg, + sub->subscriptionId); rbusMessage_Release(msg); rbusObject_Release(data); diff --git a/src/rbus/rbus_message.c b/src/rbus/rbus_message.c index 660cf7fb..51b0113b 100644 --- a/src/rbus/rbus_message.c +++ b/src/rbus/rbus_message.c @@ -18,6 +18,7 @@ */ #include "rbus.h" #include "rbus_handle.h" +#include "rbuscore.h" #include #include @@ -30,6 +31,7 @@ typedef struct rbusMessageHandler_t handler; void* userData; rtConnection connection; + uint32_t subscriptionId; } rbusMessageHandlerContext_t; rbusError_t rtError_to_rBusError(rtError e) @@ -94,11 +96,50 @@ static int compareContextExpression(const void *left, const void *right) return strcmp(((const rbusMessageHandlerContext_t*)left)->expression, (const char*)right); } +rbusError_t rbusMessage_AddPrivateListener( + rbusHandle_t handle, + char const* expression, + rbusMessageHandler_t handler, + void* userData, + uint32_t subscriptionId) +{ + VERIFY_NULL(handle); + VERIFY_NULL(expression); + + char rawDataTopic[RBUS_MAX_NAME_LENGTH] = {0}; + + rtConnection myConn = rbuscore_FindClientPrivateConnection(expression); + if (NULL == myConn) + { + return RBUS_ERROR_DIRECT_CON_NOT_EXIST; + } + + rbusMessageHandlerContext_t* ctx = rt_malloc(sizeof(rbusMessageHandlerContext_t)); + ctx->handle = handle; + ctx->expression = strdup(expression); + ctx->handler = handler; + ctx->userData = userData; + ctx->connection = myConn; + ctx->subscriptionId = subscriptionId; + rtVector_PushBack(handle->messageCallbacks, ctx); + + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "%d.%s", subscriptionId, expression); + rtError e = rtConnection_AddListener(myConn, rawDataTopic, &rtMessage_CallbackHandler, ctx, subscriptionId); + if (e != RT_OK) + { + RBUSLOG_WARN("rtConnection_AddListener:%s", rtStrError(e)); + return RBUS_ERROR_BUS_ERROR; + } + + return RBUS_ERROR_SUCCESS; +} + rbusError_t rbusMessage_AddListener( rbusHandle_t handle, char const* expression, rbusMessageHandler_t handler, - void* userData) + void* userData, + uint32_t subscriptionId) { VERIFY_NULL(handle); VERIFY_NULL(expression); @@ -110,9 +151,10 @@ rbusError_t rbusMessage_AddListener( ctx->handler = handler; ctx->userData = userData; ctx->connection = con; + ctx->subscriptionId = subscriptionId; rtVector_PushBack(handle->messageCallbacks, ctx); - rtError e = rtConnection_AddListener(con, expression, &rtMessage_CallbackHandler, ctx); + rtError e = rtConnection_AddListener(con, expression, &rtMessage_CallbackHandler, ctx, subscriptionId); if (e != RT_OK) { RBUSLOG_WARN("rtConnection_AddListener:%s", rtStrError(e)); @@ -122,16 +164,44 @@ rbusError_t rbusMessage_AddListener( return RBUS_ERROR_SUCCESS; } +rbusError_t rbusMessage_RemovePrivateListener( + rbusHandle_t handle, + char const* expression, + uint32_t subscriptionId) +{ + VERIFY_NULL(handle); + char rawDataTopic[RBUS_MAX_NAME_LENGTH] = {0}; + + rtConnection myConn = rbuscore_FindClientPrivateConnection(expression); + if (NULL == myConn) + { + return RBUS_ERROR_DIRECT_CON_NOT_EXIST; + } + + snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "%d.%s", subscriptionId, expression); + rtVector_RemoveItemByCompare(handle->messageCallbacks, rawDataTopic, compareContextExpression, cleanupContext); + + rtError e = rtConnection_RemoveListenerWithId(myConn, subscriptionId); + if (e != RT_OK) + { + RBUSLOG_WARN("rtConnection_RemoveListener:%s", rtStrError(e)); + return RBUS_ERROR_BUS_ERROR; + } + + return RBUS_ERROR_SUCCESS; +} + rbusError_t rbusMessage_RemoveListener( rbusHandle_t handle, - char const* expression) + char const* expression, + uint32_t subscriptionId) { VERIFY_NULL(handle); rtConnection con = ((struct _rbusHandle*)handle)->m_connection; rtVector_RemoveItemByCompare(handle->messageCallbacks, expression, compareContextExpression, cleanupContext); - rtError e = rtConnection_RemoveListener(con, expression); + rtError e = rtConnection_RemoveListenerWithId(con, subscriptionId); if (e != RT_OK) { RBUSLOG_WARN("rtConnection_RemoveListener:%s", rtStrError(e)); @@ -152,7 +222,7 @@ rbusError_t rbusMessage_RemoveAllListeners( { rbusMessageHandlerContext_t* ctx = rtVector_At(handle->messageCallbacks, i); VERIFY_NULL(ctx); - rtError e = rtConnection_RemoveListener(con, ctx->expression); + rtError e = rtConnection_RemoveListenerWithId(con, ctx->subscriptionId); if (e != RT_OK) { RBUSLOG_WARN("rbusMessage_RemoveAllListener %s :%s", ctx->expression, rtStrError(e)); diff --git a/src/rbus/rbus_subscriptions.c b/src/rbus/rbus_subscriptions.c index 9f881f01..1d5e8228 100644 --- a/src/rbus/rbus_subscriptions.c +++ b/src/rbus/rbus_subscriptions.c @@ -45,7 +45,7 @@ static void rbusSubscriptions_saveCache(rbusSubscriptions_t subscriptions); int subscribeHandlerImpl(rbusHandle_t handle, bool added, elementNode* el, char const* eventName, char const* listener, int32_t componentId, int32_t interval, int32_t duration, rbusFilter_t filter); -static int subscriptionKeyCompare(rbusSubscription_t* subscription, char const* listener, int32_t componentId, char const* eventName, rbusFilter_t filter, int32_t interval, int32_t duration) +static int subscriptionKeyCompare(rbusSubscription_t* subscription, char const* listener, int32_t componentId, char const* eventName, rbusFilter_t filter, int32_t interval, int32_t duration, bool rawData) { int rc; if((rc = strcmp(subscription->listener, listener)) == 0) @@ -57,7 +57,7 @@ static int subscriptionKeyCompare(rbusSubscription_t* subscription, char const* if ((rc = rbusFilter_Compare(subscription->filter, filter)) == 0) { rc = ((subscription->interval == interval) && - (subscription->duration == duration)) ? 0 : 1; + (subscription->duration == duration) && (subscription->rawData == rawData)) ? 0 : 1; } } } @@ -118,9 +118,13 @@ void rbusSubscriptions_destroy(rbusSubscriptions_t subscriptions) static void rbusSubscriptions_onSubscriptionCreated(rbusSubscription_t* sub, elementNode* node); /*add a new subscription*/ -rbusSubscription_t* rbusSubscriptions_addSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration, bool autoPublish, elementNode* registryElem) +rbusSubscription_t* rbusSubscriptions_addSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration, bool autoPublish, elementNode* registryElem, bool rawData) { rbusSubscription_t* sub; + static uint32_t subscriptionId = 4; /* Starting the subscription ID with 4 as the initial 3 values are allocated for the below add listener + rtconnection create internal + rbus register object + client advisory */ TokenChain* tokens; RBUSLOG_DEBUG("%s: adding %s %s", __FUNCTION__, listener, eventName); @@ -147,18 +151,21 @@ rbusSubscription_t* rbusSubscriptions_addSubscription(rbusSubscriptions_t subscr sub->autoPublish = autoPublish; sub->element = registryElem; sub->tokens = tokens; + sub->rawData = rawData; + sub->subscriptionId = subscriptionId; rtList_Create(&sub->instances); rtList_PushBack(subscriptions->subList, sub, NULL); rbusSubscriptions_onSubscriptionCreated(sub, subscriptions->root); rbusSubscriptions_saveCache(subscriptions); + subscriptionId++; return sub; } /*get an existing subscription by searching for its unique key [eventName, listener, filter]*/ -rbusSubscription_t* rbusSubscriptions_getSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration) +rbusSubscription_t* rbusSubscriptions_getSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration, bool rawData) { rtListItem item; rbusSubscription_t* sub; @@ -178,7 +185,7 @@ rbusSubscription_t* rbusSubscriptions_getSubscription(rbusSubscriptions_t subscr return NULL; RBUSLOG_DEBUG("%s: comparing to %s %s", __FUNCTION__, sub->listener, sub->eventName); - if(subscriptionKeyCompare(sub, listener, componentId, eventName, filter, interval, duration) == 0) + if(subscriptionKeyCompare(sub, listener, componentId, eventName, filter, interval, duration, rawData) == 0) { RBUSLOG_DEBUG("%s: found sub %s %s %d", __FUNCTION__, listener, eventName, componentId); return sub; @@ -344,7 +351,7 @@ void rbusSubscriptions_onElementDeleted(rbusSubscriptions_t subscriptions, eleme if(val) { subscription = rbusSubscriptions_getSubscription(subscriptions, sub->listener, sub->eventName, - sub->componentId, sub->filter, sub->interval, sub->duration); + sub->componentId, sub->filter, sub->interval, sub->duration, sub->rawData); if(!subscription) { RBUSLOG_INFO("unsubscribing from event which isn't currectly subscribed to event=%s listener=%s", sub->eventName, sub->listener); diff --git a/src/rbus/rbus_subscriptions.h b/src/rbus/rbus_subscriptions.h index 48120a90..55b0bcaf 100644 --- a/src/rbus/rbus_subscriptions.h +++ b/src/rbus/rbus_subscriptions.h @@ -46,6 +46,8 @@ typedef struct _rbusSubscription rtList instances; /* the instance elements e.g. Device.WiFi.AccessPoint.1.AssociatedDevice.1.SignalStrength Device.WiFi.AccessPoint.1.AssociatedDevice.2.SignalStrength Device.WiFi.AccessPoint.2.AssociatedDevice.1.SignalStrength */ + bool rawData; + uint32_t subscriptionId; } rbusSubscription_t; /*create a new subscriptions registry for an rbus handle*/ @@ -55,10 +57,10 @@ void rbusSubscriptions_create(rbusSubscriptions_t* subscriptions, rbusHandle_t h void rbusSubscriptions_destroy(rbusSubscriptions_t subscriptions); /*add a new subscription with unique key [listener, eventName, filter] and the corresponding*/ -rbusSubscription_t* rbusSubscriptions_addSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration, bool autoPublish, elementNode* registryElem); +rbusSubscription_t* rbusSubscriptions_addSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration, bool autoPublish, elementNode* registryElem, bool rawData); /*get an existing subscription by searching for its unique key [listener, eventName, filter]*/ -rbusSubscription_t* rbusSubscriptions_getSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration); +rbusSubscription_t* rbusSubscriptions_getSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration, bool rawData); /*remove an existing subscription*/ void rbusSubscriptions_removeSubscription(rbusSubscriptions_t subscriptions, rbusSubscription_t* sub); diff --git a/src/rtmessage/rtConnection.c b/src/rtmessage/rtConnection.c index 1f87e27e..d50f583c 100644 --- a/src/rtmessage/rtConnection.c +++ b/src/rtmessage/rtConnection.c @@ -60,7 +60,8 @@ typedef volatile int atomic_uint_least32_t; #include #include -#define RTMSG_LISTENERS_MAX 64 +#define RTMSG_LISTENERS_MAX 128 +#define RTCONNECTION_CREATE_SUBSCRIPTION_ID 1 #ifdef RDKC_BUILD #define RTMSG_SEND_BUFFER_SIZE (1024 * 8) #else @@ -244,13 +245,6 @@ rtConnection_SendRequestInternal( int32_t timeout, int flags); -static uint32_t -rtConnection_GetNextSubscriptionId() -{ - static uint32_t next_id = 1; - return next_id++; -} - static int GetRunThreadsSync(rtConnection con) { int run_threads; @@ -605,7 +599,7 @@ rtConnection_CreateInternal(rtConnection* con, char const* application_name, cha if (err == RT_OK) { - rtConnection_AddListener(c, c->inbox_name, onDefaultMessage, c); + rtConnection_AddListener(c, c->inbox_name, onDefaultMessage, c, RTCONNECTION_CREATE_SUBSCRIPTION_ID); rtConnection_StartThreads(c); *con = c; } @@ -1229,7 +1223,7 @@ rtConnection_SendInternal(rtConnection con, uint8_t const* buff, uint32_t n, cha } rtError -rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCallback callback, void* closure) +rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCallback callback, void* closure, uint32_t subscriptionId) { int i; @@ -1238,21 +1232,6 @@ rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCall pthread_mutex_lock(&con->mutex); - /*Prevent a client from adding multiple listener callbacks to the same expression. - This is not the same as preventing duplicate entries in rtrouted (i.e. RT_ERROR_DUPLICATE_ENTRY). - This is to not break rtConnection_RemoveListener which doesn't take a callback parameter - and can't know which listener callback (if there were multi) you want to remove. - So we just prevent multi adds here to prevent RemoveListner issues*/ - for (i = 0; i < RTMSG_LISTENERS_MAX; ++i) - { - if ((con->listeners[i].in_use) && (0 == strcmp(expression, con->listeners[i].expression))) - { - rtLog_Error("Listener already exist for %s. Multiple callbacks to the same expression not allowed.", expression); - pthread_mutex_unlock(&con->mutex); - return RT_FAIL; - } - } - /*find an open listener to use*/ for (i = 0; i < RTMSG_LISTENERS_MAX; ++i) { @@ -1267,7 +1246,7 @@ rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCall } con->listeners[i].in_use = 1; - con->listeners[i].subscription_id = rtConnection_GetNextSubscriptionId(); + con->listeners[i].subscription_id = subscriptionId; con->listeners[i].closure = closure; con->listeners[i].callback = callback; con->listeners[i].expression = strdup(expression); @@ -1285,10 +1264,11 @@ rtConnection_AddListener(rtConnection con, char const* expression, rtMessageCall } rtError -rtConnection_RemoveListener(rtConnection con, char const* expression) +rtConnection_RemoveListenerWithId(rtConnection con, uint32_t listenerId) { int i; int route_id = 0; + char expression[RTMSG_HEADER_MAX_TOPIC_LENGTH] = {0}; if (!con) return rtErrorFromErrno(EINVAL); @@ -1296,8 +1276,9 @@ rtConnection_RemoveListener(rtConnection con, char const* expression) pthread_mutex_lock(&con->mutex); for (i = 0; i < RTMSG_LISTENERS_MAX; ++i) { - if ((con->listeners[i].in_use) && (0 == strcmp(expression, con->listeners[i].expression))) + if ((con->listeners[i].in_use) && (listenerId == con->listeners[i].subscription_id)) { + strncpy(expression, strdup(con->listeners[i].expression), RTMSG_HEADER_MAX_TOPIC_LENGTH); con->listeners[i].in_use = 0; route_id = con->listeners[i].subscription_id; con->listeners[i].subscription_id = 0; @@ -1311,13 +1292,13 @@ rtConnection_RemoveListener(rtConnection con, char const* expression) pthread_mutex_unlock(&con->mutex); if (i >= RTMSG_LISTENERS_MAX) - return RT_ERROR_INVALID_ARG; + return RT_ERROR_INVALID_ARG; rtMessage m; rtMessage_Create(&m); rtMessage_SetInt32(m, "add", 0); rtMessage_SetString(m, "topic", expression); - rtMessage_SetInt32(m, "route_id", route_id); + rtMessage_SetInt32(m, "route_id", route_id); rtConnection_SendMessage(con, m, "_RTROUTED.INBOX.SUBSCRIBE"); rtMessage_Release(m); return 0; diff --git a/src/rtmessage/rtConnection.h b/src/rtmessage/rtConnection.h index b986c260..abe121c3 100644 --- a/src/rtmessage/rtConnection.h +++ b/src/rtmessage/rtConnection.h @@ -186,7 +186,7 @@ rtConnection_SendBinaryResponse(rtConnection con, rtMessageHeader const* request */ rtError rtConnection_AddListener(rtConnection con, char const* expression, - rtMessageCallback callback, void* closure); + rtMessageCallback callback, void* closure, uint32_t subscriptionId); /** * Remove a callback listener @@ -195,7 +195,7 @@ rtConnection_AddListener(rtConnection con, char const* expression, * @return error */ rtError -rtConnection_RemoveListener(rtConnection con, char const* expression); +rtConnection_RemoveListenerWithId(rtConnection con, uint32_t listenerId); /** * Add an alias to an existing listener diff --git a/src/rtmessage/rtrouteBase.c b/src/rtmessage/rtrouteBase.c index e75da1a9..83f480a7 100644 --- a/src/rtmessage/rtrouteBase.c +++ b/src/rtmessage/rtrouteBase.c @@ -497,21 +497,26 @@ rtRouteDirect_AcceptClientConnection(rtListener* listener) } rtError -rtRouteDirect_SendMessage(const rtPrivateClientInfo* pClient, uint8_t const* pInBuff, int inLength) +rtRouteDirect_SendMessage(const rtPrivateClientInfo* pClient, uint8_t const* pInBuff, int inLength, bool rawData, char* subEventName, uint32_t subscriptionId) { rtError ret = RT_OK; rtMessageHeader new_header; ssize_t bytes_sent; + (void)rawData; if (pClient && pInBuff && (inLength > 0)) { rtMessageHeader_Init(&new_header); new_header.sequence_number = 1; new_header.flags = rtMessageFlags_RawBinary; - new_header.control_data = pClient->clientID; - - strncpy(new_header.topic, pClient->clientTopic, RTMSG_HEADER_MAX_TOPIC_LENGTH-1); - new_header.topic_length = strlen(pClient->clientTopic); + + if(subscriptionId) + new_header.control_data = subscriptionId; /* Rawdata unique subscription ID */ + else + new_header.control_data = pClient->clientID; + + snprintf(new_header.topic, RTMSG_HEADER_MAX_TOPIC_LENGTH, "%d.%s", subscriptionId ,subEventName); + new_header.topic_length = strlen(new_header.topic); new_header.reply_topic[0] = '\0'; new_header.reply_topic_length = 0; diff --git a/src/rtmessage/rtrouteBase.h b/src/rtmessage/rtrouteBase.h index 0e0db3d2..3d87ee60 100644 --- a/src/rtmessage/rtrouteBase.h +++ b/src/rtmessage/rtrouteBase.h @@ -81,6 +81,6 @@ rtError rtRouteBase_BindListener(char const* socket_name, int no_delay, int inde rtError rtRouteBase_CloseListener(rtListener *pListener); rtError rtRouteDirect_StartInstance(const char* socket_name, rtDriectClientHandler messageHandler); -rtError rtRouteDirect_SendMessage(const rtPrivateClientInfo* pClient, uint8_t const* pInBuff, int inLength); +rtError rtRouteDirect_SendMessage(const rtPrivateClientInfo* pClient, uint8_t const* pInBuff, int inLength, bool rawData, char* subEventName, uint32_t subscriptionId); #endif /* __RTROUTEBASE_H__ */ diff --git a/unittests/rbusMessageTest.cpp b/unittests/rbusMessageTest.cpp index 0db39a5a..be72f5fd 100644 --- a/unittests/rbusMessageTest.cpp +++ b/unittests/rbusMessageTest.cpp @@ -169,7 +169,7 @@ static int rbus_recv(const char *topic, pid_t pid, int *rbus_send_status, rbusGt userData.test = test; strcpy(userData.componentName, componentName); - ret |= rbusMessage_AddListener(handle, topic, &rbusRecvHandler, (void *)(&userData)); + ret |= rbusMessage_AddListener(handle, topic, &rbusRecvHandler, (void *)(&userData), 0); EXPECT_EQ(ret,RBUS_ERROR_SUCCESS); if(rbus_send_status) { @@ -183,7 +183,7 @@ static int rbus_recv(const char *topic, pid_t pid, int *rbus_send_status, rbusGt pthread_cond_wait(&cond, &lock); } - ret |= rbusMessage_RemoveListener(handle, topic); + ret |= rbusMessage_RemoveListener(handle, topic, 0); EXPECT_EQ(ret,RBUS_ERROR_SUCCESS); ret |= rbus_close(handle); @@ -237,6 +237,7 @@ static int rbus_multi_send(const char *topic, int index) static int rbus_single_recv(const char *topic, pid_t pid, rbusGtestMsg_t test) { int i = 0, ret = RBUS_ERROR_BUS_ERROR, wait_ret = -1; + uint32_t listenerId; char user_data[32] = {0}; rbusHandle_t handle; char *componentName = NULL; @@ -264,7 +265,7 @@ static int rbus_single_recv(const char *topic, pid_t pid, rbusGtestMsg_t test) userData.cond = NULL; strcpy(userData.componentName, componentName); - ret |= rbusMessage_AddListener(handle, topic, &rbusRecvHandler, (void *)(&userData)); + ret |= rbusMessage_AddListener(handle, topic, &rbusRecvHandler, (void *)(&userData), 0); EXPECT_EQ(ret, RBUS_ERROR_SUCCESS); hasSenderStarted(handle, "rbus_multi_send0"); @@ -285,7 +286,7 @@ static int rbus_single_recv(const char *topic, pid_t pid, rbusGtestMsg_t test) ret |= (wait_ret != pid_arr[i]) ? RBUS_ERROR_BUS_ERROR : RBUS_ERROR_SUCCESS; } - ret |= rbusMessage_RemoveListener(handle, topic); + ret |= rbusMessage_RemoveListener(handle, topic, listenerId); EXPECT_EQ(ret,RBUS_ERROR_SUCCESS); ret |= rbus_close(handle); @@ -302,6 +303,7 @@ static int rbus_single_recv(const char *topic, pid_t pid, rbusGtestMsg_t test) static int rbus_multi_recv(const char *topic, int index, rbusGtestMsg_t test) { rbusHandle_t handle; + uint32_t listenerId; int ret = RBUS_ERROR_BUS_ERROR; char componentName[32] = {0}; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; @@ -318,12 +320,12 @@ static int rbus_multi_recv(const char *topic, int index, rbusGtestMsg_t test) userData.cond = &cond; strcpy(userData.componentName, componentName); - ret |= rbusMessage_AddListener(handle, topic, &rbusRecvHandler, (void *)(&userData)); + ret |= rbusMessage_AddListener(handle, topic, &rbusRecvHandler, (void *)(&userData), 0); EXPECT_EQ(ret, RBUS_ERROR_SUCCESS); pthread_cond_wait(&cond, &lock); - ret |= rbusMessage_RemoveListener(handle, topic); + ret |= rbusMessage_RemoveListener(handle, topic, listenerId); EXPECT_EQ(ret,RBUS_ERROR_SUCCESS); ret |= rbus_close(handle); @@ -521,9 +523,9 @@ static void exec_msg_test(rbusGtestMsg_t test) rbusHandle_t rbus; EXPECT_EQ(rbus_open(&rbus, "rbus_recv"),0); - EXPECT_EQ(rbusMessage_AddListener(rbus, "A.B.C", &rbusRecvHandler, NULL),0); - EXPECT_NE(rbusMessage_AddListener(rbus, "A.B.C", &rbusRecvHandler, NULL),0); - EXPECT_EQ(rbusMessage_RemoveListener(rbus, "A.B.C"),0); + EXPECT_EQ(rbusMessage_AddListener(rbus, "A.B.C", &rbusRecvHandler, NULL, 0),0); + EXPECT_NE(rbusMessage_AddListener(rbus, "A.B.C", &rbusRecvHandler, NULL, 0),0); + EXPECT_EQ(rbusMessage_RemoveListener(rbus, "A.B.C", 0),0); EXPECT_EQ(rbus_close(rbus),0); } break; diff --git a/unittests/rbus_unit_test_event_client.cpp b/unittests/rbus_unit_test_event_client.cpp index b29bfafd..353fddae 100644 --- a/unittests/rbus_unit_test_event_client.cpp +++ b/unittests/rbus_unit_test_event_client.cpp @@ -192,9 +192,9 @@ TEST_F(EventClientAPIs, rbus_subscribeToEventTimeout_test1) printf("********************* CREATING CLIENT : %s \n", client_name); conn_status = CALL_RBUS_OPEN_BROKER_CONNECTION(client_name); ASSERT_EQ(conn_status, true) << "RBUS_OPEN_BROKER_CONNECTION failed"; - err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL, false); ASSERT_EQ(err,RBUSCORE_SUCCESS) << "rbus_subscribeToEventTimeout failed"; - err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL, false); ASSERT_EQ(err,RBUSCORE_SUCCESS) << "rbus_subscribeToEventTimeout failed"; printf("********************Subscribed Events with Event name: event_1 in 1000 ms \n"); conn_status = CALL_RBUS_CLOSE_BROKER_CONNECTION(); @@ -211,20 +211,20 @@ TEST_F(EventClientAPIs, rbus_subscribeToEventTimeout_test2) conn_status = CALL_RBUS_OPEN_BROKER_CONNECTION(client_name); ASSERT_EQ(conn_status, true) << "RBUS_OPEN_BROKER_CONNECTION failed"; //Test with invalid objname passed - err = rbus_subscribeToEventTimeout(NULL, "event_1", &event_callback, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout(NULL, "event_1", &event_callback, NULL, NULL, NULL, 1000, false, NULL, false); ASSERT_EQ(err,RBUSCORE_ERROR_DESTINATION_UNREACHABLE) << "rbus_subscribeToEventTimeout failed"; //Test with the event name to be NULL - err = rbus_subscribeToEventTimeout(obj_name, "NULL", &event_callback, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout(obj_name, "NULL", &event_callback, NULL, NULL, NULL, 1000, false, NULL, false); ASSERT_EQ(err,RBUSCORE_ERROR_GENERAL) << "rbus_subscribeToEventTimeout failed"; //Test with the callback to be NULL - err = rbus_subscribeToEventTimeout(obj_name, "event_1",NULL, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout(obj_name, "event_1",NULL, NULL, NULL, NULL, 1000, false, NULL, false); ASSERT_EQ(err,RBUSCORE_ERROR_INVALID_PARAM) << "rbus_subscribeToEventTimeout failed"; //Test with the valid Event name, objname, Callback - err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL, false); ASSERT_EQ(err,RBUSCORE_SUCCESS) << "rbus_subscribeToEventTimeout failed"; printf("Subscribed Events with Event name: event_1 \n"); //Test with the already subscribed Event - err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL, false); ASSERT_EQ(err,RBUSCORE_SUCCESS) << "rbus_subscribeToEventTimeout failed"; conn_status = CALL_RBUS_CLOSE_BROKER_CONNECTION(); ASSERT_EQ(conn_status, true) << "RBUS_CLOSE_BROKER_CONNECTION failed"; @@ -241,7 +241,7 @@ TEST_F(EventClientAPIs, rbus_subscribeToEventTimeout_test3) ASSERT_EQ(conn_status, true) << "RBUS_OPEN_BROKER_CONNECTION failed"; //Boundary Test with MAX_OBJECT_NAME_LENGTH memset(obj_name, 't', (sizeof(obj_name)- 1)); - err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL, false); ASSERT_EQ(err,RBUSCORE_ERROR_INVALID_PARAM) << "rbus_subscribeToEventTimeout failed"; conn_status = CALL_RBUS_CLOSE_BROKER_CONNECTION(); ASSERT_EQ(conn_status, true) << "RBUS_CLOSE_BROKER_CONNECTION failed"; @@ -257,16 +257,16 @@ TEST_F(EventClientAPIs, rbus_unsubscribeFromEvent_test1) conn_status = CALL_RBUS_OPEN_BROKER_CONNECTION(client_name); ASSERT_EQ(conn_status, true) << "RBUS_OPEN_BROKER_CONNECTION failed"; //Test with objname to be NULL - err = rbus_unsubscribeFromEvent(NULL, "event_1", NULL); + err = rbus_unsubscribeFromEvent(NULL, "event_1", NULL, false); ASSERT_EQ(err,RBUSCORE_ERROR_DESTINATION_UNREACHABLE) << "rbus_unsubscribeFromEvent failed"; //Test with the event name to be NULL - err = rbus_unsubscribeFromEvent(obj_name, NULL, NULL); + err = rbus_unsubscribeFromEvent(obj_name, NULL, NULL, false); ASSERT_EQ(err,RBUSCORE_ERROR_GENERAL) << "rbus_unsubscribeFromEvent failed"; //Test with the valid Event name, objname, Callback - err = rbus_unsubscribeFromEvent(obj_name, "event_1", NULL); + err = rbus_unsubscribeFromEvent(obj_name, "event_1", NULL, false); ASSERT_EQ(err,RBUSCORE_SUCCESS) << "rbus_unsubscribeFromEvent failed"; //Test with the already unsubscribed Event - err = rbus_unsubscribeFromEvent(obj_name, "event_1", NULL); + err = rbus_unsubscribeFromEvent(obj_name, "event_1", NULL, false); ASSERT_EQ(err,RBUSCORE_SUCCESS) << "rbus_unsubscribeFromEvent failed"; conn_status = CALL_RBUS_CLOSE_BROKER_CONNECTION(); ASSERT_EQ(conn_status, true) << "RBUS_CLOSE_BROKER_CONNECTION failed"; diff --git a/unittests/rbus_unit_test_event_server.cpp b/unittests/rbus_unit_test_event_server.cpp index 5ac121b7..d1855d5c 100644 --- a/unittests/rbus_unit_test_event_server.cpp +++ b/unittests/rbus_unit_test_event_server.cpp @@ -319,19 +319,19 @@ TEST_F(EventServerAPIs, rbus_subscribeToEventTimeout_test1) char event_name[130] ="0"; rbusCoreError_t err = RBUSCORE_SUCCESS; //Neg test subscribe before establishing connection - err = rbus_subscribeToEventTimeout("obj_name", "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout("obj_name", "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL, false); EXPECT_EQ(err,RBUSCORE_ERROR_INVALID_STATE) << "rbus_subscribeToEventTimeout failed"; RBUS_OPEN_BROKER_CONNECTION(client_name,RBUSCORE_SUCCESS); //Neg Test with more than MAX_OBJECT_NAME_LENGTH memset(obj_name, 't', (sizeof(obj_name)- 1)); - err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout(obj_name, "event_1",&event_callback, NULL, NULL, NULL, 1000, false, NULL, false); EXPECT_EQ(err,RBUSCORE_ERROR_INVALID_PARAM) << "rbus_subscribeToEventTimeout failed"; //Neg Test with more than MAX_EVENT_NAME_LENGTH memset(event_name, 't', (sizeof(obj_name)- 1)); - err = rbus_subscribeToEventTimeout("object_1", event_name, &event_callback, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout("object_1", event_name, &event_callback, NULL, NULL, NULL, 1000, false, NULL, false); EXPECT_EQ(err,RBUSCORE_ERROR_INVALID_PARAM) << "rbus_subscribeToEventTimeout failed"; //Neg test passing object name and callback as NULL - err = rbus_subscribeToEventTimeout(NULL, "event_1",NULL, NULL, NULL, NULL, 1000, false, NULL); + err = rbus_subscribeToEventTimeout(NULL, "event_1",NULL, NULL, NULL, NULL, 1000, false, NULL, false); EXPECT_EQ(err,RBUSCORE_ERROR_INVALID_PARAM) << "rbus_subscribeToEventTimeout failed"; RBUS_CLOSE_BROKER_CONNECTION(RBUSCORE_SUCCESS); } @@ -344,10 +344,10 @@ TEST_F(EventServerAPIs, rbus_unsubscribeFromEvent_test1) RBUS_OPEN_BROKER_CONNECTION(client_name, RBUSCORE_SUCCESS); //Neg Test with more than MAX_OBJECT_NAME_LENGTH memset(object_name, 't', (sizeof(object_name)- 1)); - err = rbus_unsubscribeFromEvent(object_name, "event_1", NULL); + err = rbus_unsubscribeFromEvent(object_name, "event_1", NULL, false); EXPECT_EQ(err,RBUSCORE_ERROR_INVALID_PARAM) << "rbus_unsubscribeFromEvent failed"; //Neg test passing NULL as object name - err = rbus_unsubscribeFromEvent(NULL, NULL,NULL); + err = rbus_unsubscribeFromEvent(NULL, NULL,NULL, false); EXPECT_EQ(err,RBUSCORE_ERROR_INVALID_PARAM) << "rbus_unsubscribeFromEvent failed"; RBUS_CLOSE_BROKER_CONNECTION(RBUSCORE_SUCCESS); } @@ -360,7 +360,7 @@ TEST_F(EventServerAPIs, rbus_publishSubscriberEvent_test1) RBUS_OPEN_BROKER_CONNECTION(client_name, RBUSCORE_SUCCESS); //Neg Test with more than MAX_OBJECT_NAME_LENGTH memset(object_name, 't', (sizeof(object_name)- 1)); - err = rbus_publishSubscriberEvent(object_name, "event_1", NULL, NULL); + err = rbus_publishSubscriberEvent(object_name, "event_1", NULL, NULL, 0); EXPECT_EQ(err,RBUSCORE_ERROR_INVALID_PARAM) << "rbus_publishsubscriberEvent failed"; RBUS_CLOSE_BROKER_CONNECTION(RBUSCORE_SUCCESS); } diff --git a/utils/rbuscli/rbuscli.c b/utils/rbuscli/rbuscli.c index b483ec2d..079177bf 100644 --- a/utils/rbuscli/rbuscli.c +++ b/utils/rbuscli/rbuscli.c @@ -2154,8 +2154,8 @@ void validate_and_execute_publish_command(int argc, char *argv[], bool rawDataPu { rbusEventRawData_t event = {0}; event.name = argv[2]; - event.rawData = argv[3]; - event.rawDataLen = strlen(argv[3]); + event.rawData = argc < 4 ? "default event data" : argv[3]; + event.rawDataLen = strlen(event.rawData); rc = rbusEvent_PublishRawData(g_busHandle, &event); if(rc != RBUS_ERROR_SUCCESS) @@ -2210,11 +2210,11 @@ void validate_and_execute_listen_command(int argc, char *argv[], bool add) printf("value of userData = %s\n", userData); if(add) { - rc = rbusMessage_AddListener(g_busHandle, argv[2], message_receive_handler, userData); + rc = rbusMessage_AddListener(g_busHandle, argv[2], message_receive_handler, userData, 0); } else { - rc = rbusMessage_RemoveListener(g_busHandle, argv[2]); + rc = rbusMessage_RemoveListener(g_busHandle, argv[2], 0); } if(rc != RBUS_ERROR_SUCCESS)