1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import os
import socket, struct, sys, time
from logging import getLogger, DEBUG
from qpid import compat
from qpid import sasl
from qpid.concurrency import synchronized
from qpid.datatypes import RangedSet, Serial
from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
    FrameDecoder, SegmentDecoder, OpDecoder
from qpid.messaging import address, transports
from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
from qpid.messaging.exceptions import *
from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
from qpid.util import URL, default
from qpid.validator import And, Context, List, Map, Types, Values
from threading import Condition, Thread
37
38 log = getLogger("qpid.messaging")
39 rawlog = getLogger("qpid.messaging.io.raw")
40 opslog = getLogger("qpid.messaging.io.ops")
43 name, subject, options = address.parse(addr)
44 if options:
45 type = options.get("node", {}).get("type")
46 else:
47 type = None
48
49 if type == "topic":
50 return ReplyTo(name, subject)
51 else:
52 return ReplyTo(None, name)
53
55 if reply_to.exchange in (None, ""):
56 return reply_to.routing_key
57 elif reply_to.routing_key is None:
58 return "%s; {node: {type: topic}}" % reply_to.exchange
59 else:
60 return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key)
61
66
67
68
69 DURABLE_DEFAULT=False
74 """
75 The pattern filter matches the supplied wildcard pattern against a
76 message subject.
77 """
78
81
82
def _bind(self, sst, exchange, queue):
    """Bind `queue` to `exchange` using this filter's pattern.

    The binding key is self.value with every "*" replaced by "#"; the
    resulting ExchangeBind command is written through `sst.write_cmd`.
    """
    from qpid.ops import ExchangeBind

    binding_key = self.value.replace("*", "#")
    bind_cmd = ExchangeBind(exchange=exchange, queue=queue,
                            binding_key=binding_key)
    sst.write_cmd(bind_cmd)
88
89 SUBJECT_DEFAULTS = {
90 "topic": "#"
91 }
92
93
# Parent process id advertised in CLIENT_PROPERTIES; stays 0 when it cannot
# be determined.
ppid = 0
try:
    ppid = os.getppid()
except Exception:
    # Best effort only: os.getppid is not available on every platform.
    # Catching Exception (rather than a bare except) still lets
    # KeyboardInterrupt and SystemExit propagate.
    pass

# Default client properties describing this process.
CLIENT_PROPERTIES = {"product": "qpid python client",
                     "version": "development",
                     "platform": os.name,
                     "qpid.client_process": os.path.basename(sys.argv[0]),
                     "qpid.client_pid": os.getpid(),
                     "qpid.client_ppid": ppid}
106
def noop():
    """Do nothing and return None; a placeholder completion callback."""
    return None
109
111
def __init__(self, driver, session, name, channel):
    """Initialise the bookkeeping kept for one attached session.

    driver, session, name and channel are stored as given; every other
    attribute starts out empty / False / None.
    """
    self.driver = driver
    self.session = session
    self.name = name
    self.channel = channel

    # lifecycle flags
    self.detached = False
    self.committing = False
    self.aborting = False

    # outgoing command state: `sent` supplies the id of the next command,
    # `actions` maps command ids to completion callbacks
    first_id = Serial(0)
    self.sent = first_id
    self.acknowledged = RangedSet()
    self.actions = {}
    self.min_completion = first_id
    self.max_completion = first_id
    self.results = {}
    self.need_sync = False

    # incoming command state
    self.received = None
    self.executed = RangedSet()

    # subscription destination name -> receiver attachment
    self.destinations = {}
137
139 id = self.sent
140 self.write_cmd(query, lambda: handler(self.results.pop(id)))
141
143 for k, v in overrides.items():
144 cmd[k.replace('-', '_')] = v
145
def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
    """Assign the next command id to `cmd` and write it out.

    cmd       -- command object to send
    action    -- callback registered under the command's id in self.actions
    overrides -- optional mapping applied to `cmd` via apply_overrides
    sync      -- copied onto cmd.sync, but only when a non-default `action`
                 was supplied

    Raises Exception("detached") if the session state is already detached.
    """
    if overrides:
        self.apply_overrides(cmd, overrides)

    # Only commands with a real completion callback get the caller's
    # sync preference.
    if action != noop:
        cmd.sync = sync

    if self.detached:
        raise Exception("detached")

    # Stamp the command with the current id, then advance the counter.
    cmd.id = self.sent
    self.sent += 1

    self.actions[cmd.id] = action
    self.max_completion = cmd.id
    self.write_op(cmd)

    # Remember whether the last command written was not flagged sync.
    self.need_sync = not cmd.sync
