Module proton.reactor
¶
Module Summary¶
A representation of the AMQP concept of a ‘container’, which loosely speaking is something that establishes links to or from another container, over which messages are transfered. |
|
Application defined event, which can optionally be associated with an engine object and or an arbitrary subject. |
|
Can be added to a |
|
A reconnect strategy involving an increasing delay between retries, up to a maximum or 10 seconds. |
|
Tracks the state of an AMQP 1.0 local transaction. |
Link Options¶
The methods Container.create_receiver()
and Container.create_sender()
take one or more link options to allow the details of the links to be customized.
Abstract interface for link configuration options. |
|
Abstract class for receiver options. |
|
Abstract class for sender options. |
|
Set at-least-once delivery semantics for message delivery. |
|
Set at-most-once delivery semantics for message delivery. |
|
Allows a map of link properties to be set on a link. |
|
Receiver option which allows incoming messages to be filtered. |
|
Configures a receiver with a message selector filter. |
|
Receiver option which sets both the configuration and delivery state to durable. |
|
Receiver option which copies messages to the receiver. |
|
Receiver option which moves messages to the receiver (rather than copying). |
Module Detail¶
- class proton.reactor.ApplicationEvent(typename: str, connection: Optional[proton._endpoints.Connection] = None, session: Optional[proton._endpoints.Session] = None, link: Optional[proton._endpoints.Link] = None, delivery: Optional[proton._delivery.Delivery] = None, subject: Optional[Any] = None)[source]¶
Bases:
proton._events.EventBase
Application defined event, which can optionally be associated with an engine object and or an arbitrary subject. This produces extended event types - see
proton.EventType
for details.- Parameters
typename – Event type name
connection – Associates this event with a connection.
session – Associates this event with a session.
link – Associate this event with a link.
delivery – Associate this event with a delivery.
subject – Associate this event with an arbitrary object
- TYPES = {}¶
- property context: proton._reactor.ApplicationEvent¶
A reference to this event.
- dispatch(handler: proton._events.Handler, type: Optional[proton._events.EventType] = None) None ¶
Process this event by sending it to all known handlers that are valid for this event type.
- Parameters
handler – Parent handler to process this event
type – Event type
- property type: proton._events.EventType¶
The type of this event.
- class proton.reactor.AtLeastOnce[source]¶
Bases:
proton._reactor.LinkOption
Set at-least-once delivery semantics for message delivery. This is achieved by setting the sender link settle mode to
proton.Link.SND_UNSETTLED
and the receiver link settle mode toproton.Link.RCV_FIRST
. This forces the receiver to settle all messages once they are successfully received.- apply(link: proton._endpoints.Link) None [source]¶
Set the at-least-once delivery semantics on the link.
- Parameters
link (
proton.Link
) – The link on which this option is to be applied.
- class proton.reactor.AtMostOnce[source]¶
Bases:
proton._reactor.LinkOption
Set at-most-once delivery semantics for message delivery. This is achieved by setting the sender link settle mode to
proton.Link.SND_SETTLED
(ie pre-settled).- apply(link: proton._endpoints.Link) None [source]¶
Set the at-most-once delivery semantics on the link.
- Parameters
link (
proton.Link
) – The link on which this option is to be applied.
- class proton.reactor.Backoff(**kwargs)[source]¶
Bases:
object
A reconnect strategy involving an increasing delay between retries, up to a maximum or 10 seconds. Repeated calls to
next()
returns a value for the next delay, starting with an initial value of 0 seconds.
- class proton.reactor.Container(*handlers, **kwargs)[source]¶
Bases:
proton._reactor.Reactor
A representation of the AMQP concept of a ‘container’, which loosely speaking is something that establishes links to or from another container, over which messages are transfered. This is an extension to the Reactor class that adds convenience methods for creating connections and sender- or receiver- links.
- connect(url: Optional[Union[str, proton._url.Url]] = None, urls: Optional[List[str]] = None, address: Optional[str] = None, handler: Optional[proton._events.Handler] = None, reconnect: Union[None, Literal[False], proton._reactor.Backoff] = None, heartbeat: Optional[float] = None, ssl_domain: Optional[proton._transport.SSLDomain] = None, **kwargs) proton._endpoints.Connection [source]¶
Initiates the establishment of an AMQP connection.
An optional JSON configuration file may be used to specify some connection parameters. If present, these will override some of those given in this call (see note below). Some connection parameters (for SSL/TLS) can only be provided through this file. The configuration file is located by searching for it as follows:
The location set in the environment variable
MESSAGING_CONNECT_FILE
./connect.json
~/.config/messaging/connect.json
/etc/messaging/connect.json
To use SSL/TLS for encryption (when an
amqps
URL scheme is used), the above configuration file must contain atls
submap containing the following configuration entries (Seeproton.SSLDomain
for details):ca
: Path to a database of trusted CAs that the server will advertise.cert
: Path to a file/database containing the identifying certificate.key
: An optional key to access the identifying certificate.verify
: IfFalse
, do not verify the peer name (proton.SSLDomain.ANONYMOUS_PEER
) or certificate. By default (or ifTrue
) verify the peer name and certificate using theca
above (proton.SSLDomain.VERIFY_PEER_NAME
).
- Parameters
url – URL string of process to connect to
urls – list of URL strings of process to try to connect to
reconnect – Reconnect is enabled by default. You can pass in an instance of
Backoff
to control reconnect behavior. A value ofFalse
will prevent the library from automatically trying to reconnect if the underlying socket is disconnected before the connection has been closed.heartbeat – A value in seconds indicating the desired frequency of heartbeats used to test the underlying socket is alive.
ssl_domain – SSL configuration.
handler – a connection scoped handler that will be called to process any events in the scope of this connection or its child links.
kwargs –
sasl_enabled
(bool
), which determines whether a sasl layer is used for the connection.allowed_mechs
(str
), an optional string specifying the SASL mechanisms allowed for this connection; the value is a space-separated list of mechanism names; the mechanisms allowed by default are determined by your SASL library and system configuration, with two exceptions:GSSAPI
andGSS-SPNEGO
are disabled by default; to enable them, you must explicitly add them using this option; clients must set the allowed mechanisms before the outgoing connection is attempted; servers must set them before the listening connection is setup.allow_insecure_mechs
(bool
), a flag indicating whether insecure mechanisms, such as PLAIN over a non-encrypted socket, are allowed.password
(str
), the authentication secret. Ignored withoutuser
kwarg also being present.user
(str
), the user to authenticate.virtual_host
(str
), the hostname to set in the Open performative used by peer to determine the correct back-end service for the client; ifvirtual_host
is not supplied the host field from the URL is used instead.offered_capabilities
, a list of capabilities being offered to the peer. The list must contain symbols (or strings, which will be converted to symbols).desired_capabilities
, a list of capabilities desired from the peer. The list must contain symbols (or strings, which will be converted to symbols).properties
, a list of connection properties. This must be a map with symbol keys (or string keys, which will be converted to symbol keys).sni
(str
), a hostname to use with SSL/TLS Server Name Indication (SNI)max_frame_size
(int
), the maximum allowable TCP packet size between the peers.
- Returns
A new connection object.
Note
Only one of
url
orurls
should be specified.Note
The following kwargs will be overridden by the values found in the JSON configuration file (if they exist there):
password
user
and the following kwargs will be overridden by the values found in the
sasl
sub-map of the above configuration file (if they exist there):sasl_enabled
allowed_mechs
- create_receiver(context: Union[proton._endpoints.Connection, proton._url.Url, str], source: Optional[str] = None, target: Optional[str] = None, name: Optional[str] = None, dynamic: bool = False, handler: Optional[proton._events.Handler] = None, options: Optional[Union[proton._reactor.ReceiverOption, List[proton._reactor.ReceiverOption], proton._reactor.LinkOption, List[proton._reactor.LinkOption]]] = None) Receiver [source]¶
Initiates the establishment of a link over which messages can be received (aka a subscription).
There are two patterns of use:
(1) A connection can be passed as the first argument, in which case the link is established on that connection. In this case the source address can be specified as the second argument (or as a keyword argument). The target address can also be specified if desired.
(2) Alternatively a URL can be passed as the first argument. In this case a new connection will be established on which the link will be attached. If a path is specified and the source is not, then the path of the URL is used as the target address.
The name of the link may be specified if desired, otherwise a unique name will be generated.
Various
LinkOption
s can be specified to further control the attachment.- Parameters
context – A connection object or a URL.
source – Address of source node.
target – Address of target node.
name – Receiver name.
dynamic – If
True
, indicates dynamic creation of the receiver.handler – Event handler for this receiver.
options – A single option, or a list of receiver options
- Returns
New receiver instance.
- create_sender(context: Union[str, proton._url.Url, proton._endpoints.Connection], target: Optional[str] = None, source: Optional[str] = None, name: Optional[str] = None, handler: Optional[proton._events.Handler] = None, tags: Optional[Callable[[], bytes]] = None, options: Optional[Union[SenderOption, List[SenderOption], LinkOption, List[LinkOption]]] = None) Sender [source]¶
Initiates the establishment of a link over which messages can be sent.
There are two patterns of use:
A connection can be passed as the first argument, in which case the link is established on that connection. In this case the target address can be specified as the second argument (or as a keyword argument). The source address can also be specified if desired.
Alternatively a URL can be passed as the first argument. In this case a new connection will be established on which the link will be attached. If a path is specified and the target is not, then the path of the URL is used as the target address.
The name of the link may be specified if desired, otherwise a unique name will be generated.
Various
LinkOption
s can be specified to further control the attachment.- Parameters
context – A connection object or a URL.
target – Address of target node.
source – Address of source node.
name – Sender name.
handler – Event handler for this sender.
tags – Function to generate tags for this sender of the form
def simple_tags():
and returns abytes
typeoptions – A single option, or a list of sender options
- Returns
New sender instance.
- declare_transaction(context: proton._endpoints.Connection, handler: Optional[TransactionHandler] = None, settle_before_discharge: bool = False) proton._reactor.Transaction [source]¶
Declare a local transaction.
- Parameters
context – Context for the transaction, usually the connection.
handler – Handler for transactional events.
settle_before_discharge – Settle all transaction control messages before the transaction is discharged.
- listen(url: Union[str, proton._url.Url], ssl_domain: Optional[proton._transport.SSLDomain] = None) proton._reactor.Acceptor [source]¶
Initiates a server socket, accepting incoming AMQP connections on the interface and port specified.
- Parameters
url – URL on which to listen for incoming AMQP connections.
ssl_domain – SSL configuration object if SSL is to be used,
None
otherwise.
- run() None ¶
Start the processing of events and messages for this container.
- schedule(delay: Union[float, int], handler: proton._events.Handler) proton._reactor.Task ¶
Schedule a task to run on this container after a given delay, and using the supplied handler.
- Parameters
delay –
handler –
- selectable(handler: Optional[Union[Acceptor, EventInjector]] = None, delegate: Optional[socket] = None) proton._selectable.Selectable ¶
NO IDEA!
- Parameters
handler – no idea
delegate – no idea
- class proton.reactor.Copy[source]¶
Bases:
proton._reactor.ReceiverOption
Receiver option which copies messages to the receiver. This ensures that all receivers receive all incoming messages, no matter how many receivers there are. This is achieved by setting the receiver source distribution mode to
proton.Terminus.DIST_MODE_COPY
.
- class proton.reactor.DurableSubscription[source]¶
Bases:
proton._reactor.ReceiverOption
Receiver option which sets both the configuration and delivery state to durable. This is achieved by setting the receiver’s source durability to
proton.Terminus.DELIVERIES
and the source expiry policy toproton.Terminus.EXPIRE_NEVER
.
- class proton.reactor.DynamicNodeProperties(props: dict = {})[source]¶
Bases:
proton._reactor.LinkOption
Allows a map of link properties to be set on a link. The keys may be
proton.symbol
or strings (in which case they will be converted to symbols before being applied).- Parameters
props – A map of link options to be applied to a link.
- apply(link: proton._endpoints.Link) None [source]¶
Set the map of properties on the specified link.
- Parameters
link – The link on which this property map is to be set.
- class proton.reactor.EventInjector[source]¶
Bases:
object
Can be added to a
Container
to allow events to be triggered by an external thread but handled on the event thread associated with the container. An instance of this class can be passed to theContainer.selectable()
method in order to activate it.close()
should be called when it is no longer needed, to allow the event loop to end if needed.- close() None [source]¶
Request that this EventInjector be closed. Existing events will be dispatched on the container’s event dispatch thread, then this will be removed from the set of interest.
- on_selectable_init(event: proton._events.Event) None [source]¶
- on_selectable_readable(event: proton._events.Event) None [source]¶
- trigger(event: proton._reactor.ApplicationEvent) None [source]¶
Request that the given event be dispatched on the event thread of the container to which this EventInjector was added.
- Parameters
event (
proton.Event
,ApplicationEvent
) – Event to be injected
- class proton.reactor.Filter(filter_set: Dict[proton._data.symbol, proton._data.Described] = {})[source]¶
Bases:
proton._reactor.ReceiverOption
Receiver option which allows incoming messages to be filtered.
- Parameters
filter_set – A map of filters with
proton.symbol
keys containing the filter name, and the value a filter string.
- class proton.reactor.LinkOption[source]¶
Bases:
object
Abstract interface for link configuration options
- apply(link: proton._endpoints.Link) None [source]¶
Subclasses will implement any configuration logic in this method
- test(link: proton._endpoints.Link) bool [source]¶
Subclasses can override this to selectively apply an option e.g. based on some link criteria
- class proton.reactor.Move[source]¶
Bases:
proton._reactor.ReceiverOption
Receiver option which moves messages to the receiver (rather than copying). This has the effect of distributing the incoming messages between the receivers. This is achieved by setting the receiver source distribution mode to
proton.Terminus.DIST_MODE_MOVE
.
- class proton.reactor.ReceiverOption[source]¶
Bases:
proton._reactor.LinkOption
Abstract class for receiver options
- class proton.reactor.Selector(value: Union[bytes, str], name: str = 'selector')[source]¶
Bases:
proton._reactor.Filter
Configures a receiver with a message selector filter
- Parameters
value – Selector filter string
name – Name of the selector, defaults to
"selector"
.
- class proton.reactor.SenderOption[source]¶
Bases:
proton._reactor.LinkOption
Abstract class for sender options.
- class proton.reactor.Transaction(txn_ctrl: Sender, handler: TransactionHandler, settle_before_discharge: bool = False)[source]¶
Bases:
object
Tracks the state of an AMQP 1.0 local transaction. In typical usage, this object is not created directly, but is obtained through the event returned by
proton.handlers.TransactionHandler.on_transaction_declared()
after a call toproton.reactor.Container.declare_transaction()
.To send messages under this transaction, use
send()
.To receive messages under this transaction, call
accept()
once the message is received (typically from theproton.handlers.MessagingHandler.on_message()
callback).To discharge the transaction, call either
commit()
(for a successful transaction), orabort()
(for a failed transaction).- abort() None [source]¶
Abort or roll back this transaction. Closes the transaction as a failure, and reverses, or rolls back all actions (sent and received messages) performed under this transaction.
- accept(delivery: proton._delivery.Delivery) None [source]¶
Accept a received message under this transaction.
- Parameters
delivery – Delivery object for the received message.
- send(sender: Sender, msg: proton._message.Message, tag: Optional[str] = None) proton._delivery.Delivery [source]¶
Send a message under this transaction.
- Parameters
sender – Link over which to send the message.
msg – Message to be sent under this transaction.
tag – The delivery tag
- Returns
Delivery object for this message.