A working example for accessing Service Bus session-enabled queues.Also provides some general notes on Service Bus usage.
 
 
 
 
#include "options.hpp"
 
 
#include <iostream>
#include <sstream>
 
 
 
void do_next_sequence();
 
namespace {
void check_arg(const std::string &value, const std::string &name) {
    if (value.empty())
        throw std::runtime_error("missing argument for \"" + name + "\"");
}
}
 
  private:
    const std::string &connection_url;
    const std::string &entity;
    connection_options coptions;
    int message_count;
    bool closed;
 
  public:
    session_receiver(const std::string &c, const std::string &e, const connection_options &co,
                     const char *sid) :
        connection_url(c), entity(e), coptions(co),
        message_count(0), closed(false), read_timeout(5000), last_read(0), container(0) {
        if (sid)
            session_identifier = std::string(sid);
        
        
        
        
        
 
    }
 
        message_count = 0;
        closed = false;
        c.
connect(connection_url, coptions.handler(*
this));
        container = &c;
    }
 
        sb_filter_map.
put(key, session_identifier);
        receiver = connection.
open_receiver(entity, receiver_options().source(source_options().filters(sb_filter_map)));
 
        
        
        
        
        
    }
 
        if (closed) return; 
        std::cout << "receiving messages with session identifier \"" << actual_session_id
                  << "\" from queue " << entity << std::endl;
    }
 
        message_count++;
        std::cout << 
"   received message: " << m.
body() << std::endl;
    }
 
    void process_timeout() {
        if (now >= deadline) {
            closed = true;
            if (message_count)
                do_next_sequence();
            else
                std::cout << "Done. No more messages." << std::endl;
        } else {
        }
    }
};
 
 
  private:
    const std::string &connection_url;
    const std::string &entity;
    connection_options coptions;
    int msg_count;
    int total;
    int accepts;
 
  public:
    session_sender(const std::string &c, const std::string &e, const connection_options &co) :
        connection_url(c), entity(e), coptions(co),
        msg_count(0), total(7), accepts(0) {}
 
        c.
open_sender(connection_url + 
"/" + entity, sender_options(), coptions.handler(*
this));
    }
 
        std::string gid;
        for (; msg_count < total && s.
credit() > 0; msg_count++) {
 
            switch (msg_count) {
            case 0: gid = "red"; break;
            case 1: gid = "green"; break;
            case 2: gid = "blue"; break;
            case 3: gid = "red"; break;
            case 4: gid = "black"; break;
            case 5: gid = "blue"; break;
            case 6: gid = "yellow"; break;
            }
 
            std::ostringstream mbody;
            mbody << "message " << msg_count << " in service bus session \"" << gid << "\"";
            std::cout << 
"   sent message: " << m.
body() << std::endl;
        }
    }
 
        send_remaining_messages(s);
    }
 
        accepts++;
        if (accepts == total) {
            
            do_next_sequence();
        }
    }
};
 
 
  private:
    int sequence_no;
    session_sender snd;
    session_receiver rcv_red, rcv_green, rcv_null;
 
  public:
    static sequence *the_sequence;
 
    sequence (const std::string &c, const std::string &e, const connection_options &co) :
        container(0), sequence_no(0),
        snd(c, e, co), rcv_red(c, e, co, "red"), rcv_green(c, e, co, "green"), rcv_null(c, e, co, NULL) {
        the_sequence = this;
    }
 
        container = &c;
        next_sequence();
    }
 
    void next_sequence() {
        switch (sequence_no++) {
        
        case 0: snd.run(*container); break;
        case 1: rcv_green.run(*container); break;
        case 2: rcv_red.run(*container); break;
        
        default: rcv_null.run(*container); break;
        }
    }
};
 
sequence *sequence::the_sequence = NULL;
 