160
162 if cmds:
163 for cmd in cmds[:-1]:
164 self.write_cmd(cmd)
165 self.write_cmd(cmds[-1], action)
166 else:
167 action()
168
172
173 POLICIES = Values("always", "sender", "receiver", "never")
174 RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
175 "exactly-once")
176
177 DECLARE = Map({}, restricted=False)
178 BINDINGS = List(Map({
179 "exchange": Types(basestring),
180 "queue": Types(basestring),
181 "key": Types(basestring),
182 "arguments": Map({}, restricted=False)
183 }))
184
185 COMMON_OPTS = {
186 "create": POLICIES,
187 "delete": POLICIES,
188 "assert": POLICIES,
189 "node": Map({
190 "type": Values("queue", "topic"),
191 "durable": Types(bool),
192 "x-declare": DECLARE,
193 "x-bindings": BINDINGS
194 }),
195 "link": Map({
196 "name": Types(basestring),
197 "durable": Types(bool),
198 "reliability": RELIABILITY,
199 "x-declare": DECLARE,
200 "x-bindings": BINDINGS,
201 "x-subscribe": Map({}, restricted=False)
202 })
203 }
204
205 RECEIVE_MODES = Values("browse", "consume")
206
207 SOURCE_OPTS = COMMON_OPTS.copy()
208 SOURCE_OPTS.update({
209 "mode": RECEIVE_MODES
210 })
211
212 TARGET_OPTS = COMMON_OPTS.copy()
215
216 ADDR_NAME = "source"
217 DIR_NAME = "receiver"
218 VALIDATOR = Map(SOURCE_OPTS)
219
221 _rcv.destination = str(rcv.id)
222 sst.destinations[_rcv.destination] = _rcv
223 _rcv.draining = False
224 _rcv.bytes_open = False
225 _rcv.on_unlink = []
226
def do_link(self, sst, rcv, _rcv, type, subtype, action):
    """Write the commands that subscribe receiver `rcv` to its source.

    For a "topic" node a subscription queue is declared (and deleted again
    on unlink) and bound to the exchange; for a "queue" node the queue is
    consumed, or browsed when the "mode" option is "browse".  `action` is
    attached to the final MessageSetFlowMode command.
    """
    opts = _rcv.options.get("link", {})
    reliability = opts.get("reliability", "at-least-once")
    queue_overrides = opts.get("x-declare", {})
    subscribe_overrides = opts.get("x-subscribe", {})

    acquire = acquire_mode.pre_acquired
    if reliability not in ("unreliable", "at-most-once"):
        rcv._accept_mode = accept_mode.explicit
    else:
        rcv._accept_mode = accept_mode.none

    if type == "topic":
        # Subscription queue name: the link name if given, otherwise
        # "<session name>.<destination>".
        fallback_name = "%s.%s" % (rcv.session.name, _rcv.destination)
        _rcv._queue = opts.get("name", fallback_name)
        queue_decl = QueueDeclare(queue=_rcv._queue,
                                  durable=opts.get("durable", False),
                                  exclusive=True,
                                  auto_delete=(reliability == "unreliable"))
        sst.write_cmd(queue_decl, overrides=queue_overrides)
        _rcv.on_unlink = [QueueDelete(_rcv._queue)]

        subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype)
        bindings = get_bindings(opts, _rcv._queue, _rcv.name, subject)
        if not bindings:
            # No explicit x-bindings: bind the queue with the subject.
            sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject))
    elif type == "queue":
        _rcv._queue = _rcv.name
        if _rcv.options.get("mode", "consume") == "browse":
            acquire = acquire_mode.not_acquired
        bindings = get_bindings(opts, queue=_rcv._queue)

    sst.write_cmds(bindings)
    subscribe_cmd = MessageSubscribe(queue=_rcv._queue,
                                     destination=_rcv.destination,
                                     acquire_mode=acquire,
                                     accept_mode=rcv._accept_mode)
    sst.write_cmd(subscribe_cmd, overrides=subscribe_overrides)
    flow_cmd = MessageSetFlowMode(_rcv.destination, flow_mode.credit)
    sst.write_cmd(flow_cmd, action)
266
268 link_opts = _rcv.options.get("link", {})
269 reliability = link_opts.get("reliability")
270 cmds = [MessageCancel(_rcv.destination)]
271 cmds.extend(_rcv.on_unlink)
272 sst.write_cmds(cmds, action)
273
275 del sst.destinations[_rcv.destination]
276
278
279 ADDR_NAME = "target"
280 DIR_NAME = "sender"
281 VALIDATOR = Map(TARGET_OPTS)
282
284 _snd.closing = False
285 _snd.pre_ack = False
286
def do_link(self, sst, snd, _snd, type, subtype, action):
    """Work out where sender `snd` publishes and write its bindings.

    A "topic" target publishes to the exchange named by the address with
    the address subject as routing key; a "queue" target publishes to the
    "" exchange with the queue name as routing key.  `action` is passed to
    write_cmds together with any x-bindings commands.
    """
    opts = _snd.options.get("link", {})
    reliability = opts.get("reliability", "at-least-once")
    _snd.pre_ack = reliability in ("unreliable", "at-most-once")

    if type == "topic":
        _snd._exchange = _snd.name
        _snd._routing_key = _snd.subject
        bindings = get_bindings(opts, exchange=_snd.name,
                                key=_snd.subject)
    elif type == "queue":
        _snd._exchange = ""
        _snd._routing_key = _snd.name
        bindings = get_bindings(opts, queue=_snd.name)

    sst.write_cmds(bindings, action)
