Menu Search

drain.pl

use strict;
use warnings;

use qpid;
use Getopt::Long;
use Pod::Usage;

my $url               = "127.0.0.1";
my $timeout           = 0;
my $forever           = 0;
my $count             = 0;
my $connectionOptions = "";
my $address           = "amq.direct";
my $help;

my $result = GetOptions(
    "broker|b=s"           => \$url,
    "timeout|t=i"          => \$timeout,
    "forever|f"            => \$forever,
    "connection-options=s" => \$connectionOptions,
    "count|c=i"            => \$count,
    "help|h"               => \$help
) || pod2usage( -verbose => 0 );

pod2usage( -verbose => 1 ) if $help;

if ( $#ARGV ge 0 ) {
    $address = $ARGV[0];
}

sub getTimeout {

    # returns either the named duration FOREVER if the
    # forever cmdline argument was used, otherwise creates
    # a new Duration of the specified length
    return ($forever)
      ? qpid::messaging::Duration::FOREVER
      : new qpid::messaging::Duration( $timeout * 1000 );
}

sub printProperties {
    my $h = shift();
    return qq[{${\(join', ',map"'$_': '$h->{$_}'",keys%$h)}}];
}

# create a connection object
my $connection = new qpid::messaging::Connection( $url, $connectionOptions );

eval {
    # open the connection, then create a session and receiver
    $connection->open();
    my $session  = $connection->create_session();
    my $receiver = $session->create_receiver($address);
    my $timeout  = getTimeout();
    my $message  = new qpid::messaging::Message();
    my $i        = 0;

    for ( ; ; ) {
        eval { $message = $receiver->fetch($timeout); };

        if ($@) {
            last;
        }

        # check if the message was on that was redelivered
        my $redelivered =
          ( $message->get_redelivered ) ? "redelivered=True, " : "";
        print "Message("
          . $redelivered
          . "properties="
          . printProperties( $message->get_properties() )
          . ", content='";

        # if the message content was a map, then we will print
        # it out as a series of name => value pairs
        if ( $message->get_content_type() eq "amqp/map" ) {
            my $content = $message->get_content();
            map { print "\n$_ => $content->{$_}"; } keys %{$content};
        }
        else {
            # it's not a map, so just print the content as a string
            print $message->get_content();
        }
        print "')\n";

        # if the message had a reply-to address, then we'll send a
        # response back letting the send know the message was processed
        my $replyto = $message->get_reply_to();
        if ( $replyto->get_name() ) {
            print "Replying to " . $message->get_reply_to()->str() . "...\n";

            # create a temporary sender for the specified queue
            my $sender = $session->create_sender($replyto);
            my $response =
              new qpid::messaging::Message("received by the server.");
            $sender->send($response);
        }

        # acknowledge all messages received on this queue so far
        $session->acknowledge();

        if ( $count and ( ++$i == $count ) ) {
            last;
        }
    }

    # close everything to clean up
    $receiver->close();
    $session->close();
    $connection->close();
};

if ($@) {
    $connection->close();
    die $@;
}

__END__

=head1 NAME

drain - Drains messages from the specified address

=head1 SYNOPSIS

  Options:
  -h, --help                    show this message
  -b VALUE, --broker VALUE      url of broker to connect to
  -t VALUE, --timeout VALUE     timeout in seconds to wait before exiting
  -f, --forever                 ignore timeout and wait forever
  --connection-options VALUE    connection options string in the form {name1:value1, name2:value2}
  -c VALUE, --count VALUE       number of messages to read before exiting

=cut

Download this file