Package proton :: Module reactor
[frames | no frames]

Source Code for Module proton.reactor

  1  from __future__ import absolute_import 
  2  # 
  3  # Licensed to the Apache Software Foundation (ASF) under one 
  4  # or more contributor license agreements.  See the NOTICE file 
  5  # distributed with this work for additional information 
  6  # regarding copyright ownership.  The ASF licenses this file 
  7  # to you under the Apache License, Version 2.0 (the 
  8  # "License"); you may not use this file except in compliance 
  9  # with the License.  You may obtain a copy of the License at 
 10  # 
 11  #   http://www.apache.org/licenses/LICENSE-2.0 
 12  # 
 13  # Unless required by applicable law or agreed to in writing, 
 14  # software distributed under the License is distributed on an 
 15  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 16  # KIND, either express or implied.  See the License for the 
 17  # specific language governing permissions and limitations 
 18  # under the License. 
 19  # 
 20  import logging, os, socket, time, types 
 21  from heapq import heappush, heappop, nsmallest 
 22  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 23  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 24  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 25  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 26  from select import select 
 27  from proton.handlers import OutgoingMessageHandler 
 28  from proton import unicode2utf8, utf82unicode 
 29   
 30  import traceback 
 31  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 32  from .wrapper import Wrapper, PYCTX 
 33  from cproton import * 
 34  from . import _compat 
 35   
 36  try: 
 37      import Queue 
 38  except ImportError: 
 39      import queue as Queue 
 40   
 41  log = logging.getLogger("proton") 
class Task(Wrapper):
    """Python-side handle for a task object created by the C reactor."""

    @staticmethod
    def wrap(impl):
        """Return a Task around ``impl``, or None when ``impl`` is None."""
        return None if impl is None else Task(impl)

    def __init__(self, impl):
        """Wrap the C task ``impl`` using the task attachments accessor."""
        Wrapper.__init__(self, impl, pn_task_attachments)

    def _init(self):
        """No per-instance state to set up."""
        pass

    def cancel(self):
        """Cancel the underlying C task."""
        pn_task_cancel(self._impl)
60
class Acceptor(Wrapper):
    """Python-side handle for an acceptor created by the C reactor."""

    def __init__(self, impl):
        """Wrap the C acceptor ``impl``."""
        Wrapper.__init__(self, impl)

    def set_ssl_domain(self, ssl_domain):
        """Attach ``ssl_domain`` (its ``_domain`` C object) to this acceptor."""
        domain_impl = ssl_domain._domain
        pn_acceptor_set_ssl_domain(self._impl, domain_impl)

    def close(self):
        """Close the underlying C acceptor."""
        pn_acceptor_close(self._impl)
