Jetstream broker limitations #186
Comments
Hello @nuttert! I'll try to answer as far as I understand things, but feel free to correct me if I have misunderstood something:
So if I'm correct, I would create two issues:
Please let me know if I got everything right. I would be happy to hear what you think about each point and to change things accordingly. Thanks |
Thanks for the answer @lerenn! Here is Subscribe:
// Subscribe to messages from the broker.
func (c *Controller) Subscribe(ctx context.Context, channel string) (extensions.BrokerChannelSubscription, error) {
	// Create a new subscription
	sub := extensions.NewBrokerChannelSubscription(
		make(chan extensions.AcknowledgeableBrokerMessage, brokers.BrokerMessagesQueueSize),
		make(chan any, 1),
	)
	config := &nats.StreamConfig{
		Name:     randString(),
		Subjects: []string{channel},
		NoAck:    true,
	}
	c.jetStream.AddStream(config)
	c.jetStream.Subscribe(channel, func(msg *nats.Msg) {
		c.logger.Info(ctx, fmt.Sprintf("Received message for %s", channel), extensions.LogInfo{
			Key:   "message",
			Value: *msg,
		})
		c.HandleMessage(ctx, msg, sub)
	}, nats.DeliverAll())
	return sub, nil
}
And here is Publish:
// Publish a message to the broker.
func (c *Controller) Publish(ctx context.Context, channel string, bm extensions.BrokerMessage) error {
	msg := nats.NewMsg(channel)
	// Set message headers and content
	for k, v := range bm.Headers {
		msg.Header.Set(k, string(v))
	}
	msg.Data = bm.Payload
	// Publish message
	if _, err := c.jetStream.PublishMsgAsync(msg); err != nil {
		return err
	}
	return nil
}
I used the "legacy" JetStream API and implemented what I described above. I think the advantages of this implementation are:
But it uses the legacy JetStream API. |
Hey 👋 Interesting issue. Let me add some thoughts on this:
From my point of view, each stream comes with its own AsyncAPI definition and therefore also with its own code generation.
The legacy API is much more powerful than the new API (e.g. subscriptions to a single channel), but I don't think it's a good idea to implement this with the old API. There might be real-world cases where acking messages that the service is not interested in leads to issues, but I think that with the new acking of messages in this project the warning log could easily be circumvented? From what I've seen so far, acking uninteresting messages is something of a pattern.
This information could perhaps also be derived from the AsyncAPI definition. I would also like to mention that it is a good pattern not to provision NATS/JetStream resources (consumers, streams, …) on every service/pod start. |
Hey! I tried to use JetStream. It's cool that it supports replay and acks, but I found some limitations in the natsjetstream wrapper.
But sometimes it's very convenient to use wildcards such as "ORDERS.*" (native NATS JetStream supports this), and I expect to subscribe to all such streams, not just specific ones. This works fine in the broker without JetStream because you don't save it in a map:
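To make the wildcard point concrete, here is a minimal sketch of NATS subject matching in plain Go, assuming the wrapper looks subscriptions up by exact channel name. In NATS, `*` matches exactly one dot-separated token and `>` matches one or more trailing tokens. The helper `subjectMatches` is hypothetical (it is not part of nats.go or of this library) and assumes well-formed subjects without empty tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a concrete subject matches a NATS-style
// pattern. "*" matches exactly one token; ">" must be the last token of the
// pattern and matches one or more remaining tokens.
// Hypothetical helper for illustration only.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// Valid only as the final token, with at least one token left to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // subject is shorter than the pattern
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	// Without ">", pattern and subject must have the same number of tokens.
	return len(p) == len(s)
}

func main() {
	// Exact-match lookup, as with a map keyed by channel name:
	// the only key is the literal pattern, so a concrete subject is not found.
	subs := map[string]bool{"ORDERS.*": true}
	fmt.Println(subs["ORDERS.new"]) // false

	fmt.Println(subjectMatches("ORDERS.*", "ORDERS.new"))    // true
	fmt.Println(subjectMatches("ORDERS.*", "ORDERS.eu.new")) // false
	fmt.Println(subjectMatches("ORDERS.>", "ORDERS.eu.new")) // true
}
```

The map lookup fails for `ORDERS.new` because the stored key is the pattern itself, which is why an exact-match map cannot route messages for wildcard subscriptions without an extra matching step like the one above.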
This defeats the purpose of code generation, since you have to specify the channel names manually in the code and not just in the YAML schema.
I expect the JetStream client to behave like the classic client, with at least two advantages:
Also, what if there is a lot of traffic and different services use different channels? As far as I understand the current logic, all clients will receive data from all channels, so the network is loaded with traffic for channels that a client does not use (and each such message produces a warning log).
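The cost described above can be illustrated with a small simulation, assuming the behaviour is as I understand it: an unfiltered consumer receives every message in the stream, and messages without a handler are acked with a warning. Everything here (`message`, `deliver`, `countUnhandled`) is a hypothetical stand-in and not the wrapper's actual code; the filter is a plain exact-match set to keep the sketch short.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a broker message.
type message struct {
	Subject string
}

// deliver simulates a consumer. With an empty filter it receives every
// message in the stream; otherwise only messages whose subject is in the filter.
func deliver(stream []message, filter map[string]bool) []message {
	if len(filter) == 0 {
		return stream
	}
	var out []message
	for _, m := range stream {
		if filter[m.Subject] {
			out = append(out, m)
		}
	}
	return out
}

// countUnhandled returns how many delivered messages have no handler,
// i.e. how many would be transferred only to be acked and logged as a warning.
func countUnhandled(delivered []message, handlers map[string]bool) int {
	n := 0
	for _, m := range delivered {
		if !handlers[m.Subject] {
			n++
		}
	}
	return n
}

func main() {
	stream := []message{
		{Subject: "ORDERS.created"},
		{Subject: "ORDERS.shipped"},
		{Subject: "BILLING.paid"},
		{Subject: "ORDERS.created"},
	}
	// This service only handles ORDERS.created.
	handlers := map[string]bool{"ORDERS.created": true}

	unfiltered := deliver(stream, nil)
	filtered := deliver(stream, handlers)

	fmt.Println(len(unfiltered), countUnhandled(unfiltered, handlers)) // 4 2
	fmt.Println(len(filtered), countUnhandled(filtered, handlers))     // 2 0
}
```

In this toy run the unfiltered consumer transfers four messages and discards two of them, while the filtered one transfers only the two it can handle. With many services and channels that gap grows with every subject a service does not care about.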
In the official example everything is quite clear:
But in the current wrapper the meaning of the names (stream, channel, subject, topic, etc.) is unclear, and I assume the logic there differs from the example above. But correct me if I'm wrong.