Package qpid :: Package messaging :: Module endpoints
[frames] | no frames]

Source Code for Module qpid.messaging.endpoints

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  A candidate high level messaging API for python. 
  22   
  23  Areas that still need work: 
  24   
  25    - definition of the arguments for L{Session.sender} and L{Session.receiver} 
  26    - standard L{Message} properties 
  27    - L{Message} content encoding 
  28    - protocol negotiation/multiprotocol impl 
  29  """ 
  30   
  31  from logging import getLogger 
  32  from math import ceil 
  33  from qpid.codec010 import StringCodec 
  34  from qpid.concurrency import synchronized, Waiter, Condition 
  35  from qpid.datatypes import Serial, uuid4 
  36  from qpid.messaging.constants import * 
  37  from qpid.messaging.exceptions import * 
  38  from qpid.messaging.message import * 
  39  from qpid.ops import PRIMITIVE 
  40  from qpid.util import default, URL 
  41  from threading import Thread, RLock 
  42   
  43  log = getLogger("qpid.messaging") 
  44   
  45  static = staticmethod 
46 47 -class Endpoint:
48
49 - def _ecwait(self, predicate, timeout=None):
50 result = self._ewait(lambda: self.closed or predicate(), timeout) 51 self.check_closed() 52 return result
53
54 -class Connection(Endpoint):
55 56 """ 57 A Connection manages a group of L{Sessions<Session>} and connects 58 them with a remote endpoint. 59 """ 60 61 @static
62 - def establish(url=None, **options):
63 """ 64 Constructs a L{Connection} with the supplied parameters and opens 65 it. 66 """ 67 conn = Connection(url, **options) 68 conn.open() 69 return conn
70
71 - def __init__(self, url=None, **options):
72 """ 73 Creates a connection. A newly created connection must be connected 74 with the Connection.connect() method before it can be used. 75 76 @type url: str 77 @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] 78 @type host: str 79 @param host: the name or ip address of the remote host (overriden by url) 80 @type port: int 81 @param port: the port number of the remote host (overriden by url) 82 @type transport: str 83 @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) 84 @type heartbeat: int 85 @param heartbeat: heartbeat interval in seconds 86 87 @type username: str 88 @param username: the username for authentication (overriden by url) 89 @type password: str 90 @param password: the password for authentication (overriden by url) 91 92 @type sasl_mechanisms: str 93 @param sasl_mechanisms: space separated list of permitted sasl mechanisms 94 @type sasl_service: str 95 @param sasl_service: ??? 96 @type sasl_min_ssf: ??? 97 @param sasl_min_ssf: ??? 98 @type sasl_max_ssf: ??? 99 @param sasl_max_ssf: ??? 100 101 @type reconnect: bool 102 @param reconnect: enable/disable automatic reconnect 103 @type reconnect_timeout: float 104 @param reconnect_timeout: total time to attempt reconnect 105 @type reconnect_internal_min: float 106 @param reconnect_internal_min: minimum interval between reconnect attempts 107 @type reconnect_internal_max: float 108 @param reconnect_internal_max: maximum interval between reconnect attempts 109 @type reconnect_internal: float 110 @param reconnect_interval: set both min and max reconnect intervals 111 @type reconnect_limit: int 112 @param reconnect_limit: limit the total number of reconnect attempts 113 @type reconnect_urls: list[str] 114 @param reconnect_urls: list of backup hosts specified as urls 115 116 @type address_ttl: float 117 @param address_ttl: time until cached address resolution expires 118 119 @type ssl_keyfile: str 120 @param ssl_keyfile: file with client's private key (PEM format) 121 @type ssl_certfile: str 122 @param ssl_certfile: file with client's public (eventually priv+pub) key (PEM format) 123 @type ssl_trustfile: str 124 @param ssl_trustfile: file trusted certificates to validate the server 125 @type ssl_skip_hostname_check: bool 126 @param ssl_skip_hostname_check: disable verification of hostname in 127 certificate. Use with caution - disabling hostname checking leaves you 128 vulnerable to Man-in-the-Middle attacks. 129 130 @rtype: Connection 131 @return: a disconnected Connection 132 """ 133 if url is None: 134 url = options.get("host") 135 if isinstance(url, basestring): 136 url = URL(url) 137 self.host = url.host 138 if options.has_key("transport"): 139 self.transport = options.get("transport") 140 elif url.scheme == url.AMQP: 141 self.transport = "tcp" 142 elif url.scheme == url.AMQPS: 143 self.transport = "ssl" 144 else: 145 self.transport = "tcp" 146 if self.transport in ("ssl", "tcp+tls"): 147 self.port = default(url.port, options.get("port", AMQPS_PORT)) 148 else: 149 self.port = default(url.port, options.get("port", AMQP_PORT)) 150 self.heartbeat = options.get("heartbeat") 151 self.username = default(url.user, options.get("username", None)) 152 self.password = default(url.password, options.get("password", None)) 153 self.auth_username = None 154 155 self.sasl_mechanisms = options.get("sasl_mechanisms") 156 self.sasl_service = options.get("sasl_service", "qpidd") 157 self.sasl_min_ssf = options.get("sasl_min_ssf") 158 self.sasl_max_ssf = options.get("sasl_max_ssf") 159 160 self.reconnect = options.get("reconnect", False) 161 self.reconnect_timeout = options.get("reconnect_timeout") 162 reconnect_interval = options.get("reconnect_interval") 163 self.reconnect_interval_min = options.get("reconnect_interval_min", 164 default(reconnect_interval, 1)) 165 self.reconnect_interval_max = options.get("reconnect_interval_max", 166 default(reconnect_interval, 2*60)) 167 self.reconnect_limit = options.get("reconnect_limit") 168 self.reconnect_urls = options.get("reconnect_urls", []) 169 self.reconnect_log = options.get("reconnect_log", True) 170 171 self.address_ttl = options.get("address_ttl", 60) 172 self.tcp_nodelay = options.get("tcp_nodelay", False) 173 174 self.ssl_keyfile = options.get("ssl_keyfile", None) 175 self.ssl_certfile = options.get("ssl_certfile", None) 176 self.ssl_trustfile = options.get("ssl_trustfile", None) 177 self.ssl_skip_hostname_check = options.get("ssl_skip_hostname_check", False) 178 self.client_properties = options.get("client_properties", {}) 179 180 self.options = options 181 182 183 self.id = str(uuid4()) 184 self.session_counter = 0 185 self.sessions = {} 186 self._open = False 187 self._connected = False 188 self._transport_connected = False 189 self._lock = RLock() 190 self._condition = Condition(self._lock) 191 self._waiter = Waiter(self._condition) 192 self._modcount = Serial(0) 193 self.error = None 194 from driver import Driver 195 self._driver = Driver(self)
196
197 - def _wait(self, predicate, timeout=None):
198 return self._waiter.wait(predicate, timeout=timeout)
199
200 - def _wakeup(self):
201 self._modcount += 1 202 self._driver.wakeup()
203
204 - def check_error(self):
205 if self.error: 206 self._condition.gc() 207 raise self.error
208
209 - def get_error(self):
210 return self.error
211
212 - def _ewait(self, predicate, timeout=None):
213 result = self._wait(lambda: self.error or predicate(), timeout) 214 self.check_error() 215 return result
216
217 - def check_closed(self):
218 if not self._connected: 219 self._condition.gc() 220 raise ConnectionClosed()
221 222 @synchronized
223 - def session(self, name=None, transactional=False):
224 """ 225 Creates or retrieves the named session. If the name is omitted or 226 None, then a unique name is chosen based on a randomly generated 227 uuid. 228 229 @type name: str 230 @param name: the session name 231 @rtype: Session 232 @return: the named Session 233 """ 234 235 if name is None: 236 name = "%s:%s" % (self.id, self.session_counter) 237 self.session_counter += 1 238 else: 239 name = "%s:%s" % (self.id, name) 240 241 if self.sessions.has_key(name): 242 return self.sessions[name] 243 else: 244 ssn = Session(self, name, transactional) 245 self.sessions[name] = ssn 246 self._wakeup() 247 return ssn
248 249 @synchronized
250 - def _remove_session(self, ssn):
251 self.sessions.pop(ssn.name, 0)
252 253 @synchronized
254 - def open(self):
255 """ 256 Opens a connection. 257 """ 258 if self._open: 259 raise ConnectionError("already open") 260 self._open = True 261 self.attach()
262 263 @synchronized
264 - def opened(self):
265 """ 266 Return true if the connection is open, false otherwise. 267 """ 268 return self._open
269 270 @synchronized
271 - def attach(self):
272 """ 273 Attach to the remote endpoint. 274 """ 275 if not self._connected: 276 self._connected = True 277 self._driver.start() 278 self._wakeup() 279 self._ewait(lambda: self._transport_connected and not self._unlinked())
280
281 - def _unlinked(self):
282 return [l 283 for ssn in self.sessions.values() 284 if not (ssn.error or ssn.closed) 285 for l in ssn.senders + ssn.receivers 286 if not (l.linked or l.error or l.closed)]
287 288 @synchronized
289 - def detach(self, timeout=None):
290 """ 291 Detach from the remote endpoint. 292 """ 293 if self._connected: 294 self._connected = False 295 self._wakeup() 296 cleanup = True 297 else: 298 cleanup = False 299 try: 300 if not self._wait(lambda: not self._transport_connected, timeout=timeout): 301 raise Timeout("detach timed out") 302 finally: 303 if cleanup: 304 self._driver.stop() 305 self._condition.gc()
306 307 @synchronized
308 - def attached(self):
309 """ 310 Return true if the connection is attached, false otherwise. 311 """ 312 return self._connected
313 314 @synchronized
315 - def close(self, timeout=None):
316 """ 317 Close the connection and all sessions. 318 """ 319 try: 320 for ssn in self.sessions.values(): 321 ssn.close(timeout=timeout) 322 finally: 323 self.detach(timeout=timeout) 324 self._open = False
325
326 -class Session(Endpoint):
327 328 """ 329 Sessions provide a linear context for sending and receiving 330 L{Messages<Message>}. L{Messages<Message>} are sent and received 331 using the L{Sender.send} and L{Receiver.fetch} methods of the 332 L{Sender} and L{Receiver} objects associated with a Session. 333 334 Each L{Sender} and L{Receiver} is created by supplying either a 335 target or source address to the L{sender} and L{receiver} methods of 336 the Session. The address is supplied via a string syntax documented 337 below. 338 339 Addresses 340 ========= 341 342 An address identifies a source or target for messages. In its 343 simplest form this is just a name. In general a target address may 344 also be used as a source address, however not all source addresses 345 may be used as a target, e.g. a source might additionally have some 346 filtering criteria that would not be present in a target. 347 348 A subject may optionally be specified along with the name. When an 349 address is used as a target, any subject specified in the address is 350 used as the default subject of outgoing messages for that target. 351 When an address is used as a source, any subject specified in the 352 address is pattern matched against the subject of available messages 353 as a filter for incoming messages from that source. 354 355 The options map contains additional information about the address 356 including: 357 358 - policies for automatically creating, and deleting the node to 359 which an address refers 360 361 - policies for asserting facts about the node to which an address 362 refers 363 364 - extension points that can be used for sender/receiver 365 configuration 366 367 Mapping to AMQP 0-10 368 -------------------- 369 The name is resolved to either an exchange or a queue by querying 370 the broker. 371 372 The subject is set as a property on the message. Additionally, if 373 the name refers to an exchange, the routing key is set to the 374 subject. 375 376 Syntax 377 ------ 378 The following regular expressions define the tokens used to parse 379 addresses:: 380 LBRACE: \\{ 381 RBRACE: \\} 382 LBRACK: \\[ 383 RBRACK: \\] 384 COLON: : 385 SEMI: ; 386 SLASH: / 387 COMMA: , 388 NUMBER: [+-]?[0-9]*\\.?[0-9]+ 389 ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? 390 STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' 391 ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] 392 SYM: [.#*%@$^!+-] 393 WSPACE: [ \\n\\r\\t]+ 394 395 The formal grammar for addresses is given below:: 396 address = name [ "/" subject ] [ ";" options ] 397 name = ( part | quoted )+ 398 subject = ( part | quoted | "/" )* 399 quoted = STRING / ESC 400 part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM 401 options = map 402 map = "{" ( keyval ( "," keyval )* )? "}" 403 keyval = ID ":" value 404 value = NUMBER / STRING / ID / map / list 405 list = "[" ( value ( "," value )* )? "]" 406 407 This grammar resuls in the following informal syntax:: 408 409 <name> [ / <subject> ] [ ; <options> ] 410 411 Where options is:: 412 413 { <key> : <value>, ... } 414 415 And values may be: 416 - numbers 417 - single, double, or non quoted strings 418 - maps (dictionaries) 419 - lists 420 421 Options 422 ------- 423 The options map permits the following parameters:: 424 425 <name> [ / <subject> ] ; { 426 create: always | sender | receiver | never, 427 delete: always | sender | receiver | never, 428 assert: always | sender | receiver | never, 429 mode: browse | consume, 430 node: { 431 type: queue | topic, 432 durable: True | False, 433 x-declare: { ... <declare-overrides> ... }, 434 x-bindings: [<binding_1>, ... <binding_n>] 435 }, 436 link: { 437 name: <link-name>, 438 durable: True | False, 439 reliability: unreliable | at-most-once | at-least-once | exactly-once, 440 x-declare: { ... <declare-overrides> ... }, 441 x-bindings: [<binding_1>, ... <binding_n>], 442 x-subscribe: { ... <subscribe-overrides> ... } 443 } 444 } 445 446 Bindings are specified as a map with the following options:: 447 448 { 449 exchange: <exchange>, 450 queue: <queue>, 451 key: <key>, 452 arguments: <arguments> 453 } 454 455 The create, delete, and assert policies specify who should perfom 456 the associated action: 457 458 - I{always}: the action will always be performed 459 - I{sender}: the action will only be performed by the sender 460 - I{receiver}: the action will only be performed by the receiver 461 - I{never}: the action will never be performed (this is the default) 462 463 The node-type is one of: 464 465 - I{topic}: a topic node will default to the topic exchange, 466 x-declare may be used to specify other exchange types 467 - I{queue}: this is the default node-type 468 469 The x-declare map permits protocol specific keys and values to be 470 specified when exchanges or queues are declared. These keys and 471 values are passed through when creating a node or asserting facts 472 about an existing node. 473 474 Examples 475 -------- 476 A simple name resolves to any named node, usually a queue or a 477 topic:: 478 479 my-queue-or-topic 480 481 A simple name with a subject will also resolve to a node, but the 482 presence of the subject will cause a sender using this address to 483 set the subject on outgoing messages, and receivers to filter based 484 on the subject:: 485 486 my-queue-or-topic/my-subject 487 488 A subject pattern can be used and will cause filtering if used by 489 the receiver. If used for a sender, the literal value gets set as 490 the subject:: 491 492 my-queue-or-topic/my-* 493 494 In all the above cases, the address is resolved to an existing node. 495 If you want the node to be auto-created, then you can do the 496 following. By default nonexistent nodes are assumed to be queues:: 497 498 my-queue; {create: always} 499 500 You can customize the properties of the queue:: 501 502 my-queue; {create: always, node: {durable: True}} 503 504 You can create a topic instead if you want:: 505 506 my-queue; {create: always, node: {type: topic}} 507 508 You can assert that the address resolves to a node with particular 509 properties:: 510 511 my-transient-topic; { 512 assert: always, 513 node: { 514 type: topic, 515 durable: False 516 } 517 } 518 """ 519
520 - def __init__(self, connection, name, transactional):
521 self.connection = connection 522 self.name = name 523 self.log_id = "%x" % id(self) 524 525 self.transactional = transactional 526 527 self.committing = False 528 self.committed = True 529 self.aborting = False 530 self.aborted = False 531 532 self.next_sender_id = 0 533 self.senders = [] 534 self.next_receiver_id = 0 535 self.receivers = [] 536 self.outgoing = [] 537 self.incoming = [] 538 self.unacked = [] 539 self.acked = [] 540 # XXX: I hate this name. 541 self.ack_capacity = UNLIMITED 542 543 self.error = None 544 self.closing = False 545 self.closed = False 546 547 self._lock = connection._lock
548
549 - def __repr__(self):
550 return "<Session %s>" % self.name
551
552 - def _wait(self, predicate, timeout=None):
553 return self.connection._wait(predicate, timeout=timeout)
554
555 - def _wakeup(self):
556 self.connection._wakeup()
557
558 - def check_error(self):
559 self.connection.check_error() 560 if self.error: 561 raise self.error
562
563 - def get_error(self):
564 err = self.connection.get_error() 565 if err: 566 return err 567 else: 568 return self.error
569
570 - def _ewait(self, predicate, timeout=None):
571 result = self.connection._ewait(lambda: self.error or predicate(), timeout) 572 self.check_error() 573 return result
574
575 - def check_closed(self):
576 if self.closed: 577 raise SessionClosed()
578 579 @synchronized
580 - def sender(self, target, **options):
581 """ 582 Creates a L{Sender} that may be used to send L{Messages<Message>} 583 to the specified target. 584 585 @type target: str 586 @param target: the target to which messages will be sent 587 @rtype: Sender 588 @return: a new Sender for the specified target 589 """ 590 target = _mangle(target) 591 sender = Sender(self, self.next_sender_id, target, options) 592 self.next_sender_id += 1 593 self.senders.append(sender) 594 if not self.closed and self.connection._connected: 595 self._wakeup() 596 try: 597 sender._ewait(lambda: sender.linked) 598 except LinkError, e: 599 sender.close() 600 raise e 601 return sender
602 603 @synchronized
604 - def receiver(self, source, **options):
605 """ 606 Creates a receiver that may be used to fetch L{Messages<Message>} 607 from the specified source. 608 609 @type source: str 610 @param source: the source of L{Messages<Message>} 611 @rtype: Receiver 612 @return: a new Receiver for the specified source 613 """ 614 source = _mangle(source) 615 receiver = Receiver(self, self.next_receiver_id, source, options) 616 self.next_receiver_id += 1 617 self.receivers.append(receiver) 618 if not self.closed and self.connection._connected: 619 self._wakeup() 620 try: 621 receiver._ewait(lambda: receiver.linked) 622 except LinkError, e: 623 receiver.close() 624 raise e 625 return receiver
626 627 @synchronized
628 - def _count(self, predicate):
629 result = 0 630 for msg in self.incoming: 631 if predicate(msg): 632 result += 1 633 return result
634
635 - def _peek(self, receiver):
636 for msg in self.incoming: 637 if msg._receiver == receiver: 638 return msg
639
640 - def _pop(self, receiver):
641 i = 0 642 while i < len(self.incoming): 643 msg = self.incoming[i] 644 if msg._receiver == receiver: 645 del self.incoming[i] 646 return msg 647 else: 648 i += 1
649 650 @synchronized
651 - def _get(self, receiver, timeout=None):
652 if self._ewait(lambda: ((self._peek(receiver) is not None) or 653 self.closing or receiver.closed), 654 timeout): 655 msg = self._pop(receiver) 656 if msg is not None: 657 msg._receiver.returned += 1 658 self.unacked.append(msg) 659 log.debug("RETR[%s]: %s", self.log_id, msg) 660 return msg 661 return None
662 663 @synchronized
664 - def next_receiver(self, timeout=None):
665 if self._ecwait(lambda: self.incoming, timeout): 666 return self.incoming[0]._receiver 667 else: 668 raise Empty
669 670 @synchronized
671 - def acknowledge(self, message=None, disposition=None, sync=True):
672 """ 673 Acknowledge the given L{Message}. If message is None, then all 674 unacknowledged messages on the session are acknowledged. 675 676 @type message: Message 677 @param message: the message to acknowledge or None 678 @type sync: boolean 679 @param sync: if true then block until the message(s) are acknowledged 680 """ 681 if message is None: 682 messages = self.unacked[:] 683 else: 684 messages = [message] 685 686 for m in messages: 687 if self.ack_capacity is not UNLIMITED: 688 if self.ack_capacity <= 0: 689 # XXX: this is currently a SendError, maybe it should be a SessionError? 690 raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) 691 self._wakeup() 692 self._ecwait(lambda: len(self.acked) < self.ack_capacity) 693 m._disposition = disposition 694 self.unacked.remove(m) 695 self.acked.append(m) 696 697 self._wakeup() 698 if sync: 699 self._ecwait(lambda: not [m for m in messages if m in self.acked])
700 701 @synchronized
702 - def commit(self):
703 """ 704 Commit outstanding transactional work. This consists of all 705 message sends and receives since the prior commit or rollback. 706 """ 707 if not self.transactional: 708 raise NontransactionalSession() 709 self.committing = True 710 self._wakeup() 711 self._ecwait(lambda: not self.committing) 712 if self.aborted: 713 raise TransactionAborted() 714 assert self.committed
715 716 @synchronized
717 - def rollback(self):
718 """ 719 Rollback outstanding transactional work. This consists of all 720 message sends and receives since the prior commit or rollback. 721 """ 722 if not self.transactional: 723 raise NontransactionalSession() 724 self.aborting = True 725 self._wakeup() 726 self._ecwait(lambda: not self.aborting) 727 assert self.aborted
728 729 @synchronized
730 - def sync(self, timeout=None):
731 """ 732 Sync the session. 733 """ 734 for snd in self.senders: 735 snd.sync(timeout=timeout) 736 if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout): 737 raise Timeout("session sync timed out")
738 739 @synchronized
740 - def close(self, timeout=None):
741 """ 742 Close the session. 743 """ 744 self.sync(timeout=timeout) 745 746 for link in self.receivers + self.senders: 747 link.close(timeout=timeout) 748 749 if not self.closing: 750 self.closing = True 751 self._wakeup() 752 753 try: 754 if not self._ewait(lambda: self.closed, timeout=timeout): 755 raise Timeout("session close timed out") 756 finally: 757 self.connection._remove_session(self)
758
759 -def _mangle(addr):
760 if addr and addr.startswith("#"): 761 return str(uuid4()) + addr 762 else: 763 return addr
764
765 -class Sender(Endpoint):
766 767 """ 768 Sends outgoing messages. 769 """ 770
771 - def __init__(self, session, id, target, options):
772 self.session = session 773 self.id = id 774 self.target = target 775 self.options = options 776 self.capacity = options.get("capacity", UNLIMITED) 777 self.threshold = 0.5 778 self.durable = options.get("durable") 779 self.queued = Serial(0) 780 self.synced = Serial(0) 781 self.acked = Serial(0) 782 self.error = None 783 self.linked = False 784 self.closing = False 785 self.closed = False 786 self._lock = self.session._lock
787
788 - def _wakeup(self):
789 self.session._wakeup()
790
791 - def check_error(self):
792 self.session.check_error() 793 if self.error: 794 raise self.error
795
796 - def get_error(self):
797 err = self.session.get_error() 798 if err: 799 return err 800 else: 801 return self.error
802
803 - def _ewait(self, predicate, timeout=None):
804 result = self.session._ewait(lambda: self.error or predicate(), timeout) 805 self.check_error() 806 return result
807
808 - def check_closed(self):
809 if self.closed: 810 raise LinkClosed()
811 812 @synchronized
813 - def unsettled(self):
814 """ 815 Returns the number of messages awaiting acknowledgment. 816 @rtype: int 817 @return: the number of unacknowledged messages 818 """ 819 return self.queued - self.acked
820 821 @synchronized
822 - def available(self):
823 if self.capacity is UNLIMITED: 824 return UNLIMITED 825 else: 826 return self.capacity - self.unsettled()
827 828 @synchronized
829 - def send(self, object, sync=True, timeout=None):
830 """ 831 Send a message. If the object passed in is of type L{unicode}, 832 L{str}, L{list}, or L{dict}, it will automatically be wrapped in a 833 L{Message} and sent. If it is of type L{Message}, it will be sent 834 directly. If the sender capacity is not L{UNLIMITED} then send 835 will block until there is available capacity to send the message. 836 If the timeout parameter is specified, then send will throw an 837 L{InsufficientCapacity} exception if capacity does not become 838 available within the specified time. 839 840 @type object: unicode, str, list, dict, Message 841 @param object: the message or content to send 842 843 @type sync: boolean 844 @param sync: if true then block until the message is sent 845 846 @type timeout: float 847 @param timeout: the time to wait for available capacity 848 """ 849 850 if not self.session.connection._connected or self.session.closing: 851 raise Detached() 852 853 self._ecwait(lambda: self.linked) 854 855 if isinstance(object, Message): 856 message = object 857 else: 858 message = Message(object) 859 860 if message.durable is None: 861 message.durable = self.durable 862 863 if self.capacity is not UNLIMITED: 864 if self.capacity <= 0: 865 raise InsufficientCapacity("capacity = %s" % self.capacity) 866 if not self._ecwait(self.available, timeout=timeout): 867 raise InsufficientCapacity("capacity = %s" % self.capacity) 868 869 # XXX: what if we send the same message to multiple senders? 870 message._sender = self 871 if self.capacity is not UNLIMITED: 872 message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity)) 873 else: 874 message._sync = sync 875 self.session.outgoing.append(message) 876 self.queued += 1 877 878 if sync: 879 self.sync(timeout=timeout) 880 assert message not in self.session.outgoing 881 else: 882 self._wakeup()
883 884 @synchronized
885 - def sync(self, timeout=None):
886 mno = self.queued 887 if self.synced < mno: 888 self.synced = mno 889 self._wakeup() 890 if not self._ewait(lambda: self.acked >= mno, timeout=timeout): 891 raise Timeout("sender sync timed out")
892 893 @synchronized
894 - def close(self, timeout=None):
895 """ 896 Close the Sender. 897 """ 898 # avoid erroring out when closing a sender that was never 899 # established 900 if self.acked < self.queued: 901 self.sync(timeout=timeout) 902 903 if not self.closing: 904 self.closing = True 905 self._wakeup() 906 907 try: 908 if not self.session._ewait(lambda: self.closed, timeout=timeout): 909 raise Timeout("sender close timed out") 910 finally: 911 try: 912 self.session.senders.remove(self) 913 except ValueError: 914 pass
915
916 -class Receiver(Endpoint, object):
917 918 """ 919 Receives incoming messages from a remote source. Messages may be 920 fetched with L{fetch}. 921 """ 922
923 - def __init__(self, session, id, source, options):
924 self.session = session 925 self.id = id 926 self.source = source 927 self.options = options 928 929 self.granted = Serial(0) 930 self.draining = False 931 self.impending = Serial(0) 932 self.received = Serial(0) 933 self.returned = Serial(0) 934 935 self.error = None 936 self.linked = False 937 self.closing = False 938 self.closed = False 939 self._lock = self.session._lock 940 self._capacity = 0 941 self._set_capacity(options.get("capacity", 0), False) 942 self.threshold = 0.5
943 944 @synchronized
945 - def _set_capacity(self, c, wakeup=True):
946 if c is UNLIMITED: 947 self._capacity = c.value 948 else: 949 self._capacity = c 950 self._grant() 951 if wakeup: 952 self._wakeup()
953
954 - def _get_capacity(self):
955 if self._capacity == UNLIMITED.value: 956 return UNLIMITED 957 else: 958 return self._capacity
959 960 capacity = property(_get_capacity, _set_capacity) 961
962 - def _wakeup(self):
963 self.session._wakeup()
964
965 - def check_error(self):
966 self.session.check_error() 967 if self.error: 968 raise self.error
969
970 - def get_error(self):
971 err = self.session.get_error() 972 if err: 973 return err 974 else: 975 return self.error
976
977 - def _ewait(self, predicate, timeout=None):
978 result = self.session._ewait(lambda: self.error or predicate(), timeout) 979 self.check_error() 980 return result
981
982 - def check_closed(self):
983 if self.closed: 984 raise LinkClosed()
985 986 @synchronized
987 - def unsettled(self):
988 """ 989 Returns the number of acknowledged messages awaiting confirmation. 990 """ 991 return len([m for m in self.acked if m._receiver is self])
992 993 @synchronized
994 - def available(self):
995 """ 996 Returns the number of messages available to be fetched by the 997 application. 998 999 @rtype: int 1000 @return: the number of available messages 1001 """ 1002 return self.received - self.returned
1003 1004 @synchronized
1005 - def fetch(self, timeout=None):
1006 """ 1007 Fetch and return a single message. A timeout of None will block 1008 forever waiting for a message to arrive, a timeout of zero will 1009 return immediately if no messages are available. 1010 1011 @type timeout: float 1012 @param timeout: the time to wait for a message to be available 1013 """ 1014 1015 self._ecwait(lambda: self.linked) 1016 1017 if self._capacity == 0: 1018 self.granted = self.returned + 1 1019 self._wakeup() 1020 self._ecwait(lambda: self.impending >= self.granted) 1021 msg = self.session._get(self, timeout=timeout) 1022 if msg is None: 1023 self.check_closed() 1024 self.draining = True 1025 self._wakeup() 1026 self._ecwait(lambda: not self.draining) 1027 msg = self.session._get(self, timeout=0) 1028 self._grant() 1029 self._wakeup() 1030 if msg is None: 1031 raise Empty() 1032 elif self._capacity not in (0, UNLIMITED.value): 1033 t = int(ceil(self.threshold * self._capacity)) 1034 if self.received - self.returned <= t: 1035 self.granted = self.returned + self._capacity 1036 self._wakeup() 1037 return msg
1038
1039 - def _grant(self):
1040 if self._capacity == UNLIMITED.value: 1041 self.granted = UNLIMITED 1042 else: 1043 self.granted = self.returned + self._capacity
1044 1045 @synchronized
1046 - def close(self, timeout=None):
1047 """ 1048 Close the receiver. 1049 """ 1050 if not self.closing: 1051 self.closing = True 1052 self._wakeup() 1053 1054 try: 1055 if not self.session._ewait(lambda: self.closed, timeout=timeout): 1056 raise Timeout("receiver close timed out") 1057 finally: 1058 try: 1059 self.session.receivers.remove(self) 1060 except ValueError: 1061 pass
1062 1063 __all__ = ["Connection", "Session", "Sender", "Receiver"] 1064