Tutorial

Hello World!

Tradition dictates that we start with hello world! However rather than simply striving for the shortest program possible, we’ll aim for a more illustrative example while still restricting the functionality to sending and receiving a single message.

 1from proton import Message
 2from proton.handlers import MessagingHandler
 3from proton.reactor import Container
 4
 5
 6class HelloWorld(MessagingHandler):
 7    def __init__(self, server, address):
 8        super(HelloWorld, self).__init__()
 9        self.server = server
10        self.address = address
11
12    def on_start(self, event):
13        conn = event.container.connect(self.server)
14        event.container.create_receiver(conn, self.address)
15        event.container.create_sender(conn, self.address)
16
17    def on_sendable(self, event):
18        event.sender.send(Message(body="Hello World!"))
19        event.sender.close()
20
21    def on_message(self, event):
22        print(event.message.body)
23        event.connection.close()
24
25
26Container(HelloWorld("localhost:5672", "examples")).run()

You can see the import of Container from proton.reactor on the second line. This is a class that makes programming with proton a little easier for the common cases. It includes within it an event loop, and programs written using this utility are generally structured to react to various events. This reactive style is particularly suited to messaging applications.

To be notified of a particular event, you define a class with the appropriately named method on it. That method is then called by the event loop when the event occurs.

We define a class here, HelloWorld, which handles the key events of interest in sending and receiving a message.

The on_start() method is called when the event loop first starts. We handle that by establishing our connection (line 13), a sender over which to send the message (line 15) and a receiver over which to receive it back again (line 14).

The on_sendable() method is called when message can be transferred over the associated sender link to the remote peer. We send out our Hello World! message (line 18), then close the sender (line 19) as we only want to send one message. The closing of the sender will prevent further calls to on_sendable().

The on_message() method is called when a message is received. Within that we simply print the body of the message (line 22) and then close the connection (line 23).

Now that we have defined the logic for handling these events, we create an instance of a Container, pass it our handler and then enter the event loop by calling run(). At this point, control passes to the container instance, which will make the appropriate callbacks to any defined handlers.

To run the example, you will need to have a broker (or similar) accepting connections on that url either with a queue (or topic) matching the given address or else configured to create such a queue (or topic) dynamically. There is a simple broker.py script included alongside the examples that can be used for this purpose if desired. (It is also written using the API described here, and as such gives an example of a slightly more involved application).

Hello World, Direct!

Though often used in conjunction with a broker, AMQP does not require this. It also allows senders and receivers to communicate directly if desired.

Let’s modify our example to demonstrate this.

 1from proton import Message
 2from proton.handlers import MessagingHandler
 3from proton.reactor import Container
 4
 5
 6class HelloWorld(MessagingHandler):
 7    def __init__(self, url):
 8        super(HelloWorld, self).__init__()
 9        self.url = url
10
11    def on_start(self, event):
12        self.acceptor = event.container.listen(self.url)
13        event.container.create_sender(self.url)
14
15    def on_sendable(self, event):
16        event.sender.send(Message(body="Hello World!"))
17        event.sender.close()
18
19    def on_message(self, event):
20        print(event.message.body)
21
22    def on_accepted(self, event):
23        event.connection.close()
24
25    def on_connection_closed(self, event):
26        self.acceptor.close()
27
28
29Container(HelloWorld("localhost:8888/examples")).run()

The first difference, on line 12, is that rather than creating a receiver on the same connection as our sender, we listen for incoming connections by invoking the listen() method on the container.

As we only need then to initiate one link, the sender, we can do that by passing in a url rather than an existing connection, and the connection will also be automatically established for us.

We send the message in response to the on_sendable() callback and print the message out in response to the on_message() callback exactly as before.

However we also handle two new events. We now close the connection from the senders side once the message has been accepted (line 23). The acceptance of the message is an indication of successful transfer to the peer. We are notified of that event through the on_accepted() callback. Then, once the connection has been closed, of which we are notified through the on_closed() callback, we stop accepting incoming connections (line 26) at which point there is no work to be done and the event loop exits, and the run() method will return.

So now we have our example working without a broker involved!

