Package qpid :: Package messaging :: Module driver
[frames] | [no frames]

Source Code for Module qpid.messaging.driver

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  import socket, struct, sys, time 
  21  from logging import getLogger, DEBUG 
  22  from qpid import compat 
  23  from qpid import sasl 
  24  from qpid.concurrency import synchronized 
  25  from qpid.datatypes import RangedSet, Serial 
  26  from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ 
  27      FrameDecoder, SegmentDecoder, OpDecoder 
  28  from qpid.messaging import address, transports 
  29  from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED 
  30  from qpid.messaging.exceptions import * 
  31  from qpid.messaging.message import get_codec, Disposition, Message 
  32  from qpid.ops import * 
  33  from qpid.selector import Selector 
  34  from qpid.util import URL, default,get_client_properties_with_defaults 
  35  from qpid.validator import And, Context, List, Map, Types, Values 
  36  from threading import Condition, Thread 
  37   
  38  log = getLogger("qpid.messaging") 
  39  rawlog = getLogger("qpid.messaging.io.raw") 
  40  opslog = getLogger("qpid.messaging.io.ops") 
def addr2reply_to(addr):
  """Convert an address string into an AMQP 0-10 ReplyTo struct."""
  name, subject, options = address.parse(addr)
  node_type = None
  if options:
    node_type = options.get("node", {}).get("type")

  if node_type == "topic":
    # topic nodes address an exchange; the subject becomes the routing key
    return ReplyTo(name, subject)
  # queue (or unspecified) nodes are addressed via the routing key alone
  return ReplyTo(None, name)
53
def reply_to2addr(reply_to):
  """Convert an AMQP 0-10 ReplyTo struct back into an address string."""
  exchange = reply_to.exchange
  key = reply_to.routing_key
  if exchange in (None, ""):
    # no exchange: the routing key names a queue directly
    return key
  if key is None:
    return "%s; {node: {type: topic}}" % exchange
  return "%s/%s; {node: {type: topic}}" % (exchange, key)
61
class Attachment:
  """Driver-side state associated with an API-level object (its target)."""

  def __init__(self, target):
    # target: the API-level object this driver state is attached to
    self.target = target
66 67 # XXX 68 69 DURABLE_DEFAULT=False
# XXX

class Pattern:
  """
  The pattern filter matches the supplied wildcard pattern against a
  message subject.
  """

  def __init__(self, value):
    # value: the wildcard pattern string
    self.value = value

  # XXX: this should become part of the driver
  def _bind(self, sst, exchange, queue):
    """Bind queue to exchange using the pattern as the binding key,
    translating the messaging wildcard "*" into AMQP 0-10's "#"."""
    from qpid.ops import ExchangeBind

    sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
                               binding_key=self.value.replace("*", "#")))
88 89 SUBJECT_DEFAULTS = { 90 "topic": "#" 91 }
92 93 -def noop(): pass
94 -def sync_noop(): pass
95
class SessionState:
  """Driver-side state for one attached session: tracks outgoing command
  ids, completion callbacks, and incoming command execution."""

  def __init__(self, driver, session, name, channel):
    self.driver = driver
    self.session = session
    self.name = name
    self.channel = channel
    self.detached = False
    self.committing = False
    self.aborting = False

    # sender state
    self.sent = Serial(0)            # id of the next outgoing command
    self.acknowledged = RangedSet()  # command ranges the peer completed
    self.actions = {}                # command id -> completion callback
    self.min_completion = self.sent  # all ids below this are completed
    self.max_completion = self.sent  # highest id with a pending action
    self.results = {}                # command id -> execution result value
    self.need_sync = False           # last command was sent without sync

    # receiver state
    self.received = None             # id expected for the next incoming command
    self.executed = RangedSet()      # incoming command ranges we executed

    # XXX: need to periodically exchange completion/known_completion

    self.destinations = {}

  def write_query(self, query, handler):
    """Send a query command; handler is invoked with its result value."""
    id = self.sent
    self.write_cmd(query, lambda: handler(self.results.pop(id)))

  def apply_overrides(self, cmd, overrides):
    """Copy user-supplied option overrides onto cmd, mapping '-' to '_'."""
    for k, v in overrides.items():
      cmd[k.replace('-', '_')] = v

  def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
    """Assign the next command id to cmd, register its completion action,
    and write it to the wire; raises if the session is detached."""
    if overrides:
      self.apply_overrides(cmd, overrides)

    # only request a sync point when a completion action is registered
    if action != noop:
      cmd.sync = sync
    if self.detached:
      raise Exception("detached")
    cmd.id = self.sent
    self.sent += 1
    self.actions[cmd.id] = action
    self.max_completion = cmd.id
    self.write_op(cmd)
    self.need_sync = not cmd.sync

  def write_cmds(self, cmds, action=noop):
    """Write a batch of commands, attaching action to the last one; if
    the batch is empty the action runs immediately."""
    if cmds:
      for cmd in cmds[:-1]:
        self.write_cmd(cmd)
      self.write_cmd(cmds[-1], action)
    else:
      action()

  def write_op(self, op):
    """Stamp op with this session's channel and hand it to the driver."""
    op.channel = self.channel
    self.driver.write_op(op)