300
303
306
308
310 self.ttl = ttl
311 self.entries = {}
312
314 self.entries[key] = time.time(), value
315
317 tstamp, value = self.entries[key]
318 if time.time() - tstamp >= self.ttl:
319 del self.entries[key]
320 raise KeyError(key)
321 else:
322 return value
323
325 del self.entries[key]
326
327
328 HEADER="!4s4B"
329
330 EMPTY_DP = DeliveryProperties()
331 EMPTY_MP = MessageProperties()
332
333 SUBJECT = "qpid.subject"
334
335 CLOSED = "CLOSED"
336 READ_ONLY = "READ_ONLY"
337 WRITE_ONLY = "WRITE_ONLY"
338 OPEN = "OPEN"
341
343 self.connection = connection
344 self.log_id = "%x" % id(self.connection)
345 self._lock = self.connection._lock
346
347 self._selector = Selector.default()
348 self._attempts = 0
349 self._delay = self.connection.reconnect_interval_min
350 self._reconnect_log = self.connection.reconnect_log
351 self._host = 0
352 self._retrying = False
353 self._next_retry = None
354 self._transport = None
355
356 self._timeout = None
357
358 self.engine = None
359
361 urls = [URL(u) for u in self.connection.reconnect_urls]
362 hosts = [(self.connection.host, default(self.connection.port, 5672))] + \
363 [(u.host, default(u.port, 5672)) for u in urls]
364 if self._host >= len(hosts):
365 self._host = 0
366 result = hosts[self._host]
367 if self._host == 0:
368 self._attempts += 1
369 self._host = self._host + 1
370 return result
371
373 return len(self.connection.reconnect_urls) + 1
374
375 @synchronized
379
381 self._selector.register(self)
382
384 self._selector.unregister(self)
385 if self._transport:
386 self.st_closed()
387
389 return self._transport.fileno()
390
391 @synchronized
393 return self._transport is not None and \
394 self._transport.reading(True)
395
396 @synchronized
398 return self._transport is not None and \
399 self._transport.writing(self.engine.pending())
400
401 @synchronized
404
405 @synchronized
422
424 if self.connection.error:
425 self.connection._condition.gc()
426 self.connection._waiter.notifyAll()
427
429 if e is None:
430 e = ConnectionError(text="connection aborted")
431
432 if (self.connection.reconnect and
433 (self.connection.reconnect_limit is None or
434 self.connection.reconnect_limit <= 0 or
435 self._attempts <= self.connection.reconnect_limit)):
436 if self._host < self._num_hosts():
437 delay = 0
438 else:
439 delay = self._delay
440 self._delay = min(2*self._delay,
441 self.connection.reconnect_interval_max)
442 self._next_retry = time.time() + delay
443 if self._reconnect_log:
444 log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
445 if delay > 0:
446 log.warn("sleeping %s seconds" % delay)
447 self._retrying = True
448 self.engine.close()
449 else:
450 self.engine.close(e)
451
452 self.schedule()
453
457
459
460
461 self._transport.close()
462 self._transport = None
463 self.engine = None
464 return True
465
468
469 @synchronized
471 notify = False
472 try:
473 n = self._transport.send(self.engine.peek())
474 if n == 0: return
475 sent = self.engine.read(n)
476 rawlog.debug("SENT[%s]: %r", self.log_id, sent)
477 except socket.error, e:
478 self.close_engine(e)
479 notify = True
480
481 if self.update_status() or notify:
482 self._notify()
483
484 @synchronized
489
491 times = []
492 if self.connection.heartbeat:
493 times.append(time.time() + self.connection.heartbeat)
494 if self._next_retry:
495 times.append(self._next_retry)
496 if times:
497 self._timeout = min(times)
498 else:
499 self._timeout = None
500
502 try:
503 if self._transport is None:
504 if self.connection._connected and not self.connection.error:
505 self.connect()
506 else:
507 self.engine.dispatch()
508 except HeartbeatTimeout, e:
509 self.close_engine(e)
510 except:
511
512 msg = compat.format_exc()
513 self.connection.error = InternalError(text=msg)
514
516 if self._retrying and time.time() < self._next_retry:
517 return
518
519 try:
520
521 host, port = self._next_host()
522 if self._retrying and self._reconnect_log:
523 log.warn("trying: %s:%s", host, port)
524 self.engine = Engine(self.connection)
525 self.engine.open()
526 rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
527 trans = transports.TRANSPORTS.get(self.connection.transport)
528 if trans:
529 self._transport = trans(self.connection, host, port)
530 else:
531 raise ConnectError("no such transport: %s" % self.connection.transport)
532 if self._retrying and self._reconnect_log:
533 log.warn("reconnect succeeded: %s:%s", host, port)
534 self._next_retry = None
535 self._attempts = 0
536 self._delay = self.connection.reconnect_interval_min
537 self._retrying = False
538 self.schedule()
539 except socket.error, e:
540 self.close_engine(ConnectError(text=str(e)))
541
542 DEFAULT_DISPOSITION = Disposition(None)
543
def get_bindings(opts, queue=None, exchange=None, key=None):
    """Build one ExchangeBind command per entry of opts["x-bindings"].

    opts     -- option mapping; its "x-bindings" entry (default: empty list)
                is a list of mappings with optional "exchange", "queue",
                "key" and "arguments" entries
    queue, exchange, key
             -- defaults used for any binding entry that omits the
                corresponding field

    Returns the list of ExchangeBind commands, in the order of the entries.
    """
    cmds = []
    for binding in opts.get("x-bindings", []):
        # Resolve each field into per-binding locals.  Rebinding the
        # function parameters here would make an explicit value from one
        # entry the default for every entry that follows it.
        b_exchange = binding.get("exchange", exchange)
        b_queue = binding.get("queue", queue)
        b_key = binding.get("key", key)
        b_args = binding.get("arguments", {})
        cmds.append(ExchangeBind(b_queue, b_exchange, b_key, b_args))
    return cmds
