Skip to content

Commit

Permalink
Merge branch 'Opendirect_rawdata' of github.com:gururaajar/rbus into …
Browse files Browse the repository at this point in the history
…Opendirect_rawdata
  • Loading branch information
Gururaaja E S R authored and Gururaaja E S R committed Nov 25, 2023
2 parents cbe4779 + 8956f8b commit 9645d41
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 25 deletions.
18 changes: 4 additions & 14 deletions src/rbus/rbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,7 @@ int subscribeHandlerImpl(
int32_t duration,
rbusFilter_t filter,
int rawData,
uint64_t** subscriptionId)
uint32_t* subscriptionId)
{
int error = RBUS_ERROR_SUCCESS;
rbusSubscription_t* subscription = NULL;
Expand Down Expand Up @@ -1144,7 +1144,7 @@ int subscribeHandlerImpl(
return RBUS_ERROR_INVALID_INPUT; // Adding fails because of invalid input
}
else
**subscriptionId = subscription->subscriptionId;
*subscriptionId = subscription->subscriptionId;
}
else
{
Expand Down Expand Up @@ -1343,16 +1343,6 @@ static void unregisterTableRow (rbusHandle_t handle, elementNode* rowInstElem)
}
}
//******************************* CALLBACKS *************************************//
static int _event_subscribe_callback_handler(elementNode* el, char const* eventName, char const* listener, int added, int componentId, int interval, int duration, rbusFilter_t filter, void* userData, int rawData, uint64_t* subscriptionId)
{
rbusHandle_t handle = (rbusHandle_t)userData;
rbusCoreError_t err = RBUSCORE_SUCCESS;

RBUSLOG_DEBUG("event subscribe callback for [%s] event! and element of type %d", eventName, el->type);

err = subscribeHandlerImpl(handle, added, el, eventName, listener, componentId, interval, duration, filter, rawData, &subscriptionId);
return err;
}

static void _client_disconnect_callback_handler(const char * listener)
{
Expand Down Expand Up @@ -2493,7 +2483,7 @@ static void _subscribe_callback_handler (rbusHandle_t handle, rbusMessage reques
int32_t componentId = 0;
int32_t interval = 0;
int32_t duration = 0;
uint64_t subscriptionId = 0;
uint32_t subscriptionId = 0;
rbusFilter_t filter = NULL;
elementNode* el = NULL;
rbusError_t ret = RBUS_ERROR_SUCCESS;
Expand Down Expand Up @@ -2549,7 +2539,7 @@ static void _subscribe_callback_handler (rbusHandle_t handle, rbusMessage reques
rbusMessage_GetInt32(request, &publishOnSubscribe);
rbusMessage_GetInt32(request, &rawData);
if(ret == RBUS_ERROR_SUCCESS)
ret = _event_subscribe_callback_handler(el, event_name, sender, added, componentId, interval, duration, filter, handle, rawData, &subscriptionId);
ret = subscribeHandlerImpl(handle, added, el, event_name, sender, componentId, interval, duration, filter, rawData, &subscriptionId);
rbusMessage_SetInt32(*response, ret);

if(publishOnSubscribe && ret == RBUS_ERROR_SUCCESS)
Expand Down
12 changes: 8 additions & 4 deletions src/rbus/rbus_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,15 @@ int rbusMessage_HasListener(
for (i = 0, n = rtVector_Size(handle->messageCallbacks); i < n; ++i)
{
rbusMessageHandlerContext_t* ctx = rtVector_At(handle->messageCallbacks, i);
VERIFY_NULL(ctx);
if(!strcmp(ctx->expression, topic))
if(!ctx)
continue;
else
{
ret = 1;
break;
if(!strcmp(ctx->expression, topic))
{
ret = 1;
break;
}
}
}
RBUS_MESSAGE_MUTEX_UNLOCK();
Expand Down
10 changes: 3 additions & 7 deletions src/rbus/rbus_subscriptions.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct _rbusSubscriptions
static void rbusSubscriptions_loadCache(rbusSubscriptions_t subscriptions);
static void rbusSubscriptions_saveCache(rbusSubscriptions_t subscriptions);

int subscribeHandlerImpl(rbusHandle_t handle, bool added, elementNode* el, char const* eventName, char const* listener, int32_t componentId, int32_t interval, int32_t duration, rbusFilter_t filter, int rawData, uint32_t **subscriptionId);
int subscribeHandlerImpl(rbusHandle_t handle, bool added, elementNode* el, char const* eventName, char const* listener, int32_t componentId, int32_t interval, int32_t duration, rbusFilter_t filter, int rawData, uint32_t *subscriptionId);

static int subscriptionKeyCompare(rbusSubscription_t* subscription, char const* listener, int32_t componentId, char const* eventName, rbusFilter_t filter, int32_t interval, int32_t duration, bool rawData)
{
Expand Down Expand Up @@ -740,7 +740,6 @@ void rbusSubscriptions_resubscribeElementCache(rbusHandle_t handle, rbusSubscrip
{
rtListItem item;
rbusSubscription_t* sub;
uint32_t* subscriptionId;

VERIFY_NULL(subscriptions);
VERIFY_NULL(el);
Expand All @@ -762,8 +761,7 @@ void rbusSubscriptions_resubscribeElementCache(rbusHandle_t handle, rbusSubscrip
RBUSLOG_INFO("resubscribing %s for %s", sub->eventName, sub->listener);
rtListItem_GetNext(item, &next);
rtList_RemoveItem(subscriptions->subList, item, NULL);/*remove before calling subscribeHandlerImpl to avoid dupes in cache file*/
subscriptionId = &sub->subscriptionId;
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter, sub->rawData, &subscriptionId);
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter, sub->rawData, &sub->subscriptionId);
/*TODO figure out what to do if we get an error resubscribing
It's conceivable that a provider might not like the sub due to some state change between this and the previous process run
*/
Expand All @@ -786,7 +784,6 @@ void rbusSubscriptions_resubscribeRowElementCache(rbusHandle_t handle, rbusSubsc
rbusError_t err = RBUS_ERROR_SUCCESS;
size_t size = 0;
unsigned int count = 0;
uint32_t *subscriptionId;
struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle;

if(subscriptions)
Expand All @@ -811,8 +808,7 @@ void rbusSubscriptions_resubscribeRowElementCache(rbusHandle_t handle, rbusSubsc
{
RBUSLOG_INFO("resubscribing %s for %s", sub->eventName, sub->listener);
rtList_RemoveItem(subscriptions->subList, item, NULL);
subscriptionId = &sub->subscriptionId;
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter, sub->rawData, &subscriptionId);
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter, sub->rawData, &sub->subscriptionId);
(void)err;
subscriptionFree(sub);
}
Expand Down

0 comments on commit 9645d41

Please sign in to comment.