Skip to content

Commit

Permalink
feat: FluentWorkflow.RabbitMQ引入IRabbitMQExchangeSelector,以能够动态的改变消息发送…
Browse files Browse the repository at this point in the history
…时使用的交换机
  • Loading branch information
stratosblue committed Nov 22, 2024
1 parent 6c3e9e4 commit 6abd883
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 51 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ A message driven distributed asynchronous workflow framework. 消息驱动的分
### 3.1 引用 `FluentWorkflow.Core`
```xml
<ItemGroup>
<PackageReference Include="FluentWorkflow.Core" Version="1.3.1" />
<PackageReference Include="FluentWorkflow.Core" Version="1.3.3" />
</ItemGroup>
```

Expand Down Expand Up @@ -194,7 +194,7 @@ await workflow.StartAsync(default);
#### 引用 `FluentWorkflow.RabbitMQ`
```xml
<ItemGroup>
<PackageReference Include="FluentWorkflow.RabbitMQ" Version="1.3.1" />
<PackageReference Include="FluentWorkflow.RabbitMQ" Version="1.3.3" />
</ItemGroup>
```
#### 配置
Expand Down
2 changes: 1 addition & 1 deletion package.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>1.3.2</Version>
<Version>1.3.3</Version>

<Description>A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。</Description>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static IFluentWorkflowBuilder UseRabbitMQMessageDispatcher(this IFluentWo
builder.Services.Configure<RabbitMQOptions>(optionsSetup);
}

builder.Services.TryAddSingleton<IRabbitMQExchangeSelector, OptionsBasedRabbitMQExchangeSelector>();
builder.Services.TryAddSingleton<IRabbitMQConnectionProvider, RabbitMQConnectionProvider>();
builder.Services.TryAddSingleton<IRabbitMQChannelPool, RabbitMQChannelPool>();

Expand Down
42 changes: 42 additions & 0 deletions src/FluentWorkflow.RabbitMQ.Legacy/IRabbitMQExchangeSelector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System.ComponentModel;
using FluentWorkflow.Interface;
using Microsoft.Extensions.Options;

namespace FluentWorkflow.RabbitMQ;

/// <summary>
/// RabbitMQ交换机选择器
/// </summary>
public interface IRabbitMQExchangeSelector
{
#region Public 方法

/// <summary>
/// 获取消息 <paramref name="message"/> 应该使用的交换机
/// </summary>
/// <typeparam name="TMessage"></typeparam>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
string GetExchange<TMessage>(TMessage message, CancellationToken cancellationToken = default) where TMessage : class, IWorkflowMessage, IWorkflowContextCarrier<IWorkflowContext>, IEventNameDeclaration;

#endregion Public 方法
}

/// <summary>
/// 基于 <see cref="RabbitMQOptions"/> 的交换机选择器
/// </summary>
[EditorBrowsable(EditorBrowsableState.Advanced)]
public sealed class OptionsBasedRabbitMQExchangeSelector(IOptionsMonitor<RabbitMQOptions> optionsMonitor) : IRabbitMQExchangeSelector
{
#region Public 方法

/// <inheritdoc/>
public string GetExchange<TMessage>(TMessage message, CancellationToken cancellationToken)
where TMessage : class, IWorkflowMessage, IWorkflowContextCarrier<IWorkflowContext>, IEventNameDeclaration
{
return optionsMonitor.CurrentValue.ExchangeName ?? RabbitMQOptions.DefaultExchangeName;
}

#endregion Public 方法
}
3 changes: 1 addition & 2 deletions src/FluentWorkflow.RabbitMQ.Legacy/RabbitMQOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ public class RabbitMQOptions
public MessageRequeuePolicy ErrorMessageRequeuePolicy { get; set; } = MessageRequeuePolicy.Default;

/// <summary>
/// 使用的交换机名称
/// TODO 单元测试
/// 默认使用的交换机名称(如果注册了自定义的 <see cref="IRabbitMQExchangeSelector"/>,则此配置无效)
/// </summary>
public string? ExchangeName { get; set; } = DefaultExchangeName;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics;
using System.ComponentModel;
using System.Diagnostics;
using FluentWorkflow.Diagnostics;
using FluentWorkflow.Interface;
using FluentWorkflow.RabbitMQ;
Expand All @@ -9,37 +10,54 @@