71
class Reactor(Wrapper):
    """Python wrapper around the C ``pn_reactor``.

    Errors reported through the on_error relay are collected in
    ``self.errors`` and re-raised from process() and stop() (see
    _check_errors).
    """

    @staticmethod
    def wrap(impl):
        """Return a wrapper for the C reactor ``impl``, or None if it is None.

        If the Python context stored in the reactor's attachments has a
        'subclass' entry, that class is instantiated (with ``impl=impl``)
        instead of Reactor.
        """
        if impl is None:
            return None
        else:
            record = pn_reactor_attachments(impl)
            attrs = pn_void2py(pn_record_get(record, PYCTX))
            if attrs and 'subclass' in attrs:
                return attrs['subclass'](impl=impl)
            else:
                return Reactor(impl=impl)

    def __init__(self, *handlers, **kwargs):
        """Create (or wrap) a reactor and register ``handlers`` on it.

        @param handlers: handlers added to the reactor's root handler.
        @param kwargs: ``impl`` may carry an existing C reactor to wrap;
        otherwise ``pn_reactor`` is handed to Wrapper in its place.
        """
        Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
        for h in handlers:
            self.handler.add(h, on_error=self.on_error_delegate())

    def _init(self):
        """Initialise the list of collected error infos."""
        self.errors = []

    # on_error relay handler tied to underlying C reactor.  Use when the
    # error will always be generated from a callback from this reactor.
    # Needed to prevent reference cycles and be compatible with wrappers.
    class ErrorDelegate(object):
        def __init__(self, reactor):
            # Keep only the C object, not the Python wrapper.
            self.reactor_impl = reactor._impl
        def on_error(self, info):
            # Re-wrap the C reactor and forward the error to it.
            ractor = Reactor.wrap(self.reactor_impl)
            ractor.on_error(info)

    def on_error_delegate(self):
        """Return a bound on_error callable that relays to this reactor."""
        return Reactor.ErrorDelegate(self).on_error

    def on_error(self, info):
        """Record ``info`` in ``self.errors`` and yield the reactor."""
        self.errors.append(info)
        self.yield_()

    def _get_global(self):
        """Getter for ``global_handler``: the wrapped global handler."""
        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())

    def _set_global(self, handler):
        """Setter for ``global_handler``."""
        impl = _chandler(handler, self.on_error_delegate())
        pn_reactor_set_global_handler(self._impl, impl)
        # Drop the local reference once it has been handed to the reactor.
        pn_decref(impl)

    global_handler = property(_get_global, _set_global)

    def _get_timeout(self):
        """Getter for ``timeout``: the reactor timeout converted from milliseconds."""
        return millis2timeout(pn_reactor_get_timeout(self._impl))

    def _set_timeout(self, secs):
        """Setter for ``timeout``; ``secs`` is converted to milliseconds."""
        return pn_reactor_set_timeout(self._impl, timeout2millis(secs))

    timeout = property(_get_timeout, _set_timeout)

    def yield_(self):
        """Call pn_reactor_yield on the underlying reactor."""
        pn_reactor_yield(self._impl)

    def mark(self):
        """Return the result of pn_reactor_mark on the underlying reactor."""
        return pn_reactor_mark(self._impl)

    def _get_handler(self):
        """Getter for ``handler``: the wrapped reactor handler."""
        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())

    def _set_handler(self, handler):
        """Setter for ``handler``."""
        impl = _chandler(handler, self.on_error_delegate())
        pn_reactor_set_handler(self._impl, impl)
        # Drop the local reference once it has been handed to the reactor.
        pn_decref(impl)

    handler = property(_get_handler, _set_handler)

    def run(self):
        """Start the reactor, process until process() returns a false
        value, then stop it and clear both handlers.

        Note that this overwrites ``timeout`` with a fixed value.
        """
        self.timeout = 3.14159265359
        self.start()
        while self.process(): pass
        self.stop()
        # One more pass after stop().
        self.process()
        self.global_handler = None
        self.handler = None

    def wakeup(self):
        """Wake the reactor up.

        @raise IOError: if pn_reactor_wakeup reports a non-zero result.
        """
        n = pn_reactor_wakeup(self._impl)
        if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))

    def start(self):
        """Call pn_reactor_start on the underlying reactor."""
        pn_reactor_start(self._impl)

    @property
    def quiesced(self):
        """The result of pn_reactor_quiesced for the underlying reactor."""
        return pn_reactor_quiesced(self._impl)

    def _check_errors(self):
        """Raise the most recently recorded error, if any.

        All earlier recorded errors are printed with traceback first.
        ``self.errors`` is not cleared here.
        """
        if self.errors:
            for exc, value, tb in self.errors[:-1]:
                traceback.print_exception(exc, value, tb)
            exc, value, tb = self.errors[-1]
            _compat.raise_(exc, value, tb)

    def process(self):
        """Run pn_reactor_process once and return its result.

        Re-raises any error recorded through on_error.
        """
        result = pn_reactor_process(self._impl)
        self._check_errors()
        return result

    def stop(self):
        """Stop the reactor, then re-raise any recorded error."""
        pn_reactor_stop(self._impl)
        self._check_errors()

    def schedule(self, delay, task):
        """Schedule ``task`` (a handler) after ``delay`` seconds.

        @return: a Task wrapping the scheduled C task.
        """
        impl = _chandler(task, self.on_error_delegate())
        task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
        pn_decref(impl)
        return task

    def acceptor(self, host, port, handler=None):
        """Create an acceptor for ``host``:``port``.

        @return: an Acceptor wrapping the C acceptor.
        @raise IOError: if the C call returns no acceptor; the message
        includes the reactor's error text plus host and port.
        """
        impl = _chandler(handler, self.on_error_delegate())
        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
        pn_decref(impl)
        if aimpl:
            return Acceptor(aimpl)
        else:
            raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))

    def connection(self, handler=None):
        """Deprecated: use connection_to_host() instead
        """
        impl = _chandler(handler, self.on_error_delegate())
        result = Connection.wrap(pn_reactor_connection(self._impl, impl))
        if impl: pn_decref(impl)
        return result

    def connection_to_host(self, host, port, handler=None):
        """Create an outgoing Connection that will be managed by the reactor.
        The reactor's pn_iohandler will create a socket connection to the host
        once the connection is opened.
        """
        conn = self.connection(handler)
        self.set_connection_host(conn, host, port)
        return conn

    def set_connection_host(self, connection, host, port):
        """Change the address used by the connection.  The address is
        used by the reactor's iohandler to create an outgoing socket
        connection.  This must be set prior to opening the connection.
        """
        pn_reactor_set_connection_host(self._impl,
                                       connection._impl,
                                       unicode2utf8(str(host)),
                                       unicode2utf8(str(port)))

    def get_connection_address(self, connection):
        """This may be used to retrieve the remote peer address.
        @return: string containing the address in URL format or None if no
        address is available.  Use the proton.Url class to create a Url object
        from the returned value.
        """
        _url = pn_reactor_get_connection_address(self._impl, connection._impl)
        return utf82unicode(_url)

    def selectable(self, handler=None):
        """Create a Selectable on this reactor.

        If ``_chandler`` yields a handler object for ``handler``, it is
        installed on the selectable's attachments record.
        """
        impl = _chandler(handler, self.on_error_delegate())
        result = Selectable.wrap(pn_reactor_selectable(self._impl))
        if impl:
            record = pn_selectable_attachments(result._impl)
            pn_record_set_handler(record, impl)
            pn_decref(impl)
        return result

    def update(self, sel):
        """Call pn_reactor_update for the selectable ``sel``."""
        pn_reactor_update(self._impl, sel._impl)

    def push_event(self, obj, etype):
        """Put an event of type ``etype`` with context ``obj`` on the
        reactor's collector.
        """
        pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
