Long-lived AMQP connection #107
Very interesting ... and kind of obvious now that it's linked :) Suggests the server has to hold a connection longer-lived than an individual Agent. Hmm..... |
I've been pondering this for a bit, actually. Was something I thought about working on as part of #105, but there's already so much stuff in there that I think I'd rather get a bow on it and do further refactoring of our AMQP/Proton usage as part of a separate effort. |
I agree. Handling that in a separate PR feels like the right thing to do. |
Potential Solution

Instead of sending one AMQP message per RabbitMQ connection, we can store the messages and send them at a later time. This enables the iRODS server to batch messages. This also means the server is less likely to lose messages if they were written down successfully.

Plugin Configuration

The audit plugin would grow two new configuration options:
High-Level Algorithm

The audit plugin would be changed to do the following:
"Flush the messages" means opening and using a single RabbitMQ connection to send all messages stored in shared memory. Only one agent is allowed to do this. This solution assumes access to the shared memory is synchronized across agents running on the same machine. It does not require zone-wide synchronization. |
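A rough sketch of that buffer-and-flush shape (the shared-memory segment, mutex name, and buffer size below are illustrative assumptions, and Boost.Interprocess is just one possible way to get cross-agent synchronization on the same machine):

```cpp
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

#include <cstddef>
#include <string>

namespace bip = boost::interprocess;

using shm_char_alloc   = bip::allocator<char, bip::managed_shared_memory::segment_manager>;
using shm_string       = bip::basic_string<char, std::char_traits<char>, shm_char_alloc>;
using shm_string_alloc = bip::allocator<shm_string, bip::managed_shared_memory::segment_manager>;
using shm_message_list = bip::vector<shm_string, shm_string_alloc>;

// Illustrative names and sizes only; the real values would come from the plugin configuration.
constexpr const char* shm_name   = "irods_audit_amqp_buffer";
constexpr const char* mutex_name = "irods_audit_amqp_buffer_mutex";
constexpr std::size_t max_buffered_messages = 100;

// Placeholder: the real implementation opens ONE RabbitMQ connection,
// publishes every buffered message over it, and closes the connection.
void flush_messages(const shm_message_list& messages)
{
    for (const auto& m : messages) {
        (void) m; // publish m over the single, already-open connection
    }
}

// Called by the audit PEPs instead of opening a connection per message.
void enqueue_message(const std::string& json_msg)
{
    bip::managed_shared_memory segment{bip::open_or_create, shm_name, 1 << 20};
    auto* messages = segment.find_or_construct<shm_message_list>("messages")(
        shm_string_alloc{segment.get_segment_manager()});

    // Synchronizes access across all agents on this machine; no zone-wide coordination needed.
    bip::named_mutex mtx{bip::open_or_create, mutex_name};
    bip::scoped_lock<bip::named_mutex> lock{mtx};

    messages->emplace_back(json_msg.c_str(), shm_char_alloc{segment.get_segment_manager()});

    // The agent that fills the buffer is the only one allowed to flush it.
    if (messages->size() >= max_buffered_messages) {
        flush_messages(*messages);
        messages->clear();
    }
}
```

Holding the lock while flushing is the simplest way to guarantee only one agent flushes; a real implementation might instead swap the buffer out under the lock and publish outside it.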
Would the messages also flush when the plugin's |
That's a possibility. We'd need to investigate and see what the pros/cons are around the
It is all happening inside the agent. The good thing about this design is that it's flexible. The audit plugin could simply write things down and let another tool handle sending the messages. The design presented just makes it so that no other tool is necessary. |
Have to make sure we write down the new messages in the full buffer case... so...
|
Oh, I see. The flushing is actually managed in the shared memory in addition to the messages to flush. Mental model was missing some screws, as per usual. |
I am personally not a fan of batching messages in this way. To me it feels like a janky, overcomplicated workaround for our ownership model. |
That seems reasonable and good. And we'll still see a performance gain since an Agent can send hundreds of messages if all PEPs are configured to fire. Will be fun to measure and compare. |
We adapted the irods plugin to send messages to kafka instead of over AMQP. Now we see a pretty high rate of new connections to kafka (coming from the plugin), which decreases the performance of our kafka broker. Is it safe to create a singleton kafka client in the plugin (so a single client per irods process instead of per agent)? And if yes, how exactly? We're lacking C++ experience, so any help is very welcome. Using a long-lived kafka client is a best practice, just like with AMQP connections, and the kafka client already takes care of message housekeeping like buffering and flushing. |
If I'm understanding correctly, you're asking whether a single kafka client can be shared per iRODS server rather than created per agent. Does that sound correct? If yes, then iRODS doesn't provide any mechanism for doing that, yet. There are ways around that limitation though. Without going into detail, here is one way to deal with it.
With that said, both of those solutions could prove challenging depending on your C++ experience. |
That's correct and pretty important, since the irods server spawns a lot of agents (mainly due to Globus transfers creating a connection for each file to be transferred, if I'm not mistaken). We already implemented your second suggestion (that is, creating the kafka client in plugin.start and destroying it in plugin.stop). But that only partly solves our issue. We'll go for a proxy solution. |
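For reference, the per-agent lifecycle described there (client created in the plugin's start operation, destroyed in its stop operation) might look roughly like this with librdkafka's C API. The hook names and topic are assumptions, and this still yields one client per agent rather than per server:

```cpp
#include <librdkafka/rdkafka.h>
#include <string>

// A single long-lived producer shared by the whole plugin instance.
// The kafka client buffers and flushes internally, so every audit PEP just enqueues.
static rd_kafka_t* g_producer = nullptr;

// Called from the plugin's start operation (hook name assumed).
bool audit_kafka_start(const std::string& brokers)
{
    char errstr[512];
    rd_kafka_conf_t* conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(),
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        rd_kafka_conf_destroy(conf);
        return false;
    }
    // rd_kafka_new takes ownership of conf on success.
    g_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    return g_producer != nullptr;
}

// Called for every audit message (e.g. from exec_rule); a non-blocking enqueue.
void audit_kafka_send(const std::string& topic, const std::string& payload)
{
    if (!g_producer) { return; }
    rd_kafka_producev(g_producer,
                      RD_KAFKA_V_TOPIC(topic.c_str()),
                      RD_KAFKA_V_VALUE(const_cast<char*>(payload.data()), payload.size()),
                      RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                      RD_KAFKA_V_END);
    rd_kafka_poll(g_producer, 0); // serve delivery callbacks without blocking
}

// Called from the plugin's stop operation; drain the queue before destroying.
void audit_kafka_stop()
{
    if (!g_producer) { return; }
    rd_kafka_flush(g_producer, 10'000 /* ms */);
    rd_kafka_destroy(g_producer);
    g_producer = nullptr;
}
```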
Hi @wijnanjo, would love to know where you are doing this work. Please consider making contact at [email protected] if mentioning here is not desired. Thanks. |
new 5.0 process model
or a completely separate binary/service (k8s?)
very interested in any performance / throughput numbers with today's various solutions |
@wijnanjo Any updates on the proxy solution? |
Hi @korydraughn, we haven't changed our current solution, apart from some configuration tweaks. Just last week I noticed in our logs that we are losing a significant number of kafka audit messages, and this typically happens in
This design offers flexibility:
The downside is (slightly) increased complexity and less resilience (another component that may fail). A specific concern for us (related to the ordering guarantee of audit messages in kafka) is that we must send all messages from a particular irods process to the same kafka producer (i.e. the collector), as this preserves the ordering of messages in kafka. I can do the gRPC server in Go, but I might ask your help for the C++ part. We'll see how it works out. |
So we're leaning towards a 5.0 server with a long-running child acting as proxy-broker / passthrough / collector. ( #107 (comment) ). Thank you @wijnanjo for thinking through the scenarios.

We need to consider local memory usage when the 'real' broker is not online / connected. Have to decide whether to drop on the ground any outgoing messages that do not fit in the buffer... or provide some kind of backlog and persistence (to disk?)... and then what happens on restart of the iRODS server and/or the proxy process? If there is a memory buffer, and the proxy crashes... those buffered in-memory messages will be lost. What guarantees are we claiming to meet?

The current setup requires an admin to immediately fix the broker and there is no buffer to manage. No messages go out because having a live connection is a prerequisite of the iRODS server doing any work. This was a design decision early on so that managing / losing / holding messages could not be the responsibility / fault of the iRODS server and its AMQP plugin. Going in this new direction would shift that responsibility to the iRODS server and its new proxy collector / service.

I think that's okay and good - just want to make sure we're enumerating the differences and the impacts on expectations. |
That's a valid concern. A simple collector could still send the payload immediately to the broker and ack on each request; that would give the same behavior as the current solution. Our "kafka flavour" of the collector would initially not provide these guarantees (instead it's expected to be very performant). But we could enhance it by persisting every message that is 'on its way to kafka' (i.e. either in a memory queue of the kafka client or in flight) to disk, then ack'ing, and later removing the message from disk once we have an ack from kafka. Or, in case the broker is down, recovering the messages from disk and retrying. |
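The persist-then-ack idea can reduce to something as simple as a spool directory (the paths and names below are made up for illustration): write the message to disk before ack'ing the plugin, delete it from the delivery-report callback, and replay whatever is left on startup or when the broker comes back.

```cpp
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <iterator>
#include <string>
#include <vector>

namespace fs = std::filesystem;

// Hypothetical spool directory for the collector's write-ahead copy.
const fs::path spool_dir{"/var/lib/audit-collector/spool"};

// Persist the message before acknowledging the plugin. Returns the spool path
// so it can be removed once the broker confirms delivery.
fs::path persist_before_ack(std::uint64_t seq, const std::string& payload)
{
    fs::create_directories(spool_dir);
    const auto path = spool_dir / (std::to_string(seq) + ".msg");
    std::ofstream out{path, std::ios::binary};
    out << payload;
    out.flush(); // a real collector would also fsync the file and the directory
    return path;
}

// Called from the kafka delivery-report callback once the broker has acked.
void on_broker_ack(const fs::path& spooled) { fs::remove(spooled); }

// On startup (or when the broker comes back), replay anything left behind.
std::vector<std::string> recover_unacked()
{
    std::vector<std::string> pending;
    if (!fs::exists(spool_dir)) { return pending; }
    for (const auto& entry : fs::directory_iterator{spool_dir}) {
        std::ifstream in{entry.path(), std::ios::binary};
        pending.emplace_back(std::istreambuf_iterator<char>{in},
                             std::istreambuf_iterator<char>{});
    }
    return pending;
}
```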
@wijnanjo Sounds good. Happy to assist with the C++ part. What I'm about to say is similar to what's been said already. Consider the following: The audit plugin can write generic messages to an abstract storage device. The abstract storage device could represent a memory buffer, a disk, a database, or anything else you can think of. The important thing to keep in mind is:
From there, a process hanging off of the iRODS server or an external application could pull messages from the abstract storage device, convert the messages to the target format, and send them to their final destination (e.g. kafka). We'd include an abstract storage device implementation and binary for AMQP with the audit plugin. The two components would serve as the reference for other implementations. Questions for @wijnanjo:
|
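To make the "abstract storage device" idea above concrete, a minimal sketch of the interface plus an in-memory implementation might look like the following (names are illustrative, not an existing API):

```cpp
#include <deque>
#include <mutex>
#include <optional>
#include <string>

// The audit plugin only knows how to append generic messages; what sits behind
// the interface (memory buffer, disk, database, ...) is an implementation detail.
class message_storage_device
{
  public:
    virtual ~message_storage_device() = default;

    // Called by the audit plugin for every message. Must not know anything
    // about the final destination (AMQP, kafka, ...).
    virtual void append(const std::string& message) = 0;

    // Called by the consumer (e.g. a child of the iRODS server or an external
    // application) that converts and forwards messages.
    virtual std::optional<std::string> next() = 0;
};

class in_memory_storage_device final : public message_storage_device
{
  public:
    void append(const std::string& message) override
    {
        std::lock_guard lock{mtx_};
        messages_.push_back(message);
    }

    std::optional<std::string> next() override
    {
        std::lock_guard lock{mtx_};
        if (messages_.empty()) { return std::nullopt; }
        auto m = std::move(messages_.front());
        messages_.pop_front();
        return m;
    }

  private:
    std::mutex mtx_;
    std::deque<std::string> messages_;
};
```

An AMQP- or disk-backed implementation would plug in behind the same two calls, which is what keeps the plugin side free of any knowledge of the final destination.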
At present, we make a new connection for every AMQP message we send. This is not ideal.
From the RabbitMQ documentation (emphasis theirs):
This comes from AMQP 0-9-1-centric and .NET-centric documentation, but it still applies to us.
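Since the plugin's AMQP side goes through qpid-proton (see the refactoring comment above), a long-lived connection would roughly follow the shape of proton's simple_send example: open one connection and one sender, send every queued message over it, and only close after everything is settled. The address and names below are illustrative:

```cpp
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/sender.hpp>
#include <proton/tracker.hpp>

#include <cstddef>
#include <string>
#include <vector>

// One connection and one sender carry every queued message,
// instead of a new connection per message.
class batch_sender : public proton::messaging_handler
{
  public:
    batch_sender(std::string url, std::vector<std::string> bodies)
        : url_{std::move(url)}, bodies_{std::move(bodies)} {}

    void on_container_start(proton::container& c) override
    {
        c.open_sender(url_); // the connection is opened exactly once
    }

    void on_sendable(proton::sender& s) override
    {
        while (s.credit() && next_ < bodies_.size()) {
            proton::message msg;
            msg.body(bodies_[next_++]);
            s.send(msg);
        }
    }

    void on_tracker_accept(proton::tracker& t) override
    {
        if (++confirmed_ == bodies_.size()) {
            t.connection().close(); // close only after everything is settled
        }
    }

  private:
    std::string url_;
    std::vector<std::string> bodies_;
    std::size_t next_ = 0;
    std::size_t confirmed_ = 0;
};

// Usage (address is an assumption):
//   batch_sender handler{"amqp://localhost/audit_messages", msgs};
//   proton::container{handler}.run();
```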