Skip to content

Commit

Permalink
Merge branch 'main' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
karuna2git authored Jan 9, 2024
2 parents 45172bb + 674f9d3 commit f42edd1
Show file tree
Hide file tree
Showing 17 changed files with 348 additions and 181 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ cmake_minimum_required (VERSION 2.8.12)

if (POLICY CMP0048)
cmake_policy(SET CMP0048 NEW)
project(rbus VERSION 2.0.7)
project(rbus VERSION 2.0.10)
else ()
project(rbus)
set(PROJECT_VERSION "2.0.7")
set(PROJECT_VERSION "2.0.10")
endif (POLICY CMP0048)

list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/")
Expand Down
5 changes: 5 additions & 0 deletions sampleapps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ add_executable(rbusOpenTelemetry consumer/rbusOpenTelemetry.c)
add_dependencies(rbusOpenTelemetry rbus)
target_link_libraries(rbusOpenTelemetry rbus)

add_executable(rbusRawDataConsumer consumer/rbusRawDataConsumer.c)
add_dependencies(rbusRawDataConsumer rbus)
target_link_libraries(rbusRawDataConsumer rbus)

add_executable(rbusRawDataProvider provider/rbusRawDataProvider.c)
add_dependencies(rbusRawDataProvider rbus)
target_link_libraries(rbusRawDataProvider rbus)
Expand All @@ -141,6 +145,7 @@ install (TARGETS
rbusCSIProvider
rbusCSIConsumer
rbusDirectConsumer
rbusRawDataConsumer
rbusRawDataProvider
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})

Expand Down
128 changes: 128 additions & 0 deletions sampleapps/consumer/rbusRawDataConsumer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* If not stated otherwise in this file or this component's Licenses.txt file
* the following copyright and licenses apply:
*
* Copyright 2016 RDK Management
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include <string.h>
#include <getopt.h>
#include <rbus.h>

int loopFor = 24;

static void generalEvent1Handler(
rbusHandle_t handle,
rbusEventRawData_t const* event,
rbusEventSubscription_t* subscription)
{
(void)handle;
(void)subscription;
printf("\nevent_receive_handler1 called\n\r");
printf("Event received %s\n\r", event->name);
printf("Event data: %s\n\r", (char*)event->rawData);
printf("Event data len: %d\n\r", event->rawDataLen);
printf("\n\r");
}

int main(int argc, char *argv[])
{
(void)(argc);
(void)(argv);

int rc = RBUS_ERROR_SUCCESS;
rbusHandle_t handle;
rbusHandle_t directHandle = NULL;
char* data[2] = { "My Data 1", "My Data2" };
rbusEventSubscription_t subscriptions[1] = {
{"Device.Provider1.Event1!", NULL, 0, 0, generalEvent1Handler, data[0], NULL, NULL, false},
};

printf("consumer: start\n");

rc = rbus_open(&handle, "RawDataEventConsumer");
if(rc != RBUS_ERROR_SUCCESS)
{
printf("consumer: rbus_open failed: %d\n", rc);
goto exit;
}

if(argc == 2 && strcmp(argv[1], "opendirect")==0)
{
rc = rbus_openDirect(handle, &directHandle, "Device.Provider1.Event1!");
if (RBUS_ERROR_SUCCESS != rc)
{
printf ("Failed to open direct connection to %s\n\r", "Device.Provider1.Event1!");
}
else
{
printf("******* In rbus_openDirect Mode *******\n");
}
}
printf("-----------------------------------------------------\n");
printf("Testing rbusEvent_SubscribeRawData\n");
printf("-----------------------------------------------------\n");
rc = rbusEvent_SubscribeRawData(
handle,
"Device.Provider1.Event1!",
(rbusEventHandler_t)generalEvent1Handler,
data[0],
0);

if(rc != RBUS_ERROR_SUCCESS)
{
printf("consumer: rbusEvent_Subscribe 1 failed: %d\n", rc);
goto exit;
}

sleep(loopFor/4);
printf("Unsubscribing from Event1\n");
rbusEvent_UnsubscribeRawData(handle, "Device.Provider1.Event1!");

printf("-----------------------------------------------------\n");
printf("Testing rbusEvent_SubscribeExRawData\n");
printf("-----------------------------------------------------\n");
rc = rbusEvent_SubscribeExRawData(handle, subscriptions, 1, 0);

if(rc != RBUS_ERROR_SUCCESS)
{
printf("consumer: rbusEvent_SubscribeExRawData 1 failed: %d\n", rc);
goto exit;
}

sleep(loopFor/4);

if(argc == 2 && strcmp(argv[1], "opendirect")==0)
{
rc = rbus_closeDirect(directHandle);
if (RBUS_ERROR_SUCCESS != rc)
{
printf ("Failed to close direct connection to %s\n\r", "Device.Provider1.Event1!");
}
}
rbusEvent_UnsubscribeExRawData(handle, subscriptions, 1);

exit:
rbus_close(handle);
printf("consumer: exit\n");
return rc;
}
6 changes: 3 additions & 3 deletions sampleapps/provider/rbusRawDataProvider.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include <getopt.h>
#include <rbus.h>

