Tutorial¶
Hello World!¶
Tradition dictates that we start with hello world! However rather than simply striving for the shortest program possible, we’ll aim for a more illustrative example while still restricting the functionality to sending and receiving a single message.
1from proton import Message
2from proton.handlers import MessagingHandler
3from proton.reactor import Container
4
5
6class HelloWorld(MessagingHandler):
7 def __init__(self, server, address):
8 super(HelloWorld, self).__init__()
9 self.server = server
10 self.address = address
11
12 def on_start(self, event):
13 conn = event.container.connect(self.server)
14 event.container.create_receiver(conn, self.address)
15 event.container.create_sender(conn, self.address)
16
17 def on_sendable(self, event):
18 event.sender.send(Message(body="Hello World!"))
19 event.sender.close()
20
21 def on_message(self, event):
22 print(event.message.body)
23 event.connection.close()
24
25
26Container(HelloWorld("localhost:5672", "examples")).run()
You can see the import of Container
from proton.reactor
on the
second line. This is a class that makes programming with proton a
little easier for the common cases. It includes within it an event
loop, and programs written using this utility are generally structured
to react to various events. This reactive style is particularly suited
to messaging applications.
To be notified of a particular event, you define a class with the appropriately named method on it. That method is then called by the event loop when the event occurs.
We define a class here, HelloWorld
, which handles the key events of
interest in sending and receiving a message.
The on_start()
method is called when the event loop first
starts. We handle that by establishing our connection (line 13), a
sender over which to send the message (line 15) and a receiver over
which to receive it back again (line 14).
The on_sendable()
method is called when message can be transferred
over the associated sender link to the remote peer. We send out our
Hello World!
message (line 18), then close the sender (line 19) as
we only want to send one message. The closing of the sender will
prevent further calls to on_sendable()
.
The on_message()
method is called when a message is
received. Within that we simply print the body of the message (line
22) and then close the connection (line 23).
Now that we have defined the logic for handling these events, we
create an instance of a Container
, pass it
our handler and then enter the event loop by calling
run()
. At this point, control
passes to the container instance, which will make the appropriate
callbacks to any defined handlers.
To run the example, you will need to have a broker (or similar) accepting connections on that url either with a queue (or topic) matching the given address or else configured to create such a queue (or topic) dynamically. There is a simple broker.py script included alongside the examples that can be used for this purpose if desired. (It is also written using the API described here, and as such gives an example of a slightly more involved application).
Hello World, Direct!¶
Though often used in conjunction with a broker, AMQP does not require this. It also allows senders and receivers to communicate directly if desired.
Let’s modify our example to demonstrate this.
1from proton import Message
2from proton.handlers import MessagingHandler
3from proton.reactor import Container
4
5
6class HelloWorld(MessagingHandler):
7 def __init__(self, url):
8 super(HelloWorld, self).__init__()
9 self.url = url
10
11 def on_start(self, event):
12 self.acceptor = event.container.listen(self.url)
13 event.container.create_sender(self.url)
14
15 def on_sendable(self, event):
16 event.sender.send(Message(body="Hello World!"))
17 event.sender.close()
18
19 def on_message(self, event):
20 print(event.message.body)
21
22 def on_accepted(self, event):
23 event.connection.close()
24
25 def on_connection_closed(self, event):
26 self.acceptor.close()
27
28
29Container(HelloWorld("localhost:8888/examples")).run()
The first difference, on line 12, is that rather than creating a
receiver on the same connection as our sender, we listen for incoming
connections by invoking the
listen()
method on the
container.
As we only need then to initiate one link, the sender, we can do that by passing in a url rather than an existing connection, and the connection will also be automatically established for us.
We send the message in response to the on_sendable()
callback and
print the message out in response to the on_message()
callback
exactly as before.
However we also handle two new events. We now close the connection
from the senders side once the message has been accepted (line
23). The acceptance of the message is an indication of successful
transfer to the peer. We are notified of that event through the
on_accepted()
callback. Then, once the connection has been closed,
of which we are notified through the on_closed()
callback, we stop
accepting incoming connections (line 26) at which point there is no
work to be done and the event loop exits, and the run() method will
return.
So now we have our example working without a broker involved!
Asynchronous Send and Receive¶
Of course, these HelloWorld!
examples are very artificial,
communicating as they do over a network connection but with the same
process. A more realistic example involves communication between
separate processes (which could indeed be running on completely
separate machines).
Let’s separate the sender from the receiver, and let’s transfer more than a single message between them.
We’ll start with a simple sender.
1import optparse
2from proton import Message
3from proton.handlers import MessagingHandler
4from proton.reactor import Container
5
6
7class Send(MessagingHandler):
8 def __init__(self, url, messages):
9 super(Send, self).__init__()
10 self.url = url
11 self.sent = 0
12 self.confirmed = 0
13 self.total = messages
14
15 def on_start(self, event):
16 event.container.create_sender(self.url)
17
18 def on_sendable(self, event):
19 while event.sender.credit and self.sent < self.total:
20 msg = Message(id=(self.sent + 1), body={'sequence': (self.sent + 1)})
21 event.sender.send(msg)
22 self.sent += 1
23
24 def on_accepted(self, event):
25 self.confirmed += 1
26 if self.confirmed == self.total:
27 print("all messages confirmed")
28 event.connection.close()
29
30 def on_disconnected(self, event):
31 self.sent = self.confirmed
32
33
34parser = optparse.OptionParser(usage="usage: %prog [options]",
35 description="Send messages to the supplied address.")
36parser.add_option("-a", "--address", default="localhost:5672/examples",
37 help="address to which messages are sent (default %default)")
38parser.add_option("-m", "--messages", type="int", default=100,
39 help="number of messages to send (default %default)")
40opts, args = parser.parse_args()
41
42try:
43 Container(Send(opts.address, opts.messages)).run()
44except KeyboardInterrupt:
45 pass
As with the previous example, we define the application logic in a
class that handles various events. As before, we use the
on_start()
event to establish our sender link over which we will
transfer messages and the on_sendable()
event to know when we can
transfer our messages.
Because we are transferring more than one message, we need to keep
track of how many we have sent. We’ll use a sent
member variable
for that. The total
member variable will hold the number of
messages we want to send.
AMQP defines a credit-based flow control mechanism. Flow control allows the receiver to control how many messages it is prepared to receive at a given time and thus prevents any component being overwhelmed by the number of messages it is sent.
In the on_sendable()
callback, we check that our sender has credit
before sending messages. We also check that we haven’t already sent
the required number of messages.
The send()
call on line 21 is of course asynchronous. When it
returns, the message has not yet actually been transferred across the
network to the receiver. By handling the on_accepted()
event, we
can get notified when the receiver has received and accepted the
message. In our example we use this event to track the confirmation of
the messages we have sent. We only close the connection and exit when
the receiver has received all the messages we wanted to send.
If we are disconnected after a message is sent and before it has been
confirmed by the receiver, it is said to be in doubt
. We don’t
know whether or not it was received. In this example, we will handle
that by resending any in-doubt messages. This is known as an
‘at-least-once’ guarantee, since each message should eventually be
received at least once, though a given message may be received more
than once (i.e. duplicates are possible). In the on_disconnected()
callback, we reset the sent count to reflect only those that have been
confirmed. The library will automatically try to reconnect for us, and
when our sender is sendable again, we can restart from the point we
know the receiver got to.
Now let’s look at the corresponding receiver:
1import optparse
2from proton.handlers import MessagingHandler
3from proton.reactor import Container
4
5
6class Recv(MessagingHandler):
7 def __init__(self, url, count):
8 super(Recv, self).__init__()
9 self.url = url
10 self.expected = count
11 self.received = 0
12
13 def on_start(self, event):
14 event.container.create_receiver(self.url)
15
16 def on_message(self, event):
17 if event.message.id and event.message.id < self.received:
18 # ignore duplicate message
19 return
20 if self.expected == 0 or self.received < self.expected:
21 print(event.message.body)
22 self.received += 1
23 if self.received == self.expected:
24 event.receiver.close()
25 event.connection.close()
26
27
28parser = optparse.OptionParser(usage="usage: %prog [options]")
29parser.add_option("-a", "--address", default="localhost:5672/examples",
30 help="address from which messages are received (default %default)")
31parser.add_option("-m", "--messages", type="int", default=100,
32 help="number of messages to receive; 0 receives indefinitely (default %default)")
33opts, args = parser.parse_args()
34
35try:
36 Container(Recv(opts.address, opts.messages)).run()
37except KeyboardInterrupt:
38 pass
Here we handle the on_start()
by creating our receiver, much like
we did for the sender. We also handle the on_message()
event for
received messages and print the message out as in the Hello World!
examples. However, we add some logic to allow the receiver to wait for
a given number of messages, then close the connection and exit. We
also add some logic to check for and ignore duplicates, using a simple
sequential id scheme.
Again, though sending between these two examples requires some sort of intermediary process (e.g. a broker), AMQP allows us to send messages directly between two processes without this if we so wish. In that case, one of the processes needs to accept incoming socket connections. Let’s create a modified version of the receiving example that does this:
1import optparse
2from proton.handlers import MessagingHandler
3from proton.reactor import Container
4
5
6class Recv(MessagingHandler):
7 def __init__(self, url, count):
8 super(Recv, self).__init__()
9 self.url = url
10 self.expected = count
11 self.received = 0
12
13 def on_start(self, event):
14 self.acceptor = event.container.listen(self.url)
15
16 def on_message(self, event):
17 if event.message.id and event.message.id < self.received:
18 # ignore duplicate message
19 return
20 if self.expected == 0 or self.received < self.expected:
21 print(event.message.body)
22 self.received += 1
23 if self.received == self.expected:
24 event.receiver.close()
25 event.connection.close()
26 self.acceptor.close()
27
28
29parser = optparse.OptionParser(usage="usage: %prog [options]")
30parser.add_option("-a", "--address", default="localhost:5672/examples",
31 help="address from which messages are received (default %default)")
32parser.add_option("-m", "--messages", type="int", default=100,
33 help="number of messages to receive; 0 receives indefinitely (default %default)")
34opts, args = parser.parse_args()
35
36try:
37 Container(Recv(opts.address, opts.messages)).run()
38except KeyboardInterrupt:
39 pass
There are only two differences here. On line 14, instead of initiating a link (and implicitly a connection), we listen for incoming connections. On line 26, when we have received all the expected messages, we then stop listening for incoming connections by closing the listener object.
You can use the original send example now to send to this receiver directly. (Note: you will need to stop any broker that is listening on the 5672 port, or else change the port used by specifying a different address to each example via the -a command line switch).
We could also modify the original sender to allow the original receiver to connect to it. Again, that just requires two modifications:
1import optparse
2from proton import Message
3from proton.handlers import MessagingHandler
4from proton.reactor import Container
5
6
7class Send(MessagingHandler):
8 def __init__(self, url, messages):
9 super(Send, self).__init__()
10 self.url = url
11 self.sent = 0
12 self.confirmed = 0
13 self.total = messages
14
15 def on_start(self, event):
16 self.acceptor = event.container.listen(self.url)
17
18 def on_sendable(self, event):
19 while event.sender.credit and self.sent < self.total:
20 msg = Message(id=(self.sent + 1), body={'sequence': (self.sent + 1)})
21 event.sender.send(msg)
22 self.sent += 1
23
24 def on_accepted(self, event):
25 self.confirmed += 1
26 if self.confirmed == self.total:
27 print("all messages confirmed")
28 event.connection.close()
29 self.acceptor.close()
30
31 def on_disconnected(self, event):
32 self.sent = self.confirmed
33
34
35parser = optparse.OptionParser(usage="usage: %prog [options]",
36 description="Send messages to the supplied address.")
37parser.add_option("-a", "--address", default="localhost:5672/examples",
38 help="address to which messages are sent (default %default)")
39parser.add_option("-m", "--messages", type="int", default=100,
40 help="number of messages to send (default %default)")
41opts, args = parser.parse_args()
42
43try:
44 Container(Send(opts.address, opts.messages)).run()
45except KeyboardInterrupt:
46 pass
As with the modified receiver, instead of initiating establishment of a link, we listen for incoming connections on line 16 and then on line 29, when we have received confirmation of all the messages we sent, we can close the listener in order to exit. The symmetry in the underlying AMQP that enables this is quite unique and elegant, and in reflecting this the proton API provides a flexible toolkit for implementing all sorts of interesting intermediaries (the broker.py script provided as a simple broker for testing purposes provides an example of this).
To try this modified sender, run the original receiver against it.
Request/Response¶
A common pattern is to send a request message and expect a response message in return. AMQP has special support for this pattern. Let’s have a look at a simple example. We’ll start with the ‘server’, i.e. the program that will process the request and send the response. Note that we are still using a broker in this example.
Our server will provide a very simple service: it will respond with the body of the request converted to uppercase.
1import optparse
2from proton import Message, Url
3from proton.handlers import MessagingHandler
4from proton.reactor import Container
5
6
7class Server(MessagingHandler):
8 def __init__(self, url, address):
9 super(Server, self).__init__()
10 self.url = url
11 self.address = address
12
13 def on_start(self, event):
14 print("Listening on", self.url)
15 self.container = event.container
16 self.conn = event.container.connect(self.url)
17 self.receiver = event.container.create_receiver(self.conn, self.address)
18 self.server = self.container.create_sender(self.conn, None)
19
20 def on_message(self, event):
21 print("Received", event.message)
22 self.server.send(Message(address=event.message.reply_to, body=event.message.body.upper(),
23 correlation_id=event.message.correlation_id))
24
25
26parser = optparse.OptionParser(usage="usage: %prog [options]")
27parser.add_option("-a", "--address", default="localhost:5672/examples",
28 help="address from which messages are received (default %default)")
29opts, args = parser.parse_args()
30
31url = Url(opts.address)
32
33try:
34 Container(Server(url, url.path)).run()
35except KeyboardInterrupt:
36 pass
The code here is not too different from the simple receiver
example. When we receive a request however, we look at the
reply_to
address on the
Message
and create a sender for that over which to
send the response. We’ll cache the senders in case we get further
requests with the same reply_to.
Now let’s create a simple client to test this service out.
1import optparse
2from proton import Message
3from proton.handlers import MessagingHandler
4from proton.reactor import Container
5
6
7class Client(MessagingHandler):
8 def __init__(self, url, requests):
9 super(Client, self).__init__()
10 self.url = url
11 self.requests = requests
12
13 def on_start(self, event):
14 self.sender = event.container.create_sender(self.url)
15 self.receiver = event.container.create_receiver(self.sender.connection, None, dynamic=True)
16
17 def next_request(self):
18 if self.receiver.remote_source.address:
19 req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0])
20 self.sender.send(req)
21
22 def on_link_opened(self, event):
23 if event.receiver == self.receiver:
24 self.next_request()
25
26 def on_message(self, event):
27 print("%s => %s" % (self.requests.pop(0), event.message.body))
28 if self.requests:
29 self.next_request()
30 else:
31 event.connection.close()
32
33
34REQUESTS = ["Twas brillig, and the slithy toves",
35 "Did gire and gymble in the wabe.",
36 "All mimsy were the borogroves,",
37 "And the mome raths outgrabe."]
38
39parser = optparse.OptionParser(usage="usage: %prog [options]",
40 description="Send requests to the supplied address and print responses.")
41parser.add_option("-a", "--address", default="localhost:5672/examples",
42 help="address to which messages are sent (default %default)")
43opts, args = parser.parse_args()
44
45Container(Client(opts.address, args or REQUESTS)).run()
As well as sending requests, we need to be able to get back the responses. We create a receiver for that (see line 15), but we don’t specify an address, we set the dynamic option which tells the broker to create a temporary address over which we can receive our responses.
We need to use the address allocated by the broker as the reply_to
address of our requests, so we can’t send them until the broker has
confirmed our receiving link has been set up (at which point we will
have our allocated address). To do that, we add an
on_link_opened()
method to our handler class, and if the link
associated with the event is the receiver, we use that as the trigger to
send our first request.
Again, we could avoid having any intermediary process here if we wished. The following code implementas a server to which the client above could connect directly without any need for a broker or similar.
1import uuid
2from proton import Message
3from proton.handlers import MessagingHandler
4from proton.reactor import Container
5
6
7class Server(MessagingHandler):
8 def __init__(self, url):
9 super(Server, self).__init__()
10 self.url = url
11 self.senders = {}
12
13 def on_start(self, event):
14 print("Listening on", self.url)
15 self.container = event.container
16 self.acceptor = event.container.listen(self.url)
17
18 def on_link_opening(self, event):
19 if event.link.is_sender:
20 if event.link.remote_source and event.link.remote_source.dynamic:
21 event.link.source.address = str(uuid.uuid4())
22 self.senders[event.link.source.address] = event.link
23 elif event.link.remote_target and event.link.remote_target.address:
24 event.link.target.address = event.link.remote_target.address
25 self.senders[event.link.remote_target.address] = event.link
26 elif event.link.remote_source:
27 event.link.source.address = event.link.remote_source.address
28 elif event.link.remote_target:
29 event.link.target.address = event.link.remote_target.address
30
31 def on_message(self, event):
32 print("Received", event.message)
33 sender = self.senders.get(event.message.reply_to)
34 if not sender:
35 print("No link for reply")
36 return
37 sender.send(Message(address=event.message.reply_to, body=event.message.body.upper(),
38 correlation_id=event.message.correlation_id))
39
40
41try:
42 Container(Server("0.0.0.0:8888")).run()
43except KeyboardInterrupt:
44 pass
Though this requires some more extensive changes than the simple sending and receiving examples, the essence of the program is still the same. Here though, rather than the server establishing a link for the response, it relies on the link that the client established, since that now comes in directly to the server process.
Miscellaneous¶
Many brokers offer the ability to consume messages based on a ‘selector’ that defines which messages are of interest based on particular values of the headers. The following example shows how that can be achieved:
1import optparse
2from proton import Url
3from proton.reactor import Container, Selector
4from proton.handlers import MessagingHandler
5
6
7class Recv(MessagingHandler):
8 def __init__(self, url, count):
9 super(Recv, self).__init__()
10 self.url = Url(url)
11 self.expected = count
12 self.received = 0
13
14 def on_start(self, event):
15 conn = event.container.connect(self.url)
16 event.container.create_receiver(conn, self.url.path, options=Selector("colour = 'green'"))
17
18 def on_message(self, event):
19 print(event.message.body)
20 self.received += 1
21 if self.received == self.expected:
22 event.receiver.close()
23 event.connection.close()
24
25
26parser = optparse.OptionParser(usage="usage: %prog [options]")
27parser.add_option("-a", "--address", default="localhost:5672/examples",
28 help="address from which messages are received (default %default)")
29parser.add_option("-m", "--messages", type="int", default=0,
30 help="number of messages to receive; 0 receives indefinitely (default %default)")
31opts, args = parser.parse_args()
32
33try:
34 Container(Recv(opts.address, opts.messages)).run()
35except KeyboardInterrupt:
36 pass
When creating the receiver, we specify a Selector object as an option. The options argument can take a single object or a list. Another option that is sometimes of interest when using a broker is the ability to ‘browse’ the messages on a queue, rather than consuming them. This is done in AMQP by specifying a distribution mode of ‘copy’ (instead of ‘move’ which is the expected default for queues). An example of that is shown next:
1from proton.reactor import Container, Copy
2from proton.handlers import MessagingHandler
3
4
5class Recv(MessagingHandler):
6 def __init__(self):
7 super(Recv, self).__init__()
8
9 def on_start(self, event):
10 conn = event.container.connect("localhost:5672")
11 event.container.create_receiver(conn, "examples", options=Copy())
12
13 def on_message(self, event):
14 print(event.message)
15 if event.receiver.queued == 0 and event.receiver.drained:
16 event.connection.close()
17
18
19try:
20 Container(Recv()).run()
21except KeyboardInterrupt:
22 pass