Skip to content

Commit

Permalink
Removed rbus_config files
Browse files Browse the repository at this point in the history
  • Loading branch information
NetajiPanigrahi committed Jan 16, 2025
1 parent f32d5b2 commit a5151b9
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 389 deletions.
57 changes: 40 additions & 17 deletions include/rbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,23 +131,7 @@ typedef enum _rbusError
RBUS_ERROR_DIRECT_CON_NOT_EXIST /**< Direct connection not exist */
} rbusError_t;

int rbusConfig_Init(rbusHandle_t handle);
int rbusConfig_Destroy(rbusHandle_t handle);
int rbusConfig_ReadGetTimeout(rbusHandle_t handle);
int rbusConfig_ReadWildcardGetTimeout(rbusHandle_t handle);
int rbusConfig_ReadSetTimeout(rbusHandle_t handle);
int rbusConfig_ReadSetMultiTimeout(rbusHandle_t handle);
int rbusConfig_ReadValueChangePeriod(rbusHandle_t handle);
int rbusConfig_ReadSubscribeTimeout(rbusHandle_t handle);
int rbusConfig_ReadSubscribeMaxWait(rbusHandle_t handle);

int rbusConfig_UpdateSetTimeout(rbusHandle_t handle,int timeout);
int rbusConfig_UpdateGetTimeout(rbusHandle_t handle,int timeout);
int rbusConfig_UpdateWildcardGetTimeout(rbusHandle_t handle, int timeout);
int rbusConfig_UpdateSetMultiTimeout(rbusHandle_t handle, int timeout);
int rbusConfig_UpdatevValueChangePeriod(rbusHandle_t handle, int period);
int rbusConfig_UpdateSubscribeTimeout(rbusHandle_t handle, int timeout);
int rbusConfig_UpdateSubscribeMaxWait(rbusHandle_t handle, int timeout);


char const * rbusError_ToString(rbusError_t e);

Expand Down Expand Up @@ -2007,6 +1991,45 @@ rbusError_t rbus_registerDynamicTableSyncHandler(
rbusError_t rbus_configureTimeoutValues(rbusHandle_t handle, int set_timeout,
int get_timeout, int setMulti_timeout, int getWildcard_timeout, int VCPeriod);

/** @fn int rbusHandle_ConfigSetTimeout(rbusHandle_t handle,int timeout)
* @brief function to update SET Timeout value.
*
* @param handle The Bus handle.
* @param timeout The Timeout value for rbus_set operation in milliseconds,
* set to default value if timeout is Zero.
* @return -1 or 0 0 on Success, -1 on failed.
*/
int rbusHandle_ConfigSetTimeout(rbusHandle_t handle,int timeout);

/** @fn int rbusHandle_ConfigGetTimeout(rbusHandle_t handle,int timeout)
* @brief function to update GET Timeout value.
*
* @param handle The Bus handle.
* @param timeout The Timeout value for rbus_get operation in milliseconds,
* set to default value if timeout is Zero.
* @return -1 or 0 0 on Success, -1 on failed.
*/
int rbusHandle_ConfigGetTimeout(rbusHandle_t handle,int timeout);

/** @fn int rbusHandle_ConfigWildcardGetTimeout(rbusHandle_t handle,int timeout)
* @brief function to update GET Wildcard query Timeout value.
*
* @param handle The Bus handle.
* @param timeout The Timeout value for rbus_get operation in milliseconds,
* set to default value if timeout is Zero.
* @return -1 or 0 0 on Success, -1 on failed.
*/
int rbusHandle_ConfigWildcardGetTimeout(rbusHandle_t handle, int timeout);

/** @fn int rbusHandle_ConfigSetMultiTimeout(rbusHandle_t handle,int timeout)
* @brief function to update SetMulti Timeout value.
*
* @param handle The Bus handle.
* @param timeout The Timeout value for rbus_setMulti operation in milliseconds,
* set to default value if timeout is Zero.
* @return -1 or 0 0 on Success, -1 on failed.
*/
int rbusHandle_ConfigSetMultiTimeout(rbusHandle_t handle, int timeout);
/** @} */

#ifdef __cplusplus
Expand Down
3 changes: 1 addition & 2 deletions src/rbus/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ add_library(
rbus_subscriptions.c
rbus_tokenchain.c
rbus_asyncsubscribe.c
rbus_intervalsubscription.c
rbus_config.c)
rbus_intervalsubscription.c)