158 159 POLICIES = Values("always", "sender", "receiver", "never") 160 RELIABILITY = Values("unreliable", "at-most-once", "at-least-once", 161 "exactly-once") 162 163 DECLARE = Map({}, restricted=False) 164 BINDINGS = List(Map({ 165 "exchange": Types(basestring), 166 "queue": Types(basestring), 167 "key": Types(basestring), 168 "arguments": Map({}, restricted=False) 169 })) 170 171 COMMON_OPTS = { 172 "create": POLICIES, 173 "delete": POLICIES, 174 "assert": POLICIES, 175 "node": Map({ 176 "type": Values("queue", "topic"), 177 "durable": Types(bool), 178 "x-declare": DECLARE, 179 "x-bindings": BINDINGS 180 }), 181 "link": Map({ 182 "name": Types(basestring), 183 "durable": Types(bool), 184 "reliability": RELIABILITY, 185 "x-declare": DECLARE, 186 "x-bindings": BINDINGS, 187 "x-subscribe": Map({}, restricted=False) 188 }) 189 } 190 191 RECEIVE_MODES = Values("browse", "consume") 192 193 SOURCE_OPTS = COMMON_OPTS.copy() 194 SOURCE_OPTS.update({ 195 "mode": RECEIVE_MODES 196 }) 197 198 TARGET_OPTS = COMMON_OPTS.copy()
199 200 -class LinkIn:
201 202 ADDR_NAME = "source" 203 DIR_NAME = "receiver" 204 VALIDATOR = Map(SOURCE_OPTS) 205 212 256 272
275
276 -class LinkOut:
277 278 ADDR_NAME = "target" 279 DIR_NAME = "sender" 280 VALIDATOR = Map(TARGET_OPTS) 281 285 299 302
305
class Cache:
  """Dictionary-like store whose entries expire once they are older than
  the configured ttl (seconds)."""

  def __init__(self, ttl):
    self.ttl = ttl
    self.entries = {}

  def __setitem__(self, key, value):
    # store the insertion timestamp alongside the value
    self.entries[key] = (time.time(), value)

  def __getitem__(self, key):
    stamped = self.entries[key]
    age = time.time() - stamped[0]
    if age < self.ttl:
      return stamped[1]
    # entry is stale: drop it and report the key as missing
    del self.entries[key]
    raise KeyError(key)

  def __delitem__(self, key):
    self.entries.pop(key)
325 326 # XXX 327 HEADER="!4s4B" 328 329 EMPTY_DP = DeliveryProperties() 330 EMPTY_MP = MessageProperties() 331 332 SUBJECT = "qpid.subject" 333 334 CLOSED = "CLOSED" 335 READ_ONLY = "READ_ONLY" 336 WRITE_ONLY = "WRITE_ONLY" 337 OPEN = "OPEN"
class Driver:
  """Selector-driven connection driver: owns the transport socket and the
  protocol Engine, and implements reconnect/backoff across hosts."""

  def __init__(self, connection):
    self.connection = connection
    self.log_id = "%x" % id(self.connection)
    self._lock = self.connection._lock

    self._selector = Selector.default()
    self._attempts = 0            # completed rotations through the host list
    self._delay = self.connection.reconnect_interval_min
    self._reconnect_log = self.connection.reconnect_log
    self._host = 0                # index of the next host to try
    self._retrying = False
    self._next_retry = None       # earliest time of the next reconnect attempt
    self._transport = None

    self._timeout = None          # next wakeup deadline for the selector

    self.engine = None

  def _next_host(self):
    """Return the next (host, port) to try: the primary host first, then
    each reconnect_url, round-robin; the attempt counter ticks each time
    the rotation wraps back to the start."""
    urls = [URL(u) for u in self.connection.reconnect_urls]
    hosts = [(self.connection.host, default(self.connection.port, 5672))] + \
        [(u.host, default(u.port, 5672)) for u in urls]
    if self._host >= len(hosts):
      self._host = 0
    result = hosts[self._host]
    if self._host == 0:
      self._attempts += 1
    self._host = self._host + 1
    return result

  def _num_hosts(self):
    # primary host plus all reconnect urls
    return len(self.connection.reconnect_urls) + 1

  @synchronized
  def wakeup(self):
    """Run pending work and wake the selector loop."""
    self.dispatch()
    self._selector.wakeup()

  def start(self):
    self._selector.register(self)

  def stop(self):
    self._selector.unregister(self)
    if self._transport:
      self.st_closed()

  def fileno(self):
    return self._transport.fileno()

  @synchronized
  def reading(self):
    # selector predicate: interested in reads whenever a transport exists
    return self._transport is not None and \
        self._transport.reading(True)

  @synchronized
  def writing(self):
    # selector predicate: interested in writes when output is buffered
    return self._transport is not None and \
        self._transport.writing(self.engine.pending())

  @synchronized
  def timing(self):
    return self._timeout

  @synchronized
  def readable(self):
    """Selector callback: pull bytes off the transport into the engine."""
    try:
      data = self._transport.recv(64*1024)
      if data is None:
        # transport has no data ready (e.g. mid-SSL-handshake)
        return
      elif data:
        rawlog.debug("READ[%s]: %r", self.log_id, data)
        self.engine.write(data)
      else:
        # EOF from the peer
        self.close_engine()
    except socket.error, e:
      self.close_engine(ConnectionError(text=str(e)))

    self.update_status()

    self._notify()

  def _notify(self):
    """Wake API threads blocked on the connection's condition."""
    if self.connection.error:
      self.connection._condition.gc()
    self.connection._waiter.notifyAll()

  def close_engine(self, e=None):
    """Tear down the engine; schedule a retry with exponential backoff if
    reconnect is enabled and the attempt limit is not exhausted."""
    if e is None:
      e = ConnectionError(text="connection aborted")

    if (self.connection.reconnect and
        (self.connection.reconnect_limit is None or
         self.connection.reconnect_limit <= 0 or
         self._attempts <= self.connection.reconnect_limit)):
      if self._host < self._num_hosts():
        # more hosts to try in this rotation: retry immediately
        delay = 0
      else:
        delay = self._delay
        self._delay = min(2*self._delay,
                          self.connection.reconnect_interval_max)
      self._next_retry = time.time() + delay
      if self._reconnect_log:
        log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
        if delay > 0:
          log.warn("sleeping %s seconds" % delay)
      self._retrying = True
      self.engine.close()
    else:
      self.engine.close(e)

    self.schedule()

  def update_status(self):
    """Dispatch to the st_<status> handler for the engine's status."""
    status = self.engine.status()
    return getattr(self, "st_%s" % status.lower())()

  def st_closed(self):
    # XXX: this log statement seems to sometimes hit when the socket is not connected
    # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
    self._transport.close()
    self._transport = None
    self.engine = None
    return True

  def st_open(self):
    return False

  @synchronized
  def writeable(self):
    """Selector callback: flush buffered engine output to the transport."""
    notify = False
    try:
      n = self._transport.send(self.engine.peek())
      if n == 0: return
      sent = self.engine.read(n)
      rawlog.debug("SENT[%s]: %r", self.log_id, sent)
    except socket.error, e:
      self.close_engine(e)
      notify = True

    if self.update_status() or notify:
      self._notify()

  @synchronized
  def timeout(self):
    """Selector callback: timer fired (heartbeat or retry deadline)."""
    self.dispatch()
    self._notify()
    self.schedule()

  def schedule(self):
    """Recompute the next wakeup deadline from heartbeat and retry times."""
    times = []
    if self.connection.heartbeat:
      times.append(time.time() + self.connection.heartbeat)
    if self._next_retry:
      times.append(self._next_retry)
    if times:
      self._timeout = min(times)
    else:
      self._timeout = None

  def dispatch(self):
    """Drive the engine, (re)connecting first if no transport exists;
    protocol errors are converted into connection errors."""
    try:
      if self._transport is None:
        if self.connection._connected and not self.connection.error:
          self.connect()
      else:
        self.engine.dispatch()
    except HeartbeatTimeout, e:
      self.close_engine(e)
    except ContentError, e:
      msg = compat.format_exc()
      self.connection.error = ContentError(text=msg)
    except:
      # XXX: Does socket get leaked if this occurs?
      msg = compat.format_exc()
      self.connection.error = InternalError(text=msg)

  def connect(self):
    """Attempt a (re)connect to the next host, honoring the retry delay;
    on success, reset the backoff state."""
    if self._retrying and time.time() < self._next_retry:
      return

    try:
      # XXX: should make this non blocking
      host, port = self._next_host()
      if self._retrying and self._reconnect_log:
        log.warn("trying: %s:%s", host, port)
      self.engine = Engine(self.connection)
      self.engine.open()
      rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
      trans = transports.TRANSPORTS.get(self.connection.transport)
      if trans:
        self._transport = trans(self.connection, host, port)
      else:
        raise ConnectError("no such transport: %s" % self.connection.transport)
      if self._retrying and self._reconnect_log:
        log.warn("reconnect succeeded: %s:%s", host, port)
      self._next_retry = None
      self._attempts = 0
      self._delay = self.connection.reconnect_interval_min
      self._retrying = False
      self.schedule()
    except socket.error, e:
      self.close_engine(ConnectError(text=str(e)))