# Register factories in proton's ``wrappers`` table under the keys
# "pn_reactor" and "pn_task"; each casts its argument to the C type and
# wraps it in the matching Python class.
# NOTE(review): presumably consulted when wrapping C event contexts -- confirm.
from proton import wrappers as _wrappers
_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
class EventInjector(object):
    """
    Allows events to be triggered from a thread other than the
    reactor's event thread while still being handled on that event
    thread. Pass an instance to Reactor.selectable() to activate it,
    and call close() once it is no longer needed so that the event
    loop is able to end.
    """
    def __init__(self):
        self.queue = Queue.Queue()
        self.pipe = os.pipe()
        self._closed = False

    def _wake(self):
        """Write one byte to the pipe so that its read end becomes readable."""
        write_end = self.pipe[1]
        os.write(write_end, _compat.str2bin("!"))

    def trigger(self, event):
        """
        Queue the given event for dispatch on the event thread of the
        reactor this EventInjector was added to.
        """
        self.queue.put(event)
        self._wake()

    def close(self):
        """
        Request that this EventInjector be closed. Events already
        queued are dispatched on the reactor's event dispatch thread
        first; after that the injector is removed from the set of
        interest.
        """
        self._closed = True
        self._wake()

    def fileno(self):
        """Return the read end of the pipe."""
        read_end = self.pipe[0]
        return read_end

    def on_selectable_init(self, event):
        """Point the selectable at the pipe's read end and enable reading."""
        selectable = event.context
        selectable.fileno(self.fileno())
        selectable.reading = True
        event.reactor.update(selectable)

    def on_selectable_readable(self, event):
        """Drain the pipe, push every queued event, terminate if closed."""
        os.read(self.pipe[0], 512)
        pending = self.queue
        while not pending.empty():
            queued = pending.get()
            event.reactor.push_event(queued.context, queued.type)
        if not self._closed:
            return
        selectable = event.context
        selectable.terminate()
        event.reactor.update(selectable)
