Package proton :: Module _reactor
[frames | no frames]

Source Code for Module proton._reactor

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  from __future__ import absolute_import 
  21   
  22  #from functools import total_ordering 
  23  import heapq 
  24  import json 
  25  import logging 
  26  import re 
  27  import os 
  28  import time 
  29  import traceback 
  30  import uuid 
  31   
  32  from cproton import PN_PYREF, PN_ACCEPTED, PN_EVENT_NONE 
  33   
  34  from ._delivery import  Delivery 
  35  from ._endpoints import Connection, Endpoint, Link, Session, Terminus 
  36  from ._exceptions import SSLUnavailable 
  37  from ._data import Described, symbol, ulong 
  38  from ._message import  Message 
  39  from ._transport import Transport, SSL, SSLDomain 
  40  from ._url import Url 
  41  from ._common import isstring, unicode2utf8, utf82unicode 
  42  from ._events import Collector, EventType, EventBase, Handler, Event 
  43  from ._selectable import Selectable 
  44   
  45  from ._handlers import OutgoingMessageHandler, IOHandler 
  46   
  47  from ._io import IO, PN_INVALID_SOCKET 
  48   
  49  from . import _compat 
  50  from ._compat import queue 
  51   
  52   
  53  _logger = logging.getLogger("proton") 
54 55 56 -def _generate_uuid():
57 return uuid.uuid4()
58
59 60 -def _now():
61 return time.time()
62
#@total_ordering
class Task(object):
    """
    A handler scheduled against a deadline.

    Instances order themselves by deadline (only ``__lt__`` is defined),
    which is what lets them be stored directly in a ``heapq`` heap.
    """

    def __init__(self, reactor, deadline, handler):
        self._reactor = reactor
        self._handler = handler
        self._deadline = deadline
        # Flipped by cancel(); the task object itself is left in place.
        self._cancelled = False

    def __lt__(self, rhs):
        """Return True when this task's deadline precedes that of rhs."""
        return self._deadline < rhs._deadline

    def cancel(self):
        """Flag this task as cancelled."""
        self._cancelled = True

    @property
    def handler(self):
        """The handler supplied at construction."""
        return self._handler

    @property
    def container(self):
        """The reactor supplied at construction."""
        return self._reactor
85
class TimerSelectable(Selectable):
    """
    Selectable with no socket behind it, used to drive the reactor's timers.

    When it expires it ticks the reactor's timers and re-arms itself with
    the reactor's next timer deadline.
    """

    def __init__(self, reactor, collector):
        super(TimerSelectable, self).__init__(None, reactor)
        self.collect(collector)
        collector.put(self, Event.SELECTABLE_INIT)

    def fileno(self):
        """There is no underlying socket: always PN_INVALID_SOCKET."""
        return PN_INVALID_SOCKET

    def readable(self):
        """Nothing to do for read readiness."""
        return None

    def writable(self):
        """Nothing to do for write readiness."""
        return None

    def expired(self):
        """Tick the reactor's timers, then adopt its next deadline."""
        owner = self._reactor
        owner.timer_tick()
        self.deadline = owner.timer_deadline
        self.update()
106
107 -class Reactor(object):
108
def __init__(self, *handlers, **kwargs):
    """
    Set up an idle reactor and register the given handlers.

    Each positional handler is added to the reactor's handler with this
    reactor's on_error as its error callback. ``kwargs`` is accepted but
    not read here.
    """
    # Event-loop state.
    self._previous = PN_EVENT_NONE
    self._timeout = 0
    self.mark()
    self._yield = False
    self._stop = False

    # Event queue and selectable bookkeeping.
    self._collector = Collector()
    self._selectable = None
    self._selectables = 0

    # Handlers.
    self._global_handler = IOHandler()
    self._handler = Handler()

    # Timers: a heap of tasks plus a count of entries.
    self._timerheap = []
    self._timers = 0

    self.errors = []
    for extra in handlers:
        self.handler.add(extra, on_error=self.on_error)
125
def on_error(self, info):
    """Record ``info`` in ``self.errors`` and request a yield."""
    self.errors.append(info)
    self.yield_()
129 130 # TODO: need to make this actually return a proxy which catches exceptions and calls 131 # on error. 132 # [Or arrange another way to deal with exceptions thrown by handlers]
133 - def _make_handler(self, handler):
134 """ 135 Return a proxy handler that dispatches to the provided handler. 136 137 If handler throws an exception then on_error is called with info 138 """ 139 return handler
140
141 - def _get_global(self):
142 return self._global_handler
143
144 - def _set_global(self, handler):
145 self._global_handler = self._make_handler(handler)
146 147 global_handler = property(_get_global, _set_global) 148
149 - def _get_timeout(self):
150 return self._timeout
151
152 - def _set_timeout(self, secs):
153 self._timeout = secs
154 155 timeout = property(_get_timeout, _set_timeout) 156
def yield_(self):
    """Set the yield flag; process() checks and clears it, returning True."""
    self._yield = True
159
def mark(self):
    """Set the reactor's 'now' instant to the current time and return it."""
    stamp = _now()
    self._now = stamp
    return stamp
@property
def now(self):
    """The instant most recently recorded by mark()."""
    return self._now