namespace FluentWorkflow;

internal sealed class RabbitMQWorkflowMessageDispatcher
/// <summary>
/// 基于RabbitMQ的 <inheritdoc cref="WorkflowMessageDispatcher"/>
/// </summary>
[EditorBrowsable(EditorBrowsableState.Advanced)]
public class RabbitMQWorkflowMessageDispatcher
: WorkflowMessageDispatcher, IWorkflowMessageDispatcher, IDisposable
{
#region Private 字段

private readonly IObjectSerializer _objectSerializer;

private readonly IDisposable? _optionsMonitorDisposer;

private readonly IRabbitMQChannelPool _rabbitMQChannelPool;

private bool _disposed;

private RabbitMQOptions _rabbitMQOptions;

#endregion Private 字段

#region Protected 属性

/// <inheritdoc cref="IRabbitMQExchangeSelector"/>
protected IRabbitMQExchangeSelector ExchangeSelector { get; }

/// <inheritdoc cref="IObjectSerializer"/>
protected IObjectSerializer ObjectSerializer { get; }

/// <inheritdoc cref="RabbitMQOptions"/>
protected RabbitMQOptions Options { get; private set; }

/// <inheritdoc cref="IRabbitMQChannelPool"/>
protected IRabbitMQChannelPool RabbitMQChannelPool { get; }

#endregion Protected 属性

#region Public 构造函数

/// <inheritdoc cref="RabbitMQWorkflowMessageDispatcher"/>
public RabbitMQWorkflowMessageDispatcher(IRabbitMQChannelPool rabbitMQChannelPool,
IWorkflowDiagnosticSource diagnosticSource,
IObjectSerializer objectSerializer,
IRabbitMQExchangeSelector exchangeSelector,
IOptionsMonitor<RabbitMQOptions> rabbitMQOptionsMonitor,
ILogger<RabbitMQWorkflowMessageDispatcher> logger)
: base(diagnosticSource, logger)
{
_rabbitMQChannelPool = rabbitMQChannelPool;
_objectSerializer = objectSerializer ?? throw new ArgumentNullException(nameof(objectSerializer));
RabbitMQChannelPool = rabbitMQChannelPool ?? throw new ArgumentNullException(nameof(rabbitMQChannelPool));
ObjectSerializer = objectSerializer ?? throw new ArgumentNullException(nameof(objectSerializer));
ExchangeSelector = exchangeSelector ?? throw new ArgumentNullException(nameof(exchangeSelector));

_optionsMonitorDisposer = rabbitMQOptionsMonitor.OnChange(options => _rabbitMQOptions = options);
_rabbitMQOptions = rabbitMQOptionsMonitor.CurrentValue;
_optionsMonitorDisposer = rabbitMQOptionsMonitor.OnChange(options => Options = options);
Options = rabbitMQOptionsMonitor.CurrentValue;
}

#endregion Public 构造函数
Expand Down Expand Up @@ -78,15 +96,20 @@ private void SendMessage<TMessage>(IModel channel, TMessage message, Cancellatio
{ RabbitMQOptions.WorkflowIdHeaderKey, message.Id }
};

var data = _objectSerializer.SerializeToBytes(message);
var exchange = ExchangeSelector.GetExchange(message, cancellationToken);

var data = ObjectSerializer.SerializeToBytes(message);

cancellationToken.ThrowIfCancellationRequested();

channel.BasicPublish(exchange: _rabbitMQOptions.ExchangeName ?? RabbitMQOptions.DefaultExchangeName, routingKey: TMessage.EventName, basicProperties, body: data);
channel.BasicPublish(exchange: exchange,
routingKey: TMessage.EventName,
basicProperties,
body: data);

if (channel.NextPublishSeqNo > 0)
{
while (!channel.WaitForConfirms(_rabbitMQOptions.PublisherConfirmsCheckTimeout))
while (!channel.WaitForConfirms(Options.PublisherConfirmsCheckTimeout))
{
cancellationToken.ThrowIfCancellationRequested();
//TODO 取消正在确认的消息?
Expand All @@ -106,7 +129,7 @@ private async Task SendMessageAsync<TMessage>(TMessage message, CancellationToke
IModel? channel = null;
try
{
channel = await _rabbitMQChannelPool.RentAsync(cancellationToken);
channel = await RabbitMQChannelPool.RentAsync(cancellationToken);
SendMessage(channel, message, cancellationToken);
}
catch (Exception ex)
Expand All @@ -118,7 +141,7 @@ private async Task SendMessageAsync<TMessage>(TMessage message, CancellationToke
{
if (channel is not null)
{
_rabbitMQChannelPool.Return(channel);
RabbitMQChannelPool.Return(channel);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static IFluentWorkflowBuilder UseRabbitMQMessageDispatcher(this IFluentWo
builder.Services.Configure<RabbitMQOptions>(optionsSetup);
}

builder.Services.TryAddSingleton<IRabbitMQExchangeSelector, OptionsBasedRabbitMQExchangeSelector>();
builder.Services.TryAddSingleton<IRabbitMQConnectionProvider, RabbitMQConnectionProvider>();
builder.Services.TryAddSingleton<IRabbitMQChannelPool, RabbitMQChannelPool>();

Expand Down
42 changes: 42 additions & 0 deletions src/FluentWorkflow.RabbitMQ/IRabbitMQExchangeSelector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System.ComponentModel;
using FluentWorkflow.Interface;
using Microsoft.Extensions.Options;

namespace FluentWorkflow.RabbitMQ;

/// <summary>
/// RabbitMQ交换机选择器
/// </summary>
public interface IRabbitMQExchangeSelector
{
#region Public 方法

/// <summary>
/// 获取消息 <paramref name="message"/> 应该使用的交换机
/// </summary>
/// <typeparam name="TMessage"></typeparam>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
ValueTask<string> GetExchangeAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default) where TMessage : class, IWorkflowMessage, IWorkflowContextCarrier<IWorkflowContext>, IEventNameDeclaration;

#endregion Public 方法
}

/// <summary>
/// 基于 <see cref="RabbitMQOptions"/> 的交换机选择器
/// </summary>
[EditorBrowsable(EditorBrowsableState.Advanced)]
public sealed class OptionsBasedRabbitMQExchangeSelector(IOptionsMonitor<RabbitMQOptions> optionsMonitor) : IRabbitMQExchangeSelector
{
#region Public 方法

/// <inheritdoc/>
public ValueTask<string> GetExchangeAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
where TMessage : class, IWorkflowMessage, IWorkflowContextCarrier<IWorkflowContext>, IEventNameDeclaration
{
return ValueTask.FromResult(optionsMonitor.CurrentValue.ExchangeName ?? RabbitMQOptions.DefaultExchangeName);
}

#endregion Public 方法
}
3 changes: 1 addition & 2 deletions src/FluentWorkflow.RabbitMQ/RabbitMQOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ public class RabbitMQOptions
public MessageRequeuePolicy ErrorMessageRequeuePolicy { get; set; } = MessageRequeuePolicy.Default;

/// <summary>
/// 使用的交换机名称
/// TODO 单元测试
/// 默认使用的交换机名称(如果注册了自定义的 <see cref="IRabbitMQExchangeSelector"/>,则此配置无效)
/// </summary>
public string? ExchangeName { get; set; } = DefaultExchangeName;

Expand Down
53 changes: 26 additions & 27 deletions src/FluentWorkflow.RabbitMQ/RabbitMQWorkflowMessageDispatcher.cs
Original file line number Diff line number Diff line change
@@ -1,45 +1,47 @@
using System.Diagnostics;
using System.ComponentModel;
using System.Diagnostics;
using FluentWorkflow.Diagnostics;
using FluentWorkflow.Interface;
using FluentWorkflow.RabbitMQ;
using FluentWorkflow.Tracing;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

namespace FluentWorkflow;

internal sealed class RabbitMQWorkflowMessageDispatcher
/// <summary>
/// 基于RabbitMQ的 <inheritdoc cref="WorkflowMessageDispatcher"/>
/// </summary>
[EditorBrowsable(EditorBrowsableState.Advanced)]
public class RabbitMQWorkflowMessageDispatcher
: WorkflowMessageDispatcher, IWorkflowMessageDispatcher, IDisposable
{
#region Private 字段
#region Protected 属性

private readonly IObjectSerializer _objectSerializer;
/// <inheritdoc cref="IRabbitMQExchangeSelector"/>
protected IRabbitMQExchangeSelector ExchangeSelector { get; }

private readonly IDisposable? _optionsMonitorDisposer;
/// <inheritdoc cref="IObjectSerializer"/>
protected IObjectSerializer ObjectSerializer { get; }

private readonly IRabbitMQChannelPool _rabbitMQChannelPool;
/// <inheritdoc cref="IRabbitMQChannelPool"/>
protected IRabbitMQChannelPool RabbitMQChannelPool { get; }

private bool _disposed;

private RabbitMQOptions _rabbitMQOptions;

#endregion Private 字段
#endregion Protected 属性

#region Public 构造函数

/// <inheritdoc cref="RabbitMQWorkflowMessageDispatcher"/>
public RabbitMQWorkflowMessageDispatcher(IRabbitMQChannelPool rabbitMQChannelPool,
IWorkflowDiagnosticSource diagnosticSource,
IRabbitMQExchangeSelector exchangeSelector,
IObjectSerializer objectSerializer,
IOptionsMonitor<RabbitMQOptions> rabbitMQOptionsMonitor,
ILogger<RabbitMQWorkflowMessageDispatcher> logger)
: base(diagnosticSource, logger)
{
_rabbitMQChannelPool = rabbitMQChannelPool;
_objectSerializer = objectSerializer ?? throw new ArgumentNullException(nameof(objectSerializer));

_optionsMonitorDisposer = rabbitMQOptionsMonitor.OnChange(options => _rabbitMQOptions = options);
_rabbitMQOptions = rabbitMQOptionsMonitor.CurrentValue;
RabbitMQChannelPool = rabbitMQChannelPool ?? throw new ArgumentNullException(nameof(rabbitMQChannelPool));
ExchangeSelector = exchangeSelector ?? throw new ArgumentNullException(nameof(exchangeSelector));
ObjectSerializer = objectSerializer ?? throw new ArgumentNullException(nameof(objectSerializer));
}

#endregion Public 构造函数
Expand All @@ -49,11 +51,6 @@ public RabbitMQWorkflowMessageDispatcher(IRabbitMQChannelPool rabbitMQChannelPoo
/// <inheritdoc/>
public void Dispose()
{
if (!_disposed)
{
_optionsMonitorDisposer?.Dispose();
_disposed = true;
}
}

/// <inheritdoc/>
Expand All @@ -80,11 +77,13 @@ private async Task SendMessageAsync<TMessage>(IChannel channel, TMessage message
}
};

var data = _objectSerializer.SerializeToBytes(message);
var exchange = await ExchangeSelector.GetExchangeAsync(message, cancellationToken);

var data = ObjectSerializer.SerializeToBytes(message);

cancellationToken.ThrowIfCancellationRequested();

await channel.BasicPublishAsync(exchange: _rabbitMQOptions.ExchangeName ?? RabbitMQOptions.DefaultExchangeName,
await channel.BasicPublishAsync(exchange: exchange,
routingKey: TMessage.EventName,
mandatory: false,
basicProperties: basicProperties,
Expand All @@ -104,7 +103,7 @@ private async Task SendMessageAsync<TMessage>(TMessage message, CancellationToke
IChannel? channel = null;
try
{
channel = await _rabbitMQChannelPool.RentAsync(cancellationToken);
channel = await RabbitMQChannelPool.RentAsync(cancellationToken);
await SendMessageAsync(channel, message, cancellationToken);
}
catch (Exception ex)
Expand All @@ -116,7 +115,7 @@ private async Task SendMessageAsync<TMessage>(TMessage message, CancellationToke
{
if (channel is not null)
{
_rabbitMQChannelPool.Return(channel);
RabbitMQChannelPool.Return(channel);
}
}
}
Expand Down

0 comments on commit 6abd883

Please sign in to comment.