Package qpid :: Package messaging :: Module driver
[frames] | [no frames]

Source Code for Module qpid.messaging.driver

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  import socket, struct, sys, time 
  21  from logging import getLogger, DEBUG 
  22  from qpid import compat 
  23  from qpid import sasl 
  24  from qpid.concurrency import synchronized 
  25  from qpid.datatypes import RangedSet, Serial 
  26  from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ 
  27      FrameDecoder, SegmentDecoder, OpDecoder 
  28  from qpid.messaging import address, transports 
  29  from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED 
  30  from qpid.messaging.exceptions import * 
  31  from qpid.messaging.message import get_codec, Disposition, Message 
  32  from qpid.messaging.endpoints import MangledString 
  33  from qpid.ops import * 
  34  from qpid.selector import Selector 
  35  from qpid.util import URL, default,get_client_properties_with_defaults 
  36  from qpid.validator import And, Context, List, Map, Types, Values 
  37  from threading import Condition, Thread 
  38   
# Module-level loggers: general client logging, plus dedicated channels for
# raw network bytes (rawlog) and decoded protocol operations (opslog).
log = getLogger("qpid.messaging")
rawlog = getLogger("qpid.messaging.io.raw")
opslog = getLogger("qpid.messaging.io.ops")
def addr2reply_to(addr):
  """Convert an address string into a ReplyTo value.

  Addresses whose node type is "topic" map to (exchange, routing-key);
  everything else is treated as a queue name in the routing-key slot.
  """
  name, subject, options = address.parse(addr)
  node_type = options.get("node", {}).get("type") if options else None
  if node_type == "topic":
    return ReplyTo(name, subject)
  return ReplyTo(None, name)
54
def reply_to2addr(reply_to):
  """Convert a ReplyTo value back into an address string.

  Inverse of addr2reply_to: no exchange means a plain queue address; an
  exchange with no routing key means a bare topic node.
  """
  exchange = reply_to.exchange
  key = reply_to.routing_key
  if exchange in (None, ""):
    return key
  if key is None:
    return "%s; {node: {type: topic}}" % exchange
  return "%s/%s; {node: {type: topic}}" % (exchange, key)
62
class Attachment:
  """Associates driver-side state with an API-level endpoint object."""

  def __init__(self, target):
    self.target = target

# XXX

DURABLE_DEFAULT=False
# XXX

class Pattern:
  """Subject filter: matches a wildcard pattern against message subjects."""

  def __init__(self, value):
    self.value = value

  # XXX: this should become part of the driver
  def _bind(self, sst, exchange, queue):
    from qpid.ops import ExchangeBind

    # AMQP topic exchanges use '#' as the multi-word wildcard.
    binding_key = self.value.replace("*", "#")
    sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
                               binding_key=binding_key))

# Default subject applied per node type when the address supplies none.
SUBJECT_DEFAULTS = {
  "topic": "#"
  }
def noop():
  """Default no-op completion callback for commands needing no action."""
def sync_noop():
  """No-op callback attached to explicit ExecutionSync commands."""
96
class SessionState:
  # Driver-side bookkeeping for one attached AMQP 0-10 session: tracks the
  # outgoing command-id sequence, pending completion callbacks, and query
  # results keyed by command id.

  def __init__(self, driver, session, name, channel):
    self.driver = driver
    self.session = session
    self.name = name
    self.channel = channel
    self.detached = False
    self.committing = False
    self.aborting = False

    # sender state
    self.sent = Serial(0)
    self.acknowledged = RangedSet()
    self.actions = {}
    self.min_completion = self.sent
    self.max_completion = self.sent
    self.results = {}
    self.need_sync = False

    # receiver state
    self.received = None
    self.executed = RangedSet()

    # XXX: need to periodically exchange completion/known_completion

    self.destinations = {}

  def write_query(self, query, handler, obj):
    # Capture the id this query will be assigned so the callback can pop
    # the matching entry out of self.results when the result arrives.
    id = self.sent
    self.write_cmd(query, lambda: handler(self.results.pop(id), obj))

  def apply_overrides(self, cmd, overrides):
    # Address option keys use '-', op fields use '_'.
    for k, v in overrides.items():
      cmd[k.replace('-', '_')] = v

  def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
    if overrides:
      self.apply_overrides(cmd, overrides)

    # Only request synchronous completion when a real callback is waiting.
    if action != noop:
      cmd.sync = sync
    if self.detached:
      raise Exception("detached")
    cmd.id = self.sent
    self.sent += 1
    self.actions[cmd.id] = action
    self.max_completion = cmd.id
    self.write_op(cmd)
    self.need_sync = not cmd.sync

  def write_cmds(self, cmds, action=noop):
    # Attach the action to the final command; if there is nothing to
    # write, run it immediately.
    if cmds:
      for cmd in cmds[:-1]:
        self.write_cmd(cmd)
      self.write_cmd(cmds[-1], action)
    else:
      action()

  def write_op(self, op):
    op.channel = self.channel
    self.driver.write_op(op)
# Validators (see qpid.validator) for the option maps parsed out of
# address strings.
POLICIES = Values("always", "sender", "receiver", "never")
RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
                     "exactly-once")

DECLARE = Map({}, restricted=False)
BINDINGS = List(Map({
    "exchange": Types(basestring),
    "queue": Types(basestring),
    "key": Types(basestring),
    "arguments": Map({}, restricted=False)
    }))

# Options accepted by both sources and targets.
COMMON_OPTS = {
    "create": POLICIES,
    "delete": POLICIES,
    "assert": POLICIES,
    "node": Map({
      "type": Values("queue", "topic"),
      "durable": Types(bool),
      "x-declare": DECLARE,
      "x-bindings": BINDINGS
      }),
    "link": Map({
      "name": Types(basestring),
      "durable": Types(bool),
      "reliability": RELIABILITY,
      "x-declare": DECLARE,
      "x-bindings": BINDINGS,
      "x-subscribe": Map({}, restricted=False)
      })
    }

RECEIVE_MODES = Values("browse", "consume")

# Sources additionally accept a receive mode; targets take the common set.
SOURCE_OPTS = COMMON_OPTS.copy()
SOURCE_OPTS.update({
    "mode": RECEIVE_MODES
    })

