This is a brief guide to to the fundamentals of building messaging applications using Qpid Proton C++.
Proton provides an "event-driven" programming model, where you implement a subclass of proton::messaging_handler
and override functions that react to various AMQP events (connections opening and closing, messages being delivered, and so on).
The examples below show how to implement handlers for clients and servers and how to run them using the proton::container
, a portable, easy-to-use way to build single-threaded clients or servers.
Some of the examples require an AMQP broker that can receive, store, and send messages. broker.cpp defines a simple example broker. If run without arguments, it listens on 0.0.0.0:5672
, the standard AMQP port on all network interfaces. To use a different port or network interface, use the -a
option.
broker -a <host>:<port>
Instead of the example broker, you can use any AMQP 1.0-compliant broker. You must configure your broker to have a queue (or topic) named "examples".
The helloworld
examples take an optional URL argument. The other examples take an option -a URL
. A URL looks like this:
HOST:PORT/ADDRESS
It usually defaults to 127.0.0.1:5672/examples
, but you can change this if your broker is on a different host or port, or you want to use a different queue or topic name (the ADDRESS part of the URL).
Tradition dictates that we start with Hello World! This example sends a message to a broker and then receives the same message back. In a realistic system the sender and receiver would normally be in different processes. The complete example is helloworld.cpp
We will include the following classes. proton::container
runs an event loop which dispatches events to a proton::messaging_handler
. This allows a reactive style of programming which is well suited to messaging applications. proton::connection
and proton::delivery
are AMQP entities used in the handler functions. proton::url
is a simple parser for the URL format mentioned above.
We will define a class hello_world
which is a subclass of proton::messaging_handler
and overrides functions to handle the events of interest in sending and receiving a message.
proton::messaging_handler::on_container_start()
is called when the event loop first starts. We handle that by establishing a connection and creating a sender and a receiver.
proton::messaging_handler::on_sendable()
is called when the message can be transferred over the associated sender link to the remote peer. We create a proton::message
, set the message body to "Hello
World!"
, and send the message. Then we close the sender, since we only want to send one message. Closing the sender will prevent further calls to proton::messaging_handler::on_sendable()
.
proton::messaging_handler::on_message()
is called when a message is received. We just print the body of the message and close the connection, as we only want one message.
The message body is a proton::value
. See AMQP and C++ types for more on how to extract the message body as type-safe C++ values.
Our main
function creates an instance of the hello_world
handler and a proton::container
using that handler. Calling proton::container::run()
sets things in motion and returns when we close the connection. It may throw an exception, which will be a subclass of proton::error
. That in turn is a subclass of std::exception
.
Of course, these Hello World!
examples are very artificial, communicating as they do over a network connection but with the same process. A more realistic example involves communication between separate processes, which could indeed be running on completely separate machines.
Let's separate the sender from the receiver, and transfer more than a single message between them.
We'll start with a simple sender, simple_send.cpp.
As with the previous example, we define the application logic in a class that handles events. Because we are transferring more than one message, we need to keep track of how many we have sent. We'll use a sent
member variable for that. The total
member variable will hold the number of messages we want to send.
As before, we use the proton::messaging_handler::on_container_start()
event to establish our sender link over which we will transfer messages.
AMQP defines a credit-based flow-control mechanism. Flow control allows the receiver to control how many messages it is prepared to receive at a given time and thus prevents any component being overwhelmed by the number of messages it is sent.
In the proton::messaging_handler::on_sendable()
callback, we check that our sender has credit before sending messages. We also check that we haven't already sent the required number of messages.
The proton::sender::send()
call above is asynchronous. When it returns, the message has not yet actually been transferred across the network to the receiver. By handling the proton::messaging_handler::on_tracker_accept()
event, we can get notified when the receiver has received and accepted the message. In our example we use this event to track the confirmation of the messages we have sent. We only close the connection and exit when the receiver has received all the messages we wanted to send.
If we are disconnected after a message is sent and before it has been confirmed by the receiver, it is said to be "in doubt". We don't know whether or not it was received. In this example, we will handle that by resending any in-doubt messages. This is known as an "at-least-once" guarantee, since each message should eventually be received at least once, though a given message may be received more than once (i.e., duplicates are possible). In the proton::messaging_handler::on_transport_close()
callback, we reset the sent count to reflect only those that have been confirmed. The library will automatically try to reconnect for us, and when our sender is sendable again, we can restart from the point we know the receiver got to.
Now let's look at the corresponding receiver, simple_recv.cpp.
This time we'll use an expected
member variable for for the number of messages we expect and a received
variable to count how many we have received so far.
We handle proton::messaging_handler::on_container_start()
by creating our receiver, much like we did for the sender.
We also handle the proton::messaging_handler::on_message()
event for received messages and print the message out as in the Hello World!
examples. However, we add some logic to allow the receiver to wait for a given number of messages and then close the connection and exit. We also add some logic to check for and ignore duplicates, using a simple sequential ID scheme.
Sending between these two examples requires an intermediary broker since neither accepts incoming connections. AMQP allows us to send messages directly between two processes. In that case, one or other of the processes needs to accept incoming connections. Let's create a modified version of the receiving example that does this with direct_recv.cpp.
There are only two differences here. Instead of initiating a link (and implicitly a connection), we listen for incoming connections.
When we have received all the expected messages, we then stop listening for incoming connections by calling proton::listener::stop()
You can use the simple_send.cpp example to send to this receiver directly. (Note: you will need to stop any broker that is listening on the 5672 port, or else change the port used by specifying a different address to each example via the -a
command-line switch).
We can also modify the sender to allow the original receiver to connect to it, in direct_send.cpp. Again, that requires just two modifications:
As with the modified receiver, instead of initiating establishment of a link, we listen for incoming connections.
When we have received confirmation of all the messages we sent, we call container::listener::stop()
to exit.
To try this modified sender, run the original simple_recv.cpp against it.
The symmetry in the underlying AMQP wire protocol that enables this is quite unique and elegant, and in reflecting this the Proton API provides a flexible toolkit for implementing all sorts of interesting intermediaries.
A common pattern is to send a request message and expect a response message in return. AMQP has special support for this pattern. Let's have a look at a simple example. We'll start with server.cpp, the program that will process the request and send the response. Note that we are still using a broker in this example.
Our server will provide a very simple service: it will respond with the body of the request converted to uppercase.
The code here is not too different from the simple receiver example. However, when we receive a request in proton::messaging_handler::on_message
, we look at the proton::message::reply_to
address and create a sender with that address for the response. We'll cache the senders in case we get further requests with the same reply_to
.
Now let's create a simple client.cpp to test this service out.
Our client takes a list of strings to send as requests.
Since we will be sending and receiving, we create a sender and a receiver in proton::messaging_handler::on_container_start
. Our receiver has a blank address and sets the dynamic
flag to true, which means we expect the remote end (the broker or server) to assign a unique address for us.
Now we need a function to send the next request from our list of requests. We set the reply_to
address to be the dynamically assigned address of our receiver.
We need to use the address assigned by the broker as the reply_to
address of our requests, so we can't send them until our receiver has been set up. To do that, we add an proton::messaging_handler::on_receiver_open()
method to our handler class and use that as the trigger to send our first request.
When we receive a reply, we send the next request.
We can avoid the intermediary process by writing a server that accepts connections directly, server_direct.cpp. It involves the following changes to our original server:
Our server must generate unique reply-to
addresses for links from the client that request a dynamic address (previously this was done by the broker). We use a simple counter.
Next we need to handle incoming requests for links with dynamic addresses from the client. We give the link a unique address and record it in our senders
map.
Note that we are interested in sender links above because we are implementing the server. A receiver link created on the client corresponds to a sender link on the server.
Finally when we receive a message we look up its reply_to
in our senders map and send the reply.