
Source Code for Module qpid.messaging.driver

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  import os, socket, struct, sys, time 
  21  from logging import getLogger, DEBUG 
  22  from qpid import compat 
  23  from qpid import sasl 
  24  from qpid.concurrency import synchronized 
  25  from qpid.datatypes import RangedSet, Serial 
  26  from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ 
  27      FrameDecoder, SegmentDecoder, OpDecoder 
  28  from qpid.messaging import address, transports 
  29  from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED 
  30  from qpid.messaging.exceptions import * 
  31  from qpid.messaging.message import get_codec, Disposition, Message 
  32  from qpid.ops import * 
  33  from qpid.selector import Selector 
  34  from qpid.util import URL, default 
  35  from qpid.validator import And, Context, List, Map, Types, Values 
  36  from threading import Condition, Thread 
  37   
  38  log = getLogger("qpid.messaging") 
  39  rawlog = getLogger("qpid.messaging.io.raw") 
  40  opslog = getLogger("qpid.messaging.io.ops") 
  41   
  42  def addr2reply_to(addr): 
  43    name, subject, options = address.parse(addr) 
  44    if options: 
  45      type = options.get("node", {}).get("type") 
  46    else: 
  47      type = None 
  48   
  49    if type == "topic": 
  50      return ReplyTo(name, subject) 
  51    else: 
  52      return ReplyTo(None, name) 
  53   
  54  def reply_to2addr(reply_to): 
  55    if reply_to.exchange in (None, ""): 
  56      return reply_to.routing_key 
  57    elif reply_to.routing_key is None: 
  58      return "%s; {node: {type: topic}}" % reply_to.exchange 
  59    else: 
  60      return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key) 
  61   
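      # Illustrative round trip through the two helpers above (a sketch; the
      # example address assumes the grammar accepted by qpid.messaging.address.parse):
      #
      #   rt = addr2reply_to("amq.topic/weather; {node: {type: topic}}")
      #   # rt.exchange == "amq.topic", rt.routing_key == "weather"
      #   reply_to2addr(rt)   # -> "amq.topic/weather; {node: {type: topic}}"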
  62  class Attachment: 
  63   
  64    def __init__(self, target): 
  65      self.target = target 
  66   
  67  # XXX 
  68   
  69  DURABLE_DEFAULT=False 
  70   
  71  # XXX 
  72   
  73  class Pattern: 
  74    """ 
  75    The pattern filter matches the supplied wildcard pattern against a 
  76    message subject. 
  77    """ 
  78   
  79    def __init__(self, value): 
  80      self.value = value 
  81   
  82    # XXX: this should become part of the driver 
  83    def _bind(self, sst, exchange, queue): 
  84      from qpid.ops import ExchangeBind 
  85   
  86      sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, 
  87                                 binding_key=self.value.replace("*", "#"))) 
  88   
  89  SUBJECT_DEFAULTS = { 
  90    "topic": "#" 
  91    } 
  92   
  93  # XXX 
  94  ppid = 0 
  95  try: 
  96    ppid = os.getppid() 
  97  except: 
  98    pass 
  99   
 100  CLIENT_PROPERTIES = {"product": "qpid python client", 
 101                       "version": "development", 
 102                       "platform": os.name, 
 103                       "qpid.client_process": os.path.basename(sys.argv[0]), 
 104                       "qpid.client_pid": os.getpid(), 
 105                       "qpid.client_ppid": ppid} 
 106   
 107  def noop(): pass 
 108  def sync_noop(): pass 
 109   
 110  class SessionState: 
 111   
 112    def __init__(self, driver, session, name, channel): 
 113      self.driver = driver 
 114      self.session = session 
 115      self.name = name 
 116      self.channel = channel 
 117      self.detached = False 
 118      self.committing = False 
 119      self.aborting = False 
 120   
 121      # sender state 
 122      self.sent = Serial(0) 
 123      self.acknowledged = RangedSet() 
 124      self.actions = {} 
 125      self.min_completion = self.sent 
 126      self.max_completion = self.sent 
 127      self.results = {} 
 128      self.need_sync = False 
 129   
 130      # receiver state 
 131      self.received = None 
 132      self.executed = RangedSet() 
 133   
 134      # XXX: need to periodically exchange completion/known_completion 
 135   
 136      self.destinations = {} 
 137   
 138    def write_query(self, query, handler): 
 139      id = self.sent 
 140      self.write_cmd(query, lambda: handler(self.results.pop(id))) 
 141   
 142    def apply_overrides(self, cmd, overrides): 
 143      for k, v in overrides.items(): 
 144        cmd[k.replace('-', '_')] = v 
 145   
 146    def write_cmd(self, cmd, action=noop, overrides=None, sync=True): 
 147      if overrides: 
 148        self.apply_overrides(cmd, overrides) 
 149   
 150      if action != noop: 
 151        cmd.sync = sync 
 152      if self.detached: 
 153        raise Exception("detached") 
 154      cmd.id = self.sent 
 155      self.sent += 1 
 156      self.actions[cmd.id] = action 
 157      self.max_completion = cmd.id 
 158      self.write_op(cmd) 
 159      self.need_sync = not cmd.sync 
 160   
 161    def write_cmds(self, cmds, action=noop): 
 162      if cmds: 
 163        for cmd in cmds[:-1]: 
 164          self.write_cmd(cmd) 
 165        self.write_cmd(cmds[-1], action) 
 166      else: 
 167        action() 
 168   
 169    def write_op(self, op): 
 170      op.channel = self.channel 
 171      self.driver.write_op(op) 
 172   
 173  POLICIES = Values("always", "sender", "receiver", "never") 
 174  RELIABILITY = Values("unreliable", "at-most-once", "at-least-once", 
 175                       "exactly-once") 
 176   
 177  DECLARE = Map({}, restricted=False) 
 178  BINDINGS = List(Map({ 
 179      "exchange": Types(basestring), 
 180      "queue": Types(basestring), 
 181      "key": Types(basestring), 
 182      "arguments": Map({}, restricted=False) 
 183      })) 
 184   
 185  COMMON_OPTS = { 
 186      "create": POLICIES, 
 187      "delete": POLICIES, 
 188      "assert": POLICIES, 
 189      "node": Map({ 
 190          "type": Values("queue", "topic"), 
 191          "durable": Types(bool), 
 192          "x-declare": DECLARE, 
 193          "x-bindings": BINDINGS 
 194          }), 
 195      "link": Map({ 
 196          "name": Types(basestring), 
 197          "durable": Types(bool), 
 198          "reliability": RELIABILITY, 
 199          "x-declare": DECLARE, 
 200          "x-bindings": BINDINGS, 
 201          "x-subscribe": Map({}, restricted=False) 
 202          }) 
 203      } 
 204   
 205  RECEIVE_MODES = Values("browse", "consume") 
 206   
 207  SOURCE_OPTS = COMMON_OPTS.copy() 
 208  SOURCE_OPTS.update({ 
 209      "mode": RECEIVE_MODES 
 210      }) 
 211   
 212  TARGET_OPTS = COMMON_OPTS.copy() 
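      # Illustrative address whose options conform to the validator maps above
      # (queue and link names are examples only):
      #
      #   "my-queue; {create: always, assert: sender, mode: consume,
      #               node: {type: queue, durable: True},
      #               link: {name: my-link, reliability: at-least-once}}"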
 213   
 214  class LinkIn: 
 215   
 216    ADDR_NAME = "source" 
 217    DIR_NAME = "receiver" 
 218    VALIDATOR = Map(SOURCE_OPTS) 
 219    # ... (source lines 219-275: method definitions collapsed in this listing) 
 276   
 277  class LinkOut: 
 278   
 279    ADDR_NAME = "target" 
 280    DIR_NAME = "sender" 
 281    VALIDATOR = Map(TARGET_OPTS) 
 282    # ... (source lines 282-305: method definitions collapsed in this listing) 
 306   
 307  class Cache: 
 308   
 309    def __init__(self, ttl): 
 310      self.ttl = ttl 
 311      self.entries = {} 
 312   
 313    def __setitem__(self, key, value): 
 314      self.entries[key] = time.time(), value 
 315   
 316    def __getitem__(self, key): 
 317      tstamp, value = self.entries[key] 
 318      if time.time() - tstamp >= self.ttl: 
 319        del self.entries[key] 
 320        raise KeyError(key) 
 321      else: 
 322        return value 
 323   
 324    def __delitem__(self, key): 
 325      del self.entries[key] 
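      # Usage sketch: Cache acts like a dict whose entries expire ttl seconds
      # after they are stored (expired keys raise KeyError on lookup):
      #
      #   cache = Cache(ttl=60)
      #   cache["amq.topic"] = ("topic", None)
      #   kind, subtype = cache["amq.topic"]   # KeyError once 60s have passed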
 326   
 327  # XXX 
 328  HEADER="!4s4B" 
 329   
 330  EMPTY_DP = DeliveryProperties() 
 331  EMPTY_MP = MessageProperties() 
 332   
 333  SUBJECT = "qpid.subject" 
 334   
 335  CLOSED = "CLOSED" 
 336  READ_ONLY = "READ_ONLY" 
 337  WRITE_ONLY = "WRITE_ONLY" 
 338  OPEN = "OPEN" 
 339   
 340  class Driver: 
 341   
 342    def __init__(self, connection): 
 343      self.connection = connection 
 344      self.log_id = "%x" % id(self.connection) 
 345      self._lock = self.connection._lock 
 346   
 347      self._selector = Selector.default() 
 348      self._attempts = 0 
 349      self._delay = self.connection.reconnect_interval_min 
 350      self._reconnect_log = self.connection.reconnect_log 
 351      self._host = 0 
 352      self._retrying = False 
 353      self._next_retry = None 
 354      self._transport = None 
 355   
 356      self._timeout = None 
 357   
 358      self.engine = None 
 359   
 360    def _next_host(self): 
 361      urls = [URL(u) for u in self.connection.reconnect_urls] 
 362      hosts = [(self.connection.host, default(self.connection.port, 5672))] + \ 
 363          [(u.host, default(u.port, 5672)) for u in urls] 
 364      if self._host >= len(hosts): 
 365        self._host = 0 
 366      result = hosts[self._host] 
 367      if self._host == 0: 
 368        self._attempts += 1 
 369      self._host = self._host + 1 
 370      return result 
 371   
 372    def _num_hosts(self): 
 373      return len(self.connection.reconnect_urls) + 1 
 374   
 375    @synchronized 
 376    def wakeup(self): 
 377      self.dispatch() 
 378      self._selector.wakeup() 
 379   
 380    def start(self): 
 381      self._selector.register(self) 
 382   
 383    def stop(self): 
 384      self._selector.unregister(self) 
 385      if self._transport: 
 386        self.st_closed() 
 387   
 388    def fileno(self): 
 389      return self._transport.fileno() 
 390   
 391    @synchronized 
 392    def reading(self): 
 393      return self._transport is not None and \ 
 394          self._transport.reading(True) 
 395   
 396    @synchronized 
 397    def writing(self): 
 398      return self._transport is not None and \ 
 399          self._transport.writing(self.engine.pending()) 
 400   
 401    @synchronized 
 402    def timing(self): 
 403      return self._timeout 
 404   
 405    @synchronized 
 406    def readable(self): 
 407      try: 
 408        data = self._transport.recv(64*1024) 
 409        if data is None: 
 410          return 
 411        elif data: 
 412          rawlog.debug("READ[%s]: %r", self.log_id, data) 
 413          self.engine.write(data) 
 414        else: 
 415          self.close_engine() 
 416      except socket.error, e: 
 417        self.close_engine(ConnectionError(text=str(e))) 
 418   
 419      self.update_status() 
 420   
 421      self._notify() 
 422   
 423    def _notify(self): 
 424      if self.connection.error: 
 425        self.connection._condition.gc() 
 426      self.connection._waiter.notifyAll() 
 427   
 428    def close_engine(self, e=None): 
 429      if e is None: 
 430        e = ConnectionError(text="connection aborted") 
 431   
 432      if (self.connection.reconnect and 
 433          (self.connection.reconnect_limit is None or 
 434           self.connection.reconnect_limit <= 0 or 
 435           self._attempts <= self.connection.reconnect_limit)): 
 436        if self._host < self._num_hosts(): 
 437          delay = 0 
 438        else: 
 439          delay = self._delay 
 440          self._delay = min(2*self._delay, 
 441                            self.connection.reconnect_interval_max) 
 442        self._next_retry = time.time() + delay 
 443        if self._reconnect_log: 
 444          log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e)) 
 445          if delay > 0: 
 446            log.warn("sleeping %s seconds" % delay) 
 447        self._retrying = True 
 448        self.engine.close() 
 449      else: 
 450        self.engine.close(e) 
 451   
 452      self.schedule() 
 453   
 454    def update_status(self): 
 455      status = self.engine.status() 
 456      return getattr(self, "st_%s" % status.lower())() 
 457   
 458    def st_closed(self): 
 459      # XXX: this log statement seems to sometimes hit when the socket is not connected 
 460      # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) 
 461      self._transport.close() 
 462      self._transport = None 
 463      self.engine = None 
 464      return True 
 465   
 466    def st_open(self): 
 467      return False 
 468   
 469    @synchronized 
 470    def writeable(self): 
 471      notify = False 
 472      try: 
 473        n = self._transport.send(self.engine.peek()) 
 474        if n == 0: return 
 475        sent = self.engine.read(n) 
 476        rawlog.debug("SENT[%s]: %r", self.log_id, sent) 
 477      except socket.error, e: 
 478        self.close_engine(e) 
 479        notify = True 
 480   
 481      if self.update_status() or notify: 
 482        self._notify() 
 483   
 484    @synchronized 
 485    def timeout(self): 
 486      self.dispatch() 
 487      self._notify() 
 488      self.schedule() 
 489   
 490    def schedule(self): 
 491      times = [] 
 492      if self.connection.heartbeat: 
 493        times.append(time.time() + self.connection.heartbeat) 
 494      if self._next_retry: 
 495        times.append(self._next_retry) 
 496      if times: 
 497        self._timeout = min(times) 
 498      else: 
 499        self._timeout = None 
 500   
 501    def dispatch(self): 
 502      try: 
 503        if self._transport is None: 
 504          if self.connection._connected and not self.connection.error: 
 505            self.connect() 
 506        else: 
 507          self.engine.dispatch() 
 508      except HeartbeatTimeout, e: 
 509        self.close_engine(e) 
 510      except: 
 511        # XXX: Does socket get leaked if this occurs? 
 512        msg = compat.format_exc() 
 513        self.connection.error = InternalError(text=msg) 
 514   
 515    def connect(self): 
 516      if self._retrying and time.time() < self._next_retry: 
 517        return 
 518   
 519      try: 
 520        # XXX: should make this non blocking 
 521        host, port = self._next_host() 
 522        if self._retrying and self._reconnect_log: 
 523          log.warn("trying: %s:%s", host, port) 
 524        self.engine = Engine(self.connection) 
 525        self.engine.open() 
 526        rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) 
 527        trans = transports.TRANSPORTS.get(self.connection.transport) 
 528        if trans: 
 529          self._transport = trans(self.connection, host, port) 
 530        else: 
 531          raise ConnectError("no such transport: %s" % self.connection.transport) 
 532        if self._retrying and self._reconnect_log: 
 533          log.warn("reconnect succeeded: %s:%s", host, port) 
 534        self._next_retry = None 
 535        self._attempts = 0 
 536        self._delay = self.connection.reconnect_interval_min 
 537        self._retrying = False 
 538        self.schedule() 
 539      except socket.error, e: 
 540        self.close_engine(ConnectError(text=str(e))) 
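      # Reconnect/failover sketch: the attributes consulted above are assumed to
      # map onto qpid.messaging.Connection options of the same names, e.g.:
      #
      #   conn = Connection("broker-a.example.com:5672", reconnect=True,
      #                     reconnect_urls=["broker-b.example.com:5672"],
      #                     reconnect_interval_min=1, reconnect_interval_max=30,
      #                     reconnect_limit=10, heartbeat=60)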
 541   
 542  DEFAULT_DISPOSITION = Disposition(None) 
 543   
 544  def get_bindings(opts, queue=None, exchange=None, key=None): 
 545    bindings = opts.get("x-bindings", []) 
 546    cmds = [] 
 547    for b in bindings: 
 548      exchange = b.get("exchange", exchange) 
 549      queue = b.get("queue", queue) 
 550      key = b.get("key", key) 
 551      args = b.get("arguments", {}) 
 552      cmds.append(ExchangeBind(queue, exchange, key, args)) 
 553    return cmds 
 554   
 555  CONNECTION_ERRS = { 
 556    # anything not here (i.e. everything right now) will default to 
 557    # connection error 
 558    } 
 559   
 560  SESSION_ERRS = { 
 561    # anything not here will default to session error 
 562    error_code.unauthorized_access: UnauthorizedAccess, 
 563    error_code.not_found: NotFound, 
 564    error_code.resource_locked: ReceiverError, 
 565    error_code.resource_limit_exceeded: TargetCapacityExceeded, 
 566    error_code.internal_error: ServerError 
 567    } 
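      # Illustrative "x-bindings" value that get_bindings() converts into
      # ExchangeBind commands (exchange/queue/key are examples only):
      #
      #   {"x-bindings": [{"exchange": "amq.topic", "queue": "my-queue",
      #                    "key": "weather.#", "arguments": {}}]}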
568 569 -class Engine:
570
571 - def __init__(self, connection):
572 self.connection = connection 573 self.log_id = "%x" % id(self.connection) 574 self._closing = False 575 self._connected = False 576 self._attachments = {} 577 578 self._in = LinkIn() 579 self._out = LinkOut() 580 581 self._channel_max = 65536 582 self._channels = 0 583 self._sessions = {} 584 585 self.address_cache = Cache(self.connection.address_ttl) 586 587 self._status = CLOSED 588 self._buf = "" 589 self._hdr = "" 590 self._last_in = None 591 self._last_out = None 592 self._op_enc = OpEncoder() 593 self._seg_enc = SegmentEncoder() 594 self._frame_enc = FrameEncoder() 595 self._frame_dec = FrameDecoder() 596 self._seg_dec = SegmentDecoder() 597 self._op_dec = OpDecoder() 598 599 self._sasl = sasl.Client() 600 if self.connection.username: 601 self._sasl.setAttr("username", self.connection.username) 602 if self.connection.password: 603 self._sasl.setAttr("password", self.connection.password) 604 if self.connection.host: 605 self._sasl.setAttr("host", self.connection.host) 606 self._sasl.setAttr("service", self.connection.sasl_service) 607 if self.connection.sasl_min_ssf is not None: 608 self._sasl.setAttr("minssf", self.connection.sasl_min_ssf) 609 if self.connection.sasl_max_ssf is not None: 610 self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf) 611 self._sasl.init() 612 self._sasl_encode = False 613 self._sasl_decode = False
614
615 - def _reset(self):
616 self.connection._transport_connected = False 617 618 for ssn in self.connection.sessions.values(): 619 for m in ssn.acked + ssn.unacked + ssn.incoming: 620 m._transfer_id = None 621 for snd in ssn.senders: 622 snd.linked = False 623 for rcv in ssn.receivers: 624 rcv.impending = rcv.received 625 rcv.linked = False
626
627 - def status(self):
628 return self._status
629
630 - def write(self, data):
631 self._last_in = time.time() 632 try: 633 if self._sasl_decode: 634 data = self._sasl.decode(data) 635 636 if len(self._hdr) < 8: 637 r = 8 - len(self._hdr) 638 self._hdr += data[:r] 639 data = data[r:] 640 641 if len(self._hdr) == 8: 642 self.do_header(self._hdr) 643 644 self._frame_dec.write(data) 645 self._seg_dec.write(*self._frame_dec.read()) 646 self._op_dec.write(*self._seg_dec.read()) 647 for op in self._op_dec.read(): 648 self.assign_id(op) 649 opslog.debug("RCVD[%s]: %r", self.log_id, op) 650 op.dispatch(self) 651 self.dispatch() 652 except MessagingError, e: 653 self.close(e) 654 except: 655 self.close(InternalError(text=compat.format_exc()))
656
657 - def close(self, e=None):
658 self._reset() 659 if e: 660 self.connection.error = e 661 self._status = CLOSED
662
663 - def assign_id(self, op):
664 if isinstance(op, Command): 665 sst = self.get_sst(op) 666 op.id = sst.received 667 sst.received += 1
668
669 - def pending(self):
670 return len(self._buf)
671
672 - def read(self, n):
673 result = self._buf[:n] 674 self._buf = self._buf[n:] 675 return result
676
677 - def peek(self):
678 return self._buf
679
680 - def write_op(self, op):
681 opslog.debug("SENT[%s]: %r", self.log_id, op) 682 self._op_enc.write(op) 683 self._seg_enc.write(*self._op_enc.read()) 684 self._frame_enc.write(*self._seg_enc.read()) 685 bytes = self._frame_enc.read() 686 if self._sasl_encode: 687 bytes = self._sasl.encode(bytes) 688 self._buf += bytes 689 self._last_out = time.time()
690
691 - def do_header(self, hdr):
692 cli_major = 0; cli_minor = 10 693 magic, _, _, major, minor = struct.unpack(HEADER, hdr) 694 if major != cli_major or minor != cli_minor: 695 raise VersionError(text="client: %s-%s, server: %s-%s" % 696 (cli_major, cli_minor, major, minor))
697
698 - def do_connection_start(self, start):
699 if self.connection.sasl_mechanisms: 700 permitted = self.connection.sasl_mechanisms.split() 701 mechs = [m for m in start.mechanisms if m in permitted] 702 else: 703 mechs = start.mechanisms 704 try: 705 mech, initial = self._sasl.start(" ".join(mechs)) 706 except sasl.SASLError, e: 707 raise AuthenticationFailure(text=str(e)) 708 709 client_properties = CLIENT_PROPERTIES.copy() 710 client_properties.update(self.connection.client_properties) 711 self.write_op(ConnectionStartOk(client_properties=client_properties, 712 mechanism=mech, response=initial))
713
714 - def do_connection_secure(self, secure):
715 resp = self._sasl.step(secure.challenge) 716 self.write_op(ConnectionSecureOk(response=resp))
717
718 - def do_connection_tune(self, tune):
719 # XXX: is heartbeat protocol specific? 720 if tune.channel_max is not None: 721 self.channel_max = tune.channel_max 722 self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, 723 channel_max=self.channel_max)) 724 self.write_op(ConnectionOpen()) 725 self._sasl_encode = True
726
727 - def do_connection_open_ok(self, open_ok):
728 self.connection.auth_username = self._sasl.auth_username() 729 self._connected = True 730 self._sasl_decode = True 731 self.connection._transport_connected = True
732
733 - def do_connection_heartbeat(self, hrt):
734 pass
735
736 - def do_connection_close(self, close):
737 self.write_op(ConnectionCloseOk()) 738 if close.reply_code != close_code.normal: 739 exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError) 740 self.connection.error = exc(close.reply_code, close.reply_text)
741 # XXX: should we do a half shutdown on the socket here? 742 # XXX: we really need to test this, we may end up reporting a 743 # connection abort after this, if we were to do a shutdown on read 744 # and stop reading, then we wouldn't report the abort, that's 745 # probably the right thing to do 746
747 - def do_connection_close_ok(self, close_ok):
748 self.close()
749
750 - def do_session_attached(self, atc):
751 pass
752
753 - def do_session_command_point(self, cp):
754 sst = self.get_sst(cp) 755 sst.received = cp.command_id
756
757 - def do_session_completed(self, sc):
758 sst = self.get_sst(sc) 759 for r in sc.commands: 760 sst.acknowledged.add(r.lower, r.upper) 761 762 if not sc.commands.empty(): 763 while sst.min_completion in sc.commands: 764 if sst.actions.has_key(sst.min_completion): 765 sst.actions.pop(sst.min_completion)() 766 sst.min_completion += 1
767
 768    def do_session_known_completed(self, kcmp): 
 769      sst = self.get_sst(kcmp) 
 770      executed = RangedSet() 
 771      for e in sst.executed.ranges: 
 772        for ke in kcmp.ranges: 
 773          if e.lower in ke and e.upper in ke: 
 774            break 
 775        else: 
 776          executed.add_range(e) 
 777      sst.executed = executed 
778
779 - def do_session_flush(self, sf):
780 sst = self.get_sst(sf) 781 if sf.expected: 782 if sst.received is None: 783 exp = None 784 else: 785 exp = RangedSet(sst.received) 786 sst.write_op(SessionExpected(exp)) 787 if sf.confirmed: 788 sst.write_op(SessionConfirmed(sst.executed)) 789 if sf.completed: 790 sst.write_op(SessionCompleted(sst.executed))
791
792 - def do_session_request_timeout(self, rt):
793 sst = self.get_sst(rt) 794 sst.write_op(SessionTimeout(timeout=0))
795
796 - def do_execution_result(self, er):
797 sst = self.get_sst(er) 798 sst.results[er.command_id] = er.value 799 sst.executed.add(er.id)
800
801 - def do_execution_exception(self, ex):
802 sst = self.get_sst(ex) 803 exc = SESSION_ERRS.get(ex.error_code, SessionError) 804 sst.session.error = exc(ex.error_code, ex.description)
805
806 - def dispatch(self):
807 if not self.connection._connected and not self._closing and self._status != CLOSED: 808 self.disconnect() 809 810 if self._connected and not self._closing: 811 for ssn in self.connection.sessions.values(): 812 self.attach(ssn) 813 self.process(ssn) 814 815 if self.connection.heartbeat and self._status != CLOSED: 816 now = time.time() 817 if self._last_in is not None and \ 818 now - self._last_in > 2*self.connection.heartbeat: 819 raise HeartbeatTimeout(text="heartbeat timeout") 820 if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0: 821 self.write_op(ConnectionHeartbeat())
822
823 - def open(self):
824 self._reset() 825 self._status = OPEN 826 self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
827
828 - def disconnect(self):
829 self.write_op(ConnectionClose(close_code.normal)) 830 self._closing = True
831
832 - def attach(self, ssn):
833 if ssn.closed: return 834 sst = self._attachments.get(ssn) 835 if sst is None: 836 for i in xrange(0, self.channel_max): 837 if not self._sessions.has_key(i): 838 ch = i 839 break 840 else: 841 raise RuntimeError("all channels used") 842 sst = SessionState(self, ssn, ssn.name, ch) 843 sst.write_op(SessionAttach(name=ssn.name)) 844 sst.write_op(SessionCommandPoint(sst.sent, 0)) 845 sst.outgoing_idx = 0 846 sst.acked = [] 847 sst.acked_idx = 0 848 if ssn.transactional: 849 sst.write_cmd(TxSelect()) 850 self._attachments[ssn] = sst 851 self._sessions[sst.channel] = sst 852 853 for snd in ssn.senders: 854 self.link(snd, self._out, snd.target) 855 for rcv in ssn.receivers: 856 self.link(rcv, self._in, rcv.source) 857 858 if sst is not None and ssn.closing and not sst.detached: 859 sst.detached = True 860 sst.write_op(SessionDetach(name=ssn.name))
861
862 - def get_sst(self, op):
863 return self._sessions[op.channel]
864
865 - def do_session_detached(self, dtc):
866 sst = self._sessions.pop(dtc.channel) 867 ssn = sst.session 868 del self._attachments[ssn] 869 ssn.closed = True
870
871 - def do_session_detach(self, dtc):
872 sst = self.get_sst(dtc) 873 sst.write_op(SessionDetached(name=dtc.name)) 874 self.do_session_detached(dtc)
875 893 894 def resolved(type, subtype): 895 dir.do_link(sst, lnk, _lnk, type, subtype, linked)
896 897 self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) 898 self._attachments[lnk] = _lnk 899 900 if lnk.linked and lnk.closing and not lnk.closed: 901 if not _lnk.closing: 902 def unlinked(): 903 dir.del_link(sst, lnk, _lnk) 904 del self._attachments[lnk] 905 lnk.closed = True 906 if _lnk.options.get("delete") in ("always", dir.DIR_NAME): 907 dir.do_unlink(sst, lnk, _lnk) 908 self.delete(sst, _lnk.name, unlinked) 909 else: 910 dir.do_unlink(sst, lnk, _lnk, unlinked) 911 _lnk.closing = True 912 elif not lnk.linked and lnk.closing and not lnk.closed: 913 if lnk.error: lnk.closed = True 914
915 - def parse_address(self, lnk, dir, addr):
916 if addr is None: 917 return MalformedAddress(text="%s is None" % dir.ADDR_NAME) 918 else: 919 try: 920 lnk.name, lnk.subject, lnk.options = address.parse(addr) 921 # XXX: subject 922 if lnk.options is None: 923 lnk.options = {} 924 except address.LexError, e: 925 return MalformedAddress(text=str(e)) 926 except address.ParseError, e: 927 return MalformedAddress(text=str(e))
928
929 - def validate_options(self, lnk, dir):
930 ctx = Context() 931 err = dir.VALIDATOR.validate(lnk.options, ctx) 932 if err: return InvalidOption(text="error in options: %s" % err)
933
934 - def resolve_declare(self, sst, lnk, dir, action):
935 declare = lnk.options.get("create") in ("always", dir) 936 assrt = lnk.options.get("assert") in ("always", dir) 937 def do_resolved(type, subtype): 938 err = None 939 if type is None: 940 if declare: 941 err = self.declare(sst, lnk, action) 942 else: 943 err = NotFound(text="no such queue: %s" % lnk.name) 944 else: 945 if assrt: 946 expected = lnk.options.get("node", {}).get("type") 947 if expected and type != expected: 948 err = AssertionFailed(text="expected %s, got %s" % (expected, type)) 949 if err is None: 950 action(type, subtype) 951 952 if err: 953 tgt = lnk.target 954 tgt.error = err 955 del self._attachments[tgt] 956 tgt.closed = True 957 return
958 self.resolve(sst, lnk.name, do_resolved, force=declare) 959
960 - def resolve(self, sst, name, action, force=False):
961 if not force: 962 try: 963 type, subtype = self.address_cache[name] 964 action(type, subtype) 965 return 966 except KeyError: 967 pass 968 969 args = [] 970 def do_result(r): 971 args.append(r)
972 def do_action(r): 973 do_result(r) 974 er, qr = args 975 if er.not_found and not qr.queue: 976 type, subtype = None, None 977 elif qr.queue: 978 type, subtype = "queue", None 979 else: 980 type, subtype = "topic", er.type 981 if type is not None: 982 self.address_cache[name] = (type, subtype) 983 action(type, subtype) 984 sst.write_query(ExchangeQuery(name), do_result) 985 sst.write_query(QueueQuery(name), do_action) 986
987 - def declare(self, sst, lnk, action):
988 name = lnk.name 989 props = lnk.options.get("node", {}) 990 durable = props.get("durable", DURABLE_DEFAULT) 991 type = props.get("type", "queue") 992 declare = props.get("x-declare", {}) 993 994 if type == "topic": 995 cmd = ExchangeDeclare(exchange=name, durable=durable) 996 bindings = get_bindings(props, exchange=name) 997 elif type == "queue": 998 cmd = QueueDeclare(queue=name, durable=durable) 999 bindings = get_bindings(props, queue=name) 1000 else: 1001 raise ValueError(type) 1002 1003 sst.apply_overrides(cmd, declare) 1004 1005 if type == "topic": 1006 if cmd.type is None: 1007 cmd.type = "topic" 1008 subtype = cmd.type 1009 else: 1010 subtype = None 1011 1012 cmds = [cmd] 1013 cmds.extend(bindings) 1014 1015 def declared(): 1016 self.address_cache[name] = (type, subtype) 1017 action(type, subtype)
1018 1019 sst.write_cmds(cmds, declared) 1020
1021 - def delete(self, sst, name, action):
1022 def deleted(): 1023 del self.address_cache[name] 1024 action()
1025 1026 def do_delete(type, subtype): 1027 if type == "topic": 1028 sst.write_cmd(ExchangeDelete(name), deleted) 1029 elif type == "queue": 1030 sst.write_cmd(QueueDelete(name), deleted) 1031 elif type is None: 1032 action() 1033 else: 1034 raise ValueError(type) 1035 self.resolve(sst, name, do_delete, force=True) 1036
1037 - def process(self, ssn):
1038 if ssn.closed or ssn.closing: return 1039 1040 sst = self._attachments[ssn] 1041 1042 while sst.outgoing_idx < len(ssn.outgoing): 1043 msg = ssn.outgoing[sst.outgoing_idx] 1044 snd = msg._sender 1045 # XXX: should check for sender error here 1046 _snd = self._attachments.get(snd) 1047 if _snd and snd.linked: 1048 self.send(snd, msg) 1049 sst.outgoing_idx += 1 1050 else: 1051 break 1052 1053 for snd in ssn.senders: 1054 # XXX: should included snd.acked in this 1055 if snd.synced >= snd.queued and sst.need_sync: 1056 sst.write_cmd(ExecutionSync(), sync_noop) 1057 1058 for rcv in ssn.receivers: 1059 self.process_receiver(rcv) 1060 1061 if ssn.acked: 1062 messages = ssn.acked[sst.acked_idx:] 1063 if messages: 1064 ids = RangedSet() 1065 1066 disposed = [(DEFAULT_DISPOSITION, [])] 1067 acked = [] 1068 for m in messages: 1069 # XXX: we're ignoring acks that get lost when disconnected, 1070 # could we deal this via some message-id based purge? 1071 if m._transfer_id is None: 1072 acked.append(m) 1073 continue 1074 ids.add(m._transfer_id) 1075 if m._receiver._accept_mode is accept_mode.explicit: 1076 disp = m._disposition or DEFAULT_DISPOSITION 1077 last, msgs = disposed[-1] 1078 if disp.type is last.type and disp.options == last.options: 1079 msgs.append(m) 1080 else: 1081 disposed.append((disp, [m])) 1082 else: 1083 acked.append(m) 1084 1085 for range in ids: 1086 sst.executed.add_range(range) 1087 sst.write_op(SessionCompleted(sst.executed)) 1088 1089 def ack_acker(msgs): 1090 def ack_ack(): 1091 for m in msgs: 1092 ssn.acked.remove(m) 1093 sst.acked_idx -= 1 1094 # XXX: should this check accept_mode too? 1095 if not ssn.transactional: 1096 sst.acked.remove(m)
1097 return ack_ack 1098 1099 for disp, msgs in disposed: 1100 if not msgs: continue 1101 if disp.type is None: 1102 op = MessageAccept 1103 elif disp.type is RELEASED: 1104 op = MessageRelease 1105 elif disp.type is REJECTED: 1106 op = MessageReject 1107 sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]), 1108 **disp.options), 1109 ack_acker(msgs)) 1110 if log.isEnabledFor(DEBUG): 1111 for m in msgs: 1112 log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) 1113 1114 sst.acked.extend(messages) 1115 sst.acked_idx += len(messages) 1116 ack_acker(acked)() 1117 1118 if ssn.committing and not sst.committing: 1119 def commit_ok(): 1120 del sst.acked[:] 1121 ssn.committing = False 1122 ssn.committed = True 1123 ssn.aborting = False 1124 ssn.aborted = False 1125 sst.committing = False 1126 sst.write_cmd(TxCommit(), commit_ok) 1127 sst.committing = True 1128 1129 if ssn.aborting and not sst.aborting: 1130 sst.aborting = True 1131 def do_rb(): 1132 messages = sst.acked + ssn.unacked + ssn.incoming 1133 ids = RangedSet(*[m._transfer_id for m in messages]) 1134 for range in ids: 1135 sst.executed.add_range(range) 1136 sst.write_op(SessionCompleted(sst.executed)) 1137 sst.write_cmd(MessageRelease(ids, True)) 1138 sst.write_cmd(TxRollback(), do_rb_ok) 1139 1140 def do_rb_ok(): 1141 del ssn.incoming[:] 1142 del ssn.unacked[:] 1143 del sst.acked[:] 1144 1145 for rcv in ssn.receivers: 1146 rcv.impending = rcv.received 1147 rcv.returned = rcv.received 1148 # XXX: do we need to update granted here as well? 1149 1150 for rcv in ssn.receivers: 1151 self.process_receiver(rcv) 1152 1153 ssn.aborting = False 1154 ssn.aborted = True 1155 ssn.committing = False 1156 ssn.committed = False 1157 sst.aborting = False 1158 1159 for rcv in ssn.receivers: 1160 _rcv = self._attachments[rcv] 1161 sst.write_cmd(MessageStop(_rcv.destination)) 1162 sst.write_cmd(ExecutionSync(), do_rb) 1163
1164 - def grant(self, rcv):
1165 sst = self._attachments[rcv.session] 1166 _rcv = self._attachments.get(rcv) 1167 if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining: 1168 return 1169 1170 if rcv.granted is UNLIMITED: 1171 if rcv.impending is UNLIMITED: 1172 delta = 0 1173 else: 1174 delta = UNLIMITED 1175 elif rcv.impending is UNLIMITED: 1176 delta = -1 1177 else: 1178 delta = max(rcv.granted, rcv.received) - rcv.impending 1179 1180 if delta is UNLIMITED: 1181 if not _rcv.bytes_open: 1182 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) 1183 _rcv.bytes_open = True 1184 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) 1185 rcv.impending = UNLIMITED 1186 elif delta > 0: 1187 if not _rcv.bytes_open: 1188 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) 1189 _rcv.bytes_open = True 1190 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) 1191 rcv.impending += delta 1192 elif delta < 0 and not rcv.draining: 1193 _rcv.draining = True 1194 def do_stop(): 1195 rcv.impending = rcv.received 1196 _rcv.draining = False 1197 _rcv.bytes_open = False 1198 self.grant(rcv)
1199 sst.write_cmd(MessageStop(_rcv.destination), do_stop) 1200 1201 if rcv.draining: 1202 _rcv.draining = True 1203 def do_flush(): 1204 rcv.impending = rcv.received 1205 rcv.granted = rcv.impending 1206 _rcv.draining = False 1207 _rcv.bytes_open = False 1208 rcv.draining = False 1209 sst.write_cmd(MessageFlush(_rcv.destination), do_flush) 1210 1211
1212 - def process_receiver(self, rcv):
1213 if rcv.closed: return 1214 self.grant(rcv)
1215
1216 - def send(self, snd, msg):
1217 sst = self._attachments[snd.session] 1218 _snd = self._attachments[snd] 1219 1220 if msg.subject is None or _snd._exchange == "": 1221 rk = _snd._routing_key 1222 else: 1223 rk = msg.subject 1224 1225 if msg.subject is None: 1226 subject = _snd.subject 1227 else: 1228 subject = msg.subject 1229 1230 # XXX: do we need to query to figure out how to create the reply-to interoperably? 1231 if msg.reply_to: 1232 rt = addr2reply_to(msg.reply_to) 1233 else: 1234 rt = None 1235 content_encoding = msg.properties.get("x-amqp-0-10.content-encoding") 1236 dp = DeliveryProperties(routing_key=rk) 1237 mp = MessageProperties(message_id=msg.id, 1238 user_id=msg.user_id, 1239 reply_to=rt, 1240 correlation_id=msg.correlation_id, 1241 app_id = msg.properties.get("x-amqp-0-10.app-id"), 1242 content_type=msg.content_type, 1243 content_encoding=content_encoding, 1244 application_headers=msg.properties) 1245 if subject is not None: 1246 if mp.application_headers is None: 1247 mp.application_headers = {} 1248 mp.application_headers[SUBJECT] = subject 1249 if msg.durable is not None: 1250 if msg.durable: 1251 dp.delivery_mode = delivery_mode.persistent 1252 else: 1253 dp.delivery_mode = delivery_mode.non_persistent 1254 if msg.priority is not None: 1255 dp.priority = msg.priority 1256 if msg.ttl is not None: 1257 dp.ttl = long(msg.ttl*1000) 1258 enc, dec = get_codec(msg.content_type) 1259 body = enc(msg.content) 1260 1261 # XXX: this is not safe for out of order, can this be triggered by pre_ack? 1262 def msg_acked(): 1263 # XXX: should we log the ack somehow too? 1264 snd.acked += 1 1265 m = snd.session.outgoing.pop(0) 1266 sst.outgoing_idx -= 1 1267 log.debug("RACK[%s]: %s", sst.session.log_id, msg) 1268 assert msg == m
1269 1270 xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp), 1271 payload=body) 1272 1273 if _snd.pre_ack: 1274 sst.write_cmd(xfr) 1275 else: 1276 sst.write_cmd(xfr, msg_acked, sync=msg._sync) 1277 1278 log.debug("SENT[%s]: %s", sst.session.log_id, msg) 1279 1280 if _snd.pre_ack: 1281 msg_acked() 1282
1283 - def do_message_transfer(self, xfr):
1284 sst = self.get_sst(xfr) 1285 ssn = sst.session 1286 1287 msg = self._decode(xfr) 1288 rcv = sst.destinations[xfr.destination].target 1289 msg._receiver = rcv 1290 if rcv.impending is not UNLIMITED: 1291 assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) 1292 rcv.received += 1 1293 log.debug("RCVD[%s]: %s", ssn.log_id, msg) 1294 ssn.incoming.append(msg)
1295
1296 - def _decode(self, xfr):
1297 dp = EMPTY_DP 1298 mp = EMPTY_MP 1299 1300 for h in xfr.headers: 1301 if isinstance(h, DeliveryProperties): 1302 dp = h 1303 elif isinstance(h, MessageProperties): 1304 mp = h 1305 1306 ap = mp.application_headers 1307 enc, dec = get_codec(mp.content_type) 1308 content = dec(xfr.payload) 1309 msg = Message(content) 1310 msg.id = mp.message_id 1311 if ap is not None: 1312 msg.subject = ap.get(SUBJECT) 1313 msg.user_id = mp.user_id 1314 if mp.reply_to is not None: 1315 msg.reply_to = reply_to2addr(mp.reply_to) 1316 msg.correlation_id = mp.correlation_id 1317 if dp.delivery_mode is not None: 1318 msg.durable = dp.delivery_mode == delivery_mode.persistent 1319 msg.priority = dp.priority 1320 if dp.ttl is not None: 1321 msg.ttl = dp.ttl/1000.0 1322 msg.redelivered = dp.redelivered 1323 msg.properties = mp.application_headers or {} 1324 if mp.app_id is not None: 1325 msg.properties["x-amqp-0-10.app-id"] = mp.app_id 1326 if mp.content_encoding is not None: 1327 msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding 1328 if dp.routing_key is not None: 1329 msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key 1330 if dp.timestamp is not None: 1331 msg.properties["x-amqp-0-10.timestamp"] = dp.timestamp 1332 msg.content_type = mp.content_type 1333 msg._transfer_id = xfr.id 1334 return msg
1335