Jetstream broker limitations #186

Open
nuttert opened this issue Apr 5, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

@nuttert

nuttert commented Apr 5, 2024

Hey! I tried to use JetStream. It's great that it supports replay and acks, but I found some limitations in the natsjetstream wrapper.

  1. As far as I can see, you use the channel as a map key:
[screenshot]

But sometimes it's very convenient to use wildcards such as "ORDERS.*" (native NATS JetStream supports them), and I expect such a subscription to receive messages from all matching subjects, not just one specific subject. This works fine in the broker without JetStream because it doesn't store the subscription in a map:
[screenshot]

  2. It is quite inconvenient to specify all the subjects when creating a JetStream broker:
natsjetstream.WithStreamConfig(jetstream.StreamConfig{
	Name: "pingv3",
	Subjects: []string{
		"ping.v3", "pong.v3",
	},
}), // Create the stream "pingv3"

This defeats the purpose of code generation: you have to specify the channel names manually in the code, not just in the YAML schema.
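Regarding point 1, a minimal stdlib-only Go sketch of how a controller could route incoming messages by subject pattern instead of by exact map key. `subjectMatches` and `router` are hypothetical helpers, not part of natsjetstream or nats.go; the matching follows NATS subject rules (`*` matches exactly one token, `>` as the last token matches one or more trailing tokens).

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a concrete NATS subject matches a pattern.
// Tokens are separated by '.'; '*' matches exactly one token and '>'
// (only valid as the last token) matches one or more trailing tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// '>' must be the last pattern token and needs at least one token left.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

// router is a hypothetical registry keyed by subscription pattern.
type router struct {
	subs map[string]chan string // pattern -> delivery channel
}

// dispatch delivers the payload to every subscription whose pattern matches
// the subject and reports whether anyone received it.
func (r *router) dispatch(subject, payload string) bool {
	delivered := false
	for pattern, ch := range r.subs {
		if subjectMatches(pattern, subject) {
			ch <- payload
			delivered = true
		}
	}
	return delivered
}

func main() {
	orders := make(chan string, 10)
	r := &router{subs: map[string]chan string{"ORDERS.*": orders}}

	// An exact-key lookup misses the message: "ORDERS.new" != "ORDERS.*".
	_, exact := r.subs["ORDERS.new"]
	fmt.Println("exact key lookup:", exact)                        // exact key lookup: false
	fmt.Println("pattern dispatch:", r.dispatch("ORDERS.new", "o1")) // pattern dispatch: true
	fmt.Println("other subject:", r.dispatch("PAYMENTS.new", "p1"))  // other subject: false
	fmt.Println("received:", <-orders)                              // received: o1
}
```

Iterating over all patterns is linear in the number of subscriptions; that is usually fine for a handful of channels per controller.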

I expect the JetStream client to behave like the classic client, plus at least two advantages:

  • it supports acks
  • it supports replaying some channels/streams

Also, what if there is a lot of traffic and different services use different channels? As far as I understand the current logic, every client will receive data from all channels, loading the network with channels it doesn't use (and emitting a warning log):
[screenshot]

In the official example everything is quite clear:

[screenshot]

But in the current wrapper, the meaning of the names (stream, channel, subject, topic, etc.) is unclear, and I assume the logic differs from the example above. Correct me if I'm wrong, though.

@lerenn
Owner

lerenn commented Apr 7, 2024

Hello @nuttert !

I'll try to answer as much as I understand everything, but feel free to correct me if there is something that I might have misunderstood:

  1. For this, I would have assumed that it works even with the *: specifying a channel with * at the end of its address field in an AsyncAPI document should work. But if you're pointing it out, it probably doesn't. I'll try it out, but if you have more info on what's happening in the JetStream case, I would be glad to have it :)

  2. Okay, so there are 3 topics here; I'll take them one by one:

    2.1. Subject creation: I agree that manually specifying each subject is not ideal, as the AsyncAPI document contains enough information to derive them. To be honest, I was planning this but haven't taken the time to implement it. If that works for you, I'll open an issue with some solutions I can think of.

    2.2. One consumer per controller: the logic behind it is to have one connection per controller and avoid multiplying connections, which I suppose suited the use case of @stefanmeschke, who implemented it. However, you're right that messages shouldn't be acked by default. So I would add 2 new options: one to choose between a consumer per subscription (not listening to everything) and one consumer for all subscriptions (listening to everything), and one to specify what to do with messages from unsubscribed subjects.

    2.3. Naming: Sorry about that; it's difficult to make a clear implementation of something as specific as a broker with something as agnostic as AsyncAPI.
    - Streams and subjects are specific to NATS JetStream; together they correspond to the channel in AsyncAPI.
    - Topic is specific to Kafka and corresponds to the channel in AsyncAPI.
    Regarding the NATS JetStream implementation, that's why the ConsumeMessage function corresponds to a consumer in the NATS JetStream diagram you linked, and new messages are sent to channels, which are the AsyncAPI concept used by the codegen. I hope that's clearer, but if you have a specific code extract that is obscure, feel free to point it out: I'll try to explain the names or change them if needed.

So if I'm correct, I would create 2 issues:

  • One for automatic subject creation in NATS JetStream
  • One for 2 modes in JetStream: single consumer or multiple consumers
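The two modes from 2.2, plus the "what to do with unsubscribed subjects" setting, could be expressed as controller options. The sketch below is stdlib-only and every name in it (`ConsumerMode`, `UnhandledPolicy`, `Options`, `filterSubjects`, `decide`) is hypothetical. In nats.go's `jetstream` package, a per-subscription consumer would roughly correspond to setting `FilterSubject` in `jetstream.ConsumerConfig`, and the policies to calling `Ack()` or `Nak()` on a `jetstream.Msg`.

```go
package main

import "fmt"

// ConsumerMode selects how the controller creates JetStream consumers.
type ConsumerMode int

const (
	// SingleConsumer: one unfiltered consumer for the whole controller.
	SingleConsumer ConsumerMode = iota
	// ConsumerPerSubscription: one consumer per Subscribe call, filtered on
	// that subscription's subject, so unrelated traffic is not delivered.
	ConsumerPerSubscription
)

// UnhandledPolicy says what a single consumer does with messages for
// subjects that nobody subscribed to.
type UnhandledPolicy int

const (
	AckUnhandled    UnhandledPolicy = iota // ack and drop
	NakUnhandled                           // negatively ack so it is redelivered
	IgnoreUnhandled                        // neither ack nor nak; redelivered after AckWait
)

// Options groups the two settings proposed in the thread.
type Options struct {
	Mode      ConsumerMode
	Unhandled UnhandledPolicy
}

// filterSubjects returns, for each consumer the controller would create,
// the subjects it filters on. An empty slice means "no filter".
func filterSubjects(opts Options, subscribed []string) [][]string {
	if opts.Mode == SingleConsumer {
		return [][]string{{}} // one unfiltered consumer
	}
	out := make([][]string, 0, len(subscribed))
	for _, s := range subscribed {
		out = append(out, []string{s})
	}
	return out
}

// decide returns the action for a message on subject, given the subjects
// with a live subscription (exact match only, for brevity).
func decide(opts Options, subject string, subscribed map[string]bool) string {
	if subscribed[subject] {
		return "deliver"
	}
	switch opts.Unhandled {
	case NakUnhandled:
		return "nak"
	case IgnoreUnhandled:
		return "ignore"
	default:
		return "ack"
	}
}

func main() {
	subscribed := []string{"ping.v3"}
	live := map[string]bool{"ping.v3": true}

	single := Options{Mode: SingleConsumer, Unhandled: AckUnhandled}
	perSub := Options{Mode: ConsumerPerSubscription}

	fmt.Println(len(filterSubjects(single, subscribed)), filterSubjects(perSub, subscribed)) // 1 [[ping.v3]]
	fmt.Println(decide(single, "ping.v3", live), decide(single, "pong.v3", live))           // deliver ack
}
```

Keeping `AckUnhandled` as one explicit choice preserves the current single-connection behaviour for users who rely on it, while making it opt-in rather than implicit.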

Please let me know if I got everything right; I would be happy to hear what you think about each point and change things accordingly. Thanks ☺️

@lerenn lerenn added the enhancement New feature or request label Apr 7, 2024
@nuttert
Author

nuttert commented Apr 8, 2024

Thanks for the answer @lerenn!
I think you're right, but maybe we have a misunderstanding about streams. Let me explain with the following code.

Here is Subscribe:

// Subscribe to messages from the broker.
func (c *Controller) Subscribe(ctx context.Context, channel string) (extensions.BrokerChannelSubscription, error) {
	// Create a new subscription
	sub := extensions.NewBrokerChannelSubscription(
		make(chan extensions.AcknowledgeableBrokerMessage, brokers.BrokerMessagesQueueSize),
		make(chan any, 1),
	)

	config := &nats.StreamConfig{
		Name:     randString(),
		Subjects: []string{channel},
		NoAck:    true,
	}
	c.jetStream.AddStream(config)

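	// Surface subscription failures instead of silently returning a dead subscription
	if _, err := c.jetStream.Subscribe(channel, func(msg *nats.Msg) {
		c.logger.Info(ctx, fmt.Sprintf("Received message for %s", channel), extensions.LogInfo{
			Key:   "message",
			Value: *msg,
		})
		c.HandleMessage(ctx, msg, sub)
	}, nats.DeliverAll()); err != nil {
		return sub, err
	}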

	return sub, nil
}

And here is Publish:

// Publish a message to the broker.
func (c *Controller) Publish(ctx context.Context, channel string, bm extensions.BrokerMessage) error {
	msg := nats.NewMsg(channel)

	// Set message headers and content
	for k, v := range bm.Headers {
		msg.Header.Set(k, string(v))
	}
	msg.Data = bm.Payload

	// Publish message
	if _, err := c.jetStream.PublishMsgAsync(msg); err != nil {
		return err
	}

	return nil
}

I used "legacy jetstream" and implemented what I meant above. I guess the advantages of this implementation are:

  • All streams are independent (c.jetStream.AddStream(config) on each subscribe)
  • I don't need to specify stream names when creating the broker client
  • It supports replaying (nats.DeliverAll(), which could be controlled by a custom option in the YAML, e.g. x-replay: true)
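The `x-replay` idea could be read from a channel's AsyncAPI extensions and turned into a delivery policy. In this sketch, `x-replay` is the hypothetical extension from the bullet above, `DeliverPolicy` is a local type for illustration, and the extensions are assumed to be already decoded from YAML into a map. Generated code could then pass `nats.DeliverAll()` or `nats.DeliverNew()` to the legacy `Subscribe`, or set `jetstream.DeliverAllPolicy` / `jetstream.DeliverNewPolicy` in a consumer config with the new API.

```go
package main

import "fmt"

// DeliverPolicy is a local illustration of the replay choice.
type DeliverPolicy int

const (
	DeliverNew DeliverPolicy = iota // only messages published after subscribing
	DeliverAll                      // replay everything stored in the stream first
)

func (p DeliverPolicy) String() string {
	if p == DeliverAll {
		return "DeliverAll"
	}
	return "DeliverNew"
}

// deliverPolicyFor reads the hypothetical "x-replay" extension of a channel.
// Missing or non-boolean values fall back to def.
func deliverPolicyFor(extensions map[string]any, def DeliverPolicy) DeliverPolicy {
	v, ok := extensions["x-replay"].(bool)
	if !ok {
		return def
	}
	if v {
		return DeliverAll
	}
	return DeliverNew
}

func main() {
	fmt.Println(deliverPolicyFor(map[string]any{"x-replay": true}, DeliverNew))  // DeliverAll
	fmt.Println(deliverPolicyFor(map[string]any{"x-replay": false}, DeliverAll)) // DeliverNew
	fmt.Println(deliverPolicyFor(map[string]any{}, DeliverNew))                  // DeliverNew
}
```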

But it does use the legacy JetStream API :)

@stefanmeschke
Contributor

Hey 👋

Interesting issue. Lemme add some thoughts on this:

All streams are independent (c.jetStream.AddStream(config) on each subscribe)

From my point of view, each stream comes with its own AsyncAPI definition and therefore also its own code generation.

I used "legacy jetstream" and implemented what I meant above.

The legacy API is much more powerful than the new API (e.g. subscriptions to a single channel), but I don't think it's a good idea to implement this with the old API. There might be real-world cases where acking messages the service is not interested in leads to issues, but I think that with the new acking of messages in this, the warning log could easily be circumvented? From what I've seen so far, acking uninteresting messages is something of a pattern.

I don't need to specify stream names when creating the broker client

This information could maybe also be derived from the AsyncAPI document. Besides this, I would like to mention that it's also a good pattern not to provision NATS/JetStream resources (consumers, streams, …) on every service/pod start.