554
555 CONNECTION_ERRS = {
556
557
558 }
559
560 SESSION_ERRS = {
561
562 error_code.unauthorized_access: UnauthorizedAccess,
563 error_code.not_found: NotFound,
564 error_code.resource_locked: ReceiverError,
565 error_code.resource_limit_exceeded: TargetCapacityExceeded,
566 error_code.internal_error: ServerError
567 }
570
572 self.connection = connection
573 self.log_id = "%x" % id(self.connection)
574 self._closing = False
575 self._connected = False
576 self._attachments = {}
577
578 self._in = LinkIn()
579 self._out = LinkOut()
580
581 self._channel_max = 65536
582 self._channels = 0
583 self._sessions = {}
584
585 self.address_cache = Cache(self.connection.address_ttl)
586
587 self._status = CLOSED
588 self._buf = ""
589 self._hdr = ""
590 self._last_in = None
591 self._last_out = None
592 self._op_enc = OpEncoder()
593 self._seg_enc = SegmentEncoder()
594 self._frame_enc = FrameEncoder()
595 self._frame_dec = FrameDecoder()
596 self._seg_dec = SegmentDecoder()
597 self._op_dec = OpDecoder()
598
599 self._sasl = sasl.Client()
600 if self.connection.username:
601 self._sasl.setAttr("username", self.connection.username)
602 if self.connection.password:
603 self._sasl.setAttr("password", self.connection.password)
604 if self.connection.host:
605 self._sasl.setAttr("host", self.connection.host)
606 self._sasl.setAttr("service", self.connection.sasl_service)
607 if self.connection.sasl_min_ssf is not None:
608 self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
609 if self.connection.sasl_max_ssf is not None:
610 self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
611 self._sasl.init()
612 self._sasl_encode = False
613 self._sasl_decode = False
614
616 self.connection._transport_connected = False
617
618 for ssn in self.connection.sessions.values():
619 for m in ssn.acked + ssn.unacked + ssn.incoming:
620 m._transfer_id = None
621 for snd in ssn.senders:
622 snd.linked = False
623 for rcv in ssn.receivers:
624 rcv.impending = rcv.received
625 rcv.linked = False
626
629
631 self._last_in = time.time()
632 try:
633 if self._sasl_decode:
634 data = self._sasl.decode(data)
635
636 if len(self._hdr) < 8:
637 r = 8 - len(self._hdr)
638 self._hdr += data[:r]
639 data = data[r:]
640
641 if len(self._hdr) == 8:
642 self.do_header(self._hdr)
643
644 self._frame_dec.write(data)
645 self._seg_dec.write(*self._frame_dec.read())
646 self._op_dec.write(*self._seg_dec.read())
647 for op in self._op_dec.read():
648 self.assign_id(op)
649 opslog.debug("RCVD[%s]: %r", self.log_id, op)
650 op.dispatch(self)
651 self.dispatch()
652 except MessagingError, e:
653 self.close(e)
654 except:
655 self.close(InternalError(text=compat.format_exc()))
656
def close(self, e=None):
    """Reset the engine and mark it CLOSED.

    e -- optional error; when truthy it is recorded on the connection as
         connection.error.
    """
    self._reset()
    if e:
        self.connection.error = e
    self._status = CLOSED
662
664 if isinstance(op, Command):
665 sst = self.get_sst(op)
666 op.id = sst.received
667 sst.received += 1
668
670 return len(self._buf)
671
673 result = self._buf[:n]
674 self._buf = self._buf[n:]
675 return result
676
679
681 opslog.debug("SENT[%s]: %r", self.log_id, op)
682 self._op_enc.write(op)
683 self._seg_enc.write(*self._op_enc.read())
684 self._frame_enc.write(*self._seg_enc.read())
685 bytes = self._frame_enc.read()
686 if self._sasl_encode:
687 bytes = self._sasl.encode(bytes)
688 self._buf += bytes
689 self._last_out = time.time()
690
692 cli_major = 0; cli_minor = 10
693 magic, _, _, major, minor = struct.unpack(HEADER, hdr)
694 if major != cli_major or minor != cli_minor:
695 raise VersionError(text="client: %s-%s, server: %s-%s" %
696 (cli_major, cli_minor, major, minor))
697
699 if self.connection.sasl_mechanisms:
700 permitted = self.connection.sasl_mechanisms.split()
701 mechs = [m for m in start.mechanisms if m in permitted]
702 else:
703 mechs = start.mechanisms
704 try:
705 mech, initial = self._sasl.start(" ".join(mechs))
706 except sasl.SASLError, e:
707 raise AuthenticationFailure(text=str(e))
708
709 client_properties = CLIENT_PROPERTIES.copy()
710 client_properties.update(self.connection.client_properties)
711 self.write_op(ConnectionStartOk(client_properties=client_properties,
712 mechanism=mech, response=initial))
713
715 resp = self._sasl.step(secure.challenge)
716 self.write_op(ConnectionSecureOk(response=resp))
717
719
720 if tune.channel_max is not None:
721 self.channel_max = tune.channel_max
722 self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
723 channel_max=self.channel_max))
724 self.write_op(ConnectionOpen())
725 self._sasl_encode = True
726
728 self.connection.auth_username = self._sasl.auth_username()
729 self._connected = True
730 self._sasl_decode = True
731 self.connection._transport_connected = True
732
735
741
742
743
744
745
746
749
752
754 sst = self.get_sst(cp)
755 sst.received = cp.command_id
756
758 sst = self.get_sst(sc)
759 for r in sc.commands:
760 sst.acknowledged.add(r.lower, r.upper)
761
762 if not sc.commands.empty():
763 while sst.min_completion in sc.commands:
764 if sst.actions.has_key(sst.min_completion):
765 sst.actions.pop(sst.min_completion)()
766 sst.min_completion += 1
767
# Drop from sst.executed every range that lies entirely inside one of the
# ranges carried by kcmp; ranges not covered that way are kept.
sst = self.get_sst(kcmp)
executed = RangedSet()
for e in sst.executed.ranges:
    for ke in kcmp.ranges:
        if e.lower in ke and e.upper in ke:
            break
    else:
        # no range in kcmp covers this one, so keep it
        executed.add_range(e)
