A multithreaded sender and receiver enhanced for flow control. Requires C++11.
#include <atomic>
#include <cerrno>
#include <climits>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
// Mutex held while a statement passed to OUT() runs, so output written through
// OUT() by different threads does not interleave.
std::mutex out_lock;
// OUT(x): execute statement x while holding out_lock. The do { ... } while (false)
// wrapper makes the expansion behave as a single statement (safe after if/else).
#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
// Exception used to report that an endpoint has been closed (for example it is
// thrown with the text "receiver closed" when the receiver is closed).
// It derives from std::runtime_error, so handlers for std::exception see it too.
class closed : public std::runtime_error {
  public:
    // msg: text later returned by what().
    // The constructor is explicit so a std::string can never be converted to a
    // `closed` exception by accident; `closed("text")` and `throw closed(...)`
    // keep working unchanged.
    explicit closed(const std::string& msg) : std::runtime_error(msg) {}
};
std::mutex lock_;
std::condition_variable sender_ready_;
int queued_;
int credit_;
public:
: work_queue_(0), queued_(0), credit_(0)
{
}
{
std::unique_lock<std::mutex> l(lock_);
while (!work_queue_ || queued_ >= credit_) sender_ready_.wait(l);
++queued_;
}
work_queue_->
add([=]() { this->do_send(m); });
}
void close() {
work_queue()->add([=]() { sender_.
connection().close(); });
}
private:
std::unique_lock<std::mutex> l(lock_);
while (!work_queue_) sender_ready_.wait(l);
return work_queue_;
}
std::lock_guard<std::mutex> l(lock_);
sender_ = s;
}
std::lock_guard<std::mutex> l(lock_);
sender_ready_.notify_all();
}
std::lock_guard<std::mutex> l(lock_);
--queued_;
sender_ready_.notify_all();
}
OUT(std::cerr << "unexpected error: " << e << std::endl);
exit(1);
}
};
static const size_t MAX_BUFFER = 100;
std::mutex lock_;
std::queue<proton::message> buffer_;
std::condition_variable can_receive_;
bool closed_;
public:
// Construct a receiver in its initial state: work_queue_ is set to 0 (no work
// queue yet) and closed_ to false.
// NOTE(review): cont, url and address are not used by the body visible here;
// presumably the receiving link is opened from them in code that is missing
// from this view -- confirm against the full source.
receiver(
proton::container& cont,
const std::string& url,
const std::string& address)
: work_queue_(0), closed_(false)
{
}
std::unique_lock<std::mutex> l(lock_);
while (!closed_ && (!work_queue_ || buffer_.empty())) {
can_receive_.wait(l);
}
if (closed_) throw closed("receiver closed");
buffer_.pop();
work_queue_->
add([=]() { this->receive_done(); });
return m;
}
// Close the receiver. Only the first call has any effect.
// Under lock_: marks the receiver closed, wakes every thread waiting on
// can_receive_, and, if a work queue is already known, adds a task to it that
// closes the receiver's connection.
void close() {
    std::lock_guard<std::mutex> guard(lock_);
    if (closed_) {
        return;  // already closed, nothing more to do
    }
    closed_ = true;
    can_receive_.notify_all();
    if (!work_queue_) {
        return;  // no work queue yet, so no close task can be scheduled
    }
    work_queue_->add([this]() { this->receiver_.connection().close(); });
}
private:
receiver_ = r;
std::lock_guard<std::mutex> l(lock_);
}
std::lock_guard<std::mutex> l(lock_);
buffer_.push(m);
can_receive_.notify_all();
}
// Follow-up step for a message that was taken out of buffer_: a task calling
// this function is added to work_queue_ right after a message is popped.
// NOTE(review): the body is empty in this view; presumably the original
// replenishes link credit here -- confirm against the full source.
void receive_done() {
}
OUT(std::cerr << "unexpected error: " << e << std::endl);
exit(1);
}
};
// Body of one sending thread: for each i in [0, n) build the text
// "<thread-id>-<i>" and log it as sent, then log the total n.
// s: the shared sender wrapper.
// n: number of messages this thread handles.
// NOTE(review): `s` is never used in the code visible here, so nothing is
// actually handed to the sender; presumably a send call belongs in the loop
// and is missing from this view -- confirm against the full source.
void send_thread(sender& s, int n) {
    const auto id = std::this_thread::get_id();
    int i = 0;
    while (i < n) {
        std::ostringstream text;
        text << std::this_thread::get_id() << "-" << i;
        const std::string body = text.str();
        OUT(std::cout << id << " sent \"" << body << '"' << std::endl);
        ++i;
    }
    OUT(std::cout << id << " sent " << n << std::endl);
}
// Body of one receiving thread.
// r: the shared receiver wrapper.
// remaining: shared count of messages still to be claimed; each loop pass
//            decrements it atomically and stops once the value read before the
//            decrement is no longer positive.
// Every received message body is logged, followed by a per-thread total.
// A `closed` exception simply ends the thread without further output.
void receive_thread(receiver& r, std::atomic_int& remaining) {
    try {
        const auto id = std::this_thread::get_id();
        int received = 0;
        for (;;) {
            // fetch_sub returns the value held before the decrement.
            if (remaining.fetch_sub(1) <= 0) {
                break;
            }
            auto m = r.receive();
            ++received;
            OUT(std::cout << id << " received \"" << m.body() << '"' << std::endl);
        }
        OUT(std::cout << id << " received " << received << " messages" << std::endl);
    } catch (const closed&) {
        // The receiver was closed while this thread was receiving: stop quietly.
    }
}
// Parse a command-line count.
// text: the raw argument; name: the argument's name, used in the error message.
// Returns the value as an int.
// Throws std::runtime_error if text is not a complete base-10 integer in the
// range [0, INT_MAX].
static int parse_count(const char* text, const char* name) {
    errno = 0;
    char* end = nullptr;
    const long value = std::strtol(text, &end, 10);
    const bool consumed_all = (end != text) && (*end == '\0');
    if (!consumed_all || errno == ERANGE || value < 0 || value > INT_MAX) {
        throw std::runtime_error(
            std::string(name) + " must be a non-negative integer, got '" + text + "'");
    }
    return static_cast<int>(value);
}

