Skip to content

Commit

Permalink
Add delay to topic creation, metadata doesn't propagate synchronously.
Browse files Browse the repository at this point in the history
  • Loading branch information
niemyjski committed Jun 17, 2022
1 parent 2912b46 commit 577fdb3
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/Foundatio.Kafka/Messaging/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ private async Task EnsureTopicCreatedAsync() {
topicSpecification.ReplicationFactor = _options.TopicReplicationFactor.Value;

await adminClient.CreateTopicsAsync(new[] { topicSpecification }).AnyContext();
await Task.Delay(TimeSpan.FromSeconds(2));
_topicCreated = true;

if (_logger.IsEnabled(LogLevel.Trace))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Threading.Tasks;
using Foundatio.Messaging;
using Foundatio.Tests.Messaging;
Expand All @@ -25,12 +25,12 @@ protected override IMessageBus GetMessageBus(Func<SharedMessageBusOptions, Share
);
}

[Fact(Skip = "https://github.com/confluentinc/confluent-kafka-dotnet/issues/1832")]
[Fact]
public override Task CanReceiveFromMultipleSubscribersAsync() {
return base.CanReceiveFromMultipleSubscribersAsync();
}

[Fact(Skip = "https://github.com/confluentinc/confluent-kafka-dotnet/issues/1832")]
[Fact]
public override Task CanReceiveMessagesConcurrentlyAsync() {
return base.CanReceiveMessagesConcurrentlyAsync();
}
Expand Down
14 changes: 7 additions & 7 deletions tests/Foundatio.Kafka.Tests/Messaging/KafkaMessageBusTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public override Task CanUseMessageOptionsAsync() {
return base.CanUseMessageOptionsAsync();
}

[Fact(Skip = "https://github.com/confluentinc/confluent-kafka-dotnet/issues/1832")]
[Fact]
public override Task CanSendMessageAsync() {
return base.CanSendMessageAsync();
}
Expand All @@ -29,7 +29,7 @@ public override Task CanHandleNullMessageAsync() {
return base.CanHandleNullMessageAsync();
}

[Fact(Skip = "https://github.com/confluentinc/confluent-kafka-dotnet/issues/1832")]
[Fact]
public override Task CanSendDerivedMessageAsync() {
return base.CanSendDerivedMessageAsync();
}
Expand All @@ -39,12 +39,12 @@ public override Task CanSendDelayedMessageAsync() {
return base.CanSendDelayedMessageAsync();
}

[Fact(Skip = "https://github.com/confluentinc/confluent-kafka-dotnet/issues/1832")]
[Fact]
public override Task WillReceiveDerivedMessageTypesAsync() {
return base.WillReceiveDerivedMessageTypesAsync();
}

[Fact(Skip = "https://github.com/confluentinc/confluent-kafka-dotnet/issues/1832")]
[Fact]
public override Task CanSubscribeToAllMessageTypesAsync() {
return base.CanSubscribeToAllMessageTypesAsync();
}
Expand All @@ -54,7 +54,7 @@ public override Task CanSubscribeToRawMessagesAsync() {
return base.CanSubscribeToRawMessagesAsync();
}

[Fact(Skip = "https://github.com/confluentinc/confluent-kafka-dotnet/issues/1832")]
[Fact]
public override Task CanTolerateSubscriberFailureAsync() {
return base.CanTolerateSubscriberFailureAsync();
}
Expand All @@ -64,12 +64,12 @@ public override Task CanSendMessageToMultipleSubscribersAsync() {
return base.CanSendMessageToMultipleSubscribersAsync();
}

[Fact(Skip = "https://github.com/confluentinc/confluent-kafka-dotnet/issues/1832")]
[Fact]
public override Task CanSubscribeConcurrentlyAsync() {
return base.CanSubscribeConcurrentlyAsync();
}

[Fact(Skip = "https://github.com/confluentinc/confluent-kafka-dotnet/issues/1832")]
[Fact]
public override Task WillOnlyReceiveSubscribedMessageTypeAsync() {
return base.WillOnlyReceiveSubscribedMessageTypeAsync();
}
Expand Down

0 comments on commit 577fdb3

Please sign in to comment.