
Source Code for Module qpid.messaging.driver

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import socket, struct, sys, time
from logging import getLogger, DEBUG
from qpid import compat
from qpid import sasl
from qpid.concurrency import synchronized
from qpid.datatypes import RangedSet, Serial
from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
    FrameDecoder, SegmentDecoder, OpDecoder
from qpid.messaging import address, transports
from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
from qpid.messaging.exceptions import *
from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
from qpid.util import URL, default, get_client_properties_with_defaults
from qpid.validator import And, Context, List, Map, Types, Values
from threading import Condition, Thread

log = getLogger("qpid.messaging")
rawlog = getLogger("qpid.messaging.io.raw")
opslog = getLogger("qpid.messaging.io.ops")
def addr2reply_to(addr):
  name, subject, options = address.parse(addr)
  if options:
    type = options.get("node", {}).get("type")
  else:
    type = None

  if type == "topic":
    return ReplyTo(name, subject)
  else:
    return ReplyTo(None, name)

def reply_to2addr(reply_to):
  if reply_to.exchange in (None, ""):
    return reply_to.routing_key
  elif reply_to.routing_key is None:
    return "%s; {node: {type: topic}}" % reply_to.exchange
  else:
    return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key)

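# A rough sketch of the round trip these two helpers perform (the names are
# illustrative, not taken from this module):
#   addr2reply_to("my-exchange/my-subject; {node: {type: topic}}")
#       -> ReplyTo("my-exchange", "my-subject")
#   addr2reply_to("my-queue")  -> ReplyTo(None, "my-queue")
#   reply_to2addr(ReplyTo("my-exchange", "my-subject"))
#       -> "my-exchange/my-subject; {node: {type: topic}}"
#   reply_to2addr(ReplyTo(None, "my-queue"))  -> "my-queue"
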
class Attachment:

  def __init__(self, target):
    self.target = target

# XXX

DURABLE_DEFAULT=False

# XXX

class Pattern:
  """
  The pattern filter matches the supplied wildcard pattern against a
  message subject.
  """

  def __init__(self, value):
    self.value = value

  # XXX: this should become part of the driver
  def _bind(self, sst, exchange, queue):
    from qpid.ops import ExchangeBind

    sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
                               binding_key=self.value.replace("*", "#")))

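  # For illustration (the names are invented): Pattern("news.*")._bind(sst,
  # "amq.topic", "my-queue") writes an ExchangeBind whose binding_key is
  # "news.#"; any "*" in the supplied pattern is replaced with "#" before it
  # is used as the binding key.
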
SUBJECT_DEFAULTS = {
  "topic": "#"
  }

def noop(): pass
def sync_noop(): pass

class SessionState:

  def __init__(self, driver, session, name, channel):
    self.driver = driver
    self.session = session
    self.name = name
    self.channel = channel
    self.detached = False
    self.committing = False
    self.aborting = False

    # sender state
    self.sent = Serial(0)
    self.acknowledged = RangedSet()
    self.actions = {}
    self.min_completion = self.sent
    self.max_completion = self.sent
    self.results = {}
    self.need_sync = False

    # receiver state
    self.received = None
    self.executed = RangedSet()

    # XXX: need to periodically exchange completion/known_completion

    self.destinations = {}

  def write_query(self, query, handler):
    id = self.sent
    self.write_cmd(query, lambda: handler(self.results.pop(id)))

  def apply_overrides(self, cmd, overrides):
    for k, v in overrides.items():
      cmd[k.replace('-', '_')] = v

  def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
    if overrides:
      self.apply_overrides(cmd, overrides)

    if action != noop:
      cmd.sync = sync
    if self.detached:
      raise Exception("detached")
    cmd.id = self.sent
    self.sent += 1
    self.actions[cmd.id] = action
    self.max_completion = cmd.id
    self.write_op(cmd)
    self.need_sync = not cmd.sync

  def write_cmds(self, cmds, action=noop):
    if cmds:
      for cmd in cmds[:-1]:
        self.write_cmd(cmd)
      self.write_cmd(cmds[-1], action)
    else:
      action()

  def write_op(self, op):
    op.channel = self.channel
    self.driver.write_op(op)

POLICIES = Values("always", "sender", "receiver", "never")
RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
                     "exactly-once")

DECLARE = Map({}, restricted=False)
BINDINGS = List(Map({
    "exchange": Types(basestring),
    "queue": Types(basestring),
    "key": Types(basestring),
    "arguments": Map({}, restricted=False)
    }))

COMMON_OPTS = {
    "create": POLICIES,
    "delete": POLICIES,
    "assert": POLICIES,
    "node": Map({
        "type": Values("queue", "topic"),
        "durable": Types(bool),
        "x-declare": DECLARE,
        "x-bindings": BINDINGS
        }),
    "link": Map({
        "name": Types(basestring),
        "durable": Types(bool),
        "reliability": RELIABILITY,
        "x-declare": DECLARE,
        "x-bindings": BINDINGS,
        "x-subscribe": Map({}, restricted=False)
        })
    }

RECEIVE_MODES = Values("browse", "consume")

SOURCE_OPTS = COMMON_OPTS.copy()
SOURCE_OPTS.update({
    "mode": RECEIVE_MODES
    })

TARGET_OPTS = COMMON_OPTS.copy()

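# For illustration (the values are made up), an options map accepted by
# Map(SOURCE_OPTS) could look like:
#   {"create": "always",
#    "mode": "browse",
#    "node": {"type": "queue", "durable": True,
#             "x-bindings": [{"exchange": "amq.topic", "key": "news.#"}]}}
# TARGET_OPTS is the same schema without the receiver-only "mode" entry.
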
class LinkIn:

  ADDR_NAME = "source"
  DIR_NAME = "receiver"
  VALIDATOR = Map(SOURCE_OPTS)

  # ... (method bodies collapsed in this source listing) ...

class LinkOut:

  ADDR_NAME = "target"
  DIR_NAME = "sender"
  VALIDATOR = Map(TARGET_OPTS)

  # ... (method bodies collapsed in this source listing) ...

class Cache:

  def __init__(self, ttl):
    self.ttl = ttl
    self.entries = {}

  def __setitem__(self, key, value):
    self.entries[key] = time.time(), value

  def __getitem__(self, key):
    tstamp, value = self.entries[key]
    if time.time() - tstamp >= self.ttl:
      del self.entries[key]
      raise KeyError(key)
    else:
      return value

  def __delitem__(self, key):
    del self.entries[key]

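# A minimal usage sketch (the key and ttl are illustrative):
#   c = Cache(60)                        # entries live for 60 seconds
#   c["amq.topic"] = ("topic", "topic")
#   c["amq.topic"]                       # -> ("topic", "topic") while fresh
# Once the ttl has elapsed, the next lookup deletes the entry and raises
# KeyError, which is how stale entries age out of Engine.address_cache below.
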
# XXX
HEADER="!4s4B"

EMPTY_DP = DeliveryProperties()
EMPTY_MP = MessageProperties()

SUBJECT = "qpid.subject"

CLOSED = "CLOSED"
READ_ONLY = "READ_ONLY"
WRITE_ONLY = "WRITE_ONLY"
OPEN = "OPEN"

class Driver:

  def __init__(self, connection):
    self.connection = connection
    self.log_id = "%x" % id(self.connection)
    self._lock = self.connection._lock

    self._selector = Selector.default()
    self._attempts = 0
    self._delay = self.connection.reconnect_interval_min
    self._reconnect_log = self.connection.reconnect_log
    self._host = 0
    self._retrying = False
    self._next_retry = None
    self._transport = None

    self._timeout = None

    self.engine = None

  def _next_host(self):
    urls = [URL(u) for u in self.connection.reconnect_urls]
    hosts = [(self.connection.host, default(self.connection.port, 5672))] + \
        [(u.host, default(u.port, 5672)) for u in urls]
    if self._host >= len(hosts):
      self._host = 0
    result = hosts[self._host]
    if self._host == 0:
      self._attempts += 1
    self._host = self._host + 1
    return result

  def _num_hosts(self):
    return len(self.connection.reconnect_urls) + 1

  @synchronized
  def wakeup(self):
    self.dispatch()
    self._selector.wakeup()

  def start(self):
    self._selector.register(self)

  def stop(self):
    self._selector.unregister(self)
    if self._transport:
      self.st_closed()

  def fileno(self):
    return self._transport.fileno()

  @synchronized
  def reading(self):
    return self._transport is not None and \
        self._transport.reading(True)

  @synchronized
  def writing(self):
    return self._transport is not None and \
        self._transport.writing(self.engine.pending())

  @synchronized
  def timing(self):
    return self._timeout

  @synchronized
  def readable(self):
    try:
      data = self._transport.recv(64*1024)
      if data is None:
        return
      elif data:
        rawlog.debug("READ[%s]: %r", self.log_id, data)
        self.engine.write(data)
      else:
        self.close_engine()
    except socket.error, e:
      self.close_engine(ConnectionError(text=str(e)))

    self.update_status()

    self._notify()

  def _notify(self):
    if self.connection.error:
      self.connection._condition.gc()
    self.connection._waiter.notifyAll()

  def close_engine(self, e=None):
    if e is None:
      e = ConnectionError(text="connection aborted")

    if (self.connection.reconnect and
        (self.connection.reconnect_limit is None or
         self.connection.reconnect_limit <= 0 or
         self._attempts <= self.connection.reconnect_limit)):
      if self._host < self._num_hosts():
        delay = 0
      else:
        delay = self._delay
        self._delay = min(2*self._delay,
                          self.connection.reconnect_interval_max)
      self._next_retry = time.time() + delay
      if self._reconnect_log:
        log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
        if delay > 0:
          log.warn("sleeping %s seconds" % delay)
      self._retrying = True
      self.engine.close()
    else:
      self.engine.close(e)

    self.schedule()

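  # For example, with reconnect_interval_min = 1 and reconnect_interval_max = 8
  # (illustrative values), retries cycle through the known hosts immediately
  # (delay 0) and, once every host has been tried, back off 1, 2, 4, 8, 8, ...
  # seconds between rounds.
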
  def update_status(self):
    status = self.engine.status()
    return getattr(self, "st_%s" % status.lower())()

  def st_closed(self):
    # XXX: this log statement seems to sometimes hit when the socket is not connected
    # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
    self._transport.close()
    self._transport = None
    self.engine = None
    return True

  def st_open(self):
    return False

  @synchronized
  def writeable(self):
    notify = False
    try:
      n = self._transport.send(self.engine.peek())
      if n == 0: return
      sent = self.engine.read(n)
      rawlog.debug("SENT[%s]: %r", self.log_id, sent)
    except socket.error, e:
      self.close_engine(e)
      notify = True

    if self.update_status() or notify:
      self._notify()

  @synchronized
  def timeout(self):
    self.dispatch()
    self._notify()
    self.schedule()

  def schedule(self):
    times = []
    if self.connection.heartbeat:
      times.append(time.time() + self.connection.heartbeat)
    if self._next_retry:
      times.append(self._next_retry)
    if times:
      self._timeout = min(times)
    else:
      self._timeout = None

  def dispatch(self):
    try:
      if self._transport is None:
        if self.connection._connected and not self.connection.error:
          self.connect()
      else:
        self.engine.dispatch()
    except HeartbeatTimeout, e:
      self.close_engine(e)
    except ContentError, e:
      msg = compat.format_exc()
      self.connection.error = ContentError(text=msg)
    except:
      # XXX: Does socket get leaked if this occurs?
      msg = compat.format_exc()
      self.connection.error = InternalError(text=msg)

  def connect(self):
    if self._retrying and time.time() < self._next_retry:
      return

    try:
      # XXX: should make this non blocking
      host, port = self._next_host()
      if self._retrying and self._reconnect_log:
        log.warn("trying: %s:%s", host, port)
      self.engine = Engine(self.connection)
      self.engine.open()
      rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
      trans = transports.TRANSPORTS.get(self.connection.transport)
      if trans:
        self._transport = trans(self.connection, host, port)
      else:
        raise ConnectError("no such transport: %s" % self.connection.transport)
      if self._retrying and self._reconnect_log:
        log.warn("reconnect succeeded: %s:%s", host, port)
      self._next_retry = None
      self._attempts = 0
      self._delay = self.connection.reconnect_interval_min
      self._retrying = False
      self.schedule()
    except socket.error, e:
      self.close_engine(ConnectError(text=str(e)))

DEFAULT_DISPOSITION = Disposition(None)

def get_bindings(opts, queue=None, exchange=None, key=None):
  bindings = opts.get("x-bindings", [])
  cmds = []
  for b in bindings:
    exchange = b.get("exchange", exchange)
    queue = b.get("queue", queue)
    key = b.get("key", key)
    args = b.get("arguments", {})
    cmds.append(ExchangeBind(queue, exchange, key, args))
  return cmds

CONNECTION_ERRS = {
  # anything not here (i.e. everything right now) will default to
  # connection error
  }

SESSION_ERRS = {
  # anything not here will default to session error
  error_code.unauthorized_access: UnauthorizedAccess,
  error_code.not_found: NotFound,
  error_code.resource_locked: ReceiverError,
  error_code.resource_limit_exceeded: TargetCapacityExceeded,
  error_code.internal_error: ServerError
  }

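# For illustration (the names are invented):
#   get_bindings({"x-bindings": [{"exchange": "amq.topic", "key": "news.#"}]},
#                queue="my-queue")
# yields [ExchangeBind("my-queue", "amq.topic", "news.#", {})]; the queue,
# exchange and key arguments act as defaults for fields a binding omits.
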
class Engine:

  def __init__(self, connection):
    self.connection = connection
    self.log_id = "%x" % id(self.connection)
    self._closing = False
    self._connected = False
    self._attachments = {}

    self._in = LinkIn()
    self._out = LinkOut()

    self._channel_max = 65536
    self._channels = 0
    self._sessions = {}

    self.address_cache = Cache(self.connection.address_ttl)

    self._status = CLOSED
    self._buf = ""
    self._hdr = ""
    self._last_in = None
    self._last_out = None
    self._op_enc = OpEncoder()
    self._seg_enc = SegmentEncoder()
    self._frame_enc = FrameEncoder()
    self._frame_dec = FrameDecoder()
    self._seg_dec = SegmentDecoder()
    self._op_dec = OpDecoder()

    self._sasl = sasl.Client()
    if self.connection.username:
      self._sasl.setAttr("username", self.connection.username)
    if self.connection.password:
      self._sasl.setAttr("password", self.connection.password)
    if self.connection.host:
      self._sasl.setAttr("host", self.connection.host)
    self._sasl.setAttr("service", self.connection.sasl_service)
    if self.connection.sasl_min_ssf is not None:
      self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
    if self.connection.sasl_max_ssf is not None:
      self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
    self._sasl.init()
    self._sasl_encode = False
    self._sasl_decode = False

  def _reset(self):
    self.connection._transport_connected = False

    for ssn in self.connection.sessions.values():
      for m in ssn.acked + ssn.unacked + ssn.incoming:
        m._transfer_id = None
      for snd in ssn.senders:
        snd.linked = False
      for rcv in ssn.receivers:
        rcv.impending = rcv.received
        rcv.linked = False

  def status(self):
    return self._status

  def write(self, data):
    self._last_in = time.time()
    try:
      if self._sasl_decode:
        data = self._sasl.decode(data)

      if len(self._hdr) < 8:
        r = 8 - len(self._hdr)
        self._hdr += data[:r]
        data = data[r:]

        if len(self._hdr) == 8:
          self.do_header(self._hdr)

      self._frame_dec.write(data)
      self._seg_dec.write(*self._frame_dec.read())
      self._op_dec.write(*self._seg_dec.read())
      for op in self._op_dec.read():
        self.assign_id(op)
        opslog.debug("RCVD[%s]: %r", self.log_id, op)
        op.dispatch(self)
      self.dispatch()
    except MessagingError, e:
      self.close(e)
    except:
      self.close(InternalError(text=compat.format_exc()))

  def close(self, e=None):
    self._reset()
    if e:
      self.connection.error = e
    self._status = CLOSED

  def assign_id(self, op):
    if isinstance(op, Command):
      sst = self.get_sst(op)
      op.id = sst.received
      sst.received += 1

  def pending(self):
    return len(self._buf)

  def read(self, n):
    result = self._buf[:n]
    self._buf = self._buf[n:]
    return result

  def peek(self):
    return self._buf

  def write_op(self, op):
    opslog.debug("SENT[%s]: %r", self.log_id, op)
    self._op_enc.write(op)
    self._seg_enc.write(*self._op_enc.read())
    self._frame_enc.write(*self._seg_enc.read())
    bytes = self._frame_enc.read()
    if self._sasl_encode:
      bytes = self._sasl.encode(bytes)
    self._buf += bytes
    self._last_out = time.time()

  def do_header(self, hdr):
    cli_major = 0; cli_minor = 10
    magic, _, _, major, minor = struct.unpack(HEADER, hdr)
    if major != cli_major or minor != cli_minor:
      raise VersionError(text="client: %s-%s, server: %s-%s" %
                         (cli_major, cli_minor, major, minor))

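  # The 8-byte header is unpacked with HEADER ("!4s4B"): a 4-byte magic, two
  # protocol bytes, then the major and minor version. The client's side of
  # this exchange is written by open() below as
  # struct.pack(HEADER, "AMQP", 1, 1, 0, 10), so a peer header carrying a
  # major/minor other than 0-10 raises VersionError here.
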
  def do_connection_start(self, start):
    if self.connection.sasl_mechanisms:
      permitted = self.connection.sasl_mechanisms.split()
      mechs = [m for m in start.mechanisms if m in permitted]
    else:
      mechs = start.mechanisms
    try:
      mech, initial = self._sasl.start(" ".join(mechs))
    except sasl.SASLError, e:
      raise AuthenticationFailure(text=str(e))

    client_properties = get_client_properties_with_defaults(provided_client_properties=self.connection.client_properties)
    self.write_op(ConnectionStartOk(client_properties=client_properties,
                                    mechanism=mech, response=initial))

  def do_connection_secure(self, secure):
    resp = self._sasl.step(secure.challenge)
    self.write_op(ConnectionSecureOk(response=resp))

  def do_connection_tune(self, tune):
    # XXX: is heartbeat protocol specific?
    if tune.channel_max is not None:
      self.channel_max = tune.channel_max
    self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
                                   channel_max=self.channel_max))
    self.write_op(ConnectionOpen())
    self._sasl_encode = True

  def do_connection_open_ok(self, open_ok):
    self.connection.auth_username = self._sasl.auth_username()
    self._connected = True
    self._sasl_decode = True
    self.connection._transport_connected = True

  def do_connection_heartbeat(self, hrt):
    pass

  def do_connection_close(self, close):
    self.write_op(ConnectionCloseOk())
    if close.reply_code != close_code.normal:
      exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError)
      self.connection.error = exc(close.reply_code, close.reply_text)
    # XXX: should we do a half shutdown on the socket here?
    # XXX: we really need to test this, we may end up reporting a
    # connection abort after this, if we were to do a shutdown on read
    # and stop reading, then we wouldn't report the abort, that's
    # probably the right thing to do

  def do_connection_close_ok(self, close_ok):
    self.close()

  def do_session_attached(self, atc):
    pass

  def do_session_command_point(self, cp):
    sst = self.get_sst(cp)
    sst.received = cp.command_id

  def do_session_completed(self, sc):
    sst = self.get_sst(sc)
    for r in sc.commands:
      sst.acknowledged.add(r.lower, r.upper)

    if not sc.commands.empty():
      while sst.min_completion in sc.commands:
        if sst.actions.has_key(sst.min_completion):
          sst.actions.pop(sst.min_completion)()
        sst.min_completion += 1

  def session_known_completed(self, kcmp):
    sst = self.get_sst(kcmp)
    executed = RangedSet()
    for e in sst.executed.ranges:
      for ke in kcmp.ranges:
        if e.lower in ke and e.upper in ke:
          break
      else:
        executed.add_range(e)
    sst.executed = executed

  def do_session_flush(self, sf):
    sst = self.get_sst(sf)
    if sf.expected:
      if sst.received is None:
        exp = None
      else:
        exp = RangedSet(sst.received)
      sst.write_op(SessionExpected(exp))
    if sf.confirmed:
      sst.write_op(SessionConfirmed(sst.executed))
    if sf.completed:
      sst.write_op(SessionCompleted(sst.executed))

  def do_session_request_timeout(self, rt):
    sst = self.get_sst(rt)
    sst.write_op(SessionTimeout(timeout=0))

  def do_execution_result(self, er):
    sst = self.get_sst(er)
    sst.results[er.command_id] = er.value
    sst.executed.add(er.id)

  def do_execution_exception(self, ex):
    sst = self.get_sst(ex)
    exc = SESSION_ERRS.get(ex.error_code, SessionError)
    sst.session.error = exc(ex.error_code, ex.description)

  def dispatch(self):
    if not self.connection._connected and not self._closing and self._status != CLOSED:
      self.disconnect()

    if self._connected and not self._closing:
      for ssn in self.connection.sessions.values():
        self.attach(ssn)
        self.process(ssn)

    if self.connection.heartbeat and self._status != CLOSED:
      now = time.time()
      if self._last_in is not None and \
            now - self._last_in > 2*self.connection.heartbeat:
        raise HeartbeatTimeout(text="heartbeat timeout")
      if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
        self.write_op(ConnectionHeartbeat())

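  # For example, with connection.heartbeat = 10 (an illustrative value), the
  # engine raises HeartbeatTimeout after more than 20 seconds without inbound
  # traffic and writes a ConnectionHeartbeat whenever at least 5 seconds have
  # passed since the last outbound op.
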
  def open(self):
    self._reset()
    self._status = OPEN
    self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)

  def disconnect(self):
    self.write_op(ConnectionClose(close_code.normal))
    self._closing = True

  def attach(self, ssn):
    if ssn.closed: return
    sst = self._attachments.get(ssn)
    if sst is None:
      for i in xrange(0, self.channel_max):
        if not self._sessions.has_key(i):
          ch = i
          break
      else:
        raise RuntimeError("all channels used")
      sst = SessionState(self, ssn, ssn.name, ch)
      sst.write_op(SessionAttach(name=ssn.name))
      sst.write_op(SessionCommandPoint(sst.sent, 0))
      sst.outgoing_idx = 0
      sst.acked = []
      sst.acked_idx = 0
      if ssn.transactional:
        sst.write_cmd(TxSelect())
      self._attachments[ssn] = sst
      self._sessions[sst.channel] = sst

    for snd in ssn.senders:
      self.link(snd, self._out, snd.target)
    for rcv in ssn.receivers:
      self.link(rcv, self._in, rcv.source)

    if sst is not None and ssn.closing and not sst.detached:
      sst.detached = True
      sst.write_op(SessionDetach(name=ssn.name))

  def get_sst(self, op):
    return self._sessions[op.channel]

  def do_session_detached(self, dtc):
    sst = self._sessions.pop(dtc.channel)
    ssn = sst.session
    del self._attachments[ssn]
    ssn.closed = True

  def do_session_detach(self, dtc):
    sst = self.get_sst(dtc)
    sst.write_op(SessionDetached(name=dtc.name))
    self.do_session_detached(dtc)

  # ... (the beginning of the link method is collapsed in this source
  # listing; only its tail is shown below) ...

      def resolved(type, subtype):
        dir.do_link(sst, lnk, _lnk, type, subtype, linked)

      self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved)
      self._attachments[lnk] = _lnk

    if lnk.linked and lnk.closing and not lnk.closed:
      if not _lnk.closing:
        def unlinked():
          dir.del_link(sst, lnk, _lnk)
          del self._attachments[lnk]
          lnk.closed = True
        if _lnk.options.get("delete") in ("always", dir.DIR_NAME):
          dir.do_unlink(sst, lnk, _lnk)
          self.delete(sst, _lnk.name, unlinked)
        else:
          dir.do_unlink(sst, lnk, _lnk, unlinked)
        _lnk.closing = True
    elif not lnk.linked and lnk.closing and not lnk.closed:
      if lnk.error: lnk.closed = True

  def parse_address(self, lnk, dir, addr):
    if addr is None:
      return MalformedAddress(text="%s is None" % dir.ADDR_NAME)
    else:
      try:
        lnk.name, lnk.subject, lnk.options = address.parse(addr)
        # XXX: subject
        if lnk.options is None:
          lnk.options = {}
      except address.LexError, e:
        return MalformedAddress(text=str(e))
      except address.ParseError, e:
        return MalformedAddress(text=str(e))

  def validate_options(self, lnk, dir):
    ctx = Context()
    err = dir.VALIDATOR.validate(lnk.options, ctx)
    if err: return InvalidOption(text="error in options: %s" % err)

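  # For illustration (the address is made up): parse_address applied to
  # "my-queue; {create: always}" stores name "my-queue", subject None and
  # options {"create": "always"} on the link, returning MalformedAddress
  # rather than raising when the address fails to lex or parse;
  # validate_options then checks those options against the link direction's
  # VALIDATOR map.
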
  def resolve_declare(self, sst, lnk, dir, action):
    declare = lnk.options.get("create") in ("always", dir)
    assrt = lnk.options.get("assert") in ("always", dir)
    def do_resolved(type, subtype):
      err = None
      if type is None:
        if declare:
          err = self.declare(sst, lnk, action)
        else:
          err = NotFound(text="no such queue: %s" % lnk.name)
      else:
        if assrt:
          expected = lnk.options.get("node", {}).get("type")
          if expected and type != expected:
            err = AssertionFailed(text="expected %s, got %s" % (expected, type))
        if err is None:
          action(type, subtype)

      if err:
        tgt = lnk.target
        tgt.error = err
        del self._attachments[tgt]
        tgt.closed = True
        return
    self.resolve(sst, lnk.name, do_resolved, force=declare)

  def resolve(self, sst, name, action, force=False):
    if not force:
      try:
        type, subtype = self.address_cache[name]
        action(type, subtype)
        return
      except KeyError:
        pass

    args = []
    def do_result(r):
      args.append(r)
    def do_action(r):
      do_result(r)
      er, qr = args
      if er.not_found and not qr.queue:
        type, subtype = None, None
      elif qr.queue:
        type, subtype = "queue", None
      else:
        type, subtype = "topic", er.type
      if type is not None:
        self.address_cache[name] = (type, subtype)
      action(type, subtype)
    sst.write_query(ExchangeQuery(name), do_result)
    sst.write_query(QueueQuery(name), do_action)

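  # The two queries issued above classify the node: a QueueQuery result that
  # names a queue resolves to ("queue", None); otherwise an existing exchange
  # resolves to ("topic", <exchange type>); if neither exists the result is
  # (None, None) and nothing is cached.
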
  def declare(self, sst, lnk, action):
    name = lnk.name
    props = lnk.options.get("node", {})
    durable = props.get("durable", DURABLE_DEFAULT)
    type = props.get("type", "queue")
    declare = props.get("x-declare", {})

    if type == "topic":
      cmd = ExchangeDeclare(exchange=name, durable=durable)
      bindings = get_bindings(props, exchange=name)
    elif type == "queue":
      cmd = QueueDeclare(queue=name, durable=durable)
      bindings = get_bindings(props, queue=name)
    else:
      raise ValueError(type)

    sst.apply_overrides(cmd, declare)

    if type == "topic":
      if cmd.type is None:
        cmd.type = "topic"
      subtype = cmd.type
    else:
      subtype = None

    cmds = [cmd]
    cmds.extend(bindings)

    def declared():
      self.address_cache[name] = (type, subtype)
      action(type, subtype)

    sst.write_cmds(cmds, declared)

  def delete(self, sst, name, action):
    def deleted():
      del self.address_cache[name]
      action()

    def do_delete(type, subtype):
      if type == "topic":
        sst.write_cmd(ExchangeDelete(name), deleted)
      elif type == "queue":
        sst.write_cmd(QueueDelete(name), deleted)
      elif type is None:
        action()
      else:
        raise ValueError(type)
    self.resolve(sst, name, do_delete, force=True)

  def process(self, ssn):
    if ssn.closed or ssn.closing: return

    sst = self._attachments[ssn]

    while sst.outgoing_idx < len(ssn.outgoing):
      msg = ssn.outgoing[sst.outgoing_idx]
      snd = msg._sender
      # XXX: should check for sender error here
      _snd = self._attachments.get(snd)
      if _snd and snd.linked:
        self.send(snd, msg)
        sst.outgoing_idx += 1
      else:
        break

    for snd in ssn.senders:
      # XXX: should included snd.acked in this
      if snd.synced >= snd.queued and sst.need_sync:
        sst.write_cmd(ExecutionSync(), sync_noop)

    for rcv in ssn.receivers:
      self.process_receiver(rcv)

    if ssn.acked:
      messages = ssn.acked[sst.acked_idx:]
      if messages:
        ids = RangedSet()

        disposed = [(DEFAULT_DISPOSITION, [])]
        acked = []
        for m in messages:
          # XXX: we're ignoring acks that get lost when disconnected,
          # could we deal this via some message-id based purge?
          if m._transfer_id is None:
            acked.append(m)
            continue
          ids.add(m._transfer_id)
          if m._receiver._accept_mode is accept_mode.explicit:
            disp = m._disposition or DEFAULT_DISPOSITION
            last, msgs = disposed[-1]
            if disp.type is last.type and disp.options == last.options:
              msgs.append(m)
            else:
              disposed.append((disp, [m]))
          else:
            acked.append(m)

        for range in ids:
          sst.executed.add_range(range)
        sst.write_op(SessionCompleted(sst.executed))

        def ack_acker(msgs):
          def ack_ack():
            for m in msgs:
              ssn.acked.remove(m)
              sst.acked_idx -= 1
              # XXX: should this check accept_mode too?
              if not ssn.transactional:
                sst.acked.remove(m)
          return ack_ack

        for disp, msgs in disposed:
          if not msgs: continue
          if disp.type is None:
            op = MessageAccept
          elif disp.type is RELEASED:
            op = MessageRelease
          elif disp.type is REJECTED:
            op = MessageReject
          sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
                           **disp.options),
                        ack_acker(msgs))
          if log.isEnabledFor(DEBUG):
            for m in msgs:
              log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)

        sst.acked.extend(messages)
        sst.acked_idx += len(messages)
        ack_acker(acked)()

    if ssn.committing and not sst.committing:
      def commit_ok():
        del sst.acked[:]
        ssn.committing = False
        ssn.committed = True
        ssn.aborting = False
        ssn.aborted = False
        sst.committing = False
      sst.write_cmd(TxCommit(), commit_ok)
      sst.committing = True

    if ssn.aborting and not sst.aborting:
      sst.aborting = True
      def do_rb():
        messages = sst.acked + ssn.unacked + ssn.incoming
        ids = RangedSet(*[m._transfer_id for m in messages])
        for range in ids:
          sst.executed.add_range(range)
        sst.write_op(SessionCompleted(sst.executed))
        sst.write_cmd(MessageRelease(ids, True))
        sst.write_cmd(TxRollback(), do_rb_ok)

      def do_rb_ok():
        del ssn.incoming[:]
        del ssn.unacked[:]
        del sst.acked[:]

        for rcv in ssn.receivers:
          rcv.impending = rcv.received
          rcv.returned = rcv.received
          # XXX: do we need to update granted here as well?

        for rcv in ssn.receivers:
          self.process_receiver(rcv)

        ssn.aborting = False
        ssn.aborted = True
        ssn.committing = False
        ssn.committed = False
        sst.aborting = False

      for rcv in ssn.receivers:
        _rcv = self._attachments[rcv]
        sst.write_cmd(MessageStop(_rcv.destination))
      sst.write_cmd(ExecutionSync(), do_rb)

  def grant(self, rcv):
    sst = self._attachments[rcv.session]
    _rcv = self._attachments.get(rcv)
    if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
      return

    if rcv.granted is UNLIMITED:
      if rcv.impending is UNLIMITED:
        delta = 0
      else:
        delta = UNLIMITED
    elif rcv.impending is UNLIMITED:
      delta = -1
    else:
      delta = max(rcv.granted, rcv.received) - rcv.impending

    if delta is UNLIMITED:
      if not _rcv.bytes_open:
        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
        _rcv.bytes_open = True
      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
      rcv.impending = UNLIMITED
    elif delta > 0:
      if not _rcv.bytes_open:
        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
        _rcv.bytes_open = True
      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
      rcv.impending += delta
    elif delta < 0 and not rcv.draining:
      _rcv.draining = True
      def do_stop():
        rcv.impending = rcv.received
        _rcv.draining = False
        _rcv.bytes_open = False
        self.grant(rcv)
      sst.write_cmd(MessageStop(_rcv.destination), do_stop)

    if rcv.draining:
      _rcv.draining = True
      def do_flush():
        rcv.impending = rcv.received
        rcv.granted = rcv.impending
        _rcv.draining = False
        _rcv.bytes_open = False
        rcv.draining = False
      sst.write_cmd(MessageFlush(_rcv.destination), do_flush)

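  # A worked example of the credit arithmetic above (the numbers are made up):
  # with rcv.granted = 10, rcv.received = 3 and rcv.impending = 5, delta is
  # max(10, 3) - 5 = 5, so five more message credits are issued via
  # MessageFlow and rcv.impending becomes 10; a negative delta instead goes
  # through the MessageStop/do_stop path to withdraw credit.
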
  def process_receiver(self, rcv):
    if rcv.closed: return
    self.grant(rcv)

  def send(self, snd, msg):
    sst = self._attachments[snd.session]
    _snd = self._attachments[snd]

    if msg.subject is None or _snd._exchange == "":
      rk = _snd._routing_key
    else:
      rk = msg.subject

    if msg.subject is None:
      subject = _snd.subject
    else:
      subject = msg.subject

    # XXX: do we need to query to figure out how to create the reply-to interoperably?
    if msg.reply_to:
      rt = addr2reply_to(msg.reply_to)
    else:
      rt = None
    content_encoding = msg.properties.get("x-amqp-0-10.content-encoding")
    dp = DeliveryProperties(routing_key=rk)
    mp = MessageProperties(message_id=msg.id,
                           user_id=msg.user_id,
                           reply_to=rt,
                           correlation_id=msg.correlation_id,
                           app_id = msg.properties.get("x-amqp-0-10.app-id"),
                           content_type=msg.content_type,
                           content_encoding=content_encoding,
                           application_headers=msg.properties)
    if subject is not None:
      if mp.application_headers is None:
        mp.application_headers = {}
      mp.application_headers[SUBJECT] = subject
    if msg.durable is not None:
      if msg.durable:
        dp.delivery_mode = delivery_mode.persistent
      else:
        dp.delivery_mode = delivery_mode.non_persistent
    if msg.priority is not None:
      dp.priority = msg.priority
    if msg.ttl is not None:
      dp.ttl = long(msg.ttl*1000)
    enc, dec = get_codec(msg.content_type)
    try:
      body = enc(msg.content)
    except AttributeError, e:
      # convert to non-blocking EncodeError
      raise EncodeError(e)

    # XXX: this is not safe for out of order, can this be triggered by pre_ack?
    def msg_acked():
      # XXX: should we log the ack somehow too?
      snd.acked += 1
      m = snd.session.outgoing.pop(0)
      sst.outgoing_idx -= 1
      log.debug("RACK[%s]: %s", sst.session.log_id, msg)
      assert msg == m

    xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
                          payload=body)

    if _snd.pre_ack:
      sst.write_cmd(xfr)
    else:
      sst.write_cmd(xfr, msg_acked, sync=msg._sync)

    log.debug("SENT[%s]: %s", sst.session.log_id, msg)

    if _snd.pre_ack:
      msg_acked()

  def do_message_transfer(self, xfr):
    sst = self.get_sst(xfr)
    ssn = sst.session

    msg = self._decode(xfr)
    rcv = sst.destinations[xfr.destination].target
    msg._receiver = rcv
    if rcv.impending is not UNLIMITED:
      assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
    rcv.received += 1
    log.debug("RCVD[%s]: %s", ssn.log_id, msg)
    ssn.incoming.append(msg)

  def _decode(self, xfr):
    dp = EMPTY_DP
    mp = EMPTY_MP

    for h in xfr.headers:
      if isinstance(h, DeliveryProperties):
        dp = h
      elif isinstance(h, MessageProperties):
        mp = h

    ap = mp.application_headers
    enc, dec = get_codec(mp.content_type)
    try:
      content = dec(xfr.payload)
    except Exception, e:
      raise DecodeError(e)
    msg = Message(content)
    msg.id = mp.message_id
    if ap is not None:
      msg.subject = ap.get(SUBJECT)
    msg.user_id = mp.user_id
    if mp.reply_to is not None:
      msg.reply_to = reply_to2addr(mp.reply_to)
    msg.correlation_id = mp.correlation_id
    if dp.delivery_mode is not None:
      msg.durable = dp.delivery_mode == delivery_mode.persistent
    msg.priority = dp.priority
    if dp.ttl is not None:
      msg.ttl = dp.ttl/1000.0
    msg.redelivered = dp.redelivered
    msg.properties = mp.application_headers or {}
    if mp.app_id is not None:
      msg.properties["x-amqp-0-10.app-id"] = mp.app_id
    if mp.content_encoding is not None:
      msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding
    if dp.routing_key is not None:
      msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key
    if dp.timestamp is not None:
      msg.properties["x-amqp-0-10.timestamp"] = dp.timestamp
    msg.content_type = mp.content_type
    msg._transfer_id = xfr.id
    return msg

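  # A summary of the 0-10 to API mapping performed in _decode above:
  # delivery_mode persistent/non_persistent becomes msg.durable, dp.ttl in
  # milliseconds becomes msg.ttl in seconds, the "qpid.subject" application
  # header becomes msg.subject, and app_id, content_encoding, routing_key and
  # timestamp are surfaced as "x-amqp-0-10.*" keys in msg.properties.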