
Source Code for Module qpid.messaging.driver

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
   20  import os, socket, struct, sys, time 
  21  from logging import getLogger, DEBUG 
  22  from qpid import compat 
  23  from qpid import sasl 
  24  from qpid.concurrency import synchronized 
  25  from qpid.datatypes import RangedSet, Serial 
  26  from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ 
  27      FrameDecoder, SegmentDecoder, OpDecoder 
  28  from qpid.messaging import address, transports 
  29  from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED 
  30  from qpid.messaging.exceptions import * 
  31  from qpid.messaging.message import get_codec, Disposition, Message 
  32  from qpid.ops import * 
  33  from qpid.selector import Selector 
  34  from qpid.util import URL, default 
  35  from qpid.validator import And, Context, List, Map, Types, Values 
  36  from threading import Condition, Thread 
  37   
  38  log = getLogger("qpid.messaging") 
  39  rawlog = getLogger("qpid.messaging.io.raw") 
  40  opslog = getLogger("qpid.messaging.io.ops") 
  41  
  42  def addr2reply_to(addr):
  43    name, subject, options = address.parse(addr)
  44    if options:
  45      type = options.get("node", {}).get("type")
  46    else:
  47      type = None
  48  
  49    if type == "topic":
  50      return ReplyTo(name, subject)
  51    else:
  52      return ReplyTo(None, name)
53
  54  def reply_to2addr(reply_to):
  55    if reply_to.exchange in (None, ""):
  56      return reply_to.routing_key
  57    elif reply_to.routing_key is None:
  58      return "%s; {node: {type: topic}}" % reply_to.exchange
  59    else:
  60      return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key)
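# Illustrative round trip through the two helpers above (values are
# hypothetical; the field names follow the reply_to.exchange/routing_key
# attributes used in reply_to2addr):
#
#   addr2reply_to("amq.topic/weather; {node: {type: topic}}")
#     -> ReplyTo(exchange="amq.topic", routing_key="weather")
#   addr2reply_to("my-queue")
#     -> ReplyTo(exchange=None, routing_key="my-queue")
#   reply_to2addr(ReplyTo("amq.topic", "weather"))
#     -> "amq.topic/weather; {node: {type: topic}}"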
61
  62  class Attachment:
  63  
  64    def __init__(self, target):
  65      self.target = target
  66  
  67  # XXX
  68  
  69  DURABLE_DEFAULT=False
  70  
  71  # XXX
  72  
  73  class Pattern:
  74    """
  75    The pattern filter matches the supplied wildcard pattern against a
  76    message subject.
  77    """
  78  
  79    def __init__(self, value):
  80      self.value = value
  81  
  82    # XXX: this should become part of the driver
  83    def _bind(self, sst, exchange, queue):
  84      from qpid.ops import ExchangeBind
  85  
  86      sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
  87                                 binding_key=self.value.replace("*", "#")))
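# Illustrative use of Pattern (sst stands in for a SessionState; the "*"
# wildcard in the pattern value is written out as a "#" binding key):
#
#   Pattern("news.*")._bind(sst, "amq.topic", "my-queue")
#   # writes ExchangeBind(exchange="amq.topic", queue="my-queue",
#   #                     binding_key="news.#")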
  88  
  89  SUBJECT_DEFAULTS = {
  90    "topic": "#"
  91    }
  92  
  93  # XXX
  94  ppid = 0
  95  try:
  96    ppid = os.getppid()
  97  except:
  98    pass
  99  
 100  CLIENT_PROPERTIES = {"product": "qpid python client",
 101                       "version": "development",
 102                       "platform": os.name,
 103                       "qpid.client_process": os.path.basename(sys.argv[0]),
 104                       "qpid.client_pid": os.getpid(),
 105                       "qpid.client_ppid": ppid}
 106  
 107  def noop(): pass
 108  def sync_noop(): pass
109
 110  class SessionState:
 111  
 112    def __init__(self, driver, session, name, channel):
 113      self.driver = driver
 114      self.session = session
 115      self.name = name
 116      self.channel = channel
 117      self.detached = False
 118      self.committing = False
 119      self.aborting = False
 120  
 121      # sender state
 122      self.sent = Serial(0)
 123      self.acknowledged = RangedSet()
 124      self.actions = {}
 125      self.min_completion = self.sent
 126      self.max_completion = self.sent
 127      self.results = {}
 128      self.need_sync = False
 129  
 130      # receiver state
 131      self.received = None
 132      self.executed = RangedSet()
 133  
 134      # XXX: need to periodically exchange completion/known_completion
 135  
 136      self.destinations = {}
137
 138    def write_query(self, query, handler):
 139      id = self.sent
 140      self.write_cmd(query, lambda: handler(self.results.pop(id)))
 141  
 142    def apply_overrides(self, cmd, overrides):
 143      for k, v in overrides.items():
 144        cmd[k.replace('-', '_')] = v
 145  
 146    def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
 147      if overrides:
 148        self.apply_overrides(cmd, overrides)
 149  
 150      if action != noop:
 151        cmd.sync = sync
 152      if self.detached:
 153        raise Exception("detached")
 154      cmd.id = self.sent
 155      self.sent += 1
 156      self.actions[cmd.id] = action
 157      self.max_completion = cmd.id
 158      self.write_op(cmd)
 159      self.need_sync = not cmd.sync
 160  
 161    def write_cmds(self, cmds, action=noop):
 162      if cmds:
 163        for cmd in cmds[:-1]:
 164          self.write_cmd(cmd)
 165        self.write_cmd(cmds[-1], action)
 166      else:
 167        action()
 168  
 169    def write_op(self, op):
 170      op.channel = self.channel
 171      self.driver.write_op(op)
 172  
 173  POLICIES = Values("always", "sender", "receiver", "never")
 174  RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
 175                       "exactly-once")
 176  
 177  DECLARE = Map({}, restricted=False)
 178  BINDINGS = List(Map({
 179      "exchange": Types(basestring),
 180      "queue": Types(basestring),
 181      "key": Types(basestring),
 182      "arguments": Map({}, restricted=False)
 183      }))
 184  
 185  COMMON_OPTS = {
 186      "create": POLICIES,
 187      "delete": POLICIES,
 188      "assert": POLICIES,
 189      "node": Map({
 190          "type": Values("queue", "topic"),
 191          "durable": Types(bool),
 192          "x-declare": DECLARE,
 193          "x-bindings": BINDINGS
 194          }),
 195      "link": Map({
 196          "name": Types(basestring),
 197          "durable": Types(bool),
 198          "reliability": RELIABILITY,
 199          "x-declare": DECLARE,
 200          "x-bindings": BINDINGS,
 201          "x-subscribe": Map({}, restricted=False)
 202          })
 203      }
 204  
 205  RECEIVE_MODES = Values("browse", "consume")
 206  
 207  SOURCE_OPTS = COMMON_OPTS.copy()
 208  SOURCE_OPTS.update({
 209      "mode": RECEIVE_MODES
 210      })
 211  
 212  TARGET_OPTS = COMMON_OPTS.copy()
213 214 -class LinkIn:
215 216 ADDR_NAME = "source" 217 DIR_NAME = "receiver" 218 VALIDATOR = Map(SOURCE_OPTS) 219 226 270 277
280
281 -class LinkOut:
282 283 ADDR_NAME = "target" 284 DIR_NAME = "sender" 285 VALIDATOR = Map(TARGET_OPTS) 286 290 304 307
310
 311  class Cache:
 312  
 313    def __init__(self, ttl):
 314      self.ttl = ttl
 315      self.entries = {}
 316  
 317    def __setitem__(self, key, value):
 318      self.entries[key] = time.time(), value
 319  
 320    def __getitem__(self, key):
 321      tstamp, value = self.entries[key]
 322      if time.time() - tstamp >= self.ttl:
 323        del self.entries[key]
 324        raise KeyError(key)
 325      else:
 326        return value
 327  
 328    def __delitem__(self, key):
 329      del self.entries[key]
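# Illustrative Cache behaviour (entries expire lazily, on lookup, once they
# are at least ttl seconds old):
#
#   c = Cache(ttl=60)
#   c["my-queue"] = ("queue", None)
#   c["my-queue"]    # within 60 seconds: returns ("queue", None)
#   c["my-queue"]    # 60 seconds or later: entry is dropped, KeyError raised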
 330  
 331  # XXX
 332  HEADER="!4s4B"
 333  
 334  EMPTY_DP = DeliveryProperties()
 335  EMPTY_MP = MessageProperties()
 336  
 337  SUBJECT = "qpid.subject"
 338  
 339  CLOSED = "CLOSED"
 340  READ_ONLY = "READ_ONLY"
 341  WRITE_ONLY = "WRITE_ONLY"
 342  OPEN = "OPEN"
 343  
 344  class Driver:
 345  
 346    def __init__(self, connection):
 347      self.connection = connection
 348      self.log_id = "%x" % id(self.connection)
 349      self._lock = self.connection._lock
 350  
 351      self._selector = Selector.default()
 352      self._attempts = 0
 353      self._delay = self.connection.reconnect_interval_min
 354      self._reconnect_log = self.connection.reconnect_log
 355      self._host = 0
 356      self._retrying = False
 357      self._next_retry = None
 358      self._transport = None
 359  
 360      self._timeout = None
 361  
 362      self.engine = None
363
 364    def _next_host(self):
 365      urls = [URL(u) for u in self.connection.reconnect_urls]
 366      hosts = [(self.connection.host, default(self.connection.port, 5672))] + \
 367          [(u.host, default(u.port, 5672)) for u in urls]
 368      if self._host >= len(hosts):
 369        self._host = 0
 370      result = hosts[self._host]
 371      if self._host == 0:
 372        self._attempts += 1
 373      self._host = self._host + 1
 374      return result
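# Illustrative rotation (hypothetical values): with connection.host set to
# "broker1" and connection.reconnect_urls = ["broker2:5673"], successive
# _next_host() calls return ("broker1", 5672), ("broker2", 5673),
# ("broker1", 5672), ...; _attempts is incremented each time the rotation
# is back at the primary host.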
375
 376    def _num_hosts(self):
 377      return len(self.connection.reconnect_urls) + 1
378 379 @synchronized
380 - def wakeup(self):
381 self.dispatch() 382 self._selector.wakeup()
383
384 - def start(self):
385 self._selector.register(self)
386
387 - def stop(self):
388 self._selector.unregister(self) 389 if self._transport: 390 self.st_closed()
391
392 - def fileno(self):
393 return self._transport.fileno()
394 395 @synchronized
396 - def reading(self):
397 return self._transport is not None and \ 398 self._transport.reading(True)
399 400 @synchronized
401 - def writing(self):
402 return self._transport is not None and \ 403 self._transport.writing(self.engine.pending())
404 405 @synchronized
406 - def timing(self):
407 return self._timeout
408 409 @synchronized
410 - def readable(self):
411 try: 412 data = self._transport.recv(64*1024) 413 if data is None: 414 return 415 elif data: 416 rawlog.debug("READ[%s]: %r", self.log_id, data) 417 self.engine.write(data) 418 else: 419 self.close_engine() 420 except socket.error, e: 421 self.close_engine(ConnectionError(text=str(e))) 422 423 self.update_status() 424 425 self._notify()
426
427 - def _notify(self):
428 if self.connection.error: 429 self.connection._condition.gc() 430 self.connection._waiter.notifyAll()
431
 432    def close_engine(self, e=None):
 433      if e is None:
 434        e = ConnectionError(text="connection aborted")
 435  
 436      if (self.connection.reconnect and
 437          (self.connection.reconnect_limit is None or
 438           self.connection.reconnect_limit <= 0 or
 439           self._attempts <= self.connection.reconnect_limit)):
 440        if self._host < self._num_hosts():
 441          delay = 0
 442        else:
 443          delay = self._delay
 444          self._delay = min(2*self._delay,
 445                            self.connection.reconnect_interval_max)
 446        self._next_retry = time.time() + delay
 447        if self._reconnect_log:
 448          log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
 449          if delay > 0:
 450            log.warn("sleeping %s seconds" % delay)
 451        self._retrying = True
 452        self.engine.close()
 453      else:
 454        self.engine.close(e)
 455  
 456      self.schedule()
457
458 - def update_status(self):
459 status = self.engine.status() 460 return getattr(self, "st_%s" % status.lower())()
461
462 - def st_closed(self):
463 # XXX: this log statement seems to sometimes hit when the socket is not connected 464 # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) 465 self._transport.close() 466 self._transport = None 467 self.engine = None 468 return True
469
470 - def st_open(self):
471 return False
472 473 @synchronized
474 - def writeable(self):
475 notify = False 476 try: 477 n = self._transport.send(self.engine.peek()) 478 if n == 0: return 479 sent = self.engine.read(n) 480 rawlog.debug("SENT[%s]: %r", self.log_id, sent) 481 except socket.error, e: 482 self.close_engine(e) 483 notify = True 484 485 if self.update_status() or notify: 486 self._notify()
487 488 @synchronized
489 - def timeout(self):
490 self.dispatch() 491 self._notify() 492 self.schedule()
493
494 - def schedule(self):
495 times = [] 496 if self.connection.heartbeat: 497 times.append(time.time() + self.connection.heartbeat) 498 if self._next_retry: 499 times.append(self._next_retry) 500 if times: 501 self._timeout = min(times) 502 else: 503 self._timeout = None
504
505 - def dispatch(self):
506 try: 507 if self._transport is None: 508 if self.connection._connected and not self.connection.error: 509 self.connect() 510 else: 511 self.engine.dispatch() 512 except HeartbeatTimeout, e: 513 self.close_engine(e) 514 except: 515 # XXX: Does socket get leaked if this occurs? 516 msg = compat.format_exc() 517 self.connection.error = InternalError(text=msg)
518
 519    def connect(self):
 520      if self._retrying and time.time() < self._next_retry:
 521        return
 522  
 523      try:
 524        # XXX: should make this non blocking
 525        host, port = self._next_host()
 526        if self._retrying and self._reconnect_log:
 527          log.warn("trying: %s:%s", host, port)
 528        self.engine = Engine(self.connection)
 529        self.engine.open()
 530        rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
 531        trans = transports.TRANSPORTS.get(self.connection.transport)
 532        if trans:
 533          self._transport = trans(self.connection, host, port)
 534        else:
 535          raise ConnectError("no such transport: %s" % self.connection.transport)
 536        if self._retrying and self._reconnect_log:
 537          log.warn("reconnect succeeded: %s:%s", host, port)
 538        self._next_retry = None
 539        self._attempts = 0
 540        self._delay = self.connection.reconnect_interval_min
 541        self._retrying = False
 542        self.schedule()
 543      except socket.error, e:
 544        self.close_engine(ConnectError(text=str(e)))
 545  
 546  DEFAULT_DISPOSITION = Disposition(None)
 547  
 548  def get_bindings(opts, queue=None, exchange=None, key=None):
 549    bindings = opts.get("x-bindings", [])
 550    cmds = []
 551    for b in bindings:
 552      exchange = b.get("exchange", exchange)
 553      queue = b.get("queue", queue)
 554      key = b.get("key", key)
 555      args = b.get("arguments", {})
 556      cmds.append(ExchangeBind(queue, exchange, key, args))
 557    return cmds
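# Illustrative expansion of an "x-bindings" option list (values are
# hypothetical); each entry becomes one ExchangeBind command, with the
# queue/exchange/key arguments of the call used as fallbacks:
#
#   get_bindings({"x-bindings": [{"exchange": "amq.topic", "key": "news.#"}]},
#                queue="my-queue")
#   # -> [ExchangeBind for queue "my-queue", exchange "amq.topic",
#   #     key "news.#", empty arguments]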
 558  
 559  CONNECTION_ERRS = {
 560    # anything not here (i.e. everything right now) will default to
 561    # connection error
 562    }
 563  
 564  SESSION_ERRS = {
 565    # anything not here will default to session error
 566    error_code.unauthorized_access: UnauthorizedAccess,
 567    error_code.not_found: NotFound,
 568    error_code.resource_locked: ReceiverError,
 569    error_code.resource_limit_exceeded: TargetCapacityExceeded,
 570    error_code.internal_error: ServerError
 571    }
 572  
 573  class Engine:
 574  
 575    def __init__(self, connection):
 576      self.connection = connection
 577      self.log_id = "%x" % id(self.connection)
 578      self._closing = False
 579      self._connected = False
 580      self._attachments = {}
 581  
 582      self._in = LinkIn()
 583      self._out = LinkOut()
 584  
 585      self.channel_max = 65536
 586      self._channels = 0
 587      self._sessions = {}
 588  
 589      self.address_cache = Cache(self.connection.address_ttl)
 590  
 591      self._status = CLOSED
 592      self._buf = ""
 593      self._hdr = ""
 594      self._last_in = None
 595      self._last_out = None
 596      self._op_enc = OpEncoder()
 597      self._seg_enc = SegmentEncoder()
 598      self._frame_enc = FrameEncoder()
 599      self._frame_dec = FrameDecoder()
 600      self._seg_dec = SegmentDecoder()
 601      self._op_dec = OpDecoder()
 602  
 603      self._sasl = sasl.Client()
 604      if self.connection.username:
 605        self._sasl.setAttr("username", self.connection.username)
 606      if self.connection.password:
 607        self._sasl.setAttr("password", self.connection.password)
 608      if self.connection.host:
 609        self._sasl.setAttr("host", self.connection.host)
 610      self._sasl.setAttr("service", self.connection.sasl_service)
 611      if self.connection.sasl_min_ssf is not None:
 612        self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
 613      if self.connection.sasl_max_ssf is not None:
 614        self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
 615      self._sasl.init()
 616      self._sasl_encode = False
 617      self._sasl_decode = False
618
619 - def _reset(self):
620 self.connection._transport_connected = False 621 622 for ssn in self.connection.sessions.values(): 623 for m in ssn.acked + ssn.unacked + ssn.incoming: 624 m._transfer_id = None 625 for snd in ssn.senders: 626 snd.linked = False 627 for rcv in ssn.receivers: 628 rcv.impending = rcv.received 629 rcv.linked = False
630
631 - def status(self):
632 return self._status
633
 634    def write(self, data):
 635      self._last_in = time.time()
 636      try:
 637        if self._sasl_decode:
 638          data = self._sasl.decode(data)
 639  
 640        if len(self._hdr) < 8:
 641          r = 8 - len(self._hdr)
 642          self._hdr += data[:r]
 643          data = data[r:]
 644  
 645          if len(self._hdr) == 8:
 646            self.do_header(self._hdr)
 647  
 648        self._frame_dec.write(data)
 649        self._seg_dec.write(*self._frame_dec.read())
 650        self._op_dec.write(*self._seg_dec.read())
 651        for op in self._op_dec.read():
 652          self.assign_id(op)
 653          opslog.debug("RCVD[%s]: %r", self.log_id, op)
 654          op.dispatch(self)
 655        self.dispatch()
 656      except MessagingError, e:
 657        self.close(e)
 658      except:
 659        self.close(InternalError(text=compat.format_exc()))
660
661 - def close(self, e=None):
662 self._reset() 663 if e: 664 self.connection.error = e 665 self._status = CLOSED
666
667 - def assign_id(self, op):
668 if isinstance(op, Command): 669 sst = self.get_sst(op) 670 op.id = sst.received 671 sst.received += 1
672
673 - def pending(self):
674 return len(self._buf)
675
676 - def read(self, n):
677 result = self._buf[:n] 678 self._buf = self._buf[n:] 679 return result
680
681 - def peek(self):
682 return self._buf
683
684 - def write_op(self, op):
685 opslog.debug("SENT[%s]: %r", self.log_id, op) 686 self._op_enc.write(op) 687 self._seg_enc.write(*self._op_enc.read()) 688 self._frame_enc.write(*self._seg_enc.read()) 689 bytes = self._frame_enc.read() 690 if self._sasl_encode: 691 bytes = self._sasl.encode(bytes) 692 self._buf += bytes 693 self._last_out = time.time()
694
695 - def do_header(self, hdr):
696 cli_major = 0; cli_minor = 10 697 magic, _, _, major, minor = struct.unpack(HEADER, hdr) 698 if major != cli_major or minor != cli_minor: 699 raise VersionError(text="client: %s-%s, server: %s-%s" % 700 (cli_major, cli_minor, major, minor))
701
702 - def do_connection_start(self, start):
703 if self.connection.sasl_mechanisms: 704 permitted = self.connection.sasl_mechanisms.split() 705 mechs = [m for m in start.mechanisms if m in permitted] 706 else: 707 mechs = start.mechanisms 708 try: 709 mech, initial = self._sasl.start(" ".join(mechs)) 710 except sasl.SASLError, e: 711 raise AuthenticationFailure(text=str(e)) 712 713 client_properties = CLIENT_PROPERTIES.copy() 714 client_properties.update(self.connection.client_properties) 715 self.write_op(ConnectionStartOk(client_properties=client_properties, 716 mechanism=mech, response=initial))
717
718 - def do_connection_secure(self, secure):
719 resp = self._sasl.step(secure.challenge) 720 self.write_op(ConnectionSecureOk(response=resp))
721
722 - def do_connection_tune(self, tune):
723 # XXX: is heartbeat protocol specific? 724 if tune.channel_max is not None: 725 self.channel_max = tune.channel_max 726 self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, 727 channel_max=self.channel_max)) 728 self.write_op(ConnectionOpen()) 729 self._sasl_encode = True
730
731 - def do_connection_open_ok(self, open_ok):
732 self.connection.auth_username = self._sasl.auth_username() 733 self._connected = True 734 self._sasl_decode = True 735 self.connection._transport_connected = True
736
737 - def do_connection_heartbeat(self, hrt):
738 pass
739
740 - def do_connection_close(self, close):
741 self.write_op(ConnectionCloseOk()) 742 if close.reply_code != close_code.normal: 743 exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError) 744 self.connection.error = exc(close.reply_code, close.reply_text)
745 # XXX: should we do a half shutdown on the socket here? 746 # XXX: we really need to test this, we may end up reporting a 747 # connection abort after this, if we were to do a shutdown on read 748 # and stop reading, then we wouldn't report the abort, that's 749 # probably the right thing to do 750
751 - def do_connection_close_ok(self, close_ok):
752 self.close()
753
754 - def do_session_attached(self, atc):
755 pass
756
757 - def do_session_command_point(self, cp):
758 sst = self.get_sst(cp) 759 sst.received = cp.command_id
760
761 - def do_session_completed(self, sc):
762 sst = self.get_sst(sc) 763 for r in sc.commands: 764 sst.acknowledged.add(r.lower, r.upper) 765 766 if not sc.commands.empty(): 767 while sst.min_completion in sc.commands: 768 if sst.actions.has_key(sst.min_completion): 769 sst.actions.pop(sst.min_completion)() 770 sst.min_completion += 1
771
 772    def do_session_known_completed(self, kcmp):
 773      sst = self.get_sst(kcmp)
 774      executed = RangedSet()
 775      for e in sst.executed.ranges:
 776        for ke in kcmp.ranges:
 777          if e.lower in ke and e.upper in ke:
 778            break
 779        else:
 780          executed.add_range(e)
 781      sst.executed = executed
782
783 - def do_session_flush(self, sf):
784 sst = self.get_sst(sf) 785 if sf.expected: 786 if sst.received is None: 787 exp = None 788 else: 789 exp = RangedSet(sst.received) 790 sst.write_op(SessionExpected(exp)) 791 if sf.confirmed: 792 sst.write_op(SessionConfirmed(sst.executed)) 793 if sf.completed: 794 sst.write_op(SessionCompleted(sst.executed))
795
796 - def do_session_request_timeout(self, rt):
797 sst = self.get_sst(rt) 798 sst.write_op(SessionTimeout(timeout=0))
799
800 - def do_execution_result(self, er):
801 sst = self.get_sst(er) 802 sst.results[er.command_id] = er.value 803 sst.executed.add(er.id)
804
805 - def do_execution_exception(self, ex):
806 sst = self.get_sst(ex) 807 exc = SESSION_ERRS.get(ex.error_code, SessionError) 808 sst.session.error = exc(ex.error_code, ex.description)
809
810 - def dispatch(self):
811 if not self.connection._connected and not self._closing and self._status != CLOSED: 812 self.disconnect() 813 814 if self._connected and not self._closing: 815 for ssn in self.connection.sessions.values(): 816 self.attach(ssn) 817 self.process(ssn) 818 819 if self.connection.heartbeat and self._status != CLOSED: 820 now = time.time() 821 if self._last_in is not None and \ 822 now - self._last_in > 2*self.connection.heartbeat: 823 raise HeartbeatTimeout(text="heartbeat timeout") 824 if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0: 825 self.write_op(ConnectionHeartbeat())
826
827 - def open(self):
828 self._reset() 829 self._status = OPEN 830 self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
831
832 - def disconnect(self):
833 self.write_op(ConnectionClose(close_code.normal)) 834 self._closing = True
835
836 - def attach(self, ssn):
837 if ssn.closed: return 838 sst = self._attachments.get(ssn) 839 if sst is None: 840 for i in xrange(0, self.channel_max): 841 if not self._sessions.has_key(i): 842 ch = i 843 break 844 else: 845 raise RuntimeError("all channels used") 846 sst = SessionState(self, ssn, ssn.name, ch) 847 sst.write_op(SessionAttach(name=ssn.name)) 848 sst.write_op(SessionCommandPoint(sst.sent, 0)) 849 sst.outgoing_idx = 0 850 sst.acked = [] 851 sst.acked_idx = 0 852 if ssn.transactional: 853 sst.write_cmd(TxSelect()) 854 self._attachments[ssn] = sst 855 self._sessions[sst.channel] = sst 856 857 for snd in ssn.senders: 858 self.link(snd, self._out, snd.target) 859 for rcv in ssn.receivers: 860 self.link(rcv, self._in, rcv.source) 861 862 if sst is not None and ssn.closing and not sst.detached: 863 sst.detached = True 864 sst.write_op(SessionDetach(name=ssn.name))
865
866 - def get_sst(self, op):
867 return self._sessions[op.channel]
868
869 - def do_session_detached(self, dtc):
870 sst = self._sessions.pop(dtc.channel) 871 ssn = sst.session 872 del self._attachments[ssn] 873 ssn.closed = True
874
875 - def do_session_detach(self, dtc):
876 sst = self.get_sst(dtc) 877 sst.write_op(SessionDetached(name=dtc.name)) 878 self.do_session_detached(dtc)
879 897 898 def resolved(type, subtype): 899 dir.do_link(sst, lnk, _lnk, type, subtype, linked)
900 901 self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) 902 self._attachments[lnk] = _lnk 903 904 if lnk.linked and lnk.closing and not lnk.closed: 905 if not _lnk.closing: 906 def unlinked(): 907 dir.del_link(sst, lnk, _lnk) 908 del self._attachments[lnk] 909 lnk.closed = True 910 if _lnk.options.get("delete") in ("always", dir.DIR_NAME): 911 dir.do_unlink(sst, lnk, _lnk) 912 self.delete(sst, _lnk.name, unlinked) 913 else: 914 dir.do_unlink(sst, lnk, _lnk, unlinked) 915 _lnk.closing = True 916 elif not lnk.linked and lnk.closing and not lnk.closed: 917 if lnk.error: lnk.closed = True 918
919 - def parse_address(self, lnk, dir, addr):
920 if addr is None: 921 return MalformedAddress(text="%s is None" % dir.ADDR_NAME) 922 else: 923 try: 924 lnk.name, lnk.subject, lnk.options = address.parse(addr) 925 # XXX: subject 926 if lnk.options is None: 927 lnk.options = {} 928 except address.LexError, e: 929 return MalformedAddress(text=str(e)) 930 except address.ParseError, e: 931 return MalformedAddress(text=str(e))
932
933 - def validate_options(self, lnk, dir):
934 ctx = Context() 935 err = dir.VALIDATOR.validate(lnk.options, ctx) 936 if err: return InvalidOption(text="error in options: %s" % err)
937
938 - def resolve_declare(self, sst, lnk, dir, action):
939 declare = lnk.options.get("create") in ("always", dir) 940 assrt = lnk.options.get("assert") in ("always", dir) 941 def do_resolved(type, subtype): 942 err = None 943 if type is None: 944 if declare: 945 err = self.declare(sst, lnk, action) 946 else: 947 err = NotFound(text="no such queue: %s" % lnk.name) 948 else: 949 if assrt: 950 expected = lnk.options.get("node", {}).get("type") 951 if expected and type != expected: 952 err = AssertionFailed(text="expected %s, got %s" % (expected, type)) 953 if err is None: 954 action(type, subtype) 955 956 if err: 957 tgt = lnk.target 958 tgt.error = err 959 del self._attachments[tgt] 960 tgt.closed = True 961 return
962 self.resolve(sst, lnk.name, do_resolved, force=declare) 963
964 - def resolve(self, sst, name, action, force=False):
965 if not force: 966 try: 967 type, subtype = self.address_cache[name] 968 action(type, subtype) 969 return 970 except KeyError: 971 pass 972 973 args = [] 974 def do_result(r): 975 args.append(r)
976 def do_action(r): 977 do_result(r) 978 er, qr = args 979 if er.not_found and not qr.queue: 980 type, subtype = None, None 981 elif qr.queue: 982 type, subtype = "queue", None 983 else: 984 type, subtype = "topic", er.type 985 if type is not None: 986 self.address_cache[name] = (type, subtype) 987 action(type, subtype) 988 sst.write_query(ExchangeQuery(name), do_result) 989 sst.write_query(QueueQuery(name), do_action) 990
991 - def declare(self, sst, lnk, action):
992 name = lnk.name 993 props = lnk.options.get("node", {}) 994 durable = props.get("durable", DURABLE_DEFAULT) 995 type = props.get("type", "queue") 996 declare = props.get("x-declare", {}) 997 998 if type == "topic": 999 cmd = ExchangeDeclare(exchange=name, durable=durable) 1000 bindings = get_bindings(props, exchange=name) 1001 elif type == "queue": 1002 cmd = QueueDeclare(queue=name, durable=durable) 1003 bindings = get_bindings(props, queue=name) 1004 else: 1005 raise ValueError(type) 1006 1007 sst.apply_overrides(cmd, declare) 1008 1009 if type == "topic": 1010 if cmd.type is None: 1011 cmd.type = "topic" 1012 subtype = cmd.type 1013 else: 1014 subtype = None 1015 1016 cmds = [cmd] 1017 cmds.extend(bindings) 1018 1019 def declared(): 1020 self.address_cache[name] = (type, subtype) 1021 action(type, subtype)
1022 1023 sst.write_cmds(cmds, declared) 1024
1025 - def delete(self, sst, name, action):
1026 def deleted(): 1027 del self.address_cache[name] 1028 action()
1029 1030 def do_delete(type, subtype): 1031 if type == "topic": 1032 sst.write_cmd(ExchangeDelete(name), deleted) 1033 elif type == "queue": 1034 sst.write_cmd(QueueDelete(name), deleted) 1035 elif type is None: 1036 action() 1037 else: 1038 raise ValueError(type) 1039 self.resolve(sst, name, do_delete, force=True) 1040
1041 - def process(self, ssn):
1042 if ssn.closed or ssn.closing: return 1043 1044 sst = self._attachments[ssn] 1045 1046 while sst.outgoing_idx < len(ssn.outgoing): 1047 msg = ssn.outgoing[sst.outgoing_idx] 1048 snd = msg._sender 1049 # XXX: should check for sender error here 1050 _snd = self._attachments.get(snd) 1051 if _snd and snd.linked: 1052 self.send(snd, msg) 1053 sst.outgoing_idx += 1 1054 else: 1055 break 1056 1057 for snd in ssn.senders: 1058 # XXX: should included snd.acked in this 1059 if snd.synced >= snd.queued and sst.need_sync: 1060 sst.write_cmd(ExecutionSync(), sync_noop) 1061 1062 for rcv in ssn.receivers: 1063 self.process_receiver(rcv) 1064 1065 if ssn.acked: 1066 messages = ssn.acked[sst.acked_idx:] 1067 if messages: 1068 ids = RangedSet() 1069 1070 disposed = [(DEFAULT_DISPOSITION, [])] 1071 acked = [] 1072 for m in messages: 1073 # XXX: we're ignoring acks that get lost when disconnected, 1074 # could we deal this via some message-id based purge? 1075 if m._transfer_id is None: 1076 acked.append(m) 1077 continue 1078 ids.add(m._transfer_id) 1079 if m._receiver._accept_mode is accept_mode.explicit: 1080 disp = m._disposition or DEFAULT_DISPOSITION 1081 last, msgs = disposed[-1] 1082 if disp.type is last.type and disp.options == last.options: 1083 msgs.append(m) 1084 else: 1085 disposed.append((disp, [m])) 1086 else: 1087 acked.append(m) 1088 1089 for range in ids: 1090 sst.executed.add_range(range) 1091 sst.write_op(SessionCompleted(sst.executed)) 1092 1093 def ack_acker(msgs): 1094 def ack_ack(): 1095 for m in msgs: 1096 ssn.acked.remove(m) 1097 sst.acked_idx -= 1 1098 # XXX: should this check accept_mode too? 1099 if not ssn.transactional: 1100 sst.acked.remove(m)
1101 return ack_ack 1102 1103 for disp, msgs in disposed: 1104 if not msgs: continue 1105 if disp.type is None: 1106 op = MessageAccept 1107 elif disp.type is RELEASED: 1108 op = MessageRelease 1109 elif disp.type is REJECTED: 1110 op = MessageReject 1111 sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]), 1112 **disp.options), 1113 ack_acker(msgs)) 1114 if log.isEnabledFor(DEBUG): 1115 for m in msgs: 1116 log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) 1117 1118 sst.acked.extend(messages) 1119 sst.acked_idx += len(messages) 1120 ack_acker(acked)() 1121 1122 if ssn.committing and not sst.committing: 1123 def commit_ok(): 1124 del sst.acked[:] 1125 ssn.committing = False 1126 ssn.committed = True 1127 ssn.aborting = False 1128 ssn.aborted = False 1129 sst.committing = False 1130 sst.write_cmd(TxCommit(), commit_ok) 1131 sst.committing = True 1132 1133 if ssn.aborting and not sst.aborting: 1134 sst.aborting = True 1135 def do_rb(): 1136 messages = sst.acked + ssn.unacked + ssn.incoming 1137 ids = RangedSet(*[m._transfer_id for m in messages]) 1138 for range in ids: 1139 sst.executed.add_range(range) 1140 sst.write_op(SessionCompleted(sst.executed)) 1141 sst.write_cmd(MessageRelease(ids, True)) 1142 sst.write_cmd(TxRollback(), do_rb_ok) 1143 1144 def do_rb_ok(): 1145 del ssn.incoming[:] 1146 del ssn.unacked[:] 1147 del sst.acked[:] 1148 1149 for rcv in ssn.receivers: 1150 rcv.impending = rcv.received 1151 rcv.returned = rcv.received 1152 # XXX: do we need to update granted here as well? 1153 1154 for rcv in ssn.receivers: 1155 self.process_receiver(rcv) 1156 1157 ssn.aborting = False 1158 ssn.aborted = True 1159 ssn.committing = False 1160 ssn.committed = False 1161 sst.aborting = False 1162 1163 for rcv in ssn.receivers: 1164 _rcv = self._attachments[rcv] 1165 sst.write_cmd(MessageStop(_rcv.destination)) 1166 sst.write_cmd(ExecutionSync(), do_rb) 1167
1168 - def grant(self, rcv):
1169 sst = self._attachments[rcv.session] 1170 _rcv = self._attachments.get(rcv) 1171 if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining: 1172 return 1173 1174 if rcv.granted is UNLIMITED: 1175 if rcv.impending is UNLIMITED: 1176 delta = 0 1177 else: 1178 delta = UNLIMITED 1179 elif rcv.impending is UNLIMITED: 1180 delta = -1 1181 else: 1182 delta = max(rcv.granted, rcv.received) - rcv.impending 1183 1184 if delta is UNLIMITED: 1185 if not _rcv.bytes_open: 1186 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) 1187 _rcv.bytes_open = True 1188 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) 1189 rcv.impending = UNLIMITED 1190 elif delta > 0: 1191 if not _rcv.bytes_open: 1192 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) 1193 _rcv.bytes_open = True 1194 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) 1195 rcv.impending += delta 1196 elif delta < 0 and not rcv.draining: 1197 _rcv.draining = True 1198 def do_stop(): 1199 rcv.impending = rcv.received 1200 _rcv.draining = False 1201 _rcv.bytes_open = False 1202 self.grant(rcv)
1203 sst.write_cmd(MessageStop(_rcv.destination), do_stop) 1204 1205 if rcv.draining: 1206 _rcv.draining = True 1207 def do_flush(): 1208 rcv.impending = rcv.received 1209 rcv.granted = rcv.impending 1210 _rcv.draining = False 1211 _rcv.bytes_open = False 1212 rcv.draining = False 1213 sst.write_cmd(MessageFlush(_rcv.destination), do_flush) 1214 1215
1216 - def process_receiver(self, rcv):
1217 if rcv.closed: return 1218 self.grant(rcv)
1219
1220 - def send(self, snd, msg):
1221 sst = self._attachments[snd.session] 1222 _snd = self._attachments[snd] 1223 1224 if msg.subject is None or _snd._exchange == "": 1225 rk = _snd._routing_key 1226 else: 1227 rk = msg.subject 1228 1229 if msg.subject is None: 1230 subject = _snd.subject 1231 else: 1232 subject = msg.subject 1233 1234 # XXX: do we need to query to figure out how to create the reply-to interoperably? 1235 if msg.reply_to: 1236 rt = addr2reply_to(msg.reply_to) 1237 else: 1238 rt = None 1239 content_encoding = msg.properties.get("x-amqp-0-10.content-encoding") 1240 dp = DeliveryProperties(routing_key=rk) 1241 mp = MessageProperties(message_id=msg.id, 1242 user_id=msg.user_id, 1243 reply_to=rt, 1244 correlation_id=msg.correlation_id, 1245 app_id = msg.properties.get("x-amqp-0-10.app-id"), 1246 content_type=msg.content_type, 1247 content_encoding=content_encoding, 1248 application_headers=msg.properties) 1249 if subject is not None: 1250 if mp.application_headers is None: 1251 mp.application_headers = {} 1252 mp.application_headers[SUBJECT] = subject 1253 if msg.durable is not None: 1254 if msg.durable: 1255 dp.delivery_mode = delivery_mode.persistent 1256 else: 1257 dp.delivery_mode = delivery_mode.non_persistent 1258 if msg.priority is not None: 1259 dp.priority = msg.priority 1260 if msg.ttl is not None: 1261 dp.ttl = long(msg.ttl*1000) 1262 enc, dec = get_codec(msg.content_type) 1263 body = enc(msg.content) 1264 1265 # XXX: this is not safe for out of order, can this be triggered by pre_ack? 1266 def msg_acked(): 1267 # XXX: should we log the ack somehow too? 1268 snd.acked += 1 1269 m = snd.session.outgoing.pop(0) 1270 sst.outgoing_idx -= 1 1271 log.debug("RACK[%s]: %s", sst.session.log_id, msg) 1272 assert msg == m
1273 1274 xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp), 1275 payload=body) 1276 1277 if _snd.pre_ack: 1278 sst.write_cmd(xfr) 1279 else: 1280 sst.write_cmd(xfr, msg_acked, sync=msg._sync) 1281 1282 log.debug("SENT[%s]: %s", sst.session.log_id, msg) 1283 1284 if _snd.pre_ack: 1285 msg_acked() 1286
1287 - def do_message_transfer(self, xfr):
1288 sst = self.get_sst(xfr) 1289 ssn = sst.session 1290 1291 msg = self._decode(xfr) 1292 rcv = sst.destinations[xfr.destination].target 1293 msg._receiver = rcv 1294 if rcv.impending is not UNLIMITED: 1295 assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) 1296 rcv.received += 1 1297 log.debug("RCVD[%s]: %s", ssn.log_id, msg) 1298 ssn.incoming.append(msg)
1299
1300 - def _decode(self, xfr):
1301 dp = EMPTY_DP 1302 mp = EMPTY_MP 1303 1304 for h in xfr.headers: 1305 if isinstance(h, DeliveryProperties): 1306 dp = h 1307 elif isinstance(h, MessageProperties): 1308 mp = h 1309 1310 ap = mp.application_headers 1311 enc, dec = get_codec(mp.content_type) 1312 content = dec(xfr.payload) 1313 msg = Message(content) 1314 msg.id = mp.message_id 1315 if ap is not None: 1316 msg.subject = ap.get(SUBJECT) 1317 msg.user_id = mp.user_id 1318 if mp.reply_to is not None: 1319 msg.reply_to = reply_to2addr(mp.reply_to) 1320 msg.correlation_id = mp.correlation_id 1321 if dp.delivery_mode is not None: 1322 msg.durable = dp.delivery_mode == delivery_mode.persistent 1323 msg.priority = dp.priority 1324 if dp.ttl is not None: 1325 msg.ttl = dp.ttl/1000.0 1326 msg.redelivered = dp.redelivered 1327 msg.properties = mp.application_headers or {} 1328 if mp.app_id is not None: 1329 msg.properties["x-amqp-0-10.app-id"] = mp.app_id 1330 if mp.content_encoding is not None: 1331 msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding 1332 if dp.routing_key is not None: 1333 msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key 1334 if dp.timestamp is not None: 1335 msg.properties["x-amqp-0-10.timestamp"] = dp.timestamp 1336 msg.content_type = mp.content_type 1337 msg._transfer_id = xfr.id 1338 return msg
1339