Skip to content

Commit

Permalink
RDKB-50101:RBUS Event Producer can have multiple instances of the sam…
Browse files Browse the repository at this point in the history
…e Subscriber (#172)

Reason for change: While recovering from a crash, the provider is unaware of a valid rbus_open connection of consumer,so it lead to duplicate entres of subscriptions. Before loading old subscriptions from the cache file, a "/tmp/.rbus/<ConsumerPID>_<ComponentId>" file existence check is now performed.

Signed-off-by: Netaji Panigrahi [email protected]

Signed-off-by: Netaji Panigrahi [email protected]
  • Loading branch information
NetajiPanigrahi authored Oct 21, 2023
1 parent 6508765 commit bb53bef
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/rbus/rbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -2701,6 +2701,7 @@ rbusError_t rbus_open(rbusHandle_t* handle, char const* componentName)
rbusHandle_t tmpHandle = NULL;
static int32_t sLastComponentId = 0;
pthread_mutexattr_t attrib;
char filename[RTMSG_HEADER_MAX_TOPIC_LENGTH];

if(!handle || !componentName)
{
Expand Down Expand Up @@ -2782,6 +2783,15 @@ rbusError_t rbus_open(rbusHandle_t* handle, char const* componentName)

RBUSLOG_INFO("%s(%s) success", __FUNCTION__, componentName);

snprintf(filename, RTMSG_HEADER_MAX_TOPIC_LENGTH-1, "%s%d_%d", "/tmp/.rbus/", getpid(), tmpHandle->componentId);
FILE *fd = fopen(filename, "w");
if (fd)
{
RBUSLOG_DEBUG("%s(%s): %s File created successfully", __FUNCTION__, componentName, filename);
fclose(fd);
}

RBUSLOG_INFO("%s(%s) success", __FUNCTION__, componentName);
return RBUS_ERROR_SUCCESS;

if((err = rbus_unregisterObj(componentName)) != RBUSCORE_SUCCESS)
Expand Down Expand Up @@ -2910,6 +2920,9 @@ rbusError_t rbus_close(rbusHandle_t handle)
RBUSLOG_INFO("%s(%s)", __FUNCTION__, handleInfo->componentName);

LockMutex();
char filename[RTMSG_HEADER_MAX_TOPIC_LENGTH];
snprintf(filename, RTMSG_HEADER_MAX_TOPIC_LENGTH-1, "%s%d_%d", "/tmp/.rbus/", getpid(), handleInfo->componentId);
remove(filename);

if(handleInfo->eventSubs)
{
Expand Down
14 changes: 14 additions & 0 deletions src/rbus/rbus_subscriptions.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <signal.h>
#include <unistd.h>

#define VERIFY_NULL(T) if(NULL == T){ return; }
#define CACHE_FILE_PATH_FORMAT "%s/rbus_subs_%s"
Expand Down Expand Up @@ -563,6 +564,19 @@ static void rbusSubscriptions_loadCache(rbusSubscriptions_t subscriptions)
needSave = true;
continue;
}
else
{
char filename[RTMSG_HEADER_MAX_TOPIC_LENGTH];
snprintf(filename, RTMSG_HEADER_MAX_TOPIC_LENGTH-1, "%s%d_%d", "/tmp/.rbus/",
rbusSubscriptions_getListenerPid(sub->listener), sub->componentId);
if(access(filename, F_OK) != 0)
{
subscriptionFree(sub);
needSave = true;
continue;
RBUSLOG_DEBUG("%s: file doesn't exist %s", __FUNCTION__, filename);
}
}

rtList_Create(&sub->instances);
rtList_PushBack(subscriptions->subList, sub, NULL);
Expand Down
1 change: 1 addition & 0 deletions src/rtmessage/rtrouted.c
Original file line number Diff line number Diff line change
Expand Up @@ -1774,6 +1774,7 @@ int main(int argc, char* argv[])
exit(12);
}

mkdir("/tmp/.rbus", 0700);
#ifdef ENABLE_RDKLOGGER
rdk_logger_init("/etc/debug.ini");
#endif
Expand Down

0 comments on commit bb53bef

Please sign in to comment.