301
class ApplicationEvent(EventBase):
    """
    An event defined by the application. It may optionally carry an
    engine object (connection, session, link or delivery) and/or an
    arbitrary subject.
    """
    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
        # A more specific object determines its parents: delivery -> link
        # -> session -> connection. Explicit arguments are only used when
        # nothing more specific was given.
        self.delivery = delivery
        self.link = self.delivery.link if self.delivery else link
        self.session = self.link.session if self.link else session
        self.connection = self.session.connection if self.session else connection
        self.subject = subject

    def __repr__(self):
        parts = (self.connection, self.session, self.link, self.delivery, self.subject)
        shown = ", ".join(str(part) for part in parts if part is not None)
        return "%s(%s)" % (self.type, shown)
325
class Transaction(object):
    """
    Tracks the state of a single AMQP 1.0 transaction.
    """
    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
        self.txn_ctrl = txn_ctrl
        self.handler = handler
        self.id = None
        self._declare = None
        self._discharge = None
        self.failed = False
        self._pending = []
        self.settle_before_discharge = settle_before_discharge
        # Declaring happens immediately on construction.
        self.declare()

    def commit(self):
        """Discharge the transaction as successful."""
        self.discharge(False)

    def abort(self):
        """Discharge the transaction as failed."""
        self.discharge(True)

    def declare(self):
        """Send the declare control message and remember its delivery."""
        descriptor = symbol(u'amqp:declare:list')
        self._declare = self._send_ctrl(descriptor, [None])

    def discharge(self, failed):
        """Send the discharge control message with the given ``failed`` flag."""
        self.failed = failed
        descriptor = symbol(u'amqp:discharge:list')
        self._discharge = self._send_ctrl(descriptor, [self.id, failed])

    def _send_ctrl(self, descriptor, value):
        """Send a described value on the control link; tag the delivery with self."""
        ctrl_msg = Message(body=Described(descriptor, value))
        dlv = self.txn_ctrl.send(ctrl_msg)
        dlv.transaction = self
        return dlv

    def send(self, sender, msg, tag=None):
        """Send ``msg`` on ``sender`` with this transaction's id in the local state."""
        sent = sender.send(msg, tag=tag)
        sent.local.data = [self.id]
        sent.update(0x34)
        return sent

    def accept(self, delivery):
        """Accept ``delivery`` under this transaction.

        The delivery is settled right away when settle_before_discharge
        is set; otherwise it is kept pending.
        """
        self.update(delivery, PN_ACCEPTED)
        if not self.settle_before_discharge:
            self._pending.append(delivery)
            return
        delivery.settle()

    def update(self, delivery, state=None):
        """Apply ``state`` to ``delivery`` under this transaction; no-op if falsy."""
        if not state:
            return
        delivery.local.data = [self.id, Described(ulong(state), [])]
        delivery.update(0x34)

    def _release_pending(self):
        """Release and settle every pending delivery, then forget them."""
        for pending in self._pending:
            pending.update(Delivery.RELEASED)
            pending.settle()
        self._clear_pending()

    def _clear_pending(self):
        self._pending = []

    def handle_outcome(self, event):
        """Route the outcome of a declare or discharge delivery to the handler."""
        if event.delivery == self._declare:
            self._on_declare_outcome(event)
        elif event.delivery == self._discharge:
            self._on_discharge_outcome(event)

    def _on_declare_outcome(self, event):
        """Handle the outcome of the declare delivery."""
        if event.delivery.remote.data:
            self.id = event.delivery.remote.data[0]
            self.handler.on_transaction_declared(event)
            return
        if event.delivery.remote_state != Delivery.REJECTED:
            log.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
        self.handler.on_transaction_declare_failed(event)

    def _on_discharge_outcome(self, event):
        """Handle the outcome of the discharge delivery."""
        rejected = event.delivery.remote_state == Delivery.REJECTED
        if rejected:
            if not self.failed:
                self.handler.on_transaction_commit_failed(event)
                self._release_pending() # make this optional?
        elif self.failed:
            self.handler.on_transaction_aborted(event)
            self._release_pending()
        else:
            self.handler.on_transaction_committed(event)
        self._clear_pending()
408
class LinkOption(object):
    """
    Abstract base for options that configure a link.
    """
    def apply(self, link):
        """
        Hook for subclasses: put the configuration logic here. The
        base implementation does nothing.
        """
        return None

    def test(self, link):
        """
        Hook for subclasses that only want to apply the option to some
        links, e.g. based on some link criteria. The base
        implementation accepts every link.
        """
        applies = True
        return applies