TARGET_OPTS = COMMON_OPTS.copy()
200 201 -class LinkIn:
202 203 ADDR_NAME = "source" 204 DIR_NAME = "receiver" 205 VALIDATOR = Map(SOURCE_OPTS) 206 213 257 273
276
277 -class LinkOut:
278 279 ADDR_NAME = "target" 280 DIR_NAME = "sender" 281 VALIDATOR = Map(TARGET_OPTS) 282 286 300 303
306
class Cache:
  """Dict-like map whose entries expire lazily, ttl seconds after insertion."""

  def __init__(self, ttl):
    self.ttl = ttl
    self.entries = {}

  def __setitem__(self, key, value):
    # Record insertion time alongside the value for the expiry check.
    self.entries[key] = (time.time(), value)

  def __getitem__(self, key):
    stored_at, value = self.entries[key]
    age = time.time() - stored_at
    if age < self.ttl:
      return value
    # Entry is stale: evict it and behave as if the key were absent.
    del self.entries[key]
    raise KeyError(key)

  def __delitem__(self, key):
    del self.entries[key]
# XXX
# struct format of the 8-byte AMQP protocol header: 4-char magic plus four
# bytes (packed in Engine.open as "AMQP", 1, 1, 0, 10).
HEADER="!4s4B"

EMPTY_DP = DeliveryProperties()
EMPTY_MP = MessageProperties()

SUBJECT = "qpid.subject"

# Engine status values (see Engine._status and Driver.update_status).
CLOSED = "CLOSED"
READ_ONLY = "READ_ONLY"
WRITE_ONLY = "WRITE_ONLY"
OPEN = "OPEN"
class Driver:
  """I/O driver for one Connection: registered with the Selector thread,
  owns the transport and the protocol Engine, and implements the
  reconnect/backoff state machine."""

  def __init__(self, connection):
    self.connection = connection
    self.log_id = "%x" % id(self.connection)
    # Shared with the connection so @synchronized methods serialize
    # against API-side calls.
    self._lock = self.connection._lock

    self._selector = Selector.default()
    self._attempts = 0
    self._delay = self.connection.reconnect_interval_min
    self._reconnect_log = self.connection.reconnect_log
    self._host = 0
    self._retrying = False
    self._next_retry = None
    self._transport = None

    self._timeout = None

    self.engine = None

  def _next_host(self):
    # Round-robin over the primary host plus any reconnect_urls; bumps
    # _attempts each time the rotation wraps back to the primary.
    urls = [URL(u) for u in self.connection.reconnect_urls]
    hosts = [(self.connection.host, default(self.connection.port, 5672))] + \
        [(u.host, default(u.port, 5672)) for u in urls]
    if self._host >= len(hosts):
      self._host = 0
    self._last_host = hosts[self._host]
    if self._host == 0:
      self._attempts += 1
    self._host = self._host + 1
    return self._last_host

  def _num_hosts(self):
    return len(self.connection.reconnect_urls) + 1

  @synchronized
  def wakeup(self):
    self.dispatch()
    self._selector.wakeup()

  def start(self):
    self._selector.register(self)

  def stop(self):
    self._selector.unregister(self)
    if self._transport:
      self.st_closed()

  def fileno(self):
    return self._transport.fileno()

  @synchronized
  def reading(self):
    """Called by the Selector I/O thread to determine if the driver needs to
    wait on the arrival of network data (call self.readable() callback)
    """
    return self._transport is not None and \
        self._transport.reading(True)

  @synchronized
  def writing(self):
    """Called by the Selector I/O thread to determine if it should block
    waiting for output bandwidth (call the self.writeable() callback)
    """
    return self._transport is not None and \
        self.engine is not None and \
        self._transport.writing(self.engine.pending())

  @synchronized
  def timing(self):
    """Called by the Selector I/O thread to determine if it should wake up the
    driver (call the timeout() callback)
    """
    return self._timeout

  @synchronized
  def abort(self, exc, info):
    """Called if the Selector I/O thread hits an unrecoverable error and fails.
    """
    try:
      self.connection.error = exc
      log.error("I/O Thread Fatal error: %s\n%s" % (str(exc), info))
    except:
      pass

  def _check_retry_ok(self):
    """We consider a reconnect to have suceeded only when we have received
    open-ok from the peer.

    If we declared success as soon as the transport connected, then we could get
    into an infinite heartbeat loop if the remote process is hung and never
    sends us any data. We would fail the connection after 2 missed heartbeats,
    reconnect the transport, declare the reconnect ok, then fail again after 2
    missed heartbeats and so on.
    """
    if self._retrying and self.engine is not None and self.engine._connected:  # Means we have received open-ok.
      if self._reconnect_log:
        log.warn("reconnect succeeded: %s:%s", *self._last_host)
      self._next_retry = None
      self._attempts = 0
      self._delay = self.connection.reconnect_interval_min
      self._retrying = False

  @synchronized
  def readable(self):
    # Selector callback: pull bytes off the transport and feed the engine.
    if self.engine is None:
      return
    try:
      data = self._transport.recv(64*1024)
      if data is None:
        # Transport has no complete data available yet (e.g. mid-SSL-handshake).
        return
      elif data:
        rawlog.debug("READ[%s]: %r", self.log_id, data)
        self.engine.write(data)
        self._check_retry_ok()
      else:
        # Zero-length read: peer closed the socket.
        self.close_engine()
    except socket.error, e:
      self.close_engine(ConnectionError(text=str(e)))

    self.update_status()

    self._notify()

  def _notify(self):
    if self.connection.error:
      self.connection._condition.gc()
    self.connection._waiter.notifyAll()

  def close_engine(self, e=None):
    # Tear down the engine; schedule a reconnect attempt when the
    # connection is configured to reconnect and attempts remain.
    if e is None:
      e = ConnectionError(text="connection aborted")

    if (self.connection.reconnect and
        (self.connection.reconnect_limit is None or
         self.connection.reconnect_limit <= 0 or
         self._attempts <= self.connection.reconnect_limit)):
      if self._host < self._num_hosts():
        # More hosts to try in this rotation: retry immediately.
        delay = 0
      else:
        # Rotation exhausted: back off exponentially up to the maximum.
        delay = self._delay
        self._delay = min(2*self._delay,
                          self.connection.reconnect_interval_max)
      self._next_retry = time.time() + delay
      if self._reconnect_log:
        log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
        if delay > 0:
          log.warn("sleeping %s seconds" % delay)
      self._retrying = True
      self.engine.close()
    else:
      self.engine.close(e)
      self.st_closed()

    self.schedule()

  def update_status(self):
    # Dispatch to the st_* handler matching the engine's current status.
    if not self.engine: return False
    status = self.engine.status()
    return getattr(self, "st_%s" % status.lower())()

  def st_closed(self):
    # XXX: this log statement seems to sometimes hit when the socket is not connected
    # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
    if self._transport: self._transport.close()
    self._transport = None
    self.engine = None
    return True

  def st_open(self):
    return False

  @synchronized
  def writeable(self):
    # Selector callback: push pending engine output onto the transport.
    if self.engine is None:
      return
    notify = False
    try:
      n = self._transport.send(self.engine.peek())
      if n == 0: return
      sent = self.engine.read(n)
      rawlog.debug("SENT[%s]: %r", self.log_id, sent)
    except socket.error, e:
      self.close_engine(e)
      notify = True

    if self.update_status() or notify:
      self._notify()

  @synchronized
  def timeout(self):
    self.dispatch()
    self.update_status()
    self._notify()
    self.schedule()

  def schedule(self):
    # Next wakeup is the sooner of the heartbeat interval and any pending
    # reconnect retry time.
    times = []
    if self.connection.heartbeat:
      times.append(time.time() + self.connection.heartbeat)
    if self._next_retry:
      times.append(self._next_retry)
    if times:
      self._timeout = min(times)
    else:
      self._timeout = None

  def dispatch(self):
    try:
      if self._transport is None:
        if self.connection._connected and not self.connection.error:
          self.connect()
      elif self.engine is not None:
        self.engine.dispatch()
    except HeartbeatTimeout, e:
      log.warn("Heartbeat timeout")
      self.close_engine(e)
    except ContentError, e:
      # Content errors are accounted to the connection, not the transport.
      msg = compat.format_exc()
      self.connection.error = ContentError(text=msg)
    except:
      # XXX: Does socket get leaked if this occurs?
      msg = compat.format_exc()
      self.connection.error = InternalError(text=msg)

  def connect(self):
    # Honor the backoff window before retrying.
    if self._retrying and time.time() < self._next_retry:
      return

    try:
      # XXX: should make this non blocking
      host, port = self._next_host()
      if self._retrying and self._reconnect_log:
        log.warn("trying: %s:%s", host, port)
      self.engine = Engine(self.connection)
      self.engine.open()
      rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
      trans = transports.TRANSPORTS.get(self.connection.transport)
      if trans:
        self._transport = trans(self.connection, host, port)
      else:
        raise ConnectError("no such transport: %s" % self.connection.transport)
      self.schedule()
    except socket.error, e:
      self.close_engine(ConnectError(text=str(e)))
