#include "thread.h"
#include <proton/engine.h>
#include <proton/object.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define SSL_FILE(NAME) "ssl-certs/" NAME
#define SSL_PW "tserverpw"
#if defined(_WIN32)
# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.p12")
# define SET_CREDENTIALS(DOMAIN, NAME) \
pn_ssl_domain_set_credentials(DOMAIN, SSL_FILE(NAME "-full.p12"), "", SSL_PW)
#else
# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.pem")
# define SET_CREDENTIALS(DOMAIN, NAME) \
pn_ssl_domain_set_credentials(DOMAIN, CERTIFICATE(NAME), SSL_FILE(NAME "-private-key.pem"), SSL_PW)
#endif
#define VEC(T) struct { T* data; size_t len, cap; }
#define VEC_INIT(V) \
do { \
void **vp = (void**)&V.data; \
V.len = 0; \
V.cap = 16; \
*vp = malloc(V.cap * sizeof(*V.data)); \
} while(0)
#define VEC_FINAL(V) free(V.data)
#define VEC_PUSH(V, X) \
do { \
if (V.len == V.cap) { \
void **vp = (void**)&V.data; \
V.cap *= 2; \
*vp = realloc(V.data, V.cap * sizeof(*V.data)); \
} \
V.data[V.len++] = X; \
} while(0) \
#define VEC_POP(V) \
do { \
if (V.len > 0) \
memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data)); \
} while(0)
typedef struct queue_t {
pthread_mutex_t lock;
char *name;
struct queue_t *next;
size_t sent;
} queue_t;
static void queue_init(queue_t *q, const char* name, queue_t *next) {
pthread_mutex_init(&q->lock, NULL);
q->name = (char*)malloc(strlen(name)+1);
memcpy(q->name, name, strlen(name)+1);
VEC_INIT(q->messages);
VEC_INIT(q->waiting);
q->next = next;
q->sent = 0;
}
static void queue_destroy(queue_t *q) {
size_t i;
pthread_mutex_destroy(&q->lock);
for (i = 0; i < q->messages.len; ++i)
free(q->messages.data[i].start);
VEC_FINAL(q->messages);
for (i = 0; i < q->waiting.len; ++i)
pn_decref(q->waiting.data[i]);
VEC_FINAL(q->waiting);
free(q->name);
}
static void queue_send(queue_t *q,
pn_link_t *s) {
size_t tag = 0;
pthread_mutex_lock(&q->lock);
if (q->messages.len == 0) {
size_t i = 0;
for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
;
if (i == q->waiting.len) {
VEC_PUSH(q->waiting, c);
}
} else {
m = q->messages.data[0];
VEC_POP(q->messages);
tag = ++q->sent;
}
pthread_mutex_unlock(&q->lock);
free(m.start);
}
}
}
}
}
}
pthread_mutex_lock(&q->lock);
VEC_PUSH(q->messages, m);
if (q->messages.len == 1) {
size_t i;
for (i = 0; i < q->waiting.len; ++i) {
set_check_queues(c, true);
}
q->waiting.len = 0;
}
pthread_mutex_unlock(&q->lock);
}
typedef struct queues_t {
pthread_mutex_t lock;
queue_t *queues;
size_t sent;
} queues_t;
void queues_init(queues_t *qs) {
pthread_mutex_init(&qs->lock, NULL);
qs->queues = NULL;
}
void queues_destroy(queues_t *qs) {
while (qs->queues) {
queue_t *q = qs->queues;
qs->queues = qs->queues->next;
queue_destroy(q);
free(q);
}
pthread_mutex_destroy(&qs->lock);
}
queue_t* queues_get(queues_t *qs, const char* name) {
queue_t *q;
pthread_mutex_lock(&qs->lock);
for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
;
if (!q) {
q = (queue_t*)malloc(sizeof(queue_t));
queue_init(q, name, qs->queues);
qs->queues = q;
}
pthread_mutex_unlock(&qs->lock);
return q;
}
typedef struct broker_t {
size_t threads;
const char *container_id;
queues_t queues;
} broker_t;
void broker_stop(broker_t *b) {
}
static void link_send(broker_t *b,
pn_link_t *s) {
queue_t *q = queues_get(&b->queues, qname);
queue_send(q, s);
}
}
size_t i;
pthread_mutex_lock(&q->lock);
for (i = 0; i < q->waiting.len; ++i) {
if (q->waiting.data[i] == c){
q->waiting.data[i] = q->waiting.data[0];
VEC_POP(q->waiting);
break;
}
}
pthread_mutex_unlock(&q->lock);
}
static void link_unsub(broker_t *b,
pn_link_t *s) {
if (qname) {
queue_t *q = queues_get(&b->queues, qname);
}
}
}
link_unsub(b, l);
}
link_unsub(b, l);
}
}
}
}
const int WINDOW=5;
printf("listening on %s\n", port);
fflush(stdout);
break;
}
if (b->ssl_domain) {
}
break;
}
break;
break;
}
if (get_check_queues(c)) {
set_check_queues(c, false);
link_send(b, l);
}
break;
}
break;
}
} else {
}
break;
}
break;
}
if (buf) {
free(buf->start);
free(buf);
}
break;
}
ssize_t recv;
m->size += size;
m->start = (char*)realloc(m->start, m->size);
printf("Message aborted\n");
fflush(stdout);
m->size = 0;
}
else if (recv < 0 && recv !=
PN_EOS) {
queue_receive(b->proactor, queues_get(&b->queues, qname), *m);
*m = pn_rwbytes_null;
}
}
break;
}
break;
break;
break;
break;
broker_stop(b);
break;
broker_stop(b);
break;
return false;
default:
break;
}
return true;
}
static void* broker_thread(void *void_broker) {
broker_t *b = (broker_t*)void_broker;
bool finished = false;
do {
if (!handle(b, e, connection)) finished = true;
}
} while(!finished);
return NULL;
}
int main(int argc, char **argv) {
const char *host = (argc > 1) ? argv[1] : "";
const char *port = (argc > 2) ? argv[2] : "amqp";
int err;
broker_t b = {0};
queues_init(&b.queues);
b.container_id = argv[0];
b.threads = 4;
err = SET_CREDENTIALS(b.ssl_domain, "tserver");
if (err) {
printf("Failed to set up server certificate: %s, private key: %s\n", CERTIFICATE("tserver"), SSL_FILE("tserver-private-key.pem"));
}
{
}
{
pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), b.threads);
size_t i;
for (i = 0; i < b.threads-1; ++i) {
pthread_create(&threads[i], NULL, broker_thread, &b);
}
broker_thread(&b);
for (i = 0; i < b.threads-1; ++i) {
pthread_join(threads[i], NULL);
}
free(threads);
return 0;
}
}
A non-const byte buffer.
Definition: types.h:235
bool pn_condition_is_set(pn_condition_t *condition)
Returns true if the condition object is holding some information, i.e.
struct pn_condition_t pn_condition_t
An AMQP Condition object.
Definition: condition.h:64
int pn_condition_format(pn_condition_t *, const char *name, const char *fmt,...)
Set the name and printf-style formatted description.
const char * pn_condition_get_description(pn_condition_t *condition)
Gets the description associated with the exceptional condition.
const char * pn_condition_get_name(pn_condition_t *condition)
Returns the name associated with the exceptional condition, or NULL if there is no conditional inform...
#define PN_LOCAL_ACTIVE
The local endpoint state is active.
Definition: connection.h:55
void pn_connection_open(pn_connection_t *connection)
Open a connection.
struct pn_connection_t pn_connection_t
An AMQP Connection object.
Definition: types.h:285
void pn_connection_close(pn_connection_t *connection)
Close a connection.
void pn_connection_set_container(pn_connection_t *connection, const char *container)
Set the AMQP Container name advertised by a connection object.
pn_condition_t * pn_connection_remote_condition(pn_connection_t *connection)
Get the remote condition associated with the connection endpoint.
void * pn_connection_get_context(pn_connection_t *connection)
Get the application context that is associated with a connection object.
void pn_connection_set_context(pn_connection_t *connection, void *context)
Set a new application context for a connection object.
#define PN_REMOTE_ACTIVE
The remote endpoint state is active.
Definition: connection.h:70
bool pn_delivery_readable(pn_delivery_t *delivery)
Check if a delivery is readable.
size_t pn_delivery_pending(pn_delivery_t *delivery)
Get the amount of pending message data for a delivery.
bool pn_delivery_partial(pn_delivery_t *delivery)
Check if a delivery only has partial message data.
void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
Update the disposition of a delivery.
pn_link_t * pn_delivery_link(pn_delivery_t *delivery)
Get the parent link for a delivery object.
pn_delivery_t * pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
Create a delivery on a link.
void pn_delivery_settle(pn_delivery_t *delivery)
Settle a delivery.
#define PN_ACCEPTED
The PN_ACCEPTED delivery state is a terminal state indicating that the delivery was successfully proc...
Definition: disposition.h:66
struct pn_delivery_t pn_delivery_t
An AMQP Delivery object.
Definition: types.h:405
pn_delivery_tag_t pn_dtag(const char *bytes, size_t size)
Construct a delivery tag.
#define PN_ABORTED
Delivery aborted error.
Definition: error.h:57
#define PN_EOS
End of stream.
Definition: error.h:47
const char * pn_code(int code)
Get the name of the error code.
pn_delivery_t * pn_event_delivery(pn_event_t *event)
Get the delivery associated with an event.
pn_session_t * pn_event_session(pn_event_t *event)
Get the session associated with an event.
const char * pn_event_type_name(pn_event_type_t type)
Get a human readable name for an event type.
pn_link_t * pn_event_link(pn_event_t *event)
Get the link associated with an event.
struct pn_event_t pn_event_t
Notification of a state change in the protocol engine.
Definition: event.h:75
pn_connection_t * pn_event_connection(pn_event_t *event)
Get the connection associated with an event.
pn_event_type_t pn_event_type(pn_event_t *event)
Get the type of an event.
pn_transport_t * pn_event_transport(pn_event_t *event)
Get the transport associated with an event.
@ PN_LISTENER_ACCEPT
Indicates the listener has an incoming connection, call pn_listener_accept2() to accept it.
Definition: event.h:316
@ PN_LINK_REMOTE_CLOSE
The remote endpoint has closed the link.
Definition: event.h:223
@ PN_CONNECTION_REMOTE_CLOSE
The remote endpoint has closed the connection.
Definition: event.h:149
@ PN_CONNECTION_INIT
The connection has been created.
Definition: event.h:113
@ PN_TRANSPORT_CLOSED
Indicates that the both the head and tail of the transport are closed.
Definition: event.h:295
@ PN_CONNECTION_WAKE
pn_connection_wake() was called.
Definition: event.h:309
@ PN_SESSION_REMOTE_CLOSE
The remote endpoint has closed the session.
Definition: event.h:186
@ PN_LINK_REMOTE_OPEN
The remote endpoint has opened the link.
Definition: event.h:211
@ PN_LINK_FLOW
The flow control state for a link has changed.
Definition: event.h:241
@ PN_LISTENER_OPEN
The listener is listening.
Definition: event.h:350
@ PN_DELIVERY
A delivery has been created or updated.
Definition: event.h:254
@ PN_PROACTOR_INACTIVE
The proactor has become inactive: all listeners and connections were closed and the timeout (if set) ...
Definition: event.h:344
@ PN_CONNECTION_REMOTE_OPEN
The remote endpoint has opened the connection.
Definition: event.h:137
@ PN_LISTENER_CLOSE
Indicates the listener has closed.
Definition: event.h:322
@ PN_LINK_FINAL
The link has been freed and any outstanding processing has been completed.
Definition: event.h:248
@ PN_PROACTOR_INTERRUPT
Indicates pn_proactor_interrupt() was called to interrupt a proactor thread.
Definition: event.h:328
@ PN_SESSION_REMOTE_OPEN
The remote endpoint has opened the session.
Definition: event.h:174
ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n)
Receive message data for the current delivery on a link.
pn_session_t * pn_link_session(pn_link_t *link)
Get the parent session for a link object.
pn_condition_t * pn_link_remote_condition(pn_link_t *link)
Get the remote condition associated with a link endpoint.
void pn_link_set_context(pn_link_t *link, void *context)
Set a new application context for a link object.
pn_terminus_t * pn_link_source(pn_link_t *link)
Access the locally defined source definition for a link.
void pn_link_close(pn_link_t *link)
Close a link.
pn_condition_t * pn_link_condition(pn_link_t *link)
Get the local condition associated with a link endpoint.
int pn_link_credit(pn_link_t *link)
Get the credit balance for a link.
bool pn_link_is_sender(pn_link_t *link)
Test if a link is a sender.
pn_terminus_t * pn_link_remote_source(pn_link_t *link)
Access the remotely defined source definition for a link.
struct pn_link_t pn_link_t
An AMQP Link object.
Definition: types.h:315
bool pn_link_advance(pn_link_t *link)
Advance the current delivery of a link to the next delivery on the link.
pn_link_t * pn_link_head(pn_connection_t *connection, pn_state_t state)
Retrieve the first link that matches the given state mask.
ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n)
Send message data for the current delivery on a link.
pn_link_t * pn_link_next(pn_link_t *link, pn_state_t state)
Retrieve the next link that matches the given state mask.
void pn_link_open(pn_link_t *link)
Open a link.
void * pn_link_get_context(pn_link_t *link)
Get the application context that is associated with a link object.
pn_terminus_t * pn_link_remote_target(pn_link_t *link)
Access the remotely defined target definition for a link.
void pn_link_free(pn_link_t *link)
Free a link object.
pn_terminus_t * pn_link_target(pn_link_t *link)
Access the locally defined target definition for a link.
void pn_link_flow(pn_link_t *receiver, int credit)
Grant credit for incoming deliveries on a receiver.
PNP_EXTERN void pn_listener_accept2(pn_listener_t *listener, pn_connection_t *connection, pn_transport_t *transport)
Accept an incoming connection request using transport and connection, which can be configured before ...
PNP_EXTERN pn_listener_t * pn_listener(void)
Create a listener to pass to pn_proactor_listen()
PNP_EXTERN pn_listener_t * pn_event_listener(pn_event_t *event)
Return the listener associated with an event.
PNP_EXTERN pn_condition_t * pn_listener_condition(pn_listener_t *l)
Get the error condition for a listener.
PNP_EXTERN pn_proactor_t * pn_proactor(void)
Create a proactor.
PNP_EXTERN pn_event_t * pn_event_batch_next(pn_event_batch_t *batch)
Remove the next event from the batch and return it.
PNP_EXTERN void pn_connection_wake(pn_connection_t *connection)
Return a PN_CONNECTION_WAKE event for connection as soon as possible.
#define PN_MAX_ADDR
Size of buffer that can hold the largest connection or listening address.
Definition: proactor.h:74
PNP_EXTERN pn_event_batch_t * pn_proactor_wait(pn_proactor_t *proactor)
Wait until there are Proactor events to handle.
PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor)
Free the proactor.
struct pn_event_batch_t pn_event_batch_t
A batch of events that must be handled in sequence.
Definition: types.h:462
PNP_EXTERN pn_connection_t * pn_event_batch_connection(pn_event_batch_t *batch)
Query the batch for the subject of the batch.
PNP_EXTERN int pn_netaddr_host_port(const pn_netaddr_t *na, char *host, size_t hlen, char *port, size_t plen)
Get the host and port name from na as separate strings.
PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor)
Return a PN_PROACTOR_INTERRUPT event as soon as possible.
struct pn_proactor_t pn_proactor_t
A harness for multithreaded IO.
Definition: types.h:442
PNP_EXTERN void pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener, const char *addr, int backlog)
Start listening for incoming connections.
PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events)
Call when finished handling a batch of events.
PNP_EXTERN const pn_netaddr_t * pn_listener_addr(pn_listener_t *l)
Get the listening addresses of a listener.
PNP_EXTERN int pn_proactor_addr(char *addr, size_t size, const char *host, const char *port)
Format a host:port address string for pn_proactor_connect() or pn_proactor_listen()
pn_sasl_t * pn_sasl(pn_transport_t *transport)
Construct an Authentication and Security Layer object.
void pn_sasl_allowed_mechs(pn_sasl_t *sasl, const char *mechs)
SASL mechanisms that are to be considered for authentication.
pn_condition_t * pn_session_remote_condition(pn_session_t *session)
Get the remote condition associated with the session endpoint.
void pn_session_free(pn_session_t *session)
Free a session object.
void pn_session_close(pn_session_t *session)
Close a session.
pn_connection_t * pn_session_connection(pn_session_t *session)
Get the parent connection for a session object.
void pn_session_open(pn_session_t *session)
Open a session.
struct pn_session_t pn_session_t
An AMQP Session object.
Definition: types.h:296
int pn_ssl_init(pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id)
Initialize an SSL session.
void pn_ssl_domain_free(pn_ssl_domain_t *domain)
Release an SSL configuration domain.
pn_ssl_domain_t * pn_ssl_domain(pn_ssl_mode_t mode)
Create an SSL configuration domain.
pn_ssl_t * pn_ssl(pn_transport_t *transport)
Create a new SSL session object associated with a transport.
struct pn_ssl_domain_t pn_ssl_domain_t
API for using SSL with the Transport Layer.
Definition: ssl.h:80
@ PN_SSL_MODE_SERVER
Local connection endpoint is an SSL server.
Definition: ssl.h:92
int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
Set the address of a terminus object.
const char * pn_terminus_get_address(pn_terminus_t *terminus)
Get the address of a terminus object.
pn_transport_t * pn_transport(void)
Factory for creating a transport.
struct pn_transport_t pn_transport_t
A network channel supporting an AMQP connection.
Definition: types.h:435
void pn_transport_set_server(pn_transport_t *transport)
Configure a transport as a server.
void pn_transport_require_encryption(pn_transport_t *transport, bool required)
Set whether a non encrypted transport connection is allowed.
pn_condition_t * pn_transport_condition(pn_transport_t *transport)
Get additional information about the condition of the transport.
Unsettled API - A listener for incoming connections.
Unsettled API - The network address of a proactor transport.
Unsettled API - An API for multithreaded IO.
SASL secure transport layer.
SSL secure transport layer.
A network channel supporting an AMQP connection.