425
426 -class AtMostOnce(LinkOption):
427 - def apply(self, link):
429
430 -class AtLeastOnce(LinkOption):
431 - def apply(self, link):
434
class SenderOption(LinkOption):
    """Link option that is only applicable to sender links."""

    def apply(self, sender):
        """Nothing to configure by default."""
        pass

    def test(self, link):
        """Return the link's ``is_sender`` flag."""
        is_sender = link.is_sender
        return is_sender
438
class ReceiverOption(LinkOption):
    """Link option that is only applicable to receiver links."""

    def apply(self, receiver):
        """Nothing to configure by default."""
        pass

    def test(self, link):
        """Return the link's ``is_receiver`` flag."""
        is_receiver = link.is_receiver
        return is_receiver
442
443 -class DynamicNodeProperties(LinkOption):
def __init__(self, props=None):
    """Store ``props`` with every key converted to a ``symbol``.

    @param props: mapping of property name to value; keys that are not
    already ``symbol`` instances are wrapped in ``symbol``. None (the
    default) means no properties.
    """
    # None replaces the former mutable ``{}`` default argument.
    if props is None:
        props = {}
    self.properties = {}
    for k in props:
        key = k if isinstance(k, symbol) else symbol(k)
        self.properties[key] = props[k]
451
452 - def apply(self, link):
457
class Filter(ReceiverOption):
    """Receiver option that puts a filter set on the receiver's source."""

    def __init__(self, filter_set=None):
        """
        @param filter_set: dict describing the filters; it is stored by
        reference. None (the default) gives this instance its own empty
        dict.
        """
        # A ``{}`` default would be one dict shared by every Filter created
        # without arguments, so changes through one instance's filter_set
        # would leak into the others.
        self.filter_set = {} if filter_set is None else filter_set

    def apply(self, receiver):
        """Write the filter set into ``receiver.source.filter``."""
        receiver.source.filter.put_dict(self.filter_set)
464
class Selector(Filter):
    """
    Adds a message selector filter to a link.
    """
    def __init__(self, value, name='selector'):
        key = symbol(name)
        described = Described(symbol('apache.org:selector-filter:string'), value)
        super(Selector, self).__init__({key: described})
471
472 -class DurableSubscription(ReceiverOption):
473 - def apply(self, receiver):
476
477 -class Move(ReceiverOption):
478 - def apply(self, receiver):
480
481 -class Copy(ReceiverOption):
482 - def apply(self, receiver):
484 492
493 -def _create_session(connection, handler=None):
494 session = connection.session() 495 session.open() 496 return session
497
498 499 -def _get_attr(target, name):
500 if hasattr(target, name): 501 return getattr(target, name) 502 else: 503 return None
504
class SessionPerConnection(object):
    """Session policy that creates one session lazily and then reuses it."""

    def __init__(self):
        self._default_session = None

    def session(self, connection):
        """Return the cached session, creating it on ``connection`` first if needed."""
        if self._default_session:
            return self._default_session
        self._default_session = _create_session(connection)
        return self._default_session
513
class GlobalOverrides(object):
    """
    Internal handler that offers each unhandled event to the
    ``_overrides`` handler of the event's connection (when there is
    one) and falls back to the wrapped base handler if that dispatch
    did not report the event as handled.
    """
    def __init__(self, base):
        self.base = base

    def on_unhandled(self, name, event):
        if self._override(event):
            return
        event.dispatch(self.base)

    def _override(self, event):
        """Dispatch to the connection's overrides; falsy result means not handled."""
        connection = event.connection
        return (connection
                and hasattr(connection, '_overrides')
                and event.dispatch(connection._overrides))
