A multithreaded sender and receiver. Requires C++11.
 
 
 
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
 
// Mutex guarding the standard output streams. Several threads in this file
// print through OUT(), and holding this lock keeps their output from being
// interleaved.
std::mutex out_lock;
// OUT(x): execute statement `x` while holding out_lock. The lock_guard releases
// the mutex when the block ends; the do { ... } while (false) wrapper makes the
// expansion a single statement, so `OUT(...);` works after an unbraced
// if/else.
#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
 
    
    // NOTE(review): this excerpt is incomplete. The class header, the
    // declarations of sender_ and work_queue_, and the signature lines of most
    // member functions below are not present. The comments describe only what
    // the visible bodies do.

    // Initialized once by the constructor and never modified afterwards (const).
    const std::string url_;
    const std::string address_;
 
    
 
    
    // lock_ is held by the bodies below whenever they touch sender_,
    // work_queue_ or messages_.
    std::mutex lock_;
    // Waited on until work_queue_ is non-null; notified after sender_ is stored.
    std::condition_variable sender_ready_;
    // Messages pushed by the message-arrival body and popped by the receive body.
    std::queue<proton::message> messages_;
    // Waited on while messages_ is empty; notified after each push.
    std::condition_variable messages_ready_;
 
  public:
    // Stores the URL and address and starts with work_queue_ set to 0, which
    // the wait loop further down treats as "not ready yet".
    client(const std::string& url, const std::string& address) : url_(url), address_(address), work_queue_(0) {}
 
    // NOTE(review): signature line missing from this excerpt (main calls
    // cl.send(msg), so this is presumably send()).
    // Queues a task on the work queue that calls sender_.send(msg). The [=]
    // capture copies what the lambda uses into the closure, so the task does
    // not refer back to the caller's `msg`.
        
        
        work_queue()->
add([=]() { sender_.
send(msg); });
    }
 
    // NOTE(review): signature line missing from this excerpt (main calls
    // cl.receive(), so this is presumably receive()).
    // Blocks until messages_ is non-empty, then moves the front message out of
    // the queue, pops it and returns it. The while loop re-checks the
    // condition after every wake-up before proceeding.
        std::unique_lock<std::mutex> l(lock_);
        while (messages_.empty()) messages_ready_.wait(l);
        auto msg = std::move(messages_.front());
        messages_.pop();
        return msg;
    }
 
    // Queues a task on the work queue that closes the connection owning
    // sender_. The call returns after queuing (work_queue() itself may block
    // until the work queue is available).
    void close() {
        work_queue()->add([=]() { sender_.
connection().close(); });
    }
 
  private:
 
    // NOTE(review): signature line missing from this excerpt; the callers
    // above invoke this as work_queue().
    // Blocks on sender_ready_ until work_queue_ is non-null, then returns it.
    // NOTE(review): no visible line assigns work_queue_ a non-null value; the
    // body that stores sender_ and notifies sender_ready_ presumably also sets
    // it. Confirm against the full source, otherwise this wait never ends.
        std::unique_lock<std::mutex> l(lock_);
        while (!work_queue_) sender_ready_.wait(l);
        return work_queue_;
    }
 
    
 
    // NOTE(review): the two closing braces below end member functions whose
    // signatures and bodies are missing from this excerpt.
    
    
    
    }
 
    }
 
    // NOTE(review): signature line missing from this excerpt; `s` is
    // presumably the function's parameter.
    // Under lock_, stores `s` in sender_ and wakes every thread waiting on
    // sender_ready_.
        std::lock_guard<std::mutex> l(lock_);
        sender_ = s;
        sender_ready_.notify_all();
    }
 
    // NOTE(review): signature line missing from this excerpt; `msg` is
    // presumably the function's parameter.
    // Under lock_, appends a copy of `msg` to messages_ and wakes every thread
    // waiting on messages_ready_.
        std::lock_guard<std::mutex> l(lock_);
        messages_.push(msg);
        messages_ready_.notify_all();
    }
 
    // NOTE(review): signature line missing from this excerpt; `e` is
    // presumably the function's parameter.
    // Prints the error to std::cerr (serialized via OUT) and terminates the
    // whole process with exit status 1.
        OUT(std::cerr << "unexpected error: " << e << std::endl);
        exit(1);
    }
};
 
