Qpid Proton C++ API  0.12.2
Tutorial

This is a brief tutorial that will walk you through the fundamentals of building messaging applications in incremental steps.

There are further examples in addition to the ones mentioned in the tutorial.

Some of the examples require an AMQP broker that can receive, store and send messages. broker.hpp and broker.cpp define a simple example broker. Run without arguments it listens on 0.0.0.0:5672, the standard AMQP port on all network interfaces. To use a different port or network interface:

broker -a <host>:<port>

Instead of the example broker, you can use any AMQP 1.0 compliant broker. You must configure your broker to have a queue (or topic) named "examples".

The helloworld examples take an optional URL argument. The other examples take an option -a URL. A URL looks like:

HOST:PORT/ADDRESS

The URL usually defaults to 127.0.0.1:5672/examples, but you can change this if your broker is on a different host or port, or if you want to use a different queue or topic name (the ADDRESS part of the URL). URL details are at proton::url.
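To make the HOST:PORT/ADDRESS layout concrete, here is a minimal sketch that splits such a string using only the standard library. The names url_parts and split_url are hypothetical and are not part of Proton; real applications should use proton::url. The sketch assumes a missing port means 5672 and does not handle schemes or IPv6 literals.

```cpp
#include <string>

// Hypothetical helper types, not part of Proton: the three parts of
// a "HOST:PORT/ADDRESS" string.
struct url_parts {
    std::string host;
    std::string port;
    std::string address;
};

// Splits "HOST:PORT/ADDRESS". Assumption: a missing port defaults to
// "5672", the standard AMQP port.
url_parts split_url(const std::string& s) {
    url_parts p;
    std::string::size_type slash = s.find('/');
    std::string authority = s.substr(0, slash);          // "HOST:PORT"
    if (slash != std::string::npos) p.address = s.substr(slash + 1);
    std::string::size_type colon = authority.rfind(':');
    p.host = authority.substr(0, colon);
    p.port = (colon == std::string::npos) ? "5672" : authority.substr(colon + 1);
    return p;
}
```

With this sketch, split_url("127.0.0.1:5672/examples") gives host 127.0.0.1, port 5672 and address examples.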

The first part of the tutorial uses the proton::container. Later we will show some of the same examples implemented using the proton::connection_engine. Most of the code is the same for either approach.

Hello World!

Tradition dictates that we start with hello world! This example sends a message to a broker and then receives the same message back, to demonstrate sending and receiving. In a realistic system the sender and receiver would normally be in different processes. The complete example is helloworld.cpp.

We will include the following classes: proton::container runs an event loop which dispatches events to a proton::handler. This allows a reactive style of programming which is well suited to messaging applications. proton::url is a simple parser for the URL format mentioned above.

#include "proton/container.hpp"
#include "proton/event.hpp"
#include "proton/handler.hpp"
#include "proton/url.hpp"

We will define a class hello_world which is a subclass of proton::handler and overrides functions to handle the events of interest in sending and receiving a message.

class hello_world : public proton::handler {
  private:
    proton::url url;

  public:
    hello_world(const proton::url& u) : url(u) {}

on_start() is called when the event loop first starts. We handle that by establishing a connection and creating a sender and a receiver.

void on_start(proton::event &e) {
    proton::connection conn = e.container().connect(url);
    conn.open_receiver(url.path());
    conn.open_sender(url.path());
}

on_sendable() is called when a message can be transferred over the associated sender link to the remote peer. We create a proton::message, set the message body to "Hello World!" and send the message. Then we close the sender as we only want to send one message. Closing the sender will prevent further calls to on_sendable().

void on_sendable(proton::event &e) {
    proton::message m("Hello World!");
    e.sender().send(m);
    e.sender().close();
}

on_message() is called when a message is received. We just print the body of the message and close the connection, as we only want one message.

void on_message(proton::event &e) {
    std::cout << e.message().body() << std::endl;
    e.connection().close();
}

The message body is a proton::value; see the documentation for more on how to extract the message body as type-safe C++ values.

Our main function creates an instance of the hello_world handler and a proton::container using that handler. Calling proton::container::run sets things in motion and returns when we close the connection as there is nothing further to do. It may throw an exception, which will be a subclass of proton::error. That in turn is a subclass of std::exception.

int main(int argc, char **argv) {
    try {
        std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples";
        hello_world hw(url);
        proton::container(hw).run();
        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
    return 1;
}
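The reactive style described in this section, where an event loop calls back into a handler, can be illustrated without Proton at all. The following is a toy sketch: event, handler, run and recording_handler are hypothetical stand-ins written for this illustration and do not reflect how proton::container is implemented.

```cpp
#include <queue>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; these are not Proton classes.
enum event_type { START, MESSAGE };

struct event {
    event_type type;
    std::string body;
};

// Base class with do-nothing callbacks; subclasses override what they need.
class handler {
  public:
    virtual ~handler() {}
    virtual void on_start(const event&) {}
    virtual void on_message(const event&) {}
};

// A toy event loop: pops queued events and calls the matching callback.
void run(std::queue<event>& events, handler& h) {
    while (!events.empty()) {
        event e = events.front();
        events.pop();
        switch (e.type) {
          case START:   h.on_start(e);   break;
          case MESSAGE: h.on_message(e); break;
        }
    }
}

// Records the callbacks it receives, in order.
class recording_handler : public handler {
  public:
    std::vector<std::string> log;
    void on_start(const event&) { log.push_back("start"); }
    void on_message(const event& e) { log.push_back("message: " + e.body); }
};
```

The application code lives entirely in the handler subclass; the loop decides when each function is called, which is the same division of labour as between hello_world and the container.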

Hello World, Direct!

Though often used in conjunction with a broker, AMQP does not require this. It also allows senders and receivers to communicate directly if desired.

We will modify our example to send a message directly to itself. This is a bit contrived but illustrates both sides of the direct send/receive scenario. The full code is in helloworld_direct.cpp.

The first difference is that rather than creating a receiver on the same connection as our sender, we listen for incoming connections by invoking the proton::container::listen() method on the container.

void on_start(proton::event &e) {
    acceptor = e.container().listen(url);
    e.container().open_sender(url);
}

As we only need to initiate one link, the sender, we can do that by passing in a URL rather than an existing connection; the connection is then established for us automatically.

We send the message in response to the on_sendable() callback and print the message out in response to the on_message() callback exactly as before.

void on_sendable(proton::event &e) {
    proton::message m("Hello World!");
    e.sender().send(m);
    e.sender().close();
}

void on_message(proton::event &e) {
    std::cout << e.message().body() << std::endl;
}

However, we also handle two new events. We now close the connection from the sender's side once the message has been accepted. The acceptance of the message is an indication of successful transfer to the peer. We are notified of that event through the on_delivery_accept() callback.

void on_delivery_accept(proton::event &e) {
    e.connection().close();
}

Then, once the connection has been closed, we are notified through the on_connection_close() callback. There we stop accepting incoming connections, at which point there is no more work to be done, the event loop exits, and the run() method returns.

void on_connection_close(proton::event &e) {
    acceptor.close();
}

So now we have our example working without a broker involved!

Note that for this example we pick an "unusual" port 8888 since we are talking to ourselves rather than a broker.

std::string url = argc > 1 ? argv[1] : "127.0.0.1:8888/examples";

Asynchronous Send and Receive

Of course, these Hello World! examples are very artificial, communicating as they do over a network connection but within the same process. A more realistic example involves communication between separate processes (which could indeed be running on completely separate machines).

Let's separate the sender from the receiver, and transfer more than a single message between them.

We'll start with a simple sender simple_send.cpp.

As with the previous example, we define the application logic in a class that handles events. Because we are transferring more than one message, we need to keep track of how many we have sent. We'll use a sent member variable for that. The total member variable will hold the number of messages we want to send.

class simple_send : public proton::handler {
  private:
    proton::url url;
    proton::sender sender;
    int sent;
    int confirmed;
    int total;

As before, we use the on_start() event to establish our sender link over which we will transfer messages.

void on_start(proton::event &e) {
    sender = e.container().open_sender(url);
}

AMQP defines a credit-based flow control mechanism. Flow control allows the receiver to control how many messages it is prepared to receive at a given time and thus prevents any component being overwhelmed by the number of messages it is sent.

In the on_sendable() callback, we check that our sender has credit before sending messages. We also check that we haven't already sent the required number of messages.

void on_sendable(proton::event &e) {
    proton::sender sender = e.sender();
    while (sender.credit() && sent < total) {
        proton::message msg;
        std::map<std::string, int> m;
        m["sequence"] = sent + 1;
        msg.id(sent + 1);
        msg.body(m);
        sender.send(msg);
        sent++;
    }
}
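The effect of the credit check can be seen in a small model that does not use Proton. In this sketch, toy_sender and send_available are hypothetical names: the receiver side is reduced to a grant() call that adds credit, and each send consumes one unit of credit.

```cpp
#include <vector>

// Hypothetical model of a sender link; not the proton::sender API.
struct toy_sender {
    int credit;              // messages the receiver is currently willing to accept
    std::vector<int> wire;   // sequence numbers handed to the link so far

    toy_sender() : credit(0) {}
    void grant(int n) { credit += n; }                      // receiver issues credit
    void send(int seq) { wire.push_back(seq); --credit; }   // each send uses one credit
};

// Same loop shape as on_sendable(): send while there is credit and work left.
// Returns the updated count of messages sent.
int send_available(toy_sender& s, int sent, int total) {
    while (s.credit > 0 && sent < total) {
        s.send(sent + 1);
        ++sent;
    }
    return sent;
}
```

With no credit nothing is sent; after grant(3) exactly three messages go out, and the loop resumes from where it stopped when more credit arrives.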

The proton::sender::send() call above is asynchronous. When it returns, the message has not yet actually been transferred across the network to the receiver. By handling the on_delivery_accept() event, we can get notified when the receiver has received and accepted the message. In our example we use this event to track the confirmation of the messages we have sent. We only close the connection and exit when the receiver has received all the messages we wanted to send.

void on_delivery_accept(proton::event &e) {
    confirmed++;
    if (confirmed == total) {
        std::cout << "all messages confirmed" << std::endl;
        e.connection().close();
    }
}

If we are disconnected after a message is sent and before it has been confirmed by the receiver, it is said to be in doubt. We don't know whether or not it was received. In this example, we will handle that by resending any in-doubt messages. This is known as an 'at-least-once' guarantee, since each message should eventually be received at least once, though a given message may be received more than once (i.e. duplicates are possible). In the on_disconnected() callback, we reset the sent count to reflect only those that have been confirmed. The library will automatically try to reconnect for us, and when our sender is sendable again, we can restart from the point we know the receiver got to.

}
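The bookkeeping behind this at-least-once behaviour can be sketched on its own. The send_state struct below is a hypothetical illustration, not Proton code; its function names are chosen for readability and are not the handler callbacks.

```cpp
// Hypothetical bookkeeping for an at-least-once sender; not Proton code.
struct send_state {
    int sent;        // messages handed to the link so far
    int confirmed;   // messages the receiver has accepted
    int total;       // messages we want delivered

    explicit send_state(int n) : sent(0), confirmed(0), total(n) {}

    void on_sent()     { ++sent; }
    void on_accepted() { ++confirmed; }

    // After a disconnect, anything sent but unconfirmed is in doubt:
    // rewind so that those messages are sent again.
    void on_disconnected() { sent = confirmed; }

    int next_sequence() const { return sent + 1; }
    bool done() const { return confirmed == total; }
};
```

If three messages were sent but only one confirmed before a disconnect, the rewind makes sequence 2 the next to go out, so messages 2 and 3 may reach the receiver twice. That is why the receiver needs to filter duplicates.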

Now let's look at the corresponding receiver simple_recv.cpp

This time we'll use an expected member variable for the number of messages we expect and a received variable to count how many we have received so far.

class simple_recv : public proton::handler {
  private:
    proton::url url;
    proton::receiver receiver;
    uint64_t expected;
    uint64_t received;

We handle on_start() by creating our receiver, much like we did for the sender.

void on_start(proton::event &e) {
    receiver = e.container().open_receiver(url);
    std::cout << "simple_recv listening on " << url << std::endl;
}

We also handle the on_message() event for received messages and print the message out as in the Hello World! examples. However we add some logic to allow the receiver to wait for a given number of messages, then to close the connection and exit. We also add some logic to check for and ignore duplicates, using a simple sequential id scheme.

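void on_message(proton::event &e) {
    proton::message& msg = e.message();
    if (msg.id().get<uint64_t>() < received) return; // Ignore duplicate
    if (expected == 0 || received < expected) {
        std::cout << msg.body() << std::endl;
        if (++received == expected) e.connection().close();
    }
}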
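Duplicate filtering with a sequential id scheme can also be shown in isolation. The duplicate_filter class below is a hypothetical sketch, not Proton code. It assumes ids are assigned sequentially starting at 1 and arrive in order apart from resends, and it tracks the next expected id rather than a count of messages received.

```cpp
#include <cstdint>

// Hypothetical duplicate filter; not Proton code. Assumes ids are assigned
// sequentially starting at 1 and arrive in order apart from resends.
class duplicate_filter {
  public:
    duplicate_filter() : next_expected_(1) {}

    // Returns true if the message is new and should be processed.
    bool accept(std::uint64_t id) {
        if (id < next_expected_) return false;   // already seen: a resend
        next_expected_ = id + 1;
        return true;
    }

  private:
    std::uint64_t next_expected_;
};
```

A resend of any id that has already been accepted is rejected, while the next id in sequence passes through.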

Direct Send and Receive

Sending between these two examples requires an intermediary broker since neither accepts incoming connections. AMQP allows us to send messages directly between two processes. In that case one or other of the processes needs to accept incoming connections. Let's create a modified version of the receiving example that does this with direct_recv.cpp

There are only two differences here. Instead of initiating a link (and implicitly a connection), we listen for incoming connections.

void on_start(proton::event &e) {
    acceptor = e.container().listen(url);
    std::cout << "direct_recv listening on " << url << std::endl;
}

When we have received all the expected messages, we then stop listening for incoming connections by closing the acceptor object.

void on_message(proton::event &e) {
    proton::message& msg = e.message();
    if (msg.id().get<uint64_t>() < received) {
        return; // Ignore duplicate
    }
    if (expected == 0 || received < expected) {
        std::cout << msg.body() << std::endl;
        received++;
    }
    if (received == expected) {
        e.receiver().close();
        if (!!acceptor) acceptor.close();
    }
}

You can use the simple_send.cpp example to send to this receiver directly. (Note: you will need to stop any broker that is listening on the 5672 port, or else change the port used by specifying a different address to each example via the -a command line switch).

We can also modify the sender to allow the original receiver to connect to it, in direct_send.cpp. Again that just requires two modifications:

As with the modified receiver, instead of initiating establishment of a link, we listen for incoming connections.

void on_start(proton::event &e) {
    acceptor = e.container().listen(url);
    std::cout << "direct_send listening on " << url << std::endl;
}

When we have received confirmation of all the messages we sent, we can close the acceptor in order to exit.

void on_delivery_accept(proton::event &e) {
    confirmed++;
    if (confirmed == total) {
        std::cout << "all messages confirmed" << std::endl;
        acceptor.close();
    }
}

To try this modified sender, run the original simple_recv.cpp against it.

The symmetry in the underlying AMQP protocol that enables this is unusual and elegant. In reflecting it, the Proton API provides a flexible toolkit for implementing all sorts of interesting intermediaries; the simple test broker in broker.hpp and broker.cpp is one example.

Request/Response

A common pattern is to send a request message and expect a response message in return. AMQP has special support for this pattern. Let's have a look at a simple example. We'll start with server.cpp, the program that will process the request and send the response. Note that we are still using a broker in this example.

Our server will provide a very simple service: it will respond with the body of the request converted to uppercase.

class server : public proton::handler {
  private:
    typedef std::map<std::string, proton::sender> sender_map;
    proton::url url;
    proton::connection connection;
    sender_map senders;

  public:
    server(const std::string &u) : url(u) {}

    void on_start(proton::event &e) {
        connection = e.container().connect(url);
        connection.open_receiver(url.path());
        std::cout << "server connected to " << url << std::endl;
    }

    std::string to_upper(const std::string &s) {
        std::string uc(s);
        size_t l = uc.size();
        for (size_t i=0; i<l; i++) uc[i] = std::toupper(uc[i]);
        return uc;
    }

    void on_message(proton::event &e) {
        std::cout << "Received " << e.message().body() << std::endl;
        std::string reply_to = e.message().reply_to();
        proton::message reply;
        reply.address(reply_to);
        reply.body(to_upper(e.message().body().get<std::string>()));
        reply.correlation_id(e.message().correlation_id());
        if (!senders[reply_to]) {
            senders[reply_to] = connection.open_sender(reply_to);
        }
        senders[reply_to].send(reply);
    }
};

The code here is not too different from the simple receiver example. When we receive a request in on_message(), however, we look at the proton::message::reply_to address and create a sender with that address for the response. We cache the senders in case we get further requests with the same reply_to.
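The caching idea is independent of Proton and can be sketched with a plain map. In the sketch below, cached_sender and reply_cache are hypothetical stand-ins (a real server would store proton::sender objects), and to_upper is a standalone variant of the server's helper that casts to unsigned char before calling std::toupper.

```cpp
#include <cctype>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for a sender link; not the proton::sender API.
struct cached_sender {
    std::string address;
    int sent;
    cached_sender() : sent(0) {}
};

// Keeps one sender per reply_to address, creating it on first use.
class reply_cache {
  public:
    reply_cache() : opened_(0) {}

    cached_sender& sender_for(const std::string& reply_to) {
        std::map<std::string, cached_sender>::iterator it = senders_.find(reply_to);
        if (it == senders_.end()) {
            cached_sender s;
            s.address = reply_to;
            it = senders_.insert(std::make_pair(reply_to, s)).first;
            ++opened_;
        }
        return it->second;
    }

    int opened() const { return opened_; }   // how many senders were created

  private:
    std::map<std::string, cached_sender> senders_;
    int opened_;
};

// Upper-cases a request body; the cast avoids passing a negative char to toupper.
std::string to_upper(const std::string& s) {
    std::string uc(s);
    for (std::string::size_type i = 0; i < uc.size(); ++i)
        uc[i] = static_cast<char>(std::toupper(static_cast<unsigned char>(uc[i])));
    return uc;
}
```

Two requests carrying the same reply_to share one sender, so a client that sends many requests causes only one link to be opened for its replies.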

Now let's create a simple client.cpp to test this service out.

Our client takes a list of strings to send as requests:

client(const proton::url &u, const std::vector<std::string>& r) : url(u), requests(r) {}

Since we will be sending and receiving, we create a sender and a receiver in on_start. Our receiver has a blank address and sets the dynamic flag to true, which means we expect the remote end (broker or server) to assign a unique address for us.

void on_start(proton::event &e) {
    sender = e.container().open_sender(url);
    // Create a receiver with a dynamically chosen unique address.
    receiver = sender.connection().open_receiver("", proton::link_options().dynamic_address(true));
}

Now a function to send the next request from our list of requests. We set the reply_to address to be the dynamically assigned address of our receiver.

void send_request() {
    proton::message req;
    req.body(requests.front());
    req.reply_to(receiver.remote_source().address());
    sender.send(req);
}

We need to use the address assigned by the broker as the reply_to address of our requests, so we can't send them until our receiver has been set up. To do that, we add an on_link_open() method to our handler class, and if the link associated with the event is the receiver, we use that as the trigger to send our first request.

void on_link_open(proton::event &e) {
    if (e.link() == receiver) {
        send_request();
    }
}

When we receive a reply, we send the next request.

void on_message(proton::event &e) {
    if (requests.empty()) return; // Spurious extra message!
    proton::message& response = e.message();
    std::cout << requests.front() << " => " << response.body() << std::endl;
    requests.erase(requests.begin());
    if (!requests.empty()) {
        send_request();
    } else {
        e.connection().close();
    }
}
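The client's one-request-at-a-time sequencing can be modelled without any messaging. The request_queue class below is a hypothetical sketch, not Proton code: it pairs each reply with the request at the front of the list and reports whether another request should be sent.

```cpp
#include <string>
#include <vector>

// Hypothetical client-side bookkeeping; not Proton code.
class request_queue {
  public:
    explicit request_queue(const std::vector<std::string>& r) : pending_(r) {}

    bool empty() const { return pending_.empty(); }
    const std::string& current() const { return pending_.front(); }

    // Called when a reply arrives: records "request => reply", drops the
    // answered request, and returns true if another request should be sent.
    bool on_reply(const std::string& reply) {
        if (pending_.empty()) return false;   // spurious extra reply
        results.push_back(pending_.front() + " => " + reply);
        pending_.erase(pending_.begin());
        return !pending_.empty();
    }

    std::vector<std::string> results;

  private:
    std::vector<std::string> pending_;
};
```

Because only one request is outstanding at a time, the reply can always be matched to the front of the list. A client that pipelined several requests would instead need to match replies by correlation id.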

Direct Request/Response

We can avoid the intermediary process by writing a server that accepts connections directly, server_direct.cpp. It involves the following changes to our original server:

Our server must generate unique reply-to addresses for links from the client that request a dynamic address (previously this was done by the broker). We use a simple counter.

std::string generate_address() {
    std::ostringstream addr;
    addr << "server" << address_counter++;
    return addr.str();
}

Next we need to handle incoming requests for links with dynamic addresses from the client. We give the link a unique address and record it in our senders map.

void on_link_open(proton::event& e) {
    proton::link link = e.link();
    if (!!link.sender() && link.remote_source().dynamic()) {
        link.local_source().address(generate_address());
        senders[link.local_source().address()] = link.sender();
    }
}

Note we are interested in sender links above because we are implementing the server. A receiver link created on the client corresponds to a sender link on the server.

Finally when we receive a message we look up its reply_to in our senders map and send the reply.

void on_message(proton::event &e) {
    std::cout << "Received " << e.message().body() << std::endl;
    std::string reply_to = e.message().reply_to();
    sender_map::iterator it = senders.find(reply_to);
    if (it == senders.end()) {
        std::cout << "No link for reply_to: " << reply_to << std::endl;
    } else {
        proton::sender sender = it->second;
        proton::message reply;
        reply.address(reply_to);
        reply.body(to_upper(e.message().body().get<std::string>()));
        reply.correlation_id(e.message().correlation_id());
        sender.send(reply);
    }
}

Connection Engine

The proton::connection_engine is an alternative to the container. For simple applications with a single connection, its use is about the same as the proton::container, but it allows more flexibility for multi-threaded applications or applications with unusual IO requirements.

We'll look at the engine/helloworld.cpp example step-by-step to see how it differs from the container helloworld.cpp version.

First we include the proton::io::socket_engine class, which is a proton::connection_engine that uses socket IO.

#include "proton/io.hpp"

Our hello_world class differs only in the on_start() method. Instead of calling container.connect(), we simply call proton::connection::open to open the engine's connection:

void on_start(proton::event &e) {
    e.connection().open();
    e.connection().open_receiver(address_);
    e.connection().open_sender(address_);
}

Our main function only differs in that it creates and runs a socket_engine instead of a container.

int main(int argc, char **argv) {
    try {
        proton::url url(argc > 1 ? argv[1] : "127.0.0.1:5672/examples");
        hello_world hw(url.path());
        proton::io::socket_engine(url, hw).run();
        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
    return 1;
}