Module proton.reactor

Module Summary


Container

A representation of the AMQP concept of a ‘container’, which loosely speaking is something that establishes links to or from another container, over which messages are transfered.

ApplicationEvent

Application defined event, which can optionally be associated with an engine object and or an arbitrary subject.

EventInjector

Can be added to a Container to allow events to be triggered by an external thread but handled on the event thread associated with the container.

Backoff

A reconnect strategy involving an increasing delay between retries, up to a maximum or 10 seconds.

Transaction

Tracks the state of an AMQP 1.0 local transaction.


Module Detail


class proton.reactor.ApplicationEvent(typename: str, connection: Optional[proton._endpoints.Connection] = None, session: Optional[proton._endpoints.Session] = None, link: Optional[proton._endpoints.Link] = None, delivery: Optional[proton._delivery.Delivery] = None, subject: Optional[Any] = None)[source]

Bases: proton._events.EventBase

Application defined event, which can optionally be associated with an engine object and or an arbitrary subject. This produces extended event types - see proton.EventType for details.

Parameters
  • typename – Event type name

  • connection – Associates this event with a connection.

  • session – Associates this event with a session.

  • link – Associate this event with a link.

  • delivery – Associate this event with a delivery.

  • subject – Associate this event with an arbitrary object

TYPES = {}
property context: proton._reactor.ApplicationEvent

A reference to this event.

dispatch(handler: proton._events.Handler, type: Optional[proton._events.EventType] = None) None

Process this event by sending it to all known handlers that are valid for this event type.

Parameters
  • handler – Parent handler to process this event

  • type – Event type

property type: proton._events.EventType

The type of this event.


class proton.reactor.AtLeastOnce[source]

Bases: proton._reactor.LinkOption

Set at-least-once delivery semantics for message delivery. This is achieved by setting the sender link settle mode to proton.Link.SND_UNSETTLED and the receiver link settle mode to proton.Link.RCV_FIRST. This forces the receiver to settle all messages once they are successfully received.

apply(link: proton._endpoints.Link) None[source]

Set the at-least-once delivery semantics on the link.

Parameters

link (proton.Link) – The link on which this option is to be applied.


class proton.reactor.AtMostOnce[source]

Bases: proton._reactor.LinkOption

Set at-most-once delivery semantics for message delivery. This is achieved by setting the sender link settle mode to proton.Link.SND_SETTLED (ie pre-settled).

apply(link: proton._endpoints.Link) None[source]

Set the at-most-once delivery semantics on the link.

Parameters

link (proton.Link) – The link on which this option is to be applied.


class proton.reactor.Backoff(**kwargs)[source]

Bases: object

A reconnect strategy involving an increasing delay between retries, up to a maximum or 10 seconds. Repeated calls to next() returns a value for the next delay, starting with an initial value of 0 seconds.


class proton.reactor.Container(*handlers, **kwargs)[source]

Bases: proton._reactor.Reactor

A representation of the AMQP concept of a ‘container’, which loosely speaking is something that establishes links to or from another container, over which messages are transfered. This is an extension to the Reactor class that adds convenience methods for creating connections and sender- or receiver- links.

connect(url: Optional[Union[str, proton._url.Url]] = None, urls: Optional[List[str]] = None, address: Optional[str] = None, handler: Optional[proton._events.Handler] = None, reconnect: Union[None, Literal[False], proton._reactor.Backoff] = None, heartbeat: Optional[float] = None, ssl_domain: Optional[proton._transport.SSLDomain] = None, **kwargs) proton._endpoints.Connection[source]

Initiates the establishment of an AMQP connection.

An optional JSON configuration file may be used to specify some connection parameters. If present, these will override some of those given in this call (see note below). Some connection parameters (for SSL/TLS) can only be provided through this file. The configuration file is located by searching for it as follows:

  1. The location set in the environment variable MESSAGING_CONNECT_FILE

  2. ./connect.json

  3. ~/.config/messaging/connect.json

  4. /etc/messaging/connect.json