168
169 - def _get_handler(self):
170 return self._handler
171
172 - def _set_handler(self, handler):
173 self._handler = self._make_handler(handler)
174 175 handler = property(_get_handler, _set_handler) 176
def run(self):
    """
    Start the reactor and process until process() returns False.

    Afterwards the reactor is stopped, processed one final time, and both
    handler references are cleared.
    """
    # TODO: Why do we timeout like this?
    self.timeout = 3.14159265359
    self.start()
    while True:
        if not self.process():
            break
    self.stop()
    self.process()
    # TODO: This isn't correct if we ever run again
    self._global_handler = None
    self._handler = None
# Cross thread reactor wakeup
def wakeup(self):
    """Placeholder for a cross-thread wakeup; currently does nothing."""
    # TODO: Do this with pipe and write?
    #os.write(self._wakeup[1], "x", 1);
    return None
193
def start(self):
    """Queue REACTOR_INIT and create the timer selectable."""
    self.push_event(self, Event.REACTOR_INIT)
    timer_sel = TimerSelectable(self, self._collector)
    self._selectable = timer_sel
    timer_sel.deadline = self.timer_deadline
    # TODO set up fd to read for wakeups - but problematic on windows
    #self._selectable.fileno(self._wakeup[0])
    #self._selectable.reading = True
    self.update(timer_sel)
@property
def quiesced(self):
    """
    True when no event is queued, or when the single queued event is
    REACTOR_QUIESCED; False otherwise.
    """
    head = self._collector.peek()
    if head:
        if self._collector.more():
            return False
        return head.type is Event.REACTOR_QUIESCED
    return True
211
212 - def _check_errors(self):
213 """ This """ 214 if self.errors: 215 for exc, value, tb in self.errors[:-1]: 216 traceback.print_exception(exc, value, tb) 217 exc, value, tb = self.errors[-1] 218 _compat.raise_(exc, value, tb)
219
def process(self):
    """
    Dispatch queued events until the reactor yields or finishes.

    Returns True when the yield flag was set (there may be more to do) and
    False once the reactor has nothing left to wait for.

    NOTE(review): the block nesting below was reconstructed from a listing
    that had lost its indentation - confirm against the original file.
    """
    # result = pn_reactor_process(self._impl)
    # self._check_errors()
    # return result
    self.mark()
    previous = PN_EVENT_NONE
    while True:
        if self._yield:
            self._yield = False
            _logger.debug('%s Yielding', self)
            return True

        event = self._collector.peek()
        if event:
            _logger.debug('%s recvd Event: %r', self, event)
            etype = event.type

            # regular handler
            target = event.handler or self._handler
            event.dispatch(target)

            event.dispatch(self._global_handler)

            previous = etype
            self._previous = etype
            self._collector.pop()
        elif not self._stop and (self._timers > 0 or self._selectables > 1):
            # Idle but still something to wait on: announce quiescence once,
            # then hand control back to the caller.
            if previous is not Event.REACTOR_QUIESCED and self._previous is not Event.REACTOR_FINAL:
                self.push_event(self, Event.REACTOR_QUIESCED)
            self.yield_()
        elif self._selectable:
            # Shutting down: retire the timer selectable first, then loop
            # again to drain whatever that produced.
            self._selectable.terminate()
            self.update(self._selectable)
            self._selectable = None
        else:
            if self._previous is not Event.REACTOR_FINAL:
                self.push_event(self, Event.REACTOR_FINAL)
            _logger.debug('%s Stopping', self)
            return False
259
def stop(self):
    """Set the stop flag, then surface any recorded errors."""
    self._stop = True
    self._check_errors()
263
def stop_events(self):
    """Call release() on the reactor's collector."""
    collector = self._collector
    collector.release()
266
def schedule(self, delay, handler):
    """
    Schedule ``handler`` to run ``delay`` after the reactor's current 'now'.

    Returns the Task that was pushed on the timer heap. If the timer
    selectable exists, its deadline is set to the earliest task on the heap.
    """
    wrapped = self._make_handler(handler)
    task = Task(self, self._now + delay, wrapped)
    heapq.heappush(self._timerheap, task)
    self._timers += 1
    earliest = self._timerheap[0]._deadline
    timer_sel = self._selectable
    if timer_sel:
        timer_sel.deadline = earliest
        self.update(timer_sel)
    return task
277
def timer_tick(self):
    """
    Fire every timer task whose deadline has been reached.

    Cancelled tasks at the head of the heap are discarded. Each due task is
    popped and a TIMER_TASK event is queued for it. Stops at the first
    live task whose deadline is still later than the reactor's 'now'.
    """
    while self._timers > 0:
        head = self._timerheap[0]
        cancelled = head._cancelled
        if not cancelled and head._deadline > self._now:
            return
        heapq.heappop(self._timerheap)
        self._timers -= 1
        if not cancelled:
            self.push_event(head, Event.TIMER_TASK)
@property
def timer_deadline(self):
    """
    Deadline of the earliest live timer task, or None when there is none.

    Cancelled tasks found at the head of the heap are discarded on the way.
    """
    while self._timers > 0:
        head = self._timerheap[0]
        if not head._cancelled:
            return head._deadline
        heapq.heappop(self._timerheap)
        self._timers -= 1
    return None
301
def acceptor(self, host, port, handler=None):
    """
    Create and return an Acceptor listening on host:port.

    ``host`` is passed through unicode2utf8 and ``port`` through int().
    Raises IOError if the resulting acceptor object is falsy.
    """
    wrapped = self._make_handler(handler)
    listener = Acceptor(self, unicode2utf8(host), int(port), wrapped)
    if not listener:
        raise IOError("%s (%s:%s)" % (str(self.errors), host, port))
    return listener