// Entry point: sends MESSAGE-COUNT messages from one thread and receives the
// same number on another thread, both through a single `client` object.
//
// Arguments (exactly three are required, otherwise usage is printed):
//   argv[1]  CONNECTION-URL
//   argv[2]  AMQP-ADDRESS
//   argv[3]  MESSAGE-COUNT
//
// Returns 0 after all threads have been joined, 1 on a usage error or when a
// std::exception is caught.
int main(int argc, const char** argv) {
    try {
        if (argc != 4) {
            std ::cerr <<
                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
                "MESSAGE-COUNT: number of messages to send\n";
            return 1;
        }
        const char *url = argv[1];
        const char *address = argv[2];
        // atoi performs no error reporting: a non-numeric argument yields 0,
        // in which case both loops below run zero times.
        int n_messages = atoi(argv[3]);
 
        client cl(url, address);
        // NOTE(review): `container` is not declared anywhere in the visible
        // code; its declaration appears to be missing from this excerpt.
        std::thread container_thread([&]() { container.run(); });
 
        // The lambdas capture by reference; on the normal path every thread is
        // joined below before the captured locals go out of scope.
        std::thread sender([&]() {
                for (int i = 0; i < n_messages; ++i) {
                    // NOTE(review): `msg` is not declared in the visible code;
                    // the line creating it appears to be missing here.
                    cl.send(msg);
                    OUT(std::cout << "sent \"" << msg.body() << '"' << std::endl);
                }
            });
 
        // Written only by the receiver thread and read only after
        // receiver.join(), so no extra synchronization is used for it.
        int received = 0;
        std::thread receiver([&]() {
                for (int i = 0; i < n_messages; ++i) {
                    auto msg = cl.receive();
                    OUT(std::cout << "received \"" << msg.body() << '"' << std::endl);
                    ++received;
                }
            });
 
        // Wait for both worker threads before requesting the close, then wait
        // for the container thread to finish.
        // NOTE(review): presumably closing the connection is what lets
        // container.run() return; confirm against the Proton documentation.
        sender.join();
        receiver.join();
        cl.close();
        container_thread.join();
        std::cout << received << " messages sent and received" << std::endl;
 
        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
 
    // Reached only after an exception was caught above.
    return 1;
}
A connection to a remote AMQP peer.
Definition: connection.hpp:47
receiver open_receiver(const std::string &addr)
Open a receiver for addr on default_session().
sender open_sender(const std::string &addr)
Open a sender for addr on default_session().
A top-level container of connections, sessions, and links.
Definition: container.hpp:49
returned< connection > connect(const std::string &conn_url, const connection_options &conn_opts)
Connect to conn_url and send an open request to the remote peer.
A received message.
Definition: delivery.hpp:40
Describes an endpoint error state.
Definition: error_condition.hpp:39
class work_queue & work_queue() const
Get the work_queue for the link.
class connection connection() const
The connection that owns this link.
An AMQP message.
Definition: message.hpp:50
Handler for Proton messaging events.
Definition: messaging_handler.hpp:69
virtual void on_error(const error_condition &)
Fallback error handling.
virtual void on_connection_open(connection &)
The remote peer opened the connection: called once on initial open, and again on each successful automatic reconnect.
virtual void on_message(delivery &, message &)
A message is received.
virtual void on_container_start(container &)
The container event loop is starting.
virtual void on_sender_open(sender &)
The remote peer opened the link.
A channel for sending messages.
Definition: sender.hpp:40
tracker send(const message &m)
Send a message on the sender.
Unsettled API - A context for thread-safe execution of work.
Definition: work_queue.hpp:327
bool add(work fn)
Unsettled API - Add work fn to the work queue.
A connection to a remote AMQP peer.
Options for creating a connection.
A top-level container of connections, sessions, and links.
Handler for Proton messaging events.
std::string to_string(const message &)
Human readable string representation.
A channel for receiving messages.
A channel for sending messages.
Unsettled API - A context for thread-safe execution of work.