Qpid Proton C API 0.39.0
 
Loading...
Searching...
No Matches
receive.c

Subscribes to the 'example' node and prints the message bodies received.

Subscribes to the 'example' node and prints the message bodies received.Can be used with broker.c, direct.c or an external AMQP broker.

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <proton/link.h>
#include <proton/message.h>
#include <proton/object.h>
#include <proton/session.h>
#include <stdio.h>
#include <stdlib.h>
typedef struct app_data_t {
const char *host, *port;
const char *amqp_address;
const char *container_id;
int message_count;
pn_proactor_t *proactor;
int received;
bool finished;
pn_rwbytes_t msgin; /* Partially received message */
} app_data_t;
static const int BATCH = 1000; /* Batch size for unlimited receive */
static int exit_code = 0;
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
if (pn_condition_is_set(cond)) {
fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
exit_code = 1;
}
}
static void decode_message(pn_rwbytes_t data) {
int err = pn_message_decode(m, data.start, data.size);
if (!err) {
/* Print the decoded message */
char *s = pn_tostring(pn_message_body(m));
printf("%s\n", s);
free(s);
free(data.start);
} else {
fprintf(stderr, "decode_message: %s\n", pn_code(err));
exit_code = 1;
}
}
/* Return true to continue, false to exit */
static bool handle(app_data_t* app, pn_event_t* event) {
switch (pn_event_type(event)) {
pn_connection_set_container(c, app->container_id);
{
pn_link_t* l = pn_receiver(s, "my_receiver");
pn_terminus_set_address(pn_link_source(l), app->amqp_address);
/* cannot receive without granting credit: */
pn_link_flow(l, app->message_count ? app->message_count : BATCH);
}
} break;
case PN_DELIVERY: {
/* A message (or part of a message) has been received */
size_t size = pn_delivery_pending(d);
pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */
int recv;
size_t oldsize = m->size;
m->size += size;
m->start = (char*)realloc(m->start, m->size);
recv = pn_link_recv(l, m->start + oldsize, m->size);
if (recv == PN_ABORTED) {
printf("Message aborted\n");
m->size = 0; /* Forget the data we accumulated */
pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
pn_link_flow(l, 1); /* Replace credit for aborted message */
} else if (recv < 0 && recv != PN_EOS) { /* Unexpected error */
pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code(recv));
pn_link_close(l); /* Unexpected error, close the link */
} else if (!pn_delivery_partial(d)) { /* Message is complete */
decode_message(*m);
*m = pn_rwbytes_null; /* Reset the buffer for the next message*/
/* Accept the delivery */
pn_delivery_settle(d); /* settle and free d */
if (app->message_count == 0) {
/* receive forever - see if more credit is needed */
if (pn_link_credit(l) < BATCH/2) {
/* Grant enough credit to bring it up to BATCH: */
pn_link_flow(l, BATCH - pn_link_credit(l));
}
} else if (++app->received >= app->message_count) {
printf("%d messages received\n", app->received);
}
}
}
break;
}
check_condition(event, pn_transport_condition(pn_event_transport(event)));
break;
check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
break;
check_condition(event, pn_session_remote_condition(pn_event_session(event)));
break;
check_condition(event, pn_link_remote_condition(pn_event_link(event)));
break;
return false;
break;
default:
break;
}
return true;
}
void run(app_data_t *app) {
/* Loop and handle events */
do {
pn_event_batch_t *events = pn_proactor_wait(app->proactor);
for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
if (!handle(app, e) || exit_code != 0) {
return;
}
}
pn_proactor_done(app->proactor, events);
} while(true);
}
int main(int argc, char **argv) {
struct app_data_t app = {0};
char addr[PN_MAX_ADDR];
app.container_id = argv[0]; /* Should be unique */
app.host = (argc > 1) ? argv[1] : "";
app.port = (argc > 2) ? argv[2] : "amqp";
app.amqp_address = (argc > 3) ? argv[3] : "examples";
app.message_count = (argc > 4) ? atoi(argv[4]) : 10;
/* Create the proactor and connect */
app.proactor = pn_proactor();
pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
pn_proactor_connect2(app.proactor, NULL, NULL, addr);
run(&app);
pn_proactor_free(app.proactor);
return exit_code;
}
An endpoint error state.
A connection to a remote AMQP peer.
A message transfer.
A non-const byte buffer.
Definition: types.h:235
bool pn_condition_is_set(pn_condition_t *condition)
Returns true if the condition object is holding some information, i.e.
struct pn_condition_t pn_condition_t
An AMQP Condition object.
Definition: condition.h:64
int pn_condition_format(pn_condition_t *, const char *name, const char *fmt,...)
Set the name and printf-style formatted description.
const char * pn_condition_get_description(pn_condition_t *condition)
Gets the description associated with the exceptional condition.
const char * pn_condition_get_name(pn_condition_t *condition)
Returns the name associated with the exceptional condition, or NULL if there is no conditional inform...
void pn_connection_open(pn_connection_t *connection)
Open a connection.
struct pn_connection_t pn_connection_t
An AMQP Connection object.
Definition: types.h:285
void pn_connection_close(pn_connection_t *connection)
Close a connection.
void pn_connection_set_container(pn_connection_t *connection, const char *container)
Set the AMQP Container name advertised by a connection object.
pn_condition_t * pn_connection_remote_condition(pn_connection_t *connection)
Get the remote condition associated with the connection endpoint.
bool pn_delivery_readable(pn_delivery_t *delivery)
Check if a delivery is readable.
size_t pn_delivery_pending(pn_delivery_t *delivery)
Get the amount of pending message data for a delivery.
bool pn_delivery_partial(pn_delivery_t *delivery)
Check if a delivery only has partial message data.
void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
Update the disposition of a delivery.
pn_link_t * pn_delivery_link(pn_delivery_t *delivery)
Get the parent link for a delivery object.
void pn_delivery_settle(pn_delivery_t *delivery)
Settle a delivery.
#define PN_ACCEPTED
The PN_ACCEPTED delivery state is a terminal state indicating that the delivery was successfully proc...
Definition: disposition.h:66
struct pn_delivery_t pn_delivery_t
An AMQP Delivery object.
Definition: types.h:405
#define PN_ABORTED
Delivery aborted error.
Definition: error.h:57
#define PN_EOS
End of stream.
Definition: error.h:47
const char * pn_code(int code)
Get the name of the error code.
pn_delivery_t * pn_event_delivery(pn_event_t *event)
Get the delivery associated with an event.
pn_session_t * pn_event_session(pn_event_t *event)
Get the session associated with an event.
const char * pn_event_type_name(pn_event_type_t type)
Get a human readable name for an event type.
pn_link_t * pn_event_link(pn_event_t *event)
Get the link associated with an event.
struct pn_event_t pn_event_t
Notification of a state change in the protocol engine.
Definition: event.h:75
pn_connection_t * pn_event_connection(pn_event_t *event)
Get the connection associated with an event.
pn_event_type_t pn_event_type(pn_event_t *event)
Get the type of an event.
pn_transport_t * pn_event_transport(pn_event_t *event)
Get the transport associated with an event.
@ PN_LINK_REMOTE_CLOSE
The remote endpoint has closed the link.
Definition: event.h:223
@ PN_CONNECTION_REMOTE_CLOSE
The remote endpoint has closed the connection.
Definition: event.h:149
@ PN_CONNECTION_INIT
The connection has been created.
Definition: event.h:113
@ PN_TRANSPORT_CLOSED
Indicates that the both the head and tail of the transport are closed.
Definition: event.h:295
@ PN_SESSION_REMOTE_CLOSE
The remote endpoint has closed the session.
Definition: event.h:186
@ PN_LINK_REMOTE_DETACH
The remote endpoint has detached the link.
Definition: event.h:235
@ PN_DELIVERY
A delivery has been created or updated.
Definition: event.h:254
@ PN_PROACTOR_INACTIVE
The proactor has become inactive: all listeners and connections were closed and the timeout (if set) ...
Definition: event.h:344
pn_message_t * pn_message(void)
Construct a new pn_message_t.
void pn_message_free(pn_message_t *msg)
Free a previously constructed pn_message_t.
pn_data_t * pn_message_body(pn_message_t *msg)
Get and set the body of a message.
int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size)
Decode/load message content from AMQP formatted binary data.
struct pn_message_t pn_message_t
An AMQP Message object.
Definition: message.h:51
PNP_EXTERN pn_proactor_t * pn_proactor(void)
Create a proactor.
PNP_EXTERN pn_event_t * pn_event_batch_next(pn_event_batch_t *batch)
Remove the next event from the batch and return it.
#define PN_MAX_ADDR
Size of buffer that can hold the largest connection or listening address.
Definition: proactor.h:74
PNP_EXTERN pn_event_batch_t * pn_proactor_wait(pn_proactor_t *proactor)
Wait until there are Proactor events to handle.
PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor)
Free the proactor.
struct pn_event_batch_t pn_event_batch_t
A batch of events that must be handled in sequence.
Definition: types.h:462
PNP_EXTERN void pn_proactor_connect2(pn_proactor_t *proactor, pn_connection_t *connection, pn_transport_t *transport, const char *addr)
Connect transport to addr and bind to connection.
struct pn_proactor_t pn_proactor_t
A harness for multithreaded IO.
Definition: types.h:442
PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events)
Call when finished handling a batch of events.
PNP_EXTERN int pn_proactor_addr(char *addr, size_t size, const char *host, const char *port)
Format a host:port address string for pn_proactor_connect() or pn_proactor_listen()
pn_condition_t * pn_session_remote_condition(pn_session_t *session)
Get the remote condition associated with the session endpoint.
void pn_session_close(pn_session_t *session)
Close a session.
pn_connection_t * pn_session_connection(pn_session_t *session)
Get the parent connection for a session object.
void pn_session_open(pn_session_t *session)
Open a session.
struct pn_session_t pn_session_t
An AMQP Session object.
Definition: types.h:296
pn_session_t * pn_session(pn_connection_t *connection)
Factory for creating a new session on a given connection object.
int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
Set the address of a terminus object.
pn_condition_t * pn_transport_condition(pn_transport_t *transport)
Get additional information about the condition of the transport.
A mutable holder of application content.
Unsettled API - An API for multithreaded IO.
A container of links.
A network channel supporting an AMQP connection.