309
def connection(self, handler=None):
    """Deprecated: use connection_to_host() instead

    Creates a Connection tied to this reactor and its collector; the
    handler, when truthy, is attached to the connection.
    """
    wrapped = self._make_handler(handler)
    conn = Connection()
    if wrapped:
        conn.handler = wrapped
    conn._reactor = self
    conn.collect(self._collector)
    return conn
320
def connection_to_host(self, host, port, handler=None):
    """Create an outgoing Connection that will be managed by the reactor.
    The reactor's pn_iohandler will create a socket connection to the host
    once the connection is opened.
    """
    outgoing = self.connection(handler)
    self.set_connection_host(outgoing, host, port)
    return outgoing
329
def set_connection_host(self, connection, host, port):
    """Change the address used by the connection. The address is
    used by the reactor's iohandler to create an outgoing socket
    connection. This must be set prior to opening the connection.
    """
    # Thin delegation: the connection object stores the address itself.
    connection.set_address(host, port)
336
def get_connection_address(self, connection):
    """This may be used to retrieve the remote peer address.
    @return: string containing the address in URL format or None if no
    address is available.  Use the proton.Url class to create a Url object
    from the returned value.
    """
    raw_address = connection.get_address()
    return utf82unicode(raw_address)
345
def selectable(self, handler=None, delegate=None):
    """
    Create a Selectable attached to this reactor's collector.

    When no delegate is given the handler is used as the delegate. A
    SELECTABLE_INIT event is pushed for the new selectable before it is
    returned.
    """
    target = handler if delegate is None else delegate
    sel = Selectable(target, self)
    sel.collect(self._collector)
    sel.handler = handler
    sel.push_event(sel, Event.SELECTABLE_INIT)
    return sel
354
355 - def update(self, selectable):
357
def push_event(self, obj, etype):
    """Queue an event of type ``etype`` for ``obj`` on the reactor's collector."""
    collector = self._collector
    collector.put(obj, etype)
360
class EventInjector(object):
    """
    Can be added to a reactor to allow events to be triggered by an
    external thread but handled on the event thread associated with
    the reactor. An instance of this class can be passed to the
    Reactor.selectable() method of the reactor in order to activate
    it. The close() method should be called when it is no longer
    needed, to allow the event loop to end if needed.
    """

    def __init__(self):
        self.queue = queue.Queue()
        # (read fd, write fd); a byte written to the write end signals
        # that there is something to do.
        self.pipe = os.pipe()
        self._transport = None
        self._closed = False

    def trigger(self, event):
        """
        Request that the given event be dispatched on the event thread
        of the reactor to which this EventInjector was added.
        """
        self.queue.put(event)
        self._signal()

    def close(self):
        """
        Request that this EventInjector be closed. Existing events
        will be dispatched on the reactors event dispatch thread,
        then this will be removed from the set of interest.
        """
        self._closed = True
        self._signal()

    def _signal(self):
        # Write one byte to the pipe's write end.
        os.write(self.pipe[1], b"!")

    def fileno(self):
        """Return the read end of the pipe."""
        read_fd = self.pipe[0]
        return read_fd

    def on_selectable_init(self, event):
        """Mark the selectable as interested in reads."""
        sel = event.context
        #sel.fileno(self.fileno())
        sel.reading = True
        sel.update()

    def on_selectable_readable(self, event):
        """
        Drain the pipe, push every queued event onto the selectable, and
        terminate the selectable if close() has been requested.
        """
        sel = event.context
        os.read(self.pipe[0], 512)
        pending = self.queue
        while not pending.empty():
            requested = pending.get()
            sel.push_event(requested.context, requested.type)
        if self._closed:
            sel.terminate()
            sel.update()
413
class ApplicationEvent(EventBase):
    """
    Application defined event, which can optionally be associated with
    an engine object and or an arbitrary subject
    """

    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
        super(ApplicationEvent, self).__init__(EventType(typename))
        self.clazz = PN_PYREF
        self.connection = connection
        self.session = session
        self.link = link
        self.delivery = delivery
        # Fill in the enclosing objects from the most specific one given:
        # delivery -> link -> session -> connection.
        if self.delivery:
            self.link = self.delivery.link
        if self.link:
            self.session = self.link.session
        if self.session:
            self.connection = self.session.connection
        self.subject = subject

    @property
    def context(self):
        """The event is its own context."""
        return self

    def __repr__(self):
        parts = (self.connection, self.session, self.link, self.delivery, self.subject)
        shown = [str(part) for part in parts if part is not None]
        return "%s(%s)" % (self.type, ", ".join(shown))
