1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import socket, struct, sys, time
21 from logging import getLogger, DEBUG
22 from qpid import compat
23 from qpid import sasl
24 from qpid.concurrency import synchronized
25 from qpid.datatypes import RangedSet, Serial
26 from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
27 FrameDecoder, SegmentDecoder, OpDecoder
28 from qpid.messaging import address, transports
29 from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
30 from qpid.messaging.exceptions import *
31 from qpid.messaging.message import get_codec, Disposition, Message
32 from qpid.messaging.endpoints import MangledString
33 from qpid.ops import *
34 from qpid.selector import Selector
35 from qpid.util import URL, default,get_client_properties_with_defaults
36 from qpid.validator import And, Context, List, Map, Types, Values
37 from threading import Condition, Thread
38
39 log = getLogger("qpid.messaging")
40 rawlog = getLogger("qpid.messaging.io.raw")
41 opslog = getLogger("qpid.messaging.io.ops")
44 name, subject, options = address.parse(addr)
45 if options:
46 type = options.get("node", {}).get("type")
47 else:
48 type = None
49
50 if type == "topic":
51 return ReplyTo(name, subject)
52 else:
53 return ReplyTo(None, name)
54
56 if reply_to.exchange in (None, ""):
57 return reply_to.routing_key
58 elif reply_to.routing_key is None:
59 return "%s; {node: {type: topic}}" % reply_to.exchange
60 else:
61 return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key)
62
67
68
69
70 DURABLE_DEFAULT=False
75 """
76 The pattern filter matches the supplied wildcard pattern against a
77 message subject.
78 """
79
82
83
84 - def _bind(self, sst, exchange, queue):
85 from qpid.ops import ExchangeBind
86
87 sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
88 binding_key=self.value.replace("*", "#")))
89
90 SUBJECT_DEFAULTS = {
91 "topic": "#"
92 }
96
98
99 - def __init__(self, driver, session, name, channel):
100 self.driver = driver
101 self.session = session
102 self.name = name
103 self.channel = channel
104 self.detached = False
105 self.committing = False
106 self.aborting = False
107
108
109 self.sent = Serial(0)
110 self.acknowledged = RangedSet()
111 self.actions = {}
112 self.min_completion = self.sent
113 self.max_completion = self.sent
114 self.results = {}
115 self.need_sync = False
116
117
118 self.received = None
119 self.executed = RangedSet()
120
121
122
123 self.destinations = {}
124
126 id = self.sent
127 self.write_cmd(query, lambda: handler(self.results.pop(id), obj))
128
130 for k, v in overrides.items():
131 cmd[k.replace('-', '_')] = v
132
133 - def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
134 if overrides:
135 self.apply_overrides(cmd, overrides)
136
137 if action != noop:
138 cmd.sync = sync
139 if self.detached:
140 raise Exception("detached")
141 cmd.id = self.sent
142 self.sent += 1
143 self.actions[cmd.id] = action
144 self.max_completion = cmd.id
145 self.write_op(cmd)
146 self.need_sync = not cmd.sync
147
149 if cmds:
150 for cmd in cmds[:-1]:
151 self.write_cmd(cmd)
152 self.write_cmd(cmds[-1], action)
153 else:
154 action()
155
159
160 POLICIES = Values("always", "sender", "receiver", "never")
161 RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
162 "exactly-once")
163
164 DECLARE = Map({}, restricted=False)
165 BINDINGS = List(Map({
166 "exchange": Types(basestring),
167 "queue": Types(basestring),
168 "key": Types(basestring),
169 "arguments": Map({}, restricted=False)
170 }))
171
172 COMMON_OPTS = {
173 "create": POLICIES,
174 "delete": POLICIES,
175 "assert": POLICIES,
176 "node": Map({
177 "type": Values("queue", "topic"),
178 "durable": Types(bool),
179 "x-declare": DECLARE,
180 "x-bindings": BINDINGS
181 }),
182 "link": Map({
183 "name": Types(basestring),
184 "durable": Types(bool),
185 "reliability": RELIABILITY,
186 "x-declare": DECLARE,
187 "x-bindings": BINDINGS,
188 "x-subscribe": Map({}, restricted=False)
189 })
190 }
191
192 RECEIVE_MODES = Values("browse", "consume")
193
194 SOURCE_OPTS = COMMON_OPTS.copy()
195 SOURCE_OPTS.update({
196 "mode": RECEIVE_MODES
197 })
198
199 TARGET_OPTS = COMMON_OPTS.copy()
202
203 ADDR_NAME = "source"
204 DIR_NAME = "receiver"
205 VALIDATOR = Map(SOURCE_OPTS)
206
208 _rcv.destination = str(rcv.id)
209 sst.destinations[_rcv.destination] = _rcv
210 _rcv.draining = False
211 _rcv.bytes_open = False
212 _rcv.on_unlink = []
213
214 - def do_link(self, sst, rcv, _rcv, type, subtype, action):
215 link_opts = _rcv.options.get("link", {})
216 if type == "topic":
217 default_reliability = "unreliable"
218 else:
219 default_reliability = "at-least-once"
220 reliability = link_opts.get("reliability", default_reliability)
221 declare = link_opts.get("x-declare", {})
222 subscribe = link_opts.get("x-subscribe", {})
223 acq_mode = acquire_mode.pre_acquired
224 if reliability in ("unreliable", "at-most-once"):
225 rcv._accept_mode = accept_mode.none
226 else:
227 rcv._accept_mode = accept_mode.explicit
228
229 if type == "topic":
230 default_name = "%s.%s" % (rcv.session.name, _rcv.destination)
231 _rcv._queue = link_opts.get("name", default_name)
232 sst.write_cmd(QueueDeclare(queue=_rcv._queue,
233 durable=link_opts.get("durable", False),
234 exclusive=True,
235 auto_delete=(reliability == "unreliable")),
236 overrides=declare)
237 if declare.get("exclusive", True): _rcv.on_unlink = [QueueDelete(_rcv._queue)]
238 subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype)
239 bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject)
240 if not bindings:
241 sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject))
242
243 elif type == "queue":
244 _rcv._queue = _rcv.name
245 if _rcv.options.get("mode", "consume") == "browse":
246 acq_mode = acquire_mode.not_acquired
247 bindings = get_bindings(link_opts, queue=_rcv._queue)
248
249
250 sst.write_cmds(bindings)
251 sst.write_cmd(MessageSubscribe(queue=_rcv._queue,
252 destination=_rcv.destination,
253 acquire_mode = acq_mode,
254 accept_mode = rcv._accept_mode),
255 overrides=subscribe)
256 sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
257
259 link_opts = _rcv.options.get("link", {})
260 reliability = link_opts.get("reliability")
261 cmds = [MessageCancel(_rcv.destination)]
262 cmds.extend(_rcv.on_unlink)
263 msgs = []
264 msg = rcv.session._pop(rcv)
265 while msg is not None:
266 msgs.append(msg)
267 msg = rcv.session._pop(rcv)
268 if len(msgs) > 0:
269 ids = RangedSet(*[m._transfer_id for m in msgs])
270 log.debug("releasing back messages: %s, as receiver is closing", ids)
271 cmds.append(MessageRelease(ids, True))
272 sst.write_cmds(cmds, action)
273
275 del sst.destinations[_rcv.destination]
276
278
279 ADDR_NAME = "target"
280 DIR_NAME = "sender"
281 VALIDATOR = Map(TARGET_OPTS)
282
284 _snd.closing = False
285 _snd.pre_ack = False
286
287 - def do_link(self, sst, snd, _snd, type, subtype, action):
288 link_opts = _snd.options.get("link", {})
289 reliability = link_opts.get("reliability", "at-least-once")
290 _snd.pre_ack = reliability in ("unreliable", "at-most-once")
291 if type == "topic":
292 _snd._exchange = _snd.name
293 _snd._routing_key = _snd.subject
294 bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject)
295 elif type == "queue":
296 _snd._exchange = ""
297 _snd._routing_key = _snd.name
298 bindings = get_bindings(link_opts, queue=_snd.name)
299 sst.write_cmds(bindings, action)
300
303
306
308
310 self.ttl = ttl
311 self.entries = {}
312
314 self.entries[key] = time.time(), value
315
317 tstamp, value = self.entries[key]
318 if time.time() - tstamp >= self.ttl:
319 del self.entries[key]
320 raise KeyError(key)
321 else:
322 return value
323
325 del self.entries[key]
326
327
328 HEADER="!4s4B"
329
330 EMPTY_DP = DeliveryProperties()
331 EMPTY_MP = MessageProperties()
332
333 SUBJECT = "qpid.subject"
334
335 CLOSED = "CLOSED"
336 READ_ONLY = "READ_ONLY"
337 WRITE_ONLY = "WRITE_ONLY"
338 OPEN = "OPEN"
341
343 self.connection = connection
344 self.log_id = "%x" % id(self.connection)
345 self._lock = self.connection._lock
346
347 self._selector = Selector.default()
348 self._attempts = 0
349 self._delay = self.connection.reconnect_interval_min
350 self._reconnect_log = self.connection.reconnect_log
351 self._host = 0
352 self._retrying = False
353 self._next_retry = None
354 self._transport = None
355
356 self._timeout = None
357
358 self.engine = None
359
361 urls = [URL(u) for u in self.connection.reconnect_urls]
362 hosts = [(self.connection.host, default(self.connection.port, 5672))] + \
363 [(u.host, default(u.port, 5672)) for u in urls]
364 if self._host >= len(hosts):
365 self._host = 0
366 self._last_host = hosts[self._host]
367 if self._host == 0:
368 self._attempts += 1
369 self._host = self._host + 1
370 return self._last_host
371
373 return len(self.connection.reconnect_urls) + 1
374
375 @synchronized
379
381 self._selector.register(self)
382
384 self._selector.unregister(self)
385 if self._transport:
386 self.st_closed()
387
389 return self._transport.fileno()
390
391 @synchronized
393 """Called by the Selector I/O thread to determine if the driver needs to
394 wait on the arrival of network data (call self.readable() callback)
395 """
396 return self._transport is not None and \
397 self._transport.reading(True)
398
399 @synchronized
401 """Called by the Selector I/O thread to determine if it should block
402 waiting for output bandwidth (call the self.writeable() callback)
403 """
404 return self._transport is not None and \
405 self.engine is not None and \
406 self._transport.writing(self.engine.pending())
407
408 @synchronized
410 """Called by the Selector I/O thread to determine if it should wake up the
411 driver (call the timeout() callback
412 """
413 return self._timeout
414
415 @synchronized
416 - def abort(self, exc, info):
417 """Called if the Selector I/O thread hits an unrecoverable error and fails.
418 """
419 try:
420 self.connection.error = exc
421 log.error("I/O Thread Fatal error: %s\n%s" % (str(exc), info))
422 except:
423 pass
424
426 """We consider a reconnect to have suceeded only when we have received
427 open-ok from the peer.
428
429 If we declared success as soon as the transport connected, then we could get
430 into an infinite heartbeat loop if the remote process is hung and never
431 sends us any data. We would fail the connection after 2 missed heartbeats,
432 reconnect the transport, declare the reconnect ok, then fail again after 2
433 missed heartbeats and so on.
434 """
435 if self._retrying and self.engine is not None and self.engine._connected:
436 if self._reconnect_log:
437 log.warn("reconnect succeeded: %s:%s", *self._last_host)
438 self._next_retry = None
439 self._attempts = 0
440 self._delay = self.connection.reconnect_interval_min
441 self._retrying = False
442
443 @synchronized
445 if self.engine is None:
446 return
447 try:
448 data = self._transport.recv(64*1024)
449 if data is None:
450 return
451 elif data:
452 rawlog.debug("READ[%s]: %r", self.log_id, data)
453 self.engine.write(data)
454 self._check_retry_ok()
455 else:
456 self.close_engine()
457 except socket.error, e:
458 self.close_engine(ConnectionError(text=str(e)))
459
460 self.update_status()
461
462 self._notify()
463
465 if self.connection.error:
466 self.connection._condition.gc()
467 self.connection._waiter.notifyAll()
468
470 if e is None:
471 e = ConnectionError(text="connection aborted")
472
473 if (self.connection.reconnect and
474 (self.connection.reconnect_limit is None or
475 self.connection.reconnect_limit <= 0 or
476 self._attempts <= self.connection.reconnect_limit)):
477 if self._host < self._num_hosts():
478 delay = 0
479 else:
480 delay = self._delay
481 self._delay = min(2*self._delay,
482 self.connection.reconnect_interval_max)
483 self._next_retry = time.time() + delay
484 if self._reconnect_log:
485 log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
486 if delay > 0:
487 log.warn("sleeping %s seconds" % delay)
488 self._retrying = True
489 self.engine.close()
490 else:
491 self.engine.close(e)
492 self.st_closed()
493
494 self.schedule()
495
497 if not self.engine: return False
498 status = self.engine.status()
499 return getattr(self, "st_%s" % status.lower())()
500
502
503
504 if self._transport: self._transport.close()
505 self._transport = None
506 self.engine = None
507 return True
508
511
512 @synchronized
514 if self.engine is None:
515 return
516 notify = False
517 try:
518 n = self._transport.send(self.engine.peek())
519 if n == 0: return
520 sent = self.engine.read(n)
521 rawlog.debug("SENT[%s]: %r", self.log_id, sent)
522 except socket.error, e:
523 self.close_engine(e)
524 notify = True
525
526 if self.update_status() or notify:
527 self._notify()
528
529 @synchronized
535
537 times = []
538 if self.connection.heartbeat:
539 times.append(time.time() + self.connection.heartbeat)
540 if self._next_retry:
541 times.append(self._next_retry)
542 if times:
543 self._timeout = min(times)
544 else:
545 self._timeout = None
546
548 try:
549 if self._transport is None:
550 if self.connection._connected and not self.connection.error:
551 self.connect()
552 elif self.engine is not None:
553 self.engine.dispatch()
554 except HeartbeatTimeout, e:
555 log.warn("Heartbeat timeout")
556 self.close_engine(e)
557 except ContentError, e:
558 msg = compat.format_exc()
559 self.connection.error = ContentError(text=msg)
560 except:
561
562 msg = compat.format_exc()
563 self.connection.error = InternalError(text=msg)
564
566 if self._retrying and time.time() < self._next_retry:
567 return
568
569 try:
570
571 host, port = self._next_host()
572 if self._retrying and self._reconnect_log:
573 log.warn("trying: %s:%s", host, port)
574 self.engine = Engine(self.connection)
575 self.engine.open()
576 rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
577 trans = transports.TRANSPORTS.get(self.connection.transport)
578 if trans:
579 self._transport = trans(self.connection, host, port)
580 else:
581 raise ConnectError("no such transport: %s" % self.connection.transport)
582 self.schedule()
583 except socket.error, e:
584 self.close_engine(ConnectError(text=str(e)))
585
586 DEFAULT_DISPOSITION = Disposition(None)
587
588 -def get_bindings(opts, queue=None, exchange=None, key=None):
589 bindings = opts.get("x-bindings", [])
590 cmds = []
591 for b in bindings:
592 exchange = b.get("exchange", exchange)
593 queue = b.get("queue", queue)
594 key = b.get("key", key)
595 args = b.get("arguments", {})
596 cmds.append(ExchangeBind(queue, exchange, key, args))
597 return cmds
598
599 CONNECTION_ERRS = {
600
601
602 }
603
604 SESSION_ERRS = {
605
606 error_code.unauthorized_access: UnauthorizedAccess,
607 error_code.not_found: NotFound,
608 error_code.resource_locked: ReceiverError,
609 error_code.resource_limit_exceeded: TargetCapacityExceeded,
610 error_code.internal_error: ServerError
611 }
614
616 self.connection = connection
617 self.log_id = "%x" % id(self.connection)
618 self._closing = False
619 self._connected = False
620 self._reconnecting = bool(connection.sessions)
621 self._attachments = {}
622
623 self._in = LinkIn()
624 self._out = LinkOut()
625
626 self._channel_max = 65536
627 self._channels = 0
628 self._sessions = {}
629
630 self.address_cache = Cache(self.connection.address_ttl)
631
632 self._status = CLOSED
633 self._buf = ""
634 self._hdr = ""
635
636
637 self._last_in = time.time()
638 self._last_out = time.time()
639 self._op_enc = OpEncoder()
640 self._seg_enc = SegmentEncoder()
641 self._frame_enc = FrameEncoder()
642 self._frame_dec = FrameDecoder()
643 self._seg_dec = SegmentDecoder()
644 self._op_dec = OpDecoder()
645
646 self._sasl = sasl.Client()
647 if self.connection.username:
648 self._sasl.setAttr("username", self.connection.username)
649 if self.connection.password:
650 self._sasl.setAttr("password", self.connection.password)
651 if self.connection.host:
652 self._sasl.setAttr("host", self.connection.host)
653 self._sasl.setAttr("service", self.connection.sasl_service)
654 if self.connection.sasl_min_ssf is not None:
655 self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
656 if self.connection.sasl_max_ssf is not None:
657 self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
658 self._sasl.init()
659 self._sasl_encode = False
660 self._sasl_decode = False
661
663 self.connection._transport_connected = False
664
665 for ssn in self.connection.sessions.values():
666 for m in ssn.acked + ssn.unacked + ssn.incoming:
667 m._transfer_id = None
668 for snd in ssn.senders:
669 snd.linked = False
670 for rcv in ssn.receivers:
671 rcv.impending = rcv.received
672 rcv.linked = False
673
676
678 self._last_in = time.time()
679 try:
680 if self._sasl_decode:
681 data = self._sasl.decode(data)
682
683 if len(self._hdr) < 8:
684 r = 8 - len(self._hdr)
685 self._hdr += data[:r]
686 data = data[r:]
687
688 if len(self._hdr) == 8:
689 self.do_header(self._hdr)
690
691 self._frame_dec.write(data)
692 self._seg_dec.write(*self._frame_dec.read())
693 self._op_dec.write(*self._seg_dec.read())
694 for op in self._op_dec.read():
695 self.assign_id(op)
696 opslog.debug("RCVD[%s]: %r", self.log_id, op)
697 op.dispatch(self)
698 self.dispatch()
699 except MessagingError, e:
700 self.close(e)
701 except:
702 self.close(InternalError(text=compat.format_exc()))
703
704 - def close(self, e=None):
705 self._reset()
706
707
708 for ssn in self.connection.sessions.values():
709 if ssn.transactional:
710 if ssn.committing:
711 ssn.error = TransactionUnknown(text="Transaction outcome unknown due to transport failure")
712 else:
713 ssn.error = TransactionAborted(text="Transaction aborted due to transport failure")
714 ssn.closed = True
715 if e:
716 self.connection.error = e
717 self._status = CLOSED
718
720 if isinstance(op, Command):
721 sst = self.get_sst(op)
722 op.id = sst.received
723 sst.received += 1
724
726 return len(self._buf)
727
729 result = self._buf[:n]
730 self._buf = self._buf[n:]
731 return result
732
735
737 opslog.debug("SENT[%s]: %r", self.log_id, op)
738 self._op_enc.write(op)
739 self._seg_enc.write(*self._op_enc.read())
740 self._frame_enc.write(*self._seg_enc.read())
741 bytes = self._frame_enc.read()
742 if self._sasl_encode:
743 bytes = self._sasl.encode(bytes)
744 self._buf += bytes
745 self._last_out = time.time()
746
748 cli_major = 0; cli_minor = 10
749 magic, _, _, major, minor = struct.unpack(HEADER, hdr)
750 if major != cli_major or minor != cli_minor:
751 raise VersionError(text="client: %s-%s, server: %s-%s" %
752 (cli_major, cli_minor, major, minor))
753
755 if self.connection.sasl_mechanisms:
756 permitted = self.connection.sasl_mechanisms.split()
757 mechs = [m for m in start.mechanisms if m in permitted]
758 else:
759 mechs = start.mechanisms
760 try:
761 mech, initial = self._sasl.start(" ".join(mechs))
762 except sasl.SASLError, e:
763 if "ANONYMOUS" not in mechs and self.connection.username is None:
764 _text="Anonymous connections disabled, missing credentials"
765 else:
766 _text=str(e)
767 raise AuthenticationFailure(text=_text)
768
769 client_properties = get_client_properties_with_defaults(provided_client_properties=self.connection.client_properties);
770 self.write_op(ConnectionStartOk(client_properties=client_properties,
771 mechanism=mech, response=initial))
772
774 resp = self._sasl.step(secure.challenge)
775 self.write_op(ConnectionSecureOk(response=resp))
776
778
779 if tune.channel_max is not None:
780 self.channel_max = tune.channel_max
781 self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
782 channel_max=self.channel_max))
783 self.write_op(ConnectionOpen())
784 self._sasl_encode = True
785
787 self.connection.auth_username = self._sasl.auth_username()
788 self._connected = True
789 self._sasl_decode = True
790 self.connection._transport_connected = True
791
794
800
801
802
803
804
805
808
811
813 sst = self.get_sst(cp)
814 sst.received = cp.command_id
815
817 sst = self.get_sst(sc)
818 for r in sc.commands:
819 sst.acknowledged.add(r.lower, r.upper)
820
821 if not sc.commands.empty():
822 while sst.min_completion in sc.commands:
823 if sst.actions.has_key(sst.min_completion):
824 sst.actions.pop(sst.min_completion)()
825 sst.min_completion += 1
826
828 sst = self.get_sst(kcmp)
829 executed = RangedSet()
830 for e in sst.executed.ranges:
831 for ke in kcmp.ranges:
832 if e.lower in ke and e.upper in ke:
833 break
834 else:
835 executed.add_range(e)
836 sst.executed = completed
837
839 sst = self.get_sst(sf)
840 if sf.expected:
841 if sst.received is None:
842 exp = None
843 else:
844 exp = RangedSet(sst.received)
845 sst.write_op(SessionExpected(exp))
846 if sf.confirmed:
847 sst.write_op(SessionConfirmed(sst.executed))
848 if sf.completed:
849 sst.write_op(SessionCompleted(sst.executed))
850
854
856 sst = self.get_sst(er)
857 sst.results[er.command_id] = er.value
858 sst.executed.add(er.id)
859
864
866 if not self.connection._connected and not self._closing and self._status != CLOSED:
867 self.disconnect()
868
869 if self._connected and not self._closing:
870 for ssn in self.connection.sessions.values():
871 self.attach(ssn)
872 self.process(ssn)
873
874
875
876 if self.connection.heartbeat and self._status != CLOSED and not self._closing:
877 now = time.time()
878 if now - self._last_in > 2*self.connection.heartbeat:
879 raise HeartbeatTimeout(text="heartbeat timeout")
880
881 if self._connected and now - self._last_out >= self.connection.heartbeat/2.0:
882 self.write_op(ConnectionHeartbeat())
883
885 self._reset()
886 self._status = OPEN
887 self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
888
890 self.write_op(ConnectionClose(close_code.normal))
891 self._closing = True
892
894 if ssn.closed: return
895 sst = self._attachments.get(ssn)
896 if sst is None:
897 for i in xrange(0, self.channel_max):
898 if not self._sessions.has_key(i):
899 ch = i
900 break
901 else:
902 raise RuntimeError("all channels used")
903 sst = SessionState(self, ssn, ssn.name, ch)
904 sst.write_op(SessionAttach(name=ssn.name, force=self._reconnecting))
905 sst.write_op(SessionCommandPoint(sst.sent, 0))
906 self._reconnecting = False
907 sst.outgoing_idx = 0
908 sst.acked = []
909 sst.acked_idx = 0
910 if ssn.transactional:
911 sst.write_cmd(TxSelect())
912 self._attachments[ssn] = sst
913 self._sessions[sst.channel] = sst
914
915 for snd in ssn.senders:
916 self.link(snd, self._out, snd.target)
917 for rcv in ssn.receivers:
918 self.link(rcv, self._in, rcv.source)
919
920 if sst is not None and ssn.closing and not sst.detached:
921 sst.detached = True
922 sst.write_op(SessionDetach(name=ssn.name))
923
925 return self._sessions[op.channel]
926
928 sst = self._sessions.pop(dtc.channel)
929 ssn = sst.session
930 del self._attachments[ssn]
931 ssn.closed = True
932
937
938 - def link(self, lnk, dir, addr):
939 sst = self._attachments.get(lnk.session)
940 _lnk = self._attachments.get(lnk)
941
942 if _lnk is None and not lnk.closed:
943 _lnk = Attachment(lnk)
944 _lnk.closing = False
945 dir.init_link(sst, lnk, _lnk)
946
947 err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir)
948 if err:
949 lnk.error = err
950 lnk.closed = True
951 return
952
953 def linked():
954 lnk.linked = True
955
956 def resolved(type, subtype):
957 dir.do_link(sst, lnk, _lnk, type, subtype, linked)
958
959 self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved)
960 self._attachments[lnk] = _lnk
961
962 if lnk.linked and lnk.closing and not lnk.closed:
963 if not _lnk.closing:
964 def unlinked():
965 dir.del_link(sst, lnk, _lnk)
966 del self._attachments[lnk]
967 lnk.closed = True
968 if _lnk.options.get("delete") in ("always", dir.DIR_NAME):
969 dir.do_unlink(sst, lnk, _lnk)
970 requested_type = _lnk.options.get("node", {}).get("type")
971 self.delete(sst, _lnk.name, unlinked, node_type=requested_type)
972 else:
973 dir.do_unlink(sst, lnk, _lnk, unlinked)
974 _lnk.closing = True
975 elif not lnk.linked and lnk.closing and not lnk.closed:
976 if lnk.error: lnk.closed = True
977
979 if addr is None:
980 return MalformedAddress(text="%s is None" % dir.ADDR_NAME)
981 else:
982 try:
983 lnk.name, lnk.subject, lnk.options = address.parse(addr)
984
985 if lnk.options is None:
986 lnk.options = {}
987 if isinstance(addr, MangledString):
988 lnk.options['create'] = "always"
989 if 'node' not in lnk.options:
990 lnk.options['node'] = {}
991 if 'x-declare' not in lnk.options['node']:
992 lnk.options['node']['x-declare'] = {}
993 xdeclare = lnk.options['node']['x-declare']
994 if 'auto-delete' not in xdeclare:
995 xdeclare['auto-delete'] = "True"
996 if 'exclusive' not in xdeclare:
997 xdeclare['exclusive'] = "True"
998 except address.LexError, e:
999 return MalformedAddress(text=str(e))
1000 except address.ParseError, e:
1001 return MalformedAddress(text=str(e))
1002
1004 ctx = Context()
1005 err = dir.VALIDATOR.validate(lnk.options, ctx)
1006 if err: return InvalidOption(text="error in options: %s" % err)
1007
1009 declare = lnk.options.get("create") in ("always", dir)
1010 assrt = lnk.options.get("assert") in ("always", dir)
1011 requested_type = lnk.options.get("node", {}).get("type")
1012 def do_resolved(type, subtype):
1013 err = None
1014 if type is None:
1015 if declare:
1016 err = self.declare(sst, lnk, action, True)
1017 else:
1018 err = NotFound(text="no such %s: %s" % (requested_type or "queue", lnk.name))
1019 else:
1020 if assrt:
1021 expected = lnk.options.get("node", {}).get("type")
1022 if expected and type != expected:
1023 if declare:
1024 err = self.declare(sst, lnk, action, True)
1025 else:
1026 err = AssertionFailed(text="expected %s, got %s" % (expected, type))
1027 if "node" in lnk.options and "x-bindings" in lnk.options["node"]:
1028 err = self.declare(sst, lnk, action, False)
1029 if err is None:
1030 action(type, subtype)
1031
1032 if err:
1033 tgt = lnk.target
1034 tgt.error = err
1035 del self._attachments[tgt]
1036 tgt.closed = True
1037 return
1038 self.resolve(sst, lnk.name, do_resolved, node_type=requested_type, force=declare)
1039
1040 - def resolve(self, sst, name, action, force=False, node_type=None, delete=False):
1041 if not force and not node_type:
1042 try:
1043 type, subtype = self.address_cache[name]
1044 action(type, subtype)
1045 return
1046 except KeyError:
1047 pass
1048
1049 args = { "topic":None, "queue":None }
1050 def do_result(r, obj):
1051 args[obj] = r
1052 def do_action():
1053 er = args["topic"]
1054 qr = args["queue"]
1055 if node_type == "topic" and er and not er.not_found:
1056 type, subtype = "topic", er.type
1057 elif node_type == "queue" and qr and qr.queue:
1058 type, subtype = "queue", None
1059 elif (er and er.not_found) and qr and not qr.queue:
1060 type, subtype = None, None
1061 elif (qr and qr.queue):
1062 if node_type == "topic" and force:
1063 type, subtype = None, None
1064 else:
1065 type, subtype = "queue", None
1066 elif (er and not er.not_found):
1067 if node_type == "queue" and force:
1068 type, subtype = None, None
1069 else:
1070 type, subtype = "topic", er.type
1071 elif er:
1072 if er.not_found:
1073 type, subtype = None, None
1074 else:
1075 type, subtype = "topic", er.type
1076 else:
1077 type, subtype = None, None
1078 if type is not None:
1079 self.address_cache[name] = (type, subtype)
1080 action(type, subtype)
1081 def do_result_and_action(r, obj):
1082 do_result(r, obj)
1083 do_action()
1084 if (node_type is None):
1085 sst.write_query(ExchangeQuery(name), do_result, "topic")
1086 sst.write_query(QueueQuery(name), do_result_and_action, "queue")
1087 elif force and not delete:
1088 do_action()
1089 elif node_type == "topic":
1090 sst.write_query(ExchangeQuery(name), do_result_and_action, "topic")
1091 else:
1092 sst.write_query(QueueQuery(name), do_result_and_action, "queue")
1093
1094 - def declare(self, sst, lnk, action, create_node):
1095 name = lnk.name
1096 props = lnk.options.get("node", {})
1097 durable = props.get("durable", DURABLE_DEFAULT)
1098 type = props.get("type", "queue")
1099 declare = props.get("x-declare", {})
1100
1101 cmd = None
1102 if type == "topic":
1103 if create_node: cmd = ExchangeDeclare(exchange=name, durable=durable)
1104 bindings = get_bindings(props, exchange=name)
1105 elif type == "queue":
1106 if create_node: cmd = QueueDeclare(queue=name, durable=durable)
1107 bindings = get_bindings(props, queue=name)
1108 else:
1109 raise ValueError(type)
1110
1111 if cmd is not None:
1112 sst.apply_overrides(cmd, declare)
1113 if type == "topic":
1114 if cmd.type is None:
1115 cmd.type = "topic"
1116 subtype = cmd.type
1117 else:
1118 subtype = None
1119 cmds = [cmd]
1120 else:
1121 cmds = []
1122
1123 cmds.extend(bindings)
1124
1125 def declared():
1126 if create_node:
1127 self.address_cache[name] = (type, subtype)
1128 action(type, subtype)
1129
1130 sst.write_cmds(cmds, declared)
1131
1132 - def delete(self, sst, name, action, node_type=None):
1133 def deleted():
1134 del self.address_cache[name]
1135 action()
1136
1137 def do_delete(type, subtype):
1138 if type == "topic":
1139 sst.write_cmd(ExchangeDelete(name), deleted)
1140 elif type == "queue":
1141 sst.write_cmd(QueueDelete(name), deleted)
1142 elif type is None:
1143 action()
1144 else:
1145 raise ValueError(type)
1146 self.resolve(sst, name, do_delete, force=True, node_type=node_type, delete=True)
1147
1149 if ssn.closed or ssn.closing: return
1150
1151 sst = self._attachments[ssn]
1152
1153 while sst.outgoing_idx < len(ssn.outgoing):
1154 msg = ssn.outgoing[sst.outgoing_idx]
1155 snd = msg._sender
1156
1157 _snd = self._attachments.get(snd)
1158 if _snd and snd.linked:
1159 self.send(snd, msg)
1160 sst.outgoing_idx += 1
1161 else:
1162 break
1163
1164 for snd in ssn.senders:
1165
1166 if snd.synced >= snd.queued and sst.need_sync:
1167 sst.write_cmd(ExecutionSync(), sync_noop)
1168
1169 for rcv in ssn.receivers:
1170 self.process_receiver(rcv)
1171
1172 if ssn.acked:
1173 messages = ssn.acked[sst.acked_idx:]
1174 if messages:
1175 ids = RangedSet()
1176
1177 disposed = [(DEFAULT_DISPOSITION, [])]
1178 acked = []
1179 for m in messages:
1180
1181
1182 if m._transfer_id is None:
1183 acked.append(m)
1184 continue
1185 ids.add(m._transfer_id)
1186 if m._receiver._accept_mode is accept_mode.explicit:
1187 disp = m._disposition or DEFAULT_DISPOSITION
1188 last, msgs = disposed[-1]
1189 if disp.type is last.type and disp.options == last.options:
1190 msgs.append(m)
1191 else:
1192 disposed.append((disp, [m]))
1193 else:
1194 acked.append(m)
1195
1196 for range in ids:
1197 sst.executed.add_range(range)
1198 sst.write_op(SessionCompleted(sst.executed))
1199
1200 def ack_acker(msgs):
1201 def ack_ack():
1202 for m in msgs:
1203 ssn.acked.remove(m)
1204 sst.acked_idx -= 1
1205
1206 if not ssn.transactional:
1207 sst.acked.remove(m)
1208 return ack_ack
1209
1210 for disp, msgs in disposed:
1211 if not msgs: continue
1212 if disp.type is None:
1213 op = MessageAccept
1214 elif disp.type is RELEASED:
1215 op = MessageRelease
1216 elif disp.type is REJECTED:
1217 op = MessageReject
1218 sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
1219 **disp.options),
1220 ack_acker(msgs))
1221 if log.isEnabledFor(DEBUG):
1222 for m in msgs:
1223 log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
1224
1225 sst.acked.extend(messages)
1226 sst.acked_idx += len(messages)
1227 ack_acker(acked)()
1228
1229 if ssn.committing and not sst.committing:
1230 def commit_ok():
1231 del sst.acked[:]
1232 ssn.committing = False
1233 ssn.committed = True
1234 ssn.aborting = False
1235 ssn.aborted = False
1236 sst.committing = False
1237 sst.write_cmd(TxCommit(), commit_ok)
1238 sst.committing = True
1239
1240 if ssn.aborting and not sst.aborting:
1241 sst.aborting = True
1242 def do_rb():
1243 messages = sst.acked + ssn.unacked + ssn.incoming
1244 ids = RangedSet(*[m._transfer_id for m in messages])
1245 for range in ids:
1246 sst.executed.add_range(range)
1247 sst.write_op(SessionCompleted(sst.executed))
1248 sst.write_cmd(MessageRelease(ids, True))
1249 sst.write_cmd(TxRollback(), do_rb_ok)
1250
1251 def do_rb_ok():
1252 del ssn.incoming[:]
1253 del ssn.unacked[:]
1254 del sst.acked[:]
1255
1256 for rcv in ssn.receivers:
1257 rcv.impending = rcv.received
1258 rcv.returned = rcv.received
1259
1260
1261 for rcv in ssn.receivers:
1262 self.process_receiver(rcv)
1263
1264 ssn.aborting = False
1265 ssn.aborted = True
1266 ssn.committing = False
1267 ssn.committed = False
1268 sst.aborting = False
1269
1270 for rcv in ssn.receivers:
1271 _rcv = self._attachments[rcv]
1272 sst.write_cmd(MessageStop(_rcv.destination))
1273 sst.write_cmd(ExecutionSync(), do_rb)
1274
1276 sst = self._attachments[rcv.session]
1277 _rcv = self._attachments.get(rcv)
1278 if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
1279 return
1280
1281 if rcv.granted is UNLIMITED:
1282 if rcv.impending is UNLIMITED:
1283 delta = 0
1284 else:
1285 delta = UNLIMITED
1286 elif rcv.impending is UNLIMITED:
1287 delta = -1
1288 else:
1289 delta = max(rcv.granted, rcv.received) - rcv.impending
1290
1291 if delta is UNLIMITED:
1292 if not _rcv.bytes_open:
1293 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1294 _rcv.bytes_open = True
1295 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
1296 rcv.impending = UNLIMITED
1297 elif delta > 0:
1298 if not _rcv.bytes_open:
1299 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1300 _rcv.bytes_open = True
1301 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
1302 rcv.impending += delta
1303 elif delta < 0 and not rcv.draining:
1304 _rcv.draining = True
1305 def do_stop():
1306 rcv.impending = rcv.received
1307 _rcv.draining = False
1308 _rcv.bytes_open = False
1309 self.grant(rcv)
1310 sst.write_cmd(MessageStop(_rcv.destination), do_stop)
1311
1312 if rcv.draining:
1313 _rcv.draining = True
1314 def do_flush():
1315 rcv.impending = rcv.received
1316 rcv.granted = rcv.impending
1317 _rcv.draining = False
1318 _rcv.bytes_open = False
1319 rcv.draining = False
1320 sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
1321
1322
1324 if rcv.closed: return
1325 self.grant(rcv)
1326
1327 - def send(self, snd, msg):
1328 sst = self._attachments[snd.session]
1329 _snd = self._attachments[snd]
1330
1331 if msg.subject is None or _snd._exchange == "":
1332 rk = _snd._routing_key
1333 else:
1334 rk = msg.subject
1335
1336 if msg.subject is None:
1337 subject = _snd.subject
1338 else:
1339 subject = msg.subject
1340
1341
1342 if msg.reply_to:
1343 rt = addr2reply_to(msg.reply_to)
1344 else:
1345 rt = None
1346 content_encoding = msg.properties.get("x-amqp-0-10.content-encoding")
1347 dp = DeliveryProperties(routing_key=rk)
1348 mp = MessageProperties(message_id=msg.id,
1349 user_id=msg.user_id,
1350 reply_to=rt,
1351 correlation_id=msg.correlation_id,
1352 app_id = msg.properties.get("x-amqp-0-10.app-id"),
1353 content_type=msg.content_type,
1354 content_encoding=content_encoding,
1355 application_headers=msg.properties)
1356 if subject is not None:
1357 if mp.application_headers is None:
1358 mp.application_headers = {}
1359 mp.application_headers[SUBJECT] = subject
1360 if msg.durable is not None:
1361 if msg.durable:
1362 dp.delivery_mode = delivery_mode.persistent
1363 else:
1364 dp.delivery_mode = delivery_mode.non_persistent
1365 if msg.priority is not None:
1366 dp.priority = msg.priority
1367 if msg.ttl is not None:
1368 dp.ttl = long(msg.ttl*1000)
1369 enc, dec = get_codec(msg.content_type)
1370 try:
1371 body = enc(msg.content)
1372 except AttributeError, e:
1373
1374 raise EncodeError(e)
1375
1376
1377 def msg_acked():
1378
1379 snd.acked += 1
1380 m = snd.session.outgoing.pop(0)
1381 sst.outgoing_idx -= 1
1382 log.debug("RACK[%s]: %s", sst.session.log_id, msg)
1383 assert msg == m
1384
1385 xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
1386 payload=body)
1387
1388 if _snd.pre_ack:
1389 sst.write_cmd(xfr)
1390 else:
1391 sst.write_cmd(xfr, msg_acked, sync=msg._sync)
1392
1393 log.debug("SENT[%s]: %s", sst.session.log_id, msg)
1394
1395 if _snd.pre_ack:
1396 msg_acked()
1397
1399 sst = self.get_sst(xfr)
1400 ssn = sst.session
1401
1402 msg = self._decode(xfr)
1403 rcv = sst.destinations[xfr.destination].target
1404 msg._receiver = rcv
1405 if rcv.closing or rcv.closed:
1406 ids = RangedSet(*[msg._transfer_id])
1407 log.debug("releasing back %s message: %s, as receiver is closing", ids, msg)
1408 sst.write_cmd(MessageRelease(ids, True))
1409 return
1410 if rcv.impending is not UNLIMITED:
1411 assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
1412 rcv.received += 1
1413 log.debug("RCVD[%s]: %s", ssn.log_id, msg)
1414 ssn._notify_message_received(msg)
1415
1416
1418 dp = EMPTY_DP
1419 mp = EMPTY_MP
1420
1421 for h in xfr.headers:
1422 if isinstance(h, DeliveryProperties):
1423 dp = h
1424 elif isinstance(h, MessageProperties):
1425 mp = h
1426
1427 ap = mp.application_headers
1428 enc, dec = get_codec(mp.content_type)
1429 try:
1430 content = dec(xfr.payload)
1431 except Exception, e:
1432 raise DecodeError(e)
1433 msg = Message(content)
1434 msg.id = mp.message_id
1435 if ap is not None:
1436 msg.subject = ap.get(SUBJECT)
1437 msg.user_id = mp.user_id
1438 if mp.reply_to is not None:
1439 msg.reply_to = reply_to2addr(mp.reply_to)
1440 msg.correlation_id = mp.correlation_id
1441 if dp.delivery_mode is not None:
1442 msg.durable = dp.delivery_mode == delivery_mode.persistent
1443 msg.priority = dp.priority
1444 if dp.ttl is not None:
1445 msg.ttl = dp.ttl/1000.0
1446 msg.redelivered = dp.redelivered
1447 msg.properties = mp.application_headers or {}
1448 if mp.app_id is not None:
1449 msg.properties["x-amqp-0-10.app-id"] = mp.app_id
1450 if mp.content_encoding is not None:
1451 msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding
1452 if dp.routing_key is not None:
1453 msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key
1454 if dp.timestamp is not None:
1455 msg.properties["x-amqp-0-10.timestamp"] = dp.timestamp
1456 msg.content_type = mp.content_type
1457 msg._transfer_id = xfr.id
1458 return msg
1459