Qpid Proton C API 0.40.0
 
Loading...
Searching...
No Matches
Proactor

Unsettled API - An API for multithreaded IO. More...

Macros

#define PN_MAX_ADDR
 Size of buffer that can hold the largest connection or listening address.
 

Typedefs

typedef struct pn_netaddr_t pn_netaddr_t
 Unsettled API - The network address of a proactor transport.
 
typedef struct pn_proactor_t pn_proactor_t
 A harness for multithreaded IO.
 
typedef struct pn_event_batch_t pn_event_batch_t
 A batch of events that must be handled in sequence.
 

Functions

PNP_EXTERN int pn_netaddr_str (const pn_netaddr_t *addr, char *buf, size_t size)
 Format a network address string in buf.
 
PNP_EXTERN const pn_netaddr_tpn_transport_local_addr (pn_transport_t *t)
 Get the local address of a transport.
 
PNP_EXTERN const pn_netaddr_tpn_transport_remote_addr (pn_transport_t *t)
 Get the local address of a transport.
 
PNP_EXTERN const pn_netaddr_tpn_listener_addr (pn_listener_t *l)
 Get the listening addresses of a listener.
 
PNP_EXTERN const pn_netaddr_tpn_netaddr_next (const pn_netaddr_t *na)
 
PNP_EXTERN const struct sockaddr * pn_netaddr_sockaddr (const pn_netaddr_t *na)
 On POSIX or Windows, get the underlying struct sockaddr.
 
PNP_EXTERN size_t pn_netaddr_socklen (const pn_netaddr_t *na)
 On POSIX or Windows, get the size of the underlying struct sockaddr.
 
PNP_EXTERN int pn_netaddr_host_port (const pn_netaddr_t *na, char *host, size_t hlen, char *port, size_t plen)
 Get the host and port name from na as separate strings.
 
PNP_EXTERN const pn_netaddr_tpn_netaddr_local (pn_transport_t *t)
 Deprecated - Use pn_transport_local_addr()
 
PNP_EXTERN const pn_netaddr_tpn_netaddr_remote (pn_transport_t *t)
 Deprecated - Use pn_transport_remote_addr()
 
PNP_EXTERN const pn_netaddr_tpn_netaddr_listening (pn_listener_t *l)
 Deprecated - Use pn_listener_addr()
 
PNP_EXTERN int pn_proactor_addr (char *addr, size_t size, const char *host, const char *port)
 Format a host:port address string for pn_proactor_connect() or pn_proactor_listen()
 
PNP_EXTERN pn_proactor_tpn_proactor (void)
 Create a proactor.
 
PNP_EXTERN void pn_proactor_free (pn_proactor_t *proactor)
 Free the proactor.
 
PNP_EXTERN void pn_proactor_connect2 (pn_proactor_t *proactor, pn_connection_t *connection, pn_transport_t *transport, const char *addr)
 Connect transport to addr and bind to connection.
 
PNP_EXTERN void pn_proactor_connect (pn_proactor_t *proactor, pn_connection_t *connection, const char *addr)
 Deprecated - Use pn_proactor_connect2()
 
PNP_EXTERN void pn_proactor_listen (pn_proactor_t *proactor, pn_listener_t *listener, const char *addr, int backlog)
 Start listening for incoming connections.
 
PNP_EXTERN void pn_proactor_disconnect (pn_proactor_t *proactor, pn_condition_t *condition)
 Disconnect all connections and listeners belonging to the proactor.
 
PNP_EXTERN pn_event_batch_tpn_proactor_wait (pn_proactor_t *proactor)
 Wait until there are Proactor events to handle.
 
PNP_EXTERN pn_event_batch_tpn_proactor_get (pn_proactor_t *proactor)
 Return Proactor events if any are available immediately.
 
PNP_EXTERN pn_event_tpn_event_batch_next (pn_event_batch_t *batch)
 Remove the next event from the batch and return it.
 
PNP_EXTERN pn_proactor_tpn_event_batch_proactor (pn_event_batch_t *batch)
 Query the batch for the subject of the batch.
 
PNP_EXTERN pn_listener_tpn_event_batch_listener (pn_event_batch_t *batch)
 Query the batch for the subject of the batch.
 
PNP_EXTERN pn_connection_tpn_event_batch_connection (pn_event_batch_t *batch)
 Query the batch for the subject of the batch.
 
PNP_EXTERN void pn_proactor_done (pn_proactor_t *proactor, pn_event_batch_t *events)
 Call when finished handling a batch of events.
 
