// A multithreaded sender and receiver. Requires C++11.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
// Mutex that serializes access to the shared output streams.
std::mutex out_lock;

// OUT(x): executes statement `x` while holding out_lock, so output produced
// by different threads does not interleave. The do/while(false) wrapper makes
// the expansion behave as a single statement (safe after an unbraced `if`).
#define OUT(x) \
    do { \
        std::lock_guard<std::mutex> l(out_lock); \
        x; \
    } while (false)
// NOTE(review): the enclosing class header and several member-function
// signatures are not visible in this excerpt. The comments below describe only
// the statements that are actually shown.

// Connection URL and AMQP address, copied from the constructor arguments.
const std::string url_;
const std::string address_;
// Mutex taken by the visible code around the reads of work_queue_, the
// assignment to sender_, and the push/pop operations on messages_.
std::mutex lock_;
// Waited on until work_queue_ tests true; notified after sender_ is assigned.
std::condition_variable sender_ready_;
// Message queue: filled by the push fragment near the end of the class and
// drained by the blocking pop fragment below.
std::queue<proton::message> messages_;
// Waited on while messages_ is empty; notified after every push.
std::condition_variable messages_ready_;
public:
// Stores url and address and initializes work_queue_ to 0, which is the
// "not ready yet" state that the wait on sender_ready_ below tests for.
client(const std::string& url, const std::string& address) : url_(url), address_(address), work_queue_(0) {}
// Fragment (signature not visible): hands work_queue() a callable that calls
// sender_.send(msg). Presumably add() defers the call to the connection's own
// thread -- confirm against the proton::work_queue documentation.
work_queue()->
add([=]() { sender_.
send(msg); });
}
// Fragment (signature not visible): blocks on messages_ready_ until messages_
// is non-empty, then moves out the front message, pops it and returns it.
// The while loop re-checks the condition after every wake-up.
std::unique_lock<std::mutex> l(lock_);
while (messages_.empty()) messages_ready_.wait(l);
auto msg = std::move(messages_.front());
messages_.pop();
return msg;
}
// Hands work_queue() a callable that closes the connection owning sender_.
// Because it calls work_queue(), it first waits until work_queue_ is set.
void close() {
work_queue()->add([=]() { sender_.
connection().close(); });
}
private:
// Fragment (signature not visible): blocks on sender_ready_ until work_queue_
// tests true, then returns work_queue_.
std::unique_lock<std::mutex> l(lock_);
while (!work_queue_) sender_ready_.wait(l);
return work_queue_;
}
// NOTE(review): the two closing braces below have no visible opening lines;
// the bodies they belonged to are missing from this excerpt.
}
}
// Fragment (signature not visible): under lock_, stores s in sender_ and wakes
// every thread waiting on sender_ready_.
// NOTE(review): nothing visible here assigns work_queue_, which is what the
// wait above tests -- check the missing lines, otherwise waiters never proceed.
std::lock_guard<std::mutex> l(lock_);
sender_ = s;
sender_ready_.notify_all();
}
// Fragment (signature not visible): under lock_, appends msg to messages_ and
// wakes every thread waiting on messages_ready_.
std::lock_guard<std::mutex> l(lock_);
messages_.push(msg);
messages_ready_.notify_all();
}
// Fragment (signature not visible): prints the error e to std::cerr while
// holding out_lock (via OUT), then terminates the process with status 1.
// NOTE(review): exit() is declared in <cstdlib>, which this file does not
// include directly.
OUT(std::cerr << "unexpected error: " << e << std::endl);
exit(1);
}
};
// Entry point.
//
// Expects exactly three arguments: CONNECTION-URL, AMQP-ADDRESS and
// MESSAGE-COUNT. Starts one thread running the container, one thread that
// sends MESSAGE-COUNT messages through the client and one that receives the
// same number, then closes the client and waits for the container thread.
//
// Returns 0 after all messages were sent and received; returns 1 on a usage
// error, an invalid MESSAGE-COUNT, or any std::exception.
int main(int argc, const char** argv) {
    try {
        if (argc != 4) {
            std::cerr <<
                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
                "MESSAGE-COUNT: number of messages to send\n";
            return 1;
        }
        const char *url = argv[1];
        const char *address = argv[2];

        // Parse MESSAGE-COUNT strictly. std::stoi throws std::invalid_argument
        // or std::out_of_range on bad input (handled by the catch below),
        // whereas atoi silently returned 0. Trailing garbage ("12abc") and
        // negative counts are rejected explicitly.
        std::string::size_type parsed = 0;
        const int n_messages = std::stoi(argv[3], &parsed);
        if (argv[3][parsed] != '\0' || n_messages < 0) {
            std::cerr << "MESSAGE-COUNT must be a non-negative integer, got '"
                      << argv[3] << "'\n";
            return 1;
        }

        client cl(url, address);
        // NOTE(review): `container` is not declared anywhere in the visible
        // code; its definition appears to be missing from this excerpt.
        std::thread container_thread([&]() { container.run(); });

        // Sender thread: calls cl.send() n_messages times.
        // NOTE(review): `msg` is not declared in the visible code; the line
        // that builds each message appears to be missing from this excerpt.
        std::thread sender([&]() {
            for (int i = 0; i < n_messages; ++i) {
                cl.send(msg);
                OUT(std::cout << "sent \"" << msg.body() << '"' << std::endl);
            }
        });

        // Receiver thread: calls cl.receive() n_messages times. `received` is
        // written only by this thread and read only after receiver.join().
        int received = 0;
        std::thread receiver([&]() {
            for (int i = 0; i < n_messages; ++i) {
                auto msg = cl.receive();
                OUT(std::cout << "received \"" << msg.body() << '"' << std::endl);
                ++received;
            }
        });

        sender.join();
        receiver.join();
        // Close the client only after both worker threads are done, then wait
        // for the container thread (presumably closing the connection is what
        // lets container.run() return -- confirm).
        cl.close();
        container_thread.join();
        std::cout << received << " messages sent and received" << std::endl;
        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
    return 1;
}