target_link_libraries(rbus rbuscore rtMessage -fPIC -pthread)

Expand Down
41 changes: 20 additions & 21 deletions src/rbus/rbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "rbus_subscriptions.h"
#include "rbus_asyncsubscribe.h"
#include "rbus_intervalsubscription.h"
#include "rbus_config.h"
#include "rbus_log.h"
#include "rbus_handle.h"
#include "rbus_message.h"
Expand Down Expand Up @@ -283,10 +282,10 @@ rbusError_t rbus_configureTimeoutValues(rbusHandle_t handle, int set_timeout,
int get_timeout, int setMulti_timeout, int getWildcard_timeout, int VCPeriod)
{
VERIFY_NULL(handle);
rbusConfig_UpdateSetTimeout(handle, set_timeout);
rbusConfig_UpdateGetTimeout(handle, get_timeout);
rbusConfig_UpdateSetMultiTimeout(handle, setMulti_timeout);
rbusConfig_UpdateWildcardGetTimeout(handle, getWildcard_timeout);
rbusHandle_ConfigSetTimeout(handle, set_timeout);
rbusHandle_ConfigGetTimeout(handle, get_timeout);
rbusHandle_ConfigSetMultiTimeout(handle, setMulti_timeout);
rbusHandle_ConfigWildcardGetTimeout(handle, getWildcard_timeout);
rbusConfig_UpdatevValueChangePeriod(handle, VCPeriod);
return RBUS_ERROR_SUCCESS;
}
Expand Down Expand Up @@ -2890,9 +2889,9 @@ rbusError_t rbus_open(rbusHandle_t* handle, char const* componentName)
RBUSLOG_ERROR("(%s): rbus_registerObj error %d", componentName, err);
goto exit_error2;
}
if (rbusConfig_Init(tmpHandle) != 0)
if (rbusHandle_ConfigInit(tmpHandle) != 0)
{
RBUSLOG_ERROR("(%s): rbusConfig_Init failed", componentName);
RBUSLOG_ERROR("(%s): rbusHandle_ConfigInit failed", componentName);
goto exit_error2;
}
tmpHandle->componentName = strdup(componentName);
Expand Down Expand Up @@ -3118,7 +3117,7 @@ rbusError_t rbus_close(rbusHandle_t handle)

componentName = handleInfo->componentName;
handleInfo->componentName=NULL;
rbusConfig_Destroy(handle);
rbusHamdle_ConfigDestroy(handle);
handleInfo->config = NULL;
ERROR_CHECK(pthread_mutex_destroy(&handleInfo->handle_eventSubsMutex));
ERROR_CHECK(pthread_mutex_destroy(&handleInfo->handle_subsMutex));
Expand Down Expand Up @@ -3387,7 +3386,7 @@ rbusError_t rbus_get(rbusHandle_t handle, char const* name, rbusValue_t* value)
if (NULL == myConn)
myConn = handleInfo->m_connection;

err = rbus_invokeRemoteMethod2(myConn, name, METHOD_GETPARAMETERVALUES, request, rbusConfig_ReadGetTimeout(handle), &response);
err = rbus_invokeRemoteMethod2(myConn, name, METHOD_GETPARAMETERVALUES, request, rbusHandle_ConfigGetTimeout(handle), &response);