Asynchronous Send and Receive

Of course, these HelloWorld! examples are very artificial, communicating as they do over a network connection but with the same process. A more realistic example involves communication between separate processes (which could indeed be running on completely separate machines).

Let’s separate the sender from the receiver, and let’s transfer more than a single message between them.

We’ll start with a simple sender.

 1import optparse
 2from proton import Message
 3from proton.handlers import MessagingHandler
 4from proton.reactor import Container
 5
 6
 7class Send(MessagingHandler):
 8    def __init__(self, url, messages):
 9        super(Send, self).__init__()
10        self.url = url
11        self.sent = 0
12        self.confirmed = 0
13        self.total = messages
14
15    def on_start(self, event):
16        event.container.create_sender(self.url)
17
18    def on_sendable(self, event):
19        while event.sender.credit and self.sent < self.total:
20            msg = Message(id=(self.sent + 1), body={'sequence': (self.sent + 1)})
21            event.sender.send(msg)
22            self.sent += 1
23
24    def on_accepted(self, event):
25        self.confirmed += 1
26        if self.confirmed == self.total:
27            print("all messages confirmed")
28            event.connection.close()
29
30    def on_disconnected(self, event):
31        self.sent = self.confirmed
32
33
34parser = optparse.OptionParser(usage="usage: %prog [options]",
35                               description="Send messages to the supplied address.")
36parser.add_option("-a", "--address", default="localhost:5672/examples",
37                  help="address to which messages are sent (default %default)")
38parser.add_option("-m", "--messages", type="int", default=100,
39                  help="number of messages to send (default %default)")
40opts, args = parser.parse_args()
41
42try:
43    Container(Send(opts.address, opts.messages)).run()
44except KeyboardInterrupt:
45    pass

As with the previous example, we define the application logic in a class that handles various events. As before, we use the on_start() event to establish our sender link over which we will transfer messages and the on_sendable() event to know when we can transfer our messages.

Because we are transferring more than one message, we need to keep track of how many we have sent. We’ll use a sent member variable for that. The total member variable will hold the number of messages we want to send.

AMQP defines a credit-based flow control mechanism. Flow control allows the receiver to control how many messages it is prepared to receive at a given time and thus prevents any component being overwhelmed by the number of messages it is sent.

In the on_sendable() callback, we check that our sender has credit before sending messages. We also check that we haven’t already sent the required number of messages.

The send() call on line 21 is of course asynchronous. When it returns, the message has not yet actually been transferred across the network to the receiver. By handling the on_accepted() event, we can get notified when the receiver has received and accepted the message. In our example we use this event to track the confirmation of the messages we have sent. We only close the connection and exit when the receiver has received all the messages we wanted to send.

If we are disconnected after a message is sent and before it has been confirmed by the receiver, it is said to be in doubt. We don’t know whether or not it was received. In this example, we will handle that by resending any in-doubt messages. This is known as an ‘at-least-once’ guarantee, since each message should eventually be received at least once, though a given message may be received more than once (i.e. duplicates are possible). In the on_disconnected() callback, we reset the sent count to reflect only those that have been confirmed. The library will automatically try to reconnect for us, and when our sender is sendable again, we can restart from the point we know the receiver got to.

Now let’s look at the corresponding receiver:

 1import optparse
 2from proton.handlers import MessagingHandler
 3from proton.reactor import Container
 4
 5
 6class Recv(MessagingHandler):
 7    def __init__(self, url, count):
 8        super(Recv, self).__init__()
 9        self.url = url
10        self.expected = count
11        self.received = 0
12
13    def on_start(self, event):
14        event.container.create_receiver(self.url)
15
16    def on_message(self, event):
17        if event.message.id and event.message.id < self.received:
18            # ignore duplicate message
19            return
20        if self.expected == 0 or self.received < self.expected:
21            print(event.message.body)
22            self.received += 1
23            if self.received == self.expected:
24                event.receiver.close()
25                event.connection.close()
26
27
28parser = optparse.OptionParser(usage="usage: %prog [options]")
29parser.add_option("-a", "--address", default="localhost:5672/examples",
30                  help="address from which messages are received (default %default)")
31parser.add_option("-m", "--messages", type="int", default=100,
32                  help="number of messages to receive; 0 receives indefinitely (default %default)")
33opts, args = parser.parse_args()
34
35try:
36    Container(Recv(opts.address, opts.messages)).run()
37except KeyboardInterrupt:
38    pass

