Qpid Proton C API  0.37.0
direct.c

A server that can be used to demonstrate direct (no broker) peer-to-peer communication. It can accept an incoming connection from either the send.c or receive.c examples and will act as the directly-connected counterpart (receive or send).

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <proton/condition.h>
#include <proton/connection.h>
#include <proton/delivery.h>
#include <proton/link.h>
#include <proton/listener.h>
#include <proton/message.h>
#include <proton/netaddr.h>
#include <proton/proactor.h>
#include <proton/sasl.h>
#include <proton/session.h>
#include <proton/transport.h>

#include <stdio.h>
#include <stdlib.h>
/* Application state handed to every event handler in this example. */
typedef struct app_data_t {
  /* Settings filled in by main() from the command line (or defaults) */
  const char *host;
  const char *port;
  const char *amqp_address;  /* Address set on the target of an outgoing link */
  const char *container_id;
  int message_count;         /* Messages to transfer; 0 means receive without limit */

  /* Proactor and the listener registered with it */
  pn_proactor_t *proactor;
  pn_listener_t *listener;   /* Reset to NULL once the listener has closed */

  /* Buffers for incoming/outgoing messages */
  pn_rwbytes_t msgin;
  pn_rwbytes_t msgout;

  /* Sender values */
  int sent;                  /* Messages sent so far; also used as sequence number and delivery tag */
  int acknowledged;          /* Acknowledgements counted so far */
  pn_link_t *sender;         /* NOTE(review): never assigned in the code visible here - confirm */

  /* Receiver values */
  int received;              /* Complete messages received so far */
} app_data_t;
/* Credit granted to the peer when message_count is 0, i.e. no receive limit */
static const int BATCH = 1000; /* Batch size for unlimited receive */
/* Process exit status returned by main(); handlers set it to 1 when they report an error */
static int exit_code = 0;
/* Close the connection and the listener so that we will get a
 * PN_PROACTOR_INACTIVE event and exit, once all outstanding events
 * are processed.
 *
 * c   - connection to close; may be NULL, in which case only the
 *       listener is closed.
 * app - application state; app->listener is closed if it is still set.
 */
static void close_all(pn_connection_t *c, app_data_t *app) {
  if (c) {
    pn_connection_close(c);
  }
  if (app->listener) {
    pn_listener_close(app->listener);
  }
}
/* Report an error condition, if one is set, and start shutting down.
 *
 * e    - event being handled; its type name prefixes the message.
 * cond - condition to inspect; nothing happens unless it is set.
 * app  - application state, passed on to close_all().
 *
 * When cond is set, prints "<event>: <name>: <description>" on stderr,
 * closes the event's connection and the listener, and sets exit_code to 1.
 */
static void check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
  if (pn_condition_is_set(cond)) {
    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
            pn_condition_get_name(cond), pn_condition_get_description(cond));
    close_all(pn_event_connection(e), app);
    exit_code = 1;
  }
}
/* Create a message with a map { "sequence" : number }, encode it and send it
 * on the given link.
 *
 * app    - application state; app->sent supplies the message id and the
 *          sequence number, app->msgout is the buffer handed to
 *          pn_message_send().
 * sender - link to send on.
 *
 * On a send failure the error text is printed on stderr and exit_code is
 * set to 1. The temporary message object is always freed.
 */
static void send_message(app_data_t *app, pn_link_t *sender) {
  /* Construct a message with the map { "sequence": app.sent } */
  pn_message_t* message = pn_message();
  pn_data_t* body = pn_message_body(message);
  pn_message_set_id(message, (pn_atom_t){.type=PN_ULONG, .u.as_ulong=app->sent});
  pn_data_put_map(body);
  pn_data_enter(body);          /* Following puts become the map's key/value entries */
  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
  pn_data_put_int(body, app->sent); /* The sequence number */
  pn_data_exit(body);           /* Matches the pn_data_enter() above */
  if (pn_message_send(message, sender, &app->msgout) < 0) {
    fprintf(stderr, "send error: %s\n", pn_error_text(pn_message_error(message)));
    exit_code = 1;
  }
  pn_message_free(message);
}
/* Decode an encoded AMQP message and print its body on stdout.
 *
 * data - encoded message bytes. This function takes ownership of
 *        data.start: the buffer is freed before returning on every path,
 *        so the caller must drop its pointer afterwards.
 *
 * On a decode failure the error text is printed on stderr and exit_code is
 * set to 1.
 */