void do_next_sequence() { sequence::the_sequence->next_sequence(); }
 
 
int main(int argc, char **argv) {
    std::string sb_namespace; 
    std::string sb_key_name;  
    std::string sb_key;       
    std::string sb_entity;    
                              
 
    example::options opts(argc, argv);
    opts.add_value(sb_namespace, 'n', "namespace", "Service Bus full namespace", "NAMESPACE");
    opts.add_value(sb_key_name, 'p', "policy", "policy name that specifies access rights (key name)", "POLICY");
    opts.add_value(sb_key, 'k', "key", "secret key for the policy", "key");
    opts.add_value(sb_entity, 'e', "entity", "entity path (queue name)", "ENTITY");
 
    try {
        opts.parse();
        check_arg(sb_namespace, "namespace");
        check_arg(sb_key_name, "policy");
        check_arg(sb_key, "key");
        check_arg(sb_entity, "entity");
        std::string connection_string("amqps://" + sb_namespace);
 
        sequence seq(connection_string, sb_entity,
                     connection_options()
                     .user(sb_key_name)
                     .password(sb_key)
                     .sasl_allowed_mechs("PLAIN"));
        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
 
    return 1;
}
Options for creating a connection.
Definition: connection_options.hpp:67
A connection to a remote AMQP peer.
Definition: connection.hpp:47
void close()
Close the connection.
class work_queue & work_queue() const
Get the work_queue for the connection.
receiver open_receiver(const std::string &addr)
Open a receiver for addr on default_session().
A top-level container of connections, sessions, and links.
Definition: container.hpp:49
void run()
Run the container in the current thread.
returned< connection > connect(const std::string &conn_url, const connection_options &conn_opts)
Connect to conn_url and send an open request to the remote peer.
returned< sender > open_sender(const std::string &addr_url)
Open a connection and sender for addr_url.
A received message.
Definition: delivery.hpp:40
A span of time in milliseconds.
Definition: duration.hpp:39
void close()
Close the endpoint.
class work_queue & work_queue() const
Get the work_queue for the link.
int credit() const
Credit available on the link.
class connection connection() const
The connection that owns this link.
T get(const K &k) const
Get the map entry for key k.
void put(const K &k, const T &v)
Put a map entry for key k.
An AMQP message.
Definition: message.hpp:50
void group_id(const std::string &)
Set the message group ID.
void body(const value &x)
Set the body. Equivalent to body() = x.
Handler for Proton messaging events.
Definition: messaging_handler.hpp:69
virtual void on_connection_open(connection &)
The remote peer opened the connection: called once on initial open, and again on each successful auto...
virtual void on_tracker_accept(tracker &)
The receiving peer accepted a transfer.
virtual void on_message(delivery &, message &)
A message is received.
virtual void on_receiver_open(receiver &)
The remote peer opened the link.
virtual void on_sendable(sender &)
A message can be sent.
Options for creating a receiver.
Definition: receiver_options.hpp:59
A channel for receiving messages.
Definition: receiver.hpp:41
class source source() const
Get the source node.
Options for creating a sender.
Definition: sender_options.hpp:60
A channel for sending messages.
Definition: sender.hpp:40
tracker send(const message &m)
Send a message on the sender.
Options for creating a source node for a sender or receiver.
Definition: source_options.hpp:44
const filter_map & filters() const
Unsettled API - Obtain the set of message filters.
Unsettled API - SSL configuration for outbound connections.
Definition: ssl.hpp:153
A string that represents the AMQP symbol type.
Definition: symbol.hpp:35
A 64-bit timestamp in milliseconds since the Unix epoch.
Definition: timestamp.hpp:35
static timestamp now()
The current wall-clock time.
A tracker for a sent message.
Definition: tracker.hpp:41
class sender sender() const
Get the sender for this tracker.
A holder for any AMQP value, simple or complex.
Definition: value.hpp:57
void schedule(duration, work fn)
Unsettled API - Add work fn to the work queue after a duration.
A connection to a remote AMQP peer.
Options for creating a connection.
A top-level container of connections, sessions, and links.
Handler for Proton messaging events.
Options for creating a receiver.
A channel for sending messages.
Options for creating a sender.
Options for creating a source node for a sender or receiver.
A tracker for a sent message.
Unsettled API - A context for thread-safe execution of work.