To use SSL/TLS for encryption (when an amqps URL scheme is used), the above configuration file must contain a tls submap containing the following configuration entries (See proton.SSLDomain for details):

  • ca: Path to a database of trusted CAs that the server will advertise.

  • cert: Path to a file/database containing the identifying certificate.

  • key: An optional key to access the identifying certificate.

  • verify: If False, do not verify the peer name (proton.SSLDomain.ANONYMOUS_PEER) or certificate. By default (or if True) verify the peer name and certificate using the ca above (proton.SSLDomain.VERIFY_PEER_NAME).

Parameters
  • url – URL string of process to connect to

  • urls – list of URL strings of process to try to connect to

  • reconnect – Reconnect is enabled by default. You can pass in an instance of Backoff to control reconnect behavior. A value of False will prevent the library from automatically trying to reconnect if the underlying socket is disconnected before the connection has been closed.

  • heartbeat – A value in seconds indicating the desired frequency of heartbeats used to test the underlying socket is alive.

  • ssl_domain – SSL configuration.

  • handler – a connection scoped handler that will be called to process any events in the scope of this connection or its child links.

  • kwargs

    • sasl_enabled (bool), which determines whether a sasl layer is used for the connection.

    • allowed_mechs (str), an optional string specifying the SASL mechanisms allowed for this connection; the value is a space-separated list of mechanism names; the mechanisms allowed by default are determined by your SASL library and system configuration, with two exceptions: GSSAPI and GSS-SPNEGO are disabled by default; to enable them, you must explicitly add them using this option; clients must set the allowed mechanisms before the outgoing connection is attempted; servers must set them before the listening connection is setup.

    • allow_insecure_mechs (bool), a flag indicating whether insecure mechanisms, such as PLAIN over a non-encrypted socket, are allowed.

    • password (str), the authentication secret. Ignored without user kwarg also being present.

    • user (str), the user to authenticate.

    • virtual_host (str), the hostname to set in the Open performative used by peer to determine the correct back-end service for the client; if virtual_host is not supplied the host field from the URL is used instead.

    • offered_capabilities, a list of capabilities being offered to the peer. The list must contain symbols (or strings, which will be converted to symbols).

    • desired_capabilities, a list of capabilities desired from the peer. The list must contain symbols (or strings, which will be converted to symbols).

    • properties, a list of connection properties. This must be a map with symbol keys (or string keys, which will be converted to symbol keys).

    • sni (str), a hostname to use with SSL/TLS Server Name Indication (SNI)

    • max_frame_size (int), the maximum allowable TCP packet size between the peers.

Returns

A new connection object.

Note

Only one of url or urls should be specified.

Note

The following kwargs will be overridden by the values found in the JSON configuration file (if they exist there):

  • password

  • user

and the following kwargs will be overridden by the values found in the sasl sub-map of the above configuration file (if they exist there):

  • sasl_enabled

  • allowed_mechs

create_receiver(context: Union[proton._endpoints.Connection, proton._url.Url, str], source: Optional[str] = None, target: Optional[str] = None, name: Optional[str] = None, dynamic: bool = False, handler: Optional[proton._events.Handler] = None, options: Optional[Union[proton._reactor.ReceiverOption, List[proton._reactor.ReceiverOption], proton._reactor.LinkOption, List[proton._reactor.LinkOption]]] = None) Receiver[source]

Initiates the establishment of a link over which messages can be received (aka a subscription).

There are two patterns of use:

(1) A connection can be passed as the first argument, in which case the link is established on that connection. In this case the source address can be specified as the second argument (or as a keyword argument). The target address can also be specified if desired.

(2) Alternatively a URL can be passed as the first argument. In this case a new connection will be established on which the link will be attached. If a path is specified and the source is not, then the path of the URL is used as the target address.

The name of the link may be specified if desired, otherwise a unique name will be generated.

Various LinkOption s can be specified to further control the attachment.

Parameters
  • context – A connection object or a URL.

  • source – Address of source node.

  • target – Address of target node.

  • name – Receiver name.

  • dynamic – If True, indicates dynamic creation of the receiver.

  • handler – Event handler for this receiver.

  • options – A single option, or a list of receiver options

