Skip to content

Commit

Permalink
fix: kakfa topic creation
Browse files Browse the repository at this point in the history
  • Loading branch information
lerenn committed Dec 21, 2023
1 parent 9c60bdf commit e18d9cb
Showing 1 changed file with 51 additions and 6 deletions.
57 changes: 51 additions & 6 deletions pkg/extensions/brokers/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"time"

"github.com/lerenn/asyncapi-codegen/pkg/extensions"
"github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers"
Expand Down Expand Up @@ -87,9 +86,6 @@ func (c *Controller) Publish(ctx context.Context, channel string, um extensions.
Balancer: &kafka.LeastBytes{},
})

// Allow topic creation
w.AllowAutoTopicCreation = true

// Create the message
msg := kafka.Message{
Headers: make([]kafka.Header, 0),
Expand All @@ -113,8 +109,11 @@ func (c *Controller) Publish(ctx context.Context, channel string, um extensions.
// Create topic if not exists, then it means that the topic is being
// created, so let's retry
if errors.Is(err, kafka.UnknownTopicOrPartition) {
c.logger.Warning(ctx, fmt.Sprintf("Topic %s does not exists: waiting for creation and retry", channel))
time.Sleep(time.Second)
c.logger.Warning(ctx, fmt.Sprintf("Topic %s does not exists: request creation and retry", channel))
if err := c.checkTopicExistOrCreateIt(ctx, channel); err != nil {
return err
}

continue
}

Expand All @@ -125,6 +124,12 @@ func (c *Controller) Publish(ctx context.Context, channel string, um extensions.

// Subscribe to messages from the broker.
func (c *Controller) Subscribe(ctx context.Context, channel string) (extensions.BrokerChannelSubscription, error) {
// Check that topic exists before
if err := c.checkTopicExistOrCreateIt(ctx, channel); err != nil {
return extensions.BrokerChannelSubscription{}, err
}

// Create reader
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: c.hosts,
Topic: channel,
Expand Down Expand Up @@ -177,3 +182,43 @@ func (c *Controller) messagesHandler(ctx context.Context, r *kafka.Reader, sub e
})
}
}

func (c Controller) checkTopicExistOrCreateIt(ctx context.Context, topic string) error {
// Get connection to first host
conn, err := kafka.Dial("tcp", c.hosts[0])
if err != nil {
return err
}
defer conn.Close()

for i := 0; ; i++ {
// Create topic
topicConfigs := []kafka.TopicConfig{{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
}}
err = conn.CreateTopics(topicConfigs...)
if err != nil {
return err
}

// Read partitions
partitions, err := conn.ReadPartitions()
if err != nil {
return err
}

// Get topic from partitions
for _, p := range partitions {
if topic == p.Topic {
if i > 0 {
c.logger.Warning(ctx, fmt.Sprintf("Topic %s has been created.", topic))
}
return nil
}
}

c.logger.Warning(ctx, fmt.Sprintf("Topic %s doesn't exists yet, retrying (#%d)", topic, i))
}
}

0 comments on commit e18d9cb

Please sign in to comment.