Package qpid :: Package messaging :: Module endpoints
[frames] | no frames]

Source Code for Module qpid.messaging.endpoints

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  A candidate high level messaging API for python. 
  22   
  23  Areas that still need work: 
  24   
  25    - definition of the arguments for L{Session.sender} and L{Session.receiver} 
  26    - standard L{Message} properties 
  27    - L{Message} content encoding 
  28    - protocol negotiation/multiprotocol impl 
  29  """ 
  30   
  31  from logging import getLogger 
  32  from math import ceil 
  33  from qpid.codec010 import StringCodec 
  34  from qpid.concurrency import synchronized, Waiter, Condition 
  35  from qpid.datatypes import Serial, uuid4 
  36  from qpid.messaging.constants import * 
  37  from qpid.messaging.exceptions import * 
  38  from qpid.messaging.message import * 
  39  from qpid.ops import PRIMITIVE 
  40  from qpid.util import default, URL 
  41  from threading import Thread, RLock 
  42   
  43  log = getLogger("qpid.messaging") 
  44   
  45  static = staticmethod 
46 47 -class Endpoint:
48
49 - def _ecwait(self, predicate, timeout=None):
50 result = self._ewait(lambda: self.closed or predicate(), timeout) 51 self.check_closed() 52 return result
53
54 -class Connection(Endpoint):
55 56 """ 57 A Connection manages a group of L{Sessions<Session>} and connects 58 them with a remote endpoint. 59 """ 60 61 @static
62 - def establish(url=None, **options):
63 """ 64 Constructs a L{Connection} with the supplied parameters and opens 65 it. 66 """ 67 conn = Connection(url, **options) 68 conn.open() 69 return conn
70
71 - def __init__(self, url=None, **options):
72 """ 73 Creates a connection. A newly created connection must be connected 74 with the Connection.connect() method before it can be used. 75 76 @type url: str 77 @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] 78 @type host: str 79 @param host: the name or ip address of the remote host (overriden by url) 80 @type port: int 81 @param port: the port number of the remote host (overriden by url) 82 @type transport: str 83 @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) 84 @type heartbeat: int 85 @param heartbeat: heartbeat interval in seconds 86 87 @type username: str 88 @param username: the username for authentication (overriden by url) 89 @type password: str 90 @param password: the password for authentication (overriden by url) 91 92 @type sasl_mechanisms: str 93 @param sasl_mechanisms: space separated list of permitted sasl mechanisms 94 @type sasl_service: str 95 @param sasl_service: ??? 96 @type sasl_min_ssf: ??? 97 @param sasl_min_ssf: ??? 98 @type sasl_max_ssf: ??? 99 @param sasl_max_ssf: ??? 100 101 @type reconnect: bool 102 @param reconnect: enable/disable automatic reconnect 103 @type reconnect_timeout: float 104 @param reconnect_timeout: total time to attempt reconnect 105 @type reconnect_internal_min: float 106 @param reconnect_internal_min: minimum interval between reconnect attempts 107 @type reconnect_internal_max: float 108 @param reconnect_internal_max: maximum interval between reconnect attempts 109 @type reconnect_internal: float 110 @param reconnect_interval: set both min and max reconnect intervals 111 @type reconnect_limit: int 112 @param reconnect_limit: limit the total number of reconnect attempts 113 @type reconnect_urls: list[str] 114 @param reconnect_urls: list of backup hosts specified as urls 115 116 @type address_ttl: float 117 @param address_ttl: time until cached address resolution expires 118 119 @type ssl_keyfile: str 120 @param ssl_keyfile: file with client's private key (PEM format) 121 @type ssl_certfile: str 122 @param ssl_certfile: file with client's public (eventually priv+pub) key (PEM format) 123 @type ssl_trustfile: str 124 @param ssl_trustfile: file trusted certificates to validate the server 125 126 @rtype: Connection 127 @return: a disconnected Connection 128 """ 129 if url is None: 130 url = options.get("host") 131 if isinstance(url, basestring): 132 url = URL(url) 133 self.host = url.host 134 if options.has_key("transport"): 135 self.transport = options.get("transport") 136 elif url.scheme == url.AMQP: 137 self.transport = "tcp" 138 elif url.scheme == url.AMQPS: 139 self.transport = "ssl" 140 else: 141 self.transport = "tcp" 142 if self.transport in ("ssl", "tcp+tls"): 143 self.port = default(url.port, options.get("port", AMQPS_PORT)) 144 else: 145 self.port = default(url.port, options.get("port", AMQP_PORT)) 146 self.heartbeat = options.get("heartbeat") 147 self.username = default(url.user, options.get("username", None)) 148 self.password = default(url.password, options.get("password", None)) 149 self.auth_username = None 150 151 self.sasl_mechanisms = options.get("sasl_mechanisms") 152 self.sasl_service = options.get("sasl_service", "qpidd") 153 self.sasl_min_ssf = options.get("sasl_min_ssf") 154 self.sasl_max_ssf = options.get("sasl_max_ssf") 155 156 self.reconnect = options.get("reconnect", False) 157 self.reconnect_timeout = options.get("reconnect_timeout") 158 reconnect_interval = options.get("reconnect_interval") 159 self.reconnect_interval_min = options.get("reconnect_interval_min", 160 default(reconnect_interval, 1)) 161 self.reconnect_interval_max = options.get("reconnect_interval_max", 162 default(reconnect_interval, 2*60)) 163 self.reconnect_limit = options.get("reconnect_limit") 164 self.reconnect_urls = options.get("reconnect_urls", []) 165 self.reconnect_log = options.get("reconnect_log", True) 166 167 self.address_ttl = options.get("address_ttl", 60) 168 self.tcp_nodelay = options.get("tcp_nodelay", False) 169 170 self.ssl_keyfile = options.get("ssl_keyfile", None) 171 self.ssl_certfile = options.get("ssl_certfile", None) 172 self.ssl_trustfile = options.get("ssl_trustfile", None) 173 self.client_properties = options.get("client_properties", {}) 174 175 self.options = options 176 177 178 self.id = str(uuid4()) 179 self.session_counter = 0 180 self.sessions = {} 181 self._open = False 182 self._connected = False 183 self._transport_connected = False 184 self._lock = RLock() 185 self._condition = Condition(self._lock) 186 self._waiter = Waiter(self._condition) 187 self._modcount = Serial(0) 188 self.error = None 189 from driver import Driver 190 self._driver = Driver(self)
191
192 - def _wait(self, predicate, timeout=None):
193 return self._waiter.wait(predicate, timeout=timeout)
194
195 - def _wakeup(self):
196 self._modcount += 1 197 self._driver.wakeup()
198
199 - def check_error(self):
200 if self.error: 201 self._condition.gc() 202 raise self.error
203
204 - def get_error(self):
205 return self.error
206
207 - def _ewait(self, predicate, timeout=None):
208 result = self._wait(lambda: self.error or predicate(), timeout) 209 self.check_error() 210 return result
211
212 - def check_closed(self):
213 if not self._connected: 214 self._condition.gc() 215 raise ConnectionClosed()
216 217 @synchronized
218 - def session(self, name=None, transactional=False):
219 """ 220 Creates or retrieves the named session. If the name is omitted or 221 None, then a unique name is chosen based on a randomly generated 222 uuid. 223 224 @type name: str 225 @param name: the session name 226 @rtype: Session 227 @return: the named Session 228 """ 229 230 if name is None: 231 name = "%s:%s" % (self.id, self.session_counter) 232 self.session_counter += 1 233 else: 234 name = "%s:%s" % (self.id, name) 235 236 if self.sessions.has_key(name): 237 return self.sessions[name] 238 else: 239 ssn = Session(self, name, transactional) 240 self.sessions[name] = ssn 241 self._wakeup() 242 return ssn
243 244 @synchronized
245 - def _remove_session(self, ssn):
246 self.sessions.pop(ssn.name, 0)
247 248 @synchronized
249 - def open(self):
250 """ 251 Opens a connection. 252 """ 253 if self._open: 254 raise ConnectionError("already open") 255 self._open = True 256 self.attach()
257 258 @synchronized
259 - def opened(self):
260 """ 261 Return true if the connection is open, false otherwise. 262 """ 263 return self._open
264 265 @synchronized
266 - def attach(self):
267 """ 268 Attach to the remote endpoint. 269 """ 270 if not self._connected: 271 self._connected = True 272 self._driver.start() 273 self._wakeup() 274 self._ewait(lambda: self._transport_connected and not self._unlinked())
275
276 - def _unlinked(self):
277 return [l 278 for ssn in self.sessions.values() 279 if not (ssn.error or ssn.closed) 280 for l in ssn.senders + ssn.receivers 281 if not (l.linked or l.error or l.closed)]
282 283 @synchronized
284 - def detach(self, timeout=None):
285 """ 286 Detach from the remote endpoint. 287 """ 288 if self._connected: 289 self._connected = False 290 self._wakeup() 291 cleanup = True 292 else: 293 cleanup = False 294 try: 295 if not self._wait(lambda: not self._transport_connected, timeout=timeout): 296 raise Timeout("detach timed out") 297 finally: 298 if cleanup: 299 self._driver.stop() 300 self._condition.gc()
301 302 @synchronized
303 - def attached(self):
304 """ 305 Return true if the connection is attached, false otherwise. 306 """ 307 return self._connected
308 309 @synchronized
310 - def close(self, timeout=None):
311 """ 312 Close the connection and all sessions. 313 """ 314 try: 315 for ssn in self.sessions.values(): 316 ssn.close(timeout=timeout) 317 finally: 318 self.detach(timeout=timeout) 319 self._open = False
320
321 -class Session(Endpoint):
322 323 """ 324 Sessions provide a linear context for sending and receiving 325 L{Messages<Message>}. L{Messages<Message>} are sent and received 326 using the L{Sender.send} and L{Receiver.fetch} methods of the 327 L{Sender} and L{Receiver} objects associated with a Session. 328 329 Each L{Sender} and L{Receiver} is created by supplying either a 330 target or source address to the L{sender} and L{receiver} methods of 331 the Session. The address is supplied via a string syntax documented 332 below. 333 334 Addresses 335 ========= 336 337 An address identifies a source or target for messages. In its 338 simplest form this is just a name. In general a target address may 339 also be used as a source address, however not all source addresses 340 may be used as a target, e.g. a source might additionally have some 341 filtering criteria that would not be present in a target. 342 343 A subject may optionally be specified along with the name. When an 344 address is used as a target, any subject specified in the address is 345 used as the default subject of outgoing messages for that target. 346 When an address is used as a source, any subject specified in the 347 address is pattern matched against the subject of available messages 348 as a filter for incoming messages from that source. 349 350 The options map contains additional information about the address 351 including: 352 353 - policies for automatically creating, and deleting the node to 354 which an address refers 355 356 - policies for asserting facts about the node to which an address 357 refers 358 359 - extension points that can be used for sender/receiver 360 configuration 361 362 Mapping to AMQP 0-10 363 -------------------- 364 The name is resolved to either an exchange or a queue by querying 365 the broker. 366 367 The subject is set as a property on the message. Additionally, if 368 the name refers to an exchange, the routing key is set to the 369 subject. 370 371 Syntax 372 ------ 373 The following regular expressions define the tokens used to parse 374 addresses:: 375 LBRACE: \\{ 376 RBRACE: \\} 377 LBRACK: \\[ 378 RBRACK: \\] 379 COLON: : 380 SEMI: ; 381 SLASH: / 382 COMMA: , 383 NUMBER: [+-]?[0-9]*\\.?[0-9]+ 384 ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? 385 STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' 386 ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] 387 SYM: [.#*%@$^!+-] 388 WSPACE: [ \\n\\r\\t]+ 389 390 The formal grammar for addresses is given below:: 391 address = name [ "/" subject ] [ ";" options ] 392 name = ( part | quoted )+ 393 subject = ( part | quoted | "/" )* 394 quoted = STRING / ESC 395 part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM 396 options = map 397 map = "{" ( keyval ( "," keyval )* )? "}" 398 keyval = ID ":" value 399 value = NUMBER / STRING / ID / map / list 400 list = "[" ( value ( "," value )* )? "]" 401 402 This grammar resuls in the following informal syntax:: 403 404 <name> [ / <subject> ] [ ; <options> ] 405 406 Where options is:: 407 408 { <key> : <value>, ... } 409 410 And values may be: 411 - numbers 412 - single, double, or non quoted strings 413 - maps (dictionaries) 414 - lists 415 416 Options 417 ------- 418 The options map permits the following parameters:: 419 420 <name> [ / <subject> ] ; { 421 create: always | sender | receiver | never, 422 delete: always | sender | receiver | never, 423 assert: always | sender | receiver | never, 424 mode: browse | consume, 425 node: { 426 type: queue | topic, 427 durable: True | False, 428 x-declare: { ... <declare-overrides> ... }, 429 x-bindings: [<binding_1>, ... <binding_n>] 430 }, 431 link: { 432 name: <link-name>, 433 durable: True | False, 434 reliability: unreliable | at-most-once | at-least-once | exactly-once, 435 x-declare: { ... <declare-overrides> ... }, 436 x-bindings: [<binding_1>, ... <binding_n>], 437 x-subscribe: { ... <subscribe-overrides> ... } 438 } 439 } 440 441 Bindings are specified as a map with the following options:: 442 443 { 444 exchange: <exchange>, 445 queue: <queue>, 446 key: <key>, 447 arguments: <arguments> 448 } 449 450 The create, delete, and assert policies specify who should perfom 451 the associated action: 452 453 - I{always}: the action will always be performed 454 - I{sender}: the action will only be performed by the sender 455 - I{receiver}: the action will only be performed by the receiver 456 - I{never}: the action will never be performed (this is the default) 457 458 The node-type is one of: 459 460 - I{topic}: a topic node will default to the topic exchange, 461 x-declare may be used to specify other exchange types 462 - I{queue}: this is the default node-type 463 464 The x-declare map permits protocol specific keys and values to be 465 specified when exchanges or queues are declared. These keys and 466 values are passed through when creating a node or asserting facts 467 about an existing node. 468 469 Examples 470 -------- 471 A simple name resolves to any named node, usually a queue or a 472 topic:: 473 474 my-queue-or-topic 475 476 A simple name with a subject will also resolve to a node, but the 477 presence of the subject will cause a sender using this address to 478 set the subject on outgoing messages, and receivers to filter based 479 on the subject:: 480 481 my-queue-or-topic/my-subject 482 483 A subject pattern can be used and will cause filtering if used by 484 the receiver. If used for a sender, the literal value gets set as 485 the subject:: 486 487 my-queue-or-topic/my-* 488 489 In all the above cases, the address is resolved to an existing node. 490 If you want the node to be auto-created, then you can do the 491 following. By default nonexistent nodes are assumed to be queues:: 492 493 my-queue; {create: always} 494 495 You can customize the properties of the queue:: 496 497 my-queue; {create: always, node: {durable: True}} 498 499 You can create a topic instead if you want:: 500 501 my-queue; {create: always, node: {type: topic}} 502 503 You can assert that the address resolves to a node with particular 504 properties:: 505 506 my-transient-topic; { 507 assert: always, 508 node: { 509 type: topic, 510 durable: False 511 } 512 } 513 """ 514
515 - def __init__(self, connection, name, transactional):
516 self.connection = connection 517 self.name = name 518 self.log_id = "%x" % id(self) 519 520 self.transactional = transactional 521 522 self.committing = False 523 self.committed = True 524 self.aborting = False 525 self.aborted = False 526 527 self.next_sender_id = 0 528 self.senders = [] 529 self.next_receiver_id = 0 530 self.receivers = [] 531 self.outgoing = [] 532 self.incoming = [] 533 self.unacked = [] 534 self.acked = [] 535 # XXX: I hate this name. 536 self.ack_capacity = UNLIMITED 537 538 self.error = None 539 self.closing = False 540 self.closed = False 541 542 self._lock = connection._lock
543
544 - def __repr__(self):
545 return "<Session %s>" % self.name
546
547 - def _wait(self, predicate, timeout=None):
548 return self.connection._wait(predicate, timeout=timeout)
549
550 - def _wakeup(self):
551 self.connection._wakeup()
552
553 - def check_error(self):
554 self.connection.check_error() 555 if self.error: 556 raise self.error
557
558 - def get_error(self):
559 err = self.connection.get_error() 560 if err: 561 return err 562 else: 563 return self.error
564
565 - def _ewait(self, predicate, timeout=None):
566 result = self.connection._ewait(lambda: self.error or predicate(), timeout) 567 self.check_error() 568 return result
569
570 - def check_closed(self):
571 if self.closed: 572 raise SessionClosed()
573 574 @synchronized
575 - def sender(self, target, **options):
576 """ 577 Creates a L{Sender} that may be used to send L{Messages<Message>} 578 to the specified target. 579 580 @type target: str 581 @param target: the target to which messages will be sent 582 @rtype: Sender 583 @return: a new Sender for the specified target 584 """ 585 target = _mangle(target) 586 sender = Sender(self, self.next_sender_id, target, options) 587 self.next_sender_id += 1 588 self.senders.append(sender) 589 if not self.closed and self.connection._connected: 590 self._wakeup() 591 try: 592 sender._ewait(lambda: sender.linked) 593 except LinkError, e: 594 sender.close() 595 raise e 596 return sender
597 598 @synchronized
599 - def receiver(self, source, **options):
600 """ 601 Creates a receiver that may be used to fetch L{Messages<Message>} 602 from the specified source. 603 604 @type source: str 605 @param source: the source of L{Messages<Message>} 606 @rtype: Receiver 607 @return: a new Receiver for the specified source 608 """ 609 source = _mangle(source) 610 receiver = Receiver(self, self.next_receiver_id, source, options) 611 self.next_receiver_id += 1 612 self.receivers.append(receiver) 613 if not self.closed and self.connection._connected: 614 self._wakeup() 615 try: 616 receiver._ewait(lambda: receiver.linked) 617 except LinkError, e: 618 receiver.close() 619 raise e 620 return receiver
621 622 @synchronized
623 - def _count(self, predicate):
624 result = 0 625 for msg in self.incoming: 626 if predicate(msg): 627 result += 1 628 return result
629
630 - def _peek(self, receiver):
631 for msg in self.incoming: 632 if msg._receiver == receiver: 633 return msg
634
635 - def _pop(self, receiver):
636 i = 0 637 while i < len(self.incoming): 638 msg = self.incoming[i] 639 if msg._receiver == receiver: 640 del self.incoming[i] 641 return msg 642 else: 643 i += 1
644 645 @synchronized
646 - def _get(self, receiver, timeout=None):
647 if self._ewait(lambda: ((self._peek(receiver) is not None) or 648 self.closing or receiver.closed), 649 timeout): 650 msg = self._pop(receiver) 651 if msg is not None: 652 msg._receiver.returned += 1 653 self.unacked.append(msg) 654 log.debug("RETR[%s]: %s", self.log_id, msg) 655 return msg 656 return None
657 658 @synchronized
659 - def next_receiver(self, timeout=None):
660 if self._ecwait(lambda: self.incoming, timeout): 661 return self.incoming[0]._receiver 662 else: 663 raise Empty
664 665 @synchronized
666 - def acknowledge(self, message=None, disposition=None, sync=True):
667 """ 668 Acknowledge the given L{Message}. If message is None, then all 669 unacknowledged messages on the session are acknowledged. 670 671 @type message: Message 672 @param message: the message to acknowledge or None 673 @type sync: boolean 674 @param sync: if true then block until the message(s) are acknowledged 675 """ 676 if message is None: 677 messages = self.unacked[:] 678 else: 679 messages = [message] 680 681 for m in messages: 682 if self.ack_capacity is not UNLIMITED: 683 if self.ack_capacity <= 0: 684 # XXX: this is currently a SendError, maybe it should be a SessionError? 685 raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) 686 self._wakeup() 687 self._ecwait(lambda: len(self.acked) < self.ack_capacity) 688 m._disposition = disposition 689 self.unacked.remove(m) 690 self.acked.append(m) 691 692 self._wakeup() 693 if sync: 694 self._ecwait(lambda: not [m for m in messages if m in self.acked])
695 696 @synchronized
697 - def commit(self):
698 """ 699 Commit outstanding transactional work. This consists of all 700 message sends and receives since the prior commit or rollback. 701 """ 702 if not self.transactional: 703 raise NontransactionalSession() 704 self.committing = True 705 self._wakeup() 706 self._ecwait(lambda: not self.committing) 707 if self.aborted: 708 raise TransactionAborted() 709 assert self.committed
710 711 @synchronized
712 - def rollback(self):
713 """ 714 Rollback outstanding transactional work. This consists of all 715 message sends and receives since the prior commit or rollback. 716 """ 717 if not self.transactional: 718 raise NontransactionalSession() 719 self.aborting = True 720 self._wakeup() 721 self._ecwait(lambda: not self.aborting) 722 assert self.aborted
723 724 @synchronized
725 - def sync(self, timeout=None):
726 """ 727 Sync the session. 728 """ 729 for snd in self.senders: 730 snd.sync(timeout=timeout) 731 if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout): 732 raise Timeout("session sync timed out")
733 734 @synchronized
735 - def close(self, timeout=None):
736 """ 737 Close the session. 738 """ 739 self.sync(timeout=timeout) 740 741 for link in self.receivers + self.senders: 742 link.close(timeout=timeout) 743 744 if not self.closing: 745 self.closing = True 746 self._wakeup() 747 748 try: 749 if not self._ewait(lambda: self.closed, timeout=timeout): 750 raise Timeout("session close timed out") 751 finally: 752 self.connection._remove_session(self)
753
754 -def _mangle(addr):
755 if addr and addr.startswith("#"): 756 return str(uuid4()) + addr 757 else: 758 return addr
759
760 -class Sender(Endpoint):
761 762 """ 763 Sends outgoing messages. 764 """ 765
766 - def __init__(self, session, id, target, options):
767 self.session = session 768 self.id = id 769 self.target = target 770 self.options = options 771 self.capacity = options.get("capacity", UNLIMITED) 772 self.threshold = 0.5 773 self.durable = options.get("durable") 774 self.queued = Serial(0) 775 self.synced = Serial(0) 776 self.acked = Serial(0) 777 self.error = None 778 self.linked = False 779 self.closing = False 780 self.closed = False 781 self._lock = self.session._lock
782
783 - def _wakeup(self):
784 self.session._wakeup()
785
786 - def check_error(self):
787 self.session.check_error() 788 if self.error: 789 raise self.error
790
791 - def get_error(self):
792 err = self.session.get_error() 793 if err: 794 return err 795 else: 796 return self.error
797
798 - def _ewait(self, predicate, timeout=None):
799 result = self.session._ewait(lambda: self.error or predicate(), timeout) 800 self.check_error() 801 return result
802
803 - def check_closed(self):
804 if self.closed: 805 raise LinkClosed()
806 807 @synchronized
808 - def unsettled(self):
809 """ 810 Returns the number of messages awaiting acknowledgment. 811 @rtype: int 812 @return: the number of unacknowledged messages 813 """ 814 return self.queued - self.acked
815 816 @synchronized
817 - def available(self):
818 if self.capacity is UNLIMITED: 819 return UNLIMITED 820 else: 821 return self.capacity - self.unsettled()
822 823 @synchronized
824 - def send(self, object, sync=True, timeout=None):
825 """ 826 Send a message. If the object passed in is of type L{unicode}, 827 L{str}, L{list}, or L{dict}, it will automatically be wrapped in a 828 L{Message} and sent. If it is of type L{Message}, it will be sent 829 directly. If the sender capacity is not L{UNLIMITED} then send 830 will block until there is available capacity to send the message. 831 If the timeout parameter is specified, then send will throw an 832 L{InsufficientCapacity} exception if capacity does not become 833 available within the specified time. 834 835 @type object: unicode, str, list, dict, Message 836 @param object: the message or content to send 837 838 @type sync: boolean 839 @param sync: if true then block until the message is sent 840 841 @type timeout: float 842 @param timeout: the time to wait for available capacity 843 """ 844 845 if not self.session.connection._connected or self.session.closing: 846 raise Detached() 847 848 self._ecwait(lambda: self.linked) 849 850 if isinstance(object, Message): 851 message = object 852 else: 853 message = Message(object) 854 855 if message.durable is None: 856 message.durable = self.durable 857 858 if self.capacity is not UNLIMITED: 859 if self.capacity <= 0: 860 raise InsufficientCapacity("capacity = %s" % self.capacity) 861 if not self._ecwait(self.available, timeout=timeout): 862 raise InsufficientCapacity("capacity = %s" % self.capacity) 863 864 # XXX: what if we send the same message to multiple senders? 865 message._sender = self 866 if self.capacity is not UNLIMITED: 867 message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity)) 868 else: 869 message._sync = sync 870 self.session.outgoing.append(message) 871 self.queued += 1 872 873 if sync: 874 self.sync(timeout=timeout) 875 assert message not in self.session.outgoing 876 else: 877 self._wakeup()
878 879 @synchronized
880 - def sync(self, timeout=None):
881 mno = self.queued 882 if self.synced < mno: 883 self.synced = mno 884 self._wakeup() 885 if not self._ewait(lambda: self.acked >= mno, timeout=timeout): 886 raise Timeout("sender sync timed out")
887 888 @synchronized
889 - def close(self, timeout=None):
890 """ 891 Close the Sender. 892 """ 893 # avoid erroring out when closing a sender that was never 894 # established 895 if self.acked < self.queued: 896 self.sync(timeout=timeout) 897 898 if not self.closing: 899 self.closing = True 900 self._wakeup() 901 902 try: 903 if not self.session._ewait(lambda: self.closed, timeout=timeout): 904 raise Timeout("sender close timed out") 905 finally: 906 try: 907 self.session.senders.remove(self) 908 except ValueError: 909 pass
910
911 -class Receiver(Endpoint, object):
912 913 """ 914 Receives incoming messages from a remote source. Messages may be 915 fetched with L{fetch}. 916 """ 917
918 - def __init__(self, session, id, source, options):
919 self.session = session 920 self.id = id 921 self.source = source 922 self.options = options 923 924 self.granted = Serial(0) 925 self.draining = False 926 self.impending = Serial(0) 927 self.received = Serial(0) 928 self.returned = Serial(0) 929 930 self.error = None 931 self.linked = False 932 self.closing = False 933 self.closed = False 934 self._lock = self.session._lock 935 self._capacity = 0 936 self._set_capacity(options.get("capacity", 0), False) 937 self.threshold = 0.5
938 939 @synchronized
940 - def _set_capacity(self, c, wakeup=True):
941 if c is UNLIMITED: 942 self._capacity = c.value 943 else: 944 self._capacity = c 945 self._grant() 946 if wakeup: 947 self._wakeup()
948
949 - def _get_capacity(self):
950 if self._capacity == UNLIMITED.value: 951 return UNLIMITED 952 else: 953 return self._capacity
954 955 capacity = property(_get_capacity, _set_capacity) 956
957 - def _wakeup(self):
958 self.session._wakeup()
959
960 - def check_error(self):
961 self.session.check_error() 962 if self.error: 963 raise self.error
964
965 - def get_error(self):
966 err = self.session.get_error() 967 if err: 968 return err 969 else: 970 return self.error
971
972 - def _ewait(self, predicate, timeout=None):
973 result = self.session._ewait(lambda: self.error or predicate(), timeout) 974 self.check_error() 975 return result
976
977 - def check_closed(self):
978 if self.closed: 979 raise LinkClosed()
980 981 @synchronized
982 - def unsettled(self):
983 """ 984 Returns the number of acknowledged messages awaiting confirmation. 985 """ 986 return len([m for m in self.acked if m._receiver is self])
987 988 @synchronized
989 - def available(self):
990 """ 991 Returns the number of messages available to be fetched by the 992 application. 993 994 @rtype: int 995 @return: the number of available messages 996 """ 997 return self.received - self.returned
998 999 @synchronized
1000 - def fetch(self, timeout=None):
1001 """ 1002 Fetch and return a single message. A timeout of None will block 1003 forever waiting for a message to arrive, a timeout of zero will 1004 return immediately if no messages are available. 1005 1006 @type timeout: float 1007 @param timeout: the time to wait for a message to be available 1008 """ 1009 1010 self._ecwait(lambda: self.linked) 1011 1012 if self._capacity == 0: 1013 self.granted = self.returned + 1 1014 self._wakeup() 1015 self._ecwait(lambda: self.impending >= self.granted) 1016 msg = self.session._get(self, timeout=timeout) 1017 if msg is None: 1018 self.check_closed() 1019 self.draining = True 1020 self._wakeup() 1021 self._ecwait(lambda: not self.draining) 1022 msg = self.session._get(self, timeout=0) 1023 self._grant() 1024 self._wakeup() 1025 if msg is None: 1026 raise Empty() 1027 elif self._capacity not in (0, UNLIMITED.value): 1028 t = int(ceil(self.threshold * self._capacity)) 1029 if self.received - self.returned <= t: 1030 self.granted = self.returned + self._capacity 1031 self._wakeup() 1032 return msg
1033
1034 - def _grant(self):
1035 if self._capacity == UNLIMITED.value: 1036 self.granted = UNLIMITED 1037 else: 1038 self.granted = self.returned + self._capacity
1039 1040 @synchronized
1041 - def close(self, timeout=None):
1042 """ 1043 Close the receiver. 1044 """ 1045 if not self.closing: 1046 self.closing = True 1047 self._wakeup() 1048 1049 try: 1050 if not self.session._ewait(lambda: self.closed, timeout=timeout): 1051 raise Timeout("receiver close timed out") 1052 finally: 1053 try: 1054 self.session.receivers.remove(self) 1055 except ValueError: 1056 pass
1057 1058 __all__ = ["Connection", "Session", "Sender", "Receiver"] 1059