Returns

New receiver instance.

create_sender(context: Union[str, proton._url.Url, proton._endpoints.Connection], target: Optional[str] = None, source: Optional[str] = None, name: Optional[str] = None, handler: Optional[proton._events.Handler] = None, tags: Optional[Callable[[], bytes]] = None, options: Optional[Union[SenderOption, List[SenderOption], LinkOption, List[LinkOption]]] = None) Sender[source]

Initiates the establishment of a link over which messages can be sent.

There are two patterns of use:

  1. A connection can be passed as the first argument, in which case the link is established on that connection. In this case the target address can be specified as the second argument (or as a keyword argument). The source address can also be specified if desired.

  2. Alternatively a URL can be passed as the first argument. In this case a new connection will be established on which the link will be attached. If a path is specified and the target is not, then the path of the URL is used as the target address.

The name of the link may be specified if desired, otherwise a unique name will be generated.

Various LinkOption s can be specified to further control the attachment.

Parameters
  • context – A connection object or a URL.

  • target – Address of target node.

  • source – Address of source node.

  • name – Sender name.

  • handler – Event handler for this sender.

  • tags – Function to generate tags for this sender of the form def simple_tags(): and returns a bytes type

  • options – A single option, or a list of sender options

Returns

New sender instance.

declare_transaction(context: proton._endpoints.Connection, handler: Optional[TransactionHandler] = None, settle_before_discharge: bool = False) proton._reactor.Transaction[source]

Declare a local transaction.

Parameters
  • context – Context for the transaction, usually the connection.

  • handler – Handler for transactional events.

  • settle_before_discharge – Settle all transaction control messages before the transaction is discharged.

do_work(timeout: Optional[float] = None) bool[source]
listen(url: Union[str, proton._url.Url], ssl_domain: Optional[proton._transport.SSLDomain] = None) proton._reactor.Acceptor[source]

Initiates a server socket, accepting incoming AMQP connections on the interface and port specified.

Parameters
  • url – URL on which to listen for incoming AMQP connections.

  • ssl_domain – SSL configuration object if SSL is to be used, None otherwise.

run() None

Start the processing of events and messages for this container.

schedule(delay: Union[float, int], handler: proton._events.Handler) proton._reactor.Task

Schedule a task to run on this container after a given delay, and using the supplied handler.

Parameters
  • delay

  • handler

selectable(handler: Optional[Union[Acceptor, EventInjector]] = None, delegate: Optional[socket] = None) proton._selectable.Selectable

NO IDEA!

Parameters
  • handler – no idea

  • delegate – no idea


class proton.reactor.Copy[source]

Bases: proton._reactor.ReceiverOption

Receiver option which copies messages to the receiver. This ensures that all receivers receive all incoming messages, no matter how many receivers there are. This is achieved by setting the receiver source distribution mode to proton.Terminus.DIST_MODE_COPY.

apply(receiver: Receiver)[source]

Set message copy semantics on the specified receiver.

Parameters

receiver – The receiver on which message copy semantics is to be set.


class proton.reactor.DurableSubscription[source]

Bases: proton._reactor.ReceiverOption

Receiver option which sets both the configuration and delivery state to durable. This is achieved by setting the receiver’s source durability to proton.Terminus.DELIVERIES and the source expiry policy to proton.Terminus.EXPIRE_NEVER.

apply(receiver: Receiver)[source]

Set durability on the specified receiver.

Parameters

receiver – The receiver on which durability is to be set.


class proton.reactor.DynamicNodeProperties(props: dict = {})[source]

Bases: proton._reactor.LinkOption

Allows a map of link properties to be set on a link. The keys may be proton.symbol or strings (in which case they will be converted to symbols before being applied).

Parameters

props – A map of link options to be applied to a link.

apply(link: proton._endpoints.Link) None[source]

Set the map of properties on the specified link.

Parameters

link – The link on which this property map is to be set.


class proton.reactor.EventInjector[source]

Bases: object