if(err != RBUSCORE_SUCCESS)
{
Expand Down Expand Up @@ -3548,7 +3547,7 @@ rbusError_t rbus_getExt(rbusHandle_t handle, int paramCount, char const** pParam
rbusMessage_SetString(request, pParamNames[0]);
/* Invoke the method */
err = rbus_invokeRemoteMethod(destinations[i], METHOD_GETPARAMETERVALUES,
request, rbusConfig_ReadWildcardGetTimeout(handle), &response);
request, rbusHandle_ReadWildcardGetTimeout(handle), &response);

if(err != RBUSCORE_SUCCESS)
{
Expand Down Expand Up @@ -3711,7 +3710,7 @@ rbusError_t rbus_getExt(rbusHandle_t handle, int paramCount, char const** pParam
RBUSLOG_DEBUG("sending batch request with %d params to component %s", batchCount, componentName);
free(componentName);

if((err = rbus_invokeRemoteMethod(firstParamName, METHOD_GETPARAMETERVALUES, request, rbusConfig_ReadGetTimeout(handle), &response)) != RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod(firstParamName, METHOD_GETPARAMETERVALUES, request, rbusHandle_ConfigGetTimeout(handle), &response)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("get by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, firstParamName);
errorcode = rbusCoreError_to_rbusError(err);
Expand Down Expand Up @@ -3865,7 +3864,7 @@ rbusError_t rbus_set(rbusHandle_t handle, char const* name,rbusValue_t value, rb
if (NULL == myConn)
myConn = handleInfo->m_connection;

if((err = rbus_invokeRemoteMethod2(myConn, name, METHOD_SETPARAMETERVALUES, setRequest, rbusConfig_ReadSetTimeout(handle), &setResponse)) != RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod2(myConn, name, METHOD_SETPARAMETERVALUES, setRequest, rbusHandle_ReadSetTimeout(handle), &setResponse)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("set by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, name);
errorcode = rbusCoreError_to_rbusError(err);
Expand Down Expand Up @@ -3933,7 +3932,7 @@ rbusError_t rbus_setCommit(rbusHandle_t handle, char const* name, rbusSetOptions
rtConnection myConn = rbuscore_FindClientPrivateConnection(name);
if (NULL == myConn)
myConn = handleInfo->m_connection;
if((err = rbus_invokeRemoteMethod2(myConn, name, METHOD_COMMIT, setRequest, rbusConfig_ReadSetTimeout(handle), &setResponse)) != RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod2(myConn, name, METHOD_COMMIT, setRequest, rbusHandle_ReadSetTimeout(handle), &setResponse)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("set commit by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, name);
errorcode = rbusCoreError_to_rbusError(err);
Expand Down Expand Up @@ -4104,7 +4103,7 @@ rbusError_t rbus_setMulti(rbusHandle_t handle, int numProps, rbusProperty_t prop
/* Set the Commit value; FIXME: Should we use string? */
rbusMessage_SetString(setRequest, (!opts || opts->commit) ? "TRUE" : "FALSE");

if((err = rbus_invokeRemoteMethod(firstParamName, METHOD_SETPARAMETERVALUES, setRequest, rbusConfig_ReadSetTimeout(handle), &setResponse)) != RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod(firstParamName, METHOD_SETPARAMETERVALUES, setRequest, rbusHandle_ReadSetTimeout(handle), &setResponse)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("set by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, firstParamName);
errorcode = rbusCoreError_to_rbusError(err);
Expand Down Expand Up @@ -4271,7 +4270,7 @@ rbusError_t rbusTable_addRow(
because the broker simlpy looks at the top level nodes that are owned by a component route. maybe this breaks if the broker changes*/
METHOD_ADDTBLROW,
request,
rbusConfig_ReadSetTimeout(handle),
rbusHandle_ReadSetTimeout(handle),
&response)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("Add row by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, tableName);
Expand Down Expand Up @@ -4337,7 +4336,7 @@ rbusError_t rbusTable_removeRow(
rowName,
METHOD_DELETETBLROW,
request,
rbusConfig_ReadSetTimeout(handle),
rbusHandle_ReadSetTimeout(handle),
&response)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("Delete row by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, rowName);
Expand Down Expand Up @@ -4465,7 +4464,7 @@ rbusError_t rbusTable_getRowNames(
if (NULL == myConn)
myConn = handleInfo->m_connection;

if((err = rbus_invokeRemoteMethod2(myConn, tableName, METHOD_GETPARAMETERNAMES, request, rbusConfig_ReadGetTimeout(handle), &response)) == RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod2(myConn, tableName, METHOD_GETPARAMETERNAMES, request, rbusHandle_ConfigGetTimeout(handle), &response)) == RBUSCORE_SUCCESS)
{
rbusLegacyReturn_t legacyRetCode = RBUS_LEGACY_ERR_FAILURE;
int ret = -1;
Expand Down Expand Up @@ -4605,7 +4604,7 @@ rbusError_t rbusElementInfo_get(
rbusMessage_SetInt32(request, depth);/*depth*/
rbusMessage_SetInt32(request, 0);/*not row names*/

if((err = rbus_invokeRemoteMethod(destinations[d], METHOD_GETPARAMETERNAMES, request, rbusConfig_ReadGetTimeout(handle), &response)) != RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod(destinations[d], METHOD_GETPARAMETERNAMES, request, rbusHandle_ConfigGetTimeout(handle), &response)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("invokeRemoteMethod %s destination=%s object=%s failed: err=%d", METHOD_GETPARAMETERNAMES, destinations[d], elemName, err);
errorcode = rbusCoreError_to_rbusError(err);
Expand Down Expand Up @@ -4836,7 +4835,7 @@ static rbusError_t rbusEvent_SubscribeWithRetries(

if(timeout == -1)
{
destNotFoundTimeout = rbusConfig_ReadSubscribeTimeout(handleInfo);
destNotFoundTimeout = RBUS_SUBSCRIBE_TIMEOUT;
}
else
{
Expand Down Expand Up @@ -5897,7 +5896,7 @@ rbusError_t rbusMethod_Invoke(
if (handleInfo->m_handleType != RBUS_HWDL_TYPE_REGULAR)
return RBUS_ERROR_INVALID_HANDLE;

return rbusMethod_InvokeInternal(handle, methodName, inParams, outParams, rbusConfig_ReadSetTimeout(handle));
return rbusMethod_InvokeInternal(handle, methodName, inParams, outParams, rbusHandle_ReadSetTimeout(handle));
}

typedef struct _rbusMethodInvokeAsyncData_t
Expand Down Expand Up @@ -5960,7 +5959,7 @@ rbusError_t rbusMethod_InvokeAsync(
data->methodName = strdup(methodName);
data->inParams = inParams;
data->callback = callback;
data->timeout = timeout > 0 ? (timeout * 1000) : rbusConfig_ReadSetTimeout(handle); /* convert seconds to milliseconds */
data->timeout = timeout > 0 ? (timeout * 1000) : rbusHandle_ReadSetTimeout(handle); /* convert seconds to milliseconds */

if((err = pthread_create(&pid, NULL, rbusMethod_InvokeAsyncThreadFunc, data)) != 0)
{
Expand Down
27 changes: 14 additions & 13 deletions src/rbus/rbus_asyncsubscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
#define _GNU_SOURCE 1
#include "rbus_asyncsubscribe.h"
#include "rbus_config.h"
#include "rbus_log.h"
#include <rtTime.h>
#include <rtList.h>
Expand All @@ -40,6 +39,8 @@
#define LOCK() ERROR_CHECK(pthread_mutex_lock(&gRetrier->mutexQueue))
#define UNLOCK() ERROR_CHECK(pthread_mutex_unlock(&gRetrier->mutexQueue))

#define RBUS_SUBSCRIBE_MAXWAIT 60000 /*subscribe retry max wait between retries in miliseconds*/

/*defined in rbus.c*/
void _subscribe_async_callback_handler(rbusHandle_t handle, rbusEventSubscription_t* subscription, rbusError_t error, uint32_t subscriptionId);
int _event_callback_handler(char const* objectName, char const* eventName, rbusMessage message, void* userData);
Expand Down Expand Up @@ -102,15 +103,15 @@ static int rbusAsyncSubscribeRetrier_CompareSubscription(const void *pitem, cons
return 1;
}

static int rbusAsyncSubscribeRetrier_DetermineNextSendTime(rbusHandle_t handle, rtTime_t* nextSendTime)
static int rbusAsyncSubscribeRetrier_DetermineNextSendTime(rtTime_t* nextSendTime)
{
rtTime_t now;
rtListItem li;
char tbuff[200];

//find the earliest nextRetryTime using nextSendTime to compare and store
rtTime_Now(&now);
rtTime_Later(&now, rbusConfig_ReadSubscribeMaxWait(handle) + 1000, nextSendTime);
rtTime_Later(&now, RBUS_SUBSCRIBE_MAXWAIT + 1000, nextSendTime);

RBUSLOG_DEBUG("now=%s", rtTime_ToString(&now, tbuff));

Expand Down Expand Up @@ -190,33 +191,33 @@ static void rbusAsyncSubscribeRetrier_SendSubscriptionRequests()
elapsed = rtTime_Elapsed(&item->startTime, &now);

if(coreerr == RBUSCORE_ERROR_ENTRY_NOT_FOUND && /*the only error that means provider not found yet*/
elapsed < rbusConfig_ReadSubscribeTimeout(item->subscription->handle)) /*if we haven't timeout out yet*/
elapsed < RBUS_SUBSCRIBE_TIMEOUT) /*if we haven't timeout out yet*/
{
if(item->nextWaitTime == 0)
item->nextWaitTime = 1000; //miliseconds
else
item->nextWaitTime *= 2;//just double the time

//apply a limit to our doubling
if(item->nextWaitTime > rbusConfig_ReadSubscribeMaxWait(item->subscription->handle))
item->nextWaitTime = rbusConfig_ReadSubscribeMaxWait(item->subscription->handle);
if(item->nextWaitTime > RBUS_SUBSCRIBE_MAXWAIT)
item->nextWaitTime = RBUS_SUBSCRIBE_MAXWAIT;

//update nextRetryTime to nextWaitTime miliseconds from now, without exceeding subscribeTimeout
if(elapsed + item->nextWaitTime < rbusConfig_ReadSubscribeTimeout(item->subscription->handle))
if(elapsed + item->nextWaitTime < RBUS_SUBSCRIBE_TIMEOUT)
{
rtTime_Later(&now, item->nextWaitTime, &item->nextRetryTime);
}
else
{
//its possible to have the odd situation, based on how subscribeTimeout/subscribeMaxWait are configured,
//where this final retry happens very close to the previous retry (e.g. ... wait 60, sub, wait 60, sub, wait 1, sub)
rtTime_Later(&item->startTime, rbusConfig_ReadSubscribeTimeout(item->subscription->handle), &item->nextRetryTime);
rtTime_Later(&item->startTime, RBUS_SUBSCRIBE_TIMEOUT, &item->nextRetryTime);
}

RBUSLOG_INFO("%s no provider. retry in %d ms with %d left",
item->subscription->eventName,
rtTime_Elapsed(&now, &item->nextRetryTime),
rbusConfig_ReadSubscribeTimeout(item->subscription->handle) - elapsed );
RBUS_SUBSCRIBE_TIMEOUT- elapsed );
}
else
{
Expand Down Expand Up @@ -273,13 +274,13 @@ static void rbusAsyncSubscribeRetrier_SendSubscriptionRequests()

static void* AsyncSubscribeRetrier_threadFunc(void* data)
{
rbusHandle_t handle = (rbusHandle_t)data;
(void)data;
LOCK();
while(gRetrier->isRunning)
{
rtTime_t nextSendTime;

while(rbusAsyncSubscribeRetrier_DetermineNextSendTime(handle, &nextSendTime))
while(rbusAsyncSubscribeRetrier_DetermineNextSendTime(&nextSendTime))
{
UNLOCK();
rbusAsyncSubscribeRetrier_SendSubscriptionRequests();
Expand Down Expand Up @@ -311,7 +312,7 @@ static void* AsyncSubscribeRetrier_threadFunc(void* data)
return NULL;
}

static void rbusAsyncSubscribeRetrier_Create(rbusHandle_t handle)
static void rbusAsyncSubscribeRetrier_Create()
{
pthread_mutexattr_t mattrib;
pthread_condattr_t cattrib;
Expand Down Expand Up @@ -368,7 +369,7 @@ void rbusAsyncSubscribe_AddSubscription(rbusEventSubscription_t* subscription, r

if(!gRetrier)
{
rbusAsyncSubscribeRetrier_Create((rbusHandle_t)subscription->handle);
rbusAsyncSubscribeRetrier_Create();
}

AsyncSubscription_t* item = rt_malloc(sizeof(struct AsyncSubscription_t));
Expand Down
Loading

0 comments on commit a5151b9

Please sign in to comment.