#include <stdio.h>
#include <stdlib.h>
typedef struct app_data_t {
const char *host, *port;
const char *amqp_address;
const char *container_id;
int message_count;
int sent;
int acknowledged;
} app_data_t;
static int exit_code = 0;
exit_code = 1;
}
}
static void send_message(app_data_t* app,
pn_link_t *sender) {
exit(1);
}
}
static bool handle(app_data_t* app,
pn_event_t* event) {
{
break;
}
}
while (
pn_link_credit(sender) > 0 && app->sent < app->message_count) {
++app->sent;
send_message(app, sender);
}
break;
}
if (++app->acknowledged == app->message_count) {
printf("%d messages sent and acknowledged\n", app->acknowledged);
}
} else {
exit_code=1;
}
break;
}
break;
break;
break;
break;
return false;
default: break;
}
return true;
}
void run(app_data_t *app) {
do {
if (!handle(app, e)) {
return;
}
}
} while(true);
}
int main(int argc, char **argv) {
struct app_data_t app = {0};
app.container_id = argv[0];
app.host = (argc > 1) ? argv[1] : "";
app.port = (argc > 2) ? argv[2] : "amqp";
app.amqp_address = (argc > 3) ? argv[3] : "examples";
app.message_count = (argc > 4) ? atoi(argv[4]) : 10;
run(&app);
free(app.message_buffer.start);
return exit_code;
}
A connection to a remote AMQP peer.
pn_bytes_t pn_bytes(size_t size, const char *start)
Create a pn_bytes_t.
A discriminated union that holds any scalar AMQP value.
Definition: codec.h:200
A non-const byte buffer.
Definition: types.h:235
const char * pn_condition_get_name(pn_condition_t *condition)
Returns the name associated with the exceptional condition, or NULL if there is no conditional inform...
bool pn_condition_is_set(pn_condition_t *condition)
Returns true if the condition object is holding some information, i.e.
struct pn_condition_t pn_condition_t
An AMQP Condition object.
Definition: condition.h:64
const char * pn_condition_get_description(pn_condition_t *condition)
Gets the description associated with the exceptional condition.
void pn_connection_open(pn_connection_t *connection)
Open a connection.
pn_condition_t * pn_connection_remote_condition(pn_connection_t *connection)
Get the remote condition associated with the connection endpoint.
struct pn_connection_t pn_connection_t
An AMQP Connection object.
Definition: types.h:285
void pn_connection_close(pn_connection_t *connection)
Close a connection.
void pn_connection_set_container(pn_connection_t *connection, const char *container)
Set the AMQP Container name advertised by a connection object.
bool pn_data_enter(pn_data_t *data)
Sets the parent node to the current node and clears the current node.
int pn_data_put_map(pn_data_t *data)
Puts an empty map value into a pn_data_t.
struct pn_data_t pn_data_t
An AMQP Data object.
Definition: codec.h:375
int pn_data_put_string(pn_data_t *data, pn_bytes_t string)
Puts a PN_STRING value.
bool pn_data_exit(pn_data_t *data)
Sets the current node to the parent node and the parent node to its own parent.
int pn_data_put_int(pn_data_t *data, int32_t i)
Puts a PN_INT value.
pn_delivery_t * pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
Create a delivery on a link.
uint64_t pn_delivery_remote_state(pn_delivery_t *delivery)
Get the remote disposition state for a delivery.
#define PN_ACCEPTED
The PN_ACCEPTED delivery state is a terminal state indicating that the delivery was successfully proc...
Definition: disposition.h:66
struct pn_delivery_t pn_delivery_t
An AMQP Delivery object.
Definition: types.h:405
pn_delivery_tag_t pn_dtag(const char *bytes, size_t size)
Construct a delivery tag.
const char * pn_error_text(pn_error_t *error)
Get the error text.
pn_delivery_t * pn_event_delivery(pn_event_t *event)
Get the delivery associated with an event.
pn_transport_t * pn_event_transport(pn_event_t *event)
Get the transport associated with an event.
const char * pn_event_type_name(pn_event_type_t type)
Get a human readable name for an event type.
pn_connection_t * pn_event_connection(pn_event_t *event)
Get the connection associated with an event.
pn_link_t * pn_event_link(pn_event_t *event)
Get the link associated with an event.
struct pn_event_t pn_event_t
Notification of a state change in the protocol engine.
Definition: event.h:75
pn_event_type_t pn_event_type(pn_event_t *event)
Get the type of an event.
pn_session_t * pn_event_session(pn_event_t *event)
Get the session associated with an event.
@ PN_LINK_REMOTE_CLOSE
The remote endpoint has closed the link.
Definition: event.h:223
@ PN_CONNECTION_REMOTE_CLOSE
The remote endpoint has closed the connection.
Definition: event.h:149
@ PN_CONNECTION_INIT
The connection has been created.
Definition: event.h:113
@ PN_TRANSPORT_CLOSED
Indicates that the both the head and tail of the transport are closed.
Definition: event.h:295
@ PN_SESSION_REMOTE_CLOSE
The remote endpoint has closed the session.
Definition: event.h:186
@ PN_LINK_FLOW
The flow control state for a link has changed.
Definition: event.h:241
@ PN_LINK_REMOTE_DETACH
The remote endpoint has detached the link.
Definition: event.h:235
@ PN_DELIVERY
A delivery has been created or updated.
Definition: event.h:254
@ PN_PROACTOR_INACTIVE
The proactor has become inactive: all listeners and connections were closed and the timeout (if set) ...
Definition: event.h:344
pn_link_t * pn_sender(pn_session_t *session, const char *name)
Construct a new sender on a session.
int pn_link_credit(pn_link_t *link)
Get the credit balance for a link.
struct pn_link_t pn_link_t
An AMQP Link object.
Definition: types.h:315
pn_condition_t * pn_link_remote_condition(pn_link_t *link)
Get the remote condition associated with a link endpoint.
pn_terminus_t * pn_link_target(pn_link_t *link)
Access the locally defined target definition for a link.
void pn_link_open(pn_link_t *link)
Open a link.
int pn_message_set_id(pn_message_t *msg, pn_msgid_t id)
Set the id for a message.
pn_data_t * pn_message_body(pn_message_t *msg)
Get and set the body of a message.
void pn_message_free(pn_message_t *msg)
Free a previously constructed pn_message_t.
ssize_t pn_message_send(pn_message_t *msg, pn_link_t *sender, pn_rwbytes_t *buf)
Unsettled API
void pn_message_clear(pn_message_t *msg)
Clears the content of a pn_message_t.
pn_error_t * pn_message_error(pn_message_t *msg)
Access the error information for a message.
pn_message_t * pn_message(void)
Construct a new pn_message_t.
struct pn_message_t pn_message_t
An AMQP Message object.
Definition: message.h:51
#define PN_MAX_ADDR
Size of buffer that can hold the largest connection or listening address.
Definition: proactor.h:74
PNP_EXTERN pn_event_batch_t * pn_proactor_wait(pn_proactor_t *proactor)
Wait until there are Proactor events to handle.
PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor)
Free the proactor.
struct pn_event_batch_t pn_event_batch_t
A batch of events that must be handled in sequence.
Definition: types.h:462
PNP_EXTERN pn_event_t * pn_event_batch_next(pn_event_batch_t *batch)
Remove the next event from the batch and return it.
PNP_EXTERN void pn_proactor_connect2(pn_proactor_t *proactor, pn_connection_t *connection, pn_transport_t *transport, const char *addr)
Connect transport to addr and bind to connection.
struct pn_proactor_t pn_proactor_t
A harness for multithreaded IO.
Definition: types.h:442
PNP_EXTERN pn_proactor_t * pn_proactor(void)
Create a proactor.
PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events)
Call when finished handling a batch of events.
PNP_EXTERN int pn_proactor_addr(char *addr, size_t size, const char *host, const char *port)
Format a host:port address string for pn_proactor_connect() or pn_proactor_listen()
pn_session_t * pn_session(pn_connection_t *connection)
Factory for creating a new session on a given connection object.
void pn_session_open(pn_session_t *session)
Open a session.
struct pn_session_t pn_session_t
An AMQP Session object.
Definition: types.h:296
pn_condition_t * pn_session_remote_condition(pn_session_t *session)
Get the remote condition associated with the session endpoint.
int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
Set the address of a terminus object.
pn_condition_t * pn_transport_condition(pn_transport_t *transport)
Get additional information about the condition of the transport.
A channel for transferring messages.
A mutable holder of application content.
Unsettled API - An API for multithreaded IO.
A network channel supporting an AMQP connection.