443
class Transaction(object):
    """
    Class to track state of an AMQP 1.0 transaction.
    """

    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
        self.txn_ctrl = txn_ctrl
        self.handler = handler
        self.settle_before_discharge = settle_before_discharge
        self.id = None
        self.failed = False
        self._declare = None
        self._discharge = None
        # Deliveries accepted under this transaction but not yet settled.
        self._pending = []
        self.declare()

    def commit(self):
        """Discharge the transaction with failed=False."""
        self.discharge(False)

    def abort(self):
        """Discharge the transaction with failed=True."""
        self.discharge(True)

    def declare(self):
        """Send the declare control message and remember its delivery."""
        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])

    def discharge(self, failed):
        """Send the discharge control message, recording the ``failed`` flag."""
        self.failed = failed
        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])

    def _send_ctrl(self, descriptor, value):
        # Send a described value on the control link and tag the resulting
        # delivery with this transaction.
        control = Message(body=Described(descriptor, value))
        delivery = self.txn_ctrl.send(control)
        delivery.transaction = self
        return delivery

    def send(self, sender, msg, tag=None):
        """Send ``msg`` on ``sender`` with this transaction's id as local delivery data."""
        dlv = sender.send(msg, tag=tag)
        dlv.local.data = [self.id]
        dlv.update(0x34)
        return dlv

    def accept(self, delivery):
        """
        Accept ``delivery`` under this transaction.

        The delivery is settled immediately when settle_before_discharge is
        set; otherwise it is kept in the pending list.
        """
        self.update(delivery, PN_ACCEPTED)
        if self.settle_before_discharge:
            delivery.settle()
        else:
            self._pending.append(delivery)

    def update(self, delivery, state=None):
        """Update ``delivery`` with ``state`` under this transaction; a falsy state is a no-op."""
        if not state:
            return
        delivery.local.data = [self.id, Described(ulong(state), [])]
        delivery.update(0x34)

    def _release_pending(self):
        # Release and settle everything still pending, then forget it.
        for pending in self._pending:
            pending.update(Delivery.RELEASED)
            pending.settle()
        self._clear_pending()

    def _clear_pending(self):
        self._pending = []

    def handle_outcome(self, event):
        """
        React to the outcome of the declare or discharge delivery by calling
        the matching on_transaction_* method of the handler.

        NOTE(review): nesting reconstructed from a listing without
        indentation - confirm against the original file.
        """
        delivery = event.delivery
        if delivery == self._declare:
            if delivery.remote.data:
                self.id = delivery.remote.data[0]
                self.handler.on_transaction_declared(event)
            elif delivery.remote_state == Delivery.REJECTED:
                self.handler.on_transaction_declare_failed(event)
            else:
                _logger.warning("Unexpected outcome for declare: %s" % delivery.remote_state)
                self.handler.on_transaction_declare_failed(event)
        elif delivery == self._discharge:
            if delivery.remote_state == Delivery.REJECTED:
                if not self.failed:
                    self.handler.on_transaction_commit_failed(event)
                    self._release_pending()  # make this optional?
            elif self.failed:
                self.handler.on_transaction_aborted(event)
                self._release_pending()
            else:
                self.handler.on_transaction_committed(event)
            self._clear_pending()
528
class LinkOption(object):
    """
    Abstract interface for link configuration options
    """

    def apply(self, link):
        """
        Hook for subclasses: perform the configuration of ``link`` here.
        The base implementation does nothing.
        """
        return None

    def test(self, link):
        """
        Hook for subclasses: return whether this option should be applied
        to ``link``. The base implementation always returns True.
        """
        return True
548
549 550 -class AtMostOnce(LinkOption):
551 - def apply(self, link):
553
554 555 -class AtLeastOnce(LinkOption):
556 - def apply(self, link):
559
class SenderOption(LinkOption):
    """Link option that only passes test() for sender links."""

    def apply(self, sender):
        """No configuration by default."""
        pass

    def test(self, link):
        """Return the link's ``is_sender`` attribute."""
        return link.is_sender
565
class ReceiverOption(LinkOption):
    """Link option that only passes test() for receiver links."""

    def apply(self, receiver):
        """No configuration by default."""
        pass

    def test(self, link):
        """Return the link's ``is_receiver`` attribute."""
        return link.is_receiver
571
572 573 -class DynamicNodeProperties(LinkOption):
def __init__(self, props={}):
    """
    Copy ``props`` into ``self.properties``, converting every key that is
    not already a ``symbol`` into one. Values are stored unchanged.
    """
    self.properties = {}
    for name in props:
        key = name if isinstance(name, symbol) else symbol(name)
        self.properties[key] = props[name]
581
582 - def apply(self, link):
587
class Filter(ReceiverOption):
    """Receiver option that puts a filter set on the receiver's source."""

    def __init__(self, filter_set=None):
        """
        @param filter_set: dict describing the filters to apply. When
        omitted, a new empty dict is created for this instance.
        """
        # A literal {} default would be one dict object shared by every
        # Filter built without arguments, so changing one instance's
        # filter_set would change them all.
        self.filter_set = {} if filter_set is None else filter_set

    def apply(self, receiver):
        """Put the filter set into the receiver's source filter."""
        receiver.source.filter.put_dict(self.filter_set)
595
class Selector(Filter):
    """
    Configures a link with a message selector filter
    """

    def __init__(self, value, name='selector'):
        # The filter set has a single entry: the filter name (as a symbol)
        # mapped to the selector string wrapped in a described value.
        described = Described(symbol('apache.org:selector-filter:string'), utf82unicode(value))
        super(Selector, self).__init__({symbol(name): described})
604
605 606 -class DurableSubscription(ReceiverOption):
607 - def apply(self, receiver):
610
611 612 -class Move(ReceiverOption):
613 - def apply(self, receiver):
615
616 617 -class Copy(ReceiverOption):
618 - def apply(self, receiver):
620 629
630 631 -def _create_session(connection, handler=None):
632 session = connection.session() 633 session.open() 634 return session
635
636 637 -def _get_attr(target, name):
638 if hasattr(target, name): 639 return getattr(target, name) 640 else: 641 return None
642
class SessionPerConnection(object):
    """Session policy that creates one session lazily and then reuses it."""

    def __init__(self):
        self._default_session = None

    def session(self, connection):
        """Return the cached session, creating it on ``connection`` on first use."""
        if self._default_session:
            return self._default_session
        self._default_session = _create_session(connection)
        return self._default_session
