Skip to content

Commit

Permalink
Merge branch 'rawdata_opendirect' of github.com:gururaajar/rbus into …
Browse files Browse the repository at this point in the history
…rawdata_opendirect
  • Loading branch information
Gururaaja E S R authored and Gururaaja E S R committed Oct 29, 2023
2 parents ecea898 + 6cbe1cb commit 5cf7048
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions src/rbus/rbus_subscriptions.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct _rbusSubscriptions
static void rbusSubscriptions_loadCache(rbusSubscriptions_t subscriptions);
static void rbusSubscriptions_saveCache(rbusSubscriptions_t subscriptions);

int subscribeHandlerImpl(rbusHandle_t handle, bool added, elementNode* el, char const* eventName, char const* listener, int32_t componentId, int32_t interval, int32_t duration, rbusFilter_t filter);
int subscribeHandlerImpl(rbusHandle_t handle, bool added, elementNode* el, char const* eventName, char const* listener, int32_t componentId, int32_t interval, int32_t duration, rbusFilter_t filter, int rawData, uint32_t **subscriptionId);

static int subscriptionKeyCompare(rbusSubscription_t* subscription, char const* listener, int32_t componentId, char const* eventName, rbusFilter_t filter, int32_t interval, int32_t duration, bool rawData)
{
Expand Down Expand Up @@ -543,6 +543,18 @@ static void rbusSubscriptions_loadCache(rbusSubscriptions_t subscriptions)
if(type != RBUS_INT32 && length != sizeof(int32_t)) goto remove_bad_file;
if(rbusBuffer_ReadInt32(buff, (int*)&sub->autoPublish) < 0) goto remove_bad_file;

//read subscriptionId
if(rbusBuffer_ReadUInt16(buff, &type) < 0) goto remove_bad_file;
if(rbusBuffer_ReadUInt16(buff, &length) < 0) goto remove_bad_file;
if(type != RBUS_INT32 && length != sizeof(int32_t)) goto remove_bad_file;
if(rbusBuffer_ReadInt32(buff, (int*)&sub->subscriptionId) < 0) goto remove_bad_file;

//read rawData
if(rbusBuffer_ReadUInt16(buff, &type) < 0) goto remove_bad_file;
if(rbusBuffer_ReadUInt16(buff, &length) < 0) goto remove_bad_file;
if(type != RBUS_INT32 && length != sizeof(int32_t)) goto remove_bad_file;
if(rbusBuffer_ReadInt32(buff, (int*)&sub->rawData) < 0) goto remove_bad_file;

//read hasFilter
if(rbusBuffer_ReadUInt16(buff, &type) < 0) goto remove_bad_file;
if(rbusBuffer_ReadUInt16(buff, &length) < 0) goto remove_bad_file;
Expand Down Expand Up @@ -660,6 +672,8 @@ static void rbusSubscriptions_saveCache(rbusSubscriptions_t subscriptions)
rbusBuffer_WriteInt32TLV(buff, sub->interval);
rbusBuffer_WriteInt32TLV(buff, sub->duration);
rbusBuffer_WriteInt32TLV(buff, sub->autoPublish);
rbusBuffer_WriteInt32TLV(buff, sub->subscriptionId);
rbusBuffer_WriteInt32TLV(buff, sub->rawData);
rbusBuffer_WriteInt32TLV(buff, sub->filter ? 1 : 0);
if(sub->filter)
rbusFilter_Encode(sub->filter, buff);
Expand Down Expand Up @@ -726,6 +740,7 @@ void rbusSubscriptions_resubscribeElementCache(rbusHandle_t handle, rbusSubscrip
{
rtListItem item;
rbusSubscription_t* sub;
uint32_t* subscriptionId;

VERIFY_NULL(subscriptions);
VERIFY_NULL(el);
Expand All @@ -747,7 +762,8 @@ void rbusSubscriptions_resubscribeElementCache(rbusHandle_t handle, rbusSubscrip
RBUSLOG_INFO("%s: resubscribing %s for %s", __FUNCTION__, sub->eventName, sub->listener);
rtListItem_GetNext(item, &next);
rtList_RemoveItem(subscriptions->subList, item, NULL);/*remove before calling subscribeHandlerImpl to avoid dupes in cache file*/
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter);
subscriptionId = &sub->subscriptionId;
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter, sub->rawData, &subscriptionId);
/*TODO figure out what to do if we get an error resubscribing
It's conceivable that a provider might not like the sub due to some state change between this and the previous process run
*/
Expand All @@ -770,6 +786,7 @@ void rbusSubscriptions_resubscribeRowElementCache(rbusHandle_t handle, rbusSubsc
rbusError_t err = RBUS_ERROR_SUCCESS;
size_t size = 0;
unsigned int count = 0;
uint32_t *subscriptionId;
struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle;

if(subscriptions)
Expand All @@ -794,7 +811,8 @@ void rbusSubscriptions_resubscribeRowElementCache(rbusHandle_t handle, rbusSubsc
{
RBUSLOG_INFO("%s: resubscribing %s for %s", __FUNCTION__, sub->eventName, sub->listener);
rtList_RemoveItem(subscriptions->subList, item, NULL);
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter);
subscriptionId = &sub->subscriptionId;
err = subscribeHandlerImpl(handle, true, el, sub->eventName, sub->listener, sub->componentId, sub->interval, sub->duration, sub->filter, sub->rawData, &subscriptionId);
(void)err;
subscriptionFree(sub);
}
Expand Down Expand Up @@ -830,7 +848,7 @@ void rbusSubscriptions_handleClientDisconnect(rbusHandle_t handle, rbusSubscript
el = retrieveInstanceElement(handleInfo->elementRoot, sub->eventName);
if(el)
{
subscribeHandlerImpl(handle, false, sub->element, sub->eventName, sub->listener, sub->componentId, 0, 0, 0);
subscribeHandlerImpl(handle, false, sub->element, sub->eventName, sub->listener, sub->componentId, 0, 0, 0, 0, 0);
}
else
{
Expand Down

0 comments on commit 5cf7048

Please sign in to comment.