diff --git a/samples/KafkaFlow.Sample.WildcardConsumer/KafkaFlow.Sample.WildcardConsumer.csproj b/samples/KafkaFlow.Sample.WildcardConsumer/KafkaFlow.Sample.WildcardConsumer.csproj
new file mode 100644
index 000000000..05d9e4f29
--- /dev/null
+++ b/samples/KafkaFlow.Sample.WildcardConsumer/KafkaFlow.Sample.WildcardConsumer.csproj
@@ -0,0 +1,27 @@
+
+
+
+ Exe
+ net6.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/samples/KafkaFlow.Sample.WildcardConsumer/PrintConsoleMiddleware.cs b/samples/KafkaFlow.Sample.WildcardConsumer/PrintConsoleMiddleware.cs
new file mode 100644
index 000000000..d33c68d80
--- /dev/null
+++ b/samples/KafkaFlow.Sample.WildcardConsumer/PrintConsoleMiddleware.cs
@@ -0,0 +1,18 @@
+using System.Text;
+using KafkaFlow;
+
+public class PrintConsoleMiddleware : IMessageMiddleware
+{
+ public Task Invoke(IMessageContext context, MiddlewareDelegate next)
+ {
+ Console.WriteLine(
+ "Topic: {0} | Partition: {1} | Offset: {2} | Message: {3}",
+ context.ConsumerContext.Topic,
+ context.ConsumerContext.Partition,
+ context.ConsumerContext.Offset,
+ Encoding.UTF8.GetString(
+ (byte[])context.Message.Value));
+
+ return next(context);
+ }
+}
\ No newline at end of file
diff --git a/samples/KafkaFlow.Sample.WildcardConsumer/Program.cs b/samples/KafkaFlow.Sample.WildcardConsumer/Program.cs
new file mode 100644
index 000000000..32d13df22
--- /dev/null
+++ b/samples/KafkaFlow.Sample.WildcardConsumer/Program.cs
@@ -0,0 +1,70 @@
+using System.Text;
+using Confluent.Kafka;
+using KafkaFlow;
+using KafkaFlow.Producers;
+using Microsoft.Extensions.DependencyInjection;
+using AutoOffsetReset = KafkaFlow.AutoOffsetReset;
+
+var services = new ServiceCollection();
+
+const string producerName = "RandomProducer";
+const string topicPrefix = "random-topic-";
+
+services.AddKafka(
+ kafka => kafka
+ .UseConsoleLog()
+ .AddCluster(
+ cluster => cluster
+ .WithBrokers(new[] { "localhost:9092" })
+ .AddProducer(
+ producerName, _ => { })
+ .AddConsumer(
+ consumer => consumer
+ .Topic($"^{topicPrefix}*") // Any topic starting with `random-topic-*`
+ .WithGroupId("random-topic-handler")
+ .WithBufferSize(5)
+ .WithWorkersCount(3)
+ .WithAutoOffsetReset(AutoOffsetReset.Earliest)
+ .WithConsumerConfig(new ConsumerConfig()
+ {
+ TopicMetadataRefreshIntervalMs = 5000 // discover new topics every 5 seconds
+ })
+ .AddMiddlewares(
+ middlewares => middlewares
+ .Add()
+ )
+ )
+ )
+);
+
+var provider = services.BuildServiceProvider();
+
+var bus = provider.CreateKafkaBus();
+
+await bus.StartAsync();
+
+var producer = provider
+ .GetRequiredService()
+ .GetProducer(producerName);
+
+while (true)
+{
+ Console.WriteLine("Type the name of a topic to send a message or 'exit' to quit:");
+
+ var input = Console.ReadLine();
+
+ if (input is null)
+ continue;
+
+ if (input.Equals("exit", StringComparison.OrdinalIgnoreCase))
+ {
+ await bus.StopAsync();
+ break;
+ }
+
+ await producer.ProduceAsync(
+ $"{topicPrefix}{input}",
+ Guid.NewGuid().ToString(),
+ Encoding.UTF8.GetBytes(
+ $"Message to {input}: {Guid.NewGuid()}"));
+}
\ No newline at end of file
diff --git a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
index 0e85e8654..00c04e422 100644
--- a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
+++ b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
@@ -14,7 +14,8 @@ public interface IConsumerConfigurationBuilder
IDependencyConfigurator DependencyConfigurator { get; }
///
- /// Sets the topic that will be used to read the messages, the partitions will be automatically assigned
+ /// Sets the topic that will be used to read the messages, the partitions will be automatically assigned.
+ /// librdkafka patterns are accepted.
///
/// Topic name
///
diff --git a/src/KafkaFlow.sln b/src/KafkaFlow.sln
index 0b4f56ea7..73f4d6c43 100644
--- a/src/KafkaFlow.sln
+++ b/src/KafkaFlow.sln
@@ -95,6 +95,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Telemetry", "Telemetry", "{
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.OpenTelemetry", "KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj", "{1557B135-4925-4FA2-80DA-8AD13155F3BD}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.WildcardConsumer", "..\samples\KafkaFlow.Sample.WildcardConsumer\KafkaFlow.Sample.WildcardConsumer.csproj", "{E3A02BB4-6881-4568-B92F-CC0878BD8175}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -233,6 +235,10 @@ Global
{1557B135-4925-4FA2-80DA-8AD13155F3BD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1557B135-4925-4FA2-80DA-8AD13155F3BD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1557B135-4925-4FA2-80DA-8AD13155F3BD}.Release|Any CPU.Build.0 = Release|Any CPU
+ {E3A02BB4-6881-4568-B92F-CC0878BD8175}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {E3A02BB4-6881-4568-B92F-CC0878BD8175}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {E3A02BB4-6881-4568-B92F-CC0878BD8175}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {E3A02BB4-6881-4568-B92F-CC0878BD8175}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -273,6 +279,7 @@ Global
{B4A9E7CE-7A37-411E-967E-D9B5FD1A3992} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
{4A16F519-FAF8-432C-AD0A-CC44F7BD392D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
{1557B135-4925-4FA2-80DA-8AD13155F3BD} = {96F5D441-B8DE-4ABC-BEF2-F758D1B2BA39}
+ {E3A02BB4-6881-4568-B92F-CC0878BD8175} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB}
diff --git a/website/docs/getting-started/samples.md b/website/docs/getting-started/samples.md
index aba90fd4a..fa5d87b5a 100644
--- a/website/docs/getting-started/samples.md
+++ b/website/docs/getting-started/samples.md
@@ -53,3 +53,9 @@ You can find the code here: [/samples/KafkaFlow.Sample.PauseConsumerOnError](htt
This is a sample that shows how to throttle a consumer based on others consumers lag
You can find the code here: [/samples/KafkaFlow.Sample.ConsumerThrottling](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.ConsumerThrottling)
+
+## Wildcard Consumers
+
+This sample shows how to use a consumer to handle all the topics according to a naming convention. This is not a feature of KafkaFlow, but a demonstration of how to use the pattern conventions exposed by [librdkafka](https://github.com/confluentinc/librdkafka/tree/95a542c87c61d2c45b445f91c73dd5442eb04f3c) ([here](https://github.com/confluentinc/librdkafka/blob/95a542c87c61d2c45b445f91c73dd5442eb04f3c/src-cpp/rdkafkacpp.h#L2681)).
+
+You can find the code here: [/samples/KafkaFlow.Sample.WildcardConsumer](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.WildcardConsumer)
diff --git a/website/docs/guides/consumers/add-consumers.md b/website/docs/guides/consumers/add-consumers.md
index 0d2ae78bd..50f1aadf1 100644
--- a/website/docs/guides/consumers/add-consumers.md
+++ b/website/docs/guides/consumers/add-consumers.md
@@ -31,6 +31,11 @@ services.AddKafka(kafka => kafka
On a Consumer, you can configure the Middlewares that will be invoked. You can find more information on how to configure Middlewares [here](../middlewares).
+:::tip
+You can use a naming pattern such as a wildcard to connect to any topic that matches a naming convention.
+You can find a sample on [here](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.WildcardConsumer).
+:::
+
## Automatic Partitions Assignment
Using the `Topic()` or `Topics()` methods, the consumer will trigger the automatic partition assignment that will distribute the topic partitions across the application instances.