# Disposition applied to messages with no explicit disposition; a None type
# is treated as accept by Engine.process.
DEFAULT_DISPOSITION = Disposition(None)
def get_bindings(opts, queue=None, exchange=None, key=None):
  """Build ExchangeBind commands for the "x-bindings" entries of opts.

  Fields omitted from a binding fall back to the most recently seen value:
  the keyword argument for the first binding, then the previous binding's
  value for later ones.
  """
  cmds = []
  for binding in opts.get("x-bindings", []):
    exchange = binding.get("exchange", exchange)
    queue = binding.get("queue", queue)
    key = binding.get("key", key)
    args = binding.get("arguments", {})
    cmds.append(ExchangeBind(queue, exchange, key, args))
  return cmds
# Maps of AMQP error codes to exception classes raised to the application.
CONNECTION_ERRS = {
  # anything not here (i.e. everything right now) will default to
  # connection error
  }

SESSION_ERRS = {
  # anything not here will default to session error
  error_code.unauthorized_access: UnauthorizedAccess,
  error_code.not_found: NotFound,
  error_code.resource_locked: ReceiverError,
  error_code.resource_limit_exceeded: TargetCapacityExceeded,
  error_code.internal_error: ServerError
  }
612 613 -class Engine:
614
  def __init__(self, connection):
    self.connection = connection
    self.log_id = "%x" % id(self.connection)
    self._closing = False
    self._connected = False
    # True when pre-existing sessions must be re-attached (reconnect case).
    self._reconnecting = bool(connection.sessions)
    self._attachments = {}

    self._in = LinkIn()
    self._out = LinkOut()

    # NOTE(review): do_connection_tune assigns self.channel_max (no
    # underscore); this default is never read by the visible code — confirm
    # which attribute is intended.
    self._channel_max = 65536
    self._channels = 0
    self._sessions = {}

    self.address_cache = Cache(self.connection.address_ttl)

    self._status = CLOSED
    self._buf = ""
    self._hdr = ""
    # Set _last_in and _last_out here so heartbeats will be timed from the
    # beginning of connection if no data is sent/received.
    self._last_in = time.time()
    self._last_out = time.time()
    self._op_enc = OpEncoder()
    self._seg_enc = SegmentEncoder()
    self._frame_enc = FrameEncoder()
    self._frame_dec = FrameDecoder()
    self._seg_dec = SegmentDecoder()
    self._op_dec = OpDecoder()

    # SASL client configured from the connection's credentials/settings.
    self._sasl = sasl.Client()
    if self.connection.username:
      self._sasl.setAttr("username", self.connection.username)
    if self.connection.password:
      self._sasl.setAttr("password", self.connection.password)
    if self.connection.host:
      self._sasl.setAttr("host", self.connection.host)
    self._sasl.setAttr("service", self.connection.sasl_service)
    if self.connection.sasl_min_ssf is not None:
      self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
    if self.connection.sasl_max_ssf is not None:
      self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
    self._sasl.init()
    # Security-layer encode/decode is only switched on once negotiated.
    self._sasl_encode = False
    self._sasl_decode = False