int loopFor = 30;
int loopFor = 60;
int subscribed = 0;

rbusError_t eventSubHandler(rbusHandle_t handle, rbusEventSubAction_t action, const char* eventName, rbusFilter_t filter, int32_t interval, bool* autoPublish)
Expand All @@ -45,7 +45,7 @@ rbusError_t eventSubHandler(rbusHandle_t handle, rbusEventSubAction_t action, co
action == RBUS_EVENT_ACTION_SUBSCRIBE ? "subscribe" : "unsubscribe",
eventName);

if(!strcmp("Device.Provider1.Event", eventName))
if(!strcmp("Device.Provider1.Event1!", eventName))
{
subscribed = action == RBUS_EVENT_ACTION_SUBSCRIBE ? 1 : 0;
}
Expand All @@ -64,7 +64,7 @@ int main(int argc, char *argv[])
char componentName[] = "RawDataEventProvider";

rbusDataElement_t dataElements[1] = {
{"Device.Provider1.Event", RBUS_ELEMENT_TYPE_EVENT, {NULL, NULL, NULL, NULL, eventSubHandler, NULL}}
{"Device.Provider1.Event1!", RBUS_ELEMENT_TYPE_EVENT, {NULL, NULL, NULL, NULL, eventSubHandler, NULL}}
};

printf("provider: start\n");
Expand Down
114 changes: 100 additions & 14 deletions src/core/rbuscore.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ static pthread_mutex_t g_mutex;
static pthread_mutex_t g_directCliMutex;
static pthread_mutex_t g_directServMutex;
static int g_mutex_init = 0;
static bool g_run_event_client_dispatch = false;
static rtVector g_event_subscriptions_for_client; /*client_subscription_t list. Used by the subscriber to track all active subscriptions. */
static rtVector g_queued_requests; /*list of queued_request */