# The filtered set built above is `executed`; the previous code assigned the
# undefined name `completed`, which raised NameError.
sst.executed = executed
778
780 sst = self.get_sst(sf)
781 if sf.expected:
782 if sst.received is None:
783 exp = None
784 else:
785 exp = RangedSet(sst.received)
786 sst.write_op(SessionExpected(exp))
787 if sf.confirmed:
788 sst.write_op(SessionConfirmed(sst.executed))
789 if sf.completed:
790 sst.write_op(SessionCompleted(sst.executed))
791
795
797 sst = self.get_sst(er)
798 sst.results[er.command_id] = er.value
799 sst.executed.add(er.id)
800
805
807 if not self.connection._connected and not self._closing and self._status != CLOSED:
808 self.disconnect()
809
810 if self._connected and not self._closing:
811 for ssn in self.connection.sessions.values():
812 self.attach(ssn)
813 self.process(ssn)
814
815 if self.connection.heartbeat and self._status != CLOSED:
816 now = time.time()
817 if self._last_in is not None and \
818 now - self._last_in > 2*self.connection.heartbeat:
819 raise HeartbeatTimeout(text="heartbeat timeout")
820 if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
821 self.write_op(ConnectionHeartbeat())
822
824 self._reset()
825 self._status = OPEN
826 self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
827
829 self.write_op(ConnectionClose(close_code.normal))
830 self._closing = True
831
833 if ssn.closed: return
834 sst = self._attachments.get(ssn)
835 if sst is None:
836 for i in xrange(0, self.channel_max):
837 if not self._sessions.has_key(i):
838 ch = i
839 break
840 else:
841 raise RuntimeError("all channels used")
842 sst = SessionState(self, ssn, ssn.name, ch)
843 sst.write_op(SessionAttach(name=ssn.name))
844 sst.write_op(SessionCommandPoint(sst.sent, 0))
845 sst.outgoing_idx = 0
846 sst.acked = []
847 sst.acked_idx = 0
848 if ssn.transactional:
849 sst.write_cmd(TxSelect())
850 self._attachments[ssn] = sst
851 self._sessions[sst.channel] = sst
852
853 for snd in ssn.senders:
854 self.link(snd, self._out, snd.target)
855 for rcv in ssn.receivers:
856 self.link(rcv, self._in, rcv.source)
857
858 if sst is not None and ssn.closing and not sst.detached:
859 sst.detached = True
860 sst.write_op(SessionDetach(name=ssn.name))
861
863 return self._sessions[op.channel]
864
866 sst = self._sessions.pop(dtc.channel)
867 ssn = sst.session
868 del self._attachments[ssn]
869 ssn.closed = True
870
875
def link(self, lnk, dir, addr):
    """Advance link `lnk` (a sender or receiver) towards linked or closed.

    lnk  -- the sender/receiver object
    dir  -- direction helper providing init_link/do_link/do_unlink/del_link,
            DIR_NAME and the option validator
    addr -- the link's address string

    A link with no attachment yet gets one created and its address
    resolved; a linked link that is closing is unlinked (and its node
    deleted when the "delete" policy asks for it); a closing link that
    never linked is closed once it carries an error.
    """
    sst = self._attachments.get(lnk.session)
    _lnk = self._attachments.get(lnk)

    if _lnk is None and not lnk.closed:
        _lnk = Attachment(lnk)
        _lnk.closing = False
        dir.init_link(sst, lnk, _lnk)

        err = (self.parse_address(_lnk, dir, addr) or
               self.validate_options(_lnk, dir))
        if err:
            # Bad address or options: fail the link without attaching it.
            lnk.error = err
            lnk.closed = True
            return

        def on_linked():
            lnk.linked = True

        def on_resolved(type, subtype):
            dir.do_link(sst, lnk, _lnk, type, subtype, on_linked)

        self.resolve_declare(sst, _lnk, dir.DIR_NAME, on_resolved)
        self._attachments[lnk] = _lnk

    if lnk.linked and lnk.closing and not lnk.closed:
        if not _lnk.closing:
            def on_unlinked():
                dir.del_link(sst, lnk, _lnk)
                del self._attachments[lnk]
                lnk.closed = True

            delete_policy = _lnk.options.get("delete")
            if delete_policy in ("always", dir.DIR_NAME):
                # Unlink first, then delete the node; the link is closed
                # once the delete has gone through.
                dir.do_unlink(sst, lnk, _lnk)
                self.delete(sst, _lnk.name, on_unlinked)
            else:
                dir.do_unlink(sst, lnk, _lnk, on_unlinked)
            _lnk.closing = True
    elif not lnk.linked and lnk.closing and not lnk.closed:
        if lnk.error:
            lnk.closed = True