652
class GlobalOverrides(Handler):
    """
    Internal handler that gives a connection's ``_overrides`` handler the
    first chance at each event and falls back to the wrapped base handler
    when the override does not take it.
    """

    def __init__(self, base):
        self.base = base

    def on_unhandled(self, name, event):
        overridden = self._override(event)
        if not overridden:
            event.dispatch(self.base)

    def _override(self, event):
        # Short-circuits: a falsy connection, or one without an _overrides
        # attribute, is never dispatched to.
        conn = event.connection
        return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
670
class Acceptor(Handler):
    """
    Listens on host:port and, for each accepted socket, creates a
    Connection bound to a server-mode Transport plus a selectable for it.
    """

    def __init__(self, reactor, host, port, handler=None):
        self._ssl_domain = None
        self._reactor = reactor
        self._handler = handler
        listening_sock = IO.listen(host, port)
        sel = reactor.selectable(handler=self, delegate=listening_sock)
        sel.reading = True
        sel._transport = None
        self._selectable = sel
        reactor.update(sel)

    def set_ssl_domain(self, ssl_domain):
        """Use ``ssl_domain`` for connections accepted from now on."""
        self._ssl_domain = ssl_domain

    def close(self):
        """Close and terminate the listening selectable unless it is already terminal."""
        sel = self._selectable
        if sel.is_terminal:
            return
        IO.close(sel)
        sel.terminate()
        self._reactor.update(sel)

    def on_selectable_readable(self, event):
        """Accept one incoming socket and wire up a connection and transport for it."""
        # The original read event.selectable here and never used the value;
        # the attribute access is retained as-is.
        _unused = event.selectable

        sock, name = IO.accept(self._selectable)
        _logger.debug("Accepted connection from %s", name)

        reactor = self._reactor
        handler = self._handler or reactor.handler
        conn = reactor.connection(handler)
        conn._acceptor = self
        conn.url = Url(host=name[0], port=name[1])
        transport = Transport(Transport.SERVER)
        if self._ssl_domain:
            transport.ssl(self._ssl_domain)
        transport.bind(conn)

        sel = reactor.selectable(delegate=sock)
        sel._transport = transport
        transport._selectable = sel
        IOHandler.update(transport, sel, reactor.now)
714
class Connector(Handler):
    """
    Internal handler that triggers the necessary socket connect for an
    opened connection.
    """

    def __init__(self, connection):
        self.connection = connection
        self.address = None
        self.heartbeat = None
        self.reconnect = None
        self.ssl_domain = None
        self.ssl_sni = None
        self.virtual_host = None
        self.max_frame_size = None
        # SASL settings.
        self.sasl_enabled = True
        self.allow_insecure_mechs = True
        self.allowed_mechs = None
        self.user = None
        self.password = None

    def _connect(self, connection):
        """
        Take the next address and bind a freshly configured Transport to
        ``connection``.

        Raises SSLUnavailable for an 'amqps' url when no ssl_domain is set.
        """
        target = self.address.next()
        connection.url = target
        # if virtual-host not set, use host from address as default
        if self.virtual_host is None:
            connection.hostname = target.host
        _logger.debug("connecting to %r..." % target)

        transport = Transport()
        if self.sasl_enabled:
            sasl = transport.sasl()
            sasl.allow_insecure_mechs = self.allow_insecure_mechs
            # Credentials in the url win over those set on the connector.
            if target.username:
                connection.user = target.username
            elif self.user:
                connection.user = self.user
            if target.password:
                connection.password = target.password
            elif self.password:
                connection.password = self.password
            if self.allowed_mechs:
                sasl.allowed_mechs(self.allowed_mechs)
        transport.bind(connection)
        if self.heartbeat:
            transport.idle_timeout = self.heartbeat
        if target.scheme == 'amqps':
            if not self.ssl_domain:
                raise SSLUnavailable("amqps: SSL libraries not found")
            self.ssl = SSL(transport, self.ssl_domain)
            self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or target.host
        if self.max_frame_size:
            transport.max_frame_size = self.max_frame_size

    def on_connection_local_open(self, event):
        self._connect(event.connection)

    def on_connection_remote_open(self, event):
        _logger.debug("connected to %s" % event.connection.hostname)
        # A successful open resets the reconnect strategy.
        if self.reconnect:
            self.reconnect.reset()

    def on_transport_tail_closed(self, event):
        event.transport.close_head()

    def on_transport_closed(self, event):
        """
        Reconnect (immediately or after the strategy's delay) when the
        connection is still locally active and a reconnect strategy is
        set; otherwise drop the reference to the connection.
        """
        if self.connection is None:
            return
        if self.connection.state & Endpoint.LOCAL_ACTIVE:
            if self.reconnect:
                event.transport.unbind()
                delay = self.reconnect.next()
                if delay == 0:
                    _logger.info("Disconnected, reconnecting...")
                    self._connect(self.connection)
                else:
                    _logger.info("Disconnected will try to reconnect after %s seconds" % delay)
                    event.reactor.schedule(delay, self)
                return
            else:
                _logger.debug("Disconnected")
        # See connector.cpp: conn.free()/pn_connection_release() here?
        self.connection = None

    def on_timer_task(self, event):
        # Scheduled by on_transport_closed when a reconnect is delayed.
        self._connect(self.connection)