Can be added to a Container to allow events to be triggered by an external thread but handled on the event thread associated with the container. An instance of this class can be passed to the Container.selectable() method in order to activate it. close() should be called when it is no longer needed, to allow the event loop to end if needed.

close() None[source]

Request that this EventInjector be closed. Existing events will be dispatched on the container’s event dispatch thread, then this will be removed from the set of interest.

fileno() int[source]
on_selectable_init(event: proton._events.Event) None[source]
on_selectable_readable(event: proton._events.Event) None[source]
trigger(event: proton._reactor.ApplicationEvent) None[source]

Request that the given event be dispatched on the event thread of the container to which this EventInjector was added.

Parameters

event (proton.Event, ApplicationEvent) – Event to be injected


class proton.reactor.Filter(filter_set: Dict[proton._data.symbol, proton._data.Described] = {})[source]

Bases: proton._reactor.ReceiverOption

Receiver option which allows incoming messages to be filtered.

Parameters

filter_set – A map of filters with proton.symbol keys containing the filter name, and the value a filter string.

apply(receiver: Receiver) None[source]

Set the filter on the specified receiver.

Parameters

receiver – The receiver on which this filter is to be applied.


class proton.reactor.LinkOption[source]

Bases: object

Abstract interface for link configuration options

apply(link: proton._endpoints.Link) None[source]

Subclasses will implement any configuration logic in this method

test(link: proton._endpoints.Link) bool[source]

Subclasses can override this to selectively apply an option e.g. based on some link criteria


class proton.reactor.Move[source]

Bases: proton._reactor.ReceiverOption

Receiver option which moves messages to the receiver (rather than copying). This has the effect of distributing the incoming messages between the receivers. This is achieved by setting the receiver source distribution mode to proton.Terminus.DIST_MODE_MOVE.

apply(receiver: Receiver)[source]

Set message move semantics on the specified receiver.

Parameters

receiver – The receiver on which message move semantics is to be set.


class proton.reactor.ReceiverOption[source]

Bases: proton._reactor.LinkOption

Abstract class for receiver options

apply(receiver: Receiver) None[source]

Set the option on the receiver.

Parameters

receiver – The receiver on which this option is to be applied.


class proton.reactor.Selector(value: Union[bytes, str], name: str = 'selector')[source]

Bases: proton._reactor.Filter

Configures a receiver with a message selector filter

Parameters
  • value – Selector filter string

  • name – Name of the selector, defaults to "selector".

apply(receiver: Receiver) None

Set the filter on the specified receiver.

Parameters

receiver – The receiver on which this filter is to be applied.


class proton.reactor.SenderOption[source]

Bases: proton._reactor.LinkOption

Abstract class for sender options.

apply(sender: Sender) None[source]

Set the option on the sender.

Parameters

sender – The sender on which this option is to be applied.


class proton.reactor.Transaction(txn_ctrl: Sender, handler: TransactionHandler, settle_before_discharge: bool = False)[source]

Bases: object

Tracks the state of an AMQP 1.0 local transaction. In typical usage, this object is not created directly, but is obtained through the event returned by proton.handlers.TransactionHandler.on_transaction_declared() after a call to proton.reactor.Container.declare_transaction().

To send messages under this transaction, use send().

To receive messages under this transaction, call accept() once the message is received (typically from the proton.handlers.MessagingHandler.on_message() callback).

To discharge the transaction, call either commit() (for a successful transaction), or abort() (for a failed transaction).

abort() None[source]

Abort or roll back this transaction. Closes the transaction as a failure, and reverses, or rolls back all actions (sent and received messages) performed under this transaction.

accept(delivery: proton._delivery.Delivery) None[source]

Accept a received message under this transaction.

Parameters

delivery – Delivery object for the received message.

commit() None[source]

Commit this transaction. Closes the transaction as a success.

send(sender: Sender, msg: proton._message.Message, tag: Optional[str] = None) proton._delivery.Delivery[source]

Send a message under this transaction.

Parameters
  • sender – Link over which to send the message.

  • msg – Message to be sent under this transaction.

  • tag – The delivery tag

Returns

Delivery object for this message.