661
662 - def _reset(self):
663 self.connection._transport_connected = False 664 665 for ssn in self.connection.sessions.values(): 666 for m in ssn.acked + ssn.unacked + ssn.incoming: 667 m._transfer_id = None 668 for snd in ssn.senders: 669 snd.linked = False 670 for rcv in ssn.receivers: 671 rcv.impending = rcv.received 672 rcv.linked = False
673
674 - def status(self):
675 return self._status
676
  def write(self, data):
    # Feed inbound bytes through SASL decode, the 8-byte protocol header,
    # then the frame/segment/op decoding pipeline; dispatch each op.
    self._last_in = time.time()
    try:
      if self._sasl_decode:
        data = self._sasl.decode(data)

      # Accumulate the fixed-size protocol header before any framing.
      if len(self._hdr) < 8:
        r = 8 - len(self._hdr)
        self._hdr += data[:r]
        data = data[r:]

        if len(self._hdr) == 8:
          self.do_header(self._hdr)

      self._frame_dec.write(data)
      self._seg_dec.write(*self._frame_dec.read())
      self._op_dec.write(*self._seg_dec.read())
      for op in self._op_dec.read():
        self.assign_id(op)
        opslog.debug("RCVD[%s]: %r", self.log_id, op)
        op.dispatch(self)
      self.dispatch()
    except MessagingError, e:
      self.close(e)
    except:
      self.close(InternalError(text=compat.format_exc()))
703
704 - def close(self, e=None):
705 self._reset() 706 # We cannot re-establish transactional sessions, they must be aborted. 707 # We could re-do transactional enqueues, but not dequeues. 708 for ssn in self.connection.sessions.values(): 709 if ssn.transactional: 710 if ssn.committing: 711 ssn.error = TransactionUnknown(text="Transaction outcome unknown due to transport failure") 712 else: 713 ssn.error = TransactionAborted(text="Transaction aborted due to transport failure") 714 ssn.closed = True 715 if e: 716 self.connection.error = e 717 self._status = CLOSED
718
719 - def assign_id(self, op):
720 if isinstance(op, Command): 721 sst = self.get_sst(op) 722 op.id = sst.received 723 sst.received += 1
724
725 - def pending(self):
726 return len(self._buf)
727
728 - def read(self, n):
729 result = self._buf[:n] 730 self._buf = self._buf[n:] 731 return result
732
733 - def peek(self):
734 return self._buf
735
736 - def write_op(self, op):
737 opslog.debug("SENT[%s]: %r", self.log_id, op) 738 self._op_enc.write(op) 739 self._seg_enc.write(*self._op_enc.read()) 740 self._frame_enc.write(*self._seg_enc.read()) 741 bytes = self._frame_enc.read() 742 if self._sasl_encode: 743 bytes = self._sasl.encode(bytes) 744 self._buf += bytes 745 self._last_out = time.time()
746
747 - def do_header(self, hdr):
748 cli_major = 0; cli_minor = 10 749 magic, _, _, major, minor = struct.unpack(HEADER, hdr) 750 if major != cli_major or minor != cli_minor: 751 raise VersionError(text="client: %s-%s, server: %s-%s" % 752 (cli_major, cli_minor, major, minor))
753
  def do_connection_start(self, start):
    # Intersect the broker's offered SASL mechanisms with any
    # user-restricted list, then begin negotiation.
    if self.connection.sasl_mechanisms:
      permitted = self.connection.sasl_mechanisms.split()
      mechs = [m for m in start.mechanisms if m in permitted]
    else:
      mechs = start.mechanisms
    try:
      mech, initial = self._sasl.start(" ".join(mechs))
    except sasl.SASLError, e:
      # Distinguish "no credentials supplied" from a genuine SASL failure.
      if "ANONYMOUS" not in mechs and self.connection.username is None:
        _text="Anonymous connections disabled, missing credentials"
      else:
        _text=str(e)
      raise AuthenticationFailure(text=_text)

    client_properties = get_client_properties_with_defaults(provided_client_properties=self.connection.client_properties);
    self.write_op(ConnectionStartOk(client_properties=client_properties,
                                    mechanism=mech, response=initial))
772
773 - def do_connection_secure(self, secure):
774 resp = self._sasl.step(secure.challenge) 775 self.write_op(ConnectionSecureOk(response=resp))
776
777 - def do_connection_tune(self, tune):
778 # XXX: is heartbeat protocol specific? 779 if tune.channel_max is not None: 780 self.channel_max = tune.channel_max 781 self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, 782 channel_max=self.channel_max)) 783 self.write_op(ConnectionOpen()) 784 self._sasl_encode = True
785
786 - def do_connection_open_ok(self, open_ok):
787 self.connection.auth_username = self._sasl.auth_username() 788 self._connected = True 789 self._sasl_decode = True 790 self.connection._transport_connected = True
791
792 - def do_connection_heartbeat(self, hrt):
793 pass
794
795 - def do_connection_close(self, close):
796 self.write_op(ConnectionCloseOk()) 797 if close.reply_code != close_code.normal: 798 exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError) 799 self.connection.error = exc(close.reply_code, close.reply_text)
800 # XXX: should we do a half shutdown on the socket here? 801 # XXX: we really need to test this, we may end up reporting a 802 # connection abort after this, if we were to do a shutdown on read 803 # and stop reading, then we wouldn't report the abort, that's 804 # probably the right thing to do 805
806 - def do_connection_close_ok(self, close_ok):
807 self.close()
808
809 - def do_session_attached(self, atc):
810 pass
811
812 - def do_session_command_point(self, cp):
813 sst = self.get_sst(cp) 814 sst.received = cp.command_id
815
816 - def do_session_completed(self, sc):
817 sst = self.get_sst(sc) 818 for r in sc.commands: 819 sst.acknowledged.add(r.lower, r.upper) 820 821 if not sc.commands.empty(): 822 while sst.min_completion in sc.commands: 823 if sst.actions.has_key(sst.min_completion): 824 sst.actions.pop(sst.min_completion)() 825 sst.min_completion += 1
826
827 - def session_known_completed(self, kcmp):
828 sst = self.get_sst(kcmp) 829 executed = RangedSet() 830 for e in sst.executed.ranges: 831 for ke in kcmp.ranges: 832 if e.lower in ke and e.upper in ke: 833 break 834 else: 835 executed.add_range(e) 836 sst.executed = completed
837
838 - def do_session_flush(self, sf):
839 sst = self.get_sst(sf) 840 if sf.expected: 841 if sst.received is None: 842 exp = None 843 else: 844 exp = RangedSet(sst.received) 845 sst.write_op(SessionExpected(exp)) 846 if sf.confirmed: 847 sst.write_op(SessionConfirmed(sst.executed)) 848 if sf.completed: 849 sst.write_op(SessionCompleted(sst.executed))
850
851 - def do_session_request_timeout(self, rt):
852 sst = self.get_sst(rt) 853 sst.write_op(SessionTimeout(timeout=0))
854
855 - def do_execution_result(self, er):
856 sst = self.get_sst(er) 857 sst.results[er.command_id] = er.value 858 sst.executed.add(er.id)
859
860 - def do_execution_exception(self, ex):
861 sst = self.get_sst(ex) 862 exc = SESSION_ERRS.get(ex.error_code, SessionError) 863 sst.session.error = exc(ex.error_code, ex.description)
864
  def dispatch(self):
    # Begin an orderly close if the API side disconnected underneath us.
    if not self.connection._connected and not self._closing and self._status != CLOSED:
      self.disconnect()

    if self._connected and not self._closing:
      for ssn in self.connection.sessions.values():
        self.attach(ssn)
        self.process(ssn)

    # We need to check heartbeat even if not self._connected since we may have
    # heartbeat timeout before receiving an open-ok
    if self.connection.heartbeat and self._status != CLOSED and not self._closing:
      now = time.time()
      if now - self._last_in > 2*self.connection.heartbeat:
        raise HeartbeatTimeout(text="heartbeat timeout")
      # Only send heartbeats if we are connected.
      if self._connected and now - self._last_out >= self.connection.heartbeat/2.0:
        self.write_op(ConnectionHeartbeat())
