Module proton :: Class Messenger

Class Messenger


The Messenger class defines a high level interface for sending and receiving Messages. Every Messenger contains a single logical queue of incoming messages and a single logical queue of outgoing messages. The messages in these queues may be destined for, or originate from, a variety of addresses.

The messenger interface is single-threaded. All methods except one (interrupt) are intended to be used from within the messenger thread.

Address Syntax

An address has the following form:

 [ amqp[s]:// ] [user[:password]@] domain [/[name]]

Where domain can be one of:

 host | host:port | ip | ip:port | name
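As a rough illustration of this grammar, the sketch below splits an address into its parts with a regular expression. The parse_address helper and the sample addresses are hypothetical and not part of proton. The sketch assumes host names or IPv4 literals only, and it does not reproduce the parser that Messenger itself uses.

```python
import re

# Hypothetical helper, not part of proton: a regex model of
#   [ amqp[s]:// ] [user[:password]@] domain [/[name]]
ADDRESS_RE = re.compile(
    r"(?:(?P<scheme>amqps?)://)?"                          # optional scheme
    r"(?:(?P<user>[^:@/]+)(?::(?P<password>[^@/]*))?@)?"   # optional credentials
    r"(?P<domain>[^/]+)"                                   # host, host:port, ip, ip:port or name
    r"(?:/(?P<name>.*))?"                                  # optional /name
)


def parse_address(address):
    """Return a dict with scheme, user, password, domain and name (None if absent)."""
    match = ADDRESS_RE.fullmatch(address)
    if match is None:
        raise ValueError("not a valid address: %r" % address)
    return match.groupdict()


# Illustrative addresses only; they are assumptions derived from the grammar.
print(parse_address("amqps://user:secret@example.org:5671/queue"))
print(parse_address("example.org"))
```

The domain is returned unsplit because the grammar allows a bare name as well as host:port, so any further splitting is left to the caller.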

The following are valid examples of addresses:

Sending & Receiving Messages

The Messenger class works in conjunction with the Message class. The Message class is a mutable holder of message content.

The put method copies its Message to the outgoing queue, and may send queued messages if it can do so without blocking. The send method blocks until it has sent the requested number of messages, or until a timeout interrupts the attempt.

>>> message = Message()
>>> for i in range(3):
...   message.address = "amqp://host/queue"
...   message.subject = "Hello World %i" % i
...   messenger.put(message)
>>> messenger.send()

Similarly, the recv method receives messages into the incoming queue, and may block as it attempts to receive the requested number of messages or until the timeout is reached. It may receive fewer than the requested number. The get method pops the oldest Message off the incoming queue and copies it into the Message object that you supply. It will not block.

>>> message = Message()
>>> messenger.recv(10)
>>> while messenger.incoming > 0:
...   messenger.get(message)
...   print(message.subject)
Hello World 0
Hello World 1
Hello World 2

The blocking flag allows you to turn off blocking behavior entirely, in which case send and recv will do whatever they can without blocking, and then return. You can then look at the number of incoming and outgoing messages to see how much outstanding work still remains.

Instance Methods
 
__init__(self, name=None)
Construct a new Messenger with the given name.
 
__del__(self)
Destroy the Messenger.
 
start(self)
Currently a no-op placeholder.
 
stop(self)
Transitions the Messenger to an inactive state.
 
subscribe(self, source)
Subscribes the Messenger to messages originating from the specified source.
 
put(self, message)
Places the content contained in the message onto the outgoing queue of the Messenger.
 
status(self, tracker)
Gets the last known remote state of the delivery associated with the given tracker.
 
buffered(self, tracker)
Checks if the delivery associated with the given tracker is still waiting to be sent.
 
settle(self, tracker=None)
Frees a Messenger from tracking the status associated with a given tracker.
 
send(self, n=-1)
This call will block until the indicated number of messages have been sent, or until the operation times out.
 
recv(self, n=None)
Receives up to n messages into the incoming queue.
 
work(self, timeout=None)
Sends or receives any outstanding messages queued for a Messenger.
 
interrupt(self)
Interrupts a Messenger that is blocking; this is the only method intended to be called from outside the messenger thread.
 
get(self, message=None)
Moves the message from the head of the incoming message queue into the supplied message object.
 
accept(self, tracker=None)
Signal the sender that you have acted on the Message pointed to by the tracker.
 
reject(self, tracker=None)
Rejects the Message indicated by the tracker.
 
route(self, pattern, address)
Adds a routing rule to a Messenger's internal routing table.
 
rewrite(self, pattern, address)
Similar to route(), except that the destination of the Message is determined before the message address is rewritten.

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __new__, __reduce__, __reduce_ex__, __repr__, __setattr__, __sizeof__, __str__, __subclasshook__

Properties
  name
The name of the Messenger.
  certificate
Path to a certificate file for the Messenger.
  private_key
Path to a private key file for the Messenger's certificate.
  password
This property contains the password for the Messenger.private_key file, or None if the file is not encrypted.
  trusted_certificates
A path to a database of trusted certificates for use in verifying the peer on an SSL/TLS connection.
  timeout
The timeout property contains the default timeout for blocking operations performed by the Messenger.
  blocking
Enable or disable blocking behavior during Message sending and receiving.
  incoming_window
The incoming tracking window for the messenger.
  outgoing_window
The outgoing tracking window for the messenger.
  stopped
Returns true iff a Messenger is in the stopped state.
  receiving
  outgoing
The outgoing queue depth.
  incoming
The incoming queue depth.

Inherited from object: __class__

Method Details

__init__(self, name=None)
(Constructor)

Construct a new Messenger with the given name. The name has global scope. If None is supplied, a uuid.UUID-based name will be chosen.

Parameters:
  • name (string) - the name of the messenger or None
Overrides: object.__init__

__del__(self)
(Destructor)

Destroy the Messenger. This will close all connections that are managed by the Messenger. Call the stop method before destroying the Messenger.

start(self)

Currently a no-op placeholder. For future compatibility, do not send or recv messages before starting the Messenger.

stop(self)

Transitions the Messenger to an inactive state. An inactive Messenger will not send or receive messages from its internal queues. A Messenger should be stopped before being discarded to ensure a clean shutdown handshake occurs on any internally managed connections.

subscribe(self, source)

Subscribes the Messenger to messages originating from the specified source. The source is an address as specified in the Messenger introduction with the following addition. If the domain portion of the address begins with the '~' character, the Messenger will interpret the domain as host/port, bind to it, and listen for incoming messages. For example "~0.0.0.0", "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any local interface and listen for incoming messages with the last variant only permitting incoming SSL connections.

Parameters:
  • source (string) - the source of messages to subscribe to

put(self, message)

Places the content contained in the message onto the outgoing queue of the Messenger. This method never blocks; however, it sends any unblocked Messages in the outgoing queue immediately and leaves any blocked Messages in the outgoing queue. The send call may be used to block until the outgoing queue is empty. The outgoing property may be used to check the depth of the outgoing queue.

When the content in a given Message object is copied to the outgoing message queue, you may then modify or discard the Message object without having any impact on the content in the outgoing queue.

This method returns an outgoing tracker for the Message. The tracker can be used to determine the delivery status of the Message.

Parameters:
  • message (Message) - the message to place in the outgoing queue
Returns:
a tracker
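The copy semantics of put can be pictured with a small in-memory model. This is only a sketch under stated assumptions: the Message class here is a hypothetical stand-in with two fields, the module-level put function and outgoing deque model the Messenger's queue, and the tracker is modelled as a simple sequence number rather than whatever the library actually returns.

```python
import copy
from collections import deque


class Message(object):
    """Hypothetical stand-in for proton.Message with just two fields."""

    def __init__(self):
        self.address = None
        self.subject = None


outgoing = deque()          # models the Messenger's outgoing queue


def put(message):
    """Queue a copy of the message and return a tracker (modelled as a sequence number)."""
    outgoing.append(copy.copy(message))
    return len(outgoing) - 1


message = Message()
trackers = []
for i in range(3):
    # The same Message object is reused; each put() snapshots its content.
    message.address = "amqp://host/queue"
    message.subject = "Hello World %i" % i
    trackers.append(put(message))

message.subject = "changed after put"   # does not affect the queued copies
print([m.subject for m in outgoing])
```

Because each call stores a copy, reusing one Message object across a loop is safe in this model, which mirrors the behavior described for put.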

status(self, tracker)

Gets the last known remote state of the delivery associated with the given tracker.

Parameters:
  • tracker (tracker) - the tracker whose status is to be retrieved
Returns:
one of None, PENDING, REJECTED, or ACCEPTED

buffered(self, tracker)

Checks if the delivery associated with the given tracker is still waiting to be sent.

Parameters:
  • tracker (tracker) - the tracker whose status is to be retrieved
Returns:
true if the delivery is still buffered

settle(self, tracker=None)

Frees a Messenger from tracking the status associated with a given tracker. If you don't supply a tracker, all outgoing messages up to the most recent will be settled.

send(self, n=-1)

This call will block until the indicated number of messages have been sent, or until the operation times out. If n is -1 this call will block until all outgoing messages have been sent. If n is 0 then this call will send whatever it can without blocking.

recv(self, n=None)

Receives up to n messages into the incoming queue. If no value for n is supplied, this call will receive as many messages as it can buffer internally. If the Messenger is in blocking mode, this call will block until at least one Message is available in the incoming queue.

work(self, timeout=None)

Sends or receives any outstanding messages queued for a Messenger. This will block for the indicated timeout. This method may also do I/O work other than sending and receiving messages, for example closing connections after messenger.stop() has been called.

interrupt(self)

The Messenger interface is single-threaded. This is the only Messenger function intended to be called from outside of the Messenger thread. Call this from a non-messenger thread to interrupt a Messenger that is blocking. This will cause any in-progress blocking call to throw the Interrupt exception. If there is no currently blocking call, then the next blocking call will be affected, even if it is within the same thread that interrupt was called from.

get(self, message=None)

Moves the message from the head of the incoming message queue into the supplied message object. Any content in the message will be overwritten.

A tracker for the incoming Message is returned. The tracker can later be used to communicate your acceptance or rejection of the Message.

If None is passed in for the Message object, the Message popped from the head of the queue is discarded.

Parameters:
  • message (Message) - the destination message object
Returns:
a tracker

accept(self, tracker=None)

Signal the sender that you have acted on the Message pointed to by the tracker. If no tracker is supplied, then all messages that have been returned by the get method are accepted, except those that have already been auto-settled by passing beyond your incoming window size.

Parameters:
  • tracker (tracker) - a tracker as returned by get

reject(self, tracker=None)

Rejects the Message indicated by the tracker. If no tracker is supplied, all messages that have been returned by the get method are rejected, except those that have already been auto-settled by passing beyond your incoming window size.

Parameters:
  • tracker (tracker) - a tracker as returned by get
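A typical receive loop pairs each get with an explicit accept or reject. The sketch below shows that pattern using only the documented get, accept, reject and incoming members. The process_incoming helper, the handler callback, and the _FakeMessenger and _FakeMessage stand-ins are hypothetical and exist only so the example runs without a broker. With a real Messenger, the incoming_window presumably needs to be large enough that each tracker is still inside the window when it is accepted or rejected.

```python
def process_incoming(messenger, message, handler):
    """Get every queued message; accept it if handler succeeds, reject it otherwise."""
    accepted = rejected = 0
    while messenger.incoming > 0:
        tracker = messenger.get(message)
        try:
            handler(message)
        except Exception:
            messenger.reject(tracker)
            rejected += 1
        else:
            messenger.accept(tracker)
            accepted += 1
    return accepted, rejected


class _FakeMessage(object):
    """Hypothetical stand-in for a Message with only a subject."""
    subject = None


class _FakeMessenger(object):
    """Hypothetical stand-in that serves subjects from a list and records dispositions."""

    def __init__(self, subjects):
        self._pending = list(subjects)
        self.dispositions = {}

    @property
    def incoming(self):
        return len(self._pending)

    def get(self, message):
        message.subject = self._pending.pop(0)
        tracker = len(self.dispositions)
        self.dispositions[tracker] = "PENDING"
        return tracker

    def accept(self, tracker):
        self.dispositions[tracker] = "ACCEPTED"

    def reject(self, tracker):
        self.dispositions[tracker] = "REJECTED"


def handler(message):
    # Illustrative rule: treat the subject "bad" as a processing failure.
    if message.subject == "bad":
        raise ValueError("cannot process %r" % message.subject)


messenger = _FakeMessenger(["ok-1", "bad", "ok-2"])
counts = process_incoming(messenger, _FakeMessage(), handler)
print(counts, messenger.dispositions)
```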

route(self, pattern, address)

Adds a routing rule to a Messenger's internal routing table.

The route procedure may be used to influence how a Messenger will internally treat a given address or class of addresses. Every call to the route procedure will result in Messenger appending a routing rule to its internal routing table.

Whenever a Message is presented to a Messenger for delivery, it will match the address of this message against the set of routing rules in order. The first rule to match will be triggered, and instead of routing based on the address presented in the message, the Messenger will route based on the address supplied in the rule.

The pattern matching syntax supports two types of matches: a '%' will match any character except a '/', and a '*' will match any character including a '/'.

A routing address is specified as a normal AMQP address; however, it may additionally use substitution variables from the pattern match that triggered the rule.

Any message sent to "foo" will be routed to "amqp://foo.com":

>>> messenger.route("foo", "amqp://foo.com")

Any message sent to "foobar" will be routed to "amqp://foo.com/bar":

>>> messenger.route("foobar", "amqp://foo.com/bar")

Any message sent to bar/<path> will be routed to the corresponding path within the amqp://bar.com domain:

>>> messenger.route("bar/*", "amqp://bar.com/$1")

Route all messages over TLS:

>>> messenger.route("amqp:*", "amqps:$1")

Supply credentials for foo.com:

>>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1")

Supply credentials for all domains:

>>> messenger.route("amqp://*", "amqp://user:password@$1")

Route all addresses through a single proxy while preserving the original destination:

>>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2")

Route any address through a single broker:

>>> messenger.route("*", "amqp://user:password@broker/$1")
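The matching and substitution rules can be modelled in a few lines of pure Python. This is a sketch under stated assumptions, not the Messenger implementation: RoutingTable and compile_pattern are hypothetical names, '%' and '*' are assumed to match zero or more characters, and a pattern is assumed to have to match the whole address.

```python
import re


def compile_pattern(pattern):
    """Translate a route pattern into a regex: '%' excludes '/', '*' includes it."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append("([^/]*)")
        elif ch == "*":
            parts.append("(.*)")
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts))


class RoutingTable(object):
    """Hypothetical model of an ordered routing table; the first matching rule wins."""

    def __init__(self):
        self.rules = []

    def route(self, pattern, address):
        self.rules.append((compile_pattern(pattern), address))

    def resolve(self, address):
        for regex, template in self.rules:
            match = regex.fullmatch(address)
            if match:
                # Replace $1, $2, ... with the text captured by each wildcard.
                return re.sub(r"\$(\d)",
                              lambda s: match.group(int(s.group(1))),
                              template)
        return address   # no rule matched: route on the original address


table = RoutingTable()
table.route("foo", "amqp://foo.com")
table.route("bar/*", "amqp://bar.com/$1")
table.route("amqp://%/*", "amqp://user:password@proxy/$1/$2")

for address in ("foo", "bar/a/b", "amqp://host/queue", "foobar"):
    print(address, "->", table.resolve(address))
```

Rules are kept in a list rather than a dict because order matters: an earlier, narrower rule must be able to shadow a later catch-all such as "*".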

rewrite(self, pattern, address)

Similar to route(), except that the destination of the Message is determined before the message address is rewritten.

The outgoing address is only rewritten after routing has been finalized. If a message has an outgoing address of "amqp://0.0.0.0:5678", and a rewriting rule that changes its outgoing address to "foo", it will still arrive at the peer that is listening on "amqp://0.0.0.0:5678", but when it arrives there, the receiver will see its outgoing address as "foo".

The default rewrite rule removes username and password from addresses before they are transmitted.
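The effect of the default rule can be pictured as follows. This is a hedged sketch only: strip_credentials is a hypothetical helper that mimics the outcome with a regular expression, and it does not show how Messenger implements or registers its default rewrite rule.

```python
import re


def strip_credentials(address):
    """Remove a leading user[:password]@ part, keeping any amqp:// or amqps:// scheme."""
    return re.sub(r"^((?:amqps?://)?)[^/@]+@", r"\1", address)


print(strip_credentials("amqp://user:password@foo.com/queue"))
print(strip_credentials("foo.com/queue"))
```

Addresses without credentials pass through unchanged, so applying the function to every outgoing address is harmless in this model.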


Property Details

name

The name of the Messenger.

Get Method:
name(self) - The name of the Messenger.

certificate

Path to a certificate file for the Messenger. This certificate is used when the Messenger accepts or establishes SSL/TLS connections. This property must be specified for the Messenger to accept incoming SSL/TLS connections and to establish client-authenticated outgoing SSL/TLS connections. Outgoing SSL/TLS connections without client authentication do not require this property.

Get Method:
_get_certificate(self)
Set Method:
_set_certificate(self, value)

private_key

Path to a private key file for the Messenger's certificate. This property must be specified for the Messenger to accept incoming SSL/TLS connections and to establish client-authenticated outgoing SSL/TLS connections. SSL/TLS connections without client authentication do not require this property.

Get Method:
_get_private_key(self)
Set Method:
_set_private_key(self, value)

password

This property contains the password for the Messenger.private_key file, or None if the file is not encrypted.

Get Method:
_get_password(self)
Set Method:
_set_password(self, value)

trusted_certificates

A path to a database of trusted certificates for use in verifying the peer on an SSL/TLS connection. If this property is None, then the peer will not be verified.

Get Method:
_get_trusted_certificates(self)
Set Method:
_set_trusted_certificates(self, value)

timeout

The timeout property contains the default timeout for blocking operations performed by the Messenger.

Get Method:
_get_timeout(self)
Set Method:
_set_timeout(self, value)

blocking

Enable or disable blocking behavior during Message sending and receiving. This affects every blocking call, with the exception of work. Currently, the affected calls are send, recv, and stop.

Get Method:
_is_blocking(self)
Set Method:
_set_blocking(self, b)

incoming_window

The incoming tracking window for the messenger. The messenger will track the remote status of this many incoming deliveries after they have been accepted or rejected. Defaults to zero.

Messages enter this window only when you take them into your application using get. If your incoming window size is n, and you get n+1 messages without explicitly accepting or rejecting the oldest message, then the message that passes beyond the edge of the incoming window will be assigned the default disposition of its link.

Get Method:
_get_incoming_window(self)
Set Method:
_set_incoming_window(self, window)

outgoing_window

The outgoing tracking window for the messenger. The messenger will track the remote status of this many outgoing deliveries after calling send. Defaults to zero.

A Message enters this window when you call the put() method with the message. If your outgoing window size is n, and you call put n+1 times, status information will no longer be available for the first message.

Get Method:
_get_outgoing_window(self)
Set Method:
_set_outgoing_window(self, window)
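The window behavior described above can be modelled with a bounded, ordered map. The TrackingWindow class is a hypothetical illustration, not part of proton: it only shows how the oldest delivery falls out of a window of size n once n+1 messages have been put, after which its status is no longer available.

```python
from collections import OrderedDict


class TrackingWindow(object):
    """Hypothetical model of a tracking window that remembers the last `size` deliveries."""

    def __init__(self, size):
        self.size = size
        self._status = OrderedDict()
        self._next_tracker = 0

    def put(self):
        """Record a new delivery and return its tracker (modelled as a counter)."""
        tracker = self._next_tracker
        self._next_tracker += 1
        self._status[tracker] = "PENDING"
        while len(self._status) > self.size:
            self._status.popitem(last=False)   # oldest delivery leaves the window
        return tracker

    def status(self, tracker):
        return self._status.get(tracker)       # None once outside the window


window = TrackingWindow(size=2)
first, second, third = window.put(), window.put(), window.put()
print(window.status(first), window.status(second), window.status(third))
```

With a size of zero, which matches the documented default, no delivery stays in this model's window, so status returns None for every tracker.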

stopped

Returns true iff a Messenger is in the stopped state. This function does not block.

Get Method:
stopped(self) - Returns true iff a Messenger is in the stopped state.

receiving

Get Method:
receiving(self)

outgoing

The outgoing queue depth.

Get Method:
outgoing(self) - The outgoing queue depth.

incoming

The incoming queue depth.

Get Method:
incoming(self) - The incoming queue depth.