Expand Down Expand Up @@ -811,7 +812,10 @@ rbusCoreError_t rbus_registerObj(const char * object_name, rbus_callback_t handl
server_object_create(&obj, object_name, handler, user_data);

//TODO: callback signature translation. rbusMessage uses a significantly wider signature for callbacks. Translate to something simpler.
err = rtConnection_AddListener(g_connection, object_name, RBUS_REGISTER_OBJECT_EXPRESSION_ID, onMessage, obj);
err = rtConnection_AddListener(g_connection, object_name, onMessage, obj); /* Avoiding rtConnection_AddListenerWithId call here as ccsp code
uses this rbus_registerObj() function call directly and usage
of rtConnection_AddListenerWithId() function will result in
conflict with subscriptionId */

if(RT_OK == err)
{
Expand Down Expand Up @@ -956,8 +960,7 @@ rbusCoreError_t rbus_unregisterObj(const char * object_name)
RBUSCORELOG_ERROR("object_name is invalid.");
return RBUSCORE_ERROR_INVALID_PARAM;
}

err = rtConnection_RemoveListener(g_connection, object_name, RBUS_REGISTER_OBJECT_EXPRESSION_ID);
err = rtConnection_RemoveListener(g_connection, object_name);
if(RT_OK != err)
{
RBUSCORELOG_ERROR("rtConnection_RemoveListener %s failed: Err=%d", object_name, err);
Expand Down Expand Up @@ -1445,6 +1448,85 @@ rbusCoreError_t rbus_unregisterEvent(const char* object_name, const char * event
return ret;
}

static void master_event_callback(rtMessageHeader const* hdr, uint8_t const* data, uint32_t dataLen, void* closure)
{
/*using namespace rbus_client;*/
rbusMessage msg = NULL;
const char * sender = hdr->reply_topic;
const char * event_name = NULL;
const char * object_name = NULL;
int32_t is_rbus_flag = 1;
rtError err;
size_t subs_len;
size_t i;
(void)closure;

/*Sanitize the incoming data.*/
if(MAX_OBJECT_NAME_LENGTH <= strlen(sender))
{
RBUSCORELOG_ERROR("Object name length exceeds limits.");
return;
}

rbusMessage_FromBytes(&msg, data, dataLen);

rbusMessage_BeginMetaSectionRead(msg);
err = rbusMessage_GetString(msg, &event_name);
err = rbusMessage_GetString(msg, &object_name);
err = rbusMessage_GetInt32(msg, &is_rbus_flag);
rbusMessage_EndMetaSectionRead(msg);
if(RT_OK != err)
{
RBUSCORELOG_ERROR("Event message doesn't contain an event name.");
rbusMessage_Release(msg);
return;
}

if(is_rbus_flag)
{
if(g_master_event_callback)
{
err = g_master_event_callback(sender, event_name, msg, g_master_event_user_data);
if(err != RBUSCORE_ERROR_EVENT_NOT_HANDLED)
{
rbusMessage_Release(msg);
return;
}
}
else
{
RBUSCORELOG_ERROR("Received rbus event but no master callback registered yet.");
}
}

lock();
subs_len = rtVector_Size(g_event_subscriptions_for_client);
for(i = 0; i < subs_len; ++i)
{
client_subscription_t sub = rtVector_At(g_event_subscriptions_for_client, i);

if( strncmp(sub->object, sender, MAX_OBJECT_NAME_LENGTH) == 0 ||
strncmp(sub->object, event_name, MAX_OBJECT_NAME_LENGTH) == 0 ) /* support rbus events being elements : the object name will be the event name */
{
client_event_t evt = rtVector_Find(sub->events, event_name, client_event_compare);

if(evt)
{
unlock();
evt->callback(sender, event_name, msg, evt->data);
rbusMessage_Release(msg);
return;
}
/* support rbus events being elements : keep searching */
}
}
/* If no matching objects exist in records. Create a new entry.*/
unlock();
rbusMessage_Release(msg);
RBUSCORELOG_WARN("Received event %s::%s for which no subscription exists.", sender, event_name);
return;
}

static rbusCoreError_t remove_subscription_callback(const char * object_name, const char * event_name)
{
/*using namespace rbus_client;*/
Expand Down Expand Up @@ -1515,6 +1597,13 @@ static rbusCoreError_t rbus_subscribeToEventInternal(const char * object_name,
if(NULL == event_name)
event_name = DEFAULT_EVENT;

if(false == g_run_event_client_dispatch)
{
RBUSCORELOG_DEBUG("Starting event dispatching.");
rtConnection_AddDefaultListener(g_connection, master_event_callback, NULL);
g_run_event_client_dispatch = true;
}

if(g_master_event_callback == NULL)
{
sub = rtVector_Find(g_event_subscriptions_for_client, object_name, client_subscription_compare);
Expand Down Expand Up @@ -1695,7 +1784,7 @@ rbusCoreError_t rbus_registerClientDisconnectHandler(rbus_client_disconnect_call
lock();
if(!g_advisory_listener_installed)
{
rtError err = rtConnection_AddListener(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID, &rtrouted_advisory_callback, g_connection);
rtError err = rtConnection_AddListenerWithId(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID, &rtrouted_advisory_callback, g_connection);
if(err == RT_OK)
{
RBUSCORELOG_DEBUG("Listening for advisory messages");
Expand All @@ -1718,32 +1807,31 @@ rbusCoreError_t rbus_unregisterClientDisconnectHandler()
lock();
if(g_advisory_listener_installed)
{
rtConnection_RemoveListener(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID);
rtConnection_RemoveListenerWithId(g_connection, RTMSG_ADVISORY_TOPIC, RBUS_ADVISORY_EXPRESSION_ID);
g_advisory_listener_installed = false;
}
unlock();
return RBUSCORE_SUCCESS;
}

rbusCoreError_t rbuscore_publishDirectSubscriberEvent(const char * event_name, const char* listener, const void* data, uint32_t dataLength, uint32_t subscriptionId)
rbusCoreError_t rbuscore_publishDirectSubscriberEvent(const char * event_name, const char* listener, const void* data, uint32_t dataLength, uint32_t subscriptionId, bool rawData)
{
rtError err = RT_OK;

directServerLock();
const rtPrivateClientInfo *pPrivCliInfo = _rbuscore_find_server_privateconnection (event_name, listener);
if(pPrivCliInfo)
{
err = rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength, (char*)event_name, subscriptionId);
err = rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength, subscriptionId, rawData);
}
directServerUnlock();
return translate_rt_error(err);
}

rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out, uint32_t subscriptionId)
rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char * event_name, const char* listener, rbusMessage out, uint32_t subscriptionId, bool rawData)
{
/*using namespace rbus_server;*/
rbusCoreError_t ret = RBUSCORE_SUCCESS;
char topic[MAX_OBJECT_NAME_LENGTH] = {0};

if(NULL == event_name)
event_name = DEFAULT_EVENT;
Expand All @@ -1765,15 +1853,12 @@ rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char
uint8_t* data;
uint32_t dataLength;
rbusMessage_ToBytes(out, &data, &dataLength);
rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength, (char*)event_name, subscriptionId);
rtRouteDirect_SendMessage (pPrivCliInfo, data, dataLength, subscriptionId, rawData);
}
directServerUnlock();
if (!pPrivCliInfo)
{
lock();
snprintf(topic, MAX_OBJECT_NAME_LENGTH, "%d.%s", subscriptionId ,event_name);
if(topic[strlen(topic) - 1] == '.')
topic[strlen(topic) - 1] = '\0';
server_object_t obj = get_object(object_name);
if(NULL == obj)
{
Expand All @@ -1782,7 +1867,7 @@ rbusCoreError_t rbus_publishSubscriberEvent(const char* object_name, const char
ret = RBUSCORE_ERROR_INVALID_PARAM;
}

if(rbus_sendMessage(out, topic, object_name) != RBUSCORE_SUCCESS)
if(rbus_sendMessage(out, listener, object_name) != RBUSCORE_SUCCESS)
{
RBUSCORELOG_ERROR("Couldn't send event %s::%s to %s.", object_name, event_name, listener);
}
Expand Down Expand Up @@ -2888,6 +2973,7 @@ rbusCoreError_t rbuscore_openPrivateConnectionToProvider(rtConnection *pPrivateC
}
*pPrivateConn = connection;

rtConnection_AddDefaultListener(connection, master_event_callback, NULL);
RBUSCORELOG_DEBUG("pPrivateConn new = %p", connection);
rtMessage_Release(config);
}
Expand Down
Loading

0 comments on commit f42edd1

Please sign in to comment.