Skip to content

Commit

Permalink
docs: sample using wildcard for a topic name (#472)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gui Ferreira authored Nov 27, 2023
1 parent 9c57625 commit ec1a7e9
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.Admin\KafkaFlow.Admin.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Compressor.Gzip\KafkaFlow.Compressor.Gzip.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Compressor\KafkaFlow.Compressor.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.JsonCore\KafkaFlow.Serializer.JsonCore.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ProtobufNet\KafkaFlow.Serializer.ProtobufNet.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer\KafkaFlow.Serializer.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.TypedHandler\KafkaFlow.TypedHandler.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.17" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.Text;
using KafkaFlow;

public class PrintConsoleMiddleware : IMessageMiddleware
{
public Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
Console.WriteLine(
"Topic: {0} | Partition: {1} | Offset: {2} | Message: {3}",
context.ConsumerContext.Topic,
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
Encoding.UTF8.GetString(
(byte[])context.Message.Value));

return next(context);
}
}
70 changes: 70 additions & 0 deletions samples/KafkaFlow.Sample.WildcardConsumer/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using System.Text;
using Confluent.Kafka;
using KafkaFlow;
using KafkaFlow.Producers;
using Microsoft.Extensions.DependencyInjection;
using AutoOffsetReset = KafkaFlow.AutoOffsetReset;

var services = new ServiceCollection();

const string producerName = "RandomProducer";
const string topicPrefix = "random-topic-";

services.AddKafka(
kafka => kafka
.UseConsoleLog()
.AddCluster(
cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.AddProducer(
producerName, _ => { })
.AddConsumer(
consumer => consumer
.Topic($"^{topicPrefix}*") // Any topic starting with `random-topic-*`
.WithGroupId("random-topic-handler")
.WithBufferSize(5)
.WithWorkersCount(3)
.WithAutoOffsetReset(AutoOffsetReset.Earliest)
.WithConsumerConfig(new ConsumerConfig()
{
TopicMetadataRefreshIntervalMs = 5000 // discover new topics every 5 seconds
})
.AddMiddlewares(
middlewares => middlewares
.Add<PrintConsoleMiddleware>()
)
)
)
);

var provider = services.BuildServiceProvider();

var bus = provider.CreateKafkaBus();

await bus.StartAsync();

var producer = provider
.GetRequiredService<IProducerAccessor>()
.GetProducer(producerName);

while (true)
{
Console.WriteLine("Type the name of a topic to send a message or 'exit' to quit:");

var input = Console.ReadLine();

if (input is null)
continue;

if (input.Equals("exit", StringComparison.OrdinalIgnoreCase))
{
await bus.StopAsync();
break;
}

await producer.ProduceAsync(
$"{topicPrefix}{input}",
Guid.NewGuid().ToString(),
Encoding.UTF8.GetBytes(
$"Message to {input}: {Guid.NewGuid()}"));
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ public interface IConsumerConfigurationBuilder
IDependencyConfigurator DependencyConfigurator { get; }

/// <summary>
/// Sets the topic that will be used to read the messages, the partitions will be automatically assigned
/// Sets the topic that will be used to read the messages, the partitions will be automatically assigned.
/// librdkafka patterns are accepted.
/// </summary>
/// <param name="topicName">Topic name</param>
/// <returns></returns>
Expand Down
7 changes: 7 additions & 0 deletions src/KafkaFlow.sln
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Telemetry", "Telemetry", "{
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.OpenTelemetry", "KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj", "{1557B135-4925-4FA2-80DA-8AD13155F3BD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.WildcardConsumer", "..\samples\KafkaFlow.Sample.WildcardConsumer\KafkaFlow.Sample.WildcardConsumer.csproj", "{E3A02BB4-6881-4568-B92F-CC0878BD8175}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -233,6 +235,10 @@ Global
{1557B135-4925-4FA2-80DA-8AD13155F3BD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1557B135-4925-4FA2-80DA-8AD13155F3BD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1557B135-4925-4FA2-80DA-8AD13155F3BD}.Release|Any CPU.Build.0 = Release|Any CPU
{E3A02BB4-6881-4568-B92F-CC0878BD8175}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E3A02BB4-6881-4568-B92F-CC0878BD8175}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E3A02BB4-6881-4568-B92F-CC0878BD8175}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E3A02BB4-6881-4568-B92F-CC0878BD8175}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -273,6 +279,7 @@ Global
{B4A9E7CE-7A37-411E-967E-D9B5FD1A3992} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
{4A16F519-FAF8-432C-AD0A-CC44F7BD392D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
{1557B135-4925-4FA2-80DA-8AD13155F3BD} = {96F5D441-B8DE-4ABC-BEF2-F758D1B2BA39}
{E3A02BB4-6881-4568-B92F-CC0878BD8175} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB}
Expand Down
6 changes: 6 additions & 0 deletions website/docs/getting-started/samples.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,9 @@ You can find the code here: [/samples/KafkaFlow.Sample.PauseConsumerOnError](htt
This is a sample that shows how to throttle a consumer based on others consumers lag

You can find the code here: [/samples/KafkaFlow.Sample.ConsumerThrottling](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.ConsumerThrottling)

## Wildcard Consumers

This sample shows how to use a consumer to handle all the topics according to a naming convention. This is not a feature of KafkaFlow, but a demonstration of how to use the pattern conventions exposed by [librdkafka](https://github.com/confluentinc/librdkafka/tree/95a542c87c61d2c45b445f91c73dd5442eb04f3c) ([here](https://github.com/confluentinc/librdkafka/blob/95a542c87c61d2c45b445f91c73dd5442eb04f3c/src-cpp/rdkafkacpp.h#L2681)).

You can find the code here: [/samples/KafkaFlow.Sample.WildcardConsumer](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.WildcardConsumer)
5 changes: 5 additions & 0 deletions website/docs/guides/consumers/add-consumers.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ services.AddKafka(kafka => kafka

On a Consumer, you can configure the Middlewares that will be invoked. You can find more information on how to configure Middlewares [here](../middlewares).

:::tip
You can use a naming pattern such as a wildcard to connect to any topic that matches a naming convention.
You can find a sample on [here](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.WildcardConsumer).
:::

## Automatic Partitions Assignment

Using the `Topic()` or `Topics()` methods, the consumer will trigger the automatic partition assignment that will distribute the topic partitions across the application instances.
Expand Down

0 comments on commit ec1a7e9

Please sign in to comment.