Package qpid :: Package messaging :: Module endpoints
[frames] | no frames]

Source Code for Module qpid.messaging.endpoints

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  A candidate high level messaging API for python. 
  22   
  23  Areas that still need work: 
  24   
  25    - definition of the arguments for L{Session.sender} and L{Session.receiver} 
  26    - standard L{Message} properties 
  27    - L{Message} content encoding 
  28    - protocol negotiation/multiprotocol impl 
  29  """ 
  30   
  31  from logging import getLogger 
  32  from math import ceil 
  33  from qpid.codec010 import StringCodec 
  34  from qpid.concurrency import synchronized, Waiter, Condition 
  35  from qpid.datatypes import Serial, uuid4 
  36  from qpid.messaging.constants import * 
  37  from qpid.messaging.exceptions import * 
  38  from qpid.messaging.message import * 
  39  from qpid.ops import PRIMITIVE 
  40  from qpid.util import default, URL 
  41  from threading import Thread, RLock 
  42   
  43  log = getLogger("qpid.messaging") 
  44   
  45  static = staticmethod 
46 47 -class Endpoint:
48
49 - def _ecwait(self, predicate, timeout=None):
50 result = self._ewait(lambda: self.closed or predicate(), timeout) 51 self.check_closed() 52 return result
53
54 -class Connection(Endpoint):
55 56 """ 57 A Connection manages a group of L{Sessions<Session>} and connects 58 them with a remote endpoint. 59 """ 60 61 @static
62 - def establish(url=None, timeout=None, **options):
63 """ 64 Constructs a L{Connection} with the supplied parameters and opens 65 it. 66 """ 67 conn = Connection(url, **options) 68 conn.open(timeout=timeout) 69 return conn
70
71 - def __init__(self, url=None, **options):
72 """ 73 Creates a connection. A newly created connection must be connected 74 with the Connection.connect() method before it can be used. 75 76 @type url: str 77 @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] 78 @type host: str 79 @param host: the name or ip address of the remote host (overriden by url) 80 @type port: int 81 @param port: the port number of the remote host (overriden by url) 82 @type transport: str 83 @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) 84 @type heartbeat: int 85 @param heartbeat: heartbeat interval in seconds 86 87 @type username: str 88 @param username: the username for authentication (overriden by url) 89 @type password: str 90 @param password: the password for authentication (overriden by url) 91 92 * - sasl_min_ssf: the minimum acceptable security strength factor 93 * - sasl_max_ssf: the minimum acceptable security strength factor 94 * - sasl_service: the service name if needed by the SASL mechanism in use 95 96 @type sasl_mechanisms: str 97 @param sasl_mechanisms: space separated list of permitted sasl mechanisms 98 @type sasl_service: str 99 @param sasl_service: the service name if needed by the SASL mechanism in use 100 @type sasl_min_ssf: int 101 @param sasl_min_ssf: the minimum acceptable security strength factor 102 @type sasl_max_ssf: int 103 @param sasl_max_ssf: the minimum acceptable security strength factor 104 105 @type reconnect: bool 106 @param reconnect: enable/disable automatic reconnect 107 @type reconnect_timeout: float 108 @param reconnect_timeout: total time to attempt reconnect 109 @type reconnect_interval_min: float 110 @param reconnect_interval_min: minimum interval between reconnect attempts 111 @type reconnect_interval_max: float 112 @param reconnect_interval_max: maximum interval between reconnect attempts 113 @type reconnect_interval: float 114 @param reconnect_interval: set both min and max reconnect intervals 115 @type reconnect_limit: int 116 @param reconnect_limit: limit the total number of reconnect attempts 117 @type reconnect_urls: list[str] 118 @param reconnect_urls: list of backup hosts specified as urls 119 120 @type address_ttl: float 121 @param address_ttl: time until cached address resolution expires 122 123 @type ssl_keyfile: str 124 @param ssl_keyfile: file with client's private key (PEM format) 125 @type ssl_certfile: str 126 @param ssl_certfile: file with client's public (eventually priv+pub) key (PEM format) 127 @type ssl_trustfile: str 128 @param ssl_trustfile: file trusted certificates to validate the server 129 @type ssl_skip_hostname_check: bool 130 @param ssl_skip_hostname_check: disable verification of hostname in 131 certificate. Use with caution - disabling hostname checking leaves you 132 vulnerable to Man-in-the-Middle attacks. 133 134 @rtype: Connection 135 @return: a disconnected Connection 136 """ 137 if url is None: 138 url = options.get("host") 139 if isinstance(url, basestring): 140 url = URL(url) 141 self.host = url.host 142 if options.has_key("transport"): 143 self.transport = options.get("transport") 144 elif url.scheme == url.AMQP: 145 self.transport = "tcp" 146 elif url.scheme == url.AMQPS: 147 self.transport = "ssl" 148 else: 149 self.transport = "tcp" 150 if self.transport in ("ssl", "tcp+tls"): 151 self.port = default(url.port, options.get("port", AMQPS_PORT)) 152 else: 153 self.port = default(url.port, options.get("port", AMQP_PORT)) 154 self.heartbeat = options.get("heartbeat") 155 self.username = default(url.user, options.get("username", None)) 156 self.password = default(url.password, options.get("password", None)) 157 self.auth_username = None 158 159 self.sasl_mechanisms = options.get("sasl_mechanisms") 160 self.sasl_service = options.get("sasl_service", "qpidd") 161 self.sasl_min_ssf = options.get("sasl_min_ssf") 162 self.sasl_max_ssf = options.get("sasl_max_ssf") 163 164 self.reconnect = options.get("reconnect", False) 165 self.reconnect_timeout = options.get("reconnect_timeout") 166 reconnect_interval = options.get("reconnect_interval") 167 self.reconnect_interval_min = options.get("reconnect_interval_min", 168 default(reconnect_interval, 1)) 169 self.reconnect_interval_max = options.get("reconnect_interval_max", 170 default(reconnect_interval, 2*60)) 171 self.reconnect_limit = options.get("reconnect_limit") 172 self.reconnect_urls = options.get("reconnect_urls", []) 173 self.reconnect_log = options.get("reconnect_log", True) 174 175 self.address_ttl = options.get("address_ttl", 60) 176 self.tcp_nodelay = options.get("tcp_nodelay", False) 177 178 self.ssl_keyfile = options.get("ssl_keyfile", None) 179 self.ssl_certfile = options.get("ssl_certfile", None) 180 self.ssl_trustfile = options.get("ssl_trustfile", None) 181 self.ssl_skip_hostname_check = options.get("ssl_skip_hostname_check", False) 182 self.client_properties = options.get("client_properties", {}) 183 184 self.options = options 185 186 187 self.id = str(uuid4()) 188 self.session_counter = 0 189 self.sessions = {} 190 self._open = False 191 self._connected = False 192 self._transport_connected = False 193 self._lock = RLock() 194 self._condition = Condition(self._lock) 195 self._waiter = Waiter(self._condition) 196 self._modcount = Serial(0) 197 self.error = None 198 from driver import Driver 199 self._driver = Driver(self)
200
201 - def _wait(self, predicate, timeout=None):
202 return self._waiter.wait(predicate, timeout=timeout)
203
204 - def _wakeup(self):
205 self._modcount += 1 206 self._driver.wakeup()
207
208 - def check_error(self):
209 if self.error: 210 self._condition.gc() 211 e = self.error 212 if isinstance(e, ContentError): 213 """ forget the content error. It will be 214 raised this time but won't block future calls 215 """ 216 self.error = None 217 raise e
218
219 - def get_error(self):
220 return self.error
221
222 - def _ewait(self, predicate, timeout=None):
223 result = self._wait(lambda: self.error or predicate(), timeout) 224 self.check_error() 225 return result
226
227 - def check_closed(self):
228 if not self._connected: 229 self._condition.gc() 230 raise ConnectionClosed()
231 232 @synchronized
233 - def session(self, name=None, transactional=False):
234 """ 235 Creates or retrieves the named session. If the name is omitted or 236 None, then a unique name is chosen based on a randomly generated 237 uuid. 238 239 @type name: str 240 @param name: the session name 241 @rtype: Session 242 @return: the named Session 243 """ 244 245 if name is None: 246 name = "%s:%s" % (self.id, self.session_counter) 247 self.session_counter += 1 248 else: 249 name = "%s:%s" % (self.id, name) 250 251 if self.sessions.has_key(name): 252 return self.sessions[name] 253 else: 254 ssn = Session(self, name, transactional) 255 self.sessions[name] = ssn 256 self._wakeup() 257 return ssn
258 259 @synchronized
260 - def _remove_session(self, ssn):
261 self.sessions.pop(ssn.name, 0)
262 263 @synchronized
264 - def open(self, timeout=None):
265 """ 266 Opens a connection. 267 """ 268 if self._open: 269 raise ConnectionError("already open") 270 self._open = True 271 if self.reconnect and self.reconnect_timeout > 0: 272 timeout = self.reconnect_timeout 273 self.attach(timeout=timeout)
274 275 @synchronized
276 - def opened(self):
277 """ 278 Return true if the connection is open, false otherwise. 279 """ 280 return self._open
281 282 @synchronized
283 - def attach(self, timeout=None):
284 """ 285 Attach to the remote endpoint. 286 """ 287 if not self._connected: 288 self._connected = True 289 self._driver.start() 290 self._wakeup() 291 if not self._ewait(lambda: self._transport_connected and not self._unlinked(), timeout=timeout): 292 self.reconnect = False 293 raise Timeout("Connection attach timed out")
294
295 - def _unlinked(self):
296 return [l 297 for ssn in self.sessions.values() 298 if not (ssn.error or ssn.closed) 299 for l in ssn.senders + ssn.receivers 300 if not (l.linked or l.error or l.closed)]
301 302 @synchronized
303 - def detach(self, timeout=None):
304 """ 305 Detach from the remote endpoint. 306 """ 307 if self._connected: 308 self._connected = False 309 self._wakeup() 310 cleanup = True 311 else: 312 cleanup = False 313 try: 314 if not self._wait(lambda: not self._transport_connected, timeout=timeout): 315 raise Timeout("detach timed out") 316 finally: 317 if cleanup: 318 self._driver.stop() 319 self._condition.gc()
320 321 @synchronized
322 - def attached(self):
323 """ 324 Return true if the connection is attached, false otherwise. 325 """ 326 return self._connected
327 328 @synchronized
329 - def close(self, timeout=None):
330 """ 331 Close the connection and all sessions. 332 """ 333 try: 334 for ssn in self.sessions.values(): 335 ssn.close(timeout=timeout) 336 finally: 337 self.detach(timeout=timeout) 338 self._open = False
339
340 -class Session(Endpoint):
341 342 """ 343 Sessions provide a linear context for sending and receiving 344 L{Messages<Message>}. L{Messages<Message>} are sent and received 345 using the L{Sender.send} and L{Receiver.fetch} methods of the 346 L{Sender} and L{Receiver} objects associated with a Session. 347 348 Each L{Sender} and L{Receiver} is created by supplying either a 349 target or source address to the L{sender} and L{receiver} methods of 350 the Session. The address is supplied via a string syntax documented 351 below. 352 353 Addresses 354 ========= 355 356 An address identifies a source or target for messages. In its 357 simplest form this is just a name. In general a target address may 358 also be used as a source address, however not all source addresses 359 may be used as a target, e.g. a source might additionally have some 360 filtering criteria that would not be present in a target. 361 362 A subject may optionally be specified along with the name. When an 363 address is used as a target, any subject specified in the address is 364 used as the default subject of outgoing messages for that target. 365 When an address is used as a source, any subject specified in the 366 address is pattern matched against the subject of available messages 367 as a filter for incoming messages from that source. 368 369 The options map contains additional information about the address 370 including: 371 372 - policies for automatically creating, and deleting the node to 373 which an address refers 374 375 - policies for asserting facts about the node to which an address 376 refers 377 378 - extension points that can be used for sender/receiver 379 configuration 380 381 Mapping to AMQP 0-10 382 -------------------- 383 The name is resolved to either an exchange or a queue by querying 384 the broker. 385 386 The subject is set as a property on the message. Additionally, if 387 the name refers to an exchange, the routing key is set to the 388 subject. 389 390 Syntax 391 ------ 392 The following regular expressions define the tokens used to parse 393 addresses:: 394 LBRACE: \\{ 395 RBRACE: \\} 396 LBRACK: \\[ 397 RBRACK: \\] 398 COLON: : 399 SEMI: ; 400 SLASH: / 401 COMMA: , 402 NUMBER: [+-]?[0-9]*\\.?[0-9]+ 403 ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? 404 STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' 405 ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] 406 SYM: [.#*%@$^!+-] 407 WSPACE: [ \\n\\r\\t]+ 408 409 The formal grammar for addresses is given below:: 410 address = name [ "/" subject ] [ ";" options ] 411 name = ( part | quoted )+ 412 subject = ( part | quoted | "/" )* 413 quoted = STRING / ESC 414 part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM 415 options = map 416 map = "{" ( keyval ( "," keyval )* )? "}" 417 keyval = ID ":" value 418 value = NUMBER / STRING / ID / map / list 419 list = "[" ( value ( "," value )* )? "]" 420 421 This grammar resuls in the following informal syntax:: 422 423 <name> [ / <subject> ] [ ; <options> ] 424 425 Where options is:: 426 427 { <key> : <value>, ... } 428 429 And values may be: 430 - numbers 431 - single, double, or non quoted strings 432 - maps (dictionaries) 433 - lists 434 435 Options 436 ------- 437 The options map permits the following parameters:: 438 439 <name> [ / <subject> ] ; { 440 create: always | sender | receiver | never, 441 delete: always | sender | receiver | never, 442 assert: always | sender | receiver | never, 443 mode: browse | consume, 444 node: { 445 type: queue | topic, 446 durable: True | False, 447 x-declare: { ... <declare-overrides> ... }, 448 x-bindings: [<binding_1>, ... <binding_n>] 449 }, 450 link: { 451 name: <link-name>, 452 durable: True | False, 453 reliability: unreliable | at-most-once | at-least-once | exactly-once, 454 x-declare: { ... <declare-overrides> ... }, 455 x-bindings: [<binding_1>, ... <binding_n>], 456 x-subscribe: { ... <subscribe-overrides> ... } 457 } 458 } 459 460 Bindings are specified as a map with the following options:: 461 462 { 463 exchange: <exchange>, 464 queue: <queue>, 465 key: <key>, 466 arguments: <arguments> 467 } 468 469 The create, delete, and assert policies specify who should perfom 470 the associated action: 471 472 - I{always}: the action will always be performed 473 - I{sender}: the action will only be performed by the sender 474 - I{receiver}: the action will only be performed by the receiver 475 - I{never}: the action will never be performed (this is the default) 476 477 The node-type is one of: 478 479 - I{topic}: a topic node will default to the topic exchange, 480 x-declare may be used to specify other exchange types 481 - I{queue}: this is the default node-type 482 483 The x-declare map permits protocol specific keys and values to be 484 specified when exchanges or queues are declared. These keys and 485 values are passed through when creating a node or asserting facts 486 about an existing node. 487 488 Examples 489 -------- 490 A simple name resolves to any named node, usually a queue or a 491 topic:: 492 493 my-queue-or-topic 494 495 A simple name with a subject will also resolve to a node, but the 496 presence of the subject will cause a sender using this address to 497 set the subject on outgoing messages, and receivers to filter based 498 on the subject:: 499 500 my-queue-or-topic/my-subject 501 502 A subject pattern can be used and will cause filtering if used by 503 the receiver. If used for a sender, the literal value gets set as 504 the subject:: 505 506 my-queue-or-topic/my-* 507 508 In all the above cases, the address is resolved to an existing node. 509 If you want the node to be auto-created, then you can do the 510 following. By default nonexistent nodes are assumed to be queues:: 511 512 my-queue; {create: always} 513 514 You can customize the properties of the queue:: 515 516 my-queue; {create: always, node: {durable: True}} 517 518 You can create a topic instead if you want:: 519 520 my-queue; {create: always, node: {type: topic}} 521 522 You can assert that the address resolves to a node with particular 523 properties:: 524 525 my-transient-topic; { 526 assert: always, 527 node: { 528 type: topic, 529 durable: False 530 } 531 } 532 """ 533
534 - def __init__(self, connection, name, transactional):
535 self.connection = connection 536 self.name = name 537 self.log_id = "%x" % id(self) 538 539 self.transactional = transactional 540 541 self.committing = False 542 self.committed = True 543 self.aborting = False 544 self.aborted = False 545 546 self.next_sender_id = 0 547 self.senders = [] 548 self.next_receiver_id = 0 549 self.receivers = [] 550 self.outgoing = [] 551 self.incoming = [] 552 self.unacked = [] 553 self.acked = [] 554 # XXX: I hate this name. 555 self.ack_capacity = UNLIMITED 556 557 self.error = None 558 self.closing = False 559 self.closed = False 560 561 self._lock = connection._lock
562
563 - def __repr__(self):
564 return "<Session %s>" % self.name
565
566 - def _wait(self, predicate, timeout=None):
567 return self.connection._wait(predicate, timeout=timeout)
568
569 - def _wakeup(self):
570 self.connection._wakeup()
571
572 - def check_error(self):
573 self.connection.check_error() 574 if self.error: 575 raise self.error
576
577 - def get_error(self):
578 err = self.connection.get_error() 579 if err: 580 return err 581 else: 582 return self.error
583
584 - def _ewait(self, predicate, timeout=None):
585 result = self.connection._ewait(lambda: self.error or predicate(), timeout) 586 self.check_error() 587 return result
588
589 - def check_closed(self):
590 if self.closed: 591 raise SessionClosed()
592 593 @synchronized
594 - def sender(self, target, **options):
595 """ 596 Creates a L{Sender} that may be used to send L{Messages<Message>} 597 to the specified target. 598 599 @type target: str 600 @param target: the target to which messages will be sent 601 @rtype: Sender 602 @return: a new Sender for the specified target 603 """ 604 target = _mangle(target) 605 sender = Sender(self, self.next_sender_id, target, options) 606 self.next_sender_id += 1 607 self.senders.append(sender) 608 if not self.closed and self.connection._connected: 609 self._wakeup() 610 try: 611 sender._ewait(lambda: sender.linked) 612 except LinkError, e: 613 sender.close() 614 raise e 615 return sender
616 617 @synchronized
618 - def receiver(self, source, **options):
619 """ 620 Creates a receiver that may be used to fetch L{Messages<Message>} 621 from the specified source. 622 623 @type source: str 624 @param source: the source of L{Messages<Message>} 625 @rtype: Receiver 626 @return: a new Receiver for the specified source 627 """ 628 source = _mangle(source) 629 receiver = Receiver(self, self.next_receiver_id, source, options) 630 self.next_receiver_id += 1 631 self.receivers.append(receiver) 632 if not self.closed and self.connection._connected: 633 self._wakeup() 634 try: 635 receiver._ewait(lambda: receiver.linked) 636 except LinkError, e: 637 receiver.close() 638 raise e 639 return receiver
640 641 @synchronized
642 - def _count(self, predicate):
643 result = 0 644 for msg in self.incoming: 645 if predicate(msg): 646 result += 1 647 return result
648
649 - def _peek(self, receiver):
650 for msg in self.incoming: 651 if msg._receiver == receiver: 652 return msg
653
654 - def _pop(self, receiver):
655 i = 0 656 while i < len(self.incoming): 657 msg = self.incoming[i] 658 if msg._receiver == receiver: 659 del self.incoming[i] 660 return msg 661 else: 662 i += 1
663 664 @synchronized
665 - def _get(self, receiver, timeout=None):
666 if self._ewait(lambda: ((self._peek(receiver) is not None) or 667 self.closing or receiver.closed), 668 timeout): 669 msg = self._pop(receiver) 670 if msg is not None: 671 msg._receiver.returned += 1 672 self.unacked.append(msg) 673 log.debug("RETR[%s]: %s", self.log_id, msg) 674 return msg 675 return None
676 677 @synchronized
678 - def next_receiver(self, timeout=None):
679 if self._ecwait(lambda: self.incoming, timeout): 680 return self.incoming[0]._receiver 681 else: 682 raise Empty
683 684 @synchronized
685 - def acknowledge(self, message=None, disposition=None, sync=True):
686 """ 687 Acknowledge the given L{Message}. If message is None, then all 688 unacknowledged messages on the session are acknowledged. 689 690 @type message: Message 691 @param message: the message to acknowledge or None 692 @type sync: boolean 693 @param sync: if true then block until the message(s) are acknowledged 694 """ 695 if message is None: 696 messages = self.unacked[:] 697 else: 698 messages = [message] 699 700 for m in messages: 701 if self.ack_capacity is not UNLIMITED: 702 if self.ack_capacity <= 0: 703 # XXX: this is currently a SendError, maybe it should be a SessionError? 704 raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) 705 self._wakeup() 706 self._ecwait(lambda: len(self.acked) < self.ack_capacity) 707 m._disposition = disposition 708 self.unacked.remove(m) 709 self.acked.append(m) 710 711 self._wakeup() 712 if sync: 713 self._ecwait(lambda: not [m for m in messages if m in self.acked])
714 715 @synchronized
716 - def commit(self):
717 """ 718 Commit outstanding transactional work. This consists of all 719 message sends and receives since the prior commit or rollback. 720 """ 721 if not self.transactional: 722 raise NontransactionalSession() 723 self.committing = True 724 self._wakeup() 725 self._ecwait(lambda: not self.committing) 726 if self.aborted: 727 raise TransactionAborted() 728 assert self.committed
729 730 @synchronized
731 - def rollback(self):
732 """ 733 Rollback outstanding transactional work. This consists of all 734 message sends and receives since the prior commit or rollback. 735 """ 736 if not self.transactional: 737 raise NontransactionalSession() 738 self.aborting = True 739 self._wakeup() 740 self._ecwait(lambda: not self.aborting) 741 assert self.aborted
742 743 @synchronized
744 - def sync(self, timeout=None):
745 """ 746 Sync the session. 747 """ 748 for snd in self.senders: 749 snd.sync(timeout=timeout) 750 if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout): 751 raise Timeout("session sync timed out")
752 753 @synchronized
754 - def close(self, timeout=None):
755 """ 756 Close the session. 757 """ 758 self.sync(timeout=timeout) 759 760 for link in self.receivers + self.senders: 761 link.close(timeout=timeout) 762 763 if not self.closing: 764 self.closing = True 765 self._wakeup() 766 767 try: 768 if not self._ewait(lambda: self.closed, timeout=timeout): 769 raise Timeout("session close timed out") 770 finally: 771 self.connection._remove_session(self)
772
773 -def _mangle(addr):
774 if addr and addr.startswith("#"): 775 return str(uuid4()) + addr 776 else: 777 return addr
778
779 -class Sender(Endpoint):
780 781 """ 782 Sends outgoing messages. 783 """ 784
785 - def __init__(self, session, id, target, options):
786 self.session = session 787 self.id = id 788 self.target = target 789 self.options = options 790 self.capacity = options.get("capacity", UNLIMITED) 791 self.threshold = 0.5 792 self.durable = options.get("durable") 793 self.queued = Serial(0) 794 self.synced = Serial(0) 795 self.acked = Serial(0) 796 self.error = None 797 self.linked = False 798 self.closing = False 799 self.closed = False 800 self._lock = self.session._lock
801
802 - def _wakeup(self):
803 self.session._wakeup()
804
805 - def check_error(self):
806 self.session.check_error() 807 if self.error: 808 raise self.error
809
810 - def get_error(self):
811 err = self.session.get_error() 812 if err: 813 return err 814 else: 815 return self.error
816
817 - def _ewait(self, predicate, timeout=None):
818 result = self.session._ewait(lambda: self.error or predicate(), timeout) 819 self.check_error() 820 return result
821
822 - def check_closed(self):
823 if self.closed: 824 raise LinkClosed()
825 826 @synchronized
827 - def unsettled(self):
828 """ 829 Returns the number of messages awaiting acknowledgment. 830 @rtype: int 831 @return: the number of unacknowledged messages 832 """ 833 return self.queued - self.acked
834 835 @synchronized
836 - def available(self):
837 if self.capacity is UNLIMITED: 838 return UNLIMITED 839 else: 840 return self.capacity - self.unsettled()
841 842 @synchronized
843 - def send(self, object, sync=True, timeout=None):
844 """ 845 Send a message. If the object passed in is of type L{unicode}, 846 L{str}, L{list}, or L{dict}, it will automatically be wrapped in a 847 L{Message} and sent. If it is of type L{Message}, it will be sent 848 directly. If the sender capacity is not L{UNLIMITED} then send 849 will block until there is available capacity to send the message. 850 If the timeout parameter is specified, then send will throw an 851 L{InsufficientCapacity} exception if capacity does not become 852 available within the specified time. 853 854 @type object: unicode, str, list, dict, Message 855 @param object: the message or content to send 856 857 @type sync: boolean 858 @param sync: if true then block until the message is sent 859 860 @type timeout: float 861 @param timeout: the time to wait for available capacity 862 """ 863 864 if not self.session.connection._connected or self.session.closing: 865 raise Detached() 866 867 self._ecwait(lambda: self.linked, timeout=timeout) 868 869 if isinstance(object, Message): 870 message = object 871 else: 872 message = Message(object) 873 874 if message.durable is None: 875 message.durable = self.durable 876 877 if self.capacity is not UNLIMITED: 878 if self.capacity <= 0: 879 raise InsufficientCapacity("capacity = %s" % self.capacity) 880 if not self._ecwait(self.available, timeout=timeout): 881 raise InsufficientCapacity("capacity = %s" % self.capacity) 882 883 # XXX: what if we send the same message to multiple senders? 884 message._sender = self 885 if self.capacity is not UNLIMITED: 886 message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity)) 887 else: 888 message._sync = sync 889 self.session.outgoing.append(message) 890 self.queued += 1 891 892 if sync: 893 self.sync(timeout=timeout) 894 assert message not in self.session.outgoing 895 else: 896 self._wakeup()
897 898 @synchronized
899 - def sync(self, timeout=None):
900 mno = self.queued 901 if self.synced < mno: 902 self.synced = mno 903 self._wakeup() 904 try: 905 if not self._ewait(lambda: self.acked >= mno, timeout=timeout): 906 raise Timeout("sender sync timed out") 907 except ContentError: 908 # clean bad message so we can continue 909 self.acked = mno 910 self.session.outgoing.pop(0) 911 raise
912 913 @synchronized
914 - def close(self, timeout=None):
915 """ 916 Close the Sender. 917 """ 918 # avoid erroring out when closing a sender that was never 919 # established 920 if self.acked < self.queued: 921 self.sync(timeout=timeout) 922 923 if not self.closing: 924 self.closing = True 925 self._wakeup() 926 927 try: 928 if not self.session._ewait(lambda: self.closed, timeout=timeout): 929 raise Timeout("sender close timed out") 930 finally: 931 try: 932 self.session.senders.remove(self) 933 except ValueError: 934 pass
935
936 -class Receiver(Endpoint, object):
937 938 """ 939 Receives incoming messages from a remote source. Messages may be 940 fetched with L{fetch}. 941 """ 942
943 - def __init__(self, session, id, source, options):
944 self.session = session 945 self.id = id 946 self.source = source 947 self.options = options 948 949 self.granted = Serial(0) 950 self.draining = False 951 self.impending = Serial(0) 952 self.received = Serial(0) 953 self.returned = Serial(0) 954 955 self.error = None 956 self.linked = False 957 self.closing = False 958 self.closed = False 959 self._lock = self.session._lock 960 self._capacity = 0 961 self._set_capacity(options.get("capacity", 0), False) 962 self.threshold = 0.5
963 964 @synchronized
965 - def _set_capacity(self, c, wakeup=True):
966 if c is UNLIMITED: 967 self._capacity = c.value 968 else: 969 self._capacity = c 970 self._grant() 971 if wakeup: 972 self._wakeup()
973
974 - def _get_capacity(self):
975 if self._capacity == UNLIMITED.value: 976 return UNLIMITED 977 else: 978 return self._capacity
979 980 capacity = property(_get_capacity, _set_capacity) 981
982 - def _wakeup(self):
983 self.session._wakeup()
984
985 - def check_error(self):
986 self.session.check_error() 987 if self.error: 988 raise self.error
989
990 - def get_error(self):
991 err = self.session.get_error() 992 if err: 993 return err 994 else: 995 return self.error
996
997 - def _ewait(self, predicate, timeout=None):
998 result = self.session._ewait(lambda: self.error or predicate(), timeout) 999 self.check_error() 1000 return result
1001
1002 - def check_closed(self):
1003 if self.closed: 1004 raise LinkClosed()
1005 1006 @synchronized
1007 - def unsettled(self):
1008 """ 1009 Returns the number of acknowledged messages awaiting confirmation. 1010 """ 1011 return len([m for m in self.session.acked if m._receiver is self])
1012 1013 @synchronized
1014 - def available(self):
1015 """ 1016 Returns the number of messages available to be fetched by the 1017 application. 1018 1019 @rtype: int 1020 @return: the number of available messages 1021 """ 1022 return self.received - self.returned
1023 1024 @synchronized
1025 - def fetch(self, timeout=None):
1026 """ 1027 Fetch and return a single message. A timeout of None will block 1028 forever waiting for a message to arrive, a timeout of zero will 1029 return immediately if no messages are available. 1030 1031 @type timeout: float 1032 @param timeout: the time to wait for a message to be available 1033 """ 1034 1035 self._ecwait(lambda: self.linked) 1036 1037 if self._capacity == 0: 1038 self.granted = self.returned + 1 1039 self._wakeup() 1040 self._ecwait(lambda: self.impending >= self.granted) 1041 msg = self.session._get(self, timeout=timeout) 1042 if msg is None: 1043 self.check_closed() 1044 self.draining = True 1045 self._wakeup() 1046 self._ecwait(lambda: not self.draining) 1047 msg = self.session._get(self, timeout=0) 1048 self._grant() 1049 self._wakeup() 1050 if msg is None: 1051 raise Empty() 1052 elif self._capacity not in (0, UNLIMITED.value): 1053 t = int(ceil(self.threshold * self._capacity)) 1054 if self.received - self.returned <= t: 1055 self.granted = self.returned + self._capacity 1056 self._wakeup() 1057 return msg
1058
1059 - def _grant(self):
1060 if self._capacity == UNLIMITED.value: 1061 self.granted = UNLIMITED 1062 else: 1063 self.granted = self.returned + self._capacity
1064 1065 @synchronized
1066 - def close(self, timeout=None):
1067 """ 1068 Close the receiver. 1069 """ 1070 if not self.closing: 1071 self.closing = True 1072 self._wakeup() 1073 1074 try: 1075 if not self.session._ewait(lambda: self.closed, timeout=timeout): 1076 raise Timeout("receiver close timed out") 1077 finally: 1078 try: 1079 self.session.receivers.remove(self) 1080 except ValueError: 1081 pass
1082 1083 __all__ = ["Connection", "Session", "Sender", "Receiver"] 1084