883
884 - def open(self):
885 self._reset() 886 self._status = OPEN 887 self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
888
889 - def disconnect(self):
890 self.write_op(ConnectionClose(close_code.normal)) 891 self._closing = True
892
  def attach(self, ssn):
    # Ensure a SessionState exists for ssn, establish its links, and
    # initiate detach once the session starts closing.
    if ssn.closed: return
    sst = self._attachments.get(ssn)
    if sst is None:
      # Allocate the lowest unused channel number.
      # NOTE(review): relies on self.channel_max having been set by
      # do_connection_tune before any session attaches — confirm ordering.
      for i in xrange(0, self.channel_max):
        if not self._sessions.has_key(i):
          ch = i
          break
      else:
        raise RuntimeError("all channels used")
      sst = SessionState(self, ssn, ssn.name, ch)
      sst.write_op(SessionAttach(name=ssn.name, force=self._reconnecting))
      sst.write_op(SessionCommandPoint(sst.sent, 0))
      # Force-attach applies only to the first session after a reconnect.
      self._reconnecting = False
      sst.outgoing_idx = 0
      sst.acked = []
      sst.acked_idx = 0
      if ssn.transactional:
        sst.write_cmd(TxSelect())
      self._attachments[ssn] = sst
      self._sessions[sst.channel] = sst

    for snd in ssn.senders:
      self.link(snd, self._out, snd.target)
    for rcv in ssn.receivers:
      self.link(rcv, self._in, rcv.source)

    if sst is not None and ssn.closing and not sst.detached:
      sst.detached = True
      sst.write_op(SessionDetach(name=ssn.name))
923
924 - def get_sst(self, op):
925 return self._sessions[op.channel]
926
927 - def do_session_detached(self, dtc):
928 sst = self._sessions.pop(dtc.channel) 929 ssn = sst.session 930 del self._attachments[ssn] 931 ssn.closed = True
932
933 - def do_session_detach(self, dtc):
934 sst = self.get_sst(dtc) 935 sst.write_op(SessionDetached(name=dtc.name)) 936 self.do_session_detached(dtc)
937 955 956 def resolved(type, subtype): 957 dir.do_link(sst, lnk, _lnk, type, subtype, linked)
958 959 self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) 960 self._attachments[lnk] = _lnk 961 962 if lnk.linked and lnk.closing and not lnk.closed: 963 if not _lnk.closing: 964 def unlinked(): 965 dir.del_link(sst, lnk, _lnk) 966 del self._attachments[lnk] 967 lnk.closed = True 968 if _lnk.options.get("delete") in ("always", dir.DIR_NAME): 969 dir.do_unlink(sst, lnk, _lnk) 970 requested_type = _lnk.options.get("node", {}).get("type") 971 self.delete(sst, _lnk.name, unlinked, node_type=requested_type) 972 else: 973 dir.do_unlink(sst, lnk, _lnk, unlinked) 974 _lnk.closing = True 975 elif not lnk.linked and lnk.closing and not lnk.closed: 976 if lnk.error: lnk.closed = True 977
  def parse_address(self, lnk, dir, addr):
    # Parse addr into lnk.name/subject/options; returns a MalformedAddress
    # error (rather than raising) when the address cannot be parsed.
    if addr is None:
      return MalformedAddress(text="%s is None" % dir.ADDR_NAME)
    else:
      try:
        lnk.name, lnk.subject, lnk.options = address.parse(addr)
        # XXX: subject
        if lnk.options is None:
          lnk.options = {}
        # MangledString addresses get auto-created as exclusive,
        # auto-deleted nodes unless the address says otherwise.
        if isinstance(addr, MangledString):
          lnk.options['create'] = "always"
          if 'node' not in lnk.options:
            lnk.options['node'] = {}
          if 'x-declare' not in lnk.options['node']:
            lnk.options['node']['x-declare'] = {}
          xdeclare = lnk.options['node']['x-declare']
          if 'auto-delete' not in xdeclare:
            xdeclare['auto-delete'] = "True"
          if 'exclusive' not in xdeclare:
            xdeclare['exclusive'] = "True"
      except address.LexError, e:
        return MalformedAddress(text=str(e))
      except address.ParseError, e:
        return MalformedAddress(text=str(e))