914
928
930 ctx = Context()
931 err = dir.VALIDATOR.validate(lnk.options, ctx)
932 if err: return InvalidOption(text="error in options: %s" % err)
933
935 declare = lnk.options.get("create") in ("always", dir)
936 assrt = lnk.options.get("assert") in ("always", dir)
937 def do_resolved(type, subtype):
938 err = None
939 if type is None:
940 if declare:
941 err = self.declare(sst, lnk, action)
942 else:
943 err = NotFound(text="no such queue: %s" % lnk.name)
944 else:
945 if assrt:
946 expected = lnk.options.get("node", {}).get("type")
947 if expected and type != expected:
948 err = AssertionFailed(text="expected %s, got %s" % (expected, type))
949 if err is None:
950 action(type, subtype)
951
952 if err:
953 tgt = lnk.target
954 tgt.error = err
955 del self._attachments[tgt]
956 tgt.closed = True
957 return
958 self.resolve(sst, lnk.name, do_resolved, force=declare)
959
def resolve(self, sst, name, action, force=False):
    """Determine the node type of `name` and pass it to `action`.

    sst    -- session state used to issue the queries
    name   -- node name to look up
    action -- callback invoked as action(type, subtype); type is "queue",
              "topic" or None when neither a queue nor an exchange exists
    force  -- when true, skip the address cache and always query

    A cache hit invokes `action` immediately.  Otherwise an ExchangeQuery
    and a QueueQuery are written and `action` runs once both results are
    in; a non-None type is stored in the address cache.
    """
    if not force:
        try:
            type, subtype = self.address_cache[name]
        except KeyError:
            # cache miss (or expired entry): fall through to the queries
            pass
        else:
            # Invoked outside the try so that a KeyError raised by the
            # callback itself propagates instead of being mistaken for a
            # cache miss (which would run the callback a second time).
            action(type, subtype)
            return

    args = []

    def do_result(r):
        args.append(r)

    def do_action(r):
        do_result(r)
        # args now holds the exchange result followed by the queue result
        er, qr = args
        if er.not_found and not qr.queue:
            type, subtype = None, None
        elif qr.queue:
            type, subtype = "queue", None
        else:
            type, subtype = "topic", er.type
        if type is not None:
            self.address_cache[name] = (type, subtype)
        action(type, subtype)

    sst.write_query(ExchangeQuery(name), do_result)
    sst.write_query(QueueQuery(name), do_action)
986
def declare(self, sst, lnk, action):
    """Create the node named by `lnk` according to its "node" options.

    A "topic" node is declared as an exchange, a "queue" node (the default
    type) as a queue; "x-declare" entries override fields of the declare
    command and "x-bindings" entries are written after it.  Once the last
    command completes the type is cached and action(type, subtype) is
    called.

    Raises ValueError for any other node type.
    """
    name = lnk.name
    node_opts = lnk.options.get("node", {})
    durable = node_opts.get("durable", DURABLE_DEFAULT)
    node_type = node_opts.get("type", "queue")
    overrides = node_opts.get("x-declare", {})

    is_topic = node_type == "topic"
    if is_topic:
        cmd = ExchangeDeclare(exchange=name, durable=durable)
        bindings = get_bindings(node_opts, exchange=name)
    elif node_type == "queue":
        cmd = QueueDeclare(queue=name, durable=durable)
        bindings = get_bindings(node_opts, queue=name)
    else:
        raise ValueError(node_type)

    sst.apply_overrides(cmd, overrides)

    subtype = None
    if is_topic:
        # An exchange declared without an explicit type becomes "topic".
        if cmd.type is None:
            cmd.type = "topic"
        subtype = cmd.type

    def declared():
        self.address_cache[name] = (node_type, subtype)
        action(node_type, subtype)

    sst.write_cmds([cmd] + bindings, declared)
1020
def delete(self, sst, name, action):
    """Delete the node called `name`, then call `action`.

    The node type is always re-resolved (the cache is bypassed).  An
    exchange or queue is deleted and its cache entry removed before
    `action` runs; if no such node exists `action` is called directly.

    Raises ValueError (from the resolve callback) for an unknown type.
    """
    def forget_and_notify():
        del self.address_cache[name]
        action()

    def on_resolved(node_type, subtype):
        if node_type is None:
            # nothing to delete
            action()
            return
        if node_type == "topic":
            removal = ExchangeDelete(name)
        elif node_type == "queue":
            removal = QueueDelete(name)
        else:
            raise ValueError(node_type)
        sst.write_cmd(removal, forget_and_notify)

    self.resolve(sst, name, on_resolved, force=True)