PNP_EXTERN void pn_proactor_interrupt (pn_proactor_t *proactor)
 Return a PN_PROACTOR_INTERRUPT event as soon as possible.
 
PNP_EXTERN void pn_proactor_set_timeout (pn_proactor_t *proactor, pn_millis_t timeout)
 Return a PN_PROACTOR_TIMEOUT after timeout milliseconds elapse.
 
PNP_EXTERN void pn_proactor_cancel_timeout (pn_proactor_t *proactor)
 Cancel the pending timeout set by pn_proactor_set_timeout().
 
PNP_EXTERN void pn_proactor_release_connection (pn_connection_t *connection)
 Release ownership of connection, disassociate it from its proactor.
 
PNP_EXTERN void pn_connection_wake (pn_connection_t *connection)
 Return a PN_CONNECTION_WAKE event for connection as soon as possible.
 
PNP_EXTERN void pn_connection_write_flush (pn_connection_t *connection)
 Unsettled API Send available AMQP protocol frames to the remote peer.
 
PNP_EXTERN pn_proactor_tpn_connection_proactor (pn_connection_t *connection)
 Return the proactor associated with a connection.
 
PNP_EXTERN pn_proactor_tpn_event_proactor (pn_event_t *event)
 Return the proactor associated with an event.
 
PNP_EXTERN pn_millis_t pn_proactor_now (void)
 
PNP_EXTERN int64_t pn_proactor_now_64 (void)
 Get the real elapsed time since an arbitrary point in the past in milliseconds.
 
PNP_EXTERN void pn_proactor_raw_connect (pn_proactor_t *proactor, pn_raw_connection_t *raw_connection, const char *addr)
 Connect addr and bind to raw_connection.
 

Detailed Description

Unsettled API - An API for multithreaded IO.

The proactor associates an abstract AMQP protocol Connection with a concrete IO Transport implementation for outgoing and incoming connections.

pn_proactor_wait() returns Proactor events to application threads for handling.

The pn_proactor_* functions are thread-safe, but to handle Proactor events you must also use the Core APIs, which are not. Core objects associated with different connections can be used concurrently, but objects associated with a single connection can only be used from their own thread.

The proactor serializes Proactor events for each connection

pn_connection_wake() allows any thread to "wake up" a connection. It causes pn_proactor_wait() to return a PN_CONNECTION_WAKE event that is serialized with the connection's other Proactor events. You can use this to implement communication between different connections, or from non-proactor threads.

Serialization and pn_connection_wake() simplify building applications with a shared thread pool, which serialize work per connection. Many other variations are possible, but you are responsible for any additional synchronization needed.

Typedef Documentation

◆ pn_event_batch_t

A batch of events that must be handled in sequence.

A pn_event_batch_t encapsulates potentially multiple events that relate to an individual proactor related source that must be handled in sequence.

Call pn_event_batch_next() in a loop until it returns NULL to extract the batch's events.

Function Documentation

◆ pn_netaddr_str()

PNP_EXTERN int pn_netaddr_str ( const pn_netaddr_t addr,
char *  buf,
size_t  size 
)

Format a network address string in buf.

Returns
the length of the string (excluding trailing '\0'), if >= size then the address was truncated.

◆ pn_transport_local_addr()

PNP_EXTERN const pn_netaddr_t * pn_transport_local_addr ( pn_transport_t t)

Get the local address of a transport.

Return NULL if not available. Pointer is invalid after the transport closes (PN_TRANSPORT_CLOSED event is handled)

◆ pn_transport_remote_addr()

PNP_EXTERN const pn_netaddr_t * pn_transport_remote_addr ( pn_transport_t t)

Get the local address of a transport.

Return NULL if not available. Pointer is invalid after the transport closes (PN_TRANSPORT_CLOSED event is handled)

◆ pn_listener_addr()

PNP_EXTERN const pn_netaddr_t * pn_listener_addr ( pn_listener_t l)

Get the listening addresses of a listener.

Addresses are only available after the PN_LISTENER_OPEN event for the listener.

A listener can have more than one address for several reasons:

  • DNS host records may indicate more than one address
  • On a multi-homed host, listening on the default host "" will listen on all local addresses.
  • Some IPv4/IPV6 configurations may expand a single address into a v4/v6 pair.

pn_netaddr_next() will iterate over the addresses in the list.

Parameters
lpoints to the listener
Returns
The first listening address or NULL if there are no addresses are available. Use pn_netaddr_next() to iterate over the list. Pointer is invalid after the listener closes (PN_LISTENER_CLOSED event is handled)
Examples
broker.c, and direct.c.

◆ pn_netaddr_next()

