A multithreaded sender and receiver enhanced for flow control.Requires C++11
 
 
 
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
 
std::mutex out_lock;
#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
 
class closed : public std::runtime_error {
  public:
    closed(const std::string& msg) : std::runtime_error(msg) {}
};
 
    
 
    
    std::mutex lock_;
    std::condition_variable sender_ready_;
    int queued_;                       
    int credit_;                       
 
  public:
        : work_queue_(0), queued_(0), credit_(0)
    {
    }
 
    
        {
            std::unique_lock<std::mutex> l(lock_);
            
            while (!work_queue_ || queued_ >= credit_) sender_ready_.wait(l);
            ++queued_;
        }
        work_queue_->
add([=]() { this->do_send(m); }); 
    }
 
    
    void close() {
        work_queue()->add([=]() { sender_.
connection().close(); });
    }
 
  private:
 
        
        std::unique_lock<std::mutex> l(lock_);
        while (!work_queue_) sender_ready_.wait(l);
        return work_queue_;
    }
 
    
 
        
        std::lock_guard<std::mutex> l(lock_);
        sender_ = s;
    }
 
        std::lock_guard<std::mutex> l(lock_);
        sender_ready_.notify_all(); 
    }
 
    
    
        std::lock_guard<std::mutex> l(lock_);
        --queued_;                    
        sender_ready_.notify_all();       
    }
 
        OUT(std::cerr << "unexpected error: " << e << std::endl);
        exit(1);
    }
};
 
    static const size_t MAX_BUFFER = 100; 
 
    
 
    
    std::mutex lock_;
    std::queue<proton::message> buffer_; 
    std::condition_variable can_receive_; 
    bool closed_;
 
  public:
 
    
    receiver(
proton::container& cont, 
const std::string& url, 
const std::string& address)
        : work_queue_(0), closed_(false)
    {
        
        
    }
 
    
        std::unique_lock<std::mutex> l(lock_);
        
        while (!closed_ && (!work_queue_ || buffer_.empty())) {
            can_receive_.wait(l);
        }
        if (closed_) throw closed("receiver closed");
        buffer_.pop();
        
        
        work_queue_->
add([=]() { this->receive_done(); });
        return m;
    }
 
    
    void close() {
        std::lock_guard<std::mutex> l(lock_);
        if (!closed_) {
            closed_ = true;
            can_receive_.notify_all();
            if (work_queue_) {
                work_queue_->
add([
this]() { this->receiver_.
connection().close(); });
            }
        }
    }
 
  private:
    
 
        receiver_ = r;
        std::lock_guard<std::mutex> l(lock_);
    }
 
        
        std::lock_guard<std::mutex> l(lock_);
        buffer_.push(m);
        can_receive_.notify_all();
    }
 
    
    void receive_done() {
        
    }
 
        OUT(std::cerr << "unexpected error: " << e << std::endl);
        exit(1);
    }
};
 
 
void send_thread(sender& s, int n) {
    auto id = std::this_thread::get_id();
    for (int i = 0; i < n; ++i) {
        std::ostringstream ss;
        ss << std::this_thread::get_id() << "-" << i;
        OUT(std::cout << id << " sent \"" << ss.str() << '"' << std::endl);
    }
    OUT(std::cout << id << " sent " << n << std::endl);
}
 
void receive_thread(receiver& r, std::atomic_int& remaining) {
    try {
        auto id = std::this_thread::get_id();
        int n = 0;
        
        
        
        while (remaining-- > 0) {
            auto m = r.receive();
            ++n;
            OUT(std::cout << 
id << 
" received \"" << m.
body() << 
'"' << std::endl);
        }
        OUT(std::cout << id << " received " << n << " messages" << std::endl);
    } catch (const closed&) {}
}
 
int main(int argc, const char **argv) {
    try {
        if (argc != 5) {
            std::cerr <<
                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT THREAD-COUNT\n"
                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
                "MESSAGE-COUNT: number of messages to send\n"
                "THREAD-COUNT: number of sender/receiver thread pairs\n";
            return 1;
        }
 
        const char *url = argv[1];
        const char *address = argv[2];
        int n_messages = atoi(argv[3]);
        int n_threads = atoi(argv[4]);
        int count = n_messages * n_threads;
 
        
        std::atomic_int remaining;
        remaining.store(count);
 
        
        auto container_thread = std::thread([&]() { container.
run(); });
 
 
        
        sender send(container, url, address);
        receiver recv(container, url, address);
 
        
        
        std::vector<std::thread> threads;
        threads.reserve(n_threads*2); 
        for (int i = 0; i < n_threads; ++i)
            threads.push_back(std::thread([&]() { receive_thread(recv, remaining); }));
        for (int i = 0; i < n_threads; ++i)
            threads.push_back(std::thread([&]() { send_thread(send, n_messages); }));
 
        
        for (auto& t : threads) t.join();
        send.close();
        recv.close();
        container_thread.join();
        if (remaining > 0)
            throw std::runtime_error("not all messages were received");
        std::cout << count << " messages sent and received" << std::endl;
 
        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
    return 1;
}
Options for creating a connection.
Definition: connection_options.hpp:67
A top-level container of connections, sessions, and links.
Definition: container.hpp:49
void run()
Run the container in the current thread.
returned< receiver > open_receiver(const std::string &addr_url)
Open a connection and receiver for addr_url.
returned< sender > open_sender(const std::string &addr_url)
Open a connection and sender for addr_url.
A received message.
Definition: delivery.hpp:40
Describes an endpoint error state.
Definition: error_condition.hpp:39
class work_queue & work_queue() const
Get the work_queue for the link.
int credit() const
Credit available on the link.
class connection connection() const
The connection that owns this link.
An AMQP message.
Definition: message.hpp:50
void body(const value &x)
Set the body. Equivalent to body() = x.
Handler for Proton messaging events.
Definition: messaging_handler.hpp:69
virtual void on_error(const error_condition &)
Fallback error handling.
virtual void on_message(delivery &, message &)
A message is received.
virtual void on_receiver_open(receiver &)
The remote peer opened the link.
virtual void on_sendable(sender &)
A message can be sent.
virtual void on_sender_open(sender &)
The remote peer opened the link.
Options for creating a receiver.
Definition: receiver_options.hpp:59
A channel for receiving messages.
Definition: receiver.hpp:41
void add_credit(uint32_t)
Increment the credit available to the sender.
A channel for sending messages.
Definition: sender.hpp:40
tracker send(const message &m)
Send a message on the sender.
Unsettled API - A context for thread-safe execution of work.
Definition: work_queue.hpp:327
bool add(work fn)
Unsettled API - Add work fn to the work queue.
A connection to a remote AMQP peer.
Options for creating a connection.
A top-level container of connections, sessions, and links.
Handler for Proton messaging events.
A channel for receiving messages.
Options for creating a receiver.
A channel for sending messages.
Unsettled API - A context for thread-safe execution of work.