529
class Connector(Handler):
    """
    Internal handler that triggers the necessary socket connect for an
    opened connection.
    """
    def __init__(self, connection):
        """Remember ``connection``; all other settings start at their
        defaults and are filled in by the creator of the connector.
        """
        self.connection = connection
        # Object whose next() yields the URL for the next connect attempt.
        self.address = None
        self.heartbeat = None
        # Reconnect strategy with next()/reset(), or None for no reconnect.
        self.reconnect = None
        self.ssl_domain = None
        self.allow_insecure_mechs = True
        self.allowed_mechs = None
        self.sasl_enabled = True
        self.user = None
        self.password = None
        self.virtual_host = None
        self.ssl_sni = None
        self.max_frame_size = None

    def _connect(self, connection, reactor):
        """Configure a fresh Transport for ``connection`` and bind it.

        Takes the next URL from ``self.address``, registers its host and
        port with ``reactor``, and applies the SASL, heartbeat, SSL and
        frame-size settings.

        @raise SSLUnavailable: for an 'amqps' URL when no SSL domain is set.
        """
        assert(reactor is not None)
        url = self.address.next()
        reactor.set_connection_host(connection, url.host, str(url.port))
        # if virtual-host not set, use host from address as default
        if self.virtual_host is None:
            connection.hostname = url.host
        log.debug("connecting to %s..." % url)

        transport = Transport()
        if self.sasl_enabled:
            sasl = transport.sasl()
            sasl.allow_insecure_mechs = self.allow_insecure_mechs
            # Credentials in the URL take precedence over the configured ones.
            if url.username:
                connection.user = url.username
            elif self.user:
                connection.user = self.user
            if url.password:
                connection.password = url.password
            elif self.password:
                connection.password = self.password
            if self.allowed_mechs:
                sasl.allowed_mechs(self.allowed_mechs)
        transport.bind(connection)
        if self.heartbeat:
            transport.idle_timeout = self.heartbeat
        if url.scheme == 'amqps':
            if not self.ssl_domain:
                raise SSLUnavailable("amqps: SSL libraries not found")
            self.ssl = SSL(transport, self.ssl_domain)
            # Peer name preference: explicit SNI, then virtual host, then URL host.
            self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host
        if self.max_frame_size:
            transport.max_frame_size = self.max_frame_size

    def on_connection_local_open(self, event):
        """Connect as soon as the connection is opened locally."""
        self._connect(event.connection, event.reactor)

    def on_connection_remote_open(self, event):
        """Reset the reconnect strategy once the peer has opened."""
        log.debug("connected to %s" % event.connection.hostname)
        if self.reconnect:
            self.reconnect.reset()
            self.transport = None

    def on_transport_tail_closed(self, event):
        """Treat a closed transport tail like a closed transport."""
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """Reconnect (immediately or after a delay) if the connection is
        still locally active and a reconnect strategy is set; otherwise
        drop the reference to the connection.
        """
        if self.connection is None: return
        if self.connection.state & Endpoint.LOCAL_ACTIVE:
            if self.reconnect:
                event.transport.unbind()
                delay = self.reconnect.next()
                if delay == 0:
                    log.info("Disconnected, reconnecting...")
                    self._connect(self.connection, event.reactor)
                    return
                else:
                    log.info("Disconnected will try to reconnect after %s seconds" % delay)
                    # on_timer_task performs the reconnect when the delay expires.
                    event.reactor.schedule(delay, self)
                    return
            else:
                log.debug("Disconnected")
        # See connector.cpp: conn.free()/pn_connection_release() here?
        self.connection = None

    def on_timer_task(self, event):
        """Delayed reconnect scheduled by on_transport_closed."""
        self._connect(self.connection, event.reactor)
617
class Backoff(object):
    """
    Reconnect strategy whose delay between retries grows with each
    attempt, up to a maximum of 10 seconds.
    """
    def __init__(self):
        self.delay = 0

    def reset(self):
        """Start again from a zero delay."""
        self.delay = 0

    def next(self):
        """Return the delay to use now and advance to the following one."""
        delay_now = self.delay
        # 0 -> 0.1, afterwards doubling, capped at 10.
        self.delay = 0.1 if delay_now == 0 else min(10, 2 * delay_now)
        return delay_now
636
637 -class Urls(object):
638 - def __init__(self, values):
639 self.values = [Url(v) for v in values] 640 self.i = iter(self.values)
641
642 - def __iter__(self):
643 return self
644
645 - def next(self):
646 try: 647 return next(self.i) 648 except StopIteration: 649 self.i = iter(self.values) 650 return next(self.i)
651
class SSLConfig(object):
    """Pair of SSL domains (client and server) configured together."""

    def __init__(self):
        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
        self.server = SSLDomain(SSLDomain.MODE_SERVER)

    def set_credentials(self, cert_file, key_file, password):
        """Set the same credentials on the client and the server domain."""
        for domain in (self.client, self.server):
            domain.set_credentials(cert_file, key_file, password)

    def set_trusted_ca_db(self, certificate_db):
        """Set the same trusted CA database on the client and the server domain."""
        for domain in (self.client, self.server):
            domain.set_trusted_ca_db(certificate_db)
