# Menu Search

# broker.py

import collections
import optparse
import uuid

from proton import Endpoint
from proton.handlers import MessagingHandler
from proton.reactor import Container


class Queue(object):
    """FIFO buffer of messages that are handed out to subscribed consumers.

    A consumer is any object exposing a ``credit`` attribute (truthy when it
    can accept a message) and a ``send(message)`` method; in this file the
    consumers are sender links.
    """

    def __init__(self, dynamic=False):
        """Create an empty queue.

        :param dynamic: when true, the queue is reported as deletable as soon
            as its last consumer unsubscribes, even if messages remain.
        """
        self.dynamic = dynamic
        self.queue = collections.deque()
        self.consumers = []

    def subscribe(self, consumer):
        """Register ``consumer`` to receive messages from this queue."""
        self.consumers.append(consumer)

    def unsubscribe(self, consumer):
        """Remove ``consumer`` if it is subscribed.

        :return: True if the queue is to be deleted, i.e. no consumers are
            left and the queue is either dynamic or holds no messages.
        """
        if consumer in self.consumers:
            self.consumers.remove(consumer)
        return not self.consumers and (self.dynamic or not self.queue)

    def publish(self, message):
        """Append ``message`` to the queue and try to deliver pending messages."""
        self.queue.append(message)
        self.dispatch()

    def dispatch(self, consumer=None):
        """Deliver queued messages until nothing more can be sent.

        :param consumer: when given, deliver only to this consumer; otherwise
            deliver to every subscribed consumer.
        """
        targets = [consumer] if consumer else self.consumers
        # Keep making passes while the previous pass managed to send something.
        while self._deliver_to(targets):
            pass

    def _deliver_to(self, consumers):
        """Make one pass over ``consumers``, sending one message to each
        consumer that has credit.

        The emptiness of the queue is checked explicitly instead of catching
        ``IndexError`` around the whole loop, so an ``IndexError`` raised by a
        consumer's ``send`` is no longer mistaken for "no more messages" and
        silently discarded together with the popped message.

        :return: True if at least one message was sent and the queue did not
            run dry during the pass; False otherwise.
        """
        delivered = False
        for consumer in consumers:
            if not self.queue:  # no more messages
                return False
            if consumer.credit:
                consumer.send(self.queue.popleft())
                delivered = True
        return delivered


class Broker(MessagingHandler):
    """Messaging handler that routes messages through named in-memory queues.

    Queues are kept in ``self.queues``, keyed by address, and are created on
    first use.
    """

    def __init__(self, url):
        """Remember the ``url`` to listen on and start with no queues."""
        super(Broker, self).__init__()
        self.url = url
        self.queues = {}

    def on_start(self, event):
        """Start listening on the configured URL via the event's container."""
        self.acceptor = event.container.listen(self.url)

    def _queue(self, address):
        """Return the queue for ``address``, creating it if it does not exist."""
        try:
            return self.queues[address]
        except KeyError:
            created = self.queues[address] = Queue()
            return created

    def on_link_opening(self, event):
        """Mirror the remote terminus address locally and subscribe senders.

        For a sender link, a dynamic remote source gets a fresh UUID-named
        dynamic queue; a remote source with an address is subscribed to the
        queue of that address. For any other link, the remote target address
        (if set) is copied to the local target.
        """
        link = event.link
        if not link.is_sender:
            if link.remote_target.address:
                link.target.address = link.remote_target.address
            return

        if link.remote_source.dynamic:
            generated = str(uuid.uuid4())
            link.source.address = generated
            dynamic_queue = Queue(True)
            self.queues[generated] = dynamic_queue
            dynamic_queue.subscribe(link)
        elif link.remote_source.address:
            link.source.address = link.remote_source.address
            self._queue(link.source.address).subscribe(link)

    def _unsubscribe(self, link):
        """Unsubscribe ``link`` from its source queue, deleting the queue when
        the queue reports that it should be deleted."""
        address = link.source.address
        if address not in self.queues:
            return
        if self.queues[address].unsubscribe(link):
            del self.queues[address]

    def on_link_closing(self, event):
        """Unsubscribe a closing sender link from its queue."""
        if event.link.is_sender:
            self._unsubscribe(event.link)

    def on_connection_closing(self, event):
        """Drop the consumers belonging to a connection that is closing."""
        self.remove_stale_consumers(event.connection)

    def on_disconnected(self, event):
        """Drop the consumers belonging to a connection that was disconnected."""
        self.remove_stale_consumers(event.connection)

    def remove_stale_consumers(self, connection):
        """Unsubscribe every sender link of ``connection`` found by walking
        its links with the ``Endpoint.REMOTE_ACTIVE`` state mask."""
        current = connection.link_head(Endpoint.REMOTE_ACTIVE)
        while current:
            if current.is_sender:
                self._unsubscribe(current)
            current = current.next(Endpoint.REMOTE_ACTIVE)

    def on_sendable(self, event):
        """Deliver pending messages from the link's source queue to that link."""
        link = event.link
        self._queue(link.source.address).dispatch(link)

    def on_message(self, event):
        """Publish the received message to the queue of the link's target
        address, falling back to the message's own address when the target
        address is None."""
        target_address = event.link.target.address
        if target_address is None:
            destination = event.message.address
        else:
            destination = target_address
        self._queue(destination).publish(event.message)


def main():
    """Parse the command line and run a Broker until interrupted.

    The ``-a``/``--address`` option selects the address to listen on and
    defaults to ``localhost:5672``. A KeyboardInterrupt ends the run quietly.
    """
    option_parser = optparse.OptionParser(usage="usage: %prog [options]")
    option_parser.add_option(
        "-a",
        "--address",
        default="localhost:5672",
        help="address router listens on (default %default)",
    )
    options, _positional = option_parser.parse_args()

    try:
        Container(Broker(options.address)).run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the broker; exit without a traceback.
        pass


# Start the broker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

# Download this file