543 544 DEFAULT_DISPOSITION = Disposition(None)
def get_bindings(opts, queue=None, exchange=None, key=None):
  """Build ExchangeBind commands from the "x-bindings" entries of opts;
  queue/exchange/key serve as defaults for unspecified fields."""
  cmds = []
  for b in opts.get("x-bindings", []):
    # values from a binding also become the defaults for later bindings
    exchange = b.get("exchange", exchange)
    queue = b.get("queue", queue)
    key = b.get("key", key)
    cmds.append(ExchangeBind(queue, exchange, key, b.get("arguments", {})))
  return cmds
556 557 CONNECTION_ERRS = { 558 # anythong not here (i.e. everything right now) will default to 559 # connection error 560 } 561 562 SESSION_ERRS = { 563 # anything not here will default to session error 564 error_code.unauthorized_access: UnauthorizedAccess, 565 error_code.not_found: NotFound, 566 error_code.resource_locked: ReceiverError, 567 error_code.resource_limit_exceeded: TargetCapacityExceeded, 568 error_code.internal_error: ServerError 569 }
570 571 -class Engine:
572
  def __init__(self, connection):
    """Per-connection protocol engine: encodes/decodes AMQP 0-10 ops and
    tracks connection, session, and SASL state."""
    self.connection = connection
    self.log_id = "%x" % id(self.connection)
    self._closing = False
    self._connected = False
    # maps API-level objects (sessions, links) to driver-side state
    self._attachments = {}

    self._in = LinkIn()
    self._out = LinkOut()

    self._channel_max = 65536
    self._channels = 0
    self._sessions = {}     # channel number -> SessionState

    self.address_cache = Cache(self.connection.address_ttl)

    self._status = CLOSED
    self._buf = ""          # outgoing bytes not yet handed to the transport
    self._hdr = ""          # protocol header bytes received so far (8 expected)
    self._last_in = None    # timestamp of last inbound data (heartbeat check)
    self._last_out = None   # timestamp of last outbound op (heartbeat check)
    self._op_enc = OpEncoder()
    self._seg_enc = SegmentEncoder()
    self._frame_enc = FrameEncoder()
    self._frame_dec = FrameDecoder()
    self._seg_dec = SegmentDecoder()
    self._op_dec = OpDecoder()

    # configure the SASL client from the connection's settings
    self._sasl = sasl.Client()
    if self.connection.username:
      self._sasl.setAttr("username", self.connection.username)
    if self.connection.password:
      self._sasl.setAttr("password", self.connection.password)
    if self.connection.host:
      self._sasl.setAttr("host", self.connection.host)
    self._sasl.setAttr("service", self.connection.sasl_service)
    if self.connection.sasl_min_ssf is not None:
      self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
    if self.connection.sasl_max_ssf is not None:
      self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
    self._sasl.init()
    self._sasl_encode = False
    self._sasl_decode = False
616
  def _reset(self):
    """Roll back transient per-connection state so sessions, senders and
    receivers can be re-established after a reconnect."""
    self.connection._transport_connected = False

    for ssn in self.connection.sessions.values():
      # transfer ids are connection-scoped and void after reconnect
      for m in ssn.acked + ssn.unacked + ssn.incoming:
        m._transfer_id = None
      for snd in ssn.senders:
        snd.linked = False
      for rcv in ssn.receivers:
        rcv.impending = rcv.received
        rcv.linked = False
628
  def status(self):
    """Return the engine status (one of the module status constants)."""
    return self._status
631
  def write(self, data):
    """Feed raw inbound bytes to the engine: consume the 8-byte protocol
    header first, then decode frames into ops and dispatch each one."""
    self._last_in = time.time()
    try:
      if self._sasl_decode:
        data = self._sasl.decode(data)

      # accumulate the protocol header before any frame decoding
      if len(self._hdr) < 8:
        r = 8 - len(self._hdr)
        self._hdr += data[:r]
        data = data[r:]

        if len(self._hdr) == 8:
          self.do_header(self._hdr)

      self._frame_dec.write(data)
      self._seg_dec.write(*self._frame_dec.read())
      self._op_dec.write(*self._seg_dec.read())
      for op in self._op_dec.read():
        self.assign_id(op)
        opslog.debug("RCVD[%s]: %r", self.log_id, op)
        op.dispatch(self)
      self.dispatch()
    except MessagingError, e:
      self.close(e)
    except:
      self.close(InternalError(text=compat.format_exc()))
658
  def close(self, e=None):
    """Shut the engine down, recording e as the connection error if given."""
    self._reset()
    if e:
      self.connection.error = e
    self._status = CLOSED
664
  def assign_id(self, op):
    """Assign the next incoming command id to a received Command op."""
    if isinstance(op, Command):
      sst = self.get_sst(op)
      op.id = sst.received
      sst.received += 1
670
671 - def pending(self):
672 return len(self._buf)
673
674 - def read(self, n):
675 result = self._buf[:n] 676 self._buf = self._buf[n:] 677 return result
678
679 - def peek(self):
680 return self._buf
681
  def write_op(self, op):
    """Encode op through op/segment/frame encoders and buffer the bytes
    (SASL-encoding them once security is negotiated)."""
    opslog.debug("SENT[%s]: %r", self.log_id, op)
    self._op_enc.write(op)
    self._seg_enc.write(*self._op_enc.read())
    self._frame_enc.write(*self._seg_enc.read())
    bytes = self._frame_enc.read()
    if self._sasl_encode:
      bytes = self._sasl.encode(bytes)
    self._buf += bytes
    self._last_out = time.time()
692
  def do_header(self, hdr):
    """Validate the server's 8-byte protocol header against AMQP 0-10."""
    cli_major = 0; cli_minor = 10
    magic, _, _, major, minor = struct.unpack(HEADER, hdr)
    if major != cli_major or minor != cli_minor:
      raise VersionError(text="client: %s-%s, server: %s-%s" %
                         (cli_major, cli_minor, major, minor))
699
  def do_connection_start(self, start):
    """Handle connection.start: pick a SASL mechanism (restricted to the
    configured list, if any) and reply with start-ok."""
    if self.connection.sasl_mechanisms:
      permitted = self.connection.sasl_mechanisms.split()
      mechs = [m for m in start.mechanisms if m in permitted]
    else:
      mechs = start.mechanisms
    try:
      mech, initial = self._sasl.start(" ".join(mechs))
    except sasl.SASLError, e:
      raise AuthenticationFailure(text=str(e))

    client_properties = get_client_properties_with_defaults(provided_client_properties=self.connection.client_properties);
    self.write_op(ConnectionStartOk(client_properties=client_properties,
                                    mechanism=mech, response=initial))
714
  def do_connection_secure(self, secure):
    """Handle connection.secure: answer the SASL challenge."""
    resp = self._sasl.step(secure.challenge)
    self.write_op(ConnectionSecureOk(response=resp))
718
719 - def do_connection_tune(self, tune):
720 # XXX: is heartbeat protocol specific? 721 if tune.channel_max is not None: 722 self.channel_max = tune.channel_max 723 self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, 724 channel_max=self.channel_max)) 725 self.write_op(ConnectionOpen()) 726 self._sasl_encode = True
727
  def do_connection_open_ok(self, open_ok):
    """Handle connection.open-ok: the connection is now fully established."""
    self.connection.auth_username = self._sasl.auth_username()
    self._connected = True
    self._sasl_decode = True
    self.connection._transport_connected = True
733
  def do_connection_heartbeat(self, hrt):
    # heartbeats only refresh _last_in, which write() already updates
    pass
736
  def do_connection_close(self, close):
    """Handle a broker-initiated close: ack it and record the error
    (mapped through CONNECTION_ERRS) unless the close was normal."""
    self.write_op(ConnectionCloseOk())
    if close.reply_code != close_code.normal:
      exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError)
      self.connection.error = exc(close.reply_code, close.reply_text)
  # XXX: should we do a half shutdown on the socket here?
  # XXX: we really need to test this, we may end up reporting a
  # connection abort after this, if we were to do a shutdown on read
  # and stop reading, then we wouldn't report the abort, that's
  # probably the right thing to do
  def do_connection_close_ok(self, close_ok):
    """Handle connection.close-ok: our close was acknowledged."""
    self.close()