664
665 666 -class Container(Reactor):
    """A representation of the AMQP concept of a 'container', which
       loosely speaking is something that establishes links to or from
       another container, over which messages are transferred. This is
       an extension to the Reactor class that adds convenience methods
       for creating connections and sender- or receiver- links.
    """
def __init__(self, *handlers, **kwargs):
    """Create a container, or wrap an existing C reactor.

    @param handlers: handlers passed on to Reactor.
    @param kwargs: ``impl`` wraps an existing reactor, in which case none
    of the defaults below are set up; ``global_handler`` replaces the
    handler that GlobalOverrides falls back to.
    """
    super(Container, self).__init__(*handlers, **kwargs)
    if "impl" not in kwargs:
        try:
            self.ssl = SSLConfig()
        except SSLUnavailable:
            # No SSL support: connect() and listen() check for None.
            self.ssl = None
        self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
        self.trigger = None
        self.container_id = str(generate_uuid())
        # Defaults used by connect() unless overridden per connection.
        self.allow_insecure_mechs = True
        self.allowed_mechs = None
        self.sasl_enabled = True
        self.user = None
        self.password = None
        # NOTE(review): presumably recorded so that Reactor.wrap() can
        # re-create this subclass from the C reactor -- confirm in Wrapper.
        Wrapper.__setattr__(self, 'subclass', self.__class__)
689
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
    """
    Initiates the establishment of an AMQP connection. Returns an
    instance of proton.Connection.

    @param url: URL string of process to connect to

    @param urls: list of URL strings of process to try to connect to

    @param address: object whose next() method returns the URL to use
    for each connection attempt; used only if neither url nor urls is
    given.

    Only one of url or urls should be specified.

    @param reconnect: A value of False will prevent the library
    from automatically trying to reconnect if the underlying
    socket is disconnected before the connection has been closed.
    If left as None a Backoff instance is used; any other true value
    is used as the reconnect strategy itself.

    @param heartbeat: A value in milliseconds indicating the
    desired frequency of heartbeats used to test the underlying
    socket is alive.

    @param ssl_domain: SSL configuration in the form of an
    instance of proton.SSLDomain.

    @param handler: a connection scoped handler that will be
    called to process any events in the scope of this connection
    or its child links

    @param kwargs: sasl_enabled, which determines whether a sasl layer is
    used for the connection; allowed_mechs an optional list of SASL
    mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag
    indicating whether insecure mechanisms, such as PLAIN over a
    non-encrypted socket, are allowed; 'virtual_host' the hostname to set
    in the Open performative used by peer to determine the correct
    back-end service for the client. If 'virtual_host' is not supplied the
    host field from the URL is used instead. Also read from kwargs:
    user, password, sni, max_frame_size, offered_capabilities,
    desired_capabilities and properties.

    @raise ValueError: if none of url, urls or address is given.
    """
    conn = self.connection(handler)
    conn.container = self.container_id or str(generate_uuid())
    conn.offered_capabilities = kwargs.get('offered_capabilities')
    conn.desired_capabilities = kwargs.get('desired_capabilities')
    conn.properties = kwargs.get('properties')

    # Per-connection settings fall back to the container-wide defaults.
    connector = Connector(conn)
    connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
    connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
    connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
    connector.user = kwargs.get('user', self.user)
    connector.password = kwargs.get('password', self.password)
    connector.virtual_host = kwargs.get('virtual_host')
    if connector.virtual_host:
        # only set hostname if virtual-host is a non-empty string
        conn.hostname = connector.virtual_host
    connector.ssl_sni = kwargs.get('sni')
    connector.max_frame_size = kwargs.get('max_frame_size')

    # GlobalOverrides dispatches this connection's events to the connector.
    conn._overrides = connector
    if url: connector.address = Urls([url])
    elif urls: connector.address = Urls(urls)
    elif address: connector.address = address
    else: raise ValueError("One of url, urls or address required")
    if heartbeat:
        connector.heartbeat = heartbeat
    if reconnect:
        connector.reconnect = reconnect
    elif reconnect is None:
        connector.reconnect = Backoff()
    # use container's default client domain if none specified.  This is
    # only necessary if the URL specifies the "amqps:" scheme
    connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
    conn._session_policy = SessionPerConnection() #todo: make configurable
    conn.open()
    return conn