Here we handle the on_start() by creating our receiver, much like we did for the sender. We also handle the on_message() event for received messages and print the message out as in the Hello World! examples. However, we add some logic to allow the receiver to wait for a given number of messages, then close the connection and exit. We also add some logic to check for and ignore duplicates, using a simple sequential id scheme.

Again, though sending between these two examples requires some sort of intermediary process (e.g. a broker), AMQP allows us to send messages directly between two processes without this if we so wish. In that case, one of the processes needs to accept incoming socket connections. Let’s create a modified version of the receiving example that does this:

 1import optparse
 2from proton.handlers import MessagingHandler
 3from proton.reactor import Container
 4
 5
 6class Recv(MessagingHandler):
 7    def __init__(self, url, count):
 8        super(Recv, self).__init__()
 9        self.url = url
10        self.expected = count
11        self.received = 0
12
13    def on_start(self, event):
14        self.acceptor = event.container.listen(self.url)
15
16    def on_message(self, event):
17        if event.message.id and event.message.id < self.received:
18            # ignore duplicate message
19            return
20        if self.expected == 0 or self.received < self.expected:
21            print(event.message.body)
22            self.received += 1
23            if self.received == self.expected:
24                event.receiver.close()
25                event.connection.close()
26                self.acceptor.close()
27
28
29parser = optparse.OptionParser(usage="usage: %prog [options]")
30parser.add_option("-a", "--address", default="localhost:5672/examples",
31                  help="address from which messages are received (default %default)")
32parser.add_option("-m", "--messages", type="int", default=100,
33                  help="number of messages to receive; 0 receives indefinitely (default %default)")
34opts, args = parser.parse_args()
35
36try:
37    Container(Recv(opts.address, opts.messages)).run()
38except KeyboardInterrupt:
39    pass

There are only two differences here. On line 14, instead of initiating a link (and implicitly a connection), we listen for incoming connections. On line 26, when we have received all the expected messages, we then stop listening for incoming connections by closing the listener object.

You can use the original send example now to send to this receiver directly. (Note: you will need to stop any broker that is listening on the 5672 port, or else change the port used by specifying a different address to each example via the -a command line switch).

We could also modify the original sender to allow the original receiver to connect to it. Again, that just requires two modifications:

 1import optparse
 2from proton import Message
 3from proton.handlers import MessagingHandler
 4from proton.reactor import Container
 5
 6
 7class Send(MessagingHandler):
 8    def __init__(self, url, messages):
 9        super(Send, self).__init__()
10        self.url = url
11        self.sent = 0
12        self.confirmed = 0
13        self.total = messages
14
15    def on_start(self, event):
16        self.acceptor = event.container.listen(self.url)
17
18    def on_sendable(self, event):
19        while event.sender.credit and self.sent < self.total:
20            msg = Message(id=(self.sent + 1), body={'sequence': (self.sent + 1)})
21            event.sender.send(msg)
22            self.sent += 1
23
24    def on_accepted(self, event):
25        self.confirmed += 1
26        if self.confirmed == self.total:
27            print("all messages confirmed")
28            event.connection.close()
29            self.acceptor.close()
30
31    def on_disconnected(self, event):
32        self.sent = self.confirmed
33
34
35parser = optparse.OptionParser(usage="usage: %prog [options]",
36                               description="Send messages to the supplied address.")
37parser.add_option("-a", "--address", default="localhost:5672/examples",
38                  help="address to which messages are sent (default %default)")
39parser.add_option("-m", "--messages", type="int", default=100,
40                  help="number of messages to send (default %default)")
41opts, args = parser.parse_args()
42
43try:
44    Container(Send(opts.address, opts.messages)).run()
45except KeyboardInterrupt:
46    pass

