Long-lived AMQP connection #107

Open
SwooshyCueb opened this issue Sep 26, 2022 · 20 comments

Comments

@SwooshyCueb
Member

At present, we make a new connection for every AMQP message we send. This is not ideal.

From the RabbitMQ documentation (emphasis theirs):

Before an application can use RabbitMQ, it has to open a connection to a RabbitMQ node. The connection then will be used to perform all subsequent operations. Connections are meant to be long-lived. Opening a connection for every operation (e.g. publishing a message) would be very inefficient and is highly discouraged.

This comes from AMQP 0-9-1-centric and .NET-centric documentation, but it still applies for us.

@trel
Member

trel commented Sep 26, 2022

Very interesting ... and kind of obvious now that it's linked :)

Suggests the server has to hold a connection longer-lived than an individual Agent.

Hmm.....

@SwooshyCueb
Member Author

I've been pondering this for a bit, actually. Was something I thought about working on as part of #105, but there's already so much stuff in there that I think I'd rather get a bow on it and do further refactoring of our AMQP/Proton usage as part of a separate effort.

@korydraughn
Contributor

I agree. Handling that in a separate PR feels like the right thing to do.

@korydraughn
Contributor

Potential Solution

Instead of sending one AMQP message per RabbitMQ connection, we can store the messages and send them at a later time. This enables the iRODS server to batch messages. This also means the server is less likely to lose messages if they were written down successfully.

Plugin Configuration

The audit plugin would grow two new configuration options:

  • shared_memory_size_in_bytes: The size of the shared memory buffer used to hold audit information
    • This would be allocated on plugin start and shared between all agents
    • Does it make sense to abstract the storage space used to hold audit information?
  • flush_interval_in_seconds: The amount of time before an agent attempts to send the messages stored in the shared memory buffer

High-Level Algorithm

The audit plugin would be changed to do the following:

  1. Attempt to write the audit information into the pre-allocated shared memory buffer.
  2. If inserting the audit information exceeds the shared_memory_size_in_bytes, flush the messages.
  3. If the message was inserted without error, check the flush_interval_in_seconds.
  4. If the flush_interval_in_seconds has been satisfied, flush the messages.
  5. If the flush_interval_in_seconds has not been satisfied, do nothing.

Flush the messages means ... opening and using a single RabbitMQ connection to send all messages stored in shared memory. Only one agent is allowed to do this.

This solution assumes access to the shared memory is synchronized across agents running on the same machine. This solution does not require zone-wide synchronization.
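
As a rough illustration of the algorithm above, here is a minimal single-process Python sketch. It is not the plugin's code: an ordinary list stands in for the shared memory buffer, a callback stands in for the single RabbitMQ connection, and synchronization across agents is not modeled. `AuditBuffer`, `send_batch`, and the injected `clock` are hypothetical names.

```python
import time


class AuditBuffer:
    """Hypothetical stand-in for the shared memory buffer (single process only)."""

    def __init__(self, size_in_bytes, flush_interval_in_seconds, send_batch,
                 clock=time.monotonic):
        self.size_in_bytes = size_in_bytes
        self.flush_interval = flush_interval_in_seconds
        self.send_batch = send_batch  # called once per flush with a list of messages
        self.clock = clock
        self.messages = []
        self.used = 0
        self.last_flush = clock()

    def flush(self):
        # One call to send_batch models one connection carrying every stored message.
        if self.messages:
            self.send_batch(list(self.messages))
        self.messages.clear()
        self.used = 0
        self.last_flush = self.clock()

    def write(self, message: bytes):
        if len(message) > self.size_in_bytes:
            raise ValueError("message larger than the whole buffer")
        # Step 2: flush first if the new message would overflow the buffer.
        if self.used + len(message) > self.size_in_bytes:
            self.flush()
        # Step 1: store the message.
        self.messages.append(message)
        self.used += len(message)
        # Steps 3-5: flush only if the interval has elapsed, otherwise do nothing.
        if self.clock() - self.last_flush >= self.flush_interval:
            self.flush()
```

Passing the clock in as a parameter is only a convenience for exercising the interval logic without sleeping.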

@alanking
Contributor

alanking commented Sep 27, 2022

Would the messages also flush when the plugin's stop operation is invoked? Or is this being managed outside of the agents? "The server" terminology is tripping me up, I think.

@korydraughn
Contributor

Would the messages also flush when the plugin's stop operation is invoked?

That's a possibility. We'd need to investigate and see what the pros/cons are around the stop operation.

Or is this being managed outside of the agents?

It is all happening inside the agent. The good thing about this design is that it's flexible. The audit plugin could simply write things down and let another tool handle sending the messages. The design presented just makes it so that no other tool is necessary.

@trel
Member

trel commented Sep 27, 2022

Have to make sure we write down the new messages in the full buffer case... so...

flush_messages {
    send_all_messages_in_single_connection()
    set_last_flush_time()
}

# check the flush_interval_in_seconds
if enough seconds have passed:
    flush_messages()

# save new message; if the buffer is full, flush and try again
message_saved = 0
do while message_saved != 1:
    if save_message_to_buffer():
        message_saved = 1
    else:
        flush_messages()

@alanking
Contributor

Oh, I see. The flushing is actually managed in the shared memory in addition to the messages to flush. Mental model was missing some screws, as per usual.

@SwooshyCueb
Member Author

I am personally not a fan of batching messages in this way. To me it feels like a janky, overcomplicated workaround for our ownership model.
Instead, I propose we have a single connection per agent, opened in start(), reused in exec_rule(), and closed in stop().
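
To make the proposed lifecycle concrete, here is a minimal Python sketch (the real plugin is C++ and uses Proton, so this only mirrors the shape of the idea). `FakeConnection` is a hypothetical stand-in that counts how many times a connection is opened; `AuditPlugin` here is likewise illustrative and not the plugin's actual class.

```python
class FakeConnection:
    """Hypothetical stand-in for a real AMQP connection; counts opens for illustration."""

    opened = 0

    def __init__(self):
        FakeConnection.opened += 1
        self.sent = []
        self.closed = False

    def send(self, message):
        if self.closed:
            raise RuntimeError("connection is closed")
        self.sent.append(message)

    def close(self):
        self.closed = True


class AuditPlugin:
    """Sketch of one plugin instance living inside one agent."""

    def __init__(self, connect=FakeConnection):
        self._connect = connect
        self._conn = None

    def start(self):
        # Open the connection once, when the agent loads the plugin.
        self._conn = self._connect()

    def exec_rule(self, message):
        # Every PEP that fires reuses the same connection.
        if self._conn is None:
            raise RuntimeError("start() has not been called")
        self._conn.send(message)

    def stop(self):
        # Close the connection once, when the agent shuts the plugin down.
        if self._conn is not None:
            self._conn.close()
            self._conn = None
```

With this shape, an agent that fires a hundred PEPs opens one connection instead of a hundred.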

@trel
Member

trel commented Oct 4, 2022

That seems reasonable and good.

And we'll still see a performance gain since an Agent can send hundreds of messages if all PEPs are configured to fire.

Will be fun to measure and compare.

@wijnanjo

We adapted the irods plugin to send messages to kafka instead of over AMQP. Now we see a pretty high rate of new connections to kafka (coming from the plugin) which decreases the performance of our kafka broker.

Is it safe to create a singleton kafka client in the plugin (so a single client per irods process instead of per agent)? And if yes, how exactly? We're lacking C++ experience, so any help is very welcome.

Using a long-lived kafka client is a best practice, just like with AMQP connections, and the kafka client already takes care of message housekeeping like buffering and flushing.

@korydraughn
Contributor

If I'm understanding correctly, you're asking if a single kafka client can be per iRODS server rather than per agent. Does that sound correct?

If yes, then iRODS doesn't provide any mechanism for doing that, yet. There are ways around that limitation though. Without going into detail, here is one way to deal with it.

  • Add more indirection: iRODS agent -> proxy -> kafka server
    • Add a proxy server that maintains a kafka client. Perhaps written in python.
    • Modify the audit plugin to push messages to the proxy server.
    • Proxy server just accepts/sends messages to Kafka server.
  • Or, implement the solution at Long-lived AMQP connection #107 (comment).
    • While each agent would still get its own kafka client, performance should improve due to the agents reusing their existing kafka client.

With that said, both of those solutions could prove challenging depending on your C++ experience.
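
For a feel of the first option, here is a minimal in-process Python sketch of the proxy idea: many producers push messages, and one worker owns the single long-lived client. It is only a sketch under stated assumptions. A real proxy would be a separate process reached over a socket, and `Proxy`, `push`, and `client_send` are hypothetical names rather than anything iRODS or Kafka provides.

```python
import queue
import threading


class Proxy:
    """Hypothetical in-process proxy: many producers, one long-lived client."""

    _STOP = object()

    def __init__(self, client_send):
        self._queue = queue.Queue()
        self._client_send = client_send  # send function of the single long-lived client
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def push(self, message):
        """Called by agents; enqueues the message and returns immediately."""
        self._queue.put(message)

    def _run(self):
        # Only this thread ever touches the client.
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            self._client_send(item)

    def close(self):
        """Send everything pushed before close(), then stop the worker."""
        self._queue.put(self._STOP)
        self._worker.join()
```

Because the queue is FIFO, messages from any one producer reach the client in the order that producer pushed them.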

@wijnanjo

If I'm understanding correctly, you're asking if a single kafka client can be per iRODS server rather than per agent. Does that sound correct?

That's correct and pretty important since the irods server spawns a lot of agents (mainly due to Globus transfers creating a connection for each file to be transferred, if I'm not mistaken). We already implemented your second suggestion (that is, we create the kafka client in plugin.start and destroy it in plugin.stop). But that only partly solves our issue.

We'll go for a proxy solution.

@trel
Member

trel commented Jun 20, 2024

Hi @wijnanjo, would love to know where you are doing this work. Please consider making contact at [email protected] if mentioning here is not desired. Thanks.

@trel
Member

trel commented Jun 26, 2024

new 5.0 process model

  • main irodsServer only has children with jobs, one would be a buffer/proxy for this purpose
  • would launch a long-running proxy process
    • holds series of messages - perhaps message-type agnostic?
    • IPC between agent and long-running process - so no copying when it's time to 'send'
    • kafka can't just be a series of bytes - has to deserialize the json to see the key/pid-host-zone 'group' information for grouping AND ordering
      • suggests different flavors of 'proxy' based on message technology / protocol
  • would need 5.0 first

or

a completely separate binary/service (k8s?)


very interested in any performance / throughput numbers with today's various solutions

@korydraughn
Contributor

@wijnanjo Any updates on the proxy solution?

@wijnanjo

Hi @korydraughn, we haven't changed our current solution, apart from some configuration tweaks. Just last week I noticed in our logs that we are losing a significant number of kafka audit messages, and this typically happens in auditplugin.stop (timeouts when we flush and close the kafka producer). The root cause is yet unknown (maybe too much load on kafka or the network?). Those errors occur only now and then, but they add up over time. So, a solution as outlined above by @trel is still urgent.
And so I'm trying the following:

  • define an API in protobuf to export audit records. I call this the collector API (I stole the idea from the OpenTelemetry project, where they basically solved a similar issue)
  • run a gRPC server that implements this 'collector API'. In my case this collector will export the audit records to kafka. The official kafka client does batching and retries out of the box, and now it'll be a long-lived instance, so we don't have to make that dreaded 'flush' call anymore.
  • the audit plugin becomes a gRPC client, sending records to the collector.

This design offers flexibility:

  • gRPC is fast and can use tcp or unix domain sockets
  • we can deploy the collector server as a separate (loadbalanced) service/container (gRPC over tcp) or as a single long running irods process (in 5.0 and gRPC over a socket).
  • users can create their own collector in their language of choice and send the data to whatever they like.
  • a minimal solution could even work without a gRPC server. In this case the auditplugin would call a 'local' collector: simply a class that implements the collector API and directly sends over AMQP, kinda like the current solution.
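
As a loose illustration of the collector idea, and of the 'local' collector in the last bullet, here is a small Python sketch. It deliberately leaves out protobuf and gRPC, so it only shows the shape of the interface. `AuditRecord`, its fields, `Collector`, `LocalCollector`, and `emit` are all hypothetical names, not the proposed API.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List, Protocol


@dataclass(frozen=True)
class AuditRecord:
    # Field names are illustrative only, not the plugin's actual schema.
    pid: int
    host: str
    payload: str


class Collector(Protocol):
    def export(self, records: Iterable[AuditRecord]) -> int:
        """Accept records and return how many were accepted."""
        ...


class LocalCollector:
    """In-process collector: forwards each record straight to a send function."""

    def __init__(self, send: Callable[[str], None]):
        self._send = send

    def export(self, records: Iterable[AuditRecord]) -> int:
        count = 0
        for record in records:
            self._send(f"{record.host}:{record.pid} {record.payload}")
            count += 1
        return count


def emit(collector: Collector, records: List[AuditRecord]) -> int:
    """What the plugin side would do: hand records to whichever collector is configured."""
    return collector.export(records)
```

A remote collector would satisfy the same `export` signature but forward the records over the network instead of calling `send` directly.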

The downside is (slightly) increased complexity and less resilience (another component that may fail). A specific concern for us (related to the ordering guarantee of audit messages in kafka) is that we must send all messages from a particular irods process to the same kafka producer (i.e. collector), as this preserves the ordering of messages in kafka.
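
One possible way to meet that ordering concern with several collectors is to route by a stable key. The sketch below assumes the pid-host-zone grouping mentioned earlier in the thread and a fixed number of collectors; `pick_collector` is a hypothetical helper, and this is not how any particular load balancer behaves.

```python
import zlib


def pick_collector(pid: int, host: str, zone: str, collector_count: int) -> int:
    """Map one iRODS process to one collector index, stably across runs."""
    if collector_count <= 0:
        raise ValueError("collector_count must be positive")
    # crc32 is used instead of Python's built-in hash(), which is salted per process.
    key = f"{pid}-{host}-{zone}".encode("utf-8")
    return zlib.crc32(key) % collector_count
```

Note that changing `collector_count` remaps most keys, so this only keeps a process pinned to one collector while the set of collectors stays fixed.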

I can do the gRPC server in Go but I might ask your help for the C++ part. We'll see how it works out.

@trel
Member

trel commented Oct 21, 2024

So we're leaning towards a 5.0 server with a long-running child acting as proxy-broker / passthrough / collector. ( #107 (comment) ). Thank you @wijnanjo for thinking through the scenarios.

We need to consider local memory usage when the 'real' broker is not online / connected. Have to decide whether to drop on the ground any outgoing messages that do not fit in the buffer... or provide some kind of backlog and persistence (to disk?)... and then what happens on restart of the iRODS server and/or the proxy process? If there is a memory buffer, and the proxy crashes... those buffered in-memory messages will be lost. What guarantees are we claiming to meet?

The current setup requires an admin to immediately fix the broker and there is no buffer to manage. No messages go out because having a live connection is a prerequisite of the iRODS server doing any work. This was a design decision early on so that managing / losing / holding messages could not be the responsibility / fault of the iRODS server and its AMQP plugin.

Going in this new direction would shift that responsibility to the iRODS server and its new proxy collector / service.

I think that's okay and good - just want to make sure we're enumerating the differences and the impacts on expectations.

@wijnanjo

No messages go out because having a live connection is a prerequisite of the iRODS server doing any work

That's a valid concern. A simple collector could still send the payload immediately to the broker and ack on each request; that would give the same behavior as the current solution.

Our "kafka flavour" of the collector would initially not provide these guarantees (instead, it's expected to perform very well). But we could enhance it by persisting to disk every message that is 'on its way to kafka' (i.e. either in a memory queue of the kafka client or in flight), then ack'ing, and later removing the message from disk when we have an ack from kafka. Or, in case the broker is down, recovering the messages from disk and retrying.
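
The persist-then-ack idea could look roughly like the following Python sketch. It assumes one file per message in a local directory, and `DiskSpool`, `accept`, `broker_acked`, and `pending` are hypothetical names. It also does not preserve message order on recovery, which a real collector with ordering requirements would have to address.

```python
import os
import uuid


class DiskSpool:
    """Hypothetical spool: a message is on disk before the caller is acked."""

    def __init__(self, directory):
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def accept(self, message: bytes) -> str:
        """Persist the message, then return an id; returning is the ack to the plugin."""
        message_id = uuid.uuid4().hex
        tmp = os.path.join(self.directory, message_id + ".tmp")
        with open(tmp, "wb") as f:
            f.write(message)
            f.flush()
            os.fsync(f.fileno())
        # Rename last, so a crash mid-write never leaves a partial .msg file.
        os.replace(tmp, os.path.join(self.directory, message_id + ".msg"))
        return message_id

    def broker_acked(self, message_id: str) -> None:
        """Remove the message once the broker has confirmed it."""
        os.remove(os.path.join(self.directory, message_id + ".msg"))

    def pending(self) -> dict:
        """Messages to retry after a restart or broker outage (order not preserved)."""
        result = {}
        for name in os.listdir(self.directory):
            if name.endswith(".msg"):
                with open(os.path.join(self.directory, name), "rb") as f:
                    result[name[:-len(".msg")]] = f.read()
        return result
```

After a restart, a new `DiskSpool` pointed at the same directory sees exactly the messages the broker never acknowledged.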

@korydraughn
Contributor

I can do the gRPC server in Go but I might ask your help for the C++ part. We'll see how it works out.

@wijnanjo Sounds good. Happy to assist with the C++ part.

What I'm about to say is similar to what's been said already. Consider the following:

The audit plugin can write generic messages to an abstract storage device. The abstract storage device could represent a memory buffer, a disk, a database, or anything else you can think of. The important thing to keep in mind is:

  • The audit plugin doesn't need to know about kafka or amqp
  • The audit plugin just writes the messages down as fast as possible
    • No network required (unless the abstract storage device uses one internally)

From there, a process hanging off of the iRODS server or an external application could pull messages from the abstract storage device, convert the messages to the target format, and send them to their final destination (e.g. kafka).

We'd include an abstract storage device implementation and binary for AMQP with the audit plugin. The two components would serve as the reference for other implementations.
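
To sketch what that split might look like, here is a small Python example. It assumes messages are plain dictionaries and uses an in-memory queue as the storage device; `StorageDevice`, `MemoryDevice`, and `forward` are hypothetical names, and JSON is just one possible target format.

```python
import json
from abc import ABC, abstractmethod
from collections import deque


class StorageDevice(ABC):
    """Hypothetical interface between the audit plugin and a forwarder."""

    @abstractmethod
    def append(self, message: dict) -> None:
        """Called by the audit plugin: write the message down and return."""

    @abstractmethod
    def pull(self, max_messages: int) -> list:
        """Called by the forwarder: remove and return up to max_messages."""


class MemoryDevice(StorageDevice):
    def __init__(self):
        self._messages = deque()

    def append(self, message: dict) -> None:
        self._messages.append(message)

    def pull(self, max_messages: int) -> list:
        count = min(max_messages, len(self._messages))
        return [self._messages.popleft() for _ in range(count)]


def forward(device: StorageDevice, send, batch_size: int = 100) -> int:
    """Drain the device, convert each message to JSON text, and send it."""
    sent = 0
    while True:
        batch = device.pull(batch_size)
        if not batch:
            return sent
        for message in batch:
            send(json.dumps(message, sort_keys=True))
            sent += 1
```

The plugin side only ever calls `append`, so it needs no knowledge of kafka or AMQP; swapping the destination means swapping `send` in the forwarder.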

Questions for @wijnanjo:

  • How frequently is the audit information accessed?
  • When accessing the audit information, how up-to-date should it be?
