Menu Search

flow_control.cpp

#include "options.hpp"

#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/listen_handler.hpp>
#include <proton/listener.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver_options.hpp>
#include <proton/sender.hpp>
#include <proton/tracker.hpp>

#include <iostream>
#include <sstream>


namespace {

bool verbose = true;

void verify(bool success, const std::string &msg) {
    if (!success)
        throw std::runtime_error("example failure:" + msg);
    else {
        std::cout << "success: " << msg << std::endl;
        if (verbose) std::cout << std::endl;
    }
}

}

// flow_sender manages the incoming connection and acts as the message sender.
class flow_sender : public proton::messaging_handler {
  private:
    int available;  // Number of messages the sender may send assuming sufficient credit.
    int sequence;

  public:
    flow_sender() : available(0), sequence(0) {}

    void send_available_messages(proton::sender &s) {
        while(available && s.credit() > 0) {
            std::ostringstream mbody;
            mbody << "flow_sender message " << sequence++;
            proton::message m(mbody.str());
            s.send(m);
            available--;
        }
    }

    void on_sendable(proton::sender &s) override {
        if (verbose)
            std::cout << "flow_sender in \"on_sendable\" with credit " << s.credit()
                      << " and " << available << " available messages" << std::endl;
        send_available_messages(s);
    }

    void on_sender_drain_start(proton::sender &s) override {
        if (verbose)
            std::cout << "flow_sender in \"on_drain_start\" with credit " << s.credit()
                      << " and " << available << " available messages" << std::endl;
        send_available_messages(s);
        if (s.credit()) {
            s.return_credit(); // return the rest
        }
    }

    void set_available(int n) { available = n; }
};

class flow_receiver : public proton::messaging_handler {
  public:
    int stage;
    int received;
    flow_sender &sender;

    flow_receiver(flow_sender &s) : stage(0), received(0), sender(s) {}

    void example_setup(int n) {
        received = 0;
        sender.set_available(n);
    }

    void run_stage(proton::receiver &r, const std::string &caller) {
        // Serialize the progression of the flow control examples.
        switch (stage) {
        case 0:
            if (verbose) std::cout << "Example 1.  Simple use of credit." << std::endl;
            // TODO: add timeout callbacks, show no messages until credit.
            example_setup(2);
            r.add_credit(2);
            break;
        case 1:
            if (r.credit() > 0) return;
            verify(received == 2, "Example 1: simple credit");

            if (verbose) std::cout << "Example 2.   Use basic drain, sender has 3 \"immediate\" messages." << std::endl;
            example_setup(3);
            r.add_credit(5); // ask for up to 5
            r.drain();       // but only use what's available
            break;
        case 2:
            if (caller == "on_message") return;
            if (caller == "on_receiver_drain_finish") {
                // Note that unused credit of 2 at sender is returned and is now 0.
                verify(received == 3 && r.credit() == 0, "Example 2: basic drain");

                if (verbose) std::cout << "Example 3. Drain use with no credit." << std::endl;
                example_setup(0);
                r.drain();
                break;
            }
            verify(false, "example 2 run_stage");
            return;

        case 3:
            verify(caller == "on_receiver_drain_finish" && received == 0, "Example 3: drain without credit");

            if (verbose) std::cout << "Example 4. Show using high(10)/low(3) watermark for 25 messages." << std::endl;
            example_setup(25);
            r.add_credit(10);
            break;

        case 4:
            if (received < 25) {
                // Top up credit as needed.
                uint32_t credit = r.credit();
                if (credit <= 3) {
                    uint32_t new_credit = 10;
                    uint32_t remaining = 25 - received;
                    if (new_credit > remaining)
                        new_credit = remaining;
                    if (new_credit > credit) {
                        r.add_credit(new_credit - credit);
                        if (verbose)
                            std::cout << "flow_receiver adding credit for " << new_credit - credit
                                      << " messages" << std::endl;
                    }
                }
                return;
            }

            verify(received == 25 && r.credit() == 0, "Example 4: high/low watermark");
            r.connection().close();
            break;

        default:
            throw std::runtime_error("run_stage sequencing error");
        }
        stage++;
    }

    void on_receiver_open(proton::receiver &r) override {
        run_stage(r, "on_receiver_open");
    }

    void on_message(proton::delivery &d, proton::message &m) override {
        if (verbose)
            std::cout << "flow_receiver in \"on_message\" with " << m.body() << std::endl;
        proton::receiver r(d.receiver());
        received++;
        run_stage(r, "on_message");
    }

    void on_receiver_drain_finish(proton::receiver &r) override {
        if (verbose)
            std::cout << "flow_receiver in \"on_receiver_drain_finish\"" << std::endl;
        run_stage(r, "on_receiver_drain_finish");
    }
};

class flow_listener : public proton::listen_handler {
    proton::connection_options opts;
  public:
    flow_listener(flow_sender& sh) {
        opts.handler(sh);
    }

    void on_open(proton::listener& l) override {
        std::ostringstream url;
        url << "//:" << l.port() << "/example"; // Connect to the actual listening port
        l.container().connect(url.str());
    }

    proton::connection_options on_accept(proton::listener&) override { return opts; }
};

class flow_control : public proton::messaging_handler {
  private:
    proton::listener listener;
    flow_sender send_handler;
    flow_receiver receive_handler;
    flow_listener listen_handler;

  public:
    flow_control() : receive_handler(send_handler), listen_handler(send_handler) {}

    void on_container_start(proton::container &c) override {
        // Listen on a dynamic port on the local host.
        listener = c.listen("//:0", listen_handler);
    }

    void on_connection_open(proton::connection &c) override {
        if (c.active()) {
            // outbound connection
            c.open_receiver("flow_example", proton::receiver_options().handler(receive_handler).credit_window(0));
        }
    }

    void on_connection_close(proton::connection &) override {
        listener.stop();
    }
};

int main(int argc, char **argv) {
    // Pick an "unusual" port since we are going to be talking to
    // ourselves, not a broker.
    bool quiet = false;

    example::options opts(argc, argv);
    opts.add_flag(quiet, 'q', "quiet", "suppress additional commentary of credit allocation and consumption");

    try {
        opts.parse();
        if (quiet)
            verbose = false;

        flow_control fc;
        proton::container(fc).run();

        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }

    return 1;
}

Download this file