801
class Backoff(object):
    """
    A reconnect strategy involving an increasing delay between
    retries, up to a maximum of 10 seconds.
    """

    def __init__(self):
        self.delay = 0

    def reset(self):
        """Go back to the initial delay of zero."""
        self.delay = 0

    def next(self):
        """
        Return the delay to use now and advance to the following one:
        0, then 0.1, then doubling each time, capped at 10.
        """
        previous = self.delay
        self.delay = 0.1 if previous == 0 else min(10, 2 * previous)
        return previous
822
class Urls(object):
    """
    Endless round-robin iterator over a fixed list of Url objects.

    After the last url has been returned, iteration starts again from the
    first one.
    """

    def __init__(self, values):
        """@param values: iterable of values, each converted with Url()."""
        self.values = [Url(v) for v in values]
        self.i = iter(self.values)

    def __iter__(self):
        return self

    def next(self):
        """
        Return the next url, wrapping around at the end of the list.

        With an empty list there is nothing to wrap around to, so the
        StopIteration from the restarted iterator propagates.
        """
        try:
            return next(self.i)
        except StopIteration:
            self.i = iter(self.values)
            return next(self.i)

    # __iter__ returns self, so the Python 3 iterator protocol also needs
    # __next__; without it next(urls) and for-loops raise TypeError there.
    __next__ = next
838
class SSLConfig(object):
    """Holds a client-mode and a server-mode SSLDomain configured together."""

    def __init__(self):
        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
        self.server = SSLDomain(SSLDomain.MODE_SERVER)

    def set_credentials(self, cert_file, key_file, password):
        """Set the same credentials on both the client and the server domain."""
        for domain in (self.client, self.server):
            domain.set_credentials(cert_file, key_file, password)

    def set_trusted_ca_db(self, certificate_db):
        """Set the same trusted CA database on both the client and the server domain."""
        for domain in (self.client, self.server):
            domain.set_trusted_ca_db(certificate_db)
852
853 -def _find_config_file():
854 confname = 'connect.json' 855 confpath = ['.', '~/.config/messaging','/etc/messaging'] 856 for d in confpath: 857 f = os.path.join(d, confname) 858 if os.path.isfile(f): 859 return f 860 return None
861
def _get_default_config():
    """
    Load the default connection configuration as a dict.

    The file named by the MESSAGING_CONNECT_FILE environment variable is
    used when set; otherwise the file located by _find_config_file().
    C-style comments are stripped before the text is parsed as JSON. An
    empty dict is returned when there is no such file.
    """
    conf = os.environ.get('MESSAGING_CONNECT_FILE') or _find_config_file()
    if not (conf and os.path.isfile(conf)):
        return {}
    with open(conf, 'r') as f:
        raw_text = f.read()
        cleaned = _strip_json_comments(raw_text)
        return json.loads(cleaned)
871
872 -def _strip_json_comments(json_text):
873 """This strips c-style comments from text, taking into account '/*comments*/' and '//comments' 874 nested inside a string etc.""" 875 def replacer(match): 876 s = match.group(0) 877 if s.startswith('/'): 878 return " " # note: a space and not an empty string 879 else: 880 return s
881 pattern = re.compile(r'//.*?$|/\*.*?\*/|\'(?:\\.|[^\\\'])*\'|"(?:\\.|[^\\"])*"', re.DOTALL | re.MULTILINE) 882 return re.sub(pattern, replacer, json_text) 883
884 -def _get_default_port_for_scheme(scheme):
885 if scheme == 'amqps': 886 return 5671 887 else: 888 return 5672
889
890 -class Container(Reactor):
891 """A representation of the AMQP concept of a 'container', which 892 loosely speaking is something that establishes links to or from 893 another container, over which messages are transfered. This is 894 an extension to the Reactor class that adds convenience methods 895 for creating connections and sender- or receiver- links. 896 """ 897
def __init__(self, *handlers, **kwargs):
    """
    Initialise the underlying reactor and, unless an 'impl' keyword was
    supplied, the container-level defaults (SSL configuration, global
    handler override, container id and SASL settings).

    NOTE(review): the extent of the ``if`` block was reconstructed from a
    listing without indentation - confirm against the original file.
    """
    super(Container, self).__init__(*handlers, **kwargs)
    if "impl" not in kwargs:
        try:
            self.ssl = SSLConfig()
        except SSLUnavailable:
            # No SSL support: connections fall back to no default domain.
            self.ssl = None
        base_handler = kwargs.get('global_handler', self.global_handler)
        self.global_handler = GlobalOverrides(base_handler)
        self.trigger = None
        self.container_id = str(_generate_uuid())
        # SASL defaults inherited by connections made from this container.
        self.allow_insecure_mechs = True
        self.allowed_mechs = None
        self.sasl_enabled = True
        self.user = None
        self.password = None