1002
1003 - def validate_options(self, lnk, dir):
1004 ctx = Context() 1005 err = dir.VALIDATOR.validate(lnk.options, ctx) 1006 if err: return InvalidOption(text="error in options: %s" % err)
1007
  def resolve_declare(self, sst, lnk, dir, action):
    # Resolve the node the link addresses, creating or asserting it as the
    # "create"/"assert" policies dictate, then continue with action.
    declare = lnk.options.get("create") in ("always", dir)
    assrt = lnk.options.get("assert") in ("always", dir)
    requested_type = lnk.options.get("node", {}).get("type")
    def do_resolved(type, subtype):
      err = None
      if type is None:
        # Node does not exist: create it or fail.
        if declare:
          err = self.declare(sst, lnk, action, True)
        else:
          err = NotFound(text="no such %s: %s" % (requested_type or "queue", lnk.name))
      else:
        if assrt:
          expected = lnk.options.get("node", {}).get("type")
          if expected and type != expected:
            if declare:
              err = self.declare(sst, lnk, action, True)
            else:
              err = AssertionFailed(text="expected %s, got %s" % (expected, type))
        # Bindings may need (re-)establishing even on an existing node.
        if "node" in lnk.options and "x-bindings" in lnk.options["node"]:
          err = self.declare(sst, lnk, action, False)
        if err is None:
          action(type, subtype)

      if err:
        tgt = lnk.target
        tgt.error = err
        del self._attachments[tgt]
        tgt.closed = True
        return
    self.resolve(sst, lnk.name, do_resolved, node_type=requested_type, force=declare)
  def resolve(self, sst, name, action, force=False, node_type=None, delete=False):
    # Determine whether name refers to a topic (exchange) or queue, via the
    # address cache or by querying the broker, then invoke
    # action(type, subtype).
    if not force and not node_type:
      # Try the cache first to avoid a broker round trip.
      try:
        type, subtype = self.address_cache[name]
        action(type, subtype)
        return
      except KeyError:
        pass

    args = { "topic":None, "queue":None }
    def do_result(r, obj):
      args[obj] = r
    def do_action():
      # Combine the exchange query (er) and queue query (qr) results into
      # a single (type, subtype) decision.
      er = args["topic"]
      qr = args["queue"]
      if node_type == "topic" and er and not er.not_found:
        type, subtype = "topic", er.type
      elif node_type == "queue" and qr and qr.queue:
        type, subtype = "queue", None
      elif (er and er.not_found) and qr and not qr.queue:
        type, subtype = None, None
      elif (qr and qr.queue):
        if node_type == "topic" and force:
          type, subtype = None, None
        else:
          type, subtype = "queue", None
      elif (er and not er.not_found):
        if node_type == "queue" and force:
          type, subtype = None, None
        else:
          type, subtype = "topic", er.type
      elif er:
        if er.not_found:
          type, subtype = None, None
        else:
          type, subtype = "topic", er.type
      else:
        type, subtype = None, None
      if type is not None:
        self.address_cache[name] = (type, subtype)
      action(type, subtype)
    def do_result_and_action(r, obj):
      do_result(r, obj)
      do_action()
    if (node_type is None): # we don't know the type, let check broker
      sst.write_query(ExchangeQuery(name), do_result, "topic")
      sst.write_query(QueueQuery(name), do_result_and_action, "queue")
    elif force and not delete: # we forcefully declare known type, dont ask broker
      do_action()
    elif node_type == "topic":
      sst.write_query(ExchangeQuery(name), do_result_and_action, "topic")
    else:
      sst.write_query(QueueQuery(name), do_result_and_action, "queue")
  def declare(self, sst, lnk, action, create_node):
    # Issue declare/bind commands for the link's node; when create_node is
    # False only the bindings are written.
    name = lnk.name
    props = lnk.options.get("node", {})
    durable = props.get("durable", DURABLE_DEFAULT)
    type = props.get("type", "queue")
    declare = props.get("x-declare", {})

    cmd = None
    if type == "topic":
      if create_node: cmd = ExchangeDeclare(exchange=name, durable=durable)
      bindings = get_bindings(props, exchange=name)
    elif type == "queue":
      if create_node: cmd = QueueDeclare(queue=name, durable=durable)
      bindings = get_bindings(props, queue=name)
    else:
      raise ValueError(type)

    if cmd is not None:
      sst.apply_overrides(cmd, declare)
      if type == "topic":
        # Default the exchange type; remember it as the node subtype.
        if cmd.type is None:
          cmd.type = "topic"
        subtype = cmd.type
      else:
        subtype = None
      cmds = [cmd]
    else:
      cmds = []

    cmds.extend(bindings)

    def declared():
      # NOTE(review): when create_node is False, subtype is never assigned
      # above, so referencing it here would raise NameError — confirm this
      # path (bindings-only declare) is exercised and intended.
      if create_node:
        self.address_cache[name] = (type, subtype)
      action(type, subtype)

    sst.write_cmds(cmds, declared)