PNP_EXTERN const pn_netaddr_t * pn_netaddr_next ( const pn_netaddr_t na)
Returns
Pointer to the next address in a list of addresses, NULL if at the end of the list or if this address is not part of a list.

◆ pn_netaddr_sockaddr()

PNP_EXTERN const struct sockaddr * pn_netaddr_sockaddr ( const pn_netaddr_t na)

On POSIX or Windows, get the underlying struct sockaddr.

Return NULL if not available.

◆ pn_netaddr_socklen()

PNP_EXTERN size_t pn_netaddr_socklen ( const pn_netaddr_t na)

On POSIX or Windows, get the size of the underlying struct sockaddr.

Return 0 if not available.

◆ pn_netaddr_host_port()

PNP_EXTERN int pn_netaddr_host_port ( const pn_netaddr_t na,
char *  host,
size_t  hlen,
char *  port,
size_t  plen 
)

Get the host and port name from na as separate strings.

Returns 0 if successful, non-0 on error.

Examples
broker.c, and direct.c.

◆ pn_proactor_addr()

PNP_EXTERN int pn_proactor_addr ( char *  addr,
size_t  size,
const char *  host,
const char *  port 
)

Format a host:port address string for pn_proactor_connect() or pn_proactor_listen()

Parameters
[out]addraddress is copied to this buffer, with trailing '\0'
[in]sizesize of addr buffer
[in]hostnetwork host name, DNS name or IP address
[in]portnetwork service name or decimal port number, e.g. "amqp" or "5672"
Returns
the length of network address (excluding trailing '\0'), if >= size then the address was truncated
Examples
broker.c, direct.c, receive.c, and send.c.

◆ pn_proactor()

PNP_EXTERN pn_proactor_t * pn_proactor ( void  )

Create a proactor.

Must be freed with pn_proactor_free()

Examples
broker.c, direct.c, receive.c, and send.c.

◆ pn_proactor_free()

PNP_EXTERN void pn_proactor_free ( pn_proactor_t proactor)

Free the proactor.

Abort open connections/listeners, clean up all resources.

Examples
broker.c, direct.c, receive.c, and send.c.

◆ pn_proactor_connect2()

PNP_EXTERN void pn_proactor_connect2 ( pn_proactor_t proactor,
pn_connection_t connection,
pn_transport_t transport,
const char *  addr 
)

Connect transport to addr and bind to connection.

Errors are returned as PN_TRANSPORT_CLOSED events by pn_proactor_wait().

Note
Thread-safe
Parameters
[in]proactorthe proactor object
[in]connectionIf NULL a new connection is created. proactor takes ownership of connection and will automatically call pn_connection_free() after the final PN_TRANSPORT_CLOSED event is handled, or when pn_proactor_free() is called. You can prevent the automatic free with pn_proactor_release_connection()
[in]transportIf NULL a new transport is created. proactor takes ownership of transport, it will be freed even if pn_proactor_release_connection() is called.
[in]addrthe "host:port" network address, constructed by pn_proactor_addr() An empty host will connect to the local host via the default protocol (IPV6 or IPV4). An empty port will connect to the standard AMQP port (5672).
Examples
receive.c, and send.c.

◆ pn_proactor_listen()

PNP_EXTERN void pn_proactor_listen ( pn_proactor_t proactor,
pn_listener_t listener,
const char *  addr,
int  backlog 
)

Start listening for incoming connections.

pn_proactor_wait() will return a PN_LISTENER_OPEN event when the listener is ready to accept connections, or a PN_LISTENER_CLOSE if the listen operation fails. If the listen failed, pn_listener_condition() will be set.

When the listener is closed by pn_listener_close(), or because of an error, a PN_LISTENER_CLOSE event will be returned and pn_listener_condition() will be set for an error.

Note
Thread-safe
Parameters
[in]proactorthe proactor object
[in]listenerproactor takes ownership of listener, and will automatically call pn_listener_free() after the final PN_LISTENER_CLOSE event is handled, or when pn_proactor_free() is called.
[in]addrthe "host:port" network address, constructed by pn_proactor_addr() An empty host will listen for all protocols (IPV6 and IPV4) on all local interfaces. An empty port will listen on the standard AMQP port (5672).
[in]backlogof un-handled connection requests to allow before refusing connections. If addr resolves to multiple interface/protocol combinations, the backlog applies to each separately.
Examples
broker.c, and direct.c.

◆ pn_proactor_disconnect()

PNP_EXTERN void pn_proactor_disconnect ( pn_proactor_t proactor,
pn_condition_t condition 
)

Disconnect all connections and listeners belonging to the proactor.

