Subscribes to the 'example' node and prints the message bodies received.
Subscribes to the 'example' node and prints the message bodies received. Can be used with broker.c, direct.c or an external AMQP broker.
#include <proton/object.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
/* Settings and progress counters for one run of the receiver. */
typedef struct app_data_t {
  /* Host to connect to; main() takes it from argv[1], defaulting to "". */
  const char *host;
  /* Port or service name; main() takes it from argv[2], defaulting to "amqp". */
  const char *port;
  /* Address to subscribe to; main() takes it from argv[3], defaulting to "examples". */
  const char *amqp_address;
  /* Container name; main() sets it to argv[0]. */
  const char *container_id;
  /* Number of messages wanted; when 0, handle() grants BATCH credit instead. */
  int message_count;
  /* Incremented in handle() and compared against message_count. */
  int received;
  /* NOTE(review): not referenced in the visible part of this listing. */
  bool finished;
} app_data_t;
/* Credit passed to pn_link_flow() in handle() when app->message_count is 0. */
static const int BATCH = 1000;
/* Process exit status returned by main(); set to 1 on the error paths and
   checked by run() to stop the event loop. */
static int exit_code = 0;
exit_code = 1;
}
}
if (!err) {
printf("%s\n", s);
free(s);
free(data.start);
} else {
fprintf(stderr,
"decode_message: %s\n",
pn_code(err));
exit_code = 1;
}
}
/* Handle one event on behalf of `app`.
 *
 * Returns false at the `return false;` statement below and true otherwise;
 * run() leaves its loop when this function returns false.
 *
 * NOTE(review): this listing appears to be missing lines -- `l`, `m` and
 * `size` are not declared in the visible text and the switch/case labels are
 * absent -- so the comments below describe only the visible statements. */
static bool handle(app_data_t* app,
pn_event_t* event) {
{
/* Grant credit: message_count when it is non-zero, otherwise BATCH. */
pn_link_flow(l, app->message_count ? app->message_count : BATCH);
}
} break;
int recv;
/* Grow the buffer *m by `size` bytes.
 * NOTE(review): the realloc() result is stored straight into m->start with
 * no NULL check. */
size_t oldsize = m->size;
m->size += size;
m->start = (char*)realloc(m->start, m->size);
/* Report the abort and reset the buffered size to 0. */
printf("Message aborted\n");
m->size = 0;
}
else if (recv < 0 && recv !=
PN_EOS) {
/* Pass the buffer to decode_message(), then reset *m to pn_rwbytes_null. */
decode_message(*m);
*m = pn_rwbytes_null;
if (app->message_count == 0) {
}
/* Count the message; once message_count is reached, print the total. */
} else if (++app->received >= app->message_count) {
printf("%d messages received\n", app->received);
}
}
}
break;
}
break;
break;
break;
break;
return false;
break;
default:
break;
}
return true;
}
/* Event loop for the receiver.
 *
 * Passes each event `e` to handle() and returns as soon as handle() returns
 * false or exit_code is non-zero.
 *
 * NOTE(review): `e` is not declared in the visible text and the braces do not
 * balance, so lines appear to be missing from this listing. */
void run(app_data_t *app) {
do {
if (!handle(app, e) || exit_code != 0) {
return;
}
}
} while(true);
}
/* Program entry point.
 *
 * Positional arguments, all optional:
 *   argv[1]  host           (default "")
 *   argv[2]  port           (default "amqp")
 *   argv[3]  AMQP address   (default "examples")
 *   argv[4]  message count  (default 10; 0 makes handle() grant BATCH credit)
 *
 * Returns exit_code after run() finishes, or 1 if the message count is not a
 * valid non-negative decimal integer that fits in an int.
 */