762
763 - def _get_id(self, container, remote, local):
764 if local and remote: "%s-%s-%s" % (container, remote, local) 765 elif local: return "%s-%s" % (container, local) 766 elif remote: return "%s-%s" % (container, remote) 767 else: return "%s-%s" % (container, str(generate_uuid()))
768
def _get_session(self, context):
    """Resolve ``context`` to a session.

    A Url is connected to first; a Session is returned unchanged; a
    Connection yields a session through its session policy when it has
    one, otherwise a newly created session; anything else is asked for
    its ``session()``.
    """
    if isinstance(context, Url):
        connection = self.connect(url=context)
        return self._get_session(connection)
    if isinstance(context, Session):
        return context
    if not isinstance(context, Connection):
        return context.session()
    if hasattr(context, '_session_policy'):
        return context._session_policy.session(context)
    return _create_session(context)
781
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not target:
        target = context.path
    session = self._get_session(context)
    snd = session.sender(name or self._get_id(session.connection.container, target, source))
    if source:
        snd.source.address = source
    if target:
        snd.target.address = target
    # Identity test: do not invoke a handler-defined __ne__/__eq__.
    if handler is not None:
        snd.handler = handler
    if tags:
        snd.tag_generator = tags
    _apply_link_options(options, snd)
    snd.open()
    return snd
821
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    If dynamic is true, the source is marked as dynamic.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not source:
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    # Identity test: do not invoke a handler-defined __ne__/__eq__.
    if handler is not None:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
862
def declare_transaction(self, context, handler=None, settle_before_discharge=False):
    """Declare a new transaction and return the Transaction tracking it.

    A control sender named 'txn-ctrl' targeting a coordinator is
    created for ``context`` on first use and cached on it as
    ``_txn_ctrl``.

    @param context: object the control link is created for and cached on.
    @param handler: receives the on_transaction_* callbacks; events the
    internal handler does not handle are dispatched to it as well.
    @param settle_before_discharge: passed through to Transaction.
    """
    if not _get_attr(context, '_txn_ctrl'):
        class InternalTransactionHandler(OutgoingMessageHandler):
            def __init__(self):
                super(InternalTransactionHandler, self).__init__(auto_settle=True)

            def on_settled(self, event):
                # Only deliveries tagged by Transaction._send_ctrl carry
                # a ``transaction`` attribute.
                if hasattr(event.delivery, "transaction"):
                    event.transaction = event.delivery.transaction
                    event.delivery.transaction.handle_outcome(event)

            def on_unhandled(self, method, event):
                if handler:
                    event.dispatch(handler)

        context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
        context._txn_ctrl.target.type = Terminus.COORDINATOR
        context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
    return Transaction(context._txn_ctrl, handler, settle_before_discharge)
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    @param url: URL (string or Url) giving the host and port to listen
    on; the 'amqps' scheme selects SSL.
    @param ssl_domain: SSL configuration for the acceptor; if not given
    and the scheme is 'amqps', the container's default server domain is
    used.
    @return: the Acceptor for the listening socket.
    @raise SSLUnavailable: if 'amqps' is requested without ssl_domain and
    the container has no SSL support.
    """
    url = Url(url)
    # Resolve the SSL configuration before opening the acceptor, so that
    # an SSLUnavailable error does not leave a listening acceptor behind
    # that the caller has no reference to.
    ssl_config = ssl_domain
    if not ssl_config and url.scheme == 'amqps':
        # use container's default server domain
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        ssl_config = self.ssl.server
    acceptor = self.acceptor(url.host, url.port)
    if ssl_config:
        acceptor.set_ssl_domain(ssl_config)
    return acceptor
900
def do_work(self, timeout=None):
    """Run one processing pass and return its result.

    A true ``timeout`` is stored as the reactor timeout first; a falsy
    one leaves the current timeout untouched.
    """
    if not timeout:
        return self.process()
    self.timeout = timeout
    return self.process()
905