PN_LISTENER_CLOSE, PN_TRANSPORT_CLOSED and other Proactor events are generated as usual.

A PN_PROACTOR_INACTIVE event will be generated when all connections and listeners are disconnected and no timeout is pending. The event will also be generated if there are no listeners, connections or timeout when pn_proactor_disconnect() is called.

Creating new connections and listeners after this call and before the PN_PROACTOR_INACTIVE event may prevent the proactor from becoming inactive. After the PN_PROACTOR_INACTIVE event, the proactor can be used normally.

Note
Thread-safe
Parameters
proactorthe proactor
conditionif not NULL the condition data is copied to each disconnected transports and listener and is available in the close event.

◆ pn_proactor_wait()

PNP_EXTERN pn_event_batch_t * pn_proactor_wait ( pn_proactor_t proactor)

Wait until there are Proactor events to handle.

You must call pn_proactor_done() when you are finished with the batch, you must not use the batch pointer after calling pn_proactor_done().

Normally it is most efficient to handle the entire batch in the calling thread and then call pn_proactor_done(), but see pn_proactor_done() for more options.

pn_proactor_get() is a non-blocking version of this call.

Note
Thread-safe
Returns
a non-empty batch of events that must be processed in sequence.
Examples
broker.c, direct.c, receive.c, and send.c.

◆ pn_proactor_get()

PNP_EXTERN pn_event_batch_t * pn_proactor_get ( pn_proactor_t proactor)

Return Proactor events if any are available immediately.

If not, return NULL. If the return value is not NULL, the behavior is the same as pn_proactor_wait()

Note
Thread-safe

◆ pn_event_batch_next()

PNP_EXTERN pn_event_t * pn_event_batch_next ( pn_event_batch_t batch)

Remove the next event from the batch and return it.

NULL means the batch is empty. The returned event pointer is valid until pn_event_batch_next() is called again on the same batch.

Note
there is deliberately no peek(), more() or other look-ahead on an event batch. We want to know exactly which events have been handled, next() only allows the user to get each event exactly once, in order.
Examples
broker.c, direct.c, receive.c, and send.c.

◆ pn_event_batch_proactor()

PNP_EXTERN pn_proactor_t * pn_event_batch_proactor ( pn_event_batch_t batch)

Query the batch for the subject of the batch.

If it is a proactor then it is returned. NULL means the subject of the batch is not a proactor. The returned proactor is valid until pn_proactor_done() is called again on the same batch.

Returns
the proactor that is subject of the batch or NULL if none.

◆ pn_event_batch_listener()

PNP_EXTERN pn_listener_t * pn_event_batch_listener ( pn_event_batch_t batch)

Query the batch for the subject of the batch.

If it is a listener then it is returned. NULL means the subject of the batch is not a listener. The returned listener is valid until pn_proactor_done() is called again on the same batch.

Returns
the listener that is subject of the batch or NULL if none.

◆ pn_event_batch_connection()

PNP_EXTERN pn_connection_t * pn_event_batch_connection ( pn_event_batch_t batch)

Query the batch for the subject of the batch.

If it is a connection then it is returned. NULL means the subject of the batch is not a connection. The returned connection is valid until pn_proactor_done() is called again on the same batch.

Returns
the connection that is subject of the batch or NULL if none.
Examples
broker.c.

◆ pn_proactor_done()

PNP_EXTERN void pn_proactor_done ( pn_proactor_t proactor,
pn_event_batch_t events 
)

Call when finished handling a batch of events.

Must be called exactly once to match each call to pn_proactor_wait().

Note
Thread-safe: May be called from any thread provided the exactly once rule is respected.
Examples
broker.c, direct.c, receive.c, and send.c.

◆ pn_proactor_interrupt()

PNP_EXTERN void pn_proactor_interrupt ( pn_proactor_t proactor)

Return a PN_PROACTOR_INTERRUPT event as soon as possible.

At least one PN_PROACTOR_INTERRUPT event will be returned after this call. Interrupts can be "coalesced" - if several pn_proactor_interrupt() calls happen close together, there may be only one PN_PROACTOR_INTERRUPT event that occurs after all of them.

Note
Thread-safe and async-signal-safe: can be called in a signal handler. This is the only pn_proactor function that is async-signal-safe.
Examples
broker.c.

◆ pn_proactor_set_timeout()

PNP_EXTERN void pn_proactor_set_timeout ( pn_proactor_t proactor,
pn_millis_t  timeout 
)

Return a PN_PROACTOR_TIMEOUT after timeout milliseconds elapse.

