
PConnections

Robin Harvey edited this page Sep 28, 2011 · 8 revisions

Chapter 7 - persistent connections

Amqphp Persistent Connections

The Amqphp PConnection extension is designed to be used in a web server. The feature was started after version 0.9.0 in response to a request from the RabbitMQ mailing list. The idea of, and motivation for, persistent Amqp connections is basically the same as for persistent database connections, for example when using mysql_pconnect. By passing the STREAM_CLIENT_PERSISTENT flag to PHP's stream_socket_client() function you can set up a connection that doesn't close once the current web request completes, so subsequent requests avoid the disconnection / reconnection overhead.

Amqphp's persistence extension allows you to persist the state of an open Amqp connection, including any open channels and consumers. The main elements of the API are in the \amqphp\persistent namespace; they descend from and re-use the implementation of their non-persistent counterparts. Using persistent connections is therefore the same as using non-persistent connections; the only difference is that you must take care of persisting Connection metadata between requests. There are two persistence methods. They share the same underlying implementation, but are targeted at different use cases and require slightly different code. I'll outline these 2 methods next.

Automatic Persistence

Automatic persistence is the easiest method to implement, and in most cases I expect this is what people will use. With this method, you specify a persistence helper implementation and the API makes connecting / reconnecting to the Amqp server "invisible".

// Connect / Reconnect
$conn = new pconn\PConnection($params);
$conn->connect();
$conn->setPersistenceHelperImpl('\\amqphp\\persistent\\FilePersistenceHelper');
// ...Use $conn in some way
$conn->sleep();

Notice how, in this method, you always create a new PConnection object; the logic required to detect and re-use existing connections is hidden inside the connect() method. The connection metadata is written to cache when you call $conn->sleep(), so you need to make sure this gets called exactly once in every web request. Similarly, you need to make sure that the connection isn't used after the call to sleep(), otherwise the next request which uses this connection may experience issues.
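One way to meet the "exactly once per request" requirement is to wrap the sleep() call in a guard and register it as a shutdown function. The sketch below is illustrative only: runOnce() is a hypothetical helper, not part of Amqphp, and whether calling sleep() from a shutdown function suits your setup is an assumption you should verify.

```php
<?php
/**
 * Wrap a callback so that it runs at most once, however many times it is invoked.
 * Hypothetical helper for illustration; not part of the Amqphp API.
 */
function runOnce(callable $fn): callable
{
    $done = false;
    return function () use (&$done, $fn): void {
        if ($done) {
            return; // already called during this request
        }
        $done = true;
        $fn();
    };
}

// Usage sketch (assumes $conn is the PConnection from the example above):
// $sleepOnce = runOnce(function () use ($conn) { $conn->sleep(); });
// register_shutdown_function($sleepOnce); // still runs if the script exits early
```

If some code path calls the guarded closure explicitly, the shutdown call becomes a no-op, so the cache is still written only once.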

Manual Persistence

With this method, the user must take responsibility to call serialize() and unserialize() themselves (PConnection and other Amqphp classes implement the built-in \Serializable interface).

// Load serialized connection, maybe from a file
$serialized = loadPersistedConnectionFromSomewhere();
if ($serialized) {
    // Re-use existing TCP / Amqp connection, the call to $conn->connect() happens automatically in here
    $conn = unserialize($serialized);
} else {
    // Create the object as normal the first time.
    $conn = new PConnection($params);
    $conn->connect();
}
// ...Use $conn in some way
yourCustomSerializeRoutine(serialize($conn));

The same caveat regarding the use of the connection after serialization applies to this method as well. Please note, it is not a good idea to have connections persist in web sessions.

What gets persisted?

The following items are, or can be, placed into the PConnection cache:

  • Connections. Obviously, these are the PConnection objects.
  • Channels. When you call PConnection->openChannel() you get back an instance of PChannel. Unless you specifically close this channel before the end of the request, it will be written to cache and will therefore be automatically available in the next request.
  • Consumers. If you start a consumer on a channel, the consumer implements the \Serializable interface, and you don't remove the consumer before the end of the request, then the consumer will be persisted along with the PConnection and PChannel. The consumer will therefore still be present in the next request when you call new PConnection or unserialize($serialized).
  • Internal connection state, possibly including undelivered and partially delivered messages. This is the biggest single danger of using PConnections: if you're not careful, messages will be written to cache and then made available during the next request, which could mean that private data is leaked and / or certain messages are lost. This is only really a danger if you leave open consume sessions on channels or use publish confirms, and it can be mitigated by setting the 'suspendOnSerialize' and 'resumeOnHydrate' flags on your PChannel objects (see below).

All of these components are serialized, if required, when you call serialize($conn) or $conn->sleep(). The implementation automatically detects elements which need saving, regardless of the persistence mode you're using.

Real world usage, pitfalls, gotchas

Persistent connections can be confusing to work with at first, and there are circumstances where it's possible for unusual and unwelcome behavior to occur, so this section runs through some basic concepts, guidelines, and easy-to-make mistakes.

PConnections are per process

PConnections are only useful in web servers, and web servers normally run many "worker processes", so each connection will eventually exist in every worker process. If your application opens 2 persistent connections, say to rabbit1 and rabbit2, and the web server has a maximum of 10 worker processes, you'll eventually end up with 10 × 2 = 20 Amqp connections, 2 in each process.

Manual persistence

The most immediate and important note here concerns the Manual Persistence mechanism: you, the user, MUST ensure that the cache is kept on a per-process basis. Here's an example of one way to do this:

// !!!!Make sure that each worker process has its own cache!!!
$fileName = 'cache-' . getmypid() . '.txt';
// Load the serialized connection for this process, if there is one
$serialized = is_file($fileName) ? file_get_contents($fileName) : false;
if ($serialized) {
    $conn = unserialize($serialized);
} else {
    $conn = new PConnection($params);
    $conn->connect();
}
// ...Use $conn in some way
file_put_contents($fileName, serialize($conn));

Notice how the current process ID is included in the file name. Without this, process B will pick up the cache written by process A, treat it as its own, and trigger errors. The same thing applies to Automatic persistence mode, but the built-in handlers (File and APC based) do this for you.
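The per-process naming rule can be factored into small helpers. This is a minimal sketch: the function names, the file prefix, and the choice of directory are assumptions made for illustration and are not part of Amqphp.

```php
<?php
// Hypothetical helpers; names and defaults are illustrative only.

/** Build a cache file path that is unique to the current worker process. */
function pconnCacheFile(string $dir, string $prefix = 'amqphp-cache'): string
{
    return rtrim($dir, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR
        . $prefix . '-' . getmypid() . '.txt';
}

/** Return the payload saved by this process, or null if there is none. */
function loadPersisted(string $file): ?string
{
    if (!is_file($file)) {
        return null;
    }
    $data = file_get_contents($file);
    return ($data === false || $data === '') ? null : $data;
}

/** Write the payload, holding an exclusive lock while writing. */
function storePersisted(string $file, string $serialized): bool
{
    return file_put_contents($file, $serialized, LOCK_EX) !== false;
}
```

With these in place, the manual persistence flow becomes: call loadPersisted(pconnCacheFile($dir)), unserialize the result if it is not null, and pass serialize($conn) to storePersisted() at the end of the request.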

Automatic Persistence still requires code to handle rehydrated connections

Consider the following naive code, which uses automatic persistence:

// Connect / Reconnect
$conn = new pconn\PConnection($params);
$conn->connect();
$conn->setPersistenceHelperImpl('\\amqphp\\persistent\\FilePersistenceHelper');
$chan = $conn->openChannel();
// ...Use $conn and $chan in some way
$conn->sleep();

In this instance, $chan is never closed, meaning that you're opening a new channel on every request. Eventually, you'll overwhelm your Amqp broker with duplicate channels. Any code which uses automatic persistence must therefore check for existing Channels / Consumers before creating new ones. You can use PConnection->getPersistenceStatus(), PConnection->getChannels() and PChannel->getConsumerTags() to check the existing state of the connection.
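A minimal sketch of the "check before you create" rule follows. It ASSUMES that getChannels() returns a (possibly empty) array of channel objects; check this against the Amqphp source before relying on it. StubConnection is a stand-in class that only exists so the sketch runs without Amqphp or a broker, and firstChannelOrOpen() is a hypothetical helper.

```php
<?php
/**
 * Return an already-open channel if the connection has one,
 * otherwise open a new one.
 * ASSUMPTION: getChannels() returns an array of channel objects.
 */
function firstChannelOrOpen(object $conn): object
{
    $channels = $conn->getChannels();
    if (!empty($channels)) {
        return reset($channels); // re-use the first persisted channel
    }
    return $conn->openChannel();
}

/** Stand-in for PConnection; NOT the real Amqphp class. */
final class StubConnection
{
    private array $channels = [];
    public int $opened = 0;

    public function getChannels(): array
    {
        return $this->channels;
    }

    public function openChannel(): object
    {
        $this->opened++;
        return $this->channels[] = new stdClass();
    }
}
```

In the naive example, replacing the direct openChannel() call with a check of this kind means a rehydrated connection re-uses its persisted channel instead of opening another one on every request.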

Remember to cancel non-persistent consumers

If you start a non-persistent consumer during a web request, make sure that you remove that consumer, either by returning 'amqp\CONSUMER_CANCEL' from your Consumer implementation or by closing the underlying PChannel. Failure to do so means that the broker will deliver messages that the PConnection doesn't expect, which triggers an unrecoverable error.

Be careful with persisted consumers

The Amqphp persistence implementation can save open consumers, but doing so can lead to unusual and / or undesired behavior. If your application needs to read messages from the Amqp broker, it's probably worth trying to use the basic.get Amqp call rather than having a persistent consumer. If you do use a persistent consumer, make sure that you set a low prefetch-count value (preferably 1), and consider using the PChannel suspendOnSerialize and resumeOnHydrate flags (see below for more info). The basic aim here must be to avoid writing messages to the PConnection cache. If this happens, you'll see lots of warnings in your error logs like this:

PConnection will persist application data unDelivered.

This means that an incoming message wasn't processed during that request. Even assuming there's no data leakage issue, you've still got the problem that this will skew response times, leading to some messages being processed much more slowly than others.

PChannel flags

The PChannel class has a couple of flags which can help mitigate the persistent consumer issues outlined above. These flags make use of the channel.flow method, which the Amqp spec describes thus:

Enable/disable flow from peer. This method asks the peer to pause or restart the flow of content data sent by a consumer. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned by Basic.Get-Ok methods

The 2 flags are:

  • PChannel->$resumeOnHydrate - set to true and the unserialize routine will automatically set the Channel to flow=YES. Defaults to false.
  • PChannel->$suspendOnSerialize - set to true and the serialize routine will automatically set the Channel to flow=NO. Defaults to false.

Recall that persistent connections are always alive, even when the web server process that owns them is inactive, so these flags can help the broker to avoid delivering messages to connections which are not responsive.