750
  def do_session_attached(self, atc):
    # nothing to do: session state was set up when we sent the attach
    pass
753
  def do_session_command_point(self, cp):
    """Handle session.command-point: set the next expected incoming id."""
    sst = self.get_sst(cp)
    sst.received = cp.command_id
757
758 - def do_session_completed(self, sc):
759 sst = self.get_sst(sc) 760 for r in sc.commands: 761 sst.acknowledged.add(r.lower, r.upper) 762 763 if not sc.commands.empty(): 764 while sst.min_completion in sc.commands: 765 if sst.actions.has_key(sst.min_completion): 766 sst.actions.pop(sst.min_completion)() 767 sst.min_completion += 1
768
769 - def session_known_completed(self, kcmp):
770 sst = self.get_sst(kcmp) 771 executed = RangedSet() 772 for e in sst.executed.ranges: 773 for ke in kcmp.ranges: 774 if e.lower in ke and e.upper in ke: 775 break 776 else: 777 executed.add_range(e) 778 sst.executed = completed
779
  def do_session_flush(self, sf):
    """Handle session.flush: report expected/confirmed/completed command
    state back to the peer as requested."""
    sst = self.get_sst(sf)
    if sf.expected:
      if sst.received is None:
        exp = None
      else:
        exp = RangedSet(sst.received)
      sst.write_op(SessionExpected(exp))
    if sf.confirmed:
      sst.write_op(SessionConfirmed(sst.executed))
    if sf.completed:
      sst.write_op(SessionCompleted(sst.executed))
