
Source Code for Module qpid.messaging.driver

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  import os, socket, struct, sys, time 
  21  from logging import getLogger, DEBUG 
  22  from qpid import compat 
  23  from qpid import sasl 
  24  from qpid.concurrency import synchronized 
  25  from qpid.datatypes import RangedSet, Serial 
  26  from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ 
  27      FrameDecoder, SegmentDecoder, OpDecoder 
  28  from qpid.messaging import address, transports 
  29  from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED 
  30  from qpid.messaging.exceptions import * 
  31  from qpid.messaging.message import get_codec, Disposition, Message 
  32  from qpid.ops import * 
  33  from qpid.selector import Selector 
  34  from qpid.util import URL, default 
  35  from qpid.validator import And, Context, List, Map, Types, Values 
  36  from threading import Condition, Thread 
  37   
  38  log = getLogger("qpid.messaging") 
  39  rawlog = getLogger("qpid.messaging.io.raw") 
  40  opslog = getLogger("qpid.messaging.io.ops") 
  41   
  42  def addr2reply_to(addr): 
  43    name, subject, options = address.parse(addr) 
  44    if options: 
  45      type = options.get("node", {}).get("type") 
  46    else: 
  47      type = None 
  48   
  49    if type == "topic": 
  50      return ReplyTo(name, subject) 
  51    else: 
  52      return ReplyTo(None, name) 
  53   
  54  def reply_to2addr(reply_to): 
  55    if reply_to.exchange in (None, ""): 
  56      return reply_to.routing_key 
  57    elif reply_to.routing_key is None: 
  58      return "%s; {node: {type: topic}}" % reply_to.exchange 
  59    else: 
  60      return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key) 
  61   
  62  class Attachment: 
  63   
  64    def __init__(self, target): 
  65      self.target = target 
  66   
  67  # XXX 
  68   
  69  DURABLE_DEFAULT=False 
  70   
  71  # XXX 
  72   
  73  class Pattern: 
  74    """ 
  75    The pattern filter matches the supplied wildcard pattern against a 
  76    message subject. 
  77    """ 
  78   
  79    def __init__(self, value): 
  80      self.value = value 
  81   
  82    # XXX: this should become part of the driver 
  83    def _bind(self, sst, exchange, queue): 
  84      from qpid.ops import ExchangeBind 
  85   
  86      sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, 
  87                                 binding_key=self.value.replace("*", "#"))) 
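
In other words, binding through a Pattern issues an ExchangeBind whose binding key is the pattern with "*" rewritten to the AMQP 0-10 "#" wildcard; a hedged sketch (sst is assumed to be an attached SessionState):

    p = Pattern("news.*")
    # p._bind(sst, "amq.topic", "my-queue") would write, roughly:
    #   ExchangeBind(exchange="amq.topic", queue="my-queue", binding_key="news.#")
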
  88   
  89  SUBJECT_DEFAULTS = { 
  90    "topic": "#" 
  91    } 
  92   
  93  # XXX 
  94  ppid = 0 
  95  try: 
  96    ppid = os.getppid() 
  97  except: 
  98    pass 
  99   
 100  CLIENT_PROPERTIES = {"product": "qpid python client", 
 101                       "version": "development", 
 102                       "platform": os.name, 
 103                       "qpid.client_process": os.path.basename(sys.argv[0]), 
 104                       "qpid.client_pid": os.getpid(), 
 105                       "qpid.client_ppid": ppid} 
 106   
 107  def noop(): pass 
 108  def sync_noop(): pass 
 109   
 110  class SessionState: 
 111   
 112    def __init__(self, driver, session, name, channel): 
 113      self.driver = driver 
 114      self.session = session 
 115      self.name = name 
 116      self.channel = channel 
 117      self.detached = False 
 118      self.committing = False 
 119      self.aborting = False 
 120   
 121      # sender state 
 122      self.sent = Serial(0) 
 123      self.acknowledged = RangedSet() 
 124      self.actions = {} 
 125      self.min_completion = self.sent 
 126      self.max_completion = self.sent 
 127      self.results = {} 
 128      self.need_sync = False 
 129   
 130      # receiver state 
 131      self.received = None 
 132      self.executed = RangedSet() 
 133   
 134      # XXX: need to periodically exchange completion/known_completion 
 135   
 136      self.destinations = {} 
 137   
 138    def write_query(self, query, handler): 
 139      id = self.sent 
 140      self.write_cmd(query, lambda: handler(self.results.pop(id))) 
 141   
 142    def apply_overrides(self, cmd, overrides): 
 143      for k, v in overrides.items(): 
 144        cmd[k.replace('-', '_')] = v 
 145   
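The override mechanism only rewrites '-' to '_' in option keys before assigning them onto the command; a minimal illustration (a plain dict stands in for a qpid.ops command object):

    cmd = {}                                      # stand-in for e.g. a QueueDeclare command
    overrides = {"auto-delete": True, "exclusive": True}
    for k, v in overrides.items():
      cmd[k.replace('-', '_')] = v
    assert cmd == {"auto_delete": True, "exclusive": True}
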
 146    def write_cmd(self, cmd, action=noop, overrides=None, sync=True): 
 147      if overrides: 
 148        self.apply_overrides(cmd, overrides) 
 149   
 150      if action != noop: 
 151        cmd.sync = sync 
 152      if self.detached: 
 153        raise Exception("detached") 
 154      cmd.id = self.sent 
 155      self.sent += 1 
 156      self.actions[cmd.id] = action 
 157      self.max_completion = cmd.id 
 158      self.write_op(cmd) 
 159      self.need_sync = not cmd.sync 
 160   
 161    def write_cmds(self, cmds, action=noop): 
 162      if cmds: 
 163        for cmd in cmds[:-1]: 
 164          self.write_cmd(cmd) 
 165        self.write_cmd(cmds[-1], action) 
 166      else: 
 167        action() 
 168   
 169    def write_op(self, op): 
 170      op.channel = self.channel 
 171      self.driver.write_op(op) 
 172   
 173  POLICIES = Values("always", "sender", "receiver", "never") 
 174  RELIABILITY = Values("unreliable", "at-most-once", "at-least-once", 
 175                       "exactly-once") 
 176   
 177  DECLARE = Map({}, restricted=False) 
 178  BINDINGS = List(Map({ 
 179      "exchange": Types(basestring), 
 180      "queue": Types(basestring), 
 181      "key": Types(basestring), 
 182      "arguments": Map({}, restricted=False) 
 183      })) 
 184   
 185  COMMON_OPTS = { 
 186      "create": POLICIES, 
 187      "delete": POLICIES, 
 188      "assert": POLICIES, 
 189      "node": Map({ 
 190          "type": Values("queue", "topic"), 
 191          "durable": Types(bool), 
 192          "x-declare": DECLARE, 
 193          "x-bindings": BINDINGS 
 194          }), 
 195      "link": Map({ 
 196          "name": Types(basestring), 
 197          "durable": Types(bool), 
 198          "reliability": RELIABILITY, 
 199          "x-declare": DECLARE, 
 200          "x-bindings": BINDINGS, 
 201          "x-subscribe": Map({}, restricted=False) 
 202          }) 
 203      } 
 204   
 205  RECEIVE_MODES = Values("browse", "consume") 
 206   
 207  SOURCE_OPTS = COMMON_OPTS.copy() 
 208  SOURCE_OPTS.update({ 
 209      "mode": RECEIVE_MODES 
 210      }) 
 211   
 212  TARGET_OPTS = COMMON_OPTS.copy() 
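
For orientation, these validators describe the option block of an address string; a hypothetical address whose options Map(SOURCE_OPTS) would accept:

    addr = """my-queue; {
      create: always,
      mode: consume,
      node: {type: queue, durable: True,
             x-bindings: [{exchange: "amq.topic", key: "news.#"}]}
    }"""
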
 213   
 214  class LinkIn: 
 215   
 216    ADDR_NAME = "source" 
 217    DIR_NAME = "receiver" 
 218    VALIDATOR = Map(SOURCE_OPTS) 
      ...  (lines 219-275: LinkIn method definitions are collapsed in the original listing) 
 276   
 277  class LinkOut: 
 278   
 279    ADDR_NAME = "target" 
 280    DIR_NAME = "sender" 
 281    VALIDATOR = Map(TARGET_OPTS) 
      ...  (lines 282-305: LinkOut method definitions are collapsed in the original listing) 
 306   
 307  class Cache: 
 308   
 309    def __init__(self, ttl): 
 310      self.ttl = ttl 
 311      self.entries = {} 
 312   
 313    def __setitem__(self, key, value): 
 314      self.entries[key] = time.time(), value 
 315   
 316    def __getitem__(self, key): 
 317      tstamp, value = self.entries[key] 
 318      if time.time() - tstamp >= self.ttl: 
 319        del self.entries[key] 
 320        raise KeyError(key) 
 321      else: 
 322        return value 
 323   
 324    def __delitem__(self, key): 
 325      del self.entries[key] 
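
A minimal usage sketch of the TTL cache above (the key and TTL are illustrative):

    cache = Cache(60)                     # entries expire 60 seconds after insertion
    cache["amq.topic"] = ("topic", "topic")
    try:
      type, subtype = cache["amq.topic"]
    except KeyError:
      pass                                # expired or never cached: re-resolve the address
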
 326   
 327  # XXX 
 328  HEADER="!4s4B" 
 329   
 330  EMPTY_DP = DeliveryProperties() 
 331  EMPTY_MP = MessageProperties() 
 332   
 333  SUBJECT = "qpid.subject" 
 334   
 335  CLOSED = "CLOSED" 
 336  READ_ONLY = "READ_ONLY" 
 337  WRITE_ONLY = "WRITE_ONLY" 
 338  OPEN = "OPEN" 
 339   
 340  class Driver: 
 341   
 342    def __init__(self, connection): 
 343      self.connection = connection 
 344      self.log_id = "%x" % id(self.connection) 
 345      self._lock = self.connection._lock 
 346   
 347      self._selector = Selector.default() 
 348      self._attempts = 0 
 349      self._delay = self.connection.reconnect_interval_min 
 350      self._reconnect_log = self.connection.reconnect_log 
 351      self._host = 0 
 352      self._retrying = False 
 353      self._next_retry = None 
 354      self._transport = None 
 355   
 356      self._timeout = None 
 357   
 358      self.engine = None 
 359   
 360    def _next_host(self): 
 361      urls = [URL(u) for u in self.connection.reconnect_urls] 
 362      hosts = [(self.connection.host, default(self.connection.port, 5672))] + \ 
 363          [(u.host, default(u.port, 5672)) for u in urls] 
 364      if self._host >= len(hosts): 
 365        self._host = 0 
 366      result = hosts[self._host] 
 367      if self._host == 0: 
 368        self._attempts += 1 
 369      self._host = self._host + 1 
 370      return result 
 371   
 372    def _num_hosts(self): 
 373      return len(self.connection.reconnect_urls) + 1 
 374   
 375    @synchronized 
 376    def wakeup(self): 
 377      self.dispatch() 
 378      self._selector.wakeup() 
 379   
 380    def start(self): 
 381      self._selector.register(self) 
 382   
 383    def stop(self): 
 384      self._selector.unregister(self) 
 385      if self._transport: 
 386        self.st_closed() 
 387   
 388    def fileno(self): 
 389      return self._transport.fileno() 
 390   
 391    @synchronized 
 392    def reading(self): 
 393      return self._transport is not None and \ 
 394          self._transport.reading(True) 
 395   
 396    @synchronized 
 397    def writing(self): 
 398      return self._transport is not None and \ 
 399          self._transport.writing(self.engine.pending()) 
 400   
 401    @synchronized 
 402    def timing(self): 
 403      return self._timeout 
 404   
 405    @synchronized 
 406    def readable(self): 
 407      try: 
 408        data = self._transport.recv(64*1024) 
 409        if data is None: 
 410          return 
 411        elif data: 
 412          rawlog.debug("READ[%s]: %r", self.log_id, data) 
 413          self.engine.write(data) 
 414        else: 
 415          self.close_engine() 
 416      except socket.error, e: 
 417        self.close_engine(ConnectionError(text=str(e))) 
 418   
 419      self.update_status() 
 420   
 421      self._notify() 
 422   
 423    def _notify(self): 
 424      if self.connection.error: 
 425        self.connection._condition.gc() 
 426      self.connection._waiter.notifyAll() 
 427   
 428    def close_engine(self, e=None): 
 429      if e is None: 
 430        e = ConnectionError(text="connection aborted") 
 431   
 432      if (self.connection.reconnect and 
 433          (self.connection.reconnect_limit is None or 
 434           self.connection.reconnect_limit <= 0 or 
 435           self._attempts <= self.connection.reconnect_limit)): 
 436        if self._host < self._num_hosts(): 
 437          delay = 0 
 438        else: 
 439          delay = self._delay 
 440          self._delay = min(2*self._delay, 
 441                            self.connection.reconnect_interval_max) 
 442        self._next_retry = time.time() + delay 
 443        if self._reconnect_log: 
 444          log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e)) 
 445          if delay > 0: 
 446            log.warn("sleeping %s seconds" % delay) 
 447        self._retrying = True 
 448        self.engine.close() 
 449      else: 
 450        self.engine.close(e) 
 451   
 452      self.schedule() 
 453   
 454    def update_status(self): 
 455      status = self.engine.status() 
 456      return getattr(self, "st_%s" % status.lower())() 
 457   
 458    def st_closed(self): 
 459      # XXX: this log statement seems to sometimes hit when the socket is not connected 
 460      # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) 
 461      self._transport.close() 
 462      self._transport = None 
 463      self.engine = None 
 464      return True 
 465   
 466    def st_open(self): 
 467      return False 
 468   
 469    @synchronized 
 470    def writeable(self): 
 471      notify = False 
 472      try: 
 473        n = self._transport.send(self.engine.peek()) 
 474        if n == 0: return 
 475        sent = self.engine.read(n) 
 476        rawlog.debug("SENT[%s]: %r", self.log_id, sent) 
 477      except socket.error, e: 
 478        self.close_engine(e) 
 479        notify = True 
 480   
 481      if self.update_status() or notify: 
 482        self._notify() 
 483   
 484    @synchronized 
 485    def timeout(self): 
 486      self.dispatch() 
 487      self._notify() 
 488      self.schedule() 
 489   
 490    def schedule(self): 
 491      times = [] 
 492      if self.connection.heartbeat: 
 493        times.append(time.time() + self.connection.heartbeat) 
 494      if self._next_retry: 
 495        times.append(self._next_retry) 
 496      if times: 
 497        self._timeout = min(times) 
 498      else: 
 499        self._timeout = None 
 500   
 501    def dispatch(self): 
 502      try: 
 503        if self._transport is None: 
 504          if self.connection._connected and not self.connection.error: 
 505            self.connect() 
 506        else: 
 507          self.engine.dispatch() 
 508      except HeartbeatTimeout, e: 
 509        self.close_engine(e) 
 510      except: 
 511        # XXX: Does socket get leaked if this occurs? 
 512        msg = compat.format_exc() 
 513        self.connection.error = InternalError(text=msg) 
 514   
 515    def connect(self): 
 516      if self._retrying and time.time() < self._next_retry: 
 517        return 
 518   
 519      try: 
 520        # XXX: should make this non blocking 
 521        host, port = self._next_host() 
 522        if self._retrying and self._reconnect_log: 
 523          log.warn("trying: %s:%s", host, port) 
 524        self.engine = Engine(self.connection) 
 525        self.engine.open() 
 526        rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) 
 527        trans = transports.TRANSPORTS.get(self.connection.transport) 
 528        if trans: 
 529          self._transport = trans(self.connection, host, port) 
 530        else: 
 531          raise ConnectError("no such transport: %s" % self.connection.transport) 
 532        if self._retrying and self._reconnect_log: 
 533          log.warn("reconnect succeeded: %s:%s", host, port) 
 534        self._next_retry = None 
 535        self._attempts = 0 
 536        self._host = 0 
 537        self._delay = self.connection.reconnect_interval_min 
 538        self._retrying = False 
 539        self.schedule() 
 540      except socket.error, e: 
 541        self.close_engine(ConnectError(text=str(e))) 
 542   
 543  DEFAULT_DISPOSITION = Disposition(None) 
 544   
 545  def get_bindings(opts, queue=None, exchange=None, key=None): 
 546    bindings = opts.get("x-bindings", []) 
 547    cmds = [] 
 548    for b in bindings: 
 549      exchange = b.get("exchange", exchange) 
 550      queue = b.get("queue", queue) 
 551      key = b.get("key", key) 
 552      args = b.get("arguments", {}) 
 553      cmds.append(ExchangeBind(queue, exchange, key, args)) 
 554    return cmds 
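
Roughly, each x-bindings entry becomes one ExchangeBind command, with the queue/exchange/key defaults filled in from the keyword arguments; an illustrative sketch:

    opts = {"x-bindings": [{"exchange": "amq.topic", "key": "news.#"},
                           {"exchange": "amq.match", "key": "sports.#"}]}
    cmds = get_bindings(opts, queue="my-queue")
    # cmds now holds one ExchangeBind per entry, e.g. the first binds
    # queue "my-queue" to exchange "amq.topic" with binding key "news.#"
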
 555   
 556  CONNECTION_ERRS = { 
 557    # anything not here (i.e. everything right now) will default to 
 558    # connection error 
 559    } 
 560   
 561  SESSION_ERRS = { 
 562    # anything not here will default to session error 
 563    error_code.unauthorized_access: UnauthorizedAccess, 
 564    error_code.not_found: NotFound, 
 565    error_code.resource_locked: ReceiverError, 
 566    error_code.resource_limit_exceeded: TargetCapacityExceeded, 
 567    error_code.internal_error: ServerError 
 568    } 
 569   
 570  class Engine: 
 571   
 572    def __init__(self, connection): 
 573      self.connection = connection 
 574      self.log_id = "%x" % id(self.connection) 
 575      self._closing = False 
 576      self._connected = False 
 577      self._attachments = {} 
 578   
 579      self._in = LinkIn() 
 580      self._out = LinkOut() 
 581   
 582      self._channel_max = 65536 
 583      self._channels = 0 
 584      self._sessions = {} 
 585   
 586      self.address_cache = Cache(self.connection.address_ttl) 
 587   
 588      self._status = CLOSED 
 589      self._buf = "" 
 590      self._hdr = "" 
 591      self._last_in = None 
 592      self._last_out = None 
 593      self._op_enc = OpEncoder() 
 594      self._seg_enc = SegmentEncoder() 
 595      self._frame_enc = FrameEncoder() 
 596      self._frame_dec = FrameDecoder() 
 597      self._seg_dec = SegmentDecoder() 
 598      self._op_dec = OpDecoder() 
 599   
 600      self._sasl = sasl.Client() 
 601      if self.connection.username: 
 602        self._sasl.setAttr("username", self.connection.username) 
 603      if self.connection.password: 
 604        self._sasl.setAttr("password", self.connection.password) 
 605      if self.connection.host: 
 606        self._sasl.setAttr("host", self.connection.host) 
 607      self._sasl.setAttr("service", self.connection.sasl_service) 
 608      if self.connection.sasl_min_ssf is not None: 
 609        self._sasl.setAttr("minssf", self.connection.sasl_min_ssf) 
 610      if self.connection.sasl_max_ssf is not None: 
 611        self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf) 
 612      self._sasl.init() 
 613      self._sasl_encode = False 
 614      self._sasl_decode = False 
 615   
 616    def _reset(self): 
 617      self.connection._transport_connected = False 
 618   
 619      for ssn in self.connection.sessions.values(): 
 620        for m in ssn.acked + ssn.unacked + ssn.incoming: 
 621          m._transfer_id = None 
 622        for snd in ssn.senders: 
 623          snd.linked = False 
 624        for rcv in ssn.receivers: 
 625          rcv.impending = rcv.received 
 626          rcv.linked = False 
 627   
 628    def status(self): 
 629      return self._status 
 630   
 631    def write(self, data): 
 632      self._last_in = time.time() 
 633      try: 
 634        if self._sasl_decode: 
 635          data = self._sasl.decode(data) 
 636   
 637        if len(self._hdr) < 8: 
 638          r = 8 - len(self._hdr) 
 639          self._hdr += data[:r] 
 640          data = data[r:] 
 641   
 642          if len(self._hdr) == 8: 
 643            self.do_header(self._hdr) 
 644   
 645        self._frame_dec.write(data) 
 646        self._seg_dec.write(*self._frame_dec.read()) 
 647        self._op_dec.write(*self._seg_dec.read()) 
 648        for op in self._op_dec.read(): 
 649          self.assign_id(op) 
 650          opslog.debug("RCVD[%s]: %r", self.log_id, op) 
 651          op.dispatch(self) 
 652        self.dispatch() 
 653      except MessagingError, e: 
 654        self.close(e) 
 655      except: 
 656        self.close(InternalError(text=compat.format_exc())) 
 657   
 658    def close(self, e=None): 
 659      self._reset() 
 660      if e: 
 661        self.connection.error = e 
 662      self._status = CLOSED 
 663   
 664    def assign_id(self, op): 
 665      if isinstance(op, Command): 
 666        sst = self.get_sst(op) 
 667        op.id = sst.received 
 668        sst.received += 1 
 669   
 670    def pending(self): 
 671      return len(self._buf) 
 672   
 673    def read(self, n): 
 674      result = self._buf[:n] 
 675      self._buf = self._buf[n:] 
 676      return result 
 677   
 678    def peek(self): 
 679      return self._buf 
 680   
 681    def write_op(self, op): 
 682      opslog.debug("SENT[%s]: %r", self.log_id, op) 
 683      self._op_enc.write(op) 
 684      self._seg_enc.write(*self._op_enc.read()) 
 685      self._frame_enc.write(*self._seg_enc.read()) 
 686      bytes = self._frame_enc.read() 
 687      if self._sasl_encode: 
 688        bytes = self._sasl.encode(bytes) 
 689      self._buf += bytes 
 690      self._last_out = time.time() 
 691   
 692    def do_header(self, hdr): 
 693      cli_major = 0; cli_minor = 10 
 694      magic, _, _, major, minor = struct.unpack(HEADER, hdr) 
 695      if major != cli_major or minor != cli_minor: 
 696        raise VersionError(text="client: %s-%s, server: %s-%s" % 
 697                           (cli_major, cli_minor, major, minor)) 
 698   
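For reference, HEADER ("!4s4B") describes the 8-byte AMQP protocol header that open() sends and do_header() checks; a small sketch:

    hdr = struct.pack(HEADER, "AMQP", 1, 1, 0, 10)   # what open() appends to the output buffer
    assert len(hdr) == 8
    magic, _, _, major, minor = struct.unpack(HEADER, hdr)
    # magic == "AMQP" and (major, minor) == (0, 10); any other version raises VersionError
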
 699    def do_connection_start(self, start): 
 700      if self.connection.sasl_mechanisms: 
 701        permitted = self.connection.sasl_mechanisms.split() 
 702        mechs = [m for m in start.mechanisms if m in permitted] 
 703      else: 
 704        mechs = start.mechanisms 
 705      try: 
 706        mech, initial = self._sasl.start(" ".join(mechs)) 
 707      except sasl.SASLError, e: 
 708        raise AuthenticationFailure(text=str(e)) 
 709      self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES, 
 710                                      mechanism=mech, response=initial)) 
 711   
 712    def do_connection_secure(self, secure): 
 713      resp = self._sasl.step(secure.challenge) 
 714      self.write_op(ConnectionSecureOk(response=resp)) 
 715   
 716    def do_connection_tune(self, tune): 
 717      # XXX: is heartbeat protocol specific? 
 718      if tune.channel_max is not None: 
 719        self.channel_max = tune.channel_max 
 720      self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, 
 721                                     channel_max=self.channel_max)) 
 722      self.write_op(ConnectionOpen()) 
 723      self._sasl_encode = True 
 724   
 725    def do_connection_open_ok(self, open_ok): 
 726      self.connection.auth_username = self._sasl.auth_username() 
 727      self._connected = True 
 728      self._sasl_decode = True 
 729      self.connection._transport_connected = True 
 730   
 731    def do_connection_heartbeat(self, hrt): 
 732      pass 
 733   
 734    def do_connection_close(self, close): 
 735      self.write_op(ConnectionCloseOk()) 
 736      if close.reply_code != close_code.normal: 
 737        exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError) 
 738        self.connection.error = exc(close.reply_code, close.reply_text) 
 739      # XXX: should we do a half shutdown on the socket here? 
 740      # XXX: we really need to test this, we may end up reporting a 
 741      # connection abort after this, if we were to do a shutdown on read 
 742      # and stop reading, then we wouldn't report the abort, that's 
 743      # probably the right thing to do 
 744   
 745    def do_connection_close_ok(self, close_ok): 
 746      self.close() 
 747   
 748    def do_session_attached(self, atc): 
 749      pass 
 750   
 751    def do_session_command_point(self, cp): 
 752      sst = self.get_sst(cp) 
 753      sst.received = cp.command_id 
 754   
 755    def do_session_completed(self, sc): 
 756      sst = self.get_sst(sc) 
 757      for r in sc.commands: 
 758        sst.acknowledged.add(r.lower, r.upper) 
 759   
 760      if not sc.commands.empty(): 
 761        while sst.min_completion in sc.commands: 
 762          if sst.actions.has_key(sst.min_completion): 
 763            sst.actions.pop(sst.min_completion)() 
 764          sst.min_completion += 1 
 765   
 766    def session_known_completed(self, kcmp): 
 767      sst = self.get_sst(kcmp) 
 768      executed = RangedSet() 
 769      for e in sst.executed.ranges: 
 770        for ke in kcmp.ranges: 
 771          if e.lower in ke and e.upper in ke: 
 772            break 
 773        else: 
 774          executed.add_range(e) 
 775      sst.executed = executed 
 776   
 777    def do_session_flush(self, sf): 
 778      sst = self.get_sst(sf) 
 779      if sf.expected: 
 780        if sst.received is None: 
 781          exp = None 
 782        else: 
 783          exp = RangedSet(sst.received) 
 784        sst.write_op(SessionExpected(exp)) 
 785      if sf.confirmed: 
 786        sst.write_op(SessionConfirmed(sst.executed)) 
 787      if sf.completed: 
 788        sst.write_op(SessionCompleted(sst.executed)) 
 789   
 790    def do_session_request_timeout(self, rt): 
 791      sst = self.get_sst(rt) 
 792      sst.write_op(SessionTimeout(timeout=0)) 
 793   
 794    def do_execution_result(self, er): 
 795      sst = self.get_sst(er) 
 796      sst.results[er.command_id] = er.value 
 797      sst.executed.add(er.id) 
 798   
 799    def do_execution_exception(self, ex): 
 800      sst = self.get_sst(ex) 
 801      exc = SESSION_ERRS.get(ex.error_code, SessionError) 
 802      sst.session.error = exc(ex.error_code, ex.description) 
 803   
 804    def dispatch(self): 
 805      if not self.connection._connected and not self._closing and self._status != CLOSED: 
 806        self.disconnect() 
 807   
 808      if self._connected and not self._closing: 
 809        for ssn in self.connection.sessions.values(): 
 810          self.attach(ssn) 
 811          self.process(ssn) 
 812   
 813      if self.connection.heartbeat and self._status != CLOSED: 
 814        now = time.time() 
 815        if self._last_in is not None and \ 
 816            now - self._last_in > 2*self.connection.heartbeat: 
 817          raise HeartbeatTimeout(text="heartbeat timeout") 
 818        if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0: 
 819          self.write_op(ConnectionHeartbeat()) 
 820   
 821    def open(self): 
 822      self._reset() 
 823      self._status = OPEN 
 824      self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) 
 825   
 826    def disconnect(self): 
 827      self.write_op(ConnectionClose(close_code.normal)) 
 828      self._closing = True 
 829   
 830    def attach(self, ssn): 
 831      if ssn.closed: return 
 832      sst = self._attachments.get(ssn) 
 833      if sst is None: 
 834        for i in xrange(0, self.channel_max): 
 835          if not self._sessions.has_key(i): 
 836            ch = i 
 837            break 
 838        else: 
 839          raise RuntimeError("all channels used") 
 840        sst = SessionState(self, ssn, ssn.name, ch) 
 841        sst.write_op(SessionAttach(name=ssn.name)) 
 842        sst.write_op(SessionCommandPoint(sst.sent, 0)) 
 843        sst.outgoing_idx = 0 
 844        sst.acked = [] 
 845        sst.acked_idx = 0 
 846        if ssn.transactional: 
 847          sst.write_cmd(TxSelect()) 
 848        self._attachments[ssn] = sst 
 849        self._sessions[sst.channel] = sst 
 850   
 851      for snd in ssn.senders: 
 852        self.link(snd, self._out, snd.target) 
 853      for rcv in ssn.receivers: 
 854        self.link(rcv, self._in, rcv.source) 
 855   
 856      if sst is not None and ssn.closing and not sst.detached: 
 857        sst.detached = True 
 858        sst.write_op(SessionDetach(name=ssn.name)) 
 859   
 860    def get_sst(self, op): 
 861      return self._sessions[op.channel] 
 862   
 863    def do_session_detached(self, dtc): 
 864      sst = self._sessions.pop(dtc.channel) 
 865      ssn = sst.session 
 866      del self._attachments[ssn] 
 867      ssn.closed = True 
 868   
 869    def do_session_detach(self, dtc): 
 870      sst = self.get_sst(dtc) 
 871      sst.write_op(SessionDetached(name=dtc.name)) 
 872      self.do_session_detached(dtc) 
      ...  (lines 873-891 are collapsed in the original listing) 
 892      def resolved(type, subtype): 
 893        dir.do_link(sst, lnk, _lnk, type, subtype, linked) 
 894   
 895      self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) 
 896      self._attachments[lnk] = _lnk 
 897   
 898    if lnk.linked and lnk.closing and not lnk.closed: 
 899      if not _lnk.closing: 
 900        def unlinked(): 
 901          dir.del_link(sst, lnk, _lnk) 
 902          del self._attachments[lnk] 
 903          lnk.closed = True 
 904        if _lnk.options.get("delete") in ("always", dir.DIR_NAME): 
 905          dir.do_unlink(sst, lnk, _lnk) 
 906          self.delete(sst, _lnk.name, unlinked) 
 907        else: 
 908          dir.do_unlink(sst, lnk, _lnk, unlinked) 
 909        _lnk.closing = True 
 910    elif not lnk.linked and lnk.closing and not lnk.closed: 
 911      if lnk.error: lnk.closed = True 
 912   
 913    def parse_address(self, lnk, dir, addr): 
 914      if addr is None: 
 915        return MalformedAddress(text="%s is None" % dir.ADDR_NAME) 
 916      else: 
 917        try: 
 918          lnk.name, lnk.subject, lnk.options = address.parse(addr) 
 919          # XXX: subject 
 920          if lnk.options is None: 
 921            lnk.options = {} 
 922        except address.LexError, e: 
 923          return MalformedAddress(text=str(e)) 
 924        except address.ParseError, e: 
 925          return MalformedAddress(text=str(e)) 
 926   
 927    def validate_options(self, lnk, dir): 
 928      ctx = Context() 
 929      err = dir.VALIDATOR.validate(lnk.options, ctx) 
 930      if err: return InvalidOption(text="error in options: %s" % err) 
 931   
 932    def resolve_declare(self, sst, lnk, dir, action): 
 933      declare = lnk.options.get("create") in ("always", dir) 
 934      assrt = lnk.options.get("assert") in ("always", dir) 
 935      def do_resolved(type, subtype): 
 936        err = None 
 937        if type is None: 
 938          if declare: 
 939            err = self.declare(sst, lnk, action) 
 940          else: 
 941            err = NotFound(text="no such queue: %s" % lnk.name) 
 942        else: 
 943          if assrt: 
 944            expected = lnk.options.get("node", {}).get("type") 
 945            if expected and type != expected: 
 946              err = AssertionFailed(text="expected %s, got %s" % (expected, type)) 
 947          if err is None: 
 948            action(type, subtype) 
 949   
 950        if err: 
 951          tgt = lnk.target 
 952          tgt.error = err 
 953          del self._attachments[tgt] 
 954          tgt.closed = True 
 955          return 
 956      self.resolve(sst, lnk.name, do_resolved, force=declare) 
 957   
 958    def resolve(self, sst, name, action, force=False): 
 959      if not force: 
 960        try: 
 961          type, subtype = self.address_cache[name] 
 962          action(type, subtype) 
 963          return 
 964        except KeyError: 
 965          pass 
 966   
 967      args = [] 
 968      def do_result(r): 
 969        args.append(r) 
 970      def do_action(r): 
 971        do_result(r) 
 972        er, qr = args 
 973        if er.not_found and not qr.queue: 
 974          type, subtype = None, None 
 975        elif qr.queue: 
 976          type, subtype = "queue", None 
 977        else: 
 978          type, subtype = "topic", er.type 
 979        if type is not None: 
 980          self.address_cache[name] = (type, subtype) 
 981        action(type, subtype) 
 982      sst.write_query(ExchangeQuery(name), do_result) 
 983      sst.write_query(QueueQuery(name), do_action) 
 984   
 985    def declare(self, sst, lnk, action): 
 986      name = lnk.name 
 987      props = lnk.options.get("node", {}) 
 988      durable = props.get("durable", DURABLE_DEFAULT) 
 989      type = props.get("type", "queue") 
 990      declare = props.get("x-declare", {}) 
 991   
 992      if type == "topic": 
 993        cmd = ExchangeDeclare(exchange=name, durable=durable) 
 994        bindings = get_bindings(props, exchange=name) 
 995      elif type == "queue": 
 996        cmd = QueueDeclare(queue=name, durable=durable) 
 997        bindings = get_bindings(props, queue=name) 
 998      else: 
 999        raise ValueError(type) 
1000   
1001      sst.apply_overrides(cmd, declare) 
1002   
1003      if type == "topic": 
1004        if cmd.type is None: 
1005          cmd.type = "topic" 
1006        subtype = cmd.type 
1007      else: 
1008        subtype = None 
1009   
1010      cmds = [cmd] 
1011      cmds.extend(bindings) 
1012   
1013      def declared(): 
1014        self.address_cache[name] = (type, subtype) 
1015        action(type, subtype) 
1016   
1017      sst.write_cmds(cmds, declared) 
1018   
1019    def delete(self, sst, name, action): 
1020      def deleted(): 
1021        del self.address_cache[name] 
1022        action() 
1023   
1024      def do_delete(type, subtype): 
1025        if type == "topic": 
1026          sst.write_cmd(ExchangeDelete(name), deleted) 
1027        elif type == "queue": 
1028          sst.write_cmd(QueueDelete(name), deleted) 
1029        elif type is None: 
1030          action() 
1031        else: 
1032          raise ValueError(type) 
1033      self.resolve(sst, name, do_delete, force=True) 
1034   
1035    def process(self, ssn): 
1036      if ssn.closed or ssn.closing: return 
1037   
1038      sst = self._attachments[ssn] 
1039   
1040      while sst.outgoing_idx < len(ssn.outgoing): 
1041        msg = ssn.outgoing[sst.outgoing_idx] 
1042        snd = msg._sender 
1043        # XXX: should check for sender error here 
1044        _snd = self._attachments.get(snd) 
1045        if _snd and snd.linked: 
1046          self.send(snd, msg) 
1047          sst.outgoing_idx += 1 
1048        else: 
1049          break 
1050   
1051      for snd in ssn.senders: 
1052        # XXX: should include snd.acked in this 
1053        if snd.synced >= snd.queued and sst.need_sync: 
1054          sst.write_cmd(ExecutionSync(), sync_noop) 
1055   
1056      for rcv in ssn.receivers: 
1057        self.process_receiver(rcv) 
1058   
1059      if ssn.acked: 
1060        messages = ssn.acked[sst.acked_idx:] 
1061        if messages: 
1062          ids = RangedSet() 
1063   
1064          disposed = [(DEFAULT_DISPOSITION, [])] 
1065          acked = [] 
1066          for m in messages: 
1067            # XXX: we're ignoring acks that get lost when disconnected, 
1068            # could we deal with this via some message-id based purge? 
1069            if m._transfer_id is None: 
1070              acked.append(m) 
1071              continue 
1072            ids.add(m._transfer_id) 
1073            if m._receiver._accept_mode is accept_mode.explicit: 
1074              disp = m._disposition or DEFAULT_DISPOSITION 
1075              last, msgs = disposed[-1] 
1076              if disp.type is last.type and disp.options == last.options: 
1077                msgs.append(m) 
1078              else: 
1079                disposed.append((disp, [m])) 
1080            else: 
1081              acked.append(m) 
1082   
1083          for range in ids: 
1084            sst.executed.add_range(range) 
1085          sst.write_op(SessionCompleted(sst.executed)) 
1086   
1087          def ack_acker(msgs): 
1088            def ack_ack(): 
1089              for m in msgs: 
1090                ssn.acked.remove(m) 
1091                sst.acked_idx -= 1 
1092                # XXX: should this check accept_mode too? 
1093                if not ssn.transactional: 
1094                  sst.acked.remove(m) 
1095            return ack_ack 
1096   
1097          for disp, msgs in disposed: 
1098            if not msgs: continue 
1099            if disp.type is None: 
1100              op = MessageAccept 
1101            elif disp.type is RELEASED: 
1102              op = MessageRelease 
1103            elif disp.type is REJECTED: 
1104              op = MessageReject 
1105            sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]), 
1106                             **disp.options), 
1107                          ack_acker(msgs)) 
1108            if log.isEnabledFor(DEBUG): 
1109              for m in msgs: 
1110                log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) 
1111   
1112          sst.acked.extend(messages) 
1113          sst.acked_idx += len(messages) 
1114          ack_acker(acked)() 
1115   
1116      if ssn.committing and not sst.committing: 
1117        def commit_ok(): 
1118          del sst.acked[:] 
1119          ssn.committing = False 
1120          ssn.committed = True 
1121          ssn.aborting = False 
1122          ssn.aborted = False 
1123          sst.committing = False 
1124        sst.write_cmd(TxCommit(), commit_ok) 
1125        sst.committing = True 
1126   
1127      if ssn.aborting and not sst.aborting: 
1128        sst.aborting = True 
1129        def do_rb(): 
1130          messages = sst.acked + ssn.unacked + ssn.incoming 
1131          ids = RangedSet(*[m._transfer_id for m in messages]) 
1132          for range in ids: 
1133            sst.executed.add_range(range) 
1134          sst.write_op(SessionCompleted(sst.executed)) 
1135          sst.write_cmd(MessageRelease(ids, True)) 
1136          sst.write_cmd(TxRollback(), do_rb_ok) 
1137   
1138        def do_rb_ok(): 
1139          del ssn.incoming[:] 
1140          del ssn.unacked[:] 
1141          del sst.acked[:] 
1142   
1143          for rcv in ssn.receivers: 
1144            rcv.impending = rcv.received 
1145            rcv.returned = rcv.received 
1146            # XXX: do we need to update granted here as well? 
1147   
1148          for rcv in ssn.receivers: 
1149            self.process_receiver(rcv) 
1150   
1151          ssn.aborting = False 
1152          ssn.aborted = True 
1153          ssn.committing = False 
1154          ssn.committed = False 
1155          sst.aborting = False 
1156   
1157        for rcv in ssn.receivers: 
1158          _rcv = self._attachments[rcv] 
1159          sst.write_cmd(MessageStop(_rcv.destination)) 
1160        sst.write_cmd(ExecutionSync(), do_rb) 
1161   
1162    def grant(self, rcv): 
1163      sst = self._attachments[rcv.session] 
1164      _rcv = self._attachments.get(rcv) 
1165      if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining: 
1166        return 
1167   
1168      if rcv.granted is UNLIMITED: 
1169        if rcv.impending is UNLIMITED: 
1170          delta = 0 
1171        else: 
1172          delta = UNLIMITED 
1173      elif rcv.impending is UNLIMITED: 
1174        delta = -1 
1175      else: 
1176        delta = max(rcv.granted, rcv.received) - rcv.impending 
1177   
1178      if delta is UNLIMITED: 
1179        if not _rcv.bytes_open: 
1180          sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) 
1181          _rcv.bytes_open = True 
1182        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) 
1183        rcv.impending = UNLIMITED 
1184      elif delta > 0: 
1185        if not _rcv.bytes_open: 
1186          sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) 
1187          _rcv.bytes_open = True 
1188        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) 
1189        rcv.impending += delta 
1190      elif delta < 0 and not rcv.draining: 
1191        _rcv.draining = True 
1192        def do_stop(): 
1193          rcv.impending = rcv.received 
1194          _rcv.draining = False 
1195          _rcv.bytes_open = False 
1196          self.grant(rcv) 
1197        sst.write_cmd(MessageStop(_rcv.destination), do_stop) 
1198   
1199      if rcv.draining: 
1200        _rcv.draining = True 
1201        def do_flush(): 
1202          rcv.impending = rcv.received 
1203          rcv.granted = rcv.impending 
1204          _rcv.draining = False 
1205          _rcv.bytes_open = False 
1206          rcv.draining = False 
1207        sst.write_cmd(MessageFlush(_rcv.destination), do_flush) 
1208   
1209   
1210    def process_receiver(self, rcv): 
1211      if rcv.closed: return 
1212      self.grant(rcv) 
1213   
1214    def send(self, snd, msg): 
1215      sst = self._attachments[snd.session] 
1216      _snd = self._attachments[snd] 
1217   
1218      if msg.subject is None or _snd._exchange == "": 
1219        rk = _snd._routing_key 
1220      else: 
1221        rk = msg.subject 
1222   
1223      if msg.subject is None: 
1224        subject = _snd.subject 
1225      else: 
1226        subject = msg.subject 
1227   
1228      # XXX: do we need to query to figure out how to create the reply-to interoperably? 
1229      if msg.reply_to: 
1230        rt = addr2reply_to(msg.reply_to) 
1231      else: 
1232        rt = None 
1233      content_encoding = msg.properties.get("x-amqp-0-10.content-encoding") 
1234      dp = DeliveryProperties(routing_key=rk) 
1235      mp = MessageProperties(message_id=msg.id, 
1236                             user_id=msg.user_id, 
1237                             reply_to=rt, 
1238                             correlation_id=msg.correlation_id, 
1239                             app_id = msg.properties.get("x-amqp-0-10.app-id"), 
1240                             content_type=msg.content_type, 
1241                             content_encoding=content_encoding, 
1242                             application_headers=msg.properties) 
1243      if subject is not None: 
1244        if mp.application_headers is None: 
1245          mp.application_headers = {} 
1246        mp.application_headers[SUBJECT] = subject 
1247      if msg.durable is not None: 
1248        if msg.durable: 
1249          dp.delivery_mode = delivery_mode.persistent 
1250        else: 
1251          dp.delivery_mode = delivery_mode.non_persistent 
1252      if msg.priority is not None: 
1253        dp.priority = msg.priority 
1254      if msg.ttl is not None: 
1255        dp.ttl = long(msg.ttl*1000) 
1256      enc, dec = get_codec(msg.content_type) 
1257      body = enc(msg.content) 
1258   
1259      # XXX: this is not safe for out of order, can this be triggered by pre_ack? 
1260      def msg_acked(): 
1261        # XXX: should we log the ack somehow too? 
1262        snd.acked += 1 
1263        m = snd.session.outgoing.pop(0) 
1264        sst.outgoing_idx -= 1 
1265        log.debug("RACK[%s]: %s", sst.session.log_id, msg) 
1266        assert msg == m 
1267   
1268      xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp), 
1269                            payload=body) 
1270   
1271      if _snd.pre_ack: 
1272        sst.write_cmd(xfr) 
1273      else: 
1274        sst.write_cmd(xfr, msg_acked, sync=msg._sync) 
1275   
1276      log.debug("SENT[%s]: %s", sst.session.log_id, msg) 
1277   
1278      if _snd.pre_ack: 
1279        msg_acked() 
1280   
1281    def do_message_transfer(self, xfr): 
1282      sst = self.get_sst(xfr) 
1283      ssn = sst.session 
1284   
1285      msg = self._decode(xfr) 
1286      rcv = sst.destinations[xfr.destination].target 
1287      msg._receiver = rcv 
1288      if rcv.impending is not UNLIMITED: 
1289        assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) 
1290      rcv.received += 1 
1291      log.debug("RCVD[%s]: %s", ssn.log_id, msg) 
1292      ssn.incoming.append(msg) 
1293   
1294    def _decode(self, xfr): 
1295      dp = EMPTY_DP 
1296      mp = EMPTY_MP 
1297   
1298      for h in xfr.headers: 
1299        if isinstance(h, DeliveryProperties): 
1300          dp = h 
1301        elif isinstance(h, MessageProperties): 
1302          mp = h 
1303   
1304      ap = mp.application_headers 
1305      enc, dec = get_codec(mp.content_type) 
1306      content = dec(xfr.payload) 
1307      msg = Message(content) 
1308      msg.id = mp.message_id 
1309      if ap is not None: 
1310        msg.subject = ap.get(SUBJECT) 
1311      msg.user_id = mp.user_id 
1312      if mp.reply_to is not None: 
1313        msg.reply_to = reply_to2addr(mp.reply_to) 
1314      msg.correlation_id = mp.correlation_id 
1315      if dp.delivery_mode is not None: 
1316        msg.durable = dp.delivery_mode == delivery_mode.persistent 
1317      msg.priority = dp.priority 
1318      if dp.ttl is not None: 
1319        msg.ttl = dp.ttl/1000.0 
1320      msg.redelivered = dp.redelivered 
1321      msg.properties = mp.application_headers or {} 
1322      if mp.app_id is not None: 
1323        msg.properties["x-amqp-0-10.app-id"] = mp.app_id 
1324      if mp.content_encoding is not None: 
1325        msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding 
1326      if dp.routing_key is not None: 
1327        msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key 
1328      if dp.timestamp is not None: 
1329        msg.properties["x-amqp-0-10.timestamp"] = dp.timestamp 
1330      msg.content_type = mp.content_type 
1331      msg._transfer_id = xfr.id 
1332      return msg 
1333