A working example for accessing Service Bus session-enabled queues.Also provides some general notes on Service Bus usage.
#include "options.hpp"
#include <iostream>
#include <sstream>
#include "fake_cpp11.hpp"
void do_next_sequence();
namespace {
void check_arg(const std::string &value, const std::string &name) {
if (value.empty())
throw std::runtime_error("missing argument for \"" + name + "\"");
}
}
private:
const std::string &connection_url;
const std::string &entity;
connection_options coptions;
int message_count;
bool closed;
public:
session_receiver(const std::string &c, const std::string &e, const connection_options &co,
const char *sid) :
connection_url(c), entity(e), coptions(co),
message_count(0), closed(false), read_timeout(5000), last_read(0), container(0) {
if (sid)
session_identifier = std::string(sid);
}
message_count = 0;
closed = false;
c.
connect(connection_url, coptions.handler(*
this));
container = &c;
}
sb_filter_map.
put(key, session_identifier);
receiver = connection.open_receiver(entity, receiver_options().source(source_options().filters(sb_filter_map)));
connection.work_queue().schedule(read_timeout, [this]() { this->process_timeout(); });
}
if (closed) return;
proton::value actual_session_id = r.source().filters().
get(
"com.microsoft:session-filter");
std::cout << "receiving messages with session identifier \"" << actual_session_id
<< "\" from queue " << entity << std::endl;
}
message_count++;
std::cout << " received message: " << m.body() << std::endl;
}
void process_timeout() {
if (now >= deadline) {
closed = true;
if (message_count)
do_next_sequence();
else
std::cout << "Done. No more messages." << std::endl;
} else {
}
}
};
private:
const std::string &connection_url;
const std::string &entity;
connection_options coptions;
int msg_count;
int total;
int accepts;
public:
session_sender(const std::string &c, const std::string &e, const connection_options &co) :
connection_url(c), entity(e), coptions(co),
msg_count(0), total(7), accepts(0) {}
c.
open_sender(connection_url +
"/" + entity, sender_options(), coptions.handler(*
this));
}
std::string gid;
for (; msg_count < total && s.
credit() > 0; msg_count++) {
switch (msg_count) {
case 0: gid = "red"; break;
case 1: gid = "green"; break;
case 2: gid = "blue"; break;
case 3: gid = "red"; break;
case 4: gid = "black"; break;
case 5: gid = "blue"; break;
case 6: gid = "yellow"; break;
}
std::ostringstream mbody;
mbody << "message " << msg_count << " in service bus session \"" << gid << "\"";
m.group_id(gid);
std::cout << " sent message: " << m.body() << std::endl;
}
}
send_remaining_messages(s);
}
accepts++;
if (accepts == total) {
t.sender().close();
t.sender().connection().close();
do_next_sequence();
}
}
};
private:
int sequence_no;
session_sender snd;
session_receiver rcv_red, rcv_green, rcv_null;
public:
static sequence *the_sequence;
sequence (const std::string &c, const std::string &e, const connection_options &co) :
container(0), sequence_no(0),
snd(c, e, co), rcv_red(c, e, co, "red"), rcv_green(c, e, co, "green"), rcv_null(c, e, co, NULL) {
the_sequence = this;
}
container = &c;
next_sequence();
}
void next_sequence() {
switch (sequence_no++) {
case 0: snd.run(*container); break;
case 1: rcv_green.run(*container); break;
case 2: rcv_red.run(*container); break;
default: rcv_null.run(*container); break;
}
}
};
sequence *sequence::the_sequence = NULL;
void do_next_sequence() { sequence::the_sequence->next_sequence(); }
int main(int argc, char **argv) {
std::string sb_namespace;
std::string sb_key_name;
std::string sb_key;
std::string sb_entity;
example::options opts(argc, argv);
opts.add_value(sb_namespace, 'n', "namespace", "Service Bus full namespace", "NAMESPACE");
opts.add_value(sb_key_name, 'p', "policy", "policy name that specifies access rights (key name)", "POLICY");
opts.add_value(sb_key, 'k', "key", "secret key for the policy", "key");
opts.add_value(sb_entity, 'e', "entity", "entity path (queue name)", "ENTITY");
try {
opts.parse();
check_arg(sb_namespace, "namespace");
check_arg(sb_key_name, "policy");
check_arg(sb_key, "key");
check_arg(sb_entity, "entity");
std::string connection_string("amqps://" + sb_namespace);
sequence seq(connection_string, sb_entity,
connection_options()
.user(sb_key_name)
.password(sb_key)
.sasl_allowed_mechs("PLAIN"));
return 0;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}