1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 A high level messaging API for python.
22
23 Areas that still need work:
24
25 - definition of the arguments for L{Session.sender} and L{Session.receiver}
26 - standard L{Message} properties
27 - L{Message} content encoding
28 - protocol negotiation/multiprotocol impl
29 """
30
31 from logging import getLogger
32 from math import ceil
33 from qpid.codec010 import StringCodec
34 from qpid.concurrency import synchronized, Waiter, Condition
35 from qpid.datatypes import Serial, uuid4
36 from qpid.messaging.constants import *
37 from qpid.messaging.exceptions import *
38 from qpid.messaging.message import *
39 from qpid.ops import PRIMITIVE
40 from qpid.util import default, URL
41 from threading import Thread, RLock
42
43 log = getLogger("qpid.messaging")
44
45 static = staticmethod
48 """
49 Base class for all endpoint objects types.
50 @undocumented: __init__, __setattr__
51 """
53 self._async_exception_notify_handler = None
54 self.error = None
55
def _ecwait(self, predicate, timeout=None):
    """
    Wait until either the endpoint is closed or the predicate holds,
    then run the closed check.

    @type predicate: callable
    @param predicate: zero-argument callable evaluated while waiting
    @param timeout: forwarded unchanged to _ewait
    @return: the value returned by _ewait
    """
    def closed_or_ready():
        return self.closed or predicate()

    outcome = self._ewait(closed_or_ready, timeout)
    # check_closed() always runs after the wait, whatever the wait returned
    self.check_closed()
    return outcome
60
61 @synchronized
63 """
64 Register a callable that will be invoked when the driver thread detects an
65 error on the Endpoint. The callable is invoked with the instance of the
66 Endpoint object passed as the first argument. The second argument is an
67 Exception instance describing the failure.
68
69 @param handler: invoked by the driver thread when an error occurs.
70 @type handler: callable object taking an Endpoint and an Exception as
71 arguments.
72 @return: None
73 @note: The exception will also be raised the next time the application
74 invokes one of the blocking messaging APIs.
75 @warning: B{Use with caution} This callback is invoked in the context of
76 the driver thread. It is B{NOT} safe to call B{ANY} of the messaging APIs
77 from within this callback. This includes any of the Endpoint's methods. The
78 intent of the handler is to provide an efficient way to notify the
79 application that an exception has occurred in the driver thread. This can
80 be useful for those applications that periodically poll the messaging layer
81 for events. In this case the callback can be used to schedule a task that
82 retrieves the error using the Endpoint's get_error() or check_error()
83 methods.
84 """
85 self._async_exception_notify_handler = handler
86
88 """
89 Intercept any attempt to set the endpoint error flag and invoke the
90 callback if registered.
91 """
92 super(Endpoint, self).__setattr__(name, value)
93 if name == 'error' and value is not None:
94 if self._async_exception_notify_handler:
95 self._async_exception_notify_handler(self, value)
96
99
100 """
101 A Connection manages a group of L{Sessions<Session>} and connects
102 them with a remote endpoint.
103 """
104
105 @static
def establish(url=None, timeout=None, **options):
    """
    Build a L{Connection} from the supplied url and options, open it,
    and hand it back.

    @param url: forwarded to the L{Connection} constructor
    @param timeout: forwarded to the connection's open() call
    @param options: keyword options forwarded to the L{Connection}
      constructor
    @rtype: Connection
    @return: the connection, after open() has been called on it
    """
    connection = Connection(url, **options)
    connection.open(timeout=timeout)
    return connection
114
def __init__(self, url=None, **options):
    """
    Creates a connection. A newly created connection must be opened
    with the Connection.open() method before it can be used.

    @type url: str
    @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ]
    @type host: str
    @param host: the name or ip address of the remote host (overridden by url)
    @type port: int
    @param port: the port number of the remote host (overridden by url)
    @type transport: str
    @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls)
    @type heartbeat: int
    @param heartbeat: heartbeat interval in seconds

    @type username: str
    @param username: the username for authentication (overridden by url)
    @type password: str
    @param password: the password for authentication (overridden by url)
    @type sasl_mechanisms: str
    @param sasl_mechanisms: space separated list of permitted sasl mechanisms
    @type sasl_service: str
    @param sasl_service: the service name if needed by the SASL mechanism in use
    @type sasl_min_ssf: int
    @param sasl_min_ssf: the minimum acceptable security strength factor
    @type sasl_max_ssf: int
    @param sasl_max_ssf: the maximum acceptable security strength factor

    @type reconnect: bool
    @param reconnect: enable/disable automatic reconnect
    @type reconnect_timeout: float
    @param reconnect_timeout: total time to attempt reconnect
    @type reconnect_interval_min: float
    @param reconnect_interval_min: minimum interval between reconnect attempts
    @type reconnect_interval_max: float
    @param reconnect_interval_max: maximum interval between reconnect attempts
    @type reconnect_interval: float
    @param reconnect_interval: set both min and max reconnect intervals
    @type reconnect_limit: int
    @param reconnect_limit: limit the total number of reconnect attempts
    @type reconnect_urls: list[str]
    @param reconnect_urls: list of backup hosts specified as urls

    @type address_ttl: float
    @param address_ttl: time until cached address resolution expires

    @type ssl_keyfile: str
    @param ssl_keyfile: file with client's private key (PEM format)
    @type ssl_certfile: str
    @param ssl_certfile: file with client's public (eventually priv+pub) key (PEM format)
    @type ssl_trustfile: str
    @param ssl_trustfile: file trusted certificates to validate the server
    @type ssl_skip_hostname_check: bool
    @param ssl_skip_hostname_check: disable verification of hostname in
    certificate. Use with caution - disabling hostname checking leaves you
    vulnerable to Man-in-the-Middle attacks.

    @type protocol: str
    @param protocol: if given, must be "amqp0-10"

    @raise ConnectionError: if an unknown option is supplied or the
      protocol option names anything other than amqp0-10

    @rtype: Connection
    @return: a disconnected Connection
    """
    super(Connection, self).__init__()

    opt_keys = ['host', 'transport', 'port', 'heartbeat', 'username', 'password', 'sasl_mechanisms', 'sasl_service', 'sasl_min_ssf', 'sasl_max_ssf', 'reconnect', 'reconnect_timeout', 'reconnect_interval', 'reconnect_interval_min', 'reconnect_interval_max', 'reconnect_limit', 'reconnect_urls', 'reconnect_log', 'address_ttl', 'tcp_nodelay', 'ssl_keyfile', 'ssl_certfile', 'ssl_trustfile', 'ssl_skip_hostname_check', 'client_properties', 'protocol' ]

    # Every recognised option starts out as None so the defaulting below
    # can tell "not supplied" apart from an explicit value.
    for key in opt_keys:
        setattr(self, key, None)

    # Copy the supplied options onto the instance, rejecting unknown keys.
    for (key, value) in options.iteritems():
        if key in opt_keys:
            setattr(self, key, value)
        else:
            raise ConnectionError("Unknown connection option %s with value %s" %(key, value))

    # Options that need special treatment or have special defaults.
    if self.host:
        url = default(url, self.host)
    if isinstance(url, basestring):
        url = URL(url)
    self.host = url.host

    # When no transport was requested it is derived from the url scheme.
    if self.transport is None:
        if url.scheme == url.AMQP:
            self.transport = "tcp"
        elif url.scheme == url.AMQPS:
            self.transport = "ssl"
        else:
            self.transport = "tcp"
    if self.transport in ("ssl", "tcp+tls"):
        self.port = default(url.port, default(self.port, AMQPS_PORT))
    else:
        self.port = default(url.port, default(self.port, AMQP_PORT))

    if self.protocol and self.protocol != "amqp0-10":
        # Report the offending protocol value itself; the loop variable
        # `value` above holds whichever option happened to be iterated last.
        raise ConnectionError("Connection option 'protocol' value '%s' unsupported (must be amqp0-10)" % (self.protocol,))

    self.username = default(url.user, self.username)
    self.password = default(url.password, self.password)
    self.auth_username = None
    self.sasl_service = default(self.sasl_service, "qpidd")

    self.reconnect = default(self.reconnect, False)
    # reconnect_interval, when given, seeds both the min and the max interval
    self.reconnect_interval_min = default(self.reconnect_interval_min,
                                          default(self.reconnect_interval, 1))
    self.reconnect_interval_max = default(self.reconnect_interval_max,
                                          default(self.reconnect_interval, 2*60))
    self.reconnect_urls = default(self.reconnect_urls, [])
    self.reconnect_log = default(self.reconnect_log, True)

    self.address_ttl = default(self.address_ttl, 60)
    self.tcp_nodelay = default(self.tcp_nodelay, False)

    self.ssl_keyfile = default(self.ssl_keyfile, None)
    self.ssl_certfile = default(self.ssl_certfile, None)
    self.ssl_trustfile = default(self.ssl_trustfile, None)

    # Remember what the caller actually passed (None when omitted) before
    # the False default is applied.
    self._ssl_skip_hostname_check_actual = options.get("ssl_skip_hostname_check")
    self.ssl_skip_hostname_check = default(self.ssl_skip_hostname_check, False)
    self.client_properties = default(self.client_properties, {})

    self.options = options


    self.id = str(uuid4())
    self.session_counter = 0
    self.sessions = {}
    self._open = False
    self._connected = False
    self._transport_connected = False
    self._lock = RLock()
    self._condition = Condition(self._lock)
    self._waiter = Waiter(self._condition)
    self._modcount = Serial(0)
    # imported here rather than at module level
    from driver import Driver
    self._driver = Driver(self)
250
251 - def _wait(self, predicate, timeout=None):
253
255 self._modcount += 1
256 self._driver.wakeup()
257
259 if self.error:
260 self._condition.gc()
261 e = self.error
262 if isinstance(e, ContentError):
263 """ forget the content error. It will be
264 raised this time but won't block future calls
265 """
266 self.error = None
267 raise e
268
271
def _ewait(self, predicate, timeout=None):
    """
    Wait until an error is recorded on this object or the predicate
    holds, then run the error check.

    @type predicate: callable
    @param predicate: zero-argument callable evaluated while waiting
    @param timeout: forwarded unchanged to _wait
    @return: the value returned by _wait
    """
    def failed_or_ready():
        return self.error or predicate()

    outcome = self._wait(failed_or_ready, timeout)
    self.check_error()
    return outcome
276
278 if not self._connected:
279 self._condition.gc()
280 raise ConnectionClosed()
281
282 @synchronized
def session(self, name=None, transactional=False):
    """
    Creates or retrieves the named session. If the name is omitted or
    None, then a unique name is chosen from the connection id and a
    per-connection counter.

    @type name: str
    @param name: the session name
    @type transactional: bool
    @param transactional: passed to the L{Session} constructor when a
      new session has to be created; not consulted when an existing
      session is returned
    @rtype: Session
    @return: the named Session
    """
    if name is None:
        name = "%s:%s" % (self.id, self.session_counter)
        self.session_counter += 1
    else:
        name = "%s:%s" % (self.id, name)

    # `in` replaces the deprecated dict.has_key()
    if name in self.sessions:
        return self.sessions[name]

    ssn = Session(self, name, transactional)
    self.sessions[name] = ssn
    self._wakeup()
    return ssn
308
309 @synchronized
311 self.sessions.pop(ssn.name, 0)
312
313 @synchronized
def open(self, timeout=None):
    """
    Opens a connection.

    @param timeout: passed to attach(); replaced by reconnect_timeout
      when reconnect is enabled and reconnect_timeout is a positive
      number
    @raise ConnectionError: if the connection is already open
    """
    if self._open:
        raise ConnectionError("already open")
    self._open = True
    # reconnect_timeout is None unless the caller supplied it, so test for
    # None explicitly instead of relying on how None compares with numbers.
    if (self.reconnect and self.reconnect_timeout is not None
            and self.reconnect_timeout > 0):
        timeout = self.reconnect_timeout
    self.attach(timeout=timeout)
324
325 @synchronized
327 """
328 Return true if the connection is open, false otherwise.
329 """
330 return self._open
331
332 @synchronized
def attach(self, timeout=None):
    """
    Attach to the remote endpoint.

    On the first call the connection is marked connected, the driver
    is started and woken up. Every call then waits for the transport
    to be connected with no unlinked links remaining.

    @param timeout: passed to the wait
    @raise Timeout: if the wait returns a false value; reconnect is
      switched off before raising
    """
    if not self._connected:
        self._connected = True
        self._driver.start()
        self._wakeup()

    def fully_attached():
        return self._transport_connected and not self._unlinked()

    if self._ewait(fully_attached, timeout=timeout):
        return
    self.reconnect = False
    raise Timeout("Connection attach timed out")
344
346 return [l
347 for ssn in self.sessions.values()
348 if not (ssn.error or ssn.closed)
349 for l in ssn.senders + ssn.receivers
350 if not (l.linked or l.error or l.closed)]
351
352 @synchronized
def detach(self, timeout=None):
    """
    Detach from the remote endpoint.

    If the connection was marked connected it is marked disconnected
    and woken up; the call then waits for the transport to report that
    it is no longer connected.

    @param timeout: passed to the wait
    @raise Timeout: if the wait returns a false value
    """
    was_connected = self._connected
    if was_connected:
        self._connected = False
        self._wakeup()

    def transport_down():
        return not self._transport_connected

    try:
        if not self._wait(transport_down, timeout=timeout):
            raise Timeout("detach timed out")
    finally:
        # only the call that cleared _connected stops the driver, and it
        # does so even when the wait above raised
        if was_connected:
            self._driver.stop()
            self._condition.gc()
370
371 @synchronized
373 """
374 Return true if the connection is attached, false otherwise.
375 """
376 return self._connected
377
378 @synchronized
def close(self, timeout=None):
    """
    Close the connection and all sessions.

    Every session is closed, then the connection is detached and
    marked as not open. The detach and the state change happen even
    if closing a session raises.

    @param timeout: passed to each session's close() and to detach()
    """
    try:
        # Iterate over a snapshot: a session's close() removes it from
        # self.sessions via _remove_session(), so the dict shrinks while
        # the loop is running.
        for ssn in list(self.sessions.values()):
            ssn.close(timeout=timeout)
    finally:
        self.detach(timeout=timeout)
        self._open = False
389
392
393 """
394 Sessions provide a linear context for sending and receiving
395 L{Messages<Message>}. L{Messages<Message>} are sent and received
396 using the L{Sender.send} and L{Receiver.fetch} methods of the
397 L{Sender} and L{Receiver} objects associated with a Session.
398
399 Each L{Sender} and L{Receiver} is created by supplying either a
400 target or source address to the L{sender} and L{receiver} methods of
401 the Session. The address is supplied via a string syntax documented
402 below.
403
404 Addresses
405 =========
406
407 An address identifies a source or target for messages. In its
408 simplest form this is just a name. In general a target address may
409 also be used as a source address, however not all source addresses
410 may be used as a target, e.g. a source might additionally have some
411 filtering criteria that would not be present in a target.
412
413 A subject may optionally be specified along with the name. When an
414 address is used as a target, any subject specified in the address is
415 used as the default subject of outgoing messages for that target.
416 When an address is used as a source, any subject specified in the
417 address is pattern matched against the subject of available messages
418 as a filter for incoming messages from that source.
419
420 The options map contains additional information about the address
421 including:
422
423 - policies for automatically creating, and deleting the node to
424 which an address refers
425
426 - policies for asserting facts about the node to which an address
427 refers
428
429 - extension points that can be used for sender/receiver
430 configuration
431
432 Mapping to AMQP 0-10
433 --------------------
434 The name is resolved to either an exchange or a queue by querying
435 the broker.
436
437 The subject is set as a property on the message. Additionally, if
438 the name refers to an exchange, the routing key is set to the
439 subject.
440
441 Syntax
442 ------
443 The following regular expressions define the tokens used to parse
444 addresses::
445 LBRACE: \\{
446 RBRACE: \\}
447 LBRACK: \\[
448 RBRACK: \\]
449 COLON: :
450 SEMI: ;
451 SLASH: /
452 COMMA: ,
453 NUMBER: [+-]?[0-9]*\\.?[0-9]+
454 ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?
455 STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\'
456 ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]
457 SYM: [.#*%@$^!+-]
458 WSPACE: [ \\n\\r\\t]+
459
460 The formal grammar for addresses is given below::
461 address = name [ "/" subject ] [ ";" options ]
462 name = ( part | quoted )+
463 subject = ( part | quoted | "/" )*
464 quoted = STRING / ESC
465 part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM
466 options = map
467 map = "{" ( keyval ( "," keyval )* )? "}"
468 keyval = ID ":" value
469 value = NUMBER / STRING / ID / map / list
470 list = "[" ( value ( "," value )* )? "]"
471
472 This grammar results in the following informal syntax::
473
474 <name> [ / <subject> ] [ ; <options> ]
475
476 Where options is::
477
478 { <key> : <value>, ... }
479
480 And values may be:
481 - numbers
482 - single, double, or non quoted strings
483 - maps (dictionaries)
484 - lists
485
486 Options
487 -------
488 The options map permits the following parameters::
489
490 <name> [ / <subject> ] ; {
491 create: always | sender | receiver | never,
492 delete: always | sender | receiver | never,
493 assert: always | sender | receiver | never,
494 mode: browse | consume,
495 node: {
496 type: queue | topic,
497 durable: True | False,
498 x-declare: { ... <declare-overrides> ... },
499 x-bindings: [<binding_1>, ... <binding_n>]
500 },
501 link: {
502 name: <link-name>,
503 durable: True | False,
504 reliability: unreliable | at-most-once | at-least-once | exactly-once,
505 x-declare: { ... <declare-overrides> ... },
506 x-bindings: [<binding_1>, ... <binding_n>],
507 x-subscribe: { ... <subscribe-overrides> ... }
508 }
509 }
510
511 Bindings are specified as a map with the following options::
512
513 {
514 exchange: <exchange>,
515 queue: <queue>,
516 key: <key>,
517 arguments: <arguments>
518 }
519
520 The create, delete, and assert policies specify who should perform
521 the associated action:
522
523 - I{always}: the action will always be performed
524 - I{sender}: the action will only be performed by the sender
525 - I{receiver}: the action will only be performed by the receiver
526 - I{never}: the action will never be performed (this is the default)
527
528 The node-type is one of:
529
530 - I{topic}: a topic node will default to the topic exchange,
531 x-declare may be used to specify other exchange types
532 - I{queue}: this is the default node-type
533
534 The x-declare map permits protocol specific keys and values to be
535 specified when exchanges or queues are declared. These keys and
536 values are passed through when creating a node or asserting facts
537 about an existing node.
538
539 Examples
540 --------
541 A simple name resolves to any named node, usually a queue or a
542 topic::
543
544 my-queue-or-topic
545
546 A simple name with a subject will also resolve to a node, but the
547 presence of the subject will cause a sender using this address to
548 set the subject on outgoing messages, and receivers to filter based
549 on the subject::
550
551 my-queue-or-topic/my-subject
552
553 A subject pattern can be used and will cause filtering if used by
554 the receiver. If used for a sender, the literal value gets set as
555 the subject::
556
557 my-queue-or-topic/my-*
558
559 In all the above cases, the address is resolved to an existing node.
560 If you want the node to be auto-created, then you can do the
561 following. By default nonexistent nodes are assumed to be queues::
562
563 my-queue; {create: always}
564
565 You can customize the properties of the queue::
566
567 my-queue; {create: always, node: {durable: True}}
568
569 You can create a topic instead if you want::
570
571 my-queue; {create: always, node: {type: topic}}
572
573 You can assert that the address resolves to a node with particular
574 properties::
575
576 my-transient-topic; {
577 assert: always,
578 node: {
579 type: topic,
580 durable: False
581 }
582 }
583 """
584
def __init__(self, connection, name, transactional):
    """
    Set up the initial state of a session.

    @param connection: the owning connection; its lock is shared with
      the session
    @param name: the session name
    @param transactional: stored on the session as-is
    """
    super(Session, self).__init__()

    # identity
    self.connection = connection
    self.name = name
    self.log_id = "%x" % id(self)
    self.transactional = transactional

    # transaction state
    self.committing = False
    self.committed = True
    self.aborting = False
    self.aborted = False

    # link bookkeeping and message lists; each list is a fresh object
    self.next_sender_id = 0
    self.next_receiver_id = 0
    for list_attr in ("senders", "receivers", "outgoing", "incoming",
                      "unacked", "acked"):
        setattr(self, list_attr, [])

    self.ack_capacity = UNLIMITED

    # lifecycle flags
    self.closing = False
    self.closed = False

    self._lock = connection._lock
    self._msg_received_notify_handler = None
614
616 return "<Session %s>" % self.name
617
618 - def _wait(self, predicate, timeout=None):
620
622 self.connection._wakeup()
623
625 self.connection.check_error()
626 if self.error:
627 raise self.error
628
630 err = self.connection.get_error()
631 if err:
632 return err
633 else:
634 return self.error
635
def _ewait(self, predicate, timeout=None):
    """
    Wait, through the connection, until an error is recorded on this
    session or the predicate holds, then run the session's error check.

    @type predicate: callable
    @param predicate: zero-argument callable evaluated while waiting
    @param timeout: forwarded unchanged to the connection's _ewait
    @return: the value returned by the connection's _ewait
    """
    def failed_or_ready():
        return self.error or predicate()

    outcome = self.connection._ewait(failed_or_ready, timeout)
    self.check_error()
    return outcome
640
644
646 self.incoming.append(msg)
647 if self._msg_received_notify_handler:
648 try:
649
650 self._msg_received_notify_handler(self)
651 except TypeError:
652
653 self._msg_received_notify_handler()
654
655 @synchronized
656 - def sender(self, target, **options):
657 """
658 Creates a L{Sender} that may be used to send L{Messages<Message>}
659 to the specified target.
660
661 @type target: str
662 @param target: the target to which messages will be sent
663 @rtype: Sender
664 @return: a new Sender for the specified target
665 """
666 target = _mangle(target)
667 sender = Sender(self, self.next_sender_id, target, options)
668 self.next_sender_id += 1
669 self.senders.append(sender)
670 if not self.closed and self.connection._connected:
671 self._wakeup()
672 try:
673 sender._ewait(lambda: sender.linked)
674 except LinkError, e:
675 sender.close()
676 raise e
677 return sender
678
679 @synchronized
681 """
682 Creates a receiver that may be used to fetch L{Messages<Message>}
683 from the specified source.
684
685 @type source: str
686 @param source: the source of L{Messages<Message>}
687 @rtype: Receiver
688 @return: a new Receiver for the specified source
689 """
690 source = _mangle(source)
691 receiver = Receiver(self, self.next_receiver_id, source, options)
692 self.next_receiver_id += 1
693 self.receivers.append(receiver)
694 if not self.closed and self.connection._connected:
695 self._wakeup()
696 try:
697 receiver._ewait(lambda: receiver.linked)
698 except LinkError, e:
699 receiver.close()
700 raise e
701 return receiver
702
703 @synchronized
705 result = 0
706 for msg in self.incoming:
707 if predicate(msg):
708 result += 1
709 return result
710
def _peek(self, receiver):
    """
    Return the first incoming message whose _receiver equals the given
    receiver, leaving the incoming list untouched.

    @param receiver: the receiver to match against
    @return: the matching message, or None when there is none
    """
    found = None
    for candidate in self.incoming:
        if candidate._receiver == receiver:
            found = candidate
            break
    return found
715
def _pop(self, receiver):
    """
    Remove and return the first incoming message whose _receiver
    equals the given receiver.

    @param receiver: the receiver to match against
    @return: the removed message, or None when there is none
    """
    for position, candidate in enumerate(self.incoming):
        if candidate._receiver == receiver:
            # returning straight after the deletion keeps the iteration
            # from ever observing the shortened list
            del self.incoming[position]
            return candidate
    return None
725
726 @synchronized
def _get(self, receiver, timeout=None):
    """
    Wait for a message for the given receiver and hand it over.

    The wait ends when a message for the receiver is present, the
    session is closing, or the receiver is closed. A message that is
    handed over is counted on its receiver's `returned` counter,
    appended to the unacked list and logged.

    @param receiver: the receiver to fetch a message for
    @param timeout: passed to the wait
    @return: the message, or None if the wait returned a false value or
      no message for the receiver was queued
    """
    def ready():
        return (self._peek(receiver) is not None
                or self.closing
                or receiver.closed)

    if not self._ewait(ready, timeout):
        return None

    msg = self._pop(receiver)
    if msg is None:
        return None

    msg._receiver.returned += 1
    self.unacked.append(msg)
    log.debug("RETR[%s]: %s", self.log_id, msg)
    return msg
738
739 @synchronized
741 """
742 Register a callable that will be invoked when a Message arrives on the
743 Session.
744
745 @param handler: invoked by the driver thread when a message arrives.
746 @type handler: a callable object taking a Session instance as its only
747 argument
748 @return: None
749
750 @note: When using this method it is recommended to also register
751 asynchronous error callbacks on all endpoint objects. Doing so will cause
752 the application to be notified if an error is raised by the driver
753 thread. This is necessary as after a driver error occurs the message received
754 callback may never be invoked again. See
755 L{Endpoint.set_async_exception_notify_handler}
756
757 @warning: B{Use with caution} This callback is invoked in the context of
758 the driver thread. It is B{NOT} safe to call B{ANY} of the public
759 messaging APIs from within this callback, including any of the passed
760 Session's methods. The intent of the handler is to provide an efficient
761 way to notify the application that a message has arrived. This can be
762 useful for those applications that need to schedule a task to poll for
763 received messages without blocking in the messaging API. The scheduled
764 task may then retrieve the message using L{next_receiver} and
765 L{Receiver.fetch}
766 """
767 self._msg_received_notify_handler = handler
768
769 @synchronized
771 """@deprecated: Use L{set_message_received_notify_handler} instead.
772 """
773 self._msg_received_notify_handler = handler
774
775 @synchronized
777 if self._ecwait(lambda: self.incoming, timeout):
778 return self.incoming[0]._receiver
779 else:
780 raise Empty
781
782 @synchronized
def acknowledge(self, message=None, disposition=None, sync=True):
    """
    Acknowledge the given L{Message}. If message is None, then all
    unacknowledged messages on the session are acknowledged.

    @type message: Message
    @param message: the message to acknowledge or None
    @param disposition: stored on each acknowledged message as its
      _disposition
    @type sync: boolean
    @param sync: if true then block until the message(s) are acknowledged
    @raise InsufficientCapacity: if ack_capacity is limited and not
      positive
    """
    if message is None:
        pending = self.unacked[:]
    else:
        pending = [message]

    def has_ack_room():
        return len(self.acked) < self.ack_capacity

    def none_left_in_acked():
        for candidate in pending:
            if candidate in self.acked:
                return False
        return True

    for msg in pending:
        if self.ack_capacity is not UNLIMITED:
            if self.ack_capacity <= 0:
                raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
            # wait for room in the acked list before adding to it
            self._wakeup()
            self._ecwait(has_ack_room)
        msg._disposition = disposition
        self.unacked.remove(msg)
        self.acked.append(msg)

    self._wakeup()
    if sync:
        self._ecwait(none_left_in_acked)
812
813 @synchronized
def commit(self, timeout=None):
    """
    Commit outstanding transactional work. This consists of all
    message sends and receives since the prior commit or rollback.

    @param timeout: passed to the wait for the commit to complete
    @raise NontransactionalSession: if the session is not transactional
    @raise TransactionError: re-raised unchanged when the wait fails
      with one
    @raise TransactionAborted: if the wait fails with any other
      exception, or if the session is marked aborted once the wait
      has completed
    """
    if not self.transactional:
        raise NontransactionalSession()
    # flag the commit request, then wake up so that it gets noticed
    self.committing = True
    self._wakeup()
    try:
        if not self._ecwait(lambda: not self.committing, timeout=timeout):
            raise Timeout("commit timed out")
    except TransactionError:
        # transaction errors reach the caller as they are
        raise
    except Exception, e:
        # Any other failure is recorded as the session error and raised as
        # TransactionAborted. NOTE(review): presumably this includes the
        # Timeout raised above, unless Timeout derives from
        # TransactionError - confirm against qpid.messaging.exceptions.
        self.error = TransactionAborted(text="Transaction aborted: %s"%e)
        raise self.error
    if self.aborted:
        raise TransactionAborted()
    assert self.committed
834
835 @synchronized
837 """
838 Rollback outstanding transactional work. This consists of all
839 message sends and receives since the prior commit or rollback.
840 """
841 if not self.transactional:
842 raise NontransactionalSession()
843 self.aborting = True
844 self._wakeup()
845 if not self._ecwait(lambda: not self.aborting, timeout=timeout):
846 raise Timeout("rollback timed out")
847 assert self.aborted
848
849 @synchronized
def sync(self, timeout=None):
    """
    Sync the session.

    Each sender is synced in turn, then the call waits until both the
    outgoing and the acked lists are empty.

    @param timeout: passed to every sender's sync() and to the wait
    @raise Timeout: if the wait returns a false value
    """
    for link in self.senders:
        link.sync(timeout=timeout)

    def drained():
        return not (self.outgoing or self.acked)

    if not self._ewait(drained, timeout=timeout):
        raise Timeout("session sync timed out")
858
859 @synchronized
def close(self, timeout=None):
    """
    Close the session.

    Nothing is done if an error is already recorded on the session.
    Otherwise the session is synced, every receiver and sender is
    closed, the session is flagged as closing, and the call waits for
    it to be closed. The session is removed from its connection
    whether or not that wait succeeds.

    @param timeout: passed to sync(), to each link's close() and to
      the wait
    @raise Timeout: if the wait returns a false value
    """
    if self.error:
        return
    self.sync(timeout=timeout)

    # the concatenation builds a new list, so links may drop themselves
    # from self.receivers / self.senders while being closed
    links = self.receivers + self.senders
    for link in links:
        link.close(timeout=timeout)

    if not self.closing:
        self.closing = True
        self._wakeup()

    def is_closed():
        return self.closed

    try:
        if not self._ewait(is_closed, timeout=timeout):
            raise Timeout("session close timed out")
    finally:
        self.connection._remove_session(self)
879
882
884 if addr and addr.startswith("#"):
885 return MangledString(str(uuid4()) + addr)
886 else:
887 return addr
888
890
891 """
892 Sends outgoing messages.
893 """
894
def __init__(self, session, id, target, options):
    """
    Set up the initial state of a sender.

    @param session: the owning session; its lock is shared with the
      sender
    @param id: stored on the sender as-is
    @param target: the target address
    @type options: dict
    @param options: the "capacity" entry (UNLIMITED when absent) and
      the "durable" entry (None when absent) are read from it
    """
    super(Sender, self).__init__()

    self.session = session
    self.id = id
    self.target = target
    self.options = options

    self.capacity = options.get("capacity", UNLIMITED)
    self.threshold = 0.5
    self.durable = options.get("durable")

    # each counter gets its own Serial(0)
    for counter in ("queued", "synced", "acked"):
        setattr(self, counter, Serial(0))

    # lifecycle flags
    for flag in ("linked", "closing", "closed"):
        setattr(self, flag, False)

    self._lock = self.session._lock
911
914
919
921 err = self.session.get_error()
922 if err:
923 return err
924 else:
925 return self.error
926
927 - def _ewait(self, predicate, timeout=None):
931
935
936 @synchronized
938 """
939 Returns the number of messages awaiting acknowledgment.
940 @rtype: int
941 @return: the number of unacknowledged messages
942 """
943 return self.queued - self.acked
944
945 @synchronized
951
952 @synchronized
def send(self, object, sync=True, timeout=None):
    """
    Send a message. If the object passed in is of type L{unicode},
    L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
    L{Message} and sent. If it is of type L{Message}, it will be sent
    directly. If the sender capacity is not L{UNLIMITED} then send
    will block until there is available capacity to send the message.
    If the timeout parameter is specified, then send will throw an
    L{InsufficientCapacity} exception if capacity does not become
    available within the specified time.

    @type object: unicode, str, list, dict, Message
    @param object: the message or content to send

    @type sync: boolean
    @param sync: if true then block until the message is sent

    @type timeout: float
    @param timeout: the time to wait for available capacity

    @raise Detached: if the connection is not marked connected or the
      session is closing
    @raise InsufficientCapacity: if the capacity is limited and not
      positive, or the wait for available capacity returns a false value
    """

    if not self.session.connection._connected or self.session.closing:
        raise Detached()

    # wait for the link; the return value of this wait is not inspected
    self._ecwait(lambda: self.linked, timeout=timeout)

    if isinstance(object, Message):
        message = object
    else:
        message = Message(object)

    # a message with no durability setting inherits the sender's
    if message.durable is None:
        message.durable = self.durable

    if self.capacity is not UNLIMITED:
        if self.capacity <= 0:
            raise InsufficientCapacity("capacity = %s" % self.capacity)
        if not self._ecwait(self.available, timeout=timeout):
            raise InsufficientCapacity("capacity = %s" % self.capacity)


    message._sender = self
    if self.capacity is not UNLIMITED:
        # with limited capacity the message is also flagged for sync once
        # the available count is at or below threshold * capacity
        message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity))
    else:
        message._sync = sync
    self.session.outgoing.append(message)
    self.queued += 1

    if sync:
        self.sync(timeout=timeout)
        assert message not in self.session.outgoing
    else:
        self._wakeup()
1007
1008 @synchronized
def sync(self, timeout=None):
    """
    Wait until every message queued so far has been acked.

    The queued count is sampled on entry. If `synced` is behind it,
    `synced` is advanced and a wakeup is issued; the call then waits
    for `acked` to reach the sampled count.

    @param timeout: passed to the wait
    @raise Timeout: if the wait returns a false value
    @raise ContentError: re-raised after `acked` is advanced to the
      sampled count and the first outgoing message is dropped
    """
    mark = self.queued
    if self.synced < mark:
        self.synced = mark
        self._wakeup()

    def caught_up():
        return self.acked >= mark

    try:
        if not self._ewait(caught_up, timeout=timeout):
            raise Timeout("sender sync timed out")
    except ContentError:
        # discard the head of the outgoing list before propagating
        self.acked = mark
        self.session.outgoing.pop(0)
        raise
1022
1023 @synchronized
def close(self, timeout=None):
    """
    Close the Sender.

    Outstanding messages are synced first, the sender is flagged as
    closing, and the call waits for it to be closed. The sender is
    dropped from its session's sender list whether or not the wait
    succeeds.

    @param timeout: passed to sync() and to the wait
    @raise Timeout: if the wait returns a false value
    """
    # sync only when something is outstanding, so that closing a sender
    # that was never established does not error out
    if self.acked < self.queued:
        self.sync(timeout=timeout)

    if not self.closing:
        self.closing = True
        self._wakeup()

    def is_closed():
        return self.closed

    try:
        if not self.session._ewait(is_closed, timeout=timeout):
            raise Timeout("sender close timed out")
    finally:
        # the sender may already have been removed from the list
        siblings = self.session.senders
        if self in siblings:
            siblings.remove(self)
1045
1048
1049 """
1050 Receives incoming messages from a remote source. Messages may be
1051 fetched with L{fetch}.
1052 """
1053
def __init__(self, session, id, source, options):
    """
    Set up the initial state of a receiver.

    @param session: the owning session; its lock is shared with the
      receiver
    @param id: stored on the receiver as-is
    @param source: the source address
    @type options: dict
    @param options: the "capacity" entry (0 when absent) supplies the
      initial capacity
    """
    super(Receiver, self).__init__()

    self.session = session
    self.id = id
    self.source = source
    self.options = options

    # each counter gets its own Serial(0)
    for counter in ("granted", "impending", "received", "returned"):
        setattr(self, counter, Serial(0))
    self.draining = False

    # lifecycle flags
    for flag in ("linked", "closing", "closed"):
        setattr(self, flag, False)

    self.threshold = 0.5

    # the lock, the counters and _capacity are all in place before the
    # initial capacity is applied; False asks for no wakeup
    self._lock = self.session._lock
    self._capacity = 0
    self._set_capacity(options.get("capacity", 0), False)
1074
1075 @synchronized
1077 if c is UNLIMITED:
1078 self._capacity = c.value
1079 else:
1080 self._capacity = c
1081 self._grant()
1082 if wakeup:
1083 self._wakeup()
1084
1086 if self._capacity == UNLIMITED.value:
1087 return UNLIMITED
1088 else:
1089 return self._capacity
1090
1091 capacity = property(_get_capacity, _set_capacity)
1092
1095
1100
1102 err = self.session.get_error()
1103 if err:
1104 return err
1105 else:
1106 return self.error
1107
1108 - def _ewait(self, predicate, timeout=None):
1112
1116
1117 @synchronized
1119 """
1120 Returns the number of acknowledged messages awaiting confirmation.
1121 """
1122 return len([m for m in self.session.acked if m._receiver is self])
1123
1124 @synchronized
1126 """
1127 Returns the number of messages available to be fetched by the
1128 application.
1129
1130 @rtype: int
1131 @return: the number of available messages
1132 """
1133 return self.received - self.returned
1134
1135 @synchronized
def fetch(self, timeout=None):
    """
    Fetch and return a single message. A timeout of None will block
    forever waiting for a message to arrive, a timeout of zero will
    return immediately if no messages are available.

    @type timeout: float
    @param timeout: the time to wait for a message to be available
    @return: the fetched message
    @raise Empty: if no message could be obtained
    """

    # wait for the link; this wait is not bounded by the timeout argument
    self._ecwait(lambda: self.linked)

    if self._capacity == 0:
        # with zero capacity, grant exactly one message beyond those
        # already returned
        self.granted = self.returned + 1
        self._wakeup()
    self._ecwait(lambda: self.impending >= self.granted)
    msg = self.session._get(self, timeout=timeout)
    if msg is None:
        # nothing arrived in time: request a drain, wait for the draining
        # flag to be cleared, then make one final non-blocking attempt
        self.check_closed()
        self.draining = True
        self._wakeup()
        self._ecwait(lambda: not self.draining)
        msg = self.session._get(self, timeout=0)
        self._grant()
        self._wakeup()
        if msg is None:
            raise Empty()
    elif self._capacity not in (0, UNLIMITED.value):
        # limited capacity: raise the grant again once the number of
        # received-but-not-returned messages is at or below
        # threshold * capacity
        t = int(ceil(self.threshold * self._capacity))
        if self.received - self.returned <= t:
            self.granted = self.returned + self._capacity
            self._wakeup()
    return msg
1169
1171 if self._capacity == UNLIMITED.value:
1172 self.granted = UNLIMITED
1173 else:
1174 self.granted = self.returned + self._capacity
1175
1176 @synchronized
def close(self, timeout=None):
    """
    Close the receiver.

    The receiver is flagged as closing and the call waits for it to be
    closed. The receiver is dropped from its session's receiver list
    whether or not the wait succeeds.

    @param timeout: passed to the wait
    @raise Timeout: if the wait returns a false value
    """
    if not self.closing:
        self.closing = True
        self._wakeup()

    def is_closed():
        return self.closed

    try:
        if not self.session._ewait(is_closed, timeout=timeout):
            raise Timeout("receiver close timed out")
    finally:
        # the receiver may already have been removed from the list
        siblings = self.session.receivers
        if self in siblings:
            siblings.remove(self)
1193
1194
1195 __all__ = ["Connection", "Endpoint", "Session", "Sender", "Receiver"]
1196