
Documentation

BraveSirRobin edited this page Feb 26, 2011 · 47 revisions

AMQPHP User Documentation

Introduction

AMQPHP is an Amqp implementation written in pure PHP (the clue's in the name!). It is written using namespaces (so it needs PHP 5.3+) and features an XSLT-based code generation system. Generating the protocol code means that AMQPHP can more easily keep up with changes in the Amqp 0.9.1 protocol; for example, exchange-to-exchange bindings in RabbitMQ can be supported without rewriting any PHP code. AMQPHP can be used to write both Amqp producers and consumers, and can use SSL with client certificates and peer verification.

Getting Started

First you need to generate the Amqp protocol bindings. There's a PHP script to do this in the 'tools' subdirectory: simply run generate-bindings.php and move the generated code to the correct location.

cd tools/

php generate-bindings.php

mv gencode ../

You should now be able to run one of the test scripts. From the CLI it's probably best to run tests/forker.php; you can change the RabbitMQ connection parameters and the number of Producers / Consumers in the forker.xml config file.

A Simple producer

Here's the minimum amount of code you need to send a message to a pre-existing message queue (Note that this file is in the Github repo):

use amqp_091 as amqp;
use amqp_091\protocol;
use amqp_091\wire;

require __DIR__ . '/../amqp.php';
require __DIR__ . '/demo-common.php';

// Basic RabbitMQ connection settings
$config = array (
                 'username' => 'testing',
                 'userpass' => 'letmein',
                 'vhost' => 'robin'
                 );

// Connect to the RabbitMQ server, set up an Amqp channel
$conn = new amqp\Connection($config);
$conn->connect();
$chan = $conn->getChannel();

initialiseDemo();

// Prepare the 'header parameters' and message content - these will
// be sent to RabbitMQ
$publishParams = array('content-type' => 'text/plain',
                       'content-encoding' => 'UTF-8',
                       'routing-key' => '',
                       'mandatory' => false,
                       'immediate' => false,
                       'exchange' => $EX_NAME);


// Create a Message object
$basicP = $chan->basic('publish', $publishParams);

// Send multiple messages to the RabbitMQ broker using the channel set up earlier.
$messages = array('Hi!', 'guten Tag', 'ciao', 'buenos días', 'end');
foreach ($messages as $m) {
    $basicP->setContent($m);
    $chan->invoke($basicP);
}


$chan->shutdown();
$conn->shutdown();
echo "Test complete!\n";

This script will attempt to post a series of messages to a RabbitMQ broker on localhost:5672, then gracefully disconnect from the broker. The steps taken are as follows:

  • Connect to the RabbitMQ broker. This is triggered by $conn->connect(). In this step, the underlying TCP connection is established, the client/broker handshake is completed, and the connection is initialised. Once this step has completed, you've got a connection to your broker which you can start to use.
  • Create an Amqp channel to send messages through. By calling $conn->getChannel() without an integer parameter, the connection sets up a new Channel by sending commands to the RabbitMQ server (whereas calling $conn->getChannel() with an integer parameter gives you access to an existing channel). Once this step has completed, you're ready to send (or receive) messages.
  • Create an Amqp message. One of the key components of the AMQPHP API is the \amqp_091\wire\Method class - this is used to represent ALL communications (incoming or outgoing) with the remote broker. Methods are in either "read mode" or "write mode": in read mode, objects are created by reading an encoded message received from the broker, and in write mode, methods are created via the Channel or Connection object in order to send data to the broker. In this example, we create a write mode basic.publish method, which carries a single message to the broker. After this step is complete we're ready to send our message to the broker, although nothing has yet been written to the wire.
  • Send the Amqp message to the broker. The line $chan->invoke($basicP) causes AMQPHP to serialise the message you provide and send it to the RabbitMQ broker. Note that you can send multiple messages using the same $basicP object, as the loop in the example does: change the message by calling $basicP->setContent('something else'), then call $chan->invoke($basicP) again to send the new content. Once this step has completed, the RabbitMQ server should have received (and possibly delivered) your message(s).
  • Disconnect from the channel and connection. The shutdown methods should be used to gracefully disconnect from the broker by sending Amqp disconnect commands. If you omit these steps, you won't get any errors in your producer script, but your RabbitMQ server won't be very happy, as it's strongly advised (by the RabbitMQ team) to gracefully disconnect in this way. After this step completes, you've shut down your Channel, your Connection and the underlying TCP connection. If you want to send more messages, you need to re-connect the $conn object.

A Simple message reader

Amqp defines 2 ways of receiving messages from a broker; the simplest of these is the basic.get command. This command will be familiar to web programmers as it uses an HTTP-like request/response messaging pattern. Here's a snippet of code that demonstrates this in Amqphp:

// Prepare the basic.get command, this is like an HTTP request (except you can't POST anything ;-)
$getParams = array('queue' => $Q, 'no-ack' => false);
$getCommand = $chan->basic('get', $getParams);

// Send our command
$response = $chan->invoke($getCommand);

// Examine the response to see what we've got...
if ($response->getClassProto()->getSpecName() == 'basic' && 
    $response->getMethodProto()->getSpecName() == 'get-empty') {
    // This response means that the target $Q has no messages at this time
} else if ($response->getClassProto()->getSpecName() == 'basic' && 
           $response->getMethodProto()->getSpecName() == 'get-ok') {
    // Message payload is here: $response->getContent()
}

The basic.get command has 2 possible response messages: a 'get-ok' response means that there was a message available on the target queue (the message is included in the response), and a 'get-empty' response means "there are no messages at this time". Normally, you would also either ack or reject the message by sending basic.ack or basic.reject, copying the delivery-tag to the outgoing message. There's a simple demo script in the repo.
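The two checks in the snippet above can be folded into a small helper. This is a minimal sketch and not part of Amqphp: classifyGetResponse() and its return values are hypothetical names, and it takes the two spec names as plain strings so that it runs without a broker.

```php
<?php
// Hypothetical helper (not an Amqphp function): maps the class and method
// spec names of a basic.get response to a simple outcome string.
function classifyGetResponse($className, $methodName) {
    if ($className !== 'basic') {
        return 'unexpected';
    }
    switch ($methodName) {
        case 'get-ok':
            // A message was available; its payload travels with the response
            return 'message';
        case 'get-empty':
            // The target queue has no messages at this time
            return 'empty';
        default:
            return 'unexpected';
    }
}

echo classifyGetResponse('basic', 'get-empty'), "\n";
```

With a real response you would pass $response->getClassProto()->getSpecName() and $response->getMethodProto()->getSpecName() as the two arguments.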

AMQP 0.9.1 and the Amqphp API

Amqp is a wire-level messaging protocol that sits at the same level in the "conceptual network layering model" (i.e. OSI) as HTTP - it sits on top of TCP. TCP is a fully bi-directional communications channel (i.e. both sides can read and write at the same time) - this is something that HTTP doesn't take full advantage of, as in its simplest form the protocol expects the connection to be closed once the initial "request" is completed (i.e. the document has been delivered). HTTP presents a very simple messaging model to work with in programs, and it leads to the "request response" model that is so prevalent on the web, but it can be limiting and inefficient if you need to build distributed systems that implement complex messaging patterns.

Enter Amqp, a protocol that takes more advantage of the stateful and bi-directional aspects of TCP. This protocol doesn't want to close the underlying TCP connection as regularly as HTTP (which can make it more efficient) and expects client programs to read and write to the network connection in more complex sequences than HTTP.

The Amqp 0.9.1 protocol specifies two roles, broker and client, in much the same way that HTTP defines the roles server and client. All communication in Amqp goes via the broker; think of the broker as a message intermediary. Clients write messages to the broker and clients read messages from the broker, and sometimes clients both read and write messages.

Amqp defines lots of different types of messages, and each type means a different thing. Some messages are defined as having a response message type, some have no response message (i.e. they are uni-directional), and some "carry content". Messages that are said to be "synchronous" are ones that have a response message type; for instance, the message of type "declare a queue" has a response message of "queue declared OK" so that the client knows the "command" was successful.

Amqp messages are arranged into a hierarchy whereby each message has a name and a class - the class groups the message types together, and names are unique within each class. You could easily represent this hierarchy in an XML document:

<?xml version="1.0"?>
<class name="foo">
  <method name="bar"/>
  <method name="baz"/>
  <method name="bat"/>
</class>

This pseudo-code defines 3 methods and one class; in Amqp parlance these would be foo.bar, foo.baz and foo.bat. Of course, it's no coincidence that I used an XML example: the main Amqp 0.9.1 spec is also defined in XML.
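To make the idea of "synchronous" messages concrete, here is a minimal sketch that reads a tiny hand-written fragment in the style of the Amqp XML spec and reports which methods define a response. The fragment is an illustration and not the real spec file, responseMap() is a hypothetical helper, and the sketch assumes PHP's SimpleXML extension is available.

```php
<?php
// Hand-written fragment in the style of the Amqp 0.9.1 XML spec (NOT the real file).
$spec = <<<XML
<?xml version="1.0"?>
<amqp>
  <class name="queue">
    <method name="declare" synchronous="1">
      <response name="declare-ok"/>
    </method>
    <method name="declare-ok"/>
  </class>
  <class name="basic">
    <method name="publish"/>
  </class>
</amqp>
XML;

// Hypothetical helper: returns a map of "class.method" => response method name,
// or null when the method defines no response (i.e. it is uni-directional).
function responseMap($xmlString) {
    $doc = simplexml_load_string($xmlString);
    $map = array();
    foreach ($doc->xpath('/amqp/class') as $class) {
        foreach ($class->method as $method) {
            $key = (string) $class['name'] . '.' . (string) $method['name'];
            $map[$key] = isset($method->response)
                ? (string) $method->response['name']
                : null;
        }
    }
    return $map;
}

foreach (responseMap($spec) as $name => $response) {
    printf("%s -> %s\n", $name, $response === null ? '(no response)' : $response);
}
```

In this fragment queue.declare is synchronous because it names a response, while basic.publish is not.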

The Amqphp API is set up so that creating and sending messages to the broker follows a simple pattern:

$msg = $chan->$amqpClass($amqpMethod, array('method' => 'args'), "method payload");
$resp = $chan->invoke($msg);

The 2 steps are "create message" then "send message". If the message is synchronous (i.e. has a response type) then the response message is returned from the second step, in this example in $resp. This means that in the above 2 lines of code, you have created a message, sent it to the broker and then waited for the response. If the message that you create doesn't have a defined response (such as basic.publish) then Amqphp doesn't wait for a response and nothing is returned. Amqp messages can also have "parameters" (analogous to HTTP headers); each message type has its own pre-defined set of parameters, and you provide these to Amqphp as an associative array of parameter name / value pairs.

Implementing a Consumer

Consumers are objects which you register with a Channel object that are called each time the Amqp broker sends you a message. A good way to understand what's happening here is to look at how the Amqp protocol begins and stops consume sessions:

  1. Client -> Broker : "Hi, I'd like to receive messages from Queue XXX"
  2. Broker -> Client : "Okay, get ready!"
  3. Broker -> Client : "Here's message abc123"
  4. Client -> Broker : "Acknowledge that one"
  5. Broker -> Client : "Here's message def456"
  6. Client -> Broker : "Reject that one"
  7. Client -> Broker : "I want to stop receiving now."

The client is expected to listen for messages that come from the broker, and then acknowledge (ack) or reject them. If you're used to the plain old HTTP web world where everything comes in request/response pairs this looks quite different.

The Amqphp API provides methods to start and stop consumers, and a method for listening to messages. The function that you call to listen out for messages is Connection->select() - this is named after the standard Unix system function of the same name, which is implemented more than once in PHP. Once you've called select(), Amqphp will listen for incoming messages and deliver them as soon as they've arrived.
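For readers who haven't met select before, here is a minimal sketch of one of PHP's own implementations, stream_select(), waiting on a local socket pair that stands in for the broker connection. It only illustrates the underlying idea; which select implementation Amqphp uses internally isn't stated here, and the variable names are made up for the example.

```php
<?php
// A connected pair of local sockets stands in for the client/broker TCP connection.
// (STREAM_PF_UNIX is assumed to be available, i.e. a Unix-like system.)
list($client, $broker) = stream_socket_pair(
    STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP
);

// The "broker" side pushes a message without being asked
fwrite($broker, "Here's message abc123");

// stream_select() takes its arrays by reference and trims $read down
// to the streams that actually have data waiting.
$read = array($client);
$write = null;
$except = null;
$ready = stream_select($read, $write, $except, 2); // wait up to 2 seconds

$message = '';
if ($ready > 0) {
    $message = fread($client, 1024);
    echo $message, "\n";
}

fclose($client);
fclose($broker);
```

The call blocks until data arrives or the timeout passes, which is why a select-based loop can sit idle until the broker has something to deliver.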

Consumer classes that receive messages should always implement the Consumer interface, but usually you should subclass the SimpleConsumer class instead. Here's an example (again, this one's in the demos folder):

class DemoConsumer extends amqp\SimpleConsumer
{
    // Print the message we've received and send an Ack back to the broker
    function handleDelivery (wire\Method $meth, amqp\Channel $chan) {
        printf("[message received]\n%s\n", $meth->getContent());
        if ($meth->getContent() == 'end') {
            return array(amqp\CONSUMER_CANCEL, amqp\CONSUMER_ACK);
        } else if ($meth->getContent() == 'reject') {
            return amqp\CONSUMER_REJECT;
        } else {
            return amqp\CONSUMER_ACK;
        }
    }
}

Pretty basic really! There are other callbacks available for different parts of the lifecycle; the most important one is handleDelivery(), which is called when a message is received. You respond to the message by returning one or more response codes - these instruct the Channel object to respond to the message in one or more ways. You can also send other Amqp messages from inside the handleDelivery() method (perhaps a basic.publish as part of an RPC type messaging pattern); to do so, you should use the Channel object passed in the second argument.

Amqphp also provides several ways of escaping from the select loop (in case you don't want to loop forever, perhaps inside a web server). There are timeout, callback and logic-based exit strategies available; these are listed and explained in the demo consumer script.
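The way the return values are interpreted can be pictured with a small simulation. This is a sketch only and says nothing about how Amqphp's Channel is actually implemented: the DEMO_* constants are hypothetical stand-ins for the amqp\CONSUMER_* constants, and decide() and runLoop() are made-up names that work on plain strings instead of real deliveries.

```php
<?php
// Hypothetical stand-ins for the amqp\CONSUMER_* constants
const DEMO_ACK = 'ack';
const DEMO_REJECT = 'reject';
const DEMO_CANCEL = 'cancel';

// Same decision logic as DemoConsumer::handleDelivery(), but on plain strings
function decide($content) {
    if ($content == 'end') {
        return array(DEMO_CANCEL, DEMO_ACK);
    } else if ($content == 'reject') {
        return DEMO_REJECT;
    }
    return DEMO_ACK;
}

// Simulated delivery loop: a single code and a list of codes are treated
// alike, and a cancel code stops any further deliveries.
function runLoop(array $deliveries) {
    $log = array();
    foreach ($deliveries as $content) {
        $codes = (array) decide($content);
        foreach ($codes as $code) {
            if ($code !== DEMO_CANCEL) {
                $log[] = $code . ':' . $content;
            }
        }
        if (in_array(DEMO_CANCEL, $codes, true)) {
            $log[] = 'cancel';
            break;
        }
    }
    return $log;
}

print_r(runLoop(array('Hi!', 'reject', 'end', 'never delivered')));
```

The 'end' message is both acknowledged and used to cancel the consumer, which is why the last delivery in the list is never handled.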

Publishing Confirms

The RabbitMQ people have released an extension to the core AMQP 0.9.1 protocol to support publishing confirmation messages. Amqphp supports these via a special callback function:

$conn = new amqp\Connection($config);
$conn->connect();
$chan = $conn->getChannel();

initialiseDemo();

// Prepare the 'header parameters' and message content - these will
// be sent to RabbitMQ
$publishParams = array('content-type' => 'text/plain',
                       'content-encoding' => 'UTF-8',
                       'routing-key' => '',
                       'mandatory' => false,
                       'immediate' => false,
                       'exchange' => $EX_NAME);


// Create a Message object
$basicP = $chan->basic('publish', $publishParams, 'Content goes here.');

// Do this before sending the message, it sends the required amqp commands
// and starts numbering outgoing messages.
$chan->setConfirmMode();

$chan->invoke($basicP);

// Set listener functions on the channel
$chan->setPublishConfirmCallback(function ($meth) {
        printf("Publish confirmed for message %s\n", $meth->getField('delivery-tag'));
});

// invoke the select method to listen for responses
$conn->select();

As with Consumers, you use the Connection->select() function to listen for the publishing acks; these are delivered to the function that is given to setPublishConfirmCallback(). Note that Amqphp can deal with publishing confirms and incoming Consumer methods at the same time, so if the above code were called from inside a running select loop you wouldn't call select() again (this would generate an error, and it's unnecessary). The same select loop exit methods are available when listening for publish acks as exist for Consumers.
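If you need to know which messages are still unconfirmed, you can keep your own bookkeeping alongside the callback. The following is a minimal sketch and not part of Amqphp: PendingConfirms is a hypothetical class, and it assumes the RabbitMQ behaviour that messages are numbered from 1 once confirm mode is on, and that a confirm may carry a 'multiple' flag covering every tag up to and including the one given.

```php
<?php
// Hypothetical bookkeeping class (not part of Amqphp)
class PendingConfirms
{
    private $next = 1;           // next sequence number the broker will use
    private $pending = array();  // delivery-tag => label of unconfirmed message

    // Call once per published message, in publish order
    function published($label) {
        $tag = $this->next++;
        $this->pending[$tag] = $label;
        return $tag;
    }

    // Call from the confirm callback with the 'delivery-tag' value and,
    // if you read it, the 'multiple' flag of the confirm
    function confirmed($tag, $multiple = false) {
        foreach (array_keys($this->pending) as $t) {
            if ($t == $tag || ($multiple && $t < $tag)) {
                unset($this->pending[$t]);
            }
        }
    }

    // Labels of messages the broker has not confirmed yet
    function outstanding() {
        return array_values($this->pending);
    }
}

$p = new PendingConfirms();
$p->published('first');
$p->published('second');
$p->published('third');
$p->confirmed(2, true);
print_r($p->outstanding());
```

Inside the callback given to setPublishConfirmCallback() you would pass $meth->getField('delivery-tag') as the first argument of confirmed().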