792
  def do_session_request_timeout(self, rt):
    """Handle session.request-timeout: we grant no detach timeout."""
    sst = self.get_sst(rt)
    sst.write_op(SessionTimeout(timeout=0))
796
  def do_execution_result(self, er):
    """Handle execution.result: stash the value for the issuing query."""
    sst = self.get_sst(er)
    sst.results[er.command_id] = er.value
    sst.executed.add(er.id)
801
  def do_execution_exception(self, ex):
    """Handle execution.exception: map it to a session-level error."""
    sst = self.get_sst(ex)
    exc = SESSION_ERRS.get(ex.error_code, SessionError)
    sst.session.error = exc(ex.error_code, ex.description)
806
  def dispatch(self):
    """Advance protocol state: initiate close if the API disconnected,
    attach/process sessions when connected, and emit/enforce heartbeats."""
    if not self.connection._connected and not self._closing and self._status != CLOSED:
      self.disconnect()

    if self._connected and not self._closing:
      for ssn in self.connection.sessions.values():
        self.attach(ssn)
        self.process(ssn)

    if self.connection.heartbeat and self._status != CLOSED:
      now = time.time()
      # peer is considered dead after two missed heartbeat intervals
      if self._last_in is not None and \
            now - self._last_in > 2*self.connection.heartbeat:
        raise HeartbeatTimeout(text="heartbeat timeout")
      # send our own heartbeat at twice the configured rate
      if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
        self.write_op(ConnectionHeartbeat())
823
  def open(self):
    """Start the protocol: reset state and queue the AMQP 0-10 header."""
    self._reset()
    self._status = OPEN
    self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
828
  def disconnect(self):
    """Initiate an orderly close of the connection."""
    self.write_op(ConnectionClose(close_code.normal))
    self._closing = True
832
833 - def attach(self, ssn):
834 if ssn.closed: return 835 sst = self._attachments.get(ssn) 836 if sst is None: 837 for i in xrange(0, self.channel_max): 838 if not self._sessions.has_key(i): 839 ch = i 840 break 841 else: 842 raise RuntimeError("all channels used") 843 sst = SessionState(self, ssn, ssn.name, ch) 844 sst.write_op(SessionAttach(name=ssn.name)) 845 sst.write_op(SessionCommandPoint(sst.sent, 0)) 846 sst.outgoing_idx = 0 847 sst.acked = [] 848 sst.acked_idx = 0 849 if ssn.transactional: 850 sst.write_cmd(TxSelect()) 851 self._attachments[ssn] = sst 852 self._sessions[sst.channel] = sst 853 854 for snd in ssn.senders: 855 self.link(snd, self._out, snd.target) 856 for rcv in ssn.receivers: 857 self.link(rcv, self._in, rcv.source) 858 859 if sst is not None and ssn.closing and not sst.detached: 860 sst.detached = True 861 sst.write_op(SessionDetach(name=ssn.name))
862
863 - def get_sst(self, op):
864 return self._sessions[op.channel]
865
  def do_session_detached(self, dtc):
    """Handle session.detached: tear down the channel's session state."""
    sst = self._sessions.pop(dtc.channel)
    ssn = sst.session
    del self._attachments[ssn]
    ssn.closed = True
871
  def do_session_detach(self, dtc):
    """Handle a peer-initiated detach: confirm it and clean up locally."""
    sst = self.get_sst(dtc)
    sst.write_op(SessionDetached(name=dtc.name))
    self.do_session_detached(dtc)