913
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
            **kwargs):
    """
    Initiates the establishment of an AMQP connection. Returns an
    instance of proton.Connection.

    When none of url, urls or address is given, the connection settings
    are taken from the default configuration file (see
    _get_default_config) and the ssl_domain argument is not used.

    @param url: URL string of process to connect to

    @param urls: list of URL strings of process to try to connect to

    @param address: object supplying the addresses to connect to; it is
    used as-is by the connector, which calls its next() method.

    Only one of url, urls or address should be specified.

    @param reconnect: Reconnect is enabled by default.  You can
    pass in an instance of Backoff to control reconnect behavior.
    A value of False will prevent the library from automatically
    trying to reconnect if the underlying socket is disconnected
    before the connection has been closed.

    @param heartbeat: The desired frequency of heartbeats used to test
    that the underlying socket is alive; it is assigned to the
    transport's idle_timeout. (The original documentation gives the
    unit as milliseconds - TODO confirm.)

    @param ssl_domain: SSL configuration in the form of an
    instance of proton.SSLDomain.

    @param handler: a connection scoped handler that will be
    called to process any events in the scope of this connection
    or its child links

    @param kwargs: 'sasl_enabled', which determines whether a sasl
    layer is used for the connection. 'allowed_mechs', an optional
    string specifying the SASL mechanisms allowed for this
    connection; the value is a space-separated list of mechanism
    names; the mechanisms allowed by default are determined by
    your SASL library and system configuration, with two
    exceptions: GSSAPI and GSS-SPNEGO are disabled by default; to
    enable them, you must explicitly add them using this option;
    clients must set the allowed mechanisms before the
    outgoing connection is attempted; servers must set them before
    the listening connection is setup. 'allow_insecure_mechs', a
    flag indicating whether insecure mechanisms, such as PLAIN
    over a non-encrypted socket, are allowed. 'virtual_host', the
    hostname to set in the Open performative used by peer to
    determine the correct back-end service for the client; if
    'virtual_host' is not supplied the host field from the URL is
    used instead. 'user', the user to authenticate. 'password',
    the authentication secret.

    @raise ValueError: from _connect, when no usable url, urls or
    address results.
    """
    if not url and not urls and not address:
        config = _get_default_config()
        scheme = config.get('scheme', 'amqp')
        _url = "%s://%s:%s" % (scheme, config.get('host', 'localhost'), config.get('port', _get_default_port_for_scheme(scheme)))
        _ssl_domain = None
        _kwargs = kwargs
        # NOTE(review): the nesting of the 'password' and 'verify' checks
        # below was reconstructed from a listing without indentation -
        # confirm against the original file.
        if config.get('user'):
            _kwargs['user'] = config.get('user')
            if config.get('password'):
                _kwargs['password'] = config.get('password')
        sasl_config = config.get('sasl', {})
        _kwargs['sasl_enabled'] = sasl_config.get('enabled', True)
        if sasl_config.get('mechanisms'):
            _kwargs['allowed_mechs'] = sasl_config.get('mechanisms')
        tls_config = config.get('tls', {})
        if scheme == 'amqps':
            _ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
            ca = tls_config.get('ca')
            cert = tls_config.get('cert')
            key = tls_config.get('key')
            if ca:
                _ssl_domain.set_trusted_ca_db(str(ca))
                if tls_config.get('verify', True):
                    _ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, str(ca))
            if cert and key:
                _ssl_domain.set_credentials(str(cert), str(key), None)

        return self._connect(_url, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=_ssl_domain, **_kwargs)
    else:
        # address has to be forwarded too: a call that supplies only
        # address reaches this branch, and _connect raises ValueError when
        # it receives none of url, urls or address.
        return self._connect(url=url, urls=urls, address=address, handler=handler, reconnect=reconnect,
                             heartbeat=heartbeat, ssl_domain=ssl_domain, **kwargs)
993
def _connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
    """
    Create a connection, attach a configured Connector to it and open it.

    Per-connection keyword settings take precedence over the container's
    defaults. Raises ValueError when none of url, urls or address is
    given.
    """
    conn = self.connection(handler)
    conn.container = self.container_id or str(_generate_uuid())
    conn.offered_capabilities = kwargs.get('offered_capabilities')
    conn.desired_capabilities = kwargs.get('desired_capabilities')
    conn.properties = kwargs.get('properties')

    link_up = Connector(conn)
    link_up.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
    link_up.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
    link_up.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
    link_up.user = kwargs.get('user', self.user)
    link_up.password = kwargs.get('password', self.password)
    link_up.virtual_host = kwargs.get('virtual_host')
    if link_up.virtual_host:
        # only set hostname if virtual-host is a non-empty string
        conn.hostname = link_up.virtual_host
    link_up.ssl_sni = kwargs.get('sni')
    link_up.max_frame_size = kwargs.get('max_frame_size')

    conn._overrides = link_up
    if url:
        link_up.address = Urls([url])
    elif urls:
        link_up.address = Urls(urls)
    elif address:
        link_up.address = address
    else:
        raise ValueError("One of url, urls or address required")
    if heartbeat:
        link_up.heartbeat = heartbeat
    # reconnect: a truthy value is used as given, None means the default
    # Backoff, any other falsy value leaves reconnect disabled.
    if reconnect:
        link_up.reconnect = reconnect
    elif reconnect is None:
        link_up.reconnect = Backoff()
    # use container's default client domain if none specified.  This is
    # only necessary of the URL specifies the "amqps:" scheme
    link_up.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
    conn._session_policy = SessionPerConnection()  # todo: make configurable
    conn.open()
    return conn