static void decode_message(pn_rwbytes_t data) {
  pn_message_t *m = pn_message();
  int err = pn_message_decode(m, data.start, data.size);
  if (!err) {
    /* Print the decoded message */
    pn_string_t *s = pn_string(NULL);
    pn_inspect(pn_message_body(m), s);
    printf("%s\n", pn_string_get(s));
    fflush(stdout);
    pn_free(s);
  } else {
    fprintf(stderr, "decode error: %s\n", pn_error_text(pn_message_error(m)));
    exit_code = 1;
  }
  /* Release the message and the input buffer whether or not decoding worked */
  pn_message_free(m);
  free(data.start);
}
/* This function handles events when we are acting as the receiver.
 *
 * app   - application state; app->msgin accumulates the bytes of the message
 *         currently being received and app->received counts completed ones.
 * event - the event to handle. Only PN_LINK_REMOTE_OPEN and PN_DELIVERY are
 *         acted on; every other event type is ignored.
 */
static void handle_receive(app_data_t *app, pn_event_t* event) {
  switch (pn_event_type(event)) {

    case PN_LINK_REMOTE_OPEN: {
      pn_link_t *l = pn_event_link(event);
      pn_link_open(l);
      /* Grant exactly message_count credits, or a BATCH when there is no limit */
      pn_link_flow(l, app->message_count ? app->message_count : BATCH);
    } break;

    case PN_DELIVERY: {          /* Incoming message data */
      pn_delivery_t *d = pn_event_delivery(event);
      if (pn_delivery_readable(d)) {
        pn_link_t *l = pn_delivery_link(d);
        size_t size = pn_delivery_pending(d);
        pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */
        size_t prior = m->size;        /* Bytes already accumulated for this message */
        size_t wanted = prior + size;
        ssize_t recv;
        /* Keep the old buffer reachable until realloc has succeeded; never ask
         * for 0 bytes so that a NULL result always means failure. */
        char *grown = (char*)realloc(m->start, wanted ? wanted : 1);
        if (!grown) {
          fprintf(stderr, "out of memory receiving message\n");
          exit_code = 1;
          break;
        }
        m->start = grown;
        /* Read the new bytes after the ones already held, so a message that
         * arrives in several frames is not overwritten. */
        recv = pn_link_recv(l, m->start + prior, size);
        if (recv == PN_ABORTED) {
          printf("Message aborted\n");
          fflush(stdout);
          m->size = 0;           /* Forget the data we accumulated */
          pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
          pn_link_flow(l, 1);    /* Replace credit for aborted message */
        } else if (recv < 0 && recv != PN_EOS) { /* Unexpected error */
          pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code((int)recv));
          pn_link_close(l);      /* Unexpected error, close the link */
        } else {
          if (recv > 0) {
            m->size = prior + (size_t)recv;
          }
          if (!pn_delivery_partial(d)) { /* Message is complete */
            decode_message(*m);
            *m = pn_rwbytes_null;  /* The buffer was handed to decode_message() */
            pn_delivery_update(d, PN_ACCEPTED);
            pn_delivery_settle(d); /* settle and free d */
            if (app->message_count == 0) {
              /* receive forever - see if more credit is needed */
              if (pn_link_credit(l) < BATCH/2) {
                pn_link_flow(l, BATCH - pn_link_credit(l));
              }
            } else if (++app->received >= app->message_count) {
              printf("%d messages received\n", app->received);
              close_all(pn_event_connection(event), app);
            }
          }
        }
      }
      break;
    }

    default:
      break;
  }
}
/* This function handles events when we are acting as the sender.
 *
 * app   - application state; app->sent and app->acknowledged track progress
 *         towards app->message_count.
 * event - the event to handle. PN_LINK_REMOTE_OPEN, PN_LINK_FLOW and
 *         PN_DELIVERY are acted on; every other event type is ignored.
 */