876 894 895 def resolved(type, subtype): 896 dir.do_link(sst, lnk, _lnk, type, subtype, linked)
897 898 self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) 899 self._attachments[lnk] = _lnk 900 901 if lnk.linked and lnk.closing and not lnk.closed: 902 if not _lnk.closing: 903 def unlinked(): 904 dir.del_link(sst, lnk, _lnk) 905 del self._attachments[lnk] 906 lnk.closed = True 907 if _lnk.options.get("delete") in ("always", dir.DIR_NAME): 908 dir.do_unlink(sst, lnk, _lnk) 909 self.delete(sst, _lnk.name, unlinked) 910 else: 911 dir.do_unlink(sst, lnk, _lnk, unlinked) 912 _lnk.closing = True 913 elif not lnk.linked and lnk.closing and not lnk.closed: 914 if lnk.error: lnk.closed = True 915
  def parse_address(self, lnk, dir, addr):
    """Parse addr into lnk.name/subject/options; return a MalformedAddress
    error (rather than raising) when the address cannot be parsed."""
    if addr is None:
      return MalformedAddress(text="%s is None" % dir.ADDR_NAME)
    else:
      try:
        lnk.name, lnk.subject, lnk.options = address.parse(addr)
        # XXX: subject
        if lnk.options is None:
          lnk.options = {}
      except address.LexError, e:
        return MalformedAddress(text=str(e))
      except address.ParseError, e:
        return MalformedAddress(text=str(e))
929
  def validate_options(self, lnk, dir):
    """Validate lnk.options against the direction's schema; return an
    InvalidOption error (rather than raising) on failure."""
    ctx = Context()
    err = dir.VALIDATOR.validate(lnk.options, ctx)
    if err: return InvalidOption(text="error in options: %s" % err)
