diff --git a/src/rbus/rbus.c b/src/rbus/rbus.c index bd333c22..8c4592ad 100644 --- a/src/rbus/rbus.c +++ b/src/rbus/rbus.c @@ -1232,19 +1232,27 @@ void _subscribe_async_callback_handler(rbusHandle_t handle, rbusEventSubscriptio struct _rbusHandle* handleInfo = (struct _rbusHandle*)handle; subscription->asyncHandler(subscription->handle, subscription, error); - - if(error == RBUS_ERROR_SUCCESS) - { - rbusEventSubscriptionInternal_t* subInternal = rt_malloc(sizeof(rbusEventSubscriptionInternal_t)); - subInternal->sub = subscription; - subInternal->dirty = false; - HANDLE_EVENTSUBS_MUTEX_LOCK(handle); - rtVector_PushBack(handleInfo->eventSubs, subInternal); - HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); - } - else + if(subscription) { - rbusEventSubscription_free(subscription); + if(error == RBUS_ERROR_SUCCESS) + { + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); + rbusEventSubscriptionInternal_t* subInternal = NULL; + if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, subscription->eventName, + subscription->filter, subscription->interval, subscription->duration)) != NULL) + { + rtVector_RemoveItem(handleInfo->eventSubs, subInternal, rbusEventSubscriptionInternal_free); + } + subInternal = rt_malloc(sizeof(rbusEventSubscriptionInternal_t)); + subInternal->sub = subscription; + subInternal->dirty = false; + rtVector_PushBack(handleInfo->eventSubs, subInternal); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); + } + else + { + rbusEventSubscription_free(subscription); + } } } @@ -1312,7 +1320,7 @@ static int _master_event_callback_handler(char const* sender, char const* eventN if(subInternal) { - if(subInternal->dirty && !(subInternal->sub->asyncHandler)) + if(subInternal->dirty) { errorcode = _rbus_event_unsubscribe(handleInfo, subInternal); if(errorcode != RBUS_ERROR_DESTINATION_NOT_REACHABLE) @@ -4551,7 +4559,12 @@ static rbusError_t rbusEvent_SubscribeWithRetries( HANDLE_EVENTSUBS_MUTEX_LOCK(handle); if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration)) != NULL) { - if (!subInternal->dirty) + /*Allow only for dirty subscription*/ + if (subInternal->dirty) + { + subInternal->dirty = false; + } + else { HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_SUBSCRIPTION_ALREADY_EXIST; @@ -4574,30 +4587,23 @@ static rbusError_t rbusEvent_SubscribeWithRetries( destNotFoundTimeout = timeout * 1000; /*convert seconds to milliseconds */ } - if (subInternal && subInternal->dirty) - { - sub = subInternal->sub; - } - else - { - sub = rt_malloc(sizeof(rbusEventSubscription_t)); + sub = rt_malloc(sizeof(rbusEventSubscription_t)); - sub->handle = handle; - sub->eventName = strdup(eventName); - sub->handler = handler; - sub->userData = userData; - sub->filter = filter; - sub->duration = duration; - sub->interval = interval; - sub->asyncHandler = async; + sub->handle = handle; + sub->eventName = strdup(eventName); + sub->handler = handler; + sub->userData = userData; + sub->filter = filter; + sub->duration = duration; + sub->interval = interval; + sub->asyncHandler = async; - if(sub->filter) - rbusFilter_Retain(sub->filter); + if(sub->filter) + rbusFilter_Retain(sub->filter); - } payload = rbusEvent_CreateSubscribePayload(sub, handleInfo->componentId); - if ((subInternal && !subInternal->dirty) || sub->asyncHandler) + if (sub->asyncHandler) { //FIXME: this should take the payload too (mrollins) because rbus_asynsubscribe is passing NULL for filter to rbus_subscribeToEvent rbusAsyncSubscribe_AddSubscription(sub, payload); @@ -4647,14 +4653,16 @@ static rbusError_t rbusEvent_SubscribeWithRetries( if(coreerr == RBUSCORE_SUCCESS) { int initial_value = 0; + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); + if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration)) != NULL) + { + rtVector_RemoveItem(handleInfo->eventSubs, subInternal, rbusEventSubscriptionInternal_free); + } subInternal = rt_malloc(sizeof(rbusEventSubscriptionInternal_t)); subInternal->sub = sub; subInternal->dirty = false; - - HANDLE_EVENTSUBS_MUTEX_LOCK(handle); rtVector_PushBack(handleInfo->eventSubs, subInternal); HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); - if(publishOnSubscribe) { rbusMessage_GetInt32(response, &initial_value); @@ -4672,9 +4680,16 @@ static rbusError_t rbusEvent_SubscribeWithRetries( { RBUSLOG_DEBUG("%s: %s all subscribe retries failed because no provider could be found", __FUNCTION__, eventName); RBUSLOG_WARN("EVENT_SUBSCRIPTION_FAIL_NO_PROVIDER_COMPONENT %s", eventName);/*RDKB-33658-AC7*/ - if (!(subInternal && subInternal->dirty)) + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); + if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration)) != NULL) + { + subInternal->dirty = true; + } + else + { rbusEventSubscription_free(sub); - + } + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_TIMEOUT; } else if(providerError != RBUS_ERROR_SUCCESS) @@ -4682,26 +4697,30 @@ static rbusError_t rbusEvent_SubscribeWithRetries( RBUSLOG_DEBUG("%s: %s subscribe retries failed due provider error %d", __FUNCTION__, eventName, providerError); if (providerError == RBUS_ERROR_SUBSCRIPTION_ALREADY_EXIST) { - if (subInternal) + HANDLE_EVENTSUBS_MUTEX_LOCK(handle); + if ((subInternal = rbusEventSubscription_find(handleInfo->eventSubs, eventName, filter, interval, duration)) != NULL) { - subInternal->dirty = false; + rtVector_RemoveItem(handleInfo->eventSubs, subInternal, rbusEventSubscriptionInternal_free); } + subInternal = rt_malloc(sizeof(rbusEventSubscriptionInternal_t)); + subInternal->sub = sub; + subInternal->dirty = false; + rtVector_PushBack(handleInfo->eventSubs, subInternal); RBUSLOG_INFO("EVENT_SUBSCRIPTION_ALREADY_EXIST %s", subInternal->sub->eventName); + HANDLE_EVENTSUBS_MUTEX_UNLOCK(handle); return RBUS_ERROR_SUCCESS; } else { RBUSLOG_WARN("EVENT_SUBSCRIPTION_FAIL_INVALID_INPUT %s", eventName);/*RDKB-33658-AC9*/ - if (!(subInternal && subInternal->dirty)) - rbusEventSubscription_free(sub); + rbusEventSubscription_free(sub); return providerError; } } else { RBUSLOG_WARN("%s: %s subscribe retries failed due to core error %d", __FUNCTION__, eventName, coreerr); - if (!(subInternal && subInternal->dirty)) - rbusEventSubscription_free(sub); + rbusEventSubscription_free(sub); return RBUS_ERROR_BUS_ERROR; } }