1036
1038 if ssn.closed or ssn.closing: return
1039
1040 sst = self._attachments[ssn]
1041
1042 while sst.outgoing_idx < len(ssn.outgoing):
1043 msg = ssn.outgoing[sst.outgoing_idx]
1044 snd = msg._sender
1045
1046 _snd = self._attachments.get(snd)
1047 if _snd and snd.linked:
1048 self.send(snd, msg)
1049 sst.outgoing_idx += 1
1050 else:
1051 break
1052
1053 for snd in ssn.senders:
1054
1055 if snd.synced >= snd.queued and sst.need_sync:
1056 sst.write_cmd(ExecutionSync(), sync_noop)
1057
1058 for rcv in ssn.receivers:
1059 self.process_receiver(rcv)
1060
1061 if ssn.acked:
1062 messages = ssn.acked[sst.acked_idx:]
1063 if messages:
1064 ids = RangedSet()
1065
1066 disposed = [(DEFAULT_DISPOSITION, [])]
1067 acked = []
1068 for m in messages:
1069
1070
1071 if m._transfer_id is None:
1072 acked.append(m)
1073 continue
1074 ids.add(m._transfer_id)
1075 if m._receiver._accept_mode is accept_mode.explicit:
1076 disp = m._disposition or DEFAULT_DISPOSITION
1077 last, msgs = disposed[-1]
1078 if disp.type is last.type and disp.options == last.options:
1079 msgs.append(m)
1080 else:
1081 disposed.append((disp, [m]))
1082 else:
1083 acked.append(m)
1084
1085 for range in ids:
1086 sst.executed.add_range(range)
1087 sst.write_op(SessionCompleted(sst.executed))
1088
1089 def ack_acker(msgs):
1090 def ack_ack():
1091 for m in msgs:
1092 ssn.acked.remove(m)
1093 sst.acked_idx -= 1
1094
1095 if not ssn.transactional:
1096 sst.acked.remove(m)
1097 return ack_ack
1098
1099 for disp, msgs in disposed:
1100 if not msgs: continue
1101 if disp.type is None:
1102 op = MessageAccept
1103 elif disp.type is RELEASED:
1104 op = MessageRelease
1105 elif disp.type is REJECTED:
1106 op = MessageReject
1107 sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
1108 **disp.options),
1109 ack_acker(msgs))
1110 if log.isEnabledFor(DEBUG):
1111 for m in msgs:
1112 log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
1113
1114 sst.acked.extend(messages)
1115 sst.acked_idx += len(messages)
1116 ack_acker(acked)()
1117
1118 if ssn.committing and not sst.committing:
1119 def commit_ok():
1120 del sst.acked[:]
1121 ssn.committing = False
1122 ssn.committed = True
1123 ssn.aborting = False
1124 ssn.aborted = False
1125 sst.committing = False
1126 sst.write_cmd(TxCommit(), commit_ok)
1127 sst.committing = True
1128
1129 if ssn.aborting and not sst.aborting:
1130 sst.aborting = True
1131 def do_rb():
1132 messages = sst.acked + ssn.unacked + ssn.incoming
1133 ids = RangedSet(*[m._transfer_id for m in messages])
1134 for range in ids:
1135 sst.executed.add_range(range)
1136 sst.write_op(SessionCompleted(sst.executed))
1137 sst.write_cmd(MessageRelease(ids, True))
1138 sst.write_cmd(TxRollback(), do_rb_ok)
1139
1140 def do_rb_ok():
1141 del ssn.incoming[:]
1142 del ssn.unacked[:]
1143 del sst.acked[:]
1144
1145 for rcv in ssn.receivers:
1146 rcv.impending = rcv.received
1147 rcv.returned = rcv.received
1148
1149
1150 for rcv in ssn.receivers:
1151 self.process_receiver(rcv)
1152
1153 ssn.aborting = False
1154 ssn.aborted = True
1155 ssn.committing = False
1156 ssn.committed = False
1157 sst.aborting = False
1158
1159 for rcv in ssn.receivers:
1160 _rcv = self._attachments[rcv]
1161 sst.write_cmd(MessageStop(_rcv.destination))
1162 sst.write_cmd(ExecutionSync(), do_rb)
1163
1165 sst = self._attachments[rcv.session]
1166 _rcv = self._attachments.get(rcv)
1167 if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
1168 return
1169
1170 if rcv.granted is UNLIMITED:
1171 if rcv.impending is UNLIMITED:
1172 delta = 0
1173 else:
1174 delta = UNLIMITED
1175 elif rcv.impending is UNLIMITED:
1176 delta = -1
1177 else:
1178 delta = max(rcv.granted, rcv.received) - rcv.impending
1179
1180 if delta is UNLIMITED:
1181 if not _rcv.bytes_open:
1182 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1183 _rcv.bytes_open = True
1184 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
1185 rcv.impending = UNLIMITED
1186 elif delta > 0:
1187 if not _rcv.bytes_open:
1188 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1189 _rcv.bytes_open = True
1190 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
1191 rcv.impending += delta
1192 elif delta < 0 and not rcv.draining:
1193 _rcv.draining = True
1194 def do_stop():
1195 rcv.impending = rcv.received
1196 _rcv.draining = False
1197 _rcv.bytes_open = False
1198 self.grant(rcv)
1199 sst.write_cmd(MessageStop(_rcv.destination), do_stop)
1200
1201 if rcv.draining:
1202 _rcv.draining = True
1203 def do_flush():
1204 rcv.impending = rcv.received
1205 rcv.granted = rcv.impending
1206 _rcv.draining = False
1207 _rcv.bytes_open = False
1208 rcv.draining = False
1209 sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
1210
1211
1213 if rcv.closed: return
1214 self.grant(rcv)
1215
def send(self, snd, msg):
    """Encode `msg` as a MessageTransfer and write it for sender `snd`.

    The routing key is the message subject unless the message has none or
    the sender publishes to the "" exchange, in which case the sender's own
    routing key is used.  The subject (message's, else sender's) is also
    carried in the application headers under SUBJECT.  When the sender is
    pre-acking, the message is treated as acknowledged right after it has
    been written; otherwise on completion of the transfer command.
    """
    sst = self._attachments[snd.session]
    _snd = self._attachments[snd]

    has_subject = msg.subject is not None
    if has_subject and _snd._exchange != "":
        routing_key = msg.subject
    else:
        routing_key = _snd._routing_key

    if has_subject:
        subject = msg.subject
    else:
        subject = _snd.subject

    if msg.reply_to:
        reply_to = addr2reply_to(msg.reply_to)
    else:
        reply_to = None

    props = msg.properties
    content_encoding = props.get("x-amqp-0-10.content-encoding")
    dp = DeliveryProperties(routing_key=routing_key)
    # NOTE(review): application_headers is handed msg.properties itself,
    # so the SUBJECT entry set below presumably also shows up in the
    # caller's message properties - confirm before changing.
    mp = MessageProperties(message_id=msg.id,
                           user_id=msg.user_id,
                           reply_to=reply_to,
                           correlation_id=msg.correlation_id,
                           app_id=props.get("x-amqp-0-10.app-id"),
                           content_type=msg.content_type,
                           content_encoding=content_encoding,
                           application_headers=props)

    if subject is not None:
        if mp.application_headers is None:
            mp.application_headers = {}
        mp.application_headers[SUBJECT] = subject

    if msg.durable is not None:
        if msg.durable:
            dp.delivery_mode = delivery_mode.persistent
        else:
            dp.delivery_mode = delivery_mode.non_persistent
    if msg.priority is not None:
        dp.priority = msg.priority
    if msg.ttl is not None:
        # msg.ttl is multiplied by 1000 for the wire value
        dp.ttl = long(msg.ttl*1000)

    encode, _decode_unused = get_codec(msg.content_type)
    body = encode(msg.content)

    def msg_acked():
        # The acknowledged message must be the head of the session's
        # outgoing list.
        snd.acked += 1
        head = snd.session.outgoing.pop(0)
        sst.outgoing_idx -= 1
        log.debug("RACK[%s]: %s", sst.session.log_id, msg)
        assert msg == head

    xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
                          payload=body)

    pre_ack = _snd.pre_ack
    if pre_ack:
        sst.write_cmd(xfr)
    else:
        sst.write_cmd(xfr, msg_acked, sync=msg._sync)

    log.debug("SENT[%s]: %s", sst.session.log_id, msg)

    if pre_ack:
        msg_acked()