static void handle_send(app_data_t* app, pn_event_t* event) {
  switch (pn_event_type(event)) {

    case PN_LINK_REMOTE_OPEN: {
      pn_link_t* l = pn_event_link(event);
      pn_terminus_set_address(pn_link_target(l), app->amqp_address);
      pn_link_open(l);
    } break;

    case PN_LINK_FLOW: {
      /* The peer has given us some credit, now we can send messages */
      pn_link_t *sender = pn_event_link(event);
      while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
        ++app->sent;
        /* Use sent counter as unique delivery tag. */
        pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
        send_message(app, sender);
      }
      break;
    }

    case PN_DELIVERY: {
      /* We received acknowledgement from the peer that a message was delivered. */
      pn_delivery_t *d = pn_event_delivery(event);
      /* Count only deliveries the peer accepted */
      if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
        if (++app->acknowledged == app->message_count) {
          printf("%d messages sent and acknowledged\n", app->acknowledged);
          close_all(pn_event_connection(event), app);
        }
      }
    } break;

    default:
      break;
  }
}
/* Handle all events, delegate to handle_send or handle_receive depending on link mode.
 *
 * app   - application state.
 * event - the event to handle.
 *
 * Return true to continue, false to exit: false is returned on
 * PN_PROACTOR_INACTIVE and whenever exit_code has been set to non-zero.
 */
static bool handle(app_data_t* app, pn_event_t* event) {
  switch (pn_event_type(event)) {

    case PN_LISTENER_OPEN: {
      char port[256];             /* Get the listening port */
      pn_netaddr_host_port(pn_listener_addr(pn_event_listener(event)), NULL, 0, port, sizeof(port));
      printf("listening on %s\n", port);
      fflush(stdout);
      break;
    }

    case PN_LISTENER_ACCEPT:
      pn_listener_accept2(pn_event_listener(event), NULL, NULL);
      break;

    case PN_CONNECTION_INIT:
      pn_connection_set_container(pn_event_connection(event), app->container_id);
      break;

    case PN_CONNECTION_BOUND: {
      /* Turn off security */
      pn_transport_t *t = pn_event_transport(event);
      pn_transport_require_auth(t, false);
      pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
      break;
    }

    case PN_CONNECTION_REMOTE_OPEN: {
      pn_connection_open(pn_event_connection(event)); /* Complete the open */
      break;
    }

    case PN_SESSION_REMOTE_OPEN: {
      pn_session_open(pn_event_session(event));
      break;
    }

    case PN_TRANSPORT_CLOSED:
      check_condition(event, pn_transport_condition(pn_event_transport(event)), app);
      break;

    case PN_CONNECTION_REMOTE_CLOSE:
      check_condition(event, pn_connection_remote_condition(pn_event_connection(event)), app);
      pn_connection_close(pn_event_connection(event)); /* Return the close */
      break;

    case PN_SESSION_REMOTE_CLOSE:
      check_condition(event, pn_session_remote_condition(pn_event_session(event)), app);
      pn_session_close(pn_event_session(event)); /* Return the close */
      pn_session_free(pn_event_session(event));
      break;

    case PN_LINK_REMOTE_CLOSE:
    case PN_LINK_REMOTE_DETACH:
      check_condition(event, pn_link_remote_condition(pn_event_link(event)), app);
      pn_link_close(pn_event_link(event)); /* Return the close */
      pn_link_free(pn_event_link(event));
      break;

    case PN_PROACTOR_TIMEOUT:
      /* Wake the sender's connection */
      if (app->sender) {          /* Nothing to wake when no sender link is recorded */
        pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
      }
      break;

    case PN_LISTENER_CLOSE:
      app->listener = NULL;        /* Listener is closed */
      check_condition(event, pn_listener_condition(pn_event_listener(event)), app);
      break;

    case PN_PROACTOR_INACTIVE:
      return false;

    default: {
      pn_link_t *l = pn_event_link(event);
      if (l) {                      /* Only delegate link-related events */
        if (pn_link_is_sender(l)) {
          handle_send(app, event);
        } else {
          handle_receive(app, event);
        }
      }
    }
  }
  return exit_code == 0;
}
/* Run the event loop until handle() asks to stop.
 *
 * app - application state; app->proactor must already be created.
 *
 * Waits for a batch of events, passes each one to handle() in order and
 * marks the batch done. Returns as soon as handle() returns false; the
 * batch in progress is not marked done in that case.
 */