934
  def resolve_declare(self, sst, lnk, dir, action):
    """Resolve lnk's node, auto-declaring it when the create policy
    applies and asserting its type when the assert policy applies; on
    success invoke action(type, subtype)."""
    declare = lnk.options.get("create") in ("always", dir)
    assrt = lnk.options.get("assert") in ("always", dir)
    def do_resolved(type, subtype):
      err = None
      if type is None:
        if declare:
          err = self.declare(sst, lnk, action)
        else:
          err = NotFound(text="no such queue: %s" % lnk.name)
      else:
        if assrt:
          expected = lnk.options.get("node", {}).get("type")
          if expected and type != expected:
            err = AssertionFailed(text="expected %s, got %s" % (expected, type))
        if err is None:
          action(type, subtype)

      if err:
        # NOTE(review): the error is recorded on lnk.target -- confirm
        # this is the intended endpoint for receiver links as well
        tgt = lnk.target
        tgt.error = err
        del self._attachments[tgt]
        tgt.closed = True
        return
    self.resolve(sst, lnk.name, do_resolved, force=declare)
  def resolve(self, sst, name, action, force=False):
    """Determine whether name is a queue or a topic (exchange) and invoke
    action(type, subtype); results are cached unless force is set."""
    if not force:
      try:
        type, subtype = self.address_cache[name]
        action(type, subtype)
        return
      except KeyError:
        pass

    args = []
    def do_result(r):
      args.append(r)
    def do_action(r):
      # fires after both the exchange and queue query results arrive
      do_result(r)
      er, qr = args
      if er.not_found and not qr.queue:
        type, subtype = None, None
      elif qr.queue:
        type, subtype = "queue", None
      else:
        type, subtype = "topic", er.type
      if type is not None:
        self.address_cache[name] = (type, subtype)
      action(type, subtype)
    sst.write_query(ExchangeQuery(name), do_result)
    sst.write_query(QueueQuery(name), do_action)
  def declare(self, sst, lnk, action):
    """Declare the node for lnk (queue or exchange, per its options) plus
    any x-bindings, then invoke action(type, subtype) once completed."""
    name = lnk.name
    props = lnk.options.get("node", {})
    durable = props.get("durable", DURABLE_DEFAULT)
    type = props.get("type", "queue")
    declare = props.get("x-declare", {})

    if type == "topic":
      cmd = ExchangeDeclare(exchange=name, durable=durable)
      bindings = get_bindings(props, exchange=name)
    elif type == "queue":
      cmd = QueueDeclare(queue=name, durable=durable)
      bindings = get_bindings(props, queue=name)
    else:
      raise ValueError(type)

    sst.apply_overrides(cmd, declare)

    if type == "topic":
      # x-declare may override the exchange type; default to "topic"
      if cmd.type is None:
        cmd.type = "topic"
      subtype = cmd.type
    else:
      subtype = None

    cmds = [cmd]
    cmds.extend(bindings)

    def declared():
      self.address_cache[name] = (type, subtype)
      action(type, subtype)

    sst.write_cmds(cmds, declared)
  def delete(self, sst, name, action):
    """Delete the node called name (exchange or queue, resolved live),
    evict it from the address cache, then invoke action."""
    def deleted():
      del self.address_cache[name]
      action()

    def do_delete(type, subtype):
      if type == "topic":
        sst.write_cmd(ExchangeDelete(name), deleted)
      elif type == "queue":
        sst.write_cmd(QueueDelete(name), deleted)
      elif type is None:
        # node no longer exists; nothing to delete
        action()
      else:
        raise ValueError(type)
    self.resolve(sst, name, do_delete, force=True)
  def process(self, ssn):
    """Drive one session: flush outgoing messages, sync senders, grant
    receiver credit, send accumulated acks, and run commit/abort."""
    if ssn.closed or ssn.closing: return

    sst = self._attachments[ssn]

    # flush outgoing messages whose sender is linked
    while sst.outgoing_idx < len(ssn.outgoing):
      msg = ssn.outgoing[sst.outgoing_idx]
      snd = msg._sender
      # XXX: should check for sender error here
      _snd = self._attachments.get(snd)
      if _snd and snd.linked:
        self.send(snd, msg)
        sst.outgoing_idx += 1
      else:
        break

    for snd in ssn.senders:
      # XXX: should included snd.acked in this
      if snd.synced >= snd.queued and sst.need_sync:
        sst.write_cmd(ExecutionSync(), sync_noop)

    for rcv in ssn.receivers:
      self.process_receiver(rcv)

    if ssn.acked:
      messages = ssn.acked[sst.acked_idx:]
      if messages:
        ids = RangedSet()

        # group consecutive messages by identical disposition so each
        # group becomes a single accept/release/reject command
        disposed = [(DEFAULT_DISPOSITION, [])]
        acked = []
        for m in messages:
          # XXX: we're ignoring acks that get lost when disconnected,
          # could we deal this via some message-id based purge?
          if m._transfer_id is None:
            acked.append(m)
            continue
          ids.add(m._transfer_id)
          if m._receiver._accept_mode is accept_mode.explicit:
            disp = m._disposition or DEFAULT_DISPOSITION
            last, msgs = disposed[-1]
            if disp.type is last.type and disp.options == last.options:
              msgs.append(m)
            else:
              disposed.append((disp, [m]))
          else:
            acked.append(m)

        for range in ids:
          sst.executed.add_range(range)
        sst.write_op(SessionCompleted(sst.executed))

        def ack_acker(msgs):
          # completion callback factory: remove the acked messages from
          # the session once the broker confirms the disposition
          def ack_ack():
            for m in msgs:
              ssn.acked.remove(m)
              sst.acked_idx -= 1
              # XXX: should this check accept_mode too?
              if not ssn.transactional:
                sst.acked.remove(m)
          return ack_ack

        for disp, msgs in disposed:
          if not msgs: continue
          if disp.type is None:
            op = MessageAccept
          elif disp.type is RELEASED:
            op = MessageRelease
          elif disp.type is REJECTED:
            op = MessageReject
          sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
                           **disp.options),
                        ack_acker(msgs))
          if log.isEnabledFor(DEBUG):
            for m in msgs:
              log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)

        sst.acked.extend(messages)
        sst.acked_idx += len(messages)
        # messages with no transfer id need no broker round trip
        ack_acker(acked)()

    if ssn.committing and not sst.committing:
      def commit_ok():
        del sst.acked[:]
        ssn.committing = False
        ssn.committed = True
        ssn.aborting = False
        ssn.aborted = False
        sst.committing = False
      sst.write_cmd(TxCommit(), commit_ok)
      sst.committing = True

    if ssn.aborting and not sst.aborting:
      sst.aborting = True
      def do_rb():
        # release everything the transaction touched, then roll back
        messages = sst.acked + ssn.unacked + ssn.incoming
        ids = RangedSet(*[m._transfer_id for m in messages])
        for range in ids:
          sst.executed.add_range(range)
        sst.write_op(SessionCompleted(sst.executed))
        sst.write_cmd(MessageRelease(ids, True))
        sst.write_cmd(TxRollback(), do_rb_ok)

      def do_rb_ok():
        del ssn.incoming[:]
        del ssn.unacked[:]
        del sst.acked[:]

        for rcv in ssn.receivers:
          rcv.impending = rcv.received
          rcv.returned = rcv.received
          # XXX: do we need to update granted here as well?

        for rcv in ssn.receivers:
          self.process_receiver(rcv)

        ssn.aborting = False
        ssn.aborted = True
        ssn.committing = False
        ssn.committed = False
        sst.aborting = False

      # stop incoming flow on every receiver before rolling back
      for rcv in ssn.receivers:
        _rcv = self._attachments[rcv]
        sst.write_cmd(MessageStop(_rcv.destination))
      sst.write_cmd(ExecutionSync(), do_rb)
  def grant(self, rcv):
    """Reconcile the credit granted to the broker for rcv with the
    receiver's capacity, issuing flow/stop/flush commands as needed."""
    sst = self._attachments[rcv.session]
    _rcv = self._attachments.get(rcv)
    if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
      return

    # delta: additional message credit to grant (UNLIMITED, positive,
    # zero, or negative meaning credit must be revoked via stop)
    if rcv.granted is UNLIMITED:
      if rcv.impending is UNLIMITED:
        delta = 0
      else:
        delta = UNLIMITED
    elif rcv.impending is UNLIMITED:
      delta = -1
    else:
      delta = max(rcv.granted, rcv.received) - rcv.impending

    if delta is UNLIMITED:
      if not _rcv.bytes_open:
        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
        _rcv.bytes_open = True
      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
      rcv.impending = UNLIMITED
    elif delta > 0:
      if not _rcv.bytes_open:
        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
        _rcv.bytes_open = True
      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
      rcv.impending += delta
    elif delta < 0 and not rcv.draining:
      # credit must shrink: stop the subscription, then re-grant
      _rcv.draining = True
      def do_stop():
        rcv.impending = rcv.received
        _rcv.draining = False
        _rcv.bytes_open = False
        self.grant(rcv)
      sst.write_cmd(MessageStop(_rcv.destination), do_stop)

    if rcv.draining:
      # API requested a drain: flush outstanding credit to completion
      _rcv.draining = True
      def do_flush():
        rcv.impending = rcv.received
        rcv.granted = rcv.impending
        _rcv.draining = False
        _rcv.bytes_open = False
        rcv.draining = False
      sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