// Entry point.
// Usage: CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT THREAD-COUNT
// Starts THREAD-COUNT receiving threads and THREAD-COUNT sending threads, each
// sending thread handling MESSAGE-COUNT messages, waits for all of them, then
// closes both endpoints and waits for the container thread.
// Returns 0 on success and 1 on a usage error or any std::exception.
int main(int argc, const char **argv) {
    try {
        if (argc != 5) {
            std::cerr <<
                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT THREAD-COUNT\n"
                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
                "MESSAGE-COUNT: number of messages to send\n"
                "THREAD-COUNT: number of sender/receiver thread pairs\n";
            return 1;
        }
        const char *url = argv[1];
        const char *address = argv[2];

        // Validate the numeric arguments before any thread is started: an
        // exception thrown while a std::thread is still joinable would end in
        // std::terminate instead of reaching the catch block below.
        const int n_messages = parse_count(argv[3], "MESSAGE-COUNT");
        const int n_threads = parse_count(argv[4], "THREAD-COUNT");

        // Compute the total in a wider type so the product cannot overflow int.
        const long long total = static_cast<long long>(n_messages) * n_threads;
        if (total > INT_MAX) {
            throw std::runtime_error("MESSAGE-COUNT * THREAD-COUNT is too large");
        }
        const int count = static_cast<int>(total);

        // Total number of messages still to be received, shared by all
        // receiving threads.
        std::atomic_int remaining;
        remaining.store(count);

        // NOTE(review): `container` is not declared anywhere in the text
        // visible here; presumably a proton::container is defined in a line
        // missing from this view -- confirm against the full source.
        auto container_thread = std::thread([&]() { container.run(); });

        // One sender and one receiver are shared by all worker threads.
        sender send(container, url, address);
        receiver recv(container, url, address);

        std::vector<std::thread> threads;
        threads.reserve(static_cast<std::size_t>(n_threads) * 2);
        for (int i = 0; i < n_threads; ++i)
            threads.push_back(std::thread([&]() { receive_thread(recv, remaining); }));
        for (int i = 0; i < n_threads; ++i)
            threads.push_back(std::thread([&]() { send_thread(send, n_messages); }));

        // Wait for every worker, then close both endpoints and wait for the
        // container thread.
        for (auto& t : threads) t.join();
        send.close();
        recv.close();
        container_thread.join();

        if (remaining > 0)
            throw std::runtime_error("not all messages were received");
        std::cout << count << " messages sent and received" << std::endl;

        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
    return 1;
}
Options for creating a connection.
Definition: connection_options.hpp:67
A top-level container of connections, sessions, and links.
Definition: container.hpp:49
void run()
Run the container in the current thread.
returned< receiver > open_receiver(const std::string &addr_url)
Open a connection and receiver for addr_url.
returned< sender > open_sender(const std::string &addr_url)
Open a connection and sender for addr_url.
A received message.
Definition: delivery.hpp:40
Describes an endpoint error state.
Definition: error_condition.hpp:39
class work_queue & work_queue() const
Get the work_queue for the link.
int credit() const
Credit available on the link.
class connection connection() const
The connection that owns this link.
An AMQP message.
Definition: message.hpp:48
void body(const value &x)
Set the body. Equivalent to body() = x.
Handler for Proton messaging events.
Definition: messaging_handler.hpp:69
virtual void on_error(const error_condition &)
Fallback error handling.
virtual void on_message(delivery &, message &)
A message is received.
virtual void on_receiver_open(receiver &)
The remote peer opened the link.
virtual void on_sendable(sender &)
A message can be sent.
virtual void on_sender_open(sender &)
The remote peer opened the link.
Options for creating a receiver.
Definition: receiver_options.hpp:59
A channel for receiving messages.
Definition: receiver.hpp:41
void add_credit(uint32_t)
Increment the credit available to the sender.
A channel for sending messages.
Definition: sender.hpp:40
tracker send(const message &m)
Send a message on the sender.
Unsettled API - A context for thread-safe execution of work.
Definition: work_queue.hpp:327
bool add(work fn)
Unsettled API - Add work fn to the work queue.
A connection to a remote AMQP peer.
Options for creating a connection.
A top-level container of connections, sessions, and links.
Handler for Proton messaging events.
A channel for receiving messages.
Options for creating a receiver.
A channel for sending messages.
Unsettled API - A context for thread-safe execution of work.