int main(int argc, char **argv) {
  struct app_data_t app = {0};

  app.container_id = argv[0];
  app.host = (argc > 1) ? argv[1] : "";
  app.port = (argc > 2) ? argv[2] : "amqp";
  app.amqp_address = (argc > 3) ? argv[3] : "examples";
  app.message_count = 10;

  if (argc > 4) {
    /* strtol() instead of atoi(): atoi() cannot report errors, so garbage
       became 0 and negative or out-of-range values passed through unchecked. */
    char *end = NULL;
    long count;

    errno = 0;
    count = strtol(argv[4], &end, 10);
    if (end == argv[4] || *end != '\0' || errno == ERANGE ||
        count < 0 || count > INT_MAX) {
      fprintf(stderr, "%s: invalid message count '%s'\n", argv[0], argv[4]);
      return 1;
    }
    app.message_count = (int)count;
  }

  run(&app);
  return exit_code;
}
A connection to a remote AMQP peer.
A non-const byte buffer.
Definition types.h:246
bool pn_condition_is_set(pn_condition_t *condition)
Returns true if the condition object is holding some information, i.e. if the name is set to some non-NULL value.
struct pn_condition_t pn_condition_t
An AMQP Condition object.
Definition condition.h:65
int pn_condition_format(pn_condition_t *, const char *name, PN_PRINTF_FORMAT const char *fmt,...) PN_PRINTF_FORMAT_ATTR(3, 4)
Set the name and printf-style formatted description.
const char * pn_condition_get_description(pn_condition_t *condition)
Gets the description associated with the exceptional condition.
const char * pn_condition_get_name(pn_condition_t *condition)
Returns the name associated with the exceptional condition, or NULL if there is no conditional information set.
void pn_connection_open(pn_connection_t *connection)
Open a connection.
struct pn_connection_t pn_connection_t
An AMQP Connection object.
Definition types.h:300
void pn_connection_close(pn_connection_t *connection)
Close a connection.
void pn_connection_set_container(pn_connection_t *connection, const char *container)
Set the AMQP Container name advertised by a connection object.
pn_condition_t * pn_connection_remote_condition(pn_connection_t *connection)
Get the remote condition associated with the connection endpoint.
bool pn_delivery_readable(pn_delivery_t *delivery)
Check if a delivery is readable.
size_t pn_delivery_pending(pn_delivery_t *delivery)
Get the amount of pending message data for a delivery.
bool pn_delivery_partial(pn_delivery_t *delivery)
Check if a delivery only has partial message data.
void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
Update the disposition of a delivery.
pn_link_t * pn_delivery_link(pn_delivery_t *delivery)
Get the parent link for a delivery object.
void pn_delivery_settle(pn_delivery_t *delivery)
Settle a delivery.
#define PN_ACCEPTED
The PN_ACCEPTED delivery state is a terminal state indicating that the delivery was successfully processed.
Definition disposition.h:66
struct pn_delivery_t pn_delivery_t
An AMQP Delivery object.
Definition types.h:420
#define PN_ABORTED
Delivery aborted error.
Definition error.h:58
#define PN_EOS
End of stream.
Definition error.h:48
const char * pn_code(int code)
Get the name of the error code.
pn_delivery_t * pn_event_delivery(pn_event_t *event)
Get the delivery associated with an event.
pn_session_t * pn_event_session(pn_event_t *event)
Get the session associated with an event.
const char * pn_event_type_name(pn_event_type_t type)
Get a human readable name for an event type.
pn_link_t * pn_event_link(pn_event_t *event)
Get the link associated with an event.
struct pn_event_t pn_event_t
Notification of a state change in the protocol engine.
Definition event.h:75
pn_connection_t * pn_event_connection(pn_event_t *event)
Get the connection associated with an event.
pn_event_type_t pn_event_type(pn_event_t *event)
Get the type of an event.
pn_transport_t * pn_event_transport(pn_event_t *event)
Get the transport associated with an event.
@ PN_LINK_REMOTE_CLOSE
The remote endpoint has closed the link.
Definition event.h:223
@ PN_CONNECTION_REMOTE_CLOSE
The remote endpoint has closed the connection.
Definition event.h:149
@ PN_CONNECTION_INIT
The connection has been created.
Definition event.h:113
@ PN_TRANSPORT_CLOSED
Indicates that both the head and tail of the transport are closed.
Definition event.h:295
@ PN_SESSION_REMOTE_CLOSE
The remote endpoint has closed the session.
Definition event.h:186
@ PN_LINK_REMOTE_DETACH
The remote endpoint has detached the link.
Definition event.h:235
@ PN_DELIVERY
A delivery has been created or updated.
Definition event.h:254
@ PN_PROACTOR_INACTIVE
The proactor has become inactive: all listeners and connections were closed and the timeout (if set) expired or was cancelled.
Definition event.h:344
ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n)
Receive message data for the current delivery on a link.
pn_session_t * pn_link_session(pn_link_t *link)
Get the parent session for a link object.
pn_condition_t * pn_link_remote_condition(pn_link_t *link)
Get the remote condition associated with a link endpoint.
pn_terminus_t * pn_link_source(pn_link_t *link)
Access the locally defined source definition for a link.
void pn_link_close(pn_link_t *link)
Close a link.
pn_condition_t * pn_link_condition(pn_link_t *link)
Get the local condition associated with a link endpoint.
int pn_link_credit(pn_link_t *link)
Get the credit balance for a link.
pn_link_t * pn_receiver(pn_session_t *session, const char *name)
Construct a new receiver on a session.
struct pn_link_t pn_link_t
An AMQP Link object.
Definition types.h:330
void pn_link_open(pn_link_t *link)
Open a link.
void pn_link_flow(pn_link_t *receiver, int credit)
Grant credit for incoming deliveries on a receiver.
pn_message_t * pn_message(void)
Construct a new pn_message_t.
void pn_message_free(pn_message_t *msg)
Free a previously constructed pn_message_t.
pn_data_t * pn_message_body(pn_message_t *msg)
Get and set the body of a message.
int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size)
Decode/load message content from AMQP formatted binary data.
struct pn_message_t pn_message_t
An AMQP Message object.
Definition message.h:51
PNP_EXTERN pn_proactor_t * pn_proactor(void)
Create a proactor.
PNP_EXTERN pn_event_t * pn_event_batch_next(pn_event_batch_t *batch)
Remove the next event from the batch and return it.
#define PN_MAX_ADDR
Size of buffer that can hold the largest connection or listening address.
Definition proactor.h:74
PNP_EXTERN pn_event_batch_t * pn_proactor_wait(pn_proactor_t *proactor)
Wait until there are Proactor events to handle.
PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor)
Free the proactor.
struct pn_event_batch_t pn_event_batch_t
A batch of events that must be handled in sequence.
Definition types.h:477
PNP_EXTERN void pn_proactor_connect2(pn_proactor_t *proactor, pn_connection_t *connection, pn_transport_t *transport, const char *addr)
Connect transport to addr and bind to connection.
struct pn_proactor_t pn_proactor_t
A harness for multithreaded IO.
Definition types.h:457
PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events)
Call when finished handling a batch of events.
PNP_EXTERN int pn_proactor_addr(char *addr, size_t size, const char *host, const char *port)
Format a host:port address string for pn_proactor_connect() or pn_proactor_listen()
pn_condition_t * pn_session_remote_condition(pn_session_t *session)
Get the remote condition associated with the session endpoint.
void pn_session_close(pn_session_t *session)
Close a session.
pn_connection_t * pn_session_connection(pn_session_t *session)
Get the parent connection for a session object.
void pn_session_open(pn_session_t *session)
Open a session.
struct pn_session_t pn_session_t
An AMQP Session object.
Definition types.h:311
pn_session_t * pn_session(pn_connection_t *connection)
Factory for creating a new session on a given connection object.
int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
Set the address of a terminus object.
pn_condition_t * pn_transport_condition(pn_transport_t *transport)
Get additional information about the condition of the transport.
A channel for transferring messages.
A mutable holder of application content.
Unsettled API - An API for multithreaded IO.
A network channel supporting an AMQP connection.