diff --git a/src/rbus/rbus.c b/src/rbus/rbus.c index bcd250fd..bd333c22 100644 --- a/src/rbus/rbus.c +++ b/src/rbus/rbus.c @@ -60,26 +60,45 @@ #define LockMutex() pthread_mutex_lock(&gMutex) #define UnlockMutex() pthread_mutex_unlock(&gMutex) -#define HANDLE_MUTEX_LOCK(HANDLE) \ +#define HANDLE_EVENTSUBS_MUTEX_LOCK(HANDLE) \ { \ int err; \ rbusHandle_t pTmp = (rbusHandle_t) HANDLE; \ - if((err = pthread_mutex_lock(&pTmp->handleMutex)) != 0) \ + if((err = pthread_mutex_lock(&pTmp->handle_eventSubsMutex)) != 0) \ { \ RBUSLOG_ERROR("Error @ mutex lock.. Err=%d:%s ", err, strerror(err)); \ } \ } -#define HANDLE_MUTEX_UNLOCK(HANDLE) \ +#define HANDLE_EVENTSUBS_MUTEX_UNLOCK(HANDLE) \ { \ int err; \ rbusHandle_t pTmp = (rbusHandle_t) HANDLE; \ - if((err = pthread_mutex_unlock(&pTmp->handleMutex)) != 0) \ + if((err = pthread_mutex_unlock(&pTmp->handle_eventSubsMutex)) != 0) \ { \ RBUSLOG_ERROR("Error @ mutex unlock.. Err=%d:%s ", err, strerror(err)); \ } \ } +#define HANDLE_SUBS_MUTEX_LOCK(HANDLE) \ +{ \ + int err; \ + rbusHandle_t pTmp = (rbusHandle_t) HANDLE; \ + if((err = pthread_mutex_lock(&pTmp->handle_subsMutex)) != 0) \ + { \ + RBUSLOG_ERROR("Error @ mutex lock.. Err=%d:%s ", err, strerror(err)); \ + } \ +} + +#define HANDLE_SUBS_MUTEX_UNLOCK(HANDLE) \ +{ \ + int err; \ + rbusHandle_t pTmp = (rbusHandle_t) HANDLE; \ + if((err = pthread_mutex_unlock(&pTmp->handle_subsMutex)) != 0) \ + { \ + RBUSLOG_ERROR("Error @ mutex unlock.. Err=%d:%s ", err, strerror(err)); \ + } \ +} #define ERROR_CHECK(CMD) \ { \ int err; \ @@ -954,7 +973,7 @@ int subscribeHandlerImpl( RBUSLOG_INFO("Consumer=%s %s to event=%s", listener, added ? "SUBSCRIBED" : "UNSUBSCRIBED", eventName); - HANDLE_MUTEX_LOCK(handle); + HANDLE_SUBS_MUTEX_LOCK(handle); /* call the provider subHandler first to see if it overrides autoPublish */ if(el->cbTable.eventSubHandler) { @@ -972,7 +991,7 @@ int subscribeHandlerImpl( if(err != RBUS_ERROR_SUCCESS) { RBUSLOG_DEBUG("%s provider subHandler return err=%d", __FUNCTION__, err); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); return err; } } @@ -987,7 +1006,7 @@ int subscribeHandlerImpl( if (interval && eventName[strlen(eventName)-1] == '.') { RBUSLOG_ERROR("rbus interval subscription not supported for this event %s\n", eventName); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_INVALID_OPERATION; } @@ -997,13 +1016,13 @@ int subscribeHandlerImpl( subscription = rbusSubscriptions_addSubscription(handleInfo->subscriptions, listener, eventName, componentId, filter, interval, duration, autoPublish, el); if(!subscription) { - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_INVALID_INPUT; // Adding fails because of invalid input } } else { - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_SUBSCRIPTION_ALREADY_EXIST; } } @@ -1014,7 +1033,7 @@ int subscribeHandlerImpl( if(!subscription) { RBUSLOG_INFO("unsubscribing from event which isn't currectly subscribed to event=%s listener=%s", eventName, listener); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_INVALID_INPUT; /*unsubscribing from event which isn't currectly subscribed to*/ } } @@ -1071,7 +1090,7 @@ int subscribeHandlerImpl( { rbusSubscriptions_removeSubscription(handleInfo->subscriptions, subscription); } - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_SUCCESS; } @@ -1084,9 +1103,9 @@ static void registerTableRow (rbusHandle_t handle, elementNode* tableInstElem, c rowElem = instantiateTableRow(tableInstElem, instNum, aliasName); - HANDLE_MUTEX_LOCK(handle); + HANDLE_SUBS_MUTEX_LOCK(handle); rbusSubscriptions_onTableRowAdded(handleInfo->subscriptions, rowElem); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); /*update ValueChange after rbusSubscriptions_onTableRowAdded */ valueChangeTableRowUpdate(handle, rowElem, true); @@ -1128,9 +1147,9 @@ static void registerTableRow (rbusHandle_t handle, elementNode* tableInstElem, c /* Re-subscribe all the child elements of this row */ if(handleInfo->subscriptions) { - HANDLE_MUTEX_LOCK(handle); + HANDLE_SUBS_MUTEX_LOCK(handle); rbusSubscriptions_resubscribeRowElementCache(handle, handleInfo->subscriptions, rowElem); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); } rbusValue_Release(rowNameVal); rbusValue_Release(instNumVal); @@ -1150,9 +1169,9 @@ static void unregisterTableRow (rbusHandle_t handle, elementNode* rowInstElem) /*update ValueChange before rbusSubscriptions_onTableRowRemoved */ valueChangeTableRowUpdate(handle, rowInstElem, false); - HANDLE_MUTEX_LOCK(handle); + HANDLE_SUBS_MUTEX_LOCK(handle); rbusSubscriptions_onTableRowRemoved(handleInfo->subscriptions, rowInstElem); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); deleteTableRow(rowInstElem); @@ -1219,9 +1238,9 @@ void _subscribe_async_callback_handler(rbusHandle_t handle, rbusEventSubscriptio rbusEventSubscriptionInternal_t* subInternal = rt_malloc(sizeof(rbusEventSubscriptionInternal_t)); subInternal->sub = subscription; subInternal->dirty = false; - HANDLE_MUTEX_LOCK(handle); + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); rtVector_PushBack(handleInfo->eventSubs, subInternal); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); } else { @@ -1288,7 +1307,7 @@ static int _master_event_callback_handler(char const* sender, char const* eventN RBUSLOG_DEBUG("Received master event callback: sender=%s eventName=%s componentId=%d", sender, eventName, componentId); - HANDLE_MUTEX_LOCK(handleInfo); + HANDLE_EVENTSUBS_MUTEX_LOCK(handleInfo); subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration); if(subInternal) @@ -1316,14 +1335,14 @@ static int _master_event_callback_handler(char const* sender, char const* eventN else { RBUSLOG_DEBUG("Received master event callback: sender=%s eventName=%s, but no subscription found", sender, event.name); - HANDLE_MUTEX_UNLOCK(handleInfo); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handleInfo); return RBUSCORE_ERROR_EVENT_NOT_HANDLED; } exit_1: rbusObject_Release(event.data); rbusFilter_Release(filter); - HANDLE_MUTEX_UNLOCK(handleInfo); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handleInfo); return RBUSCORE_SUCCESS; } @@ -2744,7 +2763,8 @@ rbusError_t rbus_open(rbusHandle_t* handle, char const* componentName) rtVector_Create(&tmpHandle->messageCallbacks); ERROR_CHECK(pthread_mutexattr_init(&attrib)); ERROR_CHECK(pthread_mutexattr_settype(&attrib, PTHREAD_MUTEX_ERRORCHECK)); - ERROR_CHECK(pthread_mutex_init(&tmpHandle->handleMutex, &attrib)); + ERROR_CHECK(pthread_mutex_init(&tmpHandle->handle_eventSubsMutex, &attrib)); + ERROR_CHECK(pthread_mutex_init(&tmpHandle->handle_subsMutex, &attrib)); *handle = tmpHandle; @@ -2886,7 +2906,7 @@ rbusError_t rbus_close(rbusHandle_t handle) if(handleInfo->eventSubs) { int i; - HANDLE_MUTEX_LOCK(handle); + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); int count = (int)rtVector_Size(handleInfo->eventSubs); RBUSLOG_INFO("Cleaning up all (%d) subscriptions", count); for(i = 0; i < count; ++i) @@ -2906,7 +2926,7 @@ rbusError_t rbus_close(rbusHandle_t handle) } rtVector_Destroy(handleInfo->eventSubs, NULL); handleInfo->eventSubs = NULL; - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); } if (handleInfo->messageCallbacks) @@ -2918,9 +2938,9 @@ rbusError_t rbus_close(rbusHandle_t handle) if(handleInfo->subscriptions != NULL) { - HANDLE_MUTEX_LOCK(handle); + HANDLE_SUBS_MUTEX_LOCK(handle); rbusSubscriptions_destroy(handleInfo->subscriptions); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); handleInfo->subscriptions = NULL; } @@ -2964,7 +2984,8 @@ rbusError_t rbus_close(rbusHandle_t handle) _rbus_open_pre_initialize(false); } - ERROR_CHECK(pthread_mutex_destroy(&handleInfo->handleMutex)); + ERROR_CHECK(pthread_mutex_destroy(&handleInfo->handle_eventSubsMutex)); + ERROR_CHECK(pthread_mutex_destroy(&handleInfo->handle_subsMutex)); UnlockMutex(); @@ -3042,9 +3063,9 @@ rbusError_t rbus_regDataElements( } else { - HANDLE_MUTEX_LOCK(handle); + HANDLE_SUBS_MUTEX_LOCK(handle); rbusSubscriptions_resubscribeElementCache(handle, handleInfo->subscriptions, name, node); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); RBUSLOG_DEBUG("%s inserted successfully!", name); } } @@ -4527,22 +4548,22 @@ static rbusError_t rbusEvent_SubscribeWithRetries( int destNotFoundSleep = 1000; /*miliseconds*/ int destNotFoundTimeout; struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle; - HANDLE_MUTEX_LOCK(handle); + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration)) != NULL) { if (!subInternal->dirty) { - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_SUBSCRIPTION_ALREADY_EXIST; } } else if (rbusAsyncSubscribe_GetSubscription(handle, eventName, filter)) { - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_SUBSCRIPTION_ALREADY_EXIST; } - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); if(timeout == -1) { @@ -4630,9 +4651,9 @@ static rbusError_t rbusEvent_SubscribeWithRetries( subInternal->sub = sub; subInternal->dirty = false; - HANDLE_MUTEX_LOCK(handle); + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); rtVector_PushBack(handleInfo->eventSubs, subInternal); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); if(publishOnSubscribe) { @@ -4700,7 +4721,7 @@ static void _subscribe_rawdata_handler(rbusHandle_t handle, rbusMessage_t* msg, rbusEventSubscription_t *ptmp = (rbusEventSubscription_t *)userData; if (ptmp) { - HANDLE_MUTEX_LOCK(handle); + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); rbusEventSubscriptionInternal_t *subInternal = rbusEventSubscription_find(handleInfo->eventSubs, ptmp->eventName, ptmp->filter, ptmp->interval, ptmp->duration); if (subInternal && subInternal->dirty) @@ -4719,11 +4740,11 @@ static void _subscribe_rawdata_handler(rbusHandle_t handle, rbusMessage_t* msg, RBUSLOG_WARN("%s: Remove listener failed err: %d", __FUNCTION__, errorcode); } rbusEventSubscriptionInternal_free(subInternal); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return; } } - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); } rbusEventHandlerRawData_t eventHandlerFuncPtr = ptmp->handler; if(eventHandlerFuncPtr) @@ -4839,7 +4860,7 @@ rbusError_t rbusEvent_Unsubscribe( /*the use of rtVector is inefficient here. I have to loop through the vector to find the sub by name, then call RemoveItem, which loops through again to find the item by address to destroy */ - HANDLE_MUTEX_LOCK(handle); + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, NULL, 0, 0); if(subInternal) @@ -4857,7 +4878,7 @@ rbusError_t rbusEvent_Unsubscribe( if(coreerr == RBUSCORE_SUCCESS) { rtVector_RemoveItem(handleInfo->eventSubs, subInternal, rbusEventSubscriptionInternal_free); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_SUCCESS; } else @@ -4873,7 +4894,7 @@ rbusError_t rbusEvent_Unsubscribe( { RBUSLOG_INFO("%s: %s failed with core err=%d", __FUNCTION__, eventName, coreerr); rtVector_RemoveItem(handleInfo->eventSubs, subInternal, rbusEventSubscriptionInternal_free); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_BUS_ERROR; } } @@ -4881,10 +4902,10 @@ rbusError_t rbusEvent_Unsubscribe( else { RBUSLOG_INFO("%s: %s no existing subscription found", __FUNCTION__, eventName); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_INVALID_OPERATION; //TODO - is the the right error to return } - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_SUCCESS; } @@ -4907,7 +4928,7 @@ rbusError_t rbusEvent_UnsubscribeRawData( /*the use of rtVector is inefficient here. I have to loop through the vector to find the sub by name, then call RemoveItem, which loops through again to find the item by address to destroy */ - HANDLE_MUTEX_LOCK(handle); + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, NULL, 0, 0); if (subInternal) { @@ -4931,7 +4952,7 @@ rbusError_t rbusEvent_UnsubscribeRawData( RBUSLOG_INFO("%s: %s no existing subscription found", __FUNCTION__, eventName); errorcode = RBUS_ERROR_INVALID_OPERATION; } - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return errorcode; } @@ -5020,7 +5041,7 @@ rbusError_t rbusEvent_SubscribeExRawData( } else { - HANDLE_MUTEX_LOCK(handle); + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription[i].eventName, subscription[i].filter, subscription[i].interval, subscription[i].duration); snprintf(rawDataTopic, RBUS_MAX_NAME_LENGTH, "rawdata.%s", subscription[i].eventName); errorcode = rbusMessage_AddListener(handle, rawDataTopic, @@ -5029,7 +5050,7 @@ rbusError_t rbusEvent_SubscribeExRawData( { RBUSLOG_ERROR("%s: Listener failed err: %d", __FUNCTION__, errorcode); } - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); } } @@ -5113,11 +5134,10 @@ rbusError_t rbusEvent_UnsubscribeExRawData( /*the use of rtVector is inefficient here. I have to loop through the vector to find the sub by name, then call RemoveItem, which loops through again to find the item by address to destroy */ - HANDLE_MUTEX_LOCK(handle); + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription[i].eventName, subscription[i].filter, subscription[i].interval, subscription[i].duration); if(subInternal) { - errorcode = _rbus_event_unsubscribe(handle, subInternal); if(errorcode != RBUS_ERROR_DESTINATION_NOT_REACHABLE) { @@ -5139,8 +5159,7 @@ rbusError_t rbusEvent_UnsubscribeExRawData( errorcode = RBUS_ERROR_INVALID_OPERATION; //TODO - is the the right error to return } } - - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return errorcode; } @@ -5194,7 +5213,7 @@ rbusError_t rbusEvent_UnsubscribeEx( for(i = 0; i < numSubscriptions; ++i) { - HANDLE_MUTEX_LOCK(handleInfo); + HANDLE_EVENTSUBS_MUTEX_LOCK(handleInfo); rbusEventSubscriptionInternal_t* subInternal = NULL; subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription[i].eventName, subscription[i].filter, subscription[i].interval, subscription[i].duration); @@ -5212,7 +5231,7 @@ rbusError_t rbusEvent_UnsubscribeEx( { errorcode = _rbus_AsyncSubscribe_remove_subscription(handle, &subscription[i]); } - HANDLE_MUTEX_UNLOCK(handleInfo); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handleInfo); } return errorcode; } @@ -5238,7 +5257,7 @@ bool rbusEvent_IsSubscriptionExist( return false; } - HANDLE_MUTEX_LOCK(handle); + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); if (subscription) { RBUSLOG_INFO("%s: %s", __FUNCTION__, subscription->eventName); @@ -5252,7 +5271,7 @@ bool rbusEvent_IsSubscriptionExist( } ret = (subInternal ? true : false); - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return ret; } @@ -5347,7 +5366,7 @@ rbusError_t rbusEvent_Publish( } /*Loop through element's subscriptions*/ - HANDLE_MUTEX_LOCK(handle); + HANDLE_SUBS_MUTEX_LOCK(handle); rtList_GetFront(el->subscriptions, &listItem); while(listItem) { @@ -5422,7 +5441,7 @@ rbusError_t rbusEvent_Publish( rtListItem_GetNext(listItem, &listItem); } - HANDLE_MUTEX_UNLOCK(handle); + HANDLE_SUBS_MUTEX_UNLOCK(handle); return errOut == RBUSCORE_SUCCESS ? RBUS_ERROR_SUCCESS: RBUS_ERROR_BUS_ERROR; } diff --git a/src/rbus/rbus_handle.h b/src/rbus/rbus_handle.h index f22bb9ff..69972375 100644 --- a/src/rbus/rbus_handle.h +++ b/src/rbus/rbus_handle.h @@ -63,7 +63,8 @@ struct _rbusHandle rtVector messageCallbacks; rtConnection m_connection; rbusHandleType_t m_handleType; - pthread_mutex_t handleMutex; + pthread_mutex_t handle_eventSubsMutex; + pthread_mutex_t handle_subsMutex; }; void rbusHandleList_Add(struct _rbusHandle* handle); diff --git a/src/rbus/rbus_intervalsubscription.c b/src/rbus/rbus_intervalsubscription.c index 6b5c9f69..1ebc1769 100644 --- a/src/rbus/rbus_intervalsubscription.c +++ b/src/rbus/rbus_intervalsubscription.c @@ -202,9 +202,9 @@ static void* PublishingThreadFunc(void* rec) ERROR_CHECK(pthread_mutex_unlock(&sub_rec->mutex)); if(duration_complete) { ERROR_CHECK(pthread_mutex_lock(&gMutex)); - ERROR_CHECK(pthread_mutex_lock(&handleInfo->handleMutex)); + ERROR_CHECK(pthread_mutex_lock(&handleInfo->handle_subsMutex)); rbusSubscriptions_removeSubscription(handleInfo->subscriptions, sub); - ERROR_CHECK(pthread_mutex_unlock(&handleInfo->handleMutex)); + ERROR_CHECK(pthread_mutex_unlock(&handleInfo->handle_subsMutex)); rtVector_RemoveItem(gRecord, sub_rec, sub_Free); ERROR_CHECK(pthread_mutex_unlock(&gMutex)); }