Skip to content

Commit

Permalink
Publish DistributedEventSent/Received events in `LocalDistributedEv…
Browse files Browse the repository at this point in the history
…entBus`.

And publish `DistributedEventSent` after publishing the distributed message.
  • Loading branch information
maliming committed Jan 23, 2025
1 parent 98acfca commit 489c74f
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ private async Task ProcessEventAsync(ServiceBusReceivedMessage message)

public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.GetCorrelationId(), outgoingEvent.Id);

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -111,8 +113,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.GetCorrelationId(), outgoingEvent.Id);
}

public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
Expand Down Expand Up @@ -141,7 +141,12 @@ public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventI
throw new AbpException(
"The message is too large to fit in the batch. Set AbpEventBusBoxesOptions.OutboxWaitingEventMaxCount to reduce the number");
}
}

await publisher.SendMessagesAsync(messageBatch);

foreach (var outgoingEvent in outgoingEventArray)
{
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -152,8 +157,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
});
}
}

await publisher.SendMessagesAsync(messageBatch);
}

public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFac

public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId());

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -162,8 +164,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId());
}

public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
Expand All @@ -172,6 +172,8 @@ public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventI

foreach (var outgoingEvent in outgoingEventArray)
{
await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId());

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -181,8 +183,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,6 @@ public async override Task PublishFromOutboxAsync(
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}

var headers = new Headers
{
{ "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) }
Expand All @@ -222,6 +212,16 @@ await PublishAsync(
outgoingEvent.EventData,
headers
);

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
}

public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
Expand All @@ -242,6 +242,15 @@ public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventI
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!));
}

producer.Produce(
AbpKafkaEventBusOptions.TopicName,
new Message<string, byte[]>
{
Key = outgoingEvent.EventName,
Value = outgoingEvent.EventData,
Headers = headers
});

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -251,15 +260,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

producer.Produce(
AbpKafkaEventBusOptions.TopicName,
new Message<string, byte[]>
{
Key = outgoingEvent.EventName,
Value = outgoingEvent.EventData,
Headers = headers
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ public async override Task PublishFromOutboxAsync(
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, eventId: outgoingEvent.Id, correlationId: outgoingEvent.GetCorrelationId());

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -216,8 +218,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, eventId: outgoingEvent.Id, correlationId: outgoingEvent.GetCorrelationId());
}

public async override Task PublishManyFromOutboxAsync(
Expand All @@ -231,6 +231,13 @@ public async override Task PublishManyFromOutboxAsync(

foreach (var outgoingEvent in outgoingEventArray)
{
await PublishAsync(
channel,
outgoingEvent.EventName,
outgoingEvent.EventData,
eventId: outgoingEvent.Id,
correlationId: outgoingEvent.GetCorrelationId());

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -240,13 +247,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishAsync(
channel,
outgoingEvent.EventName,
outgoingEvent.EventData,
eventId: outgoingEvent.Id,
correlationId: outgoingEvent.GetCorrelationId());
}

channel.WaitForConfirmsOrDie();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ public async override Task PublishFromOutboxAsync(
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName)!;
var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);

var headers = new Dictionary<string, string>();
if (outgoingEvent.GetCorrelationId() != null)
{
headers.Add(EventBusConsts.CorrelationIdHeaderName, outgoingEvent.GetCorrelationId()!);
}

await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id, headersArguments: headers);

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent() {
Expand All @@ -261,14 +269,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent() {
EventData = outgoingEvent.EventData
});
}

var headers = new Dictionary<string, string>();
if (outgoingEvent.GetCorrelationId() != null)
{
headers.Add(EventBusConsts.CorrelationIdHeaderName, outgoingEvent.GetCorrelationId()!);
}

await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id, headersArguments: headers);
}

public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
Expand All @@ -279,6 +279,8 @@ public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventI
{
foreach (var outgoingEvent in outgoingEventArray)
{
await PublishFromOutboxAsync(outgoingEvent, outboxConfig);

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -288,8 +290,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishFromOutboxAsync(outgoingEvent, outboxConfig);
}

await scope.CompleteAsync();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -85,14 +85,14 @@ public async Task PublishAsync(
}
}

await PublishToEventBusAsync(eventType, eventData);

await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
});

await PublishToEventBusAsync(eventType, eventData);
}

public abstract Task PublishFromOutboxAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,24 +122,50 @@ public void UnsubscribeAll(Type eventType)
_localEventBus.UnsubscribeAll(eventType);
}

public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
public async Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class
{
return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
await PublishDistributedEventSentReceivedAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
}

public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
{
return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
await PublishDistributedEventSentReceivedAsync(eventType, eventData, onUnitOfWorkComplete);
}

public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class
public async Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class
{
return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
await PublishDistributedEventSentReceivedAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
}

public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
{
return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
await PublishDistributedEventSentReceivedAsync(eventType, eventData, onUnitOfWorkComplete);
}

private async Task PublishDistributedEventSentReceivedAsync(Type eventType, object eventData, bool onUnitOfWorkComplete)
{
if (eventType == typeof(DistributedEventSent) || eventType == typeof(DistributedEventReceived))
{
return;
}

await _localEventBus.PublishAsync(new DistributedEventSent
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
}, onUnitOfWorkComplete);

await _localEventBus.PublishAsync(new DistributedEventReceived
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
}, onUnitOfWorkComplete);
}
}

This file was deleted.

Loading

0 comments on commit 489c74f

Please sign in to comment.