import sys
import threading
from proton.reactor import ApplicationEvent, Container, EventInjector
from proton.handlers import MessagingHandler, TransactionHandler
class TxRecv(MessagingHandler, TransactionHandler):
    """Interactive transactional receiver.

    Connects to ``localhost:5672``, opens a receiver on ``"examples"`` and
    declares a transaction.  Application events then drive it:

    * ``fetch``  -- grant one credit so that one message may be delivered;
    * ``commit`` -- commit the current transaction;
    * ``abort``  -- abort the current transaction;
    * ``quit``   -- close the receiver and its connection.

    Each received message is accepted under the current transaction.  A new
    transaction is declared after every commit or abort.
    """

    def __init__(self):
        # prefetch=0: no credit is issued up front; on_fetch grants it.
        # auto_accept=False: deliveries are accepted through the transaction.
        super(TxRecv, self).__init__(prefetch=0, auto_accept=False)
        # None whenever there is no open transaction: before the first one is
        # declared, and between a commit/abort and the next declaration.
        self.transaction = None

    def on_start(self, event):
        """Open the connection and receiver, then request the first transaction."""
        self.container = event.container
        self.conn = self.container.connect("localhost:5672")
        self.receiver = self.container.create_receiver(self.conn, "examples")
        self.transaction = None
        self.container.declare_transaction(self.conn, handler=self, settle_before_discharge=True)

    def on_message(self, event):
        """Print the message body and accept the delivery under the transaction.

        If no transaction is open, the delivery is released instead of being
        accepted, so the handler does not fail on a missing transaction.
        """
        print(event.message.body)
        if self.transaction is None:
            print("no active transaction; message released")
            self.release(event.delivery)
            return
        self.transaction.accept(event.delivery)

    def on_transaction_declared(self, event):
        """Record the newly declared transaction as the current one."""
        self.transaction = event.transaction
        print("transaction declared")

    def on_transaction_committed(self, event):
        """Report the commit and request a fresh transaction."""
        print("transaction committed")
        # NOTE(review): unlike the initial declaration in on_start, this call
        # does not pass settle_before_discharge=True -- confirm that is intended.
        self.container.declare_transaction(self.conn, handler=self)

    def on_transaction_aborted(self, event):
        """Report the abort and request a fresh transaction."""
        print("transaction aborted")
        # NOTE(review): settle_before_discharge is not passed here either; see
        # on_transaction_committed.
        self.container.declare_transaction(self.conn, handler=self)

    def on_commit(self, event):
        """Handle the application 'commit' event."""
        txn = self._take_transaction()
        if txn is not None:
            txn.commit()

    def on_abort(self, event):
        """Handle the application 'abort' event."""
        txn = self._take_transaction()
        if txn is not None:
            txn.abort()

    def on_fetch(self, event):
        """Handle the application 'fetch' event by granting one credit."""
        self.receiver.flow(1)

    def on_quit(self, event):
        """Handle the application 'quit' event: close the receiver, then its connection."""
        c = self.receiver.connection
        self.receiver.close()
        c.close()

    def _take_transaction(self):
        """Detach and return the current transaction, or None if there is none.

        Detaching before the discharge means a second 'commit'/'abort' (or a
        message arriving before the next transaction is declared) cannot act
        on a transaction that has already been discharged.
        """
        txn, self.transaction = self.transaction, None
        if txn is None:
            print("no active transaction")
        return txn
try:
    reactor = Container(TxRecv())
    # The injector is registered with the container so that events triggered
    # from this thread are handed over to the reactor thread.
    events = EventInjector()
    reactor.selectable(events)

    # reactor.run() blocks, so it runs on a daemon thread; the main thread
    # stays free to read commands and the process can exit without joining.
    thread = threading.Thread(target=reactor.run)
    thread.daemon = True
    thread.start()

    print("Enter 'fetch', 'commit' or 'abort'")
    # readline() yields "" only at end of input, which terminates the loop.
    # Each line typed becomes an application event named after its stripped
    # text (e.g. "fetch" is dispatched to the handler's on_fetch).
    for line in iter(sys.stdin.readline, ""):
        events.trigger(ApplicationEvent(line.strip()))
except KeyboardInterrupt:
    # Ctrl-C is the expected way to leave the interactive session.
    pass
# Apache Qpid, Messaging built on AMQP; Copyright © 2015 The Apache Software Foundation; Licensed under the Apache License, Version 2.0; Apache Qpid, Qpid, Qpid Proton, Proton, Apache, the Apache feather logo, and the Apache Qpid project logo are trademarks of The Apache Software Foundation; All other marks mentioned may be trademarks or registered trademarks of their respective owners