1132 - def delete(self, sst, name, action, node_type=None):
1133 def deleted(): 1134 del self.address_cache[name] 1135 action()
1136 1137 def do_delete(type, subtype): 1138 if type == "topic": 1139 sst.write_cmd(ExchangeDelete(name), deleted) 1140 elif type == "queue": 1141 sst.write_cmd(QueueDelete(name), deleted) 1142 elif type is None: 1143 action() 1144 else: 1145 raise ValueError(type) 1146 self.resolve(sst, name, do_delete, force=True, node_type=node_type, delete=True) 1147
  def process(self, ssn):
    # Per-dispatch session work: flush outgoing messages, sync senders,
    # replenish receiver credit, acknowledge/dispose received messages,
    # and drive commit/abort of transactions.
    if ssn.closed or ssn.closing: return

    sst = self._attachments[ssn]

    # Send queued outgoing messages whose sender link is established.
    while sst.outgoing_idx < len(ssn.outgoing):
      msg = ssn.outgoing[sst.outgoing_idx]
      snd = msg._sender
      # XXX: should check for sender error here
      _snd = self._attachments.get(snd)
      if _snd and snd.linked:
        self.send(snd, msg)
        sst.outgoing_idx += 1
      else:
        break

    for snd in ssn.senders:
      # XXX: should included snd.acked in this
      if snd.synced >= snd.queued and sst.need_sync:
        sst.write_cmd(ExecutionSync(), sync_noop)

    for rcv in ssn.receivers:
      self.process_receiver(rcv)

    if ssn.acked:
      messages = ssn.acked[sst.acked_idx:]
      if messages:
        ids = RangedSet()

        # Group consecutive messages sharing a disposition so one command
        # can cover each run; DEFAULT_DISPOSITION seeds the grouping.
        disposed = [(DEFAULT_DISPOSITION, [])]
        acked = []
        for m in messages:
          # XXX: we're ignoring acks that get lost when disconnected,
          # could we deal this via some message-id based purge?
          if m._transfer_id is None:
            acked.append(m)
            continue
          ids.add(m._transfer_id)
          if m._receiver._accept_mode is accept_mode.explicit:
            disp = m._disposition or DEFAULT_DISPOSITION
            last, msgs = disposed[-1]
            if disp.type is last.type and disp.options == last.options:
              msgs.append(m)
            else:
              disposed.append((disp, [m]))
          else:
            acked.append(m)

        for range in ids:
          sst.executed.add_range(range)
        sst.write_op(SessionCompleted(sst.executed))

        # Builds the completion callback that finalizes acks for msgs.
        def ack_acker(msgs):
          def ack_ack():
            for m in msgs:
              ssn.acked.remove(m)
              sst.acked_idx -= 1
              # XXX: should this check accept_mode too?
              if not ssn.transactional:
                sst.acked.remove(m)
          return ack_ack

        for disp, msgs in disposed:
          if not msgs: continue
          # None disposition type means accept.
          if disp.type is None:
            op = MessageAccept
          elif disp.type is RELEASED:
            op = MessageRelease
          elif disp.type is REJECTED:
            op = MessageReject
          sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
                           **disp.options),
                        ack_acker(msgs))
          if log.isEnabledFor(DEBUG):
            for m in msgs:
              log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)

        sst.acked.extend(messages)
        sst.acked_idx += len(messages)
        # Messages with no transfer id (or implicit accept) complete now.
        ack_acker(acked)()

    if ssn.committing and not sst.committing:
      def commit_ok():
        del sst.acked[:]
        ssn.committing = False
        ssn.committed = True
        ssn.aborting = False
        ssn.aborted = False
        sst.committing = False
      sst.write_cmd(TxCommit(), commit_ok)
      sst.committing = True

    if ssn.aborting and not sst.aborting:
      sst.aborting = True
      def do_rb():
        # Release everything in flight, then roll the transaction back.
        messages = sst.acked + ssn.unacked + ssn.incoming
        ids = RangedSet(*[m._transfer_id for m in messages])
        for range in ids:
          sst.executed.add_range(range)
        sst.write_op(SessionCompleted(sst.executed))
        sst.write_cmd(MessageRelease(ids, True))
        sst.write_cmd(TxRollback(), do_rb_ok)

      def do_rb_ok():
        del ssn.incoming[:]
        del ssn.unacked[:]
        del sst.acked[:]

        for rcv in ssn.receivers:
          rcv.impending = rcv.received
          rcv.returned = rcv.received
          # XXX: do we need to update granted here as well?

        for rcv in ssn.receivers:
          self.process_receiver(rcv)

        ssn.aborting = False
        ssn.aborted = True
        ssn.committing = False
        ssn.committed = False
        sst.aborting = False

      # Stop deliveries on all receivers before rolling back.
      for rcv in ssn.receivers:
        _rcv = self._attachments[rcv]
        sst.write_cmd(MessageStop(_rcv.destination))
      sst.write_cmd(ExecutionSync(), do_rb)
  def grant(self, rcv):
    """Reconcile the credit issued on the wire with the receiver's
    current capacity, issuing message.flow / message.stop / message.flush
    commands as needed.

    :param rcv: the API-level Receiver; its protocol-side state (_rcv)
        tracks destination, byte-credit window, and drain status.
    """
    sst = self._attachments[rcv.session]
    _rcv = self._attachments.get(rcv)
    # No credit changes while unattached, unlinked, closing, or mid-drain.
    if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
      return

    # delta = how much more credit to issue:
    #   UNLIMITED -> switch the link to unlimited credit
    #   > 0       -> issue that many additional message credits
    #   < 0       -> over-granted; stop flow and re-grant after the stop
    if rcv.granted is UNLIMITED:
      if rcv.impending is UNLIMITED:
        delta = 0
      else:
        delta = UNLIMITED
    elif rcv.impending is UNLIMITED:
      # Currently unlimited but a finite grant is wanted: force a stop.
      delta = -1
    else:
      delta = max(rcv.granted, rcv.received) - rcv.impending

    if delta is UNLIMITED:
      # Byte credit is opened once (to UNLIMITED) and reused until a
      # stop/flush closes it again.
      if not _rcv.bytes_open:
        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
        _rcv.bytes_open = True
      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
      rcv.impending = UNLIMITED
    elif delta > 0:
      if not _rcv.bytes_open:
        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
        _rcv.bytes_open = True
      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
      rcv.impending += delta
    elif delta < 0 and not rcv.draining:
      _rcv.draining = True
      def do_stop():
        # Stop confirmed: whatever was received is now the ceiling;
        # re-run grant to issue the correct (smaller) credit.
        rcv.impending = rcv.received
        _rcv.draining = False
        _rcv.bytes_open = False
        self.grant(rcv)
      sst.write_cmd(MessageStop(_rcv.destination), do_stop)

    if rcv.draining:
      # Application asked to drain: flush pushes out all available
      # messages, then the completion resets the credit bookkeeping.
      _rcv.draining = True
      def do_flush():
        rcv.impending = rcv.received
        rcv.granted = rcv.impending
        _rcv.draining = False
        _rcv.bytes_open = False
        rcv.draining = False
      sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
