PConnections
The Amqphp PConnection extension is designed for use inside a web server. Work on the feature started after version 0.9.0, in response to a request on the RabbitMQ mailing list. The idea of, and motivation for, persistent Amqp connections is essentially the same as for persistent database connections, for example those created with mysql_pconnect(). By passing the STREAM_CLIENT_PERSISTENT flag to PHP's stream_socket_client() function, you can set up a connection that isn't closed when the current web request completes. Subsequent requests therefore avoid the disconnection / reconnection overhead. Amqphp's persistence extension goes a step further and lets you persist the state of an open Amqp connection, including any open channels and consumers.

The main elements of the API are in the \amqphp\persistent namespace. These classes descend from, and re-use the implementation of, their non-persistent counterparts. Using persistent connections is therefore the same as using non-persistent connections. The only difference is that you must take care of persisting Connection metadata between requests. There are two persistence methods, Automatic and Manual. Both use the same underlying implementation, but they target different use cases and are used in slightly different ways. I'll outline these 2 methods next.
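Before looking at the two methods, here is a minimal plain-PHP sketch of the socket-level mechanism mentioned above, without Amqphp. It opens a TCP socket with the STREAM_CLIENT_PERSISTENT flag. In a web server worker, PHP keeps such a socket open after the request ends and hands it back to a later request in the same process that connects to the same address. The helper names, host and port are assumptions for illustration (5672 is the standard AMQP port), and Amqphp's own socket code may differ.

```php
<?php
// Sketch only: shows the PHP stream flag that persistent connections rely on.
// persistentConnectFlags() and openPersistentSocket() are hypothetical helpers,
// not part of Amqphp.

function persistentConnectFlags(): int
{
    // STREAM_CLIENT_PERSISTENT asks PHP to keep the socket open after the
    // current request and re-use it for later connects to the same address.
    return STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT;
}

/**
 * Opens (or re-uses) a persistent TCP socket.
 *
 * @return resource|false the stream, or false if the connection failed
 */
function openPersistentSocket(string $host, int $port, float $timeout = 5.0)
{
    $errno = 0;
    $errstr = '';
    $sock = @stream_socket_client(
        "tcp://{$host}:{$port}",
        $errno,
        $errstr,
        $timeout,
        persistentConnectFlags()
    );
    if ($sock === false) {
        error_log("Connection to {$host}:{$port} failed: [{$errno}] {$errstr}");
    }
    return $sock;
}

// Usage (assumes a broker listening on localhost:5672):
// $sock = openPersistentSocket('localhost', 5672);
```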
Automatic persistence is the easiest method to implement, and I expect most people will use it. With this method, you specify a persistence helper implementation, and the API makes connecting / reconnecting to the Amqp server "invisible".
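use amqphp\persistent as pconn;

// Connect / Reconnect. Set the persistence helper before calling connect(),
// because connect() uses it to look for a cached connection to re-use.
$conn = new pconn\PConnection($params);
$conn->setPersistenceHelperImpl('\\amqphp\\persistent\\FilePersistenceHelper');
$conn->connect();
// ...Use $conn in some way
// Write the connection metadata to the cache; don't use $conn after this.
$conn->sleep();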
Notice that with this method you always create a new PConnection object. The logic required to detect and re-use an existing connection is hidden inside the connect() method. The connection metadata is written to the cache when you call $conn->sleep(), so you need to make sure this gets called exactly once in every web request. Similarly, you need to make sure the connection isn't used after the call to sleep(). Otherwise the cached metadata may no longer match the live connection, and the next request that uses this connection may run into errors.
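Since sleep() must run exactly once per request, one option is to register it as a shutdown function behind a small guard. The sketch below is an assumed pattern, not part of Amqphp. SleepOnceGuard is a hypothetical class that works with any object exposing a sleep() method, such as a PConnection. Registering it with register_shutdown_function() means sleep() still runs if the script exits early. The guard also stops a second, explicit call from writing the cache twice. Shutdown functions run in registration order, so any other shutdown code that still uses the connection must be registered before the guard.

```php
<?php
// Hypothetical helper (not part of Amqphp): makes sure $conn->sleep() is
// called exactly once per request, even if the script exits early.
final class SleepOnceGuard
{
    /** @var object any object with a sleep() method, e.g. a PConnection */
    private $conn;
    private $done = false;

    public function __construct(object $conn)
    {
        $this->conn = $conn;
        // Shutdown functions run when the request ends, including after exit().
        register_shutdown_function([$this, 'sleepOnce']);
    }

    public function sleepOnce(): void
    {
        if ($this->done) {
            return; // already persisted during this request
        }
        $this->done = true;
        $this->conn->sleep();
    }
}

// Usage, assuming $conn was set up as in the example above:
// $guard = new SleepOnceGuard($conn);
// ...use $conn...
// $guard->sleepOnce(); // optional explicit call; the shutdown call is then a no-op
```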
The second method is Manual persistence. With this method, you must call serialize() and unserialize() yourself (PConnection and the other Amqphp classes implement PHP's built-in \Serializable interface).
use amqphp\persistent\PConnection;

// Load the serialized connection, maybe from a file
// (loadPersistedConnectionFromSomewhere() is a placeholder for your own code)
$serialized = loadPersistedConnectionFromSomewhere();
if ($serialized) {
    // Re-use the existing TCP / Amqp connection; the call to $conn->connect()
    // happens automatically inside unserialize()
    $conn = unserialize($serialized);
} else {
    // Create the object as normal the first time.
    $conn = new PConnection($params);
    $conn->connect();
}
// ...Use $conn in some way
// Save the connection state (again, a placeholder for your own storage code)
yourCustomSerializeRoutine(serialize($conn));
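The same caveat about using the connection after serialization applies to this method as well. Please note that it is not a good idea to persist connections in web sessions. A session can be served by any worker process, whereas a persistent connection belongs to the process that opened it.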
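The "automatic" reconnect inside unserialize() relies on PHP calling a hook on the object as it is rebuilt. The toy class below is a stand-in, not Amqphp code. It only shows where such a hook runs. It implements \Serializable, as the Amqphp classes do. It also implements the __serialize()/__unserialize() methods that PHP 7.4+ prefers, since PHP 8.1 deprecates implementing \Serializable on its own. Here connect() just flips a flag. In the real PConnection, this is the point where connect() is called, as noted in the example above.

```php
<?php
// Toy stand-in, NOT an Amqphp class: shows that unserialize() runs a hook on
// the rebuilt object, which is where a persistent connection can reconnect.
final class ToyConnection implements \Serializable
{
    public $params;
    public $connected = false;

    public function __construct(array $params)
    {
        $this->params = $params;
    }

    public function connect(): void
    {
        // A real connection would re-attach to its persistent socket here.
        $this->connected = true;
    }

    // PHP 7.4+ hooks; these take precedence over the \Serializable methods.
    public function __serialize(): array
    {
        return ['params' => $this->params]; // live state is not stored
    }

    public function __unserialize(array $data): void
    {
        $this->params = $data['params'];
        $this->connect(); // reconnect automatically while being rebuilt
    }

    // \Serializable methods, used only by PHP versions older than 7.4.
    public function serialize(): string
    {
        return serialize($this->__serialize());
    }

    public function unserialize($data): void
    {
        $this->__unserialize(unserialize($data));
    }
}

$original = new ToyConnection(['host' => 'localhost']);
$restored = unserialize(serialize($original));
var_dump($restored->connected); // bool(true): the hook ran during unserialize()
```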
The following items are, or can be, placed into the PConnection cache:
- Connections: obviously, these are the PConnection objects.
- Channels. When you call PConnection->openChannel() you get back an instance of PChannel. Unless you explicitly close this channel before the end of the request, it will be written to the cache and will therefore be available automatically in the next request.
- Consumers. A consumer is persisted along with the PConnection and PChannel if three conditions hold: you start it on a channel, it implements the \Serializable interface, and you don't remove it before the end of the request. The consumer will therefore still be present in the next request when you call new PConnection or unserialize($serialized).
- Internal connection state, possibly including undelivered and partially delivered messages. This is the single biggest danger of using PConnections. If you're not careful, messages will be written to the cache and only made available during the next request. This could mean that private data is leaked and / or certain messages are lost. This is only really a danger if you leave consume sessions open on channels or use publish confirms. It can be mitigated by setting the 'suspendOnSerialize' and 'resumeOnHydrate' flags on your PChannel objects (see below).
All of these components are serialized, where required, when you call serialize($conn) or $conn->sleep(). The implementation automatically detects which elements need saving, regardless of the persistence mode you're using.
Persistent connections can be confusing to work with at first, and in some circumstances unusual and unwelcome behavior can occur. This section therefore runs through some basic concepts, guidelines, and easy-to-make mistakes.
PConnections are only useful in web servers, and web servers normally run many "worker processes". A persistent connection belongs to the process that opened it, so each connection will be opened separately in every worker process. If your application opens 2 persistent connections, say to rabbit1 and rabbit2, and the web server has a maximum of 10 worker processes, you'll eventually end up with 10 × 2 = 20 Amqp connections, 2 in each process.
The most immediate and important point here concerns the Manual persistence mechanism: you MUST ensure that the cache is kept on a per-process basis. Here's one way to do this:
use amqphp\persistent\PConnection;

// !!! Make sure that each worker process has its own cache !!!
$fileName = 'cache-' . getmypid() . '.txt';

// Load this process's serialized connection, if there is one
$serialized = is_file($fileName) ? file_get_contents($fileName) : false;
if ($serialized) {
    $conn = unserialize($serialized);
} else {
    $conn = new PConnection($params);
    $conn->connect();
}
// ...Use $conn in some way
file_put_contents($fileName, serialize($conn));
Notice how the current process ID is included in the file name. Without it, process B could pick up the cache written by process A, wrongly assume that it describes process B's own connection, and trigger errors. The same applies to Automatic persistence mode, but the built-in helpers (File and APC based) take care of this for you.
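The per-process file cache above can be wrapped in two small helpers. This is a sketch under assumptions, not an Amqphp API. The function names and the use of sys_get_temp_dir() are illustrative choices. So is writing to a temporary file and then calling rename(), which means a crash in the middle of a write can't leave a truncated cache file behind. loadProcessCache() returns false when this process has no cache yet, which fits the if ($serialized) check used in the examples above.

```php
<?php
// Hypothetical helpers (not part of Amqphp): one cache file per worker process.

function processCacheFile(string $prefix = 'amqphp-cache'): string
{
    // Including the process ID keeps each worker's cache separate.
    return sys_get_temp_dir() . DIRECTORY_SEPARATOR . $prefix . '-' . getmypid() . '.txt';
}

/** @return string|false the serialized connection, or false if there is none */
function loadProcessCache()
{
    $file = processCacheFile();
    return is_file($file) ? file_get_contents($file) : false;
}

function saveProcessCache(string $serialized): void
{
    $file = processCacheFile();
    $tmp = $file . '.tmp';
    // Write to a temporary file first, then rename it over the real one,
    // so an interrupted write can't leave a truncated cache behind.
    if (file_put_contents($tmp, $serialized) === false || !rename($tmp, $file)) {
        throw new RuntimeException("Could not write connection cache to {$file}");
    }
}

// Usage with a PConnection, assuming $conn is already set up:
// saveProcessCache(serialize($conn));
// $serialized = loadProcessCache();
```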
Consider the following naive code, which uses automatic persistence:
use amqphp\persistent as pconn;

// Connect / Reconnect
$conn = new pconn\PConnection($params);
$conn->setPersistenceHelperImpl('\\amqphp\\persistent\\FilePersistenceHelper');
$conn->connect();
// Naive: a new channel is opened on every request, even if one was restored
$chan = $conn->openChannel();
// ...Use $conn and $chan in some way
$conn->sleep();
In this instance $chan is never closed. A new channel is therefore opened on every request while the old ones are restored from the cache, and eventually you'll overwhelm your Amqp broker with duplicate channels. Any code that uses automatic persistence must therefore check for existing Channels / Consumers before creating new ones. You can use PConnection->getPersistenceStatus(), PConnection->getChannels() and PChannel->getConsumerTags() to check the existing state of the connection.
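Here is a minimal sketch of the "check before you create" rule. It assumes that PConnection->getChannels() returns an array of the channels restored from the cache; check the return type in your Amqphp version. reuseOrOpenChannel() is a hypothetical helper. It only calls the two PConnection methods named above, so it can be exercised with any object that provides them. A similar check using PChannel->getConsumerTags() applies before adding a consumer.

```php
<?php
// Hypothetical helper (not part of Amqphp). Assumes $conn->getChannels()
// returns an array of open channels, empty when nothing was restored.
function reuseOrOpenChannel(object $conn): object
{
    $channels = $conn->getChannels();
    if (!empty($channels)) {
        // Re-use the first channel restored from the cache.
        return reset($channels);
    }
    // Nothing restored, e.g. the first request handled by this worker process.
    return $conn->openChannel();
}

// Usage, replacing the naive $conn->openChannel() call above:
// $chan = reuseOrOpenChannel($conn);
```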
If you start a non-persistent consumer during a web request, make sure you remove that consumer before the request ends. You can do this either by returning 'amqp\CONSUMER_CANCEL' from your Consumer implementation or by closing the underlying PChannel. Otherwise the broker will keep delivering messages that the PConnection doesn't expect, which triggers an unrecoverable error.
The Amqphp persistence implementation can save open consumers, but doing so can lead to unusual and / or undesired behavior. If your application needs to read messages from the Amqp broker, it's probably worth trying the basic.get Amqp method rather than a persistent consumer. If you do use a persistent consumer, make sure you set a low prefetch-count value (preferably 1). Also consider using the PChannel suspendOnSerialize and resumeOnHydrate flags (see below for more info). The basic aim here must be to avoid writing messages to the PConnection cache. If this happens, you'll see lots of warnings like this in your error logs:
PConnection will persist application data unDelivered.
This means that an incoming message wasn't processed during that request. Even if there's no data leakage issue, this still skews response times. Some messages will be processed much more slowly than others, because they wait in the cache until that worker process handles its next request.
The PChannel class has a couple of flags that can help mitigate the persistent consumer issues outlined above. These flags make use of the channel.flow method, which the Amqp spec describes thus:
Enable/disable flow from peer. This method asks the peer to pause or restart the flow of content data sent by a consumer. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned by Basic.Get-Ok methods
The 2 flags are:
- PChannel->$resumeOnHydrate: set to true and the unserialize routine will automatically set the Channel to flow=YES. Defaults to false.
- PChannel->$suspendOnSerialize: set to true and the serialize routine will automatically set the Channel to flow=NO. Defaults to false.
Recall that persistent connections stay alive even when the web server process that owns them is idle. These flags can therefore help stop the broker from delivering messages to connections that are not responsive.
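Here is a toy model that makes the effect of the two flags concrete. ToyChannel is NOT Amqphp's PChannel, and its sendFlow() method only records the state instead of sending channel.flow to a broker. With suspendOnSerialize set, serializing pauses delivery. With resumeOnHydrate set, unserializing turns it back on. The flags are set here as public properties, which the PChannel->$flag notation above suggests, but that is an assumption about the real class.

```php
<?php
// Toy model, NOT Amqphp's PChannel: sendFlow() only records the flow state
// instead of sending channel.flow to the broker.
final class ToyChannel
{
    public $suspendOnSerialize = false; // same defaults as described above
    public $resumeOnHydrate = false;
    public $flowActive = true;          // true = broker may deliver messages

    private function sendFlow(bool $active): void
    {
        $this->flowActive = $active; // stands in for channel.flow(active)
    }

    public function __serialize(): array
    {
        if ($this->suspendOnSerialize) {
            $this->sendFlow(false); // ask the broker to pause deliveries
        }
        return get_object_vars($this);
    }

    public function __unserialize(array $data): void
    {
        foreach ($data as $name => $value) {
            $this->$name = $value;
        }
        if ($this->resumeOnHydrate) {
            $this->sendFlow(true); // ask the broker to resume deliveries
        }
    }
}

$chan = new ToyChannel();
$chan->suspendOnSerialize = true;
$chan->resumeOnHydrate = true;

$cached = serialize($chan);        // end of request: flow paused
var_dump($chan->flowActive);       // bool(false)
$restored = unserialize($cached);  // next request: flow resumed
var_dump($restored->flowActive);   // bool(true)
```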