Skip to content

Commit

Permalink
Merge branch 'Opendirect_rawdata' of github.com:gururaajar/rbus into …
Browse files Browse the repository at this point in the history
…Opendirect_rawdata
  • Loading branch information
Gururaaja E S R authored and Gururaaja E S R committed Nov 26, 2023
2 parents 9645d41 + 32b62e9 commit 0387070
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 15 deletions.
7 changes: 4 additions & 3 deletions src/rbus/rbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,8 @@ int subscribeHandlerImpl(
int32_t duration,
rbusFilter_t filter,
int rawData,
uint32_t* subscriptionId)
uint32_t* subscriptionId,
int resubscribe)
{
int error = RBUS_ERROR_SUCCESS;
rbusSubscription_t* subscription = NULL;
Expand Down Expand Up @@ -1137,7 +1138,7 @@ int subscribeHandlerImpl(
subscription = rbusSubscriptions_getSubscription(handleInfo->subscriptions, listener, eventName, componentId, filter, interval, duration, rawData);
if(!subscription)
{
subscription = rbusSubscriptions_addSubscription(handleInfo->subscriptions, listener, eventName, componentId, filter, interval, duration, autoPublish, el, rawData);
subscription = rbusSubscriptions_addSubscription(handleInfo->subscriptions, listener, eventName, componentId, filter, interval, duration, autoPublish, el, rawData, resubscribe, *subscriptionId);
if(!subscription)
{
HANDLE_SUBS_MUTEX_UNLOCK(handle);
Expand Down Expand Up @@ -2539,7 +2540,7 @@ static void _subscribe_callback_handler (rbusHandle_t handle, rbusMessage reques
rbusMessage_GetInt32(request, &publishOnSubscribe);
rbusMessage_GetInt32(request, &rawData);
if(ret == RBUS_ERROR_SUCCESS)
ret = subscribeHandlerImpl(handle, added, el, event_name, sender, componentId, interval, duration, filter, rawData, &subscriptionId);
ret = subscribeHandlerImpl(handle, added, el, event_name, sender, componentId, interval, duration, filter, rawData, &subscriptionId, 0);
rbusMessage_SetInt32(*response, ret);

if(publishOnSubscribe && ret == RBUS_ERROR_SUCCESS)
Expand Down
58 changes: 47 additions & 11 deletions src/rbus/rbus_subscriptions.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,31 @@
#define VERIFY_NULL(T) if(NULL == T){ return; }
#define CACHE_FILE_PATH_FORMAT "%s/rbus_subs_%s"

#define HANDLE_SUBS_MUTEX_LOCK(HANDLE) \
{ \
int err; \
rbusHandle_t pTmp = (rbusHandle_t) HANDLE; \
if((err = pthread_mutex_lock(&pTmp->handle_subsMutex)) != 0) \
{ \
RBUSLOG_ERROR("Error @ mutex lock.. Err=%d:%s ", err, strerror(err)); \
} \
}

#define HANDLE_SUBS_MUTEX_UNLOCK(HANDLE) \
{ \
int err; \
rbusHandle_t pTmp = (rbusHandle_t) HANDLE; \
if((err = pthread_mutex_unlock(&pTmp->handle_subsMutex)) != 0) \
{ \
RBUSLOG_ERROR("Error @ mutex unlock.. Err=%d:%s ", err, strerror(err)); \
} \
}

static uint32_t gSubscriptionId = 4; /* Starting the subscription ID with 4 as the initial 3 values are allocated for the below add listener
rtconnection create internal
rbus register object
client advisory */

struct _rbusSubscriptions
{
rbusHandle_t handle;
Expand All @@ -43,7 +68,7 @@ struct _rbusSubscriptions
static void rbusSubscriptions_loadCache(rbusSubscriptions_t subscriptions);
static void rbusSubscriptions_saveCache(rbusSubscriptions_t subscriptions);

int subscribeHandlerImpl(rbusHandle_t handle, bool added, elementNode* el, char const* eventName, char const* listener, int32_t componentId, int32_t interval, int32_t duration, rbusFilter_t filter, int rawData, uint32_t *subscriptionId);
int subscribeHandlerImpl(rbusHandle_t handle, bool added, elementNode* el, char const* eventName, char const* listener, int32_t componentId, int32_t interval, int32_t duration, rbusFilter_t filter, int rawData, uint32_t *subscriptionId, int resubscribe);

static int subscriptionKeyCompare(rbusSubscription_t* subscription, char const* listener, int32_t componentId, char const* eventName, rbusFilter_t filter, int32_t interval, int32_t duration, bool rawData)
{
Expand Down Expand Up @@ -118,13 +143,9 @@ void rbusSubscriptions_destroy(rbusSubscriptions_t subscriptions)
static void rbusSubscriptions_onSubscriptionCreated(rbusSubscription_t* sub, elementNode* node);

/*add a new subscription*/
rbusSubscription_t* rbusSubscriptions_addSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration, bool autoPublish, elementNode* registryElem, bool rawData)
rbusSubscription_t* rbusSubscriptions_addSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration, bool autoPublish, elementNode* registryElem, bool rawData, int resubscribe, int subscriptionId)
{
rbusSubscription_t* sub;
static uint32_t subscriptionId = 4; /* Starting the subscription ID with 4 as the initial 3 values are allocated for the below add listener
rtconnection create internal
rbus register object
client advisory */
TokenChain* tokens;

RBUSLOG_DEBUG("adding %s %s", listener, eventName);
Expand Down Expand Up @@ -152,14 +173,20 @@ rbusSubscription_t* rbusSubscriptions_addSubscription(rbusSubscriptions_t subscr
sub->element = registryElem;
sub->tokens = tokens;
sub->rawData = rawData;
sub->subscriptionId = subscriptionId;
if(resubscribe)
{
sub->subscriptionId = subscriptionId;
gSubscriptionId = subscriptionId;
}
else
sub->subscriptionId = gSubscriptionId;
rtList_Create(&sub->instances);
rtList_PushBack(subscriptions->subList, sub, NULL);

rbusSubscriptions_onSubscriptionCreated(sub, subscriptions->root);

rbusSubscriptions_saveCache(subscriptions);
subscriptionId++;
gSubscriptionId++;

return sub;
}
Expand Down Expand Up @@ -761,7 +788,10 @@ void rbusSubscriptions_resubscribeElementCache(rbusHandle_t handle, rbusSubscrip
RBUSLOG_INFO("resubscribing %s for %s", sub->eventName, sub->listener);
rtListItem_GetNext(item, &next);
rtList_RemoveItem(subscriptions->subList, item, NULL);/*remove before calling subscribeHandlerImpl to avoid dupes in cache file*/
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter, sub->rawData, &sub->subscriptionId);
HANDLE_SUBS_MUTEX_UNLOCK(handle);/*unlocking here to avoid deadlock as rbusSubscriptions_resubscribeElementCache() is called after
locking mutex and same mutex is locked inside subscribeHandlerImpl()*/
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter, sub->rawData, &sub->subscriptionId, 1);
HANDLE_SUBS_MUTEX_LOCK(handle);
/*TODO figure out what to do if we get an error resubscribing
It's conceivable that a provider might not like the sub due to some state change between this and the previous process run
*/
Expand Down Expand Up @@ -808,7 +838,10 @@ void rbusSubscriptions_resubscribeRowElementCache(rbusHandle_t handle, rbusSubsc
{
RBUSLOG_INFO("resubscribing %s for %s", sub->eventName, sub->listener);
rtList_RemoveItem(subscriptions->subList, item, NULL);
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter, sub->rawData, &sub->subscriptionId);
HANDLE_SUBS_MUTEX_UNLOCK(handle);/*unlocking here to avoid deadlock as rbusSubscriptions_resubscribeElementCache() is called
after locking mutex and same mutex is locked inside subscribeHandlerImpl()*/
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter, sub->rawData, &sub->subscriptionId, 1);
HANDLE_SUBS_MUTEX_LOCK(handle);
(void)err;
subscriptionFree(sub);
}
Expand Down Expand Up @@ -844,7 +877,10 @@ void rbusSubscriptions_handleClientDisconnect(rbusHandle_t handle, rbusSubscript
el = retrieveInstanceElement(handleInfo->elementRoot, sub->eventName);
if(el)
{
subscribeHandlerImpl(handle, false, sub->element, sub->eventName, sub->listener, sub->componentId, 0, 0, 0, 0, 0);
HANDLE_SUBS_MUTEX_UNLOCK(handle);/*unlocking here to avoid deadlock as rbusSubscriptions_resubscribeElementCache() is called after
locking mutex and same mutex is locked inside subscribeHandlerImpl()*/
subscribeHandlerImpl(handle, false, sub->element, sub->eventName, sub->listener, sub->componentId, 0, 0, 0, 0, 0, 0);
HANDLE_SUBS_MUTEX_LOCK(handle);
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion src/rbus/rbus_subscriptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void rbusSubscriptions_create(rbusSubscriptions_t* subscriptions, rbusHandle_t h
void rbusSubscriptions_destroy(rbusSubscriptions_t subscriptions);

/*add a new subscription with unique key [listener, eventName, filter] and the corresponding*/
rbusSubscription_t* rbusSubscriptions_addSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration, bool autoPublish, elementNode* registryElem, bool rawData);
rbusSubscription_t* rbusSubscriptions_addSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration, bool autoPublish, elementNode* registryElem, bool rawData, int resubscribe, int subscriptionId);

/*get an existing subscription by searching for its unique key [listener, eventName, filter]*/
rbusSubscription_t* rbusSubscriptions_getSubscription(rbusSubscriptions_t subscriptions, char const* listener, char const* eventName, int32_t componentId, rbusFilter_t filter, int32_t interval, int32_t duration, bool rawData);
Expand Down

0 comments on commit 0387070

Please sign in to comment.