1323 - def process_receiver(self, rcv):
1324 if rcv.closed: return 1325 self.grant(rcv)
1326
  def send(self, snd, msg):
    """Encode one API Message and write it to the wire as an AMQP 0-10
    message.transfer on the sender's session.

    Builds DeliveryProperties/MessageProperties from the message fields
    (routing key, subject, reply-to, durability, priority, TTL and the
    x-amqp-0-10.* passthrough properties), encodes the content with the
    codec for its content type, and registers msg_acked as the completion
    that retires the message from the session's outgoing queue.

    :param snd: the API-level Sender (attachment holds exchange/routing key)
    :param msg: the qpid.messaging.Message to transmit
    :raises EncodeError: if the content cannot be encoded for the
        message's content type
    """
    sst = self._attachments[snd.session]
    _snd = self._attachments[snd]

    # Subject overrides the routing key, except when sending to the
    # default ("") exchange where the address routing key must be used.
    if msg.subject is None or _snd._exchange == "":
      rk = _snd._routing_key
    else:
      rk = msg.subject

    # The subject carried in the application headers falls back to the
    # sender's address subject when the message doesn't set one.
    if msg.subject is None:
      subject = _snd.subject
    else:
      subject = msg.subject

    # XXX: do we need to query to figure out how to create the reply-to interoperably?
    if msg.reply_to:
      rt = addr2reply_to(msg.reply_to)
    else:
      rt = None
    content_encoding = msg.properties.get("x-amqp-0-10.content-encoding")
    dp = DeliveryProperties(routing_key=rk)
    mp = MessageProperties(message_id=msg.id,
                           user_id=msg.user_id,
                           reply_to=rt,
                           correlation_id=msg.correlation_id,
                           app_id = msg.properties.get("x-amqp-0-10.app-id"),
                           content_type=msg.content_type,
                           content_encoding=content_encoding,
                           application_headers=msg.properties)
    if subject is not None:
      if mp.application_headers is None:
        mp.application_headers = {}
      mp.application_headers[SUBJECT] = subject
    if msg.durable is not None:
      if msg.durable:
        dp.delivery_mode = delivery_mode.persistent
      else:
        dp.delivery_mode = delivery_mode.non_persistent
    if msg.priority is not None:
      dp.priority = msg.priority
    if msg.ttl is not None:
      # API TTL is seconds; 0-10 ttl is milliseconds.
      dp.ttl = long(msg.ttl*1000)
    enc, dec = get_codec(msg.content_type)
    try:
      body = enc(msg.content)
    except AttributeError, e:
      # convert to non-blocking EncodeError
      raise EncodeError(e)

    # XXX: this is not safe for out of order, can this be triggered by pre_ack?
    def msg_acked():
      # XXX: should we log the ack somehow too?
      # Completion: the broker has taken the message; pop it from the
      # head of the outgoing queue (asserting ordering held).
      snd.acked += 1
      m = snd.session.outgoing.pop(0)
      sst.outgoing_idx -= 1
      log.debug("RACK[%s]: %s", sst.session.log_id, msg)
      assert msg == m

    xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
                          payload=body)

    # pre_ack senders are acknowledged locally, immediately after the
    # write, instead of waiting for broker completion.
    if _snd.pre_ack:
      sst.write_cmd(xfr)
    else:
      sst.write_cmd(xfr, msg_acked, sync=msg._sync)

    log.debug("SENT[%s]: %s", sst.session.log_id, msg)

    if _snd.pre_ack:
      msg_acked()
1398 - def do_message_transfer(self, xfr):
1399 sst = self.get_sst(xfr) 1400 ssn = sst.session 1401 1402 msg = self._decode(xfr) 1403 rcv = sst.destinations[xfr.destination].target 1404 msg._receiver = rcv 1405 if rcv.closing or rcv.closed: # release message to a closing receiver 1406 ids = RangedSet(*[msg._transfer_id]) 1407 log.debug("releasing back %s message: %s, as receiver is closing", ids, msg) 1408 sst.write_cmd(MessageRelease(ids, True)) 1409 return 1410 if rcv.impending is not UNLIMITED: 1411 assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) 1412 rcv.received += 1 1413 log.debug("RCVD[%s]: %s", ssn.log_id, msg) 1414 ssn._notify_message_received(msg)
1415 1416
  def _decode(self, xfr):
    """Convert an AMQP 0-10 message.transfer into an API Message.

    Pulls DeliveryProperties/MessageProperties out of the transfer's
    headers (falling back to empty defaults), decodes the payload with
    the codec for its content type, and maps the 0-10 fields onto the
    Message attributes, exposing protocol-specific fields as
    "x-amqp-0-10.*" properties.

    :param xfr: the incoming MessageTransfer operation
    :raises DecodeError: if the payload cannot be decoded
    :return: the decoded qpid.messaging.Message with _transfer_id set
    """
    dp = EMPTY_DP
    mp = EMPTY_MP

    # A transfer may carry either, both, or neither header segment.
    for h in xfr.headers:
      if isinstance(h, DeliveryProperties):
        dp = h
      elif isinstance(h, MessageProperties):
        mp = h

    ap = mp.application_headers
    enc, dec = get_codec(mp.content_type)
    try:
      content = dec(xfr.payload)
    except Exception, e:
      raise DecodeError(e)
    msg = Message(content)
    msg.id = mp.message_id
    if ap is not None:
      msg.subject = ap.get(SUBJECT)
    msg.user_id = mp.user_id
    if mp.reply_to is not None:
      msg.reply_to = reply_to2addr(mp.reply_to)
    msg.correlation_id = mp.correlation_id
    if dp.delivery_mode is not None:
      msg.durable = dp.delivery_mode == delivery_mode.persistent
    msg.priority = dp.priority
    if dp.ttl is not None:
      # 0-10 ttl is milliseconds; API TTL is seconds.
      msg.ttl = dp.ttl/1000.0
    msg.redelivered = dp.redelivered
    # Application headers become the message properties; protocol-level
    # fields are added under reserved x-amqp-0-10.* keys.
    msg.properties = mp.application_headers or {}
    if mp.app_id is not None:
      msg.properties["x-amqp-0-10.app-id"] = mp.app_id
    if mp.content_encoding is not None:
      msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding
    if dp.routing_key is not None:
      msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key
    if dp.timestamp is not None:
      msg.properties["x-amqp-0-10.timestamp"] = dp.timestamp
    msg.content_type = mp.content_type
    msg._transfer_id = xfr.id
    return msg
1459