1213 - def process_receiver(self, rcv):
1214 if rcv.closed: return 1215 self.grant(rcv)
1216
1217 - def send(self, snd, msg):
1218 sst = self._attachments[snd.session] 1219 _snd = self._attachments[snd] 1220 1221 if msg.subject is None or _snd._exchange == "": 1222 rk = _snd._routing_key 1223 else: 1224 rk = msg.subject 1225 1226 if msg.subject is None: 1227 subject = _snd.subject 1228 else: 1229 subject = msg.subject 1230 1231 # XXX: do we need to query to figure out how to create the reply-to interoperably? 1232 if msg.reply_to: 1233 rt = addr2reply_to(msg.reply_to) 1234 else: 1235 rt = None 1236 content_encoding = msg.properties.get("x-amqp-0-10.content-encoding") 1237 dp = DeliveryProperties(routing_key=rk) 1238 mp = MessageProperties(message_id=msg.id, 1239 user_id=msg.user_id, 1240 reply_to=rt, 1241 correlation_id=msg.correlation_id, 1242 app_id = msg.properties.get("x-amqp-0-10.app-id"), 1243 content_type=msg.content_type, 1244 content_encoding=content_encoding, 1245 application_headers=msg.properties) 1246 if subject is not None: 1247 if mp.application_headers is None: 1248 mp.application_headers = {} 1249 mp.application_headers[SUBJECT] = subject 1250 if msg.durable is not None: 1251 if msg.durable: 1252 dp.delivery_mode = delivery_mode.persistent 1253 else: 1254 dp.delivery_mode = delivery_mode.non_persistent 1255 if msg.priority is not None: 1256 dp.priority = msg.priority 1257 if msg.ttl is not None: 1258 dp.ttl = long(msg.ttl*1000) 1259 enc, dec = get_codec(msg.content_type) 1260 try: 1261 body = enc(msg.content) 1262 except AttributeError, e: 1263 # convert to non-blocking EncodeError 1264 raise EncodeError(e) 1265 1266 # XXX: this is not safe for out of order, can this be triggered by pre_ack? 1267 def msg_acked(): 1268 # XXX: should we log the ack somehow too? 1269 snd.acked += 1 1270 m = snd.session.outgoing.pop(0) 1271 sst.outgoing_idx -= 1 1272 log.debug("RACK[%s]: %s", sst.session.log_id, msg) 1273 assert msg == m
1274 1275 xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp), 1276 payload=body) 1277 1278 if _snd.pre_ack: 1279 sst.write_cmd(xfr) 1280 else: 1281 sst.write_cmd(xfr, msg_acked, sync=msg._sync) 1282 1283 log.debug("SENT[%s]: %s", sst.session.log_id, msg) 1284 1285 if _snd.pre_ack: 1286 msg_acked() 1287
1288 - def do_message_transfer(self, xfr):
1289 sst = self.get_sst(xfr) 1290 ssn = sst.session 1291 1292 msg = self._decode(xfr) 1293 rcv = sst.destinations[xfr.destination].target 1294 msg._receiver = rcv 1295 if rcv.closing or rcv.closed: # release message to a closing receiver 1296 ids = RangedSet(*[msg._transfer_id]) 1297 log.debug("releasing back %s message: %s, as receiver is closing", ids, msg) 1298 sst.write_cmd(MessageRelease(ids, True)) 1299 return 1300 if rcv.impending is not UNLIMITED: 1301 assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) 1302 rcv.received += 1 1303 log.debug("RCVD[%s]: %s", ssn.log_id, msg) 1304 ssn.incoming.append(msg)
1305
1306 - def _decode(self, xfr):
1307 dp = EMPTY_DP 1308 mp = EMPTY_MP 1309 1310 for h in xfr.headers: 1311 if isinstance(h, DeliveryProperties): 1312 dp = h 1313 elif isinstance(h, MessageProperties): 1314 mp = h 1315 1316 ap = mp.application_headers 1317 enc, dec = get_codec(mp.content_type) 1318 try: 1319 content = dec(xfr.payload) 1320 except Exception, e: 1321 raise DecodeError(e) 1322 msg = Message(content) 1323 msg.id = mp.message_id 1324 if ap is not None: 1325 msg.subject = ap.get(SUBJECT) 1326 msg.user_id = mp.user_id 1327 if mp.reply_to is not None: 1328 msg.reply_to = reply_to2addr(mp.reply_to) 1329 msg.correlation_id = mp.correlation_id 1330 if dp.delivery_mode is not None: 1331 msg.durable = dp.delivery_mode == delivery_mode.persistent 1332 msg.priority = dp.priority 1333 if dp.ttl is not None: 1334 msg.ttl = dp.ttl/1000.0 1335 msg.redelivered = dp.redelivered 1336 msg.properties = mp.application_headers or {} 1337 if mp.app_id is not None: 1338 msg.properties["x-amqp-0-10.app-id"] = mp.app_id 1339 if mp.content_encoding is not None: 1340 msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding 1341 if dp.routing_key is not None: 1342 msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key 1343 if dp.timestamp is not None: 1344 msg.properties["x-amqp-0-10.timestamp"] = dp.timestamp 1345 msg.content_type = mp.content_type 1346 msg._transfer_id = xfr.id 1347 return msg
1348