As with the modified receiver, instead of initiating establishment of a link, we listen for incoming connections on line 16 and then on line 29, when we have received confirmation of all the messages we sent, we can close the listener in order to exit. The symmetry in the underlying AMQP that enables this is quite unique and elegant, and in reflecting this the proton API provides a flexible toolkit for implementing all sorts of interesting intermediaries (the broker.py script provided as a simple broker for testing purposes provides an example of this).

To try this modified sender, run the original receiver against it.

Request/Response

A common pattern is to send a request message and expect a response message in return. AMQP has special support for this pattern. Let’s have a look at a simple example. We’ll start with the ‘server’, i.e. the program that will process the request and send the response. Note that we are still using a broker in this example.

Our server will provide a very simple service: it will respond with the body of the request converted to uppercase.

 1import optparse
 2from proton import Message, Url
 3from proton.handlers import MessagingHandler
 4from proton.reactor import Container
 5
 6
 7class Server(MessagingHandler):
 8    def __init__(self, url, address):
 9        super(Server, self).__init__()
10        self.url = url
11        self.address = address
12
13    def on_start(self, event):
14        print("Listening on", self.url)
15        self.container = event.container
16        self.conn = event.container.connect(self.url)
17        self.receiver = event.container.create_receiver(self.conn, self.address)
18        self.server = self.container.create_sender(self.conn, None)
19
20    def on_message(self, event):
21        print("Received", event.message)
22        self.server.send(Message(address=event.message.reply_to, body=event.message.body.upper(),
23                                 correlation_id=event.message.correlation_id))
24
25
26parser = optparse.OptionParser(usage="usage: %prog [options]")
27parser.add_option("-a", "--address", default="localhost:5672/examples",
28                  help="address from which messages are received (default %default)")
29opts, args = parser.parse_args()
30
31url = Url(opts.address)
32
33try:
34    Container(Server(url, url.path)).run()
35except KeyboardInterrupt:
36    pass

The code here is not too different from the simple receiver example. When we receive a request however, we look at the reply_to address on the Message and create a sender for that over which to send the response. We’ll cache the senders in case we get further requests with the same reply_to.

Now let’s create a simple client to test this service out.

 1import optparse
 2from proton import Message
 3from proton.handlers import MessagingHandler
 4from proton.reactor import Container
 5
 6
 7class Client(MessagingHandler):
 8    def __init__(self, url, requests):
 9        super(Client, self).__init__()
10        self.url = url
11        self.requests = requests
12
13    def on_start(self, event):
14        self.sender = event.container.create_sender(self.url)
15        self.receiver = event.container.create_receiver(self.sender.connection, None, dynamic=True)
16
17    def next_request(self):
18        if self.receiver.remote_source.address:
19            req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0])
20            self.sender.send(req)
21
22    def on_link_opened(self, event):
23        if event.receiver == self.receiver:
24            self.next_request()
25
26    def on_message(self, event):
27        print("%s => %s" % (self.requests.pop(0), event.message.body))
28        if self.requests:
29            self.next_request()
30        else:
31            event.connection.close()
32
33
34REQUESTS = ["Twas brillig, and the slithy toves",
35            "Did gire and gymble in the wabe.",
36            "All mimsy were the borogroves,",
37            "And the mome raths outgrabe."]
38
39parser = optparse.OptionParser(usage="usage: %prog [options]",
40                               description="Send requests to the supplied address and print responses.")
41parser.add_option("-a", "--address", default="localhost:5672/examples",
42                  help="address to which messages are sent (default %default)")
43opts, args = parser.parse_args()
44
45Container(Client(opts.address, args or REQUESTS)).run()

As well as sending requests, we need to be able to get back the responses. We create a receiver for that (see line 15), but we don’t specify an address, we set the dynamic option which tells the broker to create a temporary address over which we can receive our responses.

We need to use the address allocated by the broker as the reply_to address of our requests, so we can’t send them until the broker has confirmed our receiving link has been set up (at which point we will have our allocated address). To do that, we add an on_link_opened() method to our handler class, and if the link associated with the event is the receiver, we use that as the trigger to send our first request.

