A multithreaded sender and receiver.
A multithreaded sender and receiver. Requires C++11.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
// Mutex taken by OUT() so that stream output issued from different threads
// through the macro is not interleaved.
std::mutex out_lock;

// Run statement/expression `x` while holding out_lock.
// The do { ... } while (false) wrapper makes the expansion a single statement,
// so OUT(...) is safe to use in an unbraced if/else.
#define OUT(x)                                               \
    do {                                                     \
        std::lock_guard<std::mutex> out_guard(out_lock);     \
        x;                                                   \
    } while (false)
// Connection URL and AMQP address; initialized from the constructor arguments
// and never modified afterwards (both are const).
const std::string url_;
const std::string address_;
// Mutex held while work_queue_ is read and while messages_ is pushed to or
// popped from; it is also held while sender_ is assigned.
std::mutex lock_;
// Waited on (with lock_) while work_queue_ is null; notified after sender_
// has been assigned.
std::condition_variable sender_ready_;
// Messages waiting to be taken; accessed only with lock_ held.
std::queue<proton::message> messages_;
// Waited on (with lock_) while messages_ is empty; notified after each push.
std::condition_variable messages_ready_;
public:
// Construct a client for the given connection URL and AMQP address.
//   url     - connection URL, copied into url_
//   address - AMQP node address, copied into address_
// work_queue_ starts out null; work_queue() waits on sender_ready_ for as
// long as it is still null.
client(const std::string& url, const std::string& address)
    : url_(url), address_(address), work_queue_(nullptr) {}
work_queue()->
add([=]() { sender_.
send(msg); });
}
std::unique_lock<std::mutex> l(lock_);
while (messages_.empty()) messages_ready_.wait(l);
auto msg = std::move(messages_.front());
messages_.pop();
return msg;
}
// Request shutdown: queue a task on the work queue that closes the
// connection owning sender_. The close itself runs wherever the work queue
// executes its tasks, not in the calling thread.
// Note: work_queue() waits while work_queue_ is null, so this call may block.
void close() {
    auto queue = work_queue();
    queue->add([=]() { sender_.connection().close(); });
}
private:
std::unique_lock<std::mutex> l(lock_);
while (!work_queue_) sender_ready_.wait(l);
return work_queue_;
}
}
}
std::lock_guard<std::mutex> l(lock_);
sender_ = s;
sender_ready_.notify_all();
}
std::lock_guard<std::mutex> l(lock_);
messages_.push(msg);
messages_ready_.notify_all();
}
OUT(std::cerr << "unexpected error: " << e << std::endl);
exit(1);
}
};
// Program entry point.
// Expects exactly three arguments: CONNECTION-URL, AMQP-ADDRESS and
// MESSAGE-COUNT. Starts one thread running the container, one thread that
// sends MESSAGE-COUNT messages through the client and one thread that
// receives MESSAGE-COUNT messages, then waits for all of them.
// Returns 0 on success, 1 on a usage error or if a std::exception is caught.
int main(int argc, const char** argv) {
try {
// Wrong argument count: print usage to stderr and fail.
if (argc != 4) {
std ::cerr <<
"Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
"CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
"AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
"MESSAGE-COUNT: number of messages to send\n";
return 1;
}
const char *url = argv[1];
const char *address = argv[2];
// atoi reports no errors; the count is used as-is as the loop bound below.
int n_messages = atoi(argv[3]);
client cl(url, address);
// NOTE(review): `container` is not declared anywhere in the visible text;
// presumably a container object built from `cl` belongs here - confirm
// against the original file.
std::thread container_thread([&]() { container.run(); });
// Sender thread: calls cl.send() n_messages times and logs each message body.
std::thread sender([&]() {
for (int i = 0; i < n_messages; ++i) {
// NOTE(review): `msg` is not declared in this lambda in the visible text;
// the line that constructs it appears to be missing - confirm.
cl.send(msg);
OUT(std::cout << "sent \"" << msg.body() << '"' << std::endl);
}
});
// Written only by the receiver thread; read below after receiver.join().
int received = 0;
// Receiver thread: takes n_messages messages from the client (cl.receive()
// blocks while none are queued) and logs each message body.
std::thread receiver([&]() {
for (int i = 0; i < n_messages; ++i) {
auto msg = cl.receive();
OUT(std::cout << "received \"" << msg.body() << '"' << std::endl);
++received;
}
});
// Wait for both worker threads before asking the client to close.
sender.join();
receiver.join();
cl.close();
// Presumably container.run() returns once the connection is closed - confirm.
container_thread.join();
std::cout << received << " messages sent and received" << std::endl;
return 0;
} catch (const std::exception& e) {
// Report the exception text; control falls through to the failure return.
std::cerr << e.what() << std::endl;
}
return 1;
}
A connection to a remote AMQP peer.
Definition: connection.hpp:47
receiver open_receiver(const std::string &addr)
Open a receiver for addr on default_session().
sender open_sender(const std::string &addr)
Open a sender for addr on default_session().
A top-level container of connections, sessions, and links.
Definition: container.hpp:50
returned< connection > connect(const std::string &conn_url, const connection_options &conn_opts)
Connect to conn_url and send an open request to the remote peer.
A received message.
Definition: delivery.hpp:40
Describes an endpoint error state.
Definition: error_condition.hpp:39
class work_queue & work_queue() const
Get the work_queue for the link.
class connection connection() const
The connection that owns this link.
An AMQP message.
Definition: message.hpp:48
Handler for Proton messaging events.
Definition: messaging_handler.hpp:69
virtual void on_error(const error_condition &)
Fallback error handling.
virtual void on_connection_open(connection &)
The remote peer opened the connection: called once on initial open, and again on each successful automatic re-connect.
virtual void on_message(delivery &, message &)
A message is received.
virtual void on_container_start(container &)
The container event loop is starting.
virtual void on_sender_open(sender &)
The remote peer opened the link.
A channel for sending messages.
Definition: sender.hpp:40
tracker send(const message &m)
Send a message on the sender.
Unsettled API - A context for thread-safe execution of work.
Definition: work_queue.hpp:327
bool add(work fn)
Unsettled API - Add work fn to the work queue.
A connection to a remote AMQP peer.
Options for creating a connection.
A top-level container of connections, sessions, and links.
Handler for Proton messaging events.
A channel for receiving messages.
A channel for sending messages.
Unsettled API - A context for thread-safe execution of work.