Menu Search

spout.cpp

#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Message_io.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>

#include <iostream>
#include <sstream>
#include <vector>
#include <ctime>

#include "OptionParser.h"

using namespace qpid::messaging;
using namespace qpid::types;

typedef std::vector<std::string> string_vector;

struct Options : OptionParser
{
    std::string url;
    std::string address;
    int timeout;
    bool durable;
    int count;
    std::string id;
    std::string replyto;
    string_vector properties;
    string_vector entries;
    std::string content;
    std::string connectionOptions;
    bool print;

    Options()
        : OptionParser("Usage: spout [OPTIONS] ADDRESS", "Send messages to the specified address"),
          url("127.0.0.1"),
          timeout(0),
          count(1),
          durable(false),
          print(false)
    {
        add("broker,b", url, "url of broker to connect to");
        add("timeout,t", timeout, "exit after the specified time");
        add("durable,d", durable, "make the message durable (def. transient)");
        add("count,c", count, "stop after count messages have been sent, zero disables");
        add("id,i", id, "use the supplied id instead of generating one");
        add("reply-to", replyto, "specify reply-to address");
        add("property,P", properties, "specify message property");
        add("map,M", entries, "specify entry for map content");
        add("content", content, "specify textual content");
        add("connection-options", connectionOptions, "connection options string in the form {name1:value1, name2:value2}");
        add("print", print, "print each message sent");
    }

    static bool nameval(const std::string& in, std::string& name, std::string& value)
    {
        std::string::size_type i = in.find("=");
        if (i == std::string::npos) {
            name = in;
            return false;
        } else {
            name = in.substr(0, i);
            if (i+1 < in.size()) {
                value = in.substr(i+1);
                return true;
            } else {
                return false;
            }
        }
    }

    static void setProperty(Message& message, const std::string& property)
    {
        std::string name;
        std::string value;
        if (nameval(property, name, value)) {
            message.getProperties()[name] = value;
            message.getProperties()[name].setEncoding("utf8");
        } else {
            message.getProperties()[name] = Variant();
        }
    }

    void setProperties(Message& message) const
    {
        for (string_vector::const_iterator i = properties.begin(); i != properties.end(); ++i) {
            setProperty(message, *i);
        }
    }

    void setEntries(Variant::Map& content) const
    {
        for (string_vector::const_iterator i = entries.begin(); i != entries.end(); ++i) {
            std::string name;
            std::string value;
            if (nameval(*i, name, value)) {
                content[name] = value;
            } else {
                content[name] = Variant();
            }
        }
    }

    bool checkAddress()
    {
        if (getArguments().empty()) {
            error("Address is required");
            return false;
        } else {
            address = getArguments()[0];
            return true;
        }
    }

    bool isDurable() const
    {
      return durable;
    }
};

int main(int argc, char** argv)
{
    Options options;
    if (options.parse(argc, argv) && options.checkAddress()) {
        Connection connection(options.url, options.connectionOptions);
        try {
            connection.open();
            Session session = connection.createSession();
            Sender sender = session.createSender(options.address);

            Message message;
            message.setDurable(options.isDurable());
            options.setProperties(message);
            Variant& obj = message.getContentObject();
            if (options.entries.size()) {
                Variant::Map content;
                options.setEntries(content);
                obj = content;
            } else if (options.content.size()) {
                obj = options.content;
                obj.setEncoding("utf8");
            }
            std::time_t start = std::time(0);
            for (int count = 0; 
                (count < options.count || options.count == 0) && 
                (options.timeout == 0 || std::difftime(std::time(0), start) < options.timeout); 
                count++) {
                if (!options.replyto.empty()) message.setReplyTo(Address(options.replyto));
                std::string id = options.id.empty() ? Uuid(true).str() : options.id;
                std::stringstream spoutid;
                spoutid << id << ":" << count;
                message.getProperties()["spout-id"] = spoutid.str();
                if (options.print) std::cout << message << std::endl;
                sender.send(message);
            }
            session.sync();
            connection.close();
            return 0;
        } catch(const std::exception& error) {
            std::cout << error.what() << std::endl;
            connection.close();
        }
    }
    return 1;
}

Download this file