import collections
import optparse
import uuid
from proton import Endpoint
from proton.handlers import MessagingHandler
from proton.reactor import Container
class Queue(object):
def __init__(self, dynamic=False):
self.dynamic = dynamic
self.queue = collections.deque()
self.consumers = []
def subscribe(self, consumer):
self.consumers.append(consumer)
def unsubscribe(self, consumer):
"""
:return: True if the queue is to be deleted
"""
if consumer in self.consumers:
self.consumers.remove(consumer)
return len(self.consumers) == 0 and (self.dynamic or len(self.queue) == 0)
def publish(self, message):
self.queue.append(message)
self.dispatch()
def dispatch(self, consumer=None):
if consumer:
c = [consumer]
else:
c = self.consumers
while self._deliver_to(c):
pass
def _deliver_to(self, consumers):
try:
result = False
for c in consumers:
if c.credit:
c.send(self.queue.popleft())
result = True
return result
except IndexError: # no more messages
return False
class Broker(MessagingHandler):
def __init__(self, url):
super(Broker, self).__init__()
self.url = url
self.queues = {}
def on_start(self, event):
self.acceptor = event.container.listen(self.url)
def _queue(self, address):
if address not in self.queues:
self.queues[address] = Queue()
return self.queues[address]
def on_link_opening(self, event):
if event.link.is_sender:
if event.link.remote_source.dynamic:
address = str(uuid.uuid4())
event.link.source.address = address
q = Queue(True)
self.queues[address] = q
q.subscribe(event.link)
elif event.link.remote_source.address:
event.link.source.address = event.link.remote_source.address
self._queue(event.link.source.address).subscribe(event.link)
elif event.link.remote_target.address:
event.link.target.address = event.link.remote_target.address
def _unsubscribe(self, link):
if link.source.address in self.queues and self.queues[link.source.address].unsubscribe(link):
del self.queues[link.source.address]
def on_link_closing(self, event):
if event.link.is_sender:
self._unsubscribe(event.link)
def on_connection_closing(self, event):
self.remove_stale_consumers(event.connection)
def on_disconnected(self, event):
self.remove_stale_consumers(event.connection)
def remove_stale_consumers(self, connection):
l = connection.link_head(Endpoint.REMOTE_ACTIVE)
while l:
if l.is_sender:
self._unsubscribe(l)
l = l.next(Endpoint.REMOTE_ACTIVE)
def on_sendable(self, event):
self._queue(event.link.source.address).dispatch(event.link)
def on_message(self, event):
address = event.link.target.address
if address is None:
address = event.message.address
self._queue(address).publish(event.message)
def main():
parser = optparse.OptionParser(usage="usage: %prog [options]")
parser.add_option("-a", "--address", default="localhost:5672",
help="address router listens on (default %default)")
opts, args = parser.parse_args()
try:
Container(Broker(opts.address)).run()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()
Apache Qpid, Messaging built on AMQP; Copyright © 2015 The Apache Software Foundation; Licensed under the Apache License, Version 2.0; Apache Qpid, Qpid, Qpid Proton, Proton, Apache, the Apache feather logo, and the Apache Qpid project logo are trademarks of The Apache Software Foundation; All other marks mentioned may be trademarks or registered trademarks of their respective owners