Again, we could avoid having any intermediary process here if we wished. The following code implementas a server to which the client above could connect directly without any need for a broker or similar.

 1import uuid
 2from proton import Message
 3from proton.handlers import MessagingHandler
 4from proton.reactor import Container
 5
 6
 7class Server(MessagingHandler):
 8    def __init__(self, url):
 9        super(Server, self).__init__()
10        self.url = url
11        self.senders = {}
12
13    def on_start(self, event):
14        print("Listening on", self.url)
15        self.container = event.container
16        self.acceptor = event.container.listen(self.url)
17
18    def on_link_opening(self, event):
19        if event.link.is_sender:
20            if event.link.remote_source and event.link.remote_source.dynamic:
21                event.link.source.address = str(uuid.uuid4())
22                self.senders[event.link.source.address] = event.link
23            elif event.link.remote_target and event.link.remote_target.address:
24                event.link.target.address = event.link.remote_target.address
25                self.senders[event.link.remote_target.address] = event.link
26            elif event.link.remote_source:
27                event.link.source.address = event.link.remote_source.address
28        elif event.link.remote_target:
29            event.link.target.address = event.link.remote_target.address
30
31    def on_message(self, event):
32        print("Received", event.message)
33        sender = self.senders.get(event.message.reply_to)
34        if not sender:
35            print("No link for reply")
36            return
37        sender.send(Message(address=event.message.reply_to, body=event.message.body.upper(),
38                            correlation_id=event.message.correlation_id))
39
40
41try:
42    Container(Server("0.0.0.0:8888")).run()
43except KeyboardInterrupt:
44    pass

Though this requires some more extensive changes than the simple sending and receiving examples, the essence of the program is still the same. Here though, rather than the server establishing a link for the response, it relies on the link that the client established, since that now comes in directly to the server process.

Miscellaneous

Many brokers offer the ability to consume messages based on a ‘selector’ that defines which messages are of interest based on particular values of the headers. The following example shows how that can be achieved:

 1import optparse
 2from proton import Url
 3from proton.reactor import Container, Selector
 4from proton.handlers import MessagingHandler
 5
 6
 7class Recv(MessagingHandler):
 8    def __init__(self, url, count):
 9        super(Recv, self).__init__()
10        self.url = Url(url)
11        self.expected = count
12        self.received = 0
13
14    def on_start(self, event):
15        conn = event.container.connect(self.url)
16        event.container.create_receiver(conn, self.url.path, options=Selector("colour = 'green'"))
17
18    def on_message(self, event):
19        print(event.message.body)
20        self.received += 1
21        if self.received == self.expected:
22            event.receiver.close()
23            event.connection.close()
24
25
26parser = optparse.OptionParser(usage="usage: %prog [options]")
27parser.add_option("-a", "--address", default="localhost:5672/examples",
28                  help="address from which messages are received (default %default)")
29parser.add_option("-m", "--messages", type="int", default=0,
30                  help="number of messages to receive; 0 receives indefinitely (default %default)")
31opts, args = parser.parse_args()
32
33try:
34    Container(Recv(opts.address, opts.messages)).run()
35except KeyboardInterrupt:
36    pass

When creating the receiver, we specify a Selector object as an option. The options argument can take a single object or a list. Another option that is sometimes of interest when using a broker is the ability to ‘browse’ the messages on a queue, rather than consuming them. This is done in AMQP by specifying a distribution mode of ‘copy’ (instead of ‘move’ which is the expected default for queues). An example of that is shown next:

 1from proton.reactor import Container, Copy
 2from proton.handlers import MessagingHandler
 3
 4
 5class Recv(MessagingHandler):
 6    def __init__(self):
 7        super(Recv, self).__init__()
 8
 9    def on_start(self, event):
10        conn = event.container.connect("localhost:5672")
11        event.container.create_receiver(conn, "examples", options=Copy())
12
13    def on_message(self, event):
14        print(event.message)
15        if event.receiver.queued == 0 and event.receiver.drained:
16            event.connection.close()
17
18
19try:
20    Container(Recv()).run()
21except KeyboardInterrupt:
22    pass