
Source Code for Module qpid.messaging.driver

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  import socket, struct, sys, time 
  21  from logging import getLogger, DEBUG 
  22  from qpid import compat 
  23  from qpid import sasl 
  24  from qpid.concurrency import synchronized 
  25  from qpid.datatypes import RangedSet, Serial 
  26  from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ 
  27      FrameDecoder, SegmentDecoder, OpDecoder 
  28  from qpid.messaging import address, transports 
  29  from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED 
  30  from qpid.messaging.exceptions import * 
  31  from qpid.messaging.message import get_codec, Disposition, Message 
  32  from qpid.ops import * 
  33  from qpid.selector import Selector 
  34  from qpid.util import URL, default, get_client_properties_with_defaults
  35  from qpid.validator import And, Context, List, Map, Types, Values 
  36  from threading import Condition, Thread 
  37   
  38  log = getLogger("qpid.messaging") 
  39  rawlog = getLogger("qpid.messaging.io.raw") 
  40  opslog = getLogger("qpid.messaging.io.ops") 
  41  
  42  def addr2reply_to(addr):
  43    name, subject, options = address.parse(addr)
  44    if options:
  45      type = options.get("node", {}).get("type")
  46    else:
  47      type = None
  48  
  49    if type == "topic":
  50      return ReplyTo(name, subject)
  51    else:
  52      return ReplyTo(None, name)
53
  54  def reply_to2addr(reply_to):
  55    if reply_to.exchange in (None, ""):
  56      return reply_to.routing_key
  57    elif reply_to.routing_key is None:
  58      return "%s; {node: {type: topic}}" % reply_to.exchange
  59    else:
  60      return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key)
  61  
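      # Illustrative sketch (not part of the original module): how an address
      # string maps to an AMQP 0-10 ReplyTo and back.  "amq.topic" here is just
      # an example exchange name:
      #
      #   >>> addr2reply_to("amq.topic/weather; {node: {type: topic}}")
      #   ReplyTo(exchange="amq.topic", routing_key="weather")
      #   >>> reply_to2addr(ReplyTo("amq.topic", "weather"))
      #   'amq.topic/weather; {node: {type: topic}}'
      #
      # A plain queue address such as "my-queue" instead becomes
      # ReplyTo(None, "my-queue").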
  62  class Attachment:
  63  
  64    def __init__(self, target):
  65      self.target = target
  66  
  67  # XXX
  68  
  69  DURABLE_DEFAULT=False
  70  
  71  # XXX
  72  
  73  class Pattern:
74 """ 75 The pattern filter matches the supplied wildcard pattern against a 76 message subject. 77 """ 78
79 - def __init__(self, value):
80 self.value = value
81 82 # XXX: this should become part of the driver
83 - def _bind(self, sst, exchange, queue):
84 from qpid.ops import ExchangeBind 85 86 sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, 87 binding_key=self.value.replace("*", "#")))
88 89 SUBJECT_DEFAULTS = { 90 "topic": "#" 91 }
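      # Illustrative sketch (not part of the original module): Pattern only
      # rewrites the messaging-style "*" wildcard into the AMQP 0-10 topic
      # exchange wildcard "#" when issuing the ExchangeBind command:
      #
      #   >>> Pattern("news.*").value.replace("*", "#")
      #   'news.#'
      #
      # so a subscription made with Pattern("news.*") is bound on the topic
      # exchange with the binding key "news.#".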
  92  
  93  def noop(): pass
  94  def sync_noop(): pass
95
  96  class SessionState:
  97  
  98    def __init__(self, driver, session, name, channel):
  99      self.driver = driver
 100      self.session = session
 101      self.name = name
 102      self.channel = channel
 103      self.detached = False
 104      self.committing = False
 105      self.aborting = False
 106  
 107      # sender state
 108      self.sent = Serial(0)
 109      self.acknowledged = RangedSet()
 110      self.actions = {}
 111      self.min_completion = self.sent
 112      self.max_completion = self.sent
 113      self.results = {}
 114      self.need_sync = False
 115  
 116      # receiver state
 117      self.received = None
 118      self.executed = RangedSet()
 119  
 120      # XXX: need to periodically exchange completion/known_completion
 121  
 122      self.destinations = {}
 123  
 124    def write_query(self, query, handler):
 125      id = self.sent
 126      self.write_cmd(query, lambda: handler(self.results.pop(id)))
 127  
 128    def apply_overrides(self, cmd, overrides):
 129      for k, v in overrides.items():
 130        cmd[k.replace('-', '_')] = v
 131  
 132    def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
 133      if overrides:
 134        self.apply_overrides(cmd, overrides)
 135  
 136      if action != noop:
 137        cmd.sync = sync
 138      if self.detached:
 139        raise Exception("detached")
 140      cmd.id = self.sent
 141      self.sent += 1
 142      self.actions[cmd.id] = action
 143      self.max_completion = cmd.id
 144      self.write_op(cmd)
 145      self.need_sync = not cmd.sync
 146  
 147    def write_cmds(self, cmds, action=noop):
 148      if cmds:
 149        for cmd in cmds[:-1]:
 150          self.write_cmd(cmd)
 151        self.write_cmd(cmds[-1], action)
 152      else:
 153        action()
 154  
 155    def write_op(self, op):
 156      op.channel = self.channel
 157      self.driver.write_op(op)
 158  
 159  POLICIES = Values("always", "sender", "receiver", "never")
 160  RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
 161                       "exactly-once")
 162  
 163  DECLARE = Map({}, restricted=False)
 164  BINDINGS = List(Map({
 165      "exchange": Types(basestring),
 166      "queue": Types(basestring),
 167      "key": Types(basestring),
 168      "arguments": Map({}, restricted=False)
 169      }))
 170  
 171  COMMON_OPTS = {
 172      "create": POLICIES,
 173      "delete": POLICIES,
 174      "assert": POLICIES,
 175      "node": Map({
 176        "type": Values("queue", "topic"),
 177        "durable": Types(bool),
 178        "x-declare": DECLARE,
 179        "x-bindings": BINDINGS
 180        }),
 181      "link": Map({
 182        "name": Types(basestring),
 183        "durable": Types(bool),
 184        "reliability": RELIABILITY,
 185        "x-declare": DECLARE,
 186        "x-bindings": BINDINGS,
 187        "x-subscribe": Map({}, restricted=False)
 188        })
 189      }
 190  
 191  RECEIVE_MODES = Values("browse", "consume")
 192  
 193  SOURCE_OPTS = COMMON_OPTS.copy()
 194  SOURCE_OPTS.update({
 195      "mode": RECEIVE_MODES
 196      })
 197  
 198  TARGET_OPTS = COMMON_OPTS.copy()
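      # Illustrative sketch (not part of the original module): these validators
      # describe the option maps accepted in an address string, e.g. an address
      # such as
      #
      #   my-topic; {create: always,
      #              node: {type: topic, durable: True,
      #                     x-bindings: [{exchange: "amq.topic", key: "my-topic.#"}]}}
      #
      # parses into an options dict that Map(SOURCE_OPTS) / Map(TARGET_OPTS)
      # validates before the link is attached.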
 199  
 200  class LinkIn:
 201  
 202    ADDR_NAME = "source"
 203    DIR_NAME = "receiver"
 204    VALIDATOR = Map(SOURCE_OPTS)

      # ... (LinkIn's remaining definitions are collapsed in the original source view) ...

 266  
 267  class LinkOut:
 268  
 269    ADDR_NAME = "target"
 270    DIR_NAME = "sender"
 271    VALIDATOR = Map(TARGET_OPTS)

      # ... (LinkOut's remaining definitions are collapsed in the original source view) ...

 296  
 297  class Cache:
 298  
 299    def __init__(self, ttl):
 300      self.ttl = ttl
 301      self.entries = {}
 302  
 303    def __setitem__(self, key, value):
 304      self.entries[key] = time.time(), value
 305  
 306    def __getitem__(self, key):
 307      tstamp, value = self.entries[key]
 308      if time.time() - tstamp >= self.ttl:
 309        del self.entries[key]
 310        raise KeyError(key)
 311      else:
 312        return value
 313  
 314    def __delitem__(self, key):
 315      del self.entries[key]
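      # Illustrative sketch (not part of the original module): Cache is a
      # minimal time-to-live map; entries expire lazily on lookup.
      #
      #   >>> c = Cache(ttl=0.1)          # entries live for ~100ms
      #   >>> c["q"] = ("queue", None)
      #   >>> c["q"]
      #   ('queue', None)
      #   >>> time.sleep(0.2); c["q"]     # now past the ttl
      #   Traceback (most recent call last):
      #     ...
      #   KeyError: 'q'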
 316  
 317  # XXX
 318  HEADER="!4s4B"
 319  
 320  EMPTY_DP = DeliveryProperties()
 321  EMPTY_MP = MessageProperties()
 322  
 323  SUBJECT = "qpid.subject"
 324  
 325  CLOSED = "CLOSED"
 326  READ_ONLY = "READ_ONLY"
 327  WRITE_ONLY = "WRITE_ONLY"
 328  OPEN = "OPEN"
 329  
 330  class Driver:
 331  
 332    def __init__(self, connection):
 333      self.connection = connection
 334      self.log_id = "%x" % id(self.connection)
 335      self._lock = self.connection._lock
 336  
 337      self._selector = Selector.default()
 338      self._attempts = 0
 339      self._delay = self.connection.reconnect_interval_min
 340      self._reconnect_log = self.connection.reconnect_log
 341      self._host = 0
 342      self._retrying = False
 343      self._next_retry = None
 344      self._transport = None
 345  
 346      self._timeout = None
 347  
 348      self.engine = None
 349  
 350    def _next_host(self):
 351      urls = [URL(u) for u in self.connection.reconnect_urls]
 352      hosts = [(self.connection.host, default(self.connection.port, 5672))] + \
 353          [(u.host, default(u.port, 5672)) for u in urls]
 354      if self._host >= len(hosts):
 355        self._host = 0
 356      result = hosts[self._host]
 357      if self._host == 0:
 358        self._attempts += 1
 359      self._host = self._host + 1
 360      return result
 361  
 362    def _num_hosts(self):
 363      return len(self.connection.reconnect_urls) + 1
 364  
 365    @synchronized
 366    def wakeup(self):
 367      self.dispatch()
 368      self._selector.wakeup()
 369  
 370    def start(self):
 371      self._selector.register(self)
 372  
 373    def stop(self):
 374      self._selector.unregister(self)
 375      if self._transport:
 376        self.st_closed()
 377  
 378    def fileno(self):
 379      return self._transport.fileno()
 380  
 381    @synchronized
 382    def reading(self):
 383      return self._transport is not None and \
 384          self._transport.reading(True)
 385  
 386    @synchronized
 387    def writing(self):
 388      return self._transport is not None and \
 389          self._transport.writing(self.engine.pending())
 390  
 391    @synchronized
 392    def timing(self):
 393      return self._timeout
 394  
 395    @synchronized
 396    def readable(self):
 397      try:
 398        data = self._transport.recv(64*1024)
 399        if data is None:
 400          return
 401        elif data:
 402          rawlog.debug("READ[%s]: %r", self.log_id, data)
 403          self.engine.write(data)
 404        else:
 405          self.close_engine()
 406      except socket.error, e:
 407        self.close_engine(ConnectionError(text=str(e)))
 408  
 409      self.update_status()
 410  
 411      self._notify()
 412  
 413    def _notify(self):
 414      if self.connection.error:
 415        self.connection._condition.gc()
 416      self.connection._waiter.notifyAll()
 417  
 418    def close_engine(self, e=None):
 419      if e is None:
 420        e = ConnectionError(text="connection aborted")
 421  
 422      if (self.connection.reconnect and
 423          (self.connection.reconnect_limit is None or
 424           self.connection.reconnect_limit <= 0 or
 425           self._attempts <= self.connection.reconnect_limit)):
 426        if self._host < self._num_hosts():
 427          delay = 0
 428        else:
 429          delay = self._delay
 430          self._delay = min(2*self._delay,
 431                            self.connection.reconnect_interval_max)
 432        self._next_retry = time.time() + delay
 433        if self._reconnect_log:
 434          log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
 435          if delay > 0:
 436            log.warn("sleeping %s seconds" % delay)
 437        self._retrying = True
 438        self.engine.close()
 439      else:
 440        self.engine.close(e)
 441  
 442      self.schedule()
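      # Illustrative sketch (not part of the original module): once every host
      # in the rotation has been tried, the retry delay doubles on each
      # recoverable error, capped at reconnect_interval_max.  With
      # reconnect_interval_min=1 and reconnect_interval_max=60 the successive
      # sleeps are roughly:
      #
      #   1, 2, 4, 8, 16, 32, 60, 60, ...   (seconds)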
 443  
 444    def update_status(self):
 445      status = self.engine.status()
 446      return getattr(self, "st_%s" % status.lower())()
 447  
 448    def st_closed(self):
 449      # XXX: this log statement seems to sometimes hit when the socket is not connected
 450      # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
 451      self._transport.close()
 452      self._transport = None
 453      self.engine = None
 454      return True
 455  
 456    def st_open(self):
 457      return False
 458  
 459    @synchronized
 460    def writeable(self):
 461      notify = False
 462      try:
 463        n = self._transport.send(self.engine.peek())
 464        if n == 0: return
 465        sent = self.engine.read(n)
 466        rawlog.debug("SENT[%s]: %r", self.log_id, sent)
 467      except socket.error, e:
 468        self.close_engine(e)
 469        notify = True
 470  
 471      if self.update_status() or notify:
 472        self._notify()
 473  
 474    @synchronized
 475    def timeout(self):
 476      self.dispatch()
 477      self._notify()
 478      self.schedule()
 479  
 480    def schedule(self):
 481      times = []
 482      if self.connection.heartbeat:
 483        times.append(time.time() + self.connection.heartbeat)
 484      if self._next_retry:
 485        times.append(self._next_retry)
 486      if times:
 487        self._timeout = min(times)
 488      else:
 489        self._timeout = None
 490  
 491    def dispatch(self):
 492      try:
 493        if self._transport is None:
 494          if self.connection._connected and not self.connection.error:
 495            self.connect()
 496        else:
 497          self.engine.dispatch()
 498      except HeartbeatTimeout, e:
 499        self.close_engine(e)
 500      except:
 501        # XXX: Does socket get leaked if this occurs?
 502        msg = compat.format_exc()
 503        self.connection.error = InternalError(text=msg)
 504  
 505    def connect(self):
 506      if self._retrying and time.time() < self._next_retry:
 507        return
 508  
 509      try:
 510        # XXX: should make this non blocking
 511        host, port = self._next_host()
 512        if self._retrying and self._reconnect_log:
 513          log.warn("trying: %s:%s", host, port)
 514        self.engine = Engine(self.connection)
 515        self.engine.open()
 516        rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
 517        trans = transports.TRANSPORTS.get(self.connection.transport)
 518        if trans:
 519          self._transport = trans(self.connection, host, port)
 520        else:
 521          raise ConnectError("no such transport: %s" % self.connection.transport)
 522        if self._retrying and self._reconnect_log:
 523          log.warn("reconnect succeeded: %s:%s", host, port)
 524        self._next_retry = None
 525        self._attempts = 0
 526        self._delay = self.connection.reconnect_interval_min
 527        self._retrying = False
 528        self.schedule()
 529      except socket.error, e:
 530        self.close_engine(ConnectError(text=str(e)))
 531  
 532  DEFAULT_DISPOSITION = Disposition(None)
 533  
 534  def get_bindings(opts, queue=None, exchange=None, key=None):
 535    bindings = opts.get("x-bindings", [])
 536    cmds = []
 537    for b in bindings:
 538      exchange = b.get("exchange", exchange)
 539      queue = b.get("queue", queue)
 540      key = b.get("key", key)
 541      args = b.get("arguments", {})
 542      cmds.append(ExchangeBind(queue, exchange, key, args))
 543    return cmds
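      # Illustrative sketch (not part of the original module): an address
      # option map such as
      #
      #   {"x-bindings": [{"exchange": "amq.topic", "key": "events.#"}]}
      #
      # passed as get_bindings(opts, queue="my-queue") yields a single
      # ExchangeBind(queue="my-queue", exchange="amq.topic",
      # binding_key="events.#") command to be written on the session.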
 544  
 545  CONNECTION_ERRS = {
 546    # anything not here (i.e. everything right now) will default to
 547    # connection error
 548    }
 549  
 550  SESSION_ERRS = {
 551    # anything not here will default to session error
 552    error_code.unauthorized_access: UnauthorizedAccess,
 553    error_code.not_found: NotFound,
 554    error_code.resource_locked: ReceiverError,
 555    error_code.resource_limit_exceeded: TargetCapacityExceeded,
 556    error_code.internal_error: ServerError
 557    }
 558  
 559  class Engine:
 560  
 561    def __init__(self, connection):
 562      self.connection = connection
 563      self.log_id = "%x" % id(self.connection)
 564      self._closing = False
 565      self._connected = False
 566      self._attachments = {}
 567  
 568      self._in = LinkIn()
 569      self._out = LinkOut()
 570  
 571      self.channel_max = 65536   # default; may be lowered by connection-tune
 572      self._channels = 0
 573      self._sessions = {}
 574  
 575      self.address_cache = Cache(self.connection.address_ttl)
 576  
 577      self._status = CLOSED
 578      self._buf = ""
 579      self._hdr = ""
 580      self._last_in = None
 581      self._last_out = None
 582      self._op_enc = OpEncoder()
 583      self._seg_enc = SegmentEncoder()
 584      self._frame_enc = FrameEncoder()
 585      self._frame_dec = FrameDecoder()
 586      self._seg_dec = SegmentDecoder()
 587      self._op_dec = OpDecoder()
 588  
 589      self._sasl = sasl.Client()
 590      if self.connection.username:
 591        self._sasl.setAttr("username", self.connection.username)
 592      if self.connection.password:
 593        self._sasl.setAttr("password", self.connection.password)
 594      if self.connection.host:
 595        self._sasl.setAttr("host", self.connection.host)
 596      self._sasl.setAttr("service", self.connection.sasl_service)
 597      if self.connection.sasl_min_ssf is not None:
 598        self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
 599      if self.connection.sasl_max_ssf is not None:
 600        self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
 601      self._sasl.init()
 602      self._sasl_encode = False
 603      self._sasl_decode = False
604
 605    def _reset(self):
 606      self.connection._transport_connected = False
 607  
 608      for ssn in self.connection.sessions.values():
 609        for m in ssn.acked + ssn.unacked + ssn.incoming:
 610          m._transfer_id = None
 611        for snd in ssn.senders:
 612          snd.linked = False
 613        for rcv in ssn.receivers:
 614          rcv.impending = rcv.received
 615          rcv.linked = False
616
 617    def status(self):
 618      return self._status
 619  
 620    def write(self, data):
 621      self._last_in = time.time()
 622      try:
 623        if self._sasl_decode:
 624          data = self._sasl.decode(data)
 625  
 626        if len(self._hdr) < 8:
 627          r = 8 - len(self._hdr)
 628          self._hdr += data[:r]
 629          data = data[r:]
 630  
 631          if len(self._hdr) == 8:
 632            self.do_header(self._hdr)
 633  
 634        self._frame_dec.write(data)
 635        self._seg_dec.write(*self._frame_dec.read())
 636        self._op_dec.write(*self._seg_dec.read())
 637        for op in self._op_dec.read():
 638          self.assign_id(op)
 639          opslog.debug("RCVD[%s]: %r", self.log_id, op)
 640          op.dispatch(self)
 641        self.dispatch()
 642      except MessagingError, e:
 643        self.close(e)
 644      except:
 645        self.close(InternalError(text=compat.format_exc()))
646
 647    def close(self, e=None):
 648      self._reset()
 649      if e:
 650        self.connection.error = e
 651      self._status = CLOSED
 652  
 653    def assign_id(self, op):
 654      if isinstance(op, Command):
 655        sst = self.get_sst(op)
 656        op.id = sst.received
 657        sst.received += 1
658
 659    def pending(self):
 660      return len(self._buf)
 661  
 662    def read(self, n):
 663      result = self._buf[:n]
 664      self._buf = self._buf[n:]
 665      return result
 666  
 667    def peek(self):
 668      return self._buf
 669  
 670    def write_op(self, op):
 671      opslog.debug("SENT[%s]: %r", self.log_id, op)
 672      self._op_enc.write(op)
 673      self._seg_enc.write(*self._op_enc.read())
 674      self._frame_enc.write(*self._seg_enc.read())
 675      bytes = self._frame_enc.read()
 676      if self._sasl_encode:
 677        bytes = self._sasl.encode(bytes)
 678      self._buf += bytes
 679      self._last_out = time.time()
680
 681    def do_header(self, hdr):
 682      cli_major = 0; cli_minor = 10
 683      magic, _, _, major, minor = struct.unpack(HEADER, hdr)
 684      if major != cli_major or minor != cli_minor:
 685        raise VersionError(text="client: %s-%s, server: %s-%s" %
 686                           (cli_major, cli_minor, major, minor))
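      # Illustrative sketch (not part of the original module): the 8-byte
      # protocol header exchanged on connect is packed with HEADER ("!4s4B"),
      # i.e. the ASCII magic "AMQP" followed by four unsigned bytes:
      #
      #   >>> struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
      #   'AMQP\x01\x01\x00\n'
      #
      # do_header() unpacks the server's copy and only checks the trailing
      # major/minor pair (0-10) against the client's protocol version.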
687
 688    def do_connection_start(self, start):
 689      if self.connection.sasl_mechanisms:
 690        permitted = self.connection.sasl_mechanisms.split()
 691        mechs = [m for m in start.mechanisms if m in permitted]
 692      else:
 693        mechs = start.mechanisms
 694      try:
 695        mech, initial = self._sasl.start(" ".join(mechs))
 696      except sasl.SASLError, e:
 697        raise AuthenticationFailure(text=str(e))
 698  
 699      client_properties = get_client_properties_with_defaults(provided_client_properties=self.connection.client_properties)
 700      self.write_op(ConnectionStartOk(client_properties=client_properties,
 701                                      mechanism=mech, response=initial))
 702  
 703    def do_connection_secure(self, secure):
 704      resp = self._sasl.step(secure.challenge)
 705      self.write_op(ConnectionSecureOk(response=resp))
 706  
 707    def do_connection_tune(self, tune):
 708      # XXX: is heartbeat protocol specific?
 709      if tune.channel_max is not None:
 710        self.channel_max = tune.channel_max
 711      self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
 712                                     channel_max=self.channel_max))
 713      self.write_op(ConnectionOpen())
 714      self._sasl_encode = True
 715  
 716    def do_connection_open_ok(self, open_ok):
 717      self.connection.auth_username = self._sasl.auth_username()
 718      self._connected = True
 719      self._sasl_decode = True
 720      self.connection._transport_connected = True
 721  
 722    def do_connection_heartbeat(self, hrt):
 723      pass
 724  
 725    def do_connection_close(self, close):
 726      self.write_op(ConnectionCloseOk())
 727      if close.reply_code != close_code.normal:
 728        exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError)
 729        self.connection.error = exc(close.reply_code, close.reply_text)
 730      # XXX: should we do a half shutdown on the socket here?
 731      # XXX: we really need to test this, we may end up reporting a
 732      # connection abort after this, if we were to do a shutdown on read
 733      # and stop reading, then we wouldn't report the abort, that's
 734      # probably the right thing to do
 735  
 736    def do_connection_close_ok(self, close_ok):
 737      self.close()
 738  
 739    def do_session_attached(self, atc):
 740      pass
 741  
 742    def do_session_command_point(self, cp):
 743      sst = self.get_sst(cp)
 744      sst.received = cp.command_id
 745  
 746    def do_session_completed(self, sc):
 747      sst = self.get_sst(sc)
 748      for r in sc.commands:
 749        sst.acknowledged.add(r.lower, r.upper)
 750  
 751      if not sc.commands.empty():
 752        while sst.min_completion in sc.commands:
 753          if sst.actions.has_key(sst.min_completion):
 754            sst.actions.pop(sst.min_completion)()
 755          sst.min_completion += 1
756
 757    def session_known_completed(self, kcmp):
 758      sst = self.get_sst(kcmp)
 759      executed = RangedSet()
 760      for e in sst.executed.ranges:
 761        for ke in kcmp.ranges:
 762          if e.lower in ke and e.upper in ke:
 763            break
 764        else:
 765          executed.add_range(e)
 766      sst.executed = executed
767
 768    def do_session_flush(self, sf):
 769      sst = self.get_sst(sf)
 770      if sf.expected:
 771        if sst.received is None:
 772          exp = None
 773        else:
 774          exp = RangedSet(sst.received)
 775        sst.write_op(SessionExpected(exp))
 776      if sf.confirmed:
 777        sst.write_op(SessionConfirmed(sst.executed))
 778      if sf.completed:
 779        sst.write_op(SessionCompleted(sst.executed))
780
 781    def do_session_request_timeout(self, rt):
 782      sst = self.get_sst(rt)
 783      sst.write_op(SessionTimeout(timeout=0))
784
 785    def do_execution_result(self, er):
 786      sst = self.get_sst(er)
 787      sst.results[er.command_id] = er.value
 788      sst.executed.add(er.id)
789
 790    def do_execution_exception(self, ex):
 791      sst = self.get_sst(ex)
 792      exc = SESSION_ERRS.get(ex.error_code, SessionError)
 793      sst.session.error = exc(ex.error_code, ex.description)
794
 795    def dispatch(self):
 796      if not self.connection._connected and not self._closing and self._status != CLOSED:
 797        self.disconnect()
 798  
 799      if self._connected and not self._closing:
 800        for ssn in self.connection.sessions.values():
 801          self.attach(ssn)
 802          self.process(ssn)
 803  
 804      if self.connection.heartbeat and self._status != CLOSED:
 805        now = time.time()
 806        if self._last_in is not None and \
 807            now - self._last_in > 2*self.connection.heartbeat:
 808          raise HeartbeatTimeout(text="heartbeat timeout")
 809        if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
 810          self.write_op(ConnectionHeartbeat())
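      # Illustrative sketch (not part of the original module): with
      # connection.heartbeat = 10 (seconds) the engine writes a
      # ConnectionHeartbeat whenever nothing has been sent for >= 5s, and
      # raises HeartbeatTimeout if nothing has been read for more than 20s.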
811
 812    def open(self):
 813      self._reset()
 814      self._status = OPEN
 815      self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
816
 817    def disconnect(self):
 818      self.write_op(ConnectionClose(close_code.normal))
 819      self._closing = True
820
 821    def attach(self, ssn):
 822      if ssn.closed: return
 823      sst = self._attachments.get(ssn)
 824      if sst is None:
 825        for i in xrange(0, self.channel_max):
 826          if not self._sessions.has_key(i):
 827            ch = i
 828            break
 829        else:
 830          raise RuntimeError("all channels used")
 831        sst = SessionState(self, ssn, ssn.name, ch)
 832        sst.write_op(SessionAttach(name=ssn.name))
 833        sst.write_op(SessionCommandPoint(sst.sent, 0))
 834        sst.outgoing_idx = 0
 835        sst.acked = []
 836        sst.acked_idx = 0
 837        if ssn.transactional:
 838          sst.write_cmd(TxSelect())
 839        self._attachments[ssn] = sst
 840        self._sessions[sst.channel] = sst
 841  
 842      for snd in ssn.senders:
 843        self.link(snd, self._out, snd.target)
 844      for rcv in ssn.receivers:
 845        self.link(rcv, self._in, rcv.source)
 846  
 847      if sst is not None and ssn.closing and not sst.detached:
 848        sst.detached = True
 849        sst.write_op(SessionDetach(name=ssn.name))
850
 851    def get_sst(self, op):
 852      return self._sessions[op.channel]
853
 854    def do_session_detached(self, dtc):
 855      sst = self._sessions.pop(dtc.channel)
 856      ssn = sst.session
 857      del self._attachments[ssn]
 858      ssn.closed = True
859
 860    def do_session_detach(self, dtc):
 861      sst = self.get_sst(dtc)
 862      sst.write_op(SessionDetached(name=dtc.name))
 863      self.do_session_detached(dtc)
 864  

      # ... (the opening of the link() method, source lines 865-881, is
      # collapsed in the original source view; only its tail follows) ...

 882  
 883        def resolved(type, subtype):
 884          dir.do_link(sst, lnk, _lnk, type, subtype, linked)
 885  
 886        self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved)
 887        self._attachments[lnk] = _lnk
 888  
 889      if lnk.linked and lnk.closing and not lnk.closed:
 890        if not _lnk.closing:
 891          def unlinked():
 892            dir.del_link(sst, lnk, _lnk)
 893            del self._attachments[lnk]
 894            lnk.closed = True
 895          if _lnk.options.get("delete") in ("always", dir.DIR_NAME):
 896            dir.do_unlink(sst, lnk, _lnk)
 897            self.delete(sst, _lnk.name, unlinked)
 898          else:
 899            dir.do_unlink(sst, lnk, _lnk, unlinked)
 900          _lnk.closing = True
 901      elif not lnk.linked and lnk.closing and not lnk.closed:
 902        if lnk.error: lnk.closed = True
 903  
 904    def parse_address(self, lnk, dir, addr):
 905      if addr is None:
 906        return MalformedAddress(text="%s is None" % dir.ADDR_NAME)
 907      else:
 908        try:
 909          lnk.name, lnk.subject, lnk.options = address.parse(addr)
 910          # XXX: subject
 911          if lnk.options is None:
 912            lnk.options = {}
 913        except address.LexError, e:
 914          return MalformedAddress(text=str(e))
 915        except address.ParseError, e:
 916          return MalformedAddress(text=str(e))
917
 918    def validate_options(self, lnk, dir):
 919      ctx = Context()
 920      err = dir.VALIDATOR.validate(lnk.options, ctx)
 921      if err: return InvalidOption(text="error in options: %s" % err)
922
 923    def resolve_declare(self, sst, lnk, dir, action):
 924      declare = lnk.options.get("create") in ("always", dir)
 925      assrt = lnk.options.get("assert") in ("always", dir)
 926      def do_resolved(type, subtype):
 927        err = None
 928        if type is None:
 929          if declare:
 930            err = self.declare(sst, lnk, action)
 931          else:
 932            err = NotFound(text="no such queue: %s" % lnk.name)
 933        else:
 934          if assrt:
 935            expected = lnk.options.get("node", {}).get("type")
 936            if expected and type != expected:
 937              err = AssertionFailed(text="expected %s, got %s" % (expected, type))
 938          if err is None:
 939            action(type, subtype)
 940  
 941        if err:
 942          tgt = lnk.target
 943          tgt.error = err
 944          del self._attachments[tgt]
 945          tgt.closed = True
 946          return
 947      self.resolve(sst, lnk.name, do_resolved, force=declare)
 948  
 949    def resolve(self, sst, name, action, force=False):
 950      if not force:
 951        try:
 952          type, subtype = self.address_cache[name]
 953          action(type, subtype)
 954          return
 955        except KeyError:
 956          pass
 957  
 958      args = []
 959      def do_result(r):
 960        args.append(r)
 961      def do_action(r):
 962        do_result(r)
 963        er, qr = args
 964        if er.not_found and not qr.queue:
 965          type, subtype = None, None
 966        elif qr.queue:
 967          type, subtype = "queue", None
 968        else:
 969          type, subtype = "topic", er.type
 970        if type is not None:
 971          self.address_cache[name] = (type, subtype)
 972        action(type, subtype)
 973      sst.write_query(ExchangeQuery(name), do_result)
 974      sst.write_query(QueueQuery(name), do_action)
 975  
 976    def declare(self, sst, lnk, action):
 977      name = lnk.name
 978      props = lnk.options.get("node", {})
 979      durable = props.get("durable", DURABLE_DEFAULT)
 980      type = props.get("type", "queue")
 981      declare = props.get("x-declare", {})
 982  
 983      if type == "topic":
 984        cmd = ExchangeDeclare(exchange=name, durable=durable)
 985        bindings = get_bindings(props, exchange=name)
 986      elif type == "queue":
 987        cmd = QueueDeclare(queue=name, durable=durable)
 988        bindings = get_bindings(props, queue=name)
 989      else:
 990        raise ValueError(type)
 991  
 992      sst.apply_overrides(cmd, declare)
 993  
 994      if type == "topic":
 995        if cmd.type is None:
 996          cmd.type = "topic"
 997        subtype = cmd.type
 998      else:
 999        subtype = None
1000  
1001      cmds = [cmd]
1002      cmds.extend(bindings)
1003  
1004      def declared():
1005        self.address_cache[name] = (type, subtype)
1006        action(type, subtype)
1007  
1008      sst.write_cmds(cmds, declared)
1009  
1010    def delete(self, sst, name, action):
1011      def deleted():
1012        del self.address_cache[name]
1013        action()
1014  
1015      def do_delete(type, subtype):
1016        if type == "topic":
1017          sst.write_cmd(ExchangeDelete(name), deleted)
1018        elif type == "queue":
1019          sst.write_cmd(QueueDelete(name), deleted)
1020        elif type is None:
1021          action()
1022        else:
1023          raise ValueError(type)
1024      self.resolve(sst, name, do_delete, force=True)
1025  
1026    def process(self, ssn):
1027      if ssn.closed or ssn.closing: return
1028  
1029      sst = self._attachments[ssn]
1030  
1031      while sst.outgoing_idx < len(ssn.outgoing):
1032        msg = ssn.outgoing[sst.outgoing_idx]
1033        snd = msg._sender
1034        # XXX: should check for sender error here
1035        _snd = self._attachments.get(snd)
1036        if _snd and snd.linked:
1037          self.send(snd, msg)
1038          sst.outgoing_idx += 1
1039        else:
1040          break
1041  
1042      for snd in ssn.senders:
1043        # XXX: should included snd.acked in this
1044        if snd.synced >= snd.queued and sst.need_sync:
1045          sst.write_cmd(ExecutionSync(), sync_noop)
1046  
1047      for rcv in ssn.receivers:
1048        self.process_receiver(rcv)
1049  
1050      if ssn.acked:
1051        messages = ssn.acked[sst.acked_idx:]
1052        if messages:
1053          ids = RangedSet()
1054  
1055          disposed = [(DEFAULT_DISPOSITION, [])]
1056          acked = []
1057          for m in messages:
1058            # XXX: we're ignoring acks that get lost when disconnected,
1059            # could we deal this via some message-id based purge?
1060            if m._transfer_id is None:
1061              acked.append(m)
1062              continue
1063            ids.add(m._transfer_id)
1064            if m._receiver._accept_mode is accept_mode.explicit:
1065              disp = m._disposition or DEFAULT_DISPOSITION
1066              last, msgs = disposed[-1]
1067              if disp.type is last.type and disp.options == last.options:
1068                msgs.append(m)
1069              else:
1070                disposed.append((disp, [m]))
1071            else:
1072              acked.append(m)
1073  
1074          for range in ids:
1075            sst.executed.add_range(range)
1076          sst.write_op(SessionCompleted(sst.executed))
1077  
1078          def ack_acker(msgs):
1079            def ack_ack():
1080              for m in msgs:
1081                ssn.acked.remove(m)
1082                sst.acked_idx -= 1
1083                # XXX: should this check accept_mode too?
1084                if not ssn.transactional:
1085                  sst.acked.remove(m)
1086            return ack_ack
1087  
1088          for disp, msgs in disposed:
1089            if not msgs: continue
1090            if disp.type is None:
1091              op = MessageAccept
1092            elif disp.type is RELEASED:
1093              op = MessageRelease
1094            elif disp.type is REJECTED:
1095              op = MessageReject
1096            sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
1097                             **disp.options),
1098                          ack_acker(msgs))
1099            if log.isEnabledFor(DEBUG):
1100              for m in msgs:
1101                log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
1102  
1103          sst.acked.extend(messages)
1104          sst.acked_idx += len(messages)
1105          ack_acker(acked)()
1106  
1107      if ssn.committing and not sst.committing:
1108        def commit_ok():
1109          del sst.acked[:]
1110          ssn.committing = False
1111          ssn.committed = True
1112          ssn.aborting = False
1113          ssn.aborted = False
1114          sst.committing = False
1115        sst.write_cmd(TxCommit(), commit_ok)
1116        sst.committing = True
1117  
1118      if ssn.aborting and not sst.aborting:
1119        sst.aborting = True
1120        def do_rb():
1121          messages = sst.acked + ssn.unacked + ssn.incoming
1122          ids = RangedSet(*[m._transfer_id for m in messages])
1123          for range in ids:
1124            sst.executed.add_range(range)
1125          sst.write_op(SessionCompleted(sst.executed))
1126          sst.write_cmd(MessageRelease(ids, True))
1127          sst.write_cmd(TxRollback(), do_rb_ok)
1128  
1129        def do_rb_ok():
1130          del ssn.incoming[:]
1131          del ssn.unacked[:]
1132          del sst.acked[:]
1133  
1134          for rcv in ssn.receivers:
1135            rcv.impending = rcv.received
1136            rcv.returned = rcv.received
1137            # XXX: do we need to update granted here as well?
1138  
1139          for rcv in ssn.receivers:
1140            self.process_receiver(rcv)
1141  
1142          ssn.aborting = False
1143          ssn.aborted = True
1144          ssn.committing = False
1145          ssn.committed = False
1146          sst.aborting = False
1147  
1148        for rcv in ssn.receivers:
1149          _rcv = self._attachments[rcv]
1150          sst.write_cmd(MessageStop(_rcv.destination))
1151        sst.write_cmd(ExecutionSync(), do_rb)
1152  
1153    def grant(self, rcv):
1154      sst = self._attachments[rcv.session]
1155      _rcv = self._attachments.get(rcv)
1156      if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
1157        return
1158  
1159      if rcv.granted is UNLIMITED:
1160        if rcv.impending is UNLIMITED:
1161          delta = 0
1162        else:
1163          delta = UNLIMITED
1164      elif rcv.impending is UNLIMITED:
1165        delta = -1
1166      else:
1167        delta = max(rcv.granted, rcv.received) - rcv.impending
1168  
1169      if delta is UNLIMITED:
1170        if not _rcv.bytes_open:
1171          sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1172          _rcv.bytes_open = True
1173        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
1174        rcv.impending = UNLIMITED
1175      elif delta > 0:
1176        if not _rcv.bytes_open:
1177          sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1178          _rcv.bytes_open = True
1179        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
1180        rcv.impending += delta
1181      elif delta < 0 and not rcv.draining:
1182        _rcv.draining = True
1183        def do_stop():
1184          rcv.impending = rcv.received
1185          _rcv.draining = False
1186          _rcv.bytes_open = False
1187          self.grant(rcv)
1188        sst.write_cmd(MessageStop(_rcv.destination), do_stop)
1189  
1190      if rcv.draining:
1191        _rcv.draining = True
1192        def do_flush():
1193          rcv.impending = rcv.received
1194          rcv.granted = rcv.impending
1195          _rcv.draining = False
1196          _rcv.bytes_open = False
1197          rcv.draining = False
1198        sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
1199  
1200  
1201    def process_receiver(self, rcv):
1202      if rcv.closed: return
1203      self.grant(rcv)
1204
1205    def send(self, snd, msg):
1206      sst = self._attachments[snd.session]
1207      _snd = self._attachments[snd]
1208  
1209      if msg.subject is None or _snd._exchange == "":
1210        rk = _snd._routing_key
1211      else:
1212        rk = msg.subject
1213  
1214      if msg.subject is None:
1215        subject = _snd.subject
1216      else:
1217        subject = msg.subject
1218  
1219      # XXX: do we need to query to figure out how to create the reply-to interoperably?
1220      if msg.reply_to:
1221        rt = addr2reply_to(msg.reply_to)
1222      else:
1223        rt = None
1224      content_encoding = msg.properties.get("x-amqp-0-10.content-encoding")
1225      dp = DeliveryProperties(routing_key=rk)
1226      mp = MessageProperties(message_id=msg.id,
1227                             user_id=msg.user_id,
1228                             reply_to=rt,
1229                             correlation_id=msg.correlation_id,
1230                             app_id = msg.properties.get("x-amqp-0-10.app-id"),
1231                             content_type=msg.content_type,
1232                             content_encoding=content_encoding,
1233                             application_headers=msg.properties)
1234      if subject is not None:
1235        if mp.application_headers is None:
1236          mp.application_headers = {}
1237        mp.application_headers[SUBJECT] = subject
1238      if msg.durable is not None:
1239        if msg.durable:
1240          dp.delivery_mode = delivery_mode.persistent
1241        else:
1242          dp.delivery_mode = delivery_mode.non_persistent
1243      if msg.priority is not None:
1244        dp.priority = msg.priority
1245      if msg.ttl is not None:
1246        dp.ttl = long(msg.ttl*1000)
1247      enc, dec = get_codec(msg.content_type)
1248      body = enc(msg.content)
1249  
1250      # XXX: this is not safe for out of order, can this be triggered by pre_ack?
1251      def msg_acked():
1252        # XXX: should we log the ack somehow too?
1253        snd.acked += 1
1254        m = snd.session.outgoing.pop(0)
1255        sst.outgoing_idx -= 1
1256        log.debug("RACK[%s]: %s", sst.session.log_id, msg)
1257        assert msg == m
1258  
1259      xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
1260                            payload=body)
1261  
1262      if _snd.pre_ack:
1263        sst.write_cmd(xfr)
1264      else:
1265        sst.write_cmd(xfr, msg_acked, sync=msg._sync)
1266  
1267      log.debug("SENT[%s]: %s", sst.session.log_id, msg)
1268  
1269      if _snd.pre_ack:
1270        msg_acked()
1271  
1272    def do_message_transfer(self, xfr):
1273      sst = self.get_sst(xfr)
1274      ssn = sst.session
1275  
1276      msg = self._decode(xfr)
1277      rcv = sst.destinations[xfr.destination].target
1278      msg._receiver = rcv
1279      if rcv.impending is not UNLIMITED:
1280        assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
1281      rcv.received += 1
1282      log.debug("RCVD[%s]: %s", ssn.log_id, msg)
1283      ssn.incoming.append(msg)
1284
1285    def _decode(self, xfr):
1286      dp = EMPTY_DP
1287      mp = EMPTY_MP
1288  
1289      for h in xfr.headers:
1290        if isinstance(h, DeliveryProperties):
1291          dp = h
1292        elif isinstance(h, MessageProperties):
1293          mp = h
1294  
1295      ap = mp.application_headers
1296      enc, dec = get_codec(mp.content_type)
1297      content = dec(xfr.payload)
1298      msg = Message(content)
1299      msg.id = mp.message_id
1300      if ap is not None:
1301        msg.subject = ap.get(SUBJECT)
1302      msg.user_id = mp.user_id
1303      if mp.reply_to is not None:
1304        msg.reply_to = reply_to2addr(mp.reply_to)
1305      msg.correlation_id = mp.correlation_id
1306      if dp.delivery_mode is not None:
1307        msg.durable = dp.delivery_mode == delivery_mode.persistent
1308      msg.priority = dp.priority
1309      if dp.ttl is not None:
1310        msg.ttl = dp.ttl/1000.0
1311      msg.redelivered = dp.redelivered
1312      msg.properties = mp.application_headers or {}
1313      if mp.app_id is not None:
1314        msg.properties["x-amqp-0-10.app-id"] = mp.app_id
1315      if mp.content_encoding is not None:
1316        msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding
1317      if dp.routing_key is not None:
1318        msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key
1319      if dp.timestamp is not None:
1320        msg.properties["x-amqp-0-10.timestamp"] = dp.timestamp
1321      msg.content_type = mp.content_type
1322      msg._transfer_id = xfr.id
1323      return msg
1324