If no threads are blocked in pn_proactor_wait() when the timeout elapses, the event will be delivered to the next available thread.

Calling pn_proactor_set_timeout() again before the PN_PROACTOR_TIMEOUT is delivered replaces the previous timeout value.

Note
Thread-safe

◆ pn_proactor_cancel_timeout()

PNP_EXTERN void pn_proactor_cancel_timeout ( pn_proactor_t proactor)

Cancel the pending timeout set by pn_proactor_set_timeout().

Does nothing if no timeout is set.

Note
Thread-safe

◆ pn_proactor_release_connection()

PNP_EXTERN void pn_proactor_release_connection ( pn_connection_t connection)

Release ownership of connection, disassociate it from its proactor.

The connection and related objects (sessions, links and so on) remain intact, but the transport is closed and unbound. The proactor will not return any more events for this connection. The caller must call pn_connection_free(), either directly or indirectly by re-using connection in another call to pn_proactor_connect() or pn_proactor_listen().

Note
Not thread-safe. Call this function from a connection event handler.
If connection does not belong to a proactor, this call does nothing.
This has nothing to do with pn_connection_release().

◆ pn_connection_wake()

PNP_EXTERN void pn_connection_wake ( pn_connection_t connection)

Return a PN_CONNECTION_WAKE event for connection as soon as possible.

At least one wake event will be returned, serialized with other Proactor events for the same connection. Wakes can be "coalesced" - if several pn_connection_wake() calls happen close together, there may be only one PN_CONNECTION_WAKE event that occurs after all of them.

Note
If connection does not belong to a proactor, this call does nothing.
Thread-safe
Examples
broker.c, and direct.c.

◆ pn_connection_write_flush()

PNP_EXTERN void pn_connection_write_flush ( pn_connection_t connection)

Unsettled API Send available AMQP protocol frames to the remote peer.

Generate as many currently availabe AMQP frames for connection that can be sent on the network to the remote peer without blocking. May help reduce latency, at the expense of extra processing overhead, for event handlers that spend a long time processing a single event batch. Has little effect if called soon before a call to pn_proactor_done().

Note
Not thread-safe. Call this function from a connection event handler.
If connection does not belong to a proactor, this call does nothing.

◆ pn_connection_proactor()

PNP_EXTERN pn_proactor_t * pn_connection_proactor ( pn_connection_t connection)

Return the proactor associated with a connection.

Note
Not thread-safe
Returns
the proactor or NULL if the connection does not belong to a proactor.

◆ pn_event_proactor()

PNP_EXTERN pn_proactor_t * pn_event_proactor ( pn_event_t event)

Return the proactor associated with an event.

Note
Not thread-safe
Returns
the proactor or NULL if the connection does not belong to a proactor.

◆ pn_proactor_now()

PNP_EXTERN pn_millis_t pn_proactor_now ( void  )
Deprecated:
Use pn_proactor_now_64()

Get the real elapsed time since an arbitrary point in the past in milliseconds.

This may be used as a portable way to get a process-local timestamp for the current time. It is monotonically increasing and will never go backwards.

Note: this is not a suitable value for an AMQP timestamp to be sent as part of a message. Such a timestamp should use the real time in milliseconds since the epoch.

Note
Thread-safe

◆ pn_proactor_now_64()

PNP_EXTERN int64_t pn_proactor_now_64 ( void  )

Get the real elapsed time since an arbitrary point in the past in milliseconds.

This may be used as a portable way to get a process-local timestamp for the current time. It is monotonically increasing and will never go backwards.

Note: this is not a suitable value for an AMQP timestamp to be sent as part of a message. Such a timestamp should use the real time in milliseconds since the epoch.

Note
Thread-safe

◆ pn_proactor_raw_connect()

PNP_EXTERN void pn_proactor_raw_connect ( pn_proactor_t proactor,
pn_raw_connection_t raw_connection,
const char *  addr 
)

Connect addr and bind to raw_connection.

Errors are returned as PN_RAW_CONNECTION_DISCONNECTED events by pn_proactor_wait().

Note
Thread-safe
Parameters
[in]proactorthe proactor
[in]raw_connectionthe application must create a raw connection with pn_raw_connection() this parameter cannot be null.

proactor takes ownership of raw_connection and will automatically call pn_raw_connection_free() after the final PN_RAW_CONNECTION_DISCONNECTED event is handled, or when pn_proactor_free() is called.

Parameters
[in]addrthe "host:port" network address, constructed by pn_proactor_addr() An empty host will connect to the local host via the default protocol (IPV6 or IPV4). An empty port will connect to the standard AMQP port (5672).