#include "options.hpp"
#include <deque>
#include <iostream>
#include <map>
#include <string>
#include <thread>
// Global switch for debug tracing; main() registers it as the -v/--verbose flag.
bool verbose;
// Executes the statement(s) `x` only while `verbose` is true. The
// do { ... } while (false) wrapper lets the macro be used like a single statement.
#define DOUT(x) do {if (verbose) {x};} while (false)
class Queue;
class Sender;
friend class connection_handler;
std::string queue_name_;
Queue* queue_;
int pending_credit_;
public:
sender_(s), work_queue_(s.work_queue()), queue_(0), pending_credit_(0)
{
}
// Hands `f` to this sender's work queue and returns the result reported by
// the work queue's add().
bool add(proton::work f) {
    const bool queued = work_queue_.add(f);
    return queued;
}
return reinterpret_cast<Sender*
>(s.
user_data());
}
void boundQueue(Queue* q, std::string qn);
DOUT(std::cerr << "Sender: " << this << " sending\n";);
}
void unsubscribed() {
DOUT(std::cerr << "Sender: " << this << " deleting\n";);
delete this;
}
};
class Queue {
const std::string name_;
std::deque<proton::message> messages_;
typedef std::map<Sender*, int> subscriptions;
subscriptions subscriptions_;
subscriptions::iterator current_;
// Distributes queued messages round-robin over the subscribed senders.
//
// Starting at `current_`, every subscriber whose credit is > 0 is handed the
// message at the front of `messages_` (the send itself is scheduled on that
// sender's own work queue) and has its credit decremented. The loop ends when
// no messages remain, or when every subscriber has been visited in a row
// without any of them having credit.
void tryToSend() {
    DOUT(std::cerr << "Queue: " << this << " tryToSend: " << subscriptions_.size(););
    // Number of *consecutive* subscribers visited that had no credit.
    size_t outOfCredit = 0;
    while (!messages_.empty() && outOfCredit<subscriptions_.size()) {
        // Wrap around; this also covers the case where current_ is still end().
        if (current_==subscriptions_.end()) {
            current_=subscriptions_.begin();
        }
        DOUT(std::cerr << "(" << current_->second << ") ";);
        if (current_->second>0) {
            DOUT(std::cerr << current_->first << " ";);
            auto msg = messages_.front();
            auto sender = current_->first;
            sender->add([=]{sender->sendMsg(msg);});
            messages_.pop_front();
            --current_->second;
            // A delivery happened, so the run of credit-less subscribers is broken.
            outOfCredit = 0;
        } else {
            ++outOfCredit;
        }
        // Always move to the next subscriber. Without this a credit-less
        // subscriber under the cursor would be counted over and over and the
        // subscribers behind it would never be offered a message.
        ++current_;
    }
    DOUT(std::cerr << "\n";);
}
public:
work_queue_(c), name_(n), current_(subscriptions_.end())
{}
// Schedules `f` on this queue's work queue; the return value is whatever the
// work queue's add() reports.
bool add(proton::work f) {
    const bool queued = work_queue_.add(f);
    return queued;
}
DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") queueMsg\n";);
messages_.push_back(m);
tryToSend();
}
// Sets the credit recorded for subscriber `s` to `c` (an entry is created if
// `s` has none yet) and then tries to deliver queued messages.
void flow(Sender* s, int c) {
    DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") flow: " << c << " to " << s << "\n";);
    int& credit = subscriptions_[s];
    credit = c;
    tryToSend();
}
// Registers `s` as a subscriber with zero credit. If `s` is already present
// its recorded credit is reset to zero.
void subscribe(Sender* s) {
    DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") subscribe Sender: " << s << "\n";);
    int& credit = subscriptions_[s];
    credit = 0;
}
// Removes `s` from the subscription table and schedules `s->unsubscribed()`
// on the sender's own work queue.
void unsubscribe(Sender* s) {
    DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") unsubscribe Sender: " << s << "\n";);
    // Step the round-robin cursor off the entry about to be erased so that
    // current_ never refers to a removed element.
    const bool cursor_on_s =
        (current_ != subscriptions_.end()) && (current_->first == s);
    if (cursor_on_s) {
        ++current_;
    }
    subscriptions_.erase(s);
    s->add([s] { s->unsubscribed(); });
}
};
if (queue_) {
auto credit = sender.
credit();
queue_->add([=]{queue_->flow(this, credit);});
} else {
pending_credit_ = sender.
credit();
}
}
if (queue_) {
queue_->add([=]{queue_->unsubscribe(this);});
} else {
}
}
// Records the Queue `q` (named `qn`) this sender is now bound to, then
// schedules on the queue's work queue a subscribe() for this sender followed,
// when the saved pending credit is positive, by a flow() carrying that credit.
// Finally reports the binding on stdout.
// NOTE(review): the `.handler(*this));` line below is the tail of a statement
// whose beginning is not visible in this view - confirm against the full file.
void Sender::boundQueue(Queue* q, std::string qn) {
DOUT(std::cerr << "Sender: " << this << " bound to Queue: " << q <<"(" << qn << ")\n";);
queue_ = q;
queue_name_ = qn;
.handler(*this));
// Copy the pending credit into a local so the lambda captures its current value.
auto credit = pending_credit_;
q->add([=]{
q->subscribe(this);
if (credit>0) {
q->flow(this, credit);
}
});
std::cout << "sending from " << queue_name_ << std::endl;
}
class QueueManager;
friend class connection_handler;
Queue* queue_;
QueueManager& queue_manager_;
std::deque<proton::message> messages_;
auto to_address = m.
to();
if (queue_) {
messages_.push_back(m);
queueMsgs();
} else if (!to_address.empty()) {
queueMsgToNamedQueue(m, to_address);
} else {
}
}
void queueMsgs() {
DOUT(std::cerr << "Receiver: " << this << " queueing " << messages_.size() << " msgs to: " << queue_ << "\n";);
while (!messages_.empty()) {
auto msg = messages_.front();
queue_->add([=]{queue_->queueMsg(msg);});
messages_.pop_front();
}
}
public:
receiver_(r), work_queue_(r.work_queue()), queue_(0), queue_manager_(qm)
{}
// Schedules `f` on this receiver's work queue and returns the result reported
// by the work queue's add().
bool add(proton::work f) {
    const bool queued = work_queue_.add(f);
    return queued;
}
// Records the Queue `q` this receiver is now bound to, reports the binding on
// stdout using the name `qn`, and flushes any buffered messages to the queue.
// NOTE(review): the `.handler(*this));` line below is the tail of a statement
// whose beginning is not visible in this view - confirm against the full file.
void boundQueue(Queue* q, std::string qn) {
DOUT(std::cerr << "Receiver: " << this << " bound to Queue: " << q << "(" << qn << ")\n";);
queue_ = q;
.handler(*this));
std::cout << "receiving to " << qn << std::endl;
// Messages that arrived before the binding are forwarded now.
queueMsgs();
}
};
class QueueManager {
typedef std::map<std::string, Queue*> queues;
queues queues_;
int next_id_;
public:
container_(c), work_queue_(c), next_id_(0)
{}
// Schedules `f` on the queue manager's work queue and returns the result
// reported by the work queue's add().
bool add(proton::work f) {
    const bool queued = work_queue_.add(f);
    return queued;
}
template <class T>
void findQueue(T& connection, std::string& qn) {
if (qn.empty()) {
std::ostringstream os;
os << "_dynamic_" << next_id_++;
qn = os.str();
}
Queue* q = 0;
auto i = queues_.find(qn);
if (i==queues_.end()) {
q = new Queue(container_, qn);
queues_[qn] = q;
} else {
q = i->second;
}
connection.add([=, &connection] {connection.boundQueue(q, qn);});
}
Queue* q = 0;
auto i = queues_.find(address);
if (i==queues_.end()) {
q = new Queue(container_, address);
queues_[address] = q;
} else {
q = i->second;
}
q->add([=] {q->queueMsg(m);});
}
// Resolves the queue named `qn` for sender `s` by delegating to findQueue(),
// which schedules `s->boundQueue(...)` once the queue is known.
void findQueueSender(Sender* s, std::string qn) {
    Sender& sender = *s;
    findQueue(sender, qn);
}
// Resolves the queue named `qn` for receiver `r` by delegating to findQueue(),
// which schedules `r->boundQueue(...)` once the queue is known.
void findQueueReceiver(Receiver* r, std::string qn) {
    Receiver& receiver = *r;
    findQueue(receiver, qn);
}
};
// Routes message `m` to the queue called `address` by scheduling a
// queueMessage() call on the QueueManager's work queue. Copies of the message
// and of the address are stored in the scheduled task.
void Receiver::queueMsgToNamedQueue(proton::message& m, std::string address) {
    DOUT(std::cerr << "Receiver: " << this << " send msg to Queue: " << address << "\n";);
    queue_manager_.add([this, m, address] {
        queue_manager_.queueMessage(m, address);
    });
}
QueueManager& queue_manager_;
public:
// Stores a reference to the QueueManager used to resolve queues for the links
// on this connection; `qm` is held by reference and so must outlive this handler.
connection_handler(QueueManager& qm) :
queue_manager_(qm)
{}
}
Sender* s = new Sender(sender);
queue_manager_.add([=]{queue_manager_.findQueueSender(s, qn);});
}
Receiver* r = new Receiver(receiver, queue_manager_);
if (qname.empty()) {
} else {
queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
}
}
for (proton::sender_iterator i = session.
senders().begin(); i != session.
senders().end(); ++i) {
Sender* s = Sender::get(*i);
if (s && s->queue_) {
auto q = s->queue_;
s->queue_->add([=]{q->unsubscribe(s);});
}
}
}
std::cout <<
"protocol error: " << e.
what() << std::endl;
}
Sender* s = Sender::get(*i);
if (s && s->queue_) {
auto q = s->queue_;
s->queue_->add([=]{q->unsubscribe(s);});
}
}
delete this;
}
};
class broker {
public:
// Builds the broker and starts listening on `addr`.
//
// The member initialisers depend on one another: queues_ is constructed from
// container_, and listener_ from queues_.
// `addr` is taken by const reference to avoid copying the string; temporaries
// and string literals still bind to it, so existing call sites are unaffected.
broker(const std::string& addr) :
    container_("broker"), queues_(container_), listener_(queues_)
{
    container_.listen(addr, listener_);
}
// Runs the container, passing it a thread count derived from the hardware
// concurrency reported by the standard library.
void run() {
    // hardware_concurrency() is allowed to return 0 when the value cannot be
    // determined; fall back to a single thread instead of passing 0 through.
    const unsigned detected = std::thread::hardware_concurrency();
    container_.run(detected > 0 ? detected : 1u);
}
private:
listener(QueueManager& c) : queues_(c) {}
}
std::cout <<
"broker listening on " << l.
port() << std::endl;
}
std::cerr << "listen error: " << s << std::endl;
throw std::runtime_error(s);
}
QueueManager& queues_;
};
QueueManager queues_;
listener listener_;
};
int main(int argc, char **argv) {
std::string address("0.0.0.0");
example::options opts(argc, argv);
opts.add_flag(verbose, 'v', "verbose", "verbose (debugging) output");
opts.add_value(address, 'a', "address", "listen on URL", "URL");
try {
verbose = false;
opts.parse();
broker(address).run();
return 0;
} catch (const example::bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cout << "broker shutdown: " << e.what() << std::endl;
}
return 1;
}
Options for creating a connection.
Definition connection_options.hpp:67
connection_options & handler(class messaging_handler &)
Set a connection handler.
connection_options & offered_capabilities(const std::vector< symbol > &)
Unsettled API - Extension capabilities offered to the remote peer.
A connection to a remote AMQP peer.
Definition connection.hpp:47
sender_range senders() const
Return all senders on this connection.
void open()
Open the connection.
A top-level container of connections, sessions, and links.
Definition container.hpp:50
void run()
Run the container in the current thread.
listener listen(const std::string &listen_url, listen_handler &handler)
Listen for new connections on listen_url.
A received message.
Definition delivery.hpp:40
void reject()
Settle with REJECTED state.
Describes an endpoint error state.
Definition error_condition.hpp:39
std::string what() const
Simple printable string for condition.
void close()
Close the endpoint.
void user_data(void *user_data) const
Set user data on this link.
int credit() const
Credit available on the link.
Unsettled API - A handler for incoming connections.
Definition listen_handler.hpp:39
A listener for incoming connections.
Definition listener.hpp:33
int port()
Unsettled API
An AMQP message.
Definition message.hpp:48
void to(const std::string &)
Set the destination address.
Handler for Proton messaging events.
Definition messaging_handler.hpp:69
virtual void on_error(const error_condition &)
Fallback error handling.
virtual void on_connection_open(connection &)
The remote peer opened the connection: called once on initial open, and again on each successful auto...
virtual void on_message(delivery &, message &)
A message is received.
virtual void on_receiver_open(receiver &)
The remote peer opened the link.
virtual void on_sendable(sender &)
A message can be sent.
virtual void on_transport_close(transport &)
The final event for a connection: there will be no more reconnect attempts and no more event function...
virtual void on_session_close(session &)
The remote peer closed the session.
virtual void on_sender_close(sender &)
The remote peer closed the link.
virtual void on_sender_open(sender &)
The remote peer opened the link.
Options for creating a receiver.
Definition receiver_options.hpp:59
receiver_options & handler(class messaging_handler &)
Set a messaging_handler for receiver events only.
A channel for receiving messages.
Definition receiver.hpp:41
class target target() const
Get the target node.
void open()
Open the receiver.
Options for creating a sender.
Definition sender_options.hpp:60
A channel for sending messages.
Definition sender.hpp:40
tracker send(const message &m)
Send a message on the sender.
class source source() const
Get the source node.
void open()
Open the sender.
A container of senders and receivers.
Definition session.hpp:42
sender_range senders() const
Return the senders on this session.
Options for creating a source node for a sender or receiver.
Definition source_options.hpp:46
std::string address() const
The address of the source.
std::string address() const
The address of the target.
bool dynamic() const
True if the remote node is created dynamically.
A network channel supporting an AMQP connection.
Definition transport.hpp:37
class connection connection() const
Get the connection associated with this transport.
Unsettled API - A context for thread-safe execution of work.
Definition work_queue.hpp:327
bool add(work fn)
Unsettled API - Add work fn to the work queue.
A connection to a remote AMQP peer.
Options for creating a connection.
A top-level container of connections, sessions, and links.
Describes an endpoint error state.
Unsettled API - A handler for incoming connections.
A listener for incoming connections.
Handler for Proton messaging events.
T get(const scalar &s)
Get a contained value of type T.
Definition scalar.hpp:60
Options for creating a receiver.
Options for creating a sender.
Options for creating a source node for a sender or receiver.
A destination for messages.
Options for creating a target node for a sender or receiver.
A tracker for a sent message.
A network channel supporting an AMQP connection.
Unsettled API - A context for thread-safe execution of work.