Package qpid :: Package messaging :: Module endpoints
[frames] | no frames]

Source Code for Module qpid.messaging.endpoints

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  A candidate high level messaging API for python. 
  22   
  23  Areas that still need work: 
  24   
  25    - definition of the arguments for L{Session.sender} and L{Session.receiver} 
  26    - standard L{Message} properties 
  27    - L{Message} content encoding 
  28    - protocol negotiation/multiprotocol impl 
  29  """ 
  30   
  31  from logging import getLogger 
  32  from math import ceil 
  33  from qpid.codec010 import StringCodec 
  34  from qpid.concurrency import synchronized, Waiter, Condition 
  35  from qpid.datatypes import Serial, uuid4 
  36  from qpid.messaging.constants import * 
  37  from qpid.messaging.exceptions import * 
  38  from qpid.messaging.message import * 
  39  from qpid.ops import PRIMITIVE 
  40  from qpid.util import default, URL 
  41  from threading import Thread, RLock 
  42   
  43  log = getLogger("qpid.messaging") 
  44   
  45  static = staticmethod 
46 47 -class Endpoint:
48
49 - def _ecwait(self, predicate, timeout=None):
50 result = self._ewait(lambda: self.closed or predicate(), timeout) 51 self.check_closed() 52 return result
53
54 -class Connection(Endpoint):
55 56 """ 57 A Connection manages a group of L{Sessions<Session>} and connects 58 them with a remote endpoint. 59 """ 60 61 @static
62 - def establish(url=None, timeout=None, **options):
63 """ 64 Constructs a L{Connection} with the supplied parameters and opens 65 it. 66 """ 67 conn = Connection(url, **options) 68 conn.open(timeout=timeout) 69 return conn
70
71 - def __init__(self, url=None, **options):
72 """ 73 Creates a connection. A newly created connection must be opened 74 with the Connection.open() method before it can be used. 75 76 @type url: str 77 @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] 78 @type host: str 79 @param host: the name or ip address of the remote host (overriden by url) 80 @type port: int 81 @param port: the port number of the remote host (overriden by url) 82 @type transport: str 83 @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) 84 @type heartbeat: int 85 @param heartbeat: heartbeat interval in seconds 86 87 @type username: str 88 @param username: the username for authentication (overriden by url) 89 @type password: str 90 @param password: the password for authentication (overriden by url) 91 @type sasl_mechanisms: str 92 @param sasl_mechanisms: space separated list of permitted sasl mechanisms 93 @type sasl_service: str 94 @param sasl_service: the service name if needed by the SASL mechanism in use 95 @type sasl_min_ssf: int 96 @param sasl_min_ssf: the minimum acceptable security strength factor 97 @type sasl_max_ssf: int 98 @param sasl_max_ssf: the maximum acceptable security strength factor 99 100 @type reconnect: bool 101 @param reconnect: enable/disable automatic reconnect 102 @type reconnect_timeout: float 103 @param reconnect_timeout: total time to attempt reconnect 104 @type reconnect_interval_min: float 105 @param reconnect_interval_min: minimum interval between reconnect attempts 106 @type reconnect_interval_max: float 107 @param reconnect_interval_max: maximum interval between reconnect attempts 108 @type reconnect_interval: float 109 @param reconnect_interval: set both min and max reconnect intervals 110 @type reconnect_limit: int 111 @param reconnect_limit: limit the total number of reconnect attempts 112 @type reconnect_urls: list[str] 113 @param reconnect_urls: list of backup hosts specified as urls 114 115 @type address_ttl: float 116 @param address_ttl: time until cached address resolution expires 117 118 @type ssl_keyfile: str 119 @param ssl_keyfile: file with client's private key (PEM format) 120 @type ssl_certfile: str 121 @param ssl_certfile: file with client's public (eventually priv+pub) key (PEM format) 122 @type ssl_trustfile: str 123 @param ssl_trustfile: file trusted certificates to validate the server 124 @type ssl_skip_hostname_check: bool 125 @param ssl_skip_hostname_check: disable verification of hostname in 126 certificate. Use with caution - disabling hostname checking leaves you 127 vulnerable to Man-in-the-Middle attacks. 128 129 @rtype: Connection 130 @return: a disconnected Connection 131 """ 132 if url is None: 133 url = options.get("host") 134 if isinstance(url, basestring): 135 url = URL(url) 136 self.host = url.host 137 if options.has_key("transport"): 138 self.transport = options.get("transport") 139 elif url.scheme == url.AMQP: 140 self.transport = "tcp" 141 elif url.scheme == url.AMQPS: 142 self.transport = "ssl" 143 else: 144 self.transport = "tcp" 145 if self.transport in ("ssl", "tcp+tls"): 146 self.port = default(url.port, options.get("port", AMQPS_PORT)) 147 else: 148 self.port = default(url.port, options.get("port", AMQP_PORT)) 149 self.heartbeat = options.get("heartbeat") 150 self.username = default(url.user, options.get("username", None)) 151 self.password = default(url.password, options.get("password", None)) 152 self.auth_username = None 153 154 self.sasl_mechanisms = options.get("sasl_mechanisms") 155 self.sasl_service = options.get("sasl_service", "qpidd") 156 self.sasl_min_ssf = options.get("sasl_min_ssf") 157 self.sasl_max_ssf = options.get("sasl_max_ssf") 158 159 self.reconnect = options.get("reconnect", False) 160 self.reconnect_timeout = options.get("reconnect_timeout") 161 reconnect_interval = options.get("reconnect_interval") 162 self.reconnect_interval_min = options.get("reconnect_interval_min", 163 default(reconnect_interval, 1)) 164 self.reconnect_interval_max = options.get("reconnect_interval_max", 165 default(reconnect_interval, 2*60)) 166 self.reconnect_limit = options.get("reconnect_limit") 167 self.reconnect_urls = options.get("reconnect_urls", []) 168 self.reconnect_log = options.get("reconnect_log", True) 169 170 self.address_ttl = options.get("address_ttl", 60) 171 self.tcp_nodelay = options.get("tcp_nodelay", False) 172 173 self.ssl_keyfile = options.get("ssl_keyfile", None) 174 self.ssl_certfile = options.get("ssl_certfile", None) 175 self.ssl_trustfile = options.get("ssl_trustfile", None) 176 self.ssl_skip_hostname_check = options.get("ssl_skip_hostname_check", False) 177 self.client_properties = options.get("client_properties", {}) 178 179 self.options = options 180 181 182 self.id = str(uuid4()) 183 self.session_counter = 0 184 self.sessions = {} 185 self._open = False 186 self._connected = False 187 self._transport_connected = False 188 self._lock = RLock() 189 self._condition = Condition(self._lock) 190 self._waiter = Waiter(self._condition) 191 self._modcount = Serial(0) 192 self.error = None 193 from driver import Driver 194 self._driver = Driver(self)
195
196 - def _wait(self, predicate, timeout=None):
197 return self._waiter.wait(predicate, timeout=timeout)
198
199 - def _wakeup(self):
200 self._modcount += 1 201 self._driver.wakeup()
202
203 - def check_error(self):
204 if self.error: 205 self._condition.gc() 206 e = self.error 207 if isinstance(e, ContentError): 208 """ forget the content error. It will be 209 raised this time but won't block future calls 210 """ 211 self.error = None 212 raise e
213
214 - def get_error(self):
215 return self.error
216
217 - def _ewait(self, predicate, timeout=None):
218 result = self._wait(lambda: self.error or predicate(), timeout) 219 self.check_error() 220 return result
221
222 - def check_closed(self):
223 if not self._connected: 224 self._condition.gc() 225 raise ConnectionClosed()
226 227 @synchronized
228 - def session(self, name=None, transactional=False):
229 """ 230 Creates or retrieves the named session. If the name is omitted or 231 None, then a unique name is chosen based on a randomly generated 232 uuid. 233 234 @type name: str 235 @param name: the session name 236 @rtype: Session 237 @return: the named Session 238 """ 239 240 if name is None: 241 name = "%s:%s" % (self.id, self.session_counter) 242 self.session_counter += 1 243 else: 244 name = "%s:%s" % (self.id, name) 245 246 if self.sessions.has_key(name): 247 return self.sessions[name] 248 else: 249 ssn = Session(self, name, transactional) 250 self.sessions[name] = ssn 251 self._wakeup() 252 return ssn
253 254 @synchronized
255 - def _remove_session(self, ssn):
256 self.sessions.pop(ssn.name, 0)
257 258 @synchronized
259 - def open(self, timeout=None):
260 """ 261 Opens a connection. 262 """ 263 if self._open: 264 raise ConnectionError("already open") 265 self._open = True 266 if self.reconnect and self.reconnect_timeout > 0: 267 timeout = self.reconnect_timeout 268 self.attach(timeout=timeout)
269 270 @synchronized
271 - def opened(self):
272 """ 273 Return true if the connection is open, false otherwise. 274 """ 275 return self._open
276 277 @synchronized
278 - def attach(self, timeout=None):
279 """ 280 Attach to the remote endpoint. 281 """ 282 if not self._connected: 283 self._connected = True 284 self._driver.start() 285 self._wakeup() 286 if not self._ewait(lambda: self._transport_connected and not self._unlinked(), timeout=timeout): 287 self.reconnect = False 288 raise Timeout("Connection attach timed out")
289
290 - def _unlinked(self):
291 return [l 292 for ssn in self.sessions.values() 293 if not (ssn.error or ssn.closed) 294 for l in ssn.senders + ssn.receivers 295 if not (l.linked or l.error or l.closed)]
296 297 @synchronized
298 - def detach(self, timeout=None):
299 """ 300 Detach from the remote endpoint. 301 """ 302 if self._connected: 303 self._connected = False 304 self._wakeup() 305 cleanup = True 306 else: 307 cleanup = False 308 try: 309 if not self._wait(lambda: not self._transport_connected, timeout=timeout): 310 raise Timeout("detach timed out") 311 finally: 312 if cleanup: 313 self._driver.stop() 314 self._condition.gc()
315 316 @synchronized
317 - def attached(self):
318 """ 319 Return true if the connection is attached, false otherwise. 320 """ 321 return self._connected
322 323 @synchronized
324 - def close(self, timeout=None):
325 """ 326 Close the connection and all sessions. 327 """ 328 try: 329 for ssn in self.sessions.values(): 330 ssn.close(timeout=timeout) 331 finally: 332 self.detach(timeout=timeout) 333 self._open = False
334
335 -class Session(Endpoint):
336 337 """ 338 Sessions provide a linear context for sending and receiving 339 L{Messages<Message>}. L{Messages<Message>} are sent and received 340 using the L{Sender.send} and L{Receiver.fetch} methods of the 341 L{Sender} and L{Receiver} objects associated with a Session. 342 343 Each L{Sender} and L{Receiver} is created by supplying either a 344 target or source address to the L{sender} and L{receiver} methods of 345 the Session. The address is supplied via a string syntax documented 346 below. 347 348 Addresses 349 ========= 350 351 An address identifies a source or target for messages. In its 352 simplest form this is just a name. In general a target address may 353 also be used as a source address, however not all source addresses 354 may be used as a target, e.g. a source might additionally have some 355 filtering criteria that would not be present in a target. 356 357 A subject may optionally be specified along with the name. When an 358 address is used as a target, any subject specified in the address is 359 used as the default subject of outgoing messages for that target. 360 When an address is used as a source, any subject specified in the 361 address is pattern matched against the subject of available messages 362 as a filter for incoming messages from that source. 363 364 The options map contains additional information about the address 365 including: 366 367 - policies for automatically creating, and deleting the node to 368 which an address refers 369 370 - policies for asserting facts about the node to which an address 371 refers 372 373 - extension points that can be used for sender/receiver 374 configuration 375 376 Mapping to AMQP 0-10 377 -------------------- 378 The name is resolved to either an exchange or a queue by querying 379 the broker. 380 381 The subject is set as a property on the message. Additionally, if 382 the name refers to an exchange, the routing key is set to the 383 subject. 384 385 Syntax 386 ------ 387 The following regular expressions define the tokens used to parse 388 addresses:: 389 LBRACE: \\{ 390 RBRACE: \\} 391 LBRACK: \\[ 392 RBRACK: \\] 393 COLON: : 394 SEMI: ; 395 SLASH: / 396 COMMA: , 397 NUMBER: [+-]?[0-9]*\\.?[0-9]+ 398 ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? 399 STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' 400 ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] 401 SYM: [.#*%@$^!+-] 402 WSPACE: [ \\n\\r\\t]+ 403 404 The formal grammar for addresses is given below:: 405 address = name [ "/" subject ] [ ";" options ] 406 name = ( part | quoted )+ 407 subject = ( part | quoted | "/" )* 408 quoted = STRING / ESC 409 part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM 410 options = map 411 map = "{" ( keyval ( "," keyval )* )? "}" 412 keyval = ID ":" value 413 value = NUMBER / STRING / ID / map / list 414 list = "[" ( value ( "," value )* )? "]" 415 416 This grammar resuls in the following informal syntax:: 417 418 <name> [ / <subject> ] [ ; <options> ] 419 420 Where options is:: 421 422 { <key> : <value>, ... } 423 424 And values may be: 425 - numbers 426 - single, double, or non quoted strings 427 - maps (dictionaries) 428 - lists 429 430 Options 431 ------- 432 The options map permits the following parameters:: 433 434 <name> [ / <subject> ] ; { 435 create: always | sender | receiver | never, 436 delete: always | sender | receiver | never, 437 assert: always | sender | receiver | never, 438 mode: browse | consume, 439 node: { 440 type: queue | topic, 441 durable: True | False, 442 x-declare: { ... <declare-overrides> ... }, 443 x-bindings: [<binding_1>, ... <binding_n>] 444 }, 445 link: { 446 name: <link-name>, 447 durable: True | False, 448 reliability: unreliable | at-most-once | at-least-once | exactly-once, 449 x-declare: { ... <declare-overrides> ... }, 450 x-bindings: [<binding_1>, ... <binding_n>], 451 x-subscribe: { ... <subscribe-overrides> ... } 452 } 453 } 454 455 Bindings are specified as a map with the following options:: 456 457 { 458 exchange: <exchange>, 459 queue: <queue>, 460 key: <key>, 461 arguments: <arguments> 462 } 463 464 The create, delete, and assert policies specify who should perfom 465 the associated action: 466 467 - I{always}: the action will always be performed 468 - I{sender}: the action will only be performed by the sender 469 - I{receiver}: the action will only be performed by the receiver 470 - I{never}: the action will never be performed (this is the default) 471 472 The node-type is one of: 473 474 - I{topic}: a topic node will default to the topic exchange, 475 x-declare may be used to specify other exchange types 476 - I{queue}: this is the default node-type 477 478 The x-declare map permits protocol specific keys and values to be 479 specified when exchanges or queues are declared. These keys and 480 values are passed through when creating a node or asserting facts 481 about an existing node. 482 483 Examples 484 -------- 485 A simple name resolves to any named node, usually a queue or a 486 topic:: 487 488 my-queue-or-topic 489 490 A simple name with a subject will also resolve to a node, but the 491 presence of the subject will cause a sender using this address to 492 set the subject on outgoing messages, and receivers to filter based 493 on the subject:: 494 495 my-queue-or-topic/my-subject 496 497 A subject pattern can be used and will cause filtering if used by 498 the receiver. If used for a sender, the literal value gets set as 499 the subject:: 500 501 my-queue-or-topic/my-* 502 503 In all the above cases, the address is resolved to an existing node. 504 If you want the node to be auto-created, then you can do the 505 following. By default nonexistent nodes are assumed to be queues:: 506 507 my-queue; {create: always} 508 509 You can customize the properties of the queue:: 510 511 my-queue; {create: always, node: {durable: True}} 512 513 You can create a topic instead if you want:: 514 515 my-queue; {create: always, node: {type: topic}} 516 517 You can assert that the address resolves to a node with particular 518 properties:: 519 520 my-transient-topic; { 521 assert: always, 522 node: { 523 type: topic, 524 durable: False 525 } 526 } 527 """ 528
529 - def __init__(self, connection, name, transactional):
530 self.connection = connection 531 self.name = name 532 self.log_id = "%x" % id(self) 533 534 self.transactional = transactional 535 536 self.committing = False 537 self.committed = True 538 self.aborting = False 539 self.aborted = False 540 541 self.next_sender_id = 0 542 self.senders = [] 543 self.next_receiver_id = 0 544 self.receivers = [] 545 self.outgoing = [] 546 self.incoming = [] 547 self.unacked = [] 548 self.acked = [] 549 # XXX: I hate this name. 550 self.ack_capacity = UNLIMITED 551 552 self.error = None 553 self.closing = False 554 self.closed = False 555 556 self._lock = connection._lock
557
558 - def __repr__(self):
559 return "<Session %s>" % self.name
560
561 - def _wait(self, predicate, timeout=None):
562 return self.connection._wait(predicate, timeout=timeout)
563
564 - def _wakeup(self):
565 self.connection._wakeup()
566
567 - def check_error(self):
568 self.connection.check_error() 569 if self.error: 570 raise self.error
571
572 - def get_error(self):
573 err = self.connection.get_error() 574 if err: 575 return err 576 else: 577 return self.error
578
579 - def _ewait(self, predicate, timeout=None):
580 result = self.connection._ewait(lambda: self.error or predicate(), timeout) 581 self.check_error() 582 return result
583
584 - def check_closed(self):
585 if self.closed: 586 raise SessionClosed()
587 588 @synchronized
589 - def sender(self, target, **options):
590 """ 591 Creates a L{Sender} that may be used to send L{Messages<Message>} 592 to the specified target. 593 594 @type target: str 595 @param target: the target to which messages will be sent 596 @rtype: Sender 597 @return: a new Sender for the specified target 598 """ 599 target = _mangle(target) 600 sender = Sender(self, self.next_sender_id, target, options) 601 self.next_sender_id += 1 602 self.senders.append(sender) 603 if not self.closed and self.connection._connected: 604 self._wakeup() 605 try: 606 sender._ewait(lambda: sender.linked) 607 except LinkError, e: 608 sender.close() 609 raise e 610 return sender
611 612 @synchronized
613 - def receiver(self, source, **options):
614 """ 615 Creates a receiver that may be used to fetch L{Messages<Message>} 616 from the specified source. 617 618 @type source: str 619 @param source: the source of L{Messages<Message>} 620 @rtype: Receiver 621 @return: a new Receiver for the specified source 622 """ 623 source = _mangle(source) 624 receiver = Receiver(self, self.next_receiver_id, source, options) 625 self.next_receiver_id += 1 626 self.receivers.append(receiver) 627 if not self.closed and self.connection._connected: 628 self._wakeup() 629 try: 630 receiver._ewait(lambda: receiver.linked) 631 except LinkError, e: 632 receiver.close() 633 raise e 634 return receiver
635 636 @synchronized
637 - def _count(self, predicate):
638 result = 0 639 for msg in self.incoming: 640 if predicate(msg): 641 result += 1 642 return result
643
644 - def _peek(self, receiver):
645 for msg in self.incoming: 646 if msg._receiver == receiver: 647 return msg
648
649 - def _pop(self, receiver):
650 i = 0 651 while i < len(self.incoming): 652 msg = self.incoming[i] 653 if msg._receiver == receiver: 654 del self.incoming[i] 655 return msg 656 else: 657 i += 1
658 659 @synchronized
660 - def _get(self, receiver, timeout=None):
661 if self._ewait(lambda: ((self._peek(receiver) is not None) or 662 self.closing or receiver.closed), 663 timeout): 664 msg = self._pop(receiver) 665 if msg is not None: 666 msg._receiver.returned += 1 667 self.unacked.append(msg) 668 log.debug("RETR[%s]: %s", self.log_id, msg) 669 return msg 670 return None
671 672 @synchronized
673 - def next_receiver(self, timeout=None):
674 if self._ecwait(lambda: self.incoming, timeout): 675 return self.incoming[0]._receiver 676 else: 677 raise Empty
678 679 @synchronized
680 - def acknowledge(self, message=None, disposition=None, sync=True):
681 """ 682 Acknowledge the given L{Message}. If message is None, then all 683 unacknowledged messages on the session are acknowledged. 684 685 @type message: Message 686 @param message: the message to acknowledge or None 687 @type sync: boolean 688 @param sync: if true then block until the message(s) are acknowledged 689 """ 690 if message is None: 691 messages = self.unacked[:] 692 else: 693 messages = [message] 694 695 for m in messages: 696 if self.ack_capacity is not UNLIMITED: 697 if self.ack_capacity <= 0: 698 # XXX: this is currently a SendError, maybe it should be a SessionError? 699 raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) 700 self._wakeup() 701 self._ecwait(lambda: len(self.acked) < self.ack_capacity) 702 m._disposition = disposition 703 self.unacked.remove(m) 704 self.acked.append(m) 705 706 self._wakeup() 707 if sync: 708 self._ecwait(lambda: not [m for m in messages if m in self.acked])
709 710 @synchronized
711 - def commit(self, timeout=None):
712 """ 713 Commit outstanding transactional work. This consists of all 714 message sends and receives since the prior commit or rollback. 715 """ 716 if not self.transactional: 717 raise NontransactionalSession() 718 self.committing = True 719 self._wakeup() 720 self._ecwait(lambda: not self.committing, timeout=timeout) 721 if self.aborted: 722 raise TransactionAborted() 723 assert self.committed
724 725 @synchronized
726 - def rollback(self):
727 """ 728 Rollback outstanding transactional work. This consists of all 729 message sends and receives since the prior commit or rollback. 730 """ 731 if not self.transactional: 732 raise NontransactionalSession() 733 self.aborting = True 734 self._wakeup() 735 self._ecwait(lambda: not self.aborting) 736 assert self.aborted
737 738 @synchronized
739 - def sync(self, timeout=None):
740 """ 741 Sync the session. 742 """ 743 for snd in self.senders: 744 snd.sync(timeout=timeout) 745 if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout): 746 raise Timeout("session sync timed out")
747 748 @synchronized
749 - def close(self, timeout=None):
750 """ 751 Close the session. 752 """ 753 if self.error: return 754 self.sync(timeout=timeout) 755 756 for link in self.receivers + self.senders: 757 link.close(timeout=timeout) 758 759 if not self.closing: 760 self.closing = True 761 self._wakeup() 762 763 try: 764 if not self._ewait(lambda: self.closed, timeout=timeout): 765 raise Timeout("session close timed out") 766 finally: 767 self.connection._remove_session(self)
768
769 -def _mangle(addr):
770 if addr and addr.startswith("#"): 771 return str(uuid4()) + addr 772 else: 773 return addr
774
775 -class Sender(Endpoint):
776 777 """ 778 Sends outgoing messages. 779 """ 780
781 - def __init__(self, session, id, target, options):
782 self.session = session 783 self.id = id 784 self.target = target 785 self.options = options 786 self.capacity = options.get("capacity", UNLIMITED) 787 self.threshold = 0.5 788 self.durable = options.get("durable") 789 self.queued = Serial(0) 790 self.synced = Serial(0) 791 self.acked = Serial(0) 792 self.error = None 793 self.linked = False 794 self.closing = False 795 self.closed = False 796 self._lock = self.session._lock
797
798 - def _wakeup(self):
799 self.session._wakeup()
800
801 - def check_error(self):
802 self.session.check_error() 803 if self.error: 804 raise self.error
805
806 - def get_error(self):
807 err = self.session.get_error() 808 if err: 809 return err 810 else: 811 return self.error
812
813 - def _ewait(self, predicate, timeout=None):
814 result = self.session._ewait(lambda: self.error or predicate(), timeout) 815 self.check_error() 816 return result
817
818 - def check_closed(self):
819 if self.closed: 820 raise LinkClosed()
821 822 @synchronized
823 - def unsettled(self):
824 """ 825 Returns the number of messages awaiting acknowledgment. 826 @rtype: int 827 @return: the number of unacknowledged messages 828 """ 829 return self.queued - self.acked
830 831 @synchronized
832 - def available(self):
833 if self.capacity is UNLIMITED: 834 return UNLIMITED 835 else: 836 return self.capacity - self.unsettled()
837 838 @synchronized
839 - def send(self, object, sync=True, timeout=None):
840 """ 841 Send a message. If the object passed in is of type L{unicode}, 842 L{str}, L{list}, or L{dict}, it will automatically be wrapped in a 843 L{Message} and sent. If it is of type L{Message}, it will be sent 844 directly. If the sender capacity is not L{UNLIMITED} then send 845 will block until there is available capacity to send the message. 846 If the timeout parameter is specified, then send will throw an 847 L{InsufficientCapacity} exception if capacity does not become 848 available within the specified time. 849 850 @type object: unicode, str, list, dict, Message 851 @param object: the message or content to send 852 853 @type sync: boolean 854 @param sync: if true then block until the message is sent 855 856 @type timeout: float 857 @param timeout: the time to wait for available capacity 858 """ 859 860 if not self.session.connection._connected or self.session.closing: 861 raise Detached() 862 863 self._ecwait(lambda: self.linked, timeout=timeout) 864 865 if isinstance(object, Message): 866 message = object 867 else: 868 message = Message(object) 869 870 if message.durable is None: 871 message.durable = self.durable 872 873 if self.capacity is not UNLIMITED: 874 if self.capacity <= 0: 875 raise InsufficientCapacity("capacity = %s" % self.capacity) 876 if not self._ecwait(self.available, timeout=timeout): 877 raise InsufficientCapacity("capacity = %s" % self.capacity) 878 879 # XXX: what if we send the same message to multiple senders? 880 message._sender = self 881 if self.capacity is not UNLIMITED: 882 message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity)) 883 else: 884 message._sync = sync 885 self.session.outgoing.append(message) 886 self.queued += 1 887 888 if sync: 889 self.sync(timeout=timeout) 890 assert message not in self.session.outgoing 891 else: 892 self._wakeup()
893 894 @synchronized
895 - def sync(self, timeout=None):
896 mno = self.queued 897 if self.synced < mno: 898 self.synced = mno 899 self._wakeup() 900 try: 901 if not self._ewait(lambda: self.acked >= mno, timeout=timeout): 902 raise Timeout("sender sync timed out") 903 except ContentError: 904 # clean bad message so we can continue 905 self.acked = mno 906 self.session.outgoing.pop(0) 907 raise
908 909 @synchronized
910 - def close(self, timeout=None):
911 """ 912 Close the Sender. 913 """ 914 # avoid erroring out when closing a sender that was never 915 # established 916 if self.acked < self.queued: 917 self.sync(timeout=timeout) 918 919 if not self.closing: 920 self.closing = True 921 self._wakeup() 922 923 try: 924 if not self.session._ewait(lambda: self.closed, timeout=timeout): 925 raise Timeout("sender close timed out") 926 finally: 927 try: 928 self.session.senders.remove(self) 929 except ValueError: 930 pass
931
932 -class Receiver(Endpoint, object):
933 934 """ 935 Receives incoming messages from a remote source. Messages may be 936 fetched with L{fetch}. 937 """ 938
939 - def __init__(self, session, id, source, options):
940 self.session = session 941 self.id = id 942 self.source = source 943 self.options = options 944 945 self.granted = Serial(0) 946 self.draining = False 947 self.impending = Serial(0) 948 self.received = Serial(0) 949 self.returned = Serial(0) 950 951 self.error = None 952 self.linked = False 953 self.closing = False 954 self.closed = False 955 self._lock = self.session._lock 956 self._capacity = 0 957 self._set_capacity(options.get("capacity", 0), False) 958 self.threshold = 0.5
959 960 @synchronized
961 - def _set_capacity(self, c, wakeup=True):
962 if c is UNLIMITED: 963 self._capacity = c.value 964 else: 965 self._capacity = c 966 self._grant() 967 if wakeup: 968 self._wakeup()
969
970 - def _get_capacity(self):
971 if self._capacity == UNLIMITED.value: 972 return UNLIMITED 973 else: 974 return self._capacity
975 976 capacity = property(_get_capacity, _set_capacity) 977
978 - def _wakeup(self):
979 self.session._wakeup()
980
981 - def check_error(self):
982 self.session.check_error() 983 if self.error: 984 raise self.error
985
986 - def get_error(self):
987 err = self.session.get_error() 988 if err: 989 return err 990 else: 991 return self.error
992
993 - def _ewait(self, predicate, timeout=None):
994 result = self.session._ewait(lambda: self.error or predicate(), timeout) 995 self.check_error() 996 return result
997
998 - def check_closed(self):
999 if self.closed: 1000 raise LinkClosed()
1001 1002 @synchronized
1003 - def unsettled(self):
1004 """ 1005 Returns the number of acknowledged messages awaiting confirmation. 1006 """ 1007 return len([m for m in self.session.acked if m._receiver is self])
1008 1009 @synchronized
1010 - def available(self):
1011 """ 1012 Returns the number of messages available to be fetched by the 1013 application. 1014 1015 @rtype: int 1016 @return: the number of available messages 1017 """ 1018 return self.received - self.returned
1019 1020 @synchronized
1021 - def fetch(self, timeout=None):
1022 """ 1023 Fetch and return a single message. A timeout of None will block 1024 forever waiting for a message to arrive, a timeout of zero will 1025 return immediately if no messages are available. 1026 1027 @type timeout: float 1028 @param timeout: the time to wait for a message to be available 1029 """ 1030 1031 self._ecwait(lambda: self.linked) 1032 1033 if self._capacity == 0: 1034 self.granted = self.returned + 1 1035 self._wakeup() 1036 self._ecwait(lambda: self.impending >= self.granted) 1037 msg = self.session._get(self, timeout=timeout) 1038 if msg is None: 1039 self.check_closed() 1040 self.draining = True 1041 self._wakeup() 1042 self._ecwait(lambda: not self.draining) 1043 msg = self.session._get(self, timeout=0) 1044 self._grant() 1045 self._wakeup() 1046 if msg is None: 1047 raise Empty() 1048 elif self._capacity not in (0, UNLIMITED.value): 1049 t = int(ceil(self.threshold * self._capacity)) 1050 if self.received - self.returned <= t: 1051 self.granted = self.returned + self._capacity 1052 self._wakeup() 1053 return msg
1054
1055 - def _grant(self):
1056 if self._capacity == UNLIMITED.value: 1057 self.granted = UNLIMITED 1058 else: 1059 self.granted = self.returned + self._capacity
1060 1061 @synchronized
1062 - def close(self, timeout=None):
1063 """ 1064 Close the receiver. 1065 """ 1066 if not self.closing: 1067 self.closing = True 1068 self._wakeup() 1069 1070 try: 1071 if not self.session._ewait(lambda: self.closed, timeout=timeout): 1072 raise Timeout("receiver close timed out") 1073 finally: 1074 try: 1075 self.session.receivers.remove(self) 1076 except ValueError: 1077 pass
1078 1079 __all__ = ["Connection", "Session", "Sender", "Receiver"] 1080