#include "options.hpp"
#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/link.hpp>
#include <proton/message.hpp>
#include <proton/message_id.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/reconnect_options.hpp>
#include <proton/value.hpp>
#include <proton/types.hpp>
#include <iostream>
#include <map>
#include <string>
class reconnect_client : public proton::messaging_handler {
std::string url;
std::string address;
std::vector<std::string> failovers;
proton::sender sender;
int sent;
int expected;
int received;
public:
reconnect_client(const std::string &u, const std::string& a, int c, const std::vector<std::string>& f) :
url(u), address(a), failovers(f), sent(0), expected(c), received(0) {}
private:
void on_container_start(proton::container &c) override {
proton::connection_options co;
co.failover_urls(failovers);
c.connect(url, co);
}
void on_connection_open(proton::connection & c) override {
c.open_receiver(address);
c.open_sender(address);
// reconnect we probably lost the last message sent
sent = received;
std::cout << "simple_recv listening on " << url << std::endl;
}
void on_message(proton::delivery &d, proton::message &msg) override {
if (proton::coerce<int>(msg.id()) < received) {
return; // Ignore duplicate
}
if (expected == 0 || received < expected) {
std::cout << msg.body() << std::endl;
received++;
if (received == expected) {
d.receiver().close();
sender.close();
d.connection().close();
} else {
// See if we can send any messages now
send(sender);
}
}
}
void send(proton::sender& s) {
// Only send with credit and only allow one outstanding message
while (s.credit() && sent < received+1) {
std::map<std::string, int> m;
m["sequence"] = sent + 1;
proton::message msg;
msg.id(sent + 1);
msg.body(m);
std::cout << "Sending: " << sent+1 << std::endl;
s.send(msg);
sent++;
}
}
void on_sender_open(proton::sender & s) override {
sender = s;
}
void on_sendable(proton::sender &s) override {
send(s);
}
};
int main(int argc, const char** argv) {
try {
if (argc < 4) {
std ::cerr <<
"Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT FAILOVER-URL...\n"
"CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
"AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
"MESSAGE-COUNT: number of messages to receive\n"
"FAILOVER_URL...: zero or more failover urls\n";
return 1;
}
const char *url = argv[1];
const char *address = argv[2];
int message_count = atoi(argv[3]);
std::vector<std::string> failovers(&argv[4], &argv[argc]);
reconnect_client client(url, address, message_count, failovers);
proton::container(client).run();
return 0;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}
Apache Qpid, Messaging built on AMQP; Copyright © 2015 The Apache Software Foundation; Licensed under the Apache License, Version 2.0; Apache Qpid, Qpid, Qpid Proton, Proton, Apache, the Apache feather logo, and the Apache Qpid project logo are trademarks of The Apache Software Foundation; All other marks mentioned may be trademarks or registered trademarks of their respective owners