1282
1284 sst = self.get_sst(xfr)
1285 ssn = sst.session
1286
1287 msg = self._decode(xfr)
1288 rcv = sst.destinations[xfr.destination].target
1289 msg._receiver = rcv
1290 if rcv.impending is not UNLIMITED:
1291 assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
1292 rcv.received += 1
1293 log.debug("RCVD[%s]: %s", ssn.log_id, msg)
1294 ssn.incoming.append(msg)
1295
1297 dp = EMPTY_DP
1298 mp = EMPTY_MP
1299
1300 for h in xfr.headers:
1301 if isinstance(h, DeliveryProperties):
1302 dp = h
1303 elif isinstance(h, MessageProperties):
1304 mp = h
1305
1306 ap = mp.application_headers
1307 enc, dec = get_codec(mp.content_type)
1308 content = dec(xfr.payload)
1309 msg = Message(content)
1310 msg.id = mp.message_id
1311 if ap is not None:
1312 msg.subject = ap.get(SUBJECT)
1313 msg.user_id = mp.user_id
1314 if mp.reply_to is not None:
1315 msg.reply_to = reply_to2addr(mp.reply_to)
1316 msg.correlation_id = mp.correlation_id
1317 if dp.delivery_mode is not None:
1318 msg.durable = dp.delivery_mode == delivery_mode.persistent
1319 msg.priority = dp.priority
1320 if dp.ttl is not None:
1321 msg.ttl = dp.ttl/1000.0
1322 msg.redelivered = dp.redelivered
1323 msg.properties = mp.application_headers or {}
1324 if mp.app_id is not None:
1325 msg.properties["x-amqp-0-10.app-id"] = mp.app_id
1326 if mp.content_encoding is not None:
1327 msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding
1328 if dp.routing_key is not None:
1329 msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key
1330 if dp.timestamp is not None:
1331 msg.properties["x-amqp-0-10.timestamp"] = dp.timestamp
1332 msg.content_type = mp.content_type
1333 msg._transfer_id = xfr.id
1334 return msg
1335