Package qpid :: Package messaging :: Module endpoints
[frames] | no frames]

Source Code for Module qpid.messaging.endpoints

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  A high level messaging API for python. 
  22   
  23  Areas that still need work: 
  24   
  25    - definition of the arguments for L{Session.sender} and L{Session.receiver} 
  26    - standard L{Message} properties 
  27    - L{Message} content encoding 
  28    - protocol negotiation/multiprotocol impl 
  29  """ 
  30   
  31  from logging import getLogger 
  32  from math import ceil 
  33  from qpid.codec010 import StringCodec 
  34  from qpid.concurrency import synchronized, Waiter, Condition 
  35  from qpid.datatypes import Serial, uuid4 
  36  from qpid.messaging.constants import * 
  37  from qpid.messaging.exceptions import * 
  38  from qpid.messaging.message import * 
  39  from qpid.ops import PRIMITIVE 
  40  from qpid.util import default, URL 
  41  from threading import Thread, RLock 
  42   
  43  log = getLogger("qpid.messaging") 
  44   
  45  static = staticmethod 
46 47 -class Endpoint(object):
48 """ 49 Base class for all endpoint objects types. 50 @undocumented: __init__, __setattr__ 51 """
52 - def __init__(self):
53 self._async_exception_notify_handler = None 54 self.error = None
55
56 - def _ecwait(self, predicate, timeout=None):
57 result = self._ewait(lambda: self.closed or predicate(), timeout) 58 self.check_closed() 59 return result
60 61 @synchronized
62 - def set_async_exception_notify_handler(self, handler):
63 """ 64 Register a callable that will be invoked when the driver thread detects an 65 error on the Endpoint. The callable is invoked with the instance of the 66 Endpoint object passed as the first argument. The second argument is an 67 Exception instance describing the failure. 68 69 @param handler: invoked by the driver thread when an error occurs. 70 @type handler: callable object taking an Endpoint and an Exception as 71 arguments. 72 @return: None 73 @note: The exception will also be raised the next time the application 74 invokes one of the blocking messaging APIs. 75 @warning: B{Use with caution} This callback is invoked in the context of 76 the driver thread. It is B{NOT} safe to call B{ANY} of the messaging APIs 77 from within this callback. This includes any of the Endpoint's methods. The 78 intent of the handler is to provide an efficient way to notify the 79 application that an exception has occurred in the driver thread. This can 80 be useful for those applications that periodically poll the messaging layer 81 for events. In this case the callback can be used to schedule a task that 82 retrieves the error using the Endpoint's get_error() or check_error() 83 methods. 84 """ 85 self._async_exception_notify_handler = handler
86
87 - def __setattr__(self, name, value):
88 """ 89 Intercept any attempt to set the endpoint error flag and invoke the 90 callback if registered. 91 """ 92 super(Endpoint, self).__setattr__(name, value) 93 if name == 'error' and value is not None: 94 if self._async_exception_notify_handler: 95 self._async_exception_notify_handler(self, value)
96
97 98 -class Connection(Endpoint):
99 100 """ 101 A Connection manages a group of L{Sessions<Session>} and connects 102 them with a remote endpoint. 103 """ 104 105 @static
106 - def establish(url=None, timeout=None, **options):
107 """ 108 Constructs a L{Connection} with the supplied parameters and opens 109 it. 110 """ 111 conn = Connection(url, **options) 112 conn.open(timeout=timeout) 113 return conn
114
115 - def __init__(self, url=None, **options):
116 """ 117 Creates a connection. A newly created connection must be opened 118 with the Connection.open() method before it can be used. 119 120 @type url: str 121 @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] 122 @type host: str 123 @param host: the name or ip address of the remote host (overriden by url) 124 @type port: int 125 @param port: the port number of the remote host (overriden by url) 126 @type transport: str 127 @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) 128 @type heartbeat: int 129 @param heartbeat: heartbeat interval in seconds 130 131 @type username: str 132 @param username: the username for authentication (overriden by url) 133 @type password: str 134 @param password: the password for authentication (overriden by url) 135 @type sasl_mechanisms: str 136 @param sasl_mechanisms: space separated list of permitted sasl mechanisms 137 @type sasl_service: str 138 @param sasl_service: the service name if needed by the SASL mechanism in use 139 @type sasl_min_ssf: int 140 @param sasl_min_ssf: the minimum acceptable security strength factor 141 @type sasl_max_ssf: int 142 @param sasl_max_ssf: the maximum acceptable security strength factor 143 144 @type reconnect: bool 145 @param reconnect: enable/disable automatic reconnect 146 @type reconnect_timeout: float 147 @param reconnect_timeout: total time to attempt reconnect 148 @type reconnect_interval_min: float 149 @param reconnect_interval_min: minimum interval between reconnect attempts 150 @type reconnect_interval_max: float 151 @param reconnect_interval_max: maximum interval between reconnect attempts 152 @type reconnect_interval: float 153 @param reconnect_interval: set both min and max reconnect intervals 154 @type reconnect_limit: int 155 @param reconnect_limit: limit the total number of reconnect attempts 156 @type reconnect_urls: list[str] 157 @param reconnect_urls: list of backup hosts specified as urls 158 159 @type address_ttl: float 160 @param address_ttl: time until cached address resolution expires 161 162 @type ssl_keyfile: str 163 @param ssl_keyfile: file with client's private key (PEM format) 164 @type ssl_certfile: str 165 @param ssl_certfile: file with client's public (eventually priv+pub) key (PEM format) 166 @type ssl_trustfile: str 167 @param ssl_trustfile: file trusted certificates to validate the server 168 @type ssl_skip_hostname_check: bool 169 @param ssl_skip_hostname_check: disable verification of hostname in 170 certificate. Use with caution - disabling hostname checking leaves you 171 vulnerable to Man-in-the-Middle attacks. 172 173 @rtype: Connection 174 @return: a disconnected Connection 175 """ 176 super(Connection, self).__init__() 177 # List of all attributes 178 opt_keys = ['host', 'transport', 'port', 'heartbeat', 'username', 'password', 'sasl_mechanisms', 'sasl_service', 'sasl_min_ssf', 'sasl_max_ssf', 'reconnect', 'reconnect_timeout', 'reconnect_interval', 'reconnect_interval_min', 'reconnect_interval_max', 'reconnect_limit', 'reconnect_urls', 'reconnect_log', 'address_ttl', 'tcp_nodelay', 'ssl_keyfile', 'ssl_certfile', 'ssl_trustfile', 'ssl_skip_hostname_check', 'client_properties', 'protocol' ] 179 # Create all attributes on self and set to None. 180 for key in opt_keys: 181 setattr(self, key, None) 182 # Get values from options, check for invalid options 183 for (key, value) in options.iteritems(): 184 if key in opt_keys: 185 setattr(self, key, value) 186 else: 187 raise ConnectionError("Unknown connection option %s with value %s" %(key, value)) 188 189 # Now handle items that need special treatment or have speical defaults: 190 if self.host: 191 url = default(url, self.host) 192 if isinstance(url, basestring): 193 url = URL(url) 194 self.host = url.host 195 196 if self.transport is None: 197 if url.scheme == url.AMQP: 198 self.transport = "tcp" 199 elif url.scheme == url.AMQPS: 200 self.transport = "ssl" 201 else: 202 self.transport = "tcp" 203 if self.transport in ("ssl", "tcp+tls"): 204 self.port = default(url.port, default(self.port, AMQPS_PORT)) 205 else: 206 self.port = default(url.port, default(self.port, AMQP_PORT)) 207 208 if self.protocol and self.protocol != "amqp0-10": 209 raise ConnectionError("Connection option 'protocol' value '" + value + "' unsupported (must be amqp0-10)") 210 211 self.username = default(url.user, self.username) 212 self.password = default(url.password, self.password) 213 self.auth_username = None 214 self.sasl_service = default(self.sasl_service, "qpidd") 215 216 self.reconnect = default(self.reconnect, False) 217 self.reconnect_interval_min = default(self.reconnect_interval_min, 218 default(self.reconnect_interval, 1)) 219 self.reconnect_interval_max = default(self.reconnect_interval_max, 220 default(self.reconnect_interval, 2*60)) 221 self.reconnect_urls = default(self.reconnect_urls, []) 222 self.reconnect_log = default(self.reconnect_log, True) 223 224 self.address_ttl = default(self.address_ttl, 60) 225 self.tcp_nodelay = default(self.tcp_nodelay, False) 226 227 self.ssl_keyfile = default(self.ssl_keyfile, None) 228 self.ssl_certfile = default(self.ssl_certfile, None) 229 self.ssl_trustfile = default(self.ssl_trustfile, None) 230 # if ssl_skip_hostname_check was not explicitly set, this will be None 231 self._ssl_skip_hostname_check_actual = options.get("ssl_skip_hostname_check") 232 self.ssl_skip_hostname_check = default(self.ssl_skip_hostname_check, False) 233 self.client_properties = default(self.client_properties, {}) 234 235 self.options = options 236 237 238 self.id = str(uuid4()) 239 self.session_counter = 0 240 self.sessions = {} 241 self._open = False 242 self._connected = False 243 self._transport_connected = False 244 self._lock = RLock() 245 self._condition = Condition(self._lock) 246 self._waiter = Waiter(self._condition) 247 self._modcount = Serial(0) 248 from driver import Driver 249 self._driver = Driver(self)
250
251 - def _wait(self, predicate, timeout=None):
252 return self._waiter.wait(predicate, timeout=timeout)
253
254 - def _wakeup(self):
255 self._modcount += 1 256 self._driver.wakeup()
257
258 - def check_error(self):
259 if self.error: 260 self._condition.gc() 261 e = self.error 262 if isinstance(e, ContentError): 263 """ forget the content error. It will be 264 raised this time but won't block future calls 265 """ 266 self.error = None 267 raise e
268
269 - def get_error(self):
270 return self.error
271
272 - def _ewait(self, predicate, timeout=None):
273 result = self._wait(lambda: self.error or predicate(), timeout) 274 self.check_error() 275 return result
276
277 - def check_closed(self):
278 if not self._connected: 279 self._condition.gc() 280 raise ConnectionClosed()
281 282 @synchronized
283 - def session(self, name=None, transactional=False):
284 """ 285 Creates or retrieves the named session. If the name is omitted or 286 None, then a unique name is chosen based on a randomly generated 287 uuid. 288 289 @type name: str 290 @param name: the session name 291 @rtype: Session 292 @return: the named Session 293 """ 294 295 if name is None: 296 name = "%s:%s" % (self.id, self.session_counter) 297 self.session_counter += 1 298 else: 299 name = "%s:%s" % (self.id, name) 300 301 if self.sessions.has_key(name): 302 return self.sessions[name] 303 else: 304 ssn = Session(self, name, transactional) 305 self.sessions[name] = ssn 306 self._wakeup() 307 return ssn
308 309 @synchronized
310 - def _remove_session(self, ssn):
311 self.sessions.pop(ssn.name, 0)
312 313 @synchronized
314 - def open(self, timeout=None):
315 """ 316 Opens a connection. 317 """ 318 if self._open: 319 raise ConnectionError("already open") 320 self._open = True 321 if self.reconnect and self.reconnect_timeout > 0: 322 timeout = self.reconnect_timeout 323 self.attach(timeout=timeout)
324 325 @synchronized
326 - def opened(self):
327 """ 328 Return true if the connection is open, false otherwise. 329 """ 330 return self._open
331 332 @synchronized
333 - def attach(self, timeout=None):
334 """ 335 Attach to the remote endpoint. 336 """ 337 if not self._connected: 338 self._connected = True 339 self._driver.start() 340 self._wakeup() 341 if not self._ewait(lambda: self._transport_connected and not self._unlinked(), timeout=timeout): 342 self.reconnect = False 343 raise Timeout("Connection attach timed out")
344
345 - def _unlinked(self):
346 return [l 347 for ssn in self.sessions.values() 348 if not (ssn.error or ssn.closed) 349 for l in ssn.senders + ssn.receivers 350 if not (l.linked or l.error or l.closed)]
351 352 @synchronized
353 - def detach(self, timeout=None):
354 """ 355 Detach from the remote endpoint. 356 """ 357 if self._connected: 358 self._connected = False 359 self._wakeup() 360 cleanup = True 361 else: 362 cleanup = False 363 try: 364 if not self._wait(lambda: not self._transport_connected, timeout=timeout): 365 raise Timeout("detach timed out") 366 finally: 367 if cleanup: 368 self._driver.stop() 369 self._condition.gc()
370 371 @synchronized
372 - def attached(self):
373 """ 374 Return true if the connection is attached, false otherwise. 375 """ 376 return self._connected
377 378 @synchronized
379 - def close(self, timeout=None):
380 """ 381 Close the connection and all sessions. 382 """ 383 try: 384 for ssn in self.sessions.values(): 385 ssn.close(timeout=timeout) 386 finally: 387 self.detach(timeout=timeout) 388 self._open = False
389
390 391 -class Session(Endpoint):
392 393 """ 394 Sessions provide a linear context for sending and receiving 395 L{Messages<Message>}. L{Messages<Message>} are sent and received 396 using the L{Sender.send} and L{Receiver.fetch} methods of the 397 L{Sender} and L{Receiver} objects associated with a Session. 398 399 Each L{Sender} and L{Receiver} is created by supplying either a 400 target or source address to the L{sender} and L{receiver} methods of 401 the Session. The address is supplied via a string syntax documented 402 below. 403 404 Addresses 405 ========= 406 407 An address identifies a source or target for messages. In its 408 simplest form this is just a name. In general a target address may 409 also be used as a source address, however not all source addresses 410 may be used as a target, e.g. a source might additionally have some 411 filtering criteria that would not be present in a target. 412 413 A subject may optionally be specified along with the name. When an 414 address is used as a target, any subject specified in the address is 415 used as the default subject of outgoing messages for that target. 416 When an address is used as a source, any subject specified in the 417 address is pattern matched against the subject of available messages 418 as a filter for incoming messages from that source. 419 420 The options map contains additional information about the address 421 including: 422 423 - policies for automatically creating, and deleting the node to 424 which an address refers 425 426 - policies for asserting facts about the node to which an address 427 refers 428 429 - extension points that can be used for sender/receiver 430 configuration 431 432 Mapping to AMQP 0-10 433 -------------------- 434 The name is resolved to either an exchange or a queue by querying 435 the broker. 436 437 The subject is set as a property on the message. Additionally, if 438 the name refers to an exchange, the routing key is set to the 439 subject. 440 441 Syntax 442 ------ 443 The following regular expressions define the tokens used to parse 444 addresses:: 445 LBRACE: \\{ 446 RBRACE: \\} 447 LBRACK: \\[ 448 RBRACK: \\] 449 COLON: : 450 SEMI: ; 451 SLASH: / 452 COMMA: , 453 NUMBER: [+-]?[0-9]*\\.?[0-9]+ 454 ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? 455 STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' 456 ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] 457 SYM: [.#*%@$^!+-] 458 WSPACE: [ \\n\\r\\t]+ 459 460 The formal grammar for addresses is given below:: 461 address = name [ "/" subject ] [ ";" options ] 462 name = ( part | quoted )+ 463 subject = ( part | quoted | "/" )* 464 quoted = STRING / ESC 465 part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM 466 options = map 467 map = "{" ( keyval ( "," keyval )* )? "}" 468 keyval = ID ":" value 469 value = NUMBER / STRING / ID / map / list 470 list = "[" ( value ( "," value )* )? "]" 471 472 This grammar resuls in the following informal syntax:: 473 474 <name> [ / <subject> ] [ ; <options> ] 475 476 Where options is:: 477 478 { <key> : <value>, ... } 479 480 And values may be: 481 - numbers 482 - single, double, or non quoted strings 483 - maps (dictionaries) 484 - lists 485 486 Options 487 ------- 488 The options map permits the following parameters:: 489 490 <name> [ / <subject> ] ; { 491 create: always | sender | receiver | never, 492 delete: always | sender | receiver | never, 493 assert: always | sender | receiver | never, 494 mode: browse | consume, 495 node: { 496 type: queue | topic, 497 durable: True | False, 498 x-declare: { ... <declare-overrides> ... }, 499 x-bindings: [<binding_1>, ... <binding_n>] 500 }, 501 link: { 502 name: <link-name>, 503 durable: True | False, 504 reliability: unreliable | at-most-once | at-least-once | exactly-once, 505 x-declare: { ... <declare-overrides> ... }, 506 x-bindings: [<binding_1>, ... <binding_n>], 507 x-subscribe: { ... <subscribe-overrides> ... } 508 } 509 } 510 511 Bindings are specified as a map with the following options:: 512 513 { 514 exchange: <exchange>, 515 queue: <queue>, 516 key: <key>, 517 arguments: <arguments> 518 } 519 520 The create, delete, and assert policies specify who should perfom 521 the associated action: 522 523 - I{always}: the action will always be performed 524 - I{sender}: the action will only be performed by the sender 525 - I{receiver}: the action will only be performed by the receiver 526 - I{never}: the action will never be performed (this is the default) 527 528 The node-type is one of: 529 530 - I{topic}: a topic node will default to the topic exchange, 531 x-declare may be used to specify other exchange types 532 - I{queue}: this is the default node-type 533 534 The x-declare map permits protocol specific keys and values to be 535 specified when exchanges or queues are declared. These keys and 536 values are passed through when creating a node or asserting facts 537 about an existing node. 538 539 Examples 540 -------- 541 A simple name resolves to any named node, usually a queue or a 542 topic:: 543 544 my-queue-or-topic 545 546 A simple name with a subject will also resolve to a node, but the 547 presence of the subject will cause a sender using this address to 548 set the subject on outgoing messages, and receivers to filter based 549 on the subject:: 550 551 my-queue-or-topic/my-subject 552 553 A subject pattern can be used and will cause filtering if used by 554 the receiver. If used for a sender, the literal value gets set as 555 the subject:: 556 557 my-queue-or-topic/my-* 558 559 In all the above cases, the address is resolved to an existing node. 560 If you want the node to be auto-created, then you can do the 561 following. By default nonexistent nodes are assumed to be queues:: 562 563 my-queue; {create: always} 564 565 You can customize the properties of the queue:: 566 567 my-queue; {create: always, node: {durable: True}} 568 569 You can create a topic instead if you want:: 570 571 my-queue; {create: always, node: {type: topic}} 572 573 You can assert that the address resolves to a node with particular 574 properties:: 575 576 my-transient-topic; { 577 assert: always, 578 node: { 579 type: topic, 580 durable: False 581 } 582 } 583 """ 584
585 - def __init__(self, connection, name, transactional):
586 super(Session, self).__init__() 587 self.connection = connection 588 self.name = name 589 self.log_id = "%x" % id(self) 590 591 self.transactional = transactional 592 593 self.committing = False 594 self.committed = True 595 self.aborting = False 596 self.aborted = False 597 598 self.next_sender_id = 0 599 self.senders = [] 600 self.next_receiver_id = 0 601 self.receivers = [] 602 self.outgoing = [] 603 self.incoming = [] 604 self.unacked = [] 605 self.acked = [] 606 # XXX: I hate this name. 607 self.ack_capacity = UNLIMITED 608 609 self.closing = False 610 self.closed = False 611 612 self._lock = connection._lock 613 self._msg_received_notify_handler = None
614
615 - def __repr__(self):
616 return "<Session %s>" % self.name
617
618 - def _wait(self, predicate, timeout=None):
619 return self.connection._wait(predicate, timeout=timeout)
620
621 - def _wakeup(self):
622 self.connection._wakeup()
623
624 - def check_error(self):
625 self.connection.check_error() 626 if self.error: 627 raise self.error
628
629 - def get_error(self):
630 err = self.connection.get_error() 631 if err: 632 return err 633 else: 634 return self.error
635
636 - def _ewait(self, predicate, timeout=None):
637 result = self.connection._ewait(lambda: self.error or predicate(), timeout) 638 self.check_error() 639 return result
640
641 - def check_closed(self):
642 if self.closed: 643 raise SessionClosed()
644
645 - def _notify_message_received(self, msg):
646 self.incoming.append(msg) 647 if self._msg_received_notify_handler: 648 try: 649 # new callback parameter: the Session 650 self._msg_received_notify_handler(self) 651 except TypeError: 652 # backward compatibility with old API, no Session 653 self._msg_received_notify_handler()
654 655 @synchronized
656 - def sender(self, target, **options):
657 """ 658 Creates a L{Sender} that may be used to send L{Messages<Message>} 659 to the specified target. 660 661 @type target: str 662 @param target: the target to which messages will be sent 663 @rtype: Sender 664 @return: a new Sender for the specified target 665 """ 666 target = _mangle(target) 667 sender = Sender(self, self.next_sender_id, target, options) 668 self.next_sender_id += 1 669 self.senders.append(sender) 670 if not self.closed and self.connection._connected: 671 self._wakeup() 672 try: 673 sender._ewait(lambda: sender.linked) 674 except LinkError, e: 675 sender.close() 676 raise e 677 return sender
678 679 @synchronized
680 - def receiver(self, source, **options):
681 """ 682 Creates a receiver that may be used to fetch L{Messages<Message>} 683 from the specified source. 684 685 @type source: str 686 @param source: the source of L{Messages<Message>} 687 @rtype: Receiver 688 @return: a new Receiver for the specified source 689 """ 690 source = _mangle(source) 691 receiver = Receiver(self, self.next_receiver_id, source, options) 692 self.next_receiver_id += 1 693 self.receivers.append(receiver) 694 if not self.closed and self.connection._connected: 695 self._wakeup() 696 try: 697 receiver._ewait(lambda: receiver.linked) 698 except LinkError, e: 699 receiver.close() 700 raise e 701 return receiver
702 703 @synchronized
704 - def _count(self, predicate):
705 result = 0 706 for msg in self.incoming: 707 if predicate(msg): 708 result += 1 709 return result
710
711 - def _peek(self, receiver):
712 for msg in self.incoming: 713 if msg._receiver == receiver: 714 return msg
715
716 - def _pop(self, receiver):
717 i = 0 718 while i < len(self.incoming): 719 msg = self.incoming[i] 720 if msg._receiver == receiver: 721 del self.incoming[i] 722 return msg 723 else: 724 i += 1
725 726 @synchronized
727 - def _get(self, receiver, timeout=None):
728 if self._ewait(lambda: ((self._peek(receiver) is not None) or 729 self.closing or receiver.closed), 730 timeout): 731 msg = self._pop(receiver) 732 if msg is not None: 733 msg._receiver.returned += 1 734 self.unacked.append(msg) 735 log.debug("RETR[%s]: %s", self.log_id, msg) 736 return msg 737 return None
738 739 @synchronized
740 - def set_message_received_notify_handler(self, handler):
741 """ 742 Register a callable that will be invoked when a Message arrives on the 743 Session. 744 745 @param handler: invoked by the driver thread when an error occurs. 746 @type handler: a callable object taking a Session instance as its only 747 argument 748 @return: None 749 750 @note: When using this method it is recommended to also register 751 asynchronous error callbacks on all endpoint objects. Doing so will cause 752 the application to be notified if an error is raised by the driver 753 thread. This is necessary as after a driver error occurs the message received 754 callback may never be invoked again. See 755 L{Endpoint.set_async_exception_notify_handler} 756 757 @warning: B{Use with caution} This callback is invoked in the context of 758 the driver thread. It is B{NOT} safe to call B{ANY} of the public 759 messaging APIs from within this callback, including any of the passed 760 Session's methods. The intent of the handler is to provide an efficient 761 way to notify the application that a message has arrived. This can be 762 useful for those applications that need to schedule a task to poll for 763 received messages without blocking in the messaging API. The scheduled 764 task may then retrieve the message using L{next_receiver} and 765 L{Receiver.fetch} 766 """ 767 self._msg_received_notify_handler = handler
768 769 @synchronized
770 - def set_message_received_handler(self, handler):
771 """@deprecated: Use L{set_message_received_notify_handler} instead. 772 """ 773 self._msg_received_notify_handler = handler
774 775 @synchronized
776 - def next_receiver(self, timeout=None):
777 if self._ecwait(lambda: self.incoming, timeout): 778 return self.incoming[0]._receiver 779 else: 780 raise Empty
781 782 @synchronized
783 - def acknowledge(self, message=None, disposition=None, sync=True):
784 """ 785 Acknowledge the given L{Message}. If message is None, then all 786 unacknowledged messages on the session are acknowledged. 787 788 @type message: Message 789 @param message: the message to acknowledge or None 790 @type sync: boolean 791 @param sync: if true then block until the message(s) are acknowledged 792 """ 793 if message is None: 794 messages = self.unacked[:] 795 else: 796 messages = [message] 797 798 for m in messages: 799 if self.ack_capacity is not UNLIMITED: 800 if self.ack_capacity <= 0: 801 # XXX: this is currently a SendError, maybe it should be a SessionError? 802 raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) 803 self._wakeup() 804 self._ecwait(lambda: len(self.acked) < self.ack_capacity) 805 m._disposition = disposition 806 self.unacked.remove(m) 807 self.acked.append(m) 808 809 self._wakeup() 810 if sync: 811 self._ecwait(lambda: not [m for m in messages if m in self.acked])
812 813 @synchronized
814 - def commit(self, timeout=None):
815 """ 816 Commit outstanding transactional work. This consists of all 817 message sends and receives since the prior commit or rollback. 818 """ 819 if not self.transactional: 820 raise NontransactionalSession() 821 self.committing = True 822 self._wakeup() 823 try: 824 if not self._ecwait(lambda: not self.committing, timeout=timeout): 825 raise Timeout("commit timed out") 826 except TransactionError: 827 raise 828 except Exception, e: 829 self.error = TransactionAborted(text="Transaction aborted: %s"%e) 830 raise self.error 831 if self.aborted: 832 raise TransactionAborted() 833 assert self.committed
834 835 @synchronized
836 - def rollback(self, timeout=None):
837 """ 838 Rollback outstanding transactional work. This consists of all 839 message sends and receives since the prior commit or rollback. 840 """ 841 if not self.transactional: 842 raise NontransactionalSession() 843 self.aborting = True 844 self._wakeup() 845 if not self._ecwait(lambda: not self.aborting, timeout=timeout): 846 raise Timeout("rollback timed out") 847 assert self.aborted
848 849 @synchronized
850 - def sync(self, timeout=None):
851 """ 852 Sync the session. 853 """ 854 for snd in self.senders: 855 snd.sync(timeout=timeout) 856 if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout): 857 raise Timeout("session sync timed out")
858 859 @synchronized
860 - def close(self, timeout=None):
861 """ 862 Close the session. 863 """ 864 if self.error: return 865 self.sync(timeout=timeout) 866 867 for link in self.receivers + self.senders: 868 link.close(timeout=timeout) 869 870 if not self.closing: 871 self.closing = True 872 self._wakeup() 873 874 try: 875 if not self._ewait(lambda: self.closed, timeout=timeout): 876 raise Timeout("session close timed out") 877 finally: 878 self.connection._remove_session(self)
879
880 881 -class MangledString(str): pass
882
883 -def _mangle(addr):
884 if addr and addr.startswith("#"): 885 return MangledString(str(uuid4()) + addr) 886 else: 887 return addr
888
889 -class Sender(Endpoint):
890 891 """ 892 Sends outgoing messages. 893 """ 894
895 - def __init__(self, session, id, target, options):
896 super(Sender, self).__init__() 897 self.session = session 898 self.id = id 899 self.target = target 900 self.options = options 901 self.capacity = options.get("capacity", UNLIMITED) 902 self.threshold = 0.5 903 self.durable = options.get("durable") 904 self.queued = Serial(0) 905 self.synced = Serial(0) 906 self.acked = Serial(0) 907 self.linked = False 908 self.closing = False 909 self.closed = False 910 self._lock = self.session._lock
911
912 - def _wakeup(self):
913 self.session._wakeup()
914
915 - def check_error(self):
916 self.session.check_error() 917 if self.error: 918 raise self.error
919
920 - def get_error(self):
921 err = self.session.get_error() 922 if err: 923 return err 924 else: 925 return self.error
926
927 - def _ewait(self, predicate, timeout=None):
928 result = self.session._ewait(lambda: self.error or predicate(), timeout) 929 self.check_error() 930 return result
931
932 - def check_closed(self):
933 if self.closed: 934 raise LinkClosed()
935 936 @synchronized
937 - def unsettled(self):
938 """ 939 Returns the number of messages awaiting acknowledgment. 940 @rtype: int 941 @return: the number of unacknowledged messages 942 """ 943 return self.queued - self.acked
944 945 @synchronized
946 - def available(self):
947 if self.capacity is UNLIMITED: 948 return UNLIMITED 949 else: 950 return self.capacity - self.unsettled()
951 952 @synchronized
953 - def send(self, object, sync=True, timeout=None):
954 """ 955 Send a message. If the object passed in is of type L{unicode}, 956 L{str}, L{list}, or L{dict}, it will automatically be wrapped in a 957 L{Message} and sent. If it is of type L{Message}, it will be sent 958 directly. If the sender capacity is not L{UNLIMITED} then send 959 will block until there is available capacity to send the message. 960 If the timeout parameter is specified, then send will throw an 961 L{InsufficientCapacity} exception if capacity does not become 962 available within the specified time. 963 964 @type object: unicode, str, list, dict, Message 965 @param object: the message or content to send 966 967 @type sync: boolean 968 @param sync: if true then block until the message is sent 969 970 @type timeout: float 971 @param timeout: the time to wait for available capacity 972 """ 973 974 if not self.session.connection._connected or self.session.closing: 975 raise Detached() 976 977 self._ecwait(lambda: self.linked, timeout=timeout) 978 979 if isinstance(object, Message): 980 message = object 981 else: 982 message = Message(object) 983 984 if message.durable is None: 985 message.durable = self.durable 986 987 if self.capacity is not UNLIMITED: 988 if self.capacity <= 0: 989 raise InsufficientCapacity("capacity = %s" % self.capacity) 990 if not self._ecwait(self.available, timeout=timeout): 991 raise InsufficientCapacity("capacity = %s" % self.capacity) 992 993 # XXX: what if we send the same message to multiple senders? 994 message._sender = self 995 if self.capacity is not UNLIMITED: 996 message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity)) 997 else: 998 message._sync = sync 999 self.session.outgoing.append(message) 1000 self.queued += 1 1001 1002 if sync: 1003 self.sync(timeout=timeout) 1004 assert message not in self.session.outgoing 1005 else: 1006 self._wakeup()
1007 1008 @synchronized
1009 - def sync(self, timeout=None):
1010 mno = self.queued 1011 if self.synced < mno: 1012 self.synced = mno 1013 self._wakeup() 1014 try: 1015 if not self._ewait(lambda: self.acked >= mno, timeout=timeout): 1016 raise Timeout("sender sync timed out") 1017 except ContentError: 1018 # clean bad message so we can continue 1019 self.acked = mno 1020 self.session.outgoing.pop(0) 1021 raise
1022 1023 @synchronized
1024 - def close(self, timeout=None):
1025 """ 1026 Close the Sender. 1027 """ 1028 # avoid erroring out when closing a sender that was never 1029 # established 1030 if self.acked < self.queued: 1031 self.sync(timeout=timeout) 1032 1033 if not self.closing: 1034 self.closing = True 1035 self._wakeup() 1036 1037 try: 1038 if not self.session._ewait(lambda: self.closed, timeout=timeout): 1039 raise Timeout("sender close timed out") 1040 finally: 1041 try: 1042 self.session.senders.remove(self) 1043 except ValueError: 1044 pass
1045
1046 1047 -class Receiver(Endpoint):
1048 1049 """ 1050 Receives incoming messages from a remote source. Messages may be 1051 fetched with L{fetch}. 1052 """ 1053
1054 - def __init__(self, session, id, source, options):
1055 super(Receiver, self).__init__() 1056 self.session = session 1057 self.id = id 1058 self.source = source 1059 self.options = options 1060 1061 self.granted = Serial(0) 1062 self.draining = False 1063 self.impending = Serial(0) 1064 self.received = Serial(0) 1065 self.returned = Serial(0) 1066 1067 self.linked = False 1068 self.closing = False 1069 self.closed = False 1070 self._lock = self.session._lock 1071 self._capacity = 0 1072 self._set_capacity(options.get("capacity", 0), False) 1073 self.threshold = 0.5
1074 1075 @synchronized
1076 - def _set_capacity(self, c, wakeup=True):
1077 if c is UNLIMITED: 1078 self._capacity = c.value 1079 else: 1080 self._capacity = c 1081 self._grant() 1082 if wakeup: 1083 self._wakeup()
1084
1085 - def _get_capacity(self):
1086 if self._capacity == UNLIMITED.value: 1087 return UNLIMITED 1088 else: 1089 return self._capacity
1090 1091 capacity = property(_get_capacity, _set_capacity) 1092
1093 - def _wakeup(self):
1094 self.session._wakeup()
1095
1096 - def check_error(self):
1097 self.session.check_error() 1098 if self.error: 1099 raise self.error
1100
1101 - def get_error(self):
1102 err = self.session.get_error() 1103 if err: 1104 return err 1105 else: 1106 return self.error
1107
1108 - def _ewait(self, predicate, timeout=None):
1109 result = self.session._ewait(lambda: self.error or predicate(), timeout) 1110 self.check_error() 1111 return result
1112
1113 - def check_closed(self):
1114 if self.closed: 1115 raise LinkClosed()
1116 1117 @synchronized
1118 - def unsettled(self):
1119 """ 1120 Returns the number of acknowledged messages awaiting confirmation. 1121 """ 1122 return len([m for m in self.session.acked if m._receiver is self])
1123 1124 @synchronized
1125 - def available(self):
1126 """ 1127 Returns the number of messages available to be fetched by the 1128 application. 1129 1130 @rtype: int 1131 @return: the number of available messages 1132 """ 1133 return self.received - self.returned
1134 1135 @synchronized
1136 - def fetch(self, timeout=None):
1137 """ 1138 Fetch and return a single message. A timeout of None will block 1139 forever waiting for a message to arrive, a timeout of zero will 1140 return immediately if no messages are available. 1141 1142 @type timeout: float 1143 @param timeout: the time to wait for a message to be available 1144 """ 1145 1146 self._ecwait(lambda: self.linked) 1147 1148 if self._capacity == 0: 1149 self.granted = self.returned + 1 1150 self._wakeup() 1151 self._ecwait(lambda: self.impending >= self.granted) 1152 msg = self.session._get(self, timeout=timeout) 1153 if msg is None: 1154 self.check_closed() 1155 self.draining = True 1156 self._wakeup() 1157 self._ecwait(lambda: not self.draining) 1158 msg = self.session._get(self, timeout=0) 1159 self._grant() 1160 self._wakeup() 1161 if msg is None: 1162 raise Empty() 1163 elif self._capacity not in (0, UNLIMITED.value): 1164 t = int(ceil(self.threshold * self._capacity)) 1165 if self.received - self.returned <= t: 1166 self.granted = self.returned + self._capacity 1167 self._wakeup() 1168 return msg
1169
1170 - def _grant(self):
1171 if self._capacity == UNLIMITED.value: 1172 self.granted = UNLIMITED 1173 else: 1174 self.granted = self.returned + self._capacity
1175 1176 @synchronized
1177 - def close(self, timeout=None):
1178 """ 1179 Close the receiver. 1180 """ 1181 if not self.closing: 1182 self.closing = True 1183 self._wakeup() 1184 1185 try: 1186 if not self.session._ewait(lambda: self.closed, timeout=timeout): 1187 raise Timeout("receiver close timed out") 1188 finally: 1189 try: 1190 self.session.receivers.remove(self) 1191 except ValueError: 1192 pass
1193 1194 1195 __all__ = ["Connection", "Endpoint", "Session", "Sender", "Receiver"] 1196