1035
1036 - def _get_id(self, container, remote, local):
1037 if local and remote: 1038 "%s-%s-%s" % (container, remote, local) 1039 elif local: 1040 return "%s-%s" % (container, local) 1041 elif remote: 1042 return "%s-%s" % (container, remote) 1043 else: 1044 return "%s-%s" % (container, str(_generate_uuid()))
1045
def _get_session(self, context):
    """
    Resolve ``context`` to a session.

    A Url is connected to first; a Session is returned as-is; a Connection
    uses its session policy when it has one, otherwise a new session is
    created; anything else is asked for its session().
    """
    if isinstance(context, Url):
        return self._get_session(self.connect(url=context))
    if isinstance(context, Session):
        return context
    if isinstance(context, Connection):
        if hasattr(context, '_session_policy'):
            return context._session_policy.session(context)
        return _create_session(context)
    return context.session()
1058
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.
    """
    if isstring(context):
        context = Url(context)
    if isinstance(context, Url) and not target:
        target = context.path
    session = self._get_session(context)
    link_name = name or self._get_id(session.connection.container, target, source)
    sender_link = session.sender(link_name)
    if source:
        sender_link.source.address = source
    if target:
        sender_link.target.address = target
    if handler is not None:
        sender_link.handler = handler
    if tags:
        sender_link.tag_generator = tags
    _apply_link_options(options, sender_link)
    sender_link.open()
    return sender_link
1098
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Start establishing a link for receiving messages (a subscription)
    and return it (an instance of proton.Receiver).

    ``context`` selects where the link is attached:

    (1) Pass a connection to create the link on that connection. The
        source address is then given as the second argument (or by
        keyword); a target address may be given as well.
    (2) Pass a URL (a string is converted to a Url) to have a new
        connection established for the link. If no source is given,
        the path of the URL is used as the source address.

    :param name: link name; when omitted one is derived from the
        container id and the source/target addresses.
    :param dynamic: if truthy, the link's source is flagged as dynamic.
    :param handler: if not None, assigned as the handler of the link.
    :param options: link options handed to ``_apply_link_options``
        before the link is opened.
    """
    if isstring(context):
        context = Url(context)
    if isinstance(context, Url):
        # Default the source address to the URL path.
        if not source:
            source = context.path

    session = self._get_session(context)
    link_name = name
    if not link_name:
        link_name = self._get_id(session.connection.container, source, target)
    receiver = session.receiver(link_name)

    if source:
        receiver.source.address = source
    if dynamic:
        receiver.source.dynamic = True
    if target:
        receiver.target.address = target
    if handler is not None:
        receiver.handler = handler

    _apply_link_options(options, receiver)
    receiver.open()
    return receiver
1139
def declare_transaction(self, context, handler=None, settle_before_discharge=False):
    """
    Return a Transaction bound to the control link stored on ``context``.

    When ``context`` has no ``_txn_ctrl`` yet, a sender named 'txn-ctrl'
    is created on it, its target is marked as a coordinator offering the
    'amqp:local-transactions' capability, and the sender is stored as
    ``context._txn_ctrl``. Later calls reuse that link.

    NOTE: the control link's internal handler closes over the ``handler``
    passed on the call that creates the link, so unhandled events on the
    link keep being forwarded to that first handler.

    :param handler: passed to the Transaction; also receives events the
        internal control-link handler does not handle itself.
    :param settle_before_discharge: passed through to the Transaction.
    """
    if not _get_attr(context, '_txn_ctrl'):
        class InternalTransactionHandler(OutgoingMessageHandler):
            def __init__(self):
                super(InternalTransactionHandler, self).__init__(auto_settle=True)

            def on_settled(self, event):
                # Only deliveries tagged with a transaction are routed.
                if not hasattr(event.delivery, "transaction"):
                    return
                txn = event.delivery.transaction
                event.transaction = txn
                txn.handle_outcome(event)

            def on_unhandled(self, method, event):
                # Forward everything else to the caller's handler, if any.
                if not handler:
                    return
                event.dispatch(handler)

        # Store the link on the context first, then configure its target.
        ctrl = context._txn_ctrl = self.create_sender(
            context, None, name='txn-ctrl', handler=InternalTransactionHandler())
        ctrl.target.type = Terminus.COORDINATOR
        ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))

    return Transaction(context._txn_ctrl, handler, settle_before_discharge)
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    :param url: address to listen on; converted with ``Url(url)`` and
        its host, port and scheme are used.
    :param ssl_domain: SSL configuration for the acceptor. When omitted
        and the URL scheme is 'amqps', the container's default server
        domain (``self.ssl.server``) is used.
    :return: the acceptor created by ``self.acceptor``.
    :raises SSLUnavailable: if the scheme is 'amqps', no ``ssl_domain``
        was given and the container has no SSL support (``self.ssl`` is
        falsy).
    """
    url = Url(url)
    ssl_config = ssl_domain
    if not ssl_config and url.scheme == 'amqps':
        # use container's default server domain
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        ssl_config = self.ssl.server

    # Create the acceptor only after the SSL configuration has been
    # resolved, so that raising SSLUnavailable above does not leave an
    # already-created acceptor behind.
    acceptor = self.acceptor(url.host, url.port)
    if ssl_config:
        acceptor.set_ssl_domain(ssl_config)
    return acceptor
1177
def do_work(self, timeout=None):
    """
    Optionally update the timeout, then run ``self.process()`` once.

    :param timeout: when not None, stored in ``self.timeout`` before
        processing. The value persists on the instance for later calls.
        An explicit 0 is honoured rather than being treated as
        "not given".
    :return: whatever ``self.process()`` returns.
    """
    # Compare against None instead of testing truthiness so that an
    # explicit timeout of 0 is not silently ignored.
    if timeout is not None:
        self.timeout = timeout
    return self.process()
1182