void run(app_data_t *app) {
  /* Loop and handle events */
  do {
    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
    pn_event_t *e;
    for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
      if (!handle(app, e)) {
        return;
      }
    }
    pn_proactor_done(app->proactor, events);
  } while(true);
}
/* Entry point.
 *
 * Usage: direct [host [port [amqp-address [message-count]]]]
 * Missing arguments fall back to "", "amqp", "examples" and 10.
 * Returns exit_code, which the event handlers set to 1 after an error.
 */
int main(int argc, char **argv) {
  char addr[PN_MAX_ADDR];
  struct app_data_t app = {0};

  /* Start from the defaults, then let positional arguments override them */
  app.container_id = argv[0];   /* Should be unique */
  app.host = "";
  app.port = "amqp";
  app.amqp_address = "examples";
  app.message_count = 10;
  if (argc > 1) {
    app.host = argv[1];
  }
  if (argc > 2) {
    app.port = argv[2];
  }
  if (argc > 3) {
    app.amqp_address = argv[3];
  }
  if (argc > 4) {
    app.message_count = atoi(argv[4]);
  }

  /* Create the proactor and start listening on host:port */
  app.proactor = pn_proactor();
  app.listener = pn_listener();
  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
  pn_proactor_listen(app.proactor, app.listener, addr, 16);

  /* Handle events until the handlers ask to stop */
  run(&app);

  /* Release the proactor and both message buffers */
  pn_proactor_free(app.proactor);
  free(app.msgout.start);
  free(app.msgin.start);
  return exit_code;
}
An endpoint error state.
A connection to a remote AMQP peer.
A message transfer.
@ PN_ULONG
The ulong AMQP type.
Definition: codec.h:96
pn_bytes_t pn_bytes(size_t size, const char *start)
Create a pn_bytes_t.
A discriminated union that holds any scalar AMQP value.
Definition: codec.h:200
A non-const byte buffer.
Definition: types.h:235
const char * pn_condition_get_name(pn_condition_t *condition)
Returns the name associated with the exceptional condition, or NULL if there is no conditional inform...
bool pn_condition_is_set(pn_condition_t *condition)
Returns true if the condition object is holding some information, i.e.
struct pn_condition_t pn_condition_t
An AMQP Condition object.
Definition: condition.h:64
int pn_condition_format(pn_condition_t *, const char *name, const char *fmt,...)
Set the name and printf-style formatted description.
const char * pn_condition_get_description(pn_condition_t *condition)
Gets the description associated with the exceptional condition.
void pn_connection_open(pn_connection_t *connection)
Open a connection.
pn_condition_t * pn_connection_remote_condition(pn_connection_t *connection)
Get the remote condition associated with the connection endpoint.
struct pn_connection_t pn_connection_t
An AMQP Connection object.
Definition: types.h:285
void pn_connection_close(pn_connection_t *connection)
Close a connection.
void pn_connection_set_container(pn_connection_t *connection, const char *container)
Set the AMQP Container name advertised by a connection object.
bool pn_data_enter(pn_data_t *data)
Sets the parent node to the current node and clears the current node.
int pn_data_put_map(pn_data_t *data)
Puts an empty map value into a pn_data_t.
struct pn_data_t pn_data_t
An AMQP Data object.
Definition: codec.h:375
int pn_data_put_string(pn_data_t *data, pn_bytes_t string)
Puts a PN_STRING value.
bool pn_data_exit(pn_data_t *data)
Sets the current node to the parent node and the parent node to its own parent.
int pn_data_put_int(pn_data_t *data, int32_t i)
Puts a PN_INT value.
bool pn_delivery_readable(pn_delivery_t *delivery)
Check if a delivery is readable.
size_t pn_delivery_pending(pn_delivery_t *delivery)
Get the amount of pending message data for a delivery.
bool pn_delivery_partial(pn_delivery_t *delivery)
Check if a delivery only has partial message data.
void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
Update the disposition of a delivery.
pn_delivery_t * pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
Create a delivery on a link.
void pn_delivery_settle(pn_delivery_t *delivery)
Settle a delivery.
uint64_t pn_delivery_remote_state(pn_delivery_t *delivery)
Get the remote disposition state for a delivery.
#define PN_ACCEPTED
The PN_ACCEPTED delivery state is a terminal state indicating that the delivery was successfully proc...
Definition: disposition.h:66
struct pn_delivery_t pn_delivery_t
An AMQP Delivery object.
Definition: types.h:405
pn_link_t * pn_delivery_link(pn_delivery_t *delivery)
Get the parent link for a delivery object.
pn_delivery_tag_t pn_dtag(const char *bytes, size_t size)
Construct a delivery tag.
const char * pn_error_text(pn_error_t *error)
Get the error text.
#define PN_ABORTED
Delivery aborted error.
Definition: error.h:57
#define PN_EOS
End of stream.
Definition: error.h:47
const char * pn_code(int code)
Get the name of the error code.
pn_delivery_t * pn_event_delivery(pn_event_t *event)
Get the delivery associated with an event.
pn_transport_t * pn_event_transport(pn_event_t *event)
Get the transport associated with an event.
const char * pn_event_type_name(pn_event_type_t type)
Get a human readable name for an event type.
pn_connection_t * pn_event_connection(pn_event_t *event)
Get the connection associated with an event.
pn_link_t * pn_event_link(pn_event_t *event)
Get the link associated with an event.
struct pn_event_t pn_event_t
Notification of a state change in the protocol engine.
Definition: event.h:75
pn_event_type_t pn_event_type(pn_event_t *event)
Get the type of an event.
pn_session_t * pn_event_session(pn_event_t *event)
Get the session associated with an event.
@ PN_LISTENER_ACCEPT
Indicates the listener has an incoming connection, call pn_listener_accept2() to accept it.
Definition: event.h:316
@ PN_LINK_REMOTE_CLOSE
The remote endpoint has closed the link.
Definition: event.h:223
@ PN_CONNECTION_REMOTE_CLOSE
The remote endpoint has closed the connection.
Definition: event.h:149
@ PN_CONNECTION_INIT
The connection has been created.
Definition: event.h:113
@ PN_TRANSPORT_CLOSED
Indicates that both the head and tail of the transport are closed.
Definition: event.h:295
@ PN_SESSION_REMOTE_CLOSE
The remote endpoint has closed the session.
Definition: event.h:186
@ PN_LINK_REMOTE_OPEN
The remote endpoint has opened the link.
Definition: event.h:211
@ PN_PROACTOR_TIMEOUT
Timeout set by pn_proactor_set_timeout() time limit expired.
Definition: event.h:334
@ PN_LINK_FLOW
The flow control state for a link has changed.
Definition: event.h:241
@ PN_LINK_REMOTE_DETACH
The remote endpoint has detached the link.
Definition: event.h:235
@ PN_LISTENER_OPEN
The listener is listening.
Definition: event.h:350
@ PN_DELIVERY
A delivery has been created or updated.
Definition: event.h:254
@ PN_PROACTOR_INACTIVE
The proactor has become inactive: all listeners and connections were closed and the timeout (if set) ...
Definition: event.h:344
@ PN_CONNECTION_REMOTE_OPEN
The remote endpoint has opened the connection.
Definition: event.h:137
@ PN_LISTENER_CLOSE
Indicates the listener has closed.
Definition: event.h:322
@ PN_CONNECTION_BOUND
The connection has been bound to a transport.
Definition: event.h:119
@ PN_SESSION_REMOTE_OPEN
The remote endpoint has opened the session.
Definition: event.h:174
PNP_EXTERN void pn_listener_close(pn_listener_t *l)
Close the listener.
PNP_EXTERN void pn_listener_accept2(pn_listener_t *listener, pn_connection_t *connection, pn_transport_t *transport)
Accept an incoming connection request using transport and connection, which can be configured before ...
PNP_EXTERN pn_listener_t * pn_event_listener(pn_event_t *event)
Return the listener associated with an event.
struct pn_listener_t pn_listener_t
A listener for incoming connections.
Definition: types.h:424
PNP_EXTERN pn_condition_t * pn_listener_condition(pn_listener_t *l)
Get the error condition for a listener.
PNP_EXTERN pn_listener_t * pn_listener(void)
Create a listener to pass to pn_proactor_listen()
int pn_message_set_id(pn_message_t *msg, pn_msgid_t id)
Set the id for a message.
pn_data_t * pn_message_body(pn_message_t *msg)
Get and set the body of a message.
void pn_message_free(pn_message_t *msg)
Free a previously constructed pn_message_t.
ssize_t pn_message_send(pn_message_t *msg, pn_link_t *sender, pn_rwbytes_t *buf)
Unsettled API
pn_error_t * pn_message_error(pn_message_t *msg)
Access the error information for a message.
pn_message_t * pn_message(void)
Construct a new pn_message_t.
int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size)
Decode/load message content from AMQP formatted binary data.
struct pn_message_t pn_message_t
An AMQP Message object.
Definition: message.h:51
PNP_EXTERN void pn_connection_wake(pn_connection_t *connection)
Return a PN_CONNECTION_WAKE event for connection as soon as possible.
#define PN_MAX_ADDR
Size of buffer that can hold the largest connection or listening address.
Definition: proactor.h:74
PNP_EXTERN pn_event_batch_t * pn_proactor_wait(pn_proactor_t *proactor)
Wait until there are Proactor events to handle.
PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor)
Free the proactor.
struct pn_event_batch_t pn_event_batch_t
A batch of events that must be handled in sequence.
Definition: types.h:462
PNP_EXTERN pn_event_t * pn_event_batch_next(pn_event_batch_t *batch)
Remove the next event from the batch and return it.
PNP_EXTERN int pn_netaddr_host_port(const pn_netaddr_t *na, char *host, size_t hlen, char *port, size_t plen)
Get the host and port name from na as separate strings.
struct pn_proactor_t pn_proactor_t
A harness for multithreaded IO.
Definition: types.h:442
PNP_EXTERN pn_proactor_t * pn_proactor(void)
Create a proactor.
PNP_EXTERN void pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener, const char *addr, int backlog)
Start listening for incoming connections.
PNP_EXTERN const pn_netaddr_t * pn_listener_addr(pn_listener_t *l)
Get the listening addresses of a listener.
PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events)
Call when finished handling a batch of events.
PNP_EXTERN int pn_proactor_addr(char *addr, size_t size, const char *host, const char *port)
Format a host:port address string for pn_proactor_connect() or pn_proactor_listen()
void pn_sasl_allowed_mechs(pn_sasl_t *sasl, const char *mechs)
SASL mechanisms that are to be considered for authentication.
pn_sasl_t * pn_sasl(pn_transport_t *transport)
Construct an Authentication and Security Layer object.
void pn_session_free(pn_session_t *session)
Free a session object.
void pn_session_close(pn_session_t *session)
Close a session.
void pn_session_open(pn_session_t *session)
Open a session.
pn_connection_t * pn_session_connection(pn_session_t *session)
Get the parent connection for a session object.
pn_condition_t * pn_session_remote_condition(pn_session_t *session)
Get the remote condition associated with the session endpoint.
int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
Set the address of a terminus object.
void pn_transport_require_auth(pn_transport_t *transport, bool required)
Set whether a non-authenticated transport connection is allowed.
struct pn_transport_t pn_transport_t
A network channel supporting an AMQP connection.
Definition: types.h:435
pn_condition_t * pn_transport_condition(pn_transport_t *transport)
Get additional information about the condition of the transport.
Unsettled API - A listener for incoming connections.
A mutable holder of application content.
Unsettled API - The network address of a proactor transport.
Unsettled API - An API for multithreaded IO.
SASL secure transport layer.
A container of links.
A network channel supporting an AMQP connection.