Package proton :: Class Messenger

Class Messenger

The Messenger class defines a high-level interface for sending and receiving Messages. Every Messenger contains a single logical queue of incoming messages and a single logical queue of outgoing messages. The messages in these queues may be destined for, or originate from, a variety of addresses.

The messenger interface is single-threaded. All methods except one (interrupt) are intended to be used from within the messenger thread.

Address Syntax

An address has the following form:

 [ amqp[s]:// ] [user[:password]@] domain [/[name]]

Where domain can be one of:

 host | host:port | ip | ip:port | name

The following are valid examples of addresses:
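
As a hedged illustration of the grammar above, the sketch below splits an address into its parts with a regular expression. The parse_address helper and the sample addresses (example.org, fred, trustno1) are hypothetical and are not part of proton; Messenger's own parser may treat edge cases differently.

```python
import re

# Hypothetical helper, not part of proton. It models the documented form
#   [ amqp[s]:// ] [user[:password]@] domain [/[name]]
_ADDRESS = re.compile(
    r"(?:(?P<scheme>amqps?)://)?"                          # optional scheme
    r"(?:(?P<user>[^:@/]+)(?::(?P<password>[^@/]*))?@)?"   # optional credentials
    r"(?P<domain>[^/]+)"                                   # host, host:port, ip, ip:port or name
    r"(?:/(?P<name>.*))?"                                  # optional /name
    r"\Z"
)


def parse_address(address):
    """Return a dict with scheme, user, password, domain and name (None if absent)."""
    match = _ADDRESS.match(address)
    if match is None:
        raise ValueError("not a valid address: %r" % (address,))
    return match.groupdict()


# Illustrative addresses only; the host and credentials are made up.
for sample in ("example.org",
               "127.0.0.1:1234",
               "amqps://fred:trustno1@example.org/outgoing"):
    print("%s -> %r" % (sample, parse_address(sample)))
```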

Sending & Receiving Messages

The Messenger class works in conjunction with the Message class. The Message class is a mutable holder of message content.

The put method copies its Message to the outgoing queue, and may send queued messages if it can do so without blocking. The send method blocks until it has sent the requested number of messages, or until a timeout interrupts the attempt.

>>> from proton import Message, Messenger
>>> messenger = Messenger()
>>> messenger.start()
>>> message = Message()
>>> for i in range(3):
...   message.address = "amqp://host/queue"
...   message.subject = "Hello World %i" % i
...   messenger.put(message)
>>> messenger.send()

Similarly, the recv method receives messages into the incoming queue, and may block as it attempts to receive the requested number of messages, or until the timeout is reached. It may receive fewer than the requested number. The get method pops the oldest Message off the incoming queue and copies it into the Message object that you supply. It will not block.

>>> message = Message()
>>> messenger.recv(10)
>>> while messenger.incoming > 0:
...   messenger.get(message)
...   print(message.subject)
Hello World 0
Hello World 1
Hello World 2

The blocking flag allows you to turn off blocking behavior entirely, in which case send and recv will do whatever they can without blocking, and then return. You can then look at the number of incoming and outgoing messages to see how much outstanding work still remains.
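
A minimal sketch of that non-blocking pattern follows. It relies only on members described on this page (blocking, send, work, outgoing); the drain_outgoing helper, its max_rounds bound, and the poll_timeout parameter are hypothetical, and poll_timeout is passed to work() unchanged, so its unit is whatever the installed binding expects.

```python
def drain_outgoing(messenger, poll_timeout, max_rounds=100):
    """Send without blocking, then poll until the outgoing queue is empty.

    Hypothetical helper: `messenger` is any object exposing the documented
    blocking, send(), work() and outgoing members.
    Returns True if the queue drained within max_rounds polls.
    """
    messenger.blocking = False
    messenger.send()                      # does what it can, then returns
    rounds = 0
    while messenger.outgoing > 0 and rounds < max_rounds:
        messenger.work(poll_timeout)      # work() is not affected by the blocking flag
        rounds += 1
    return messenger.outgoing == 0
```

Bounding the loop keeps a caller from spinning forever when a peer is unreachable; the remaining depth is still available from messenger.outgoing.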

Instance Methods

__init__(self, name=None)
Construct a new Messenger with the given name.

__del__(self)
Destroy the Messenger.

name(self)
The name of the Messenger.

start(self)
Currently a no-op placeholder.

stop(self)
Transitions the Messenger to an inactive state.

stopped(self)
Returns True if and only if the Messenger is in the stopped state.

subscribe(self, source)
Subscribes the Messenger to messages originating from the specified source.

put(self, message)
Places the content contained in the message onto the outgoing queue of the Messenger.

status(self, tracker)
Gets the last known remote state of the delivery associated with the given tracker.

buffered(self, tracker)
Checks if the delivery associated with the given tracker is still waiting to be sent.

settle(self, tracker=None)
Frees a Messenger from tracking the status associated with a given tracker.

send(self, n=-1)
This call will block until the indicated number of messages have been sent, or until the operation times out.

recv(self, n=None)
Receives up to n messages into the incoming queue.

work(self, timeout=None)
Sends or receives any outstanding messages queued for a Messenger.

receiving(self)

interrupt(self)
Interrupts a blocking Messenger call; the only method intended to be called from outside the Messenger thread.

get(self, message=None)
Moves the message from the head of the incoming message queue into the supplied message object.

accept(self, tracker=None)
Signal the sender that you have acted on the Message pointed to by the tracker.

reject(self, tracker=None)
Rejects the Message indicated by the tracker.

outgoing(self)
The outgoing queue depth.

incoming(self)
The incoming queue depth.

route(self, pattern, address)
Adds a routing rule to a Messenger's internal routing table.

rewrite(self, pattern, address)
Similar to route(), except that the destination of the Message is determined before the message address is rewritten.

selectable(self)

deadline(self)

Class Variables
  certificate = property(_get_certificate, _set_certificate, doc...
  private_key = property(_get_private_key, _set_private_key, doc...
  password = property(_get_password, _set_password, doc= ...
  trusted_certificates = property(_get_trusted_certificates, _se...
  timeout = property(_get_timeout, _set_timeout, doc= ...
  blocking = property(_is_blocking, _set_blocking, doc= ...
  passive = property(_is_passive, _set_passive, doc= ...
  incoming_window = property(_get_incoming_window, _set_incoming...
  outgoing_window = property(_get_outgoing_window, _set_outgoing...
Method Details

__init__(self, name=None)
(Constructor)

Construct a new Messenger with the given name. The name has global scope. If None is supplied, a UUID-based name will be chosen.

Parameters:
  • name (string) - the name of the messenger or None
Overrides: object.__init__

__del__(self)
(Destructor)

Destroy the Messenger. This will close all connections that are managed by the Messenger. Call the stop method before destroying the Messenger.

name(self)

The name of the Messenger.

Decorators:
  • @property

start(self)

Currently a no-op placeholder. For future compatibility, do not send or receive messages before starting the Messenger.

stop(self)

Transitions the Messenger to an inactive state. An inactive Messenger will not send or receive messages from its internal queues. A Messenger should be stopped before being discarded to ensure a clean shutdown handshake occurs on any internally managed connections.

stopped(self)

Returns True if and only if the Messenger is in the stopped state. This function does not block.

Decorators:
  • @property

subscribe(self, source)

Subscribes the Messenger to messages originating from the specified source. The source is an address as specified in the Messenger introduction, with one addition: if the domain portion of the address begins with the '~' character, the Messenger will interpret the domain as a host and port, bind to it, and listen for incoming messages. For example, "~0.0.0.0", "amqp://~0.0.0.0", and "amqps://~0.0.0.0" all bind to any local interface and listen for incoming messages, with the last variant permitting only incoming SSL connections.

Parameters:
  • source (string) - the source of messages to subscribe to
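
To make the '~' convention concrete, the sketch below classifies a source string the way the description above reads. The describe_source helper is hypothetical and only models the documented rule; it is not how Messenger itself parses sources.

```python
def describe_source(source):
    """Classify a subscribe() source string (illustrative model only).

    Returns a dict with:
      listen - True if the domain starts with '~' (bind and listen)
      domain - the domain portion, without the leading '~'
      ssl    - True if the address uses the amqps scheme
    """
    ssl = source.startswith("amqps://")
    rest = source.split("://", 1)[-1]        # drop the optional scheme
    authority = rest.split("/", 1)[0]        # drop the optional /name
    domain = authority.rsplit("@", 1)[-1]    # drop optional credentials
    listen = domain.startswith("~")
    if listen:
        domain = domain[1:]
    return {"listen": listen, "domain": domain, "ssl": ssl}
```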

put(self, message)

Places the content contained in the message onto the outgoing queue of the Messenger. This method never blocks; however, it sends any unblocked Messages in the outgoing queue immediately and leaves any blocked Messages in the outgoing queue. The send call may be used to block until the outgoing queue is empty. The outgoing property may be used to check the depth of the outgoing queue.

Once the content of a given Message object has been copied to the outgoing message queue, you may modify or discard the Message object without affecting the content in the outgoing queue.

This method returns an outgoing tracker for the Message. The tracker can be used to determine the delivery status of the Message.

Parameters:
  • message (Message) - the message to place in the outgoing queue
Returns:
a tracker

status(self, tracker)

Gets the last known remote state of the delivery associated with the given tracker.

Parameters:
  • tracker (tracker) - the tracker whose status is to be retrieved
Returns:
one of None, PENDING, REJECTED, MODIFIED, or ACCEPTED

buffered(self, tracker)

Checks if the delivery associated with the given tracker is still waiting to be sent.

Parameters:
  • tracker (tracker) - the tracker whose status is to be retrieved
Returns:
true if delivery is still buffered

settle(self, tracker=None)

Frees a Messenger from tracking the status associated with a given tracker. If you don't supply a tracker, all outgoing messages up to the most recent will be settled.
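
The tracker-related methods (put, status, buffered, settle) fit together roughly as sketched below. The put_and_report helper is hypothetical, and widening outgoing_window to 1 rests on the assumption that a window of zero tracks no deliveries.

```python
def put_and_report(messenger, message):
    """Queue one message, send it, and return (status, still_buffered).

    Hypothetical helper: `messenger` is any object exposing the documented
    outgoing_window, put(), send(), status(), buffered() and settle().
    """
    if messenger.outgoing_window < 1:
        messenger.outgoing_window = 1     # assumption: a zero window tracks nothing
    tracker = messenger.put(message)      # the message content is copied here
    messenger.send()                      # blocks or not, per the blocking flag
    status = messenger.status(tracker)    # None, PENDING, REJECTED, MODIFIED or ACCEPTED
    buffered = messenger.buffered(tracker)
    messenger.settle(tracker)             # stop tracking this delivery
    return status, buffered
```

Reading the status before calling settle matters in this sketch, because settle frees the Messenger from tracking that delivery.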

send(self, n=-1)

This call will block until the indicated number of messages have been sent, or until the operation times out. If n is -1 this call will block until all outgoing messages have been sent. If n is 0 then this call will send whatever it can without blocking.

recv(self, n=None)

Receives up to n messages into the incoming queue. If no value for n is supplied, this call will receive as many messages as it can buffer internally. If the Messenger is in blocking mode, this call will block until at least one Message is available in the incoming queue.

work(self, timeout=None)

Sends or receives any outstanding messages queued for a Messenger. This will block for the indicated timeout. This method may also do I/O work other than sending and receiving messages, for example closing connections after messenger.stop() has been called.
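
Because work() also completes shutdown I/O, a non-blocking shutdown can be sketched as follows. The stop_and_wait helper, its max_rounds bound, and the poll_timeout parameter are hypothetical; only stop(), stopped and work() come from this page, and poll_timeout is passed to work() unchanged.

```python
def stop_and_wait(messenger, poll_timeout, max_rounds=100):
    """Request a stop, then drive I/O until the messenger reports stopped.

    Hypothetical helper: in blocking mode stop() is expected to finish the
    handshake itself, so the loop body never runs; in non-blocking mode the
    loop calls work() to let connections close.
    Returns the final value of messenger.stopped.
    """
    messenger.stop()
    rounds = 0
    while not messenger.stopped and rounds < max_rounds:
        messenger.work(poll_timeout)
        rounds += 1
    return messenger.stopped
```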

receiving(self)

Decorators:
  • @property

interrupt(self)

The Messenger interface is single-threaded. This is the only Messenger function intended to be called from outside of the Messenger thread. Call this from a non-messenger thread to interrupt a Messenger that is blocking. This causes any in-progress blocking call to raise the Interrupt exception. If no call is currently blocking, the next blocking call is affected, even if it is made from the same thread that called interrupt.
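
One way to call interrupt from a second thread is a standard-library timer, sketched below. The interrupt_after helper is hypothetical; only the interrupt method and the Interrupt exception are taken from the description above.

```python
import threading


def interrupt_after(messenger, delay):
    """Schedule messenger.interrupt() on another thread after `delay` seconds.

    Hypothetical helper. Returns the timer so the caller can cancel it if
    the blocking call finishes first.
    """
    timer = threading.Timer(delay, messenger.interrupt)
    timer.daemon = True          # do not keep the process alive for the timer
    timer.start()
    return timer


# Typical use from the messenger thread (not run here):
#
#   timer = interrupt_after(messenger, 5.0)
#   try:
#       messenger.recv(1)        # may block
#   except Interrupt:            # the exception named in the text above
#       pass
#   finally:
#       timer.cancel()
```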

get(self, message=None)

Moves the message from the head of the incoming message queue into the supplied message object. Any content in the message will be overwritten.

A tracker for the incoming Message is returned. The tracker can later be used to communicate your acceptance or rejection of the Message.

If None is passed in for the Message object, the Message popped from the head of the queue is discarded.

Parameters:
  • message (Message) - the destination message object
Returns:
a tracker

accept(self, tracker=None)

Signal the sender that you have acted on the Message pointed to by the tracker. If no tracker is supplied, then all messages that have been returned by the get method are accepted, except those that have already been auto-settled by passing beyond your incoming window size.

Parameters:
  • tracker (tracker) - a tracker as returned by get

reject(self, tracker=None)

Rejects the Message indicated by the tracker. If no tracker is supplied, all messages that have been returned by the get method are rejected, except those that have already been auto-settled by passing beyond your incoming window size.

Parameters:
  • tracker (tracker) - a tracker as returned by get
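
The get, accept and reject calls combine into a receive loop along the lines of the sketch below. The consume helper and its handler callback are hypothetical, and raising incoming_window to the batch size is an assumption made so that trackers are not auto-settled before they are accepted or rejected.

```python
def consume(messenger, message, handler, batch=10):
    """Receive up to `batch` messages and accept or reject each one.

    Hypothetical helper: `handler(message)` processes one message and raises
    an exception to signal that the message should be rejected.
    Returns a tuple (accepted_count, rejected_count).
    """
    if messenger.incoming_window < batch:
        messenger.incoming_window = batch    # assumption: keep trackers inside the window
    messenger.recv(batch)                    # may receive fewer than requested
    accepted = rejected = 0
    while messenger.incoming > 0:
        tracker = messenger.get(message)     # overwrites the content of `message`
        try:
            handler(message)
        except Exception:
            messenger.reject(tracker)
            rejected += 1
        else:
            messenger.accept(tracker)
            accepted += 1
    return accepted, rejected
```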

outgoing(self)

The outgoing queue depth.

Decorators:
  • @property

incoming(self)

The incoming queue depth.

Decorators:
  • @property

route(self, pattern, address)

Adds a routing rule to a Messenger's internal routing table.

The route procedure may be used to influence how a Messenger will internally treat a given address or class of addresses. Every call to the route procedure will result in Messenger appending a routing rule to its internal routing table.

Whenever a Message is presented to a Messenger for delivery, it will match the address of this message against the set of routing rules in order. The first rule to match will be triggered, and instead of routing based on the address presented in the message, the Messenger will route based on the address supplied in the rule.

The pattern matching syntax supports two types of matches: a '%' matches any character except a '/', and a '*' matches any character including a '/'.

A routing address is specified as a normal AMQP address, however it may additionally use substitution variables from the pattern match that triggered the rule.

Any message sent to "foo" will be routed to "amqp://foo.com":

>>> messenger.route("foo", "amqp://foo.com")

Any message sent to "foobar" will be routed to "amqp://foo.com/bar":

>>> messenger.route("foobar", "amqp://foo.com/bar")

Any message sent to bar/<path> will be routed to the corresponding path within the amqp://bar.com domain:

>>> messenger.route("bar/*", "amqp://bar.com/$1")

Route all messages over TLS:

>>> messenger.route("amqp:*", "amqps:$1")

Supply credentials for foo.com:

>>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1")

Supply credentials for all domains:

>>> messenger.route("amqp://*", "amqp://user:password@$1")

Route all addresses through a single proxy while preserving the original destination:

>>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2")

Route any address through a single broker:

>>> messenger.route("*", "amqp://user:password@broker/$1")
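
The matching and substitution rules described above can be modeled in a few lines of plain Python. This is a sketch for experimenting with patterns, not proton's implementation: the RoutingTable class and compile_pattern function are hypothetical, wildcards are assumed to match runs of zero or more characters, and an address that matches no rule is assumed to pass through unchanged.

```python
import re


def compile_pattern(pattern):
    """Translate a route pattern into a regular expression.

    '%' becomes a group matching any run of characters except '/',
    '*' becomes a group matching any run of characters including '/'.
    """
    parts = []
    for ch in pattern:
        if ch == "*":
            parts.append("(.*)")
        elif ch == "%":
            parts.append("([^/]*)")
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts) + r"\Z")


class RoutingTable(object):
    """Illustrative model of an ordered routing table (not proton code)."""

    def __init__(self):
        self._rules = []

    def route(self, pattern, address):
        # Rules are appended, so earlier rules take precedence.
        self._rules.append((compile_pattern(pattern), address))

    def resolve(self, address):
        for regex, template in self._rules:
            match = regex.match(address)
            if match:
                # Replace $1, $2, ... with the text captured by each wildcard.
                return re.sub(r"\$(\d+)",
                              lambda ref: match.group(int(ref.group(1))),
                              template)
        return address   # assumption: unmatched addresses are left as they are


table = RoutingTable()
table.route("bar/*", "amqp://bar.com/$1")
table.route("amqp://%/*", "amqp://user:password@proxy/$1/$2")
print(table.resolve("bar/a/b"))
print(table.resolve("amqp://host/a/b"))
```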

rewrite(self, pattern, address)

Similar to route(), except that the destination of the Message is determined before the message address is rewritten.

The outgoing address is only rewritten after routing has been finalized. If a message has an outgoing address of "amqp://0.0.0.0:5678", and a rewriting rule that changes its outgoing address to "foo", it will still arrive at the peer that is listening on "amqp://0.0.0.0:5678", but when it arrives there, the receiver will see its outgoing address as "foo".

The default rewrite rule removes username and password from addresses before they are transmitted.
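
The effect of that default rule can be illustrated with a small stand-alone function. The strip_credentials helper below is hypothetical and only mimics the described outcome; the actual rule is expressed in Messenger's own pattern syntax, which is not shown on this page.

```python
import re

# Hypothetical model: drop "user[:password]@" while keeping the optional scheme.
_CREDENTIALS = re.compile(r"\A((?:amqps?://)?)[^@/]+@")


def strip_credentials(address):
    """Return `address` without its username and password, if it has any."""
    return _CREDENTIALS.sub(r"\1", address, count=1)


print(strip_credentials("amqp://user:password@foo.com/bar"))
```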

deadline(self)

Decorators:
  • @property

Class Variable Details

certificate

Value:
property(_get_certificate, _set_certificate, doc= """
Path to a certificate file for the L{Messenger}. This certificate is
used when the L{Messenger} accepts or establishes SSL/TLS connections.
This property must be specified for the L{Messenger} to accept
incoming SSL/TLS connections and to establish client-authenticated
outgoing SSL/TLS connections. Outgoing SSL/TLS connections without
client authentication do not require this property.
""")

private_key

Value:
property(_get_private_key, _set_private_key, doc= """
Path to a private key file for the L{Messenger's<Messenger>}
certificate. This property must be specified for the L{Messenger} to
accept incoming SSL/TLS connections and to establish client-authenticated
outgoing SSL/TLS connections. SSL/TLS connections without client
authentication do not require this property.
""")

password

Value:
property(_get_password, _set_password, doc= """
This property contains the password for the L{Messenger.private_key}
file, or None if the file is not encrypted.
""")

trusted_certificates

Value:
property(_get_trusted_certificates, _set_trusted_certificates, doc= """
A path to a database of trusted certificates for use in verifying the
peer on an SSL/TLS connection. If this property is None, then the peer
will not be verified.
""")
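
Taken together, the four TLS-related properties (certificate, private_key, password, trusted_certificates) might be applied as in the sketch below. The configure_tls helper is hypothetical, and its check that certificate and private_key are supplied together is an assumption drawn from the descriptions above, not a documented rule.

```python
def configure_tls(messenger, certificate=None, private_key=None,
                  password=None, trusted_certificates=None):
    """Apply TLS settings to a messenger-like object (illustrative only).

    Paths are set only when given, so existing values are left untouched.
    """
    if (certificate is None) != (private_key is None):
        # Assumption: a certificate is only usable with its private key.
        raise ValueError("certificate and private_key must be given together")
    if certificate is not None:
        messenger.certificate = certificate
        messenger.private_key = private_key
        if password is not None:
            messenger.password = password    # only needed for an encrypted key file
    if trusted_certificates is not None:
        # Leaving this unset means the peer is not verified.
        messenger.trusted_certificates = trusted_certificates
    return messenger
```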

timeout

Value:
property(_get_timeout, _set_timeout, doc= """
The timeout property contains the default timeout for blocking
operations performed by the L{Messenger}.
""")

blocking

Value:
property(_is_blocking, _set_blocking, doc= """
Enable or disable blocking behavior during L{Message} sending
and receiving.  This affects every blocking call, with the
exception of L{work}.  Currently, the affected calls are
L{send}, L{recv}, and L{stop}.
""")

passive

Value:
property(_is_passive, _set_passive, doc= """
When passive is set to true, Messenger will not attempt to perform I/O
internally. In this mode it is necessary to use the selectables API to
drive any I/O needed to perform requested actions. In this mode
Messenger will never block.
""")

incoming_window

Value:
property(_get_incoming_window, _set_incoming_window, doc= """
The incoming tracking window for the messenger. The messenger will
track the remote status of this many incoming deliveries after they
have been accepted or rejected. Defaults to zero.

L{Messages<Message>} enter this window only when you take them into your application
using L{get}.  If your incoming window size is I{n}, and you get I{n}+\
...

outgoing_window

Value:
property(_get_outgoing_window, _set_outgoing_window, doc= """
The outgoing tracking window for the messenger. The messenger will
track the remote status of this many outgoing deliveries after calling
send. Defaults to zero.

A L{Message} enters this window when you call the put() method with the
message.  If your outgoing window size is I{n}, and you call L{put} I{\
...