Package proton
[frames] | no frames]

Source Code for Package proton

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  The proton module defines a suite of APIs that implement the AMQP 1.0 
  22  protocol. 
  23   
  24  The proton APIs consist of the following classes: 
  25   
  26   - L{Messenger} -- A messaging endpoint. 
  27   - L{Message}   -- A class for creating and/or accessing AMQP message content. 
  28   - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded 
  29                    data. 
  30   
  31  """ 
  32  from __future__ import absolute_import 
  33   
  34  from cproton import * 
  35  from .wrapper import Wrapper 
  36  from proton import _compat 
  37   
  38  import logging, weakref, socket, sys, threading 
  39   
  40  try: 
  41    handler = logging.NullHandler() 
  42  except AttributeError: 
43 - class NullHandler(logging.Handler):
44 - def handle(self, record):
45 pass
46
47 - def emit(self, record):
48 pass
49
50 - def createLock(self):
51 self.lock = None
52 53 handler = NullHandler() 54 55 log = logging.getLogger("proton") 56 log.addHandler(handler) 57 58 try: 59 import uuid
60 61 - def generate_uuid():
62 return uuid.uuid4()
63 64 except ImportError: 65 """ 66 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. 67 """ 68 import struct
69 - class uuid:
70 - class UUID:
71 - def __init__(self, hex=None, bytes=None):
72 if [hex, bytes].count(None) != 1: 73 raise TypeError("need one of hex or bytes") 74 if bytes is not None: 75 self.bytes = bytes 76 elif hex is not None: 77 fields=hex.split("-") 78 fields[4:5] = [fields[4][:4], fields[4][4:]] 79 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
80
81 - def __cmp__(self, other):
82 if isinstance(other, uuid.UUID): 83 return cmp(self.bytes, other.bytes) 84 else: 85 return -1
86
87 - def __str__(self):
88 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
89
90 - def __repr__(self):
91 return "UUID(%r)" % str(self)
92
93 - def __hash__(self):
94 return self.bytes.__hash__()
95 96 import os, random, time 97 rand = random.Random() 98 rand.seed((os.getpid(), time.time(), socket.gethostname()))
99 - def random_uuid():
100 data = [rand.randint(0, 255) for i in xrange(16)] 101 102 # From RFC4122, the version bits are set to 0100 103 data[6] &= 0x0F 104 data[6] |= 0x40 105 106 # From RFC4122, the top two bits of byte 8 get set to 01 107 data[8] &= 0x3F 108 data[8] |= 0x80 109 return "".join(map(chr, data))
110
111 - def uuid4():
112 return uuid.UUID(bytes=random_uuid())
113
114 - def generate_uuid():
115 return uuid4()
#
# Hacks to provide Python2 <---> Python3 compatibility
#
# Each probe calls a builtin name; where that name does not exist the
# NameError handler binds it to the closest available type.
try:
  bytes()
except NameError:
  bytes = str
try:
  long()
except NameError:
  long = int
try:
  unicode()
except NameError:
  unicode = str


# Version of the underlying library, re-exported from the PN_VERSION_*
# constants brought in by the star import of cproton.
VERSION_MAJOR = PN_VERSION_MAJOR
VERSION_MINOR = PN_VERSION_MINOR
VERSION_POINT = PN_VERSION_POINT
VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT)
API_LANGUAGE = "C"
IMPLEMENTATION_LANGUAGE = "C"
class Constant(object):
  """
  A named sentinel value. The repr of a L{Constant} is simply the name
  it was constructed with.
  """

  def __init__(self, name):
    """
    @type name: string
    @param name: the text returned by repr() for this constant
    """
    self.name = name

  def __repr__(self):
    return self.name
148
class ProtonException(Exception):
  """
  The root of the proton exception hierarchy. All proton exception
  classes derive from this exception.
  """
  pass
155
class Timeout(ProtonException):
  """
  A timeout exception indicates that a blocking operation has timed
  out.
  """
  pass
162
class Interrupt(ProtonException):
  """
  An interrupt exception indicates that a blocking operation was interrupted.
  """
  pass
168
class MessengerException(ProtonException):
  """
  The root of the messenger exception hierarchy. All exceptions
  generated by the messenger class derive from this exception.
  """
  pass
175
class MessageException(ProtonException):
  """
  The MessageException class is the root of the message exception
  hierarchy. All exceptions generated by the Message class derive from
  this exception.
  """
  pass
# Maps negative error codes to the exception class raised for them; codes
# not listed here fall back to a class-specific default in each _check().
EXCEPTIONS = {
  PN_TIMEOUT: Timeout,
  PN_INTR: Interrupt
  }

PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
RELEASED = Constant("RELEASED")
MODIFIED = Constant("MODIFIED")
ABORTED = Constant("ABORTED")
SETTLED = Constant("SETTLED")

# Maps PN_STATUS_* codes to the constants above; an unknown status is
# reported as None.
STATUSES = {
  PN_STATUS_ABORTED: ABORTED,
  PN_STATUS_ACCEPTED: ACCEPTED,
  PN_STATUS_REJECTED: REJECTED,
  PN_STATUS_RELEASED: RELEASED,
  PN_STATUS_MODIFIED: MODIFIED,
  PN_STATUS_PENDING: PENDING,
  PN_STATUS_SETTLED: SETTLED,
  PN_STATUS_UNKNOWN: None
  }

AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")
class Messenger(object):
  """
  The L{Messenger} class defines a high level interface for sending
  and receiving L{Messages<Message>}. Every L{Messenger} contains a
  single logical queue of incoming messages and a single logical queue
  of outgoing messages. The messages in these queues may be destined
  for, or originate from, a variety of addresses.

  The messenger interface is single-threaded. All methods
  except one (L{interrupt}) are intended to be used from within
  the messenger thread.


  Address Syntax
  ==============

  An address has the following form::

    [ amqp[s]:// ] [user[:password]@] domain [/[name]]

  Where domain can be one of::

    host | host:port | ip | ip:port | name

  The following are valid examples of addresses:

   - example.org
   - example.org:1234
   - amqp://example.org
   - amqps://example.org
   - example.org/incoming
   - amqps://example.org/outgoing
   - amqps://fred:trustno1@example.org
   - 127.0.0.1:1234
   - amqps://127.0.0.1:1234

  Sending & Receiving Messages
  ============================

  The L{Messenger} class works in conjunction with the L{Message} class. The
  L{Message} class is a mutable holder of message content.

  The L{put} method copies its L{Message} to the outgoing queue, and may
  send queued messages if it can do so without blocking. The L{send}
  method blocks until it has sent the requested number of messages,
  or until a timeout interrupts the attempt.


    >>> message = Message()
    >>> for i in range(3):
    ...   message.address = "amqp://host/queue"
    ...   message.subject = "Hello World %i" % i
    ...   messenger.put(message)
    >>> messenger.send()

  Similarly, the L{recv} method receives messages into the incoming
  queue, and may block as it attempts to receive the requested number
  of messages, or until timeout is reached. It may receive fewer
  than the requested number. The L{get} method pops the
  eldest L{Message} off the incoming queue and copies it into the L{Message}
  object that you supply. It will not block.


    >>> message = Message()
    >>> messenger.recv(10)
    >>> while messenger.incoming > 0:
    ...   messenger.get(message)
    ...   print(message.subject)
    Hello World 0
    Hello World 1
    Hello World 2

  The blocking flag allows you to turn off blocking behavior entirely,
  in which case L{send} and L{recv} will do whatever they can without
  blocking, and then return. You can then look at the number
  of incoming and outgoing messages to see how much outstanding work
  still remains.
  """

  def __init__(self, name=None):
    """
    Construct a new L{Messenger} with the given name. The name has
    global scope. If None is supplied, a UUID based name will
    be chosen.

    @type name: string
    @param name: the name of the messenger or None

    """
    self._mng = pn_messenger(name)
    self._selectables = {}

  def __del__(self):
    """
    Destroy the L{Messenger}. This will close all connections that
    are managed by the L{Messenger}. Call the L{stop} method before
    destroying the L{Messenger}.
    """
    # _mng is missing if __init__ failed before assigning it.
    if hasattr(self, "_mng"):
      pn_messenger_free(self._mng)
      del self._mng

  def _check(self, err):
    """
    Translate a return code into a result or an exception.

    Non-negative codes are returned unchanged. PN_INPROGRESS is not an
    error and yields None. Any other negative code raises the exception
    class registered for it in EXCEPTIONS, or L{MessengerException} when
    none is registered.
    """
    if err < 0:
      if (err == PN_INPROGRESS):
        return
      exc = EXCEPTIONS.get(err, MessengerException)
      raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng))))
    else:
      return err

  @property
  def name(self):
    """
    The name of the L{Messenger}.
    """
    return pn_messenger_name(self._mng)

  def _get_certificate(self):
    return pn_messenger_get_certificate(self._mng)

  def _set_certificate(self, value):
    self._check(pn_messenger_set_certificate(self._mng, value))

  certificate = property(_get_certificate, _set_certificate,
                         doc="""
Path to a certificate file for the L{Messenger}. This certificate is
used when the L{Messenger} accepts or establishes SSL/TLS connections.
This property must be specified for the L{Messenger} to accept
incoming SSL/TLS connections and to establish client authenticated
outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
connections do not require this property.
""")

  def _get_private_key(self):
    return pn_messenger_get_private_key(self._mng)

  def _set_private_key(self, value):
    self._check(pn_messenger_set_private_key(self._mng, value))

  private_key = property(_get_private_key, _set_private_key,
                         doc="""
Path to a private key file for the L{Messenger's<Messenger>}
certificate. This property must be specified for the L{Messenger} to
accept incoming SSL/TLS connections and to establish client
authenticated outgoing SSL/TLS connection. Non client authenticated
SSL/TLS connections do not require this property.
""")

  def _get_password(self):
    return pn_messenger_get_password(self._mng)

  def _set_password(self, value):
    self._check(pn_messenger_set_password(self._mng, value))

  password = property(_get_password, _set_password,
                      doc="""
This property contains the password for the L{Messenger.private_key}
file, or None if the file is not encrypted.
""")

  def _get_trusted_certificates(self):
    return pn_messenger_get_trusted_certificates(self._mng)

  def _set_trusted_certificates(self, value):
    self._check(pn_messenger_set_trusted_certificates(self._mng, value))

  trusted_certificates = property(_get_trusted_certificates,
                                  _set_trusted_certificates,
                                  doc="""
A path to a database of trusted certificates for use in verifying the
peer on an SSL/TLS connection. If this property is None, then the peer
will not be verified.
""")

  def _get_timeout(self):
    t = pn_messenger_get_timeout(self._mng)
    # -1 is the "no timeout" marker and is exposed as None.
    if t == -1:
      return None
    else:
      return millis2secs(t)

  def _set_timeout(self, value):
    if value is None:
      t = -1
    else:
      t = secs2millis(value)
    self._check(pn_messenger_set_timeout(self._mng, t))

  timeout = property(_get_timeout, _set_timeout,
                     doc="""
The timeout property contains the default timeout for blocking
operations performed by the L{Messenger}.
""")

  def _is_blocking(self):
    return pn_messenger_is_blocking(self._mng)

  def _set_blocking(self, b):
    self._check(pn_messenger_set_blocking(self._mng, b))

  blocking = property(_is_blocking, _set_blocking,
                      doc="""
Enable or disable blocking behavior during L{Message} sending
and receiving.  This affects every blocking call, with the
exception of L{work}.  Currently, the affected calls are
L{send}, L{recv}, and L{stop}.
""")

  def _is_passive(self):
    return pn_messenger_is_passive(self._mng)

  def _set_passive(self, b):
    self._check(pn_messenger_set_passive(self._mng, b))

  passive = property(_is_passive, _set_passive,
                     doc="""
When passive is set to true, Messenger will not attempt to perform I/O
internally. In this mode it is necessary to use the selectables API to
drive any I/O needed to perform requested actions. In this mode
Messenger will never block.
""")

  def _get_incoming_window(self):
    return pn_messenger_get_incoming_window(self._mng)

  def _set_incoming_window(self, window):
    self._check(pn_messenger_set_incoming_window(self._mng, window))

  incoming_window = property(_get_incoming_window, _set_incoming_window,
                             doc="""
The incoming tracking window for the messenger. The messenger will
track the remote status of this many incoming deliveries after they
have been accepted or rejected. Defaults to zero.

L{Messages<Message>} enter this window only when you take them into your application
using L{get}.  If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
without explicitly accepting or rejecting the oldest message, then the
message that passes beyond the edge of the incoming window will be assigned
the default disposition of its link.
""")

  def _get_outgoing_window(self):
    return pn_messenger_get_outgoing_window(self._mng)

  def _set_outgoing_window(self, window):
    self._check(pn_messenger_set_outgoing_window(self._mng, window))

  outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
                             doc="""
The outgoing tracking window for the messenger. The messenger will
track the remote status of this many outgoing deliveries after calling
send. Defaults to zero.

A L{Message} enters this window when you call the put() method with the
message.  If your outgoing window size is I{n}, and you call L{put} I{n}+1
times, status information will no longer be available for the
first message.
""")

  def start(self):
    """
    Currently a no-op placeholder.
    For future compatibility, do not L{send} or L{recv} messages
    before starting the L{Messenger}.
    """
    self._check(pn_messenger_start(self._mng))

  def stop(self):
    """
    Transitions the L{Messenger} to an inactive state. An inactive
    L{Messenger} will not send or receive messages from its internal
    queues. A L{Messenger} should be stopped before being discarded to
    ensure a clean shutdown handshake occurs on any internally managed
    connections.
    """
    self._check(pn_messenger_stop(self._mng))

  @property
  def stopped(self):
    """
    Returns true iff a L{Messenger} is in the stopped state.
    This function does not block.
    """
    return pn_messenger_stopped(self._mng)

  def subscribe(self, source):
    """
    Subscribes the L{Messenger} to messages originating from the
    specified source. The source is an address as specified in the
    L{Messenger} introduction with the following addition. If the
    domain portion of the address begins with the '~' character, the
    L{Messenger} will interpret the domain as host/port, bind to it,
    and listen for incoming messages. For example "~0.0.0.0",
    "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
    local interface and listen for incoming messages with the last
    variant only permitting incoming SSL connections.

    @type source: string
    @param source: the source of messages to subscribe to
    """
    sub_impl = pn_messenger_subscribe(self._mng, source)
    if not sub_impl:
      # Prefer the specific error recorded on the messenger; fall back to
      # a generic exception if that code does not raise.
      self._check(pn_error_code(pn_messenger_error(self._mng)))
      raise MessengerException("Cannot subscribe to %s"%source)
    return Subscription(sub_impl)

  def put(self, message):
    """
    Places the content contained in the message onto the outgoing
    queue of the L{Messenger}. This method will never block, however
    it will send any unblocked L{Messages<Message>} in the outgoing
    queue immediately and leave any blocked L{Messages<Message>}
    remaining in the outgoing queue. The L{send} call may be used to
    block until the outgoing queue is empty. The L{outgoing} property
    may be used to check the depth of the outgoing queue.

    When the content in a given L{Message} object is copied to the outgoing
    message queue, you may then modify or discard the L{Message} object
    without having any impact on the content in the outgoing queue.

    This method returns an outgoing tracker for the L{Message}.  The tracker
    can be used to determine the delivery status of the L{Message}.

    @type message: Message
    @param message: the message to place in the outgoing queue
    @return: a tracker
    """
    message._pre_encode()
    self._check(pn_messenger_put(self._mng, message._msg))
    return pn_messenger_outgoing_tracker(self._mng)

  def status(self, tracker):
    """
    Gets the last known remote state of the delivery associated with
    the given tracker.

    @type tracker: tracker
    @param tracker: the tracker whose status is to be retrieved

    @return: one of None, PENDING, ACCEPTED, REJECTED, RELEASED,
      MODIFIED, ABORTED, or SETTLED; a status code with no entry in
      STATUSES is returned unchanged
    """
    disp = pn_messenger_status(self._mng, tracker)
    return STATUSES.get(disp, disp)

  def buffered(self, tracker):
    """
    Checks if the delivery associated with the given tracker is still
    waiting to be sent.

    @type tracker: tracker
    @param tracker: the tracker whose status is to be retrieved

    @return: true if delivery is still buffered
    """
    return pn_messenger_buffered(self._mng, tracker)

  def settle(self, tracker=None):
    """
    Frees a L{Messenger} from tracking the status associated with a given
    tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up
    to the most recent will be settled.
    """
    if tracker is None:
      tracker = pn_messenger_outgoing_tracker(self._mng)
      flags = PN_CUMULATIVE
    else:
      flags = 0
    self._check(pn_messenger_settle(self._mng, tracker, flags))

  def send(self, n=-1):
    """
    This call will block until the indicated number of L{messages<Message>}
    have been sent, or until the operation times out.  If n is -1 this call will
    block until all outgoing L{messages<Message>} have been sent. If n is 0 then
    this call will send whatever it can without blocking.
    """
    self._check(pn_messenger_send(self._mng, n))

  def recv(self, n=None):
    """
    Receives up to I{n} L{messages<Message>} into the incoming queue.  If no value
    for I{n} is supplied, this call will receive as many L{messages<Message>} as it
    can buffer internally.  If the L{Messenger} is in blocking mode, this
    call will block until at least one L{Message} is available in the
    incoming queue.
    """
    if n is None:
      n = -1
    self._check(pn_messenger_recv(self._mng, n))

  def work(self, timeout=None):
    """
    Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}.
    This will block for the indicated timeout.
    This method may also do I/O work other than sending and receiving
    L{messages<Message>}.  For example, closing connections after messenger.L{stop}()
    has been called.

    @param timeout: timeout in seconds, or None (passed on as -1)
    @return: False if the call timed out, True otherwise
    """
    if timeout is None:
      t = -1
    else:
      t = secs2millis(timeout)
    err = pn_messenger_work(self._mng, t)
    # A timeout is reported through the return value, not as an exception.
    if (err == PN_TIMEOUT):
      return False
    else:
      self._check(err)
      return True

  @property
  def receiving(self):
    return pn_messenger_receiving(self._mng)

  def interrupt(self):
    """
    The L{Messenger} interface is single-threaded.
    This is the only L{Messenger} function intended to be called
    from outside of the L{Messenger} thread.
    Call this from a non-messenger thread to interrupt
    a L{Messenger} that is blocking.
    This will cause any in-progress blocking call to throw
    the L{Interrupt} exception.  If there is no currently blocking
    call, then the next blocking call will be affected, even if it
    is within the same thread that interrupt was called from.
    """
    self._check(pn_messenger_interrupt(self._mng))

  def get(self, message=None):
    """
    Moves the message from the head of the incoming message queue into
    the supplied message object. Any content in the message will be
    overwritten.

    A tracker for the incoming L{Message} is returned.  The tracker can
    later be used to communicate your acceptance or rejection of the
    L{Message}.

    If None is passed in for the L{Message} object, the L{Message}
    popped from the head of the queue is discarded.

    @type message: Message
    @param message: the destination message object
    @return: a tracker
    """
    if message is None:
      impl = None
    else:
      impl = message._msg
    self._check(pn_messenger_get(self._mng, impl))
    if message is not None:
      message._post_decode()
    return pn_messenger_incoming_tracker(self._mng)

  def accept(self, tracker=None):
    """
    Signal the sender that you have acted on the L{Message}
    pointed to by the tracker.  If no tracker is supplied,
    then all messages that have been returned by the L{get}
    method are accepted, except those that have already been
    auto-settled by passing beyond your incoming window size.

    @type tracker: tracker
    @param tracker: a tracker as returned by get
    """
    if tracker is None:
      tracker = pn_messenger_incoming_tracker(self._mng)
      flags = PN_CUMULATIVE
    else:
      flags = 0
    self._check(pn_messenger_accept(self._mng, tracker, flags))

  def reject(self, tracker=None):
    """
    Rejects the L{Message} indicated by the tracker.  If no tracker
    is supplied, all messages that have been returned by the L{get}
    method are rejected, except those that have already been auto-settled
    by passing beyond your incoming window size.

    @type tracker: tracker
    @param tracker: a tracker as returned by get
    """
    if tracker is None:
      tracker = pn_messenger_incoming_tracker(self._mng)
      flags = PN_CUMULATIVE
    else:
      flags = 0
    self._check(pn_messenger_reject(self._mng, tracker, flags))

  @property
  def outgoing(self):
    """
    The outgoing queue depth.
    """
    return pn_messenger_outgoing(self._mng)

  @property
  def incoming(self):
    """
    The incoming queue depth.
    """
    return pn_messenger_incoming(self._mng)

  def route(self, pattern, address):
    """
    Adds a routing rule to a L{Messenger's<Messenger>} internal routing table.

    The route procedure may be used to influence how a L{Messenger} will
    internally treat a given address or class of addresses. Every call
    to the route procedure will result in L{Messenger} appending a routing
    rule to its internal routing table.

    Whenever a L{Message} is presented to a L{Messenger} for delivery, it
    will match the address of this message against the set of routing
    rules in order. The first rule to match will be triggered, and
    instead of routing based on the address presented in the message,
    the L{Messenger} will route based on the address supplied in the rule.

    The pattern matching syntax supports two types of matches, a '%'
    will match any character except a '/', and a '*' will match any
    character including a '/'.

    A routing address is specified as a normal AMQP address, however it
    may additionally use substitution variables from the pattern match
    that triggered the rule.

    Any message sent to "foo" will be routed to "amqp://foo.com":

      >>> messenger.route("foo", "amqp://foo.com")

    Any message sent to "foobar" will be routed to
    "amqp://foo.com/bar":

      >>> messenger.route("foobar", "amqp://foo.com/bar")

    Any message sent to bar/<path> will be routed to the corresponding
    path within the amqp://bar.com domain:

      >>> messenger.route("bar/*", "amqp://bar.com/$1")

    Route all L{messages<Message>} over TLS:

      >>> messenger.route("amqp:*", "amqps:$1")

    Supply credentials for foo.com:

      >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1")

    Supply credentials for all domains:

      >>> messenger.route("amqp://*", "amqp://user:password@$1")

    Route all addresses through a single proxy while preserving the
    original destination:

      >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2")

    Route any address through a single broker:

      >>> messenger.route("*", "amqp://user:password@broker/$1")
    """
    self._check(pn_messenger_route(self._mng, unicode2utf8(pattern), unicode2utf8(address)))

  def rewrite(self, pattern, address):
    """
    Similar to route(), except that the destination of
    the L{Message} is determined before the message address is rewritten.

    The outgoing address is only rewritten after routing has been
    finalized.  If a message has an outgoing address of
    "amqp://0.0.0.0:5678", and a rewriting rule that changes its
    outgoing address to "foo", it will still arrive at the peer that
    is listening on "amqp://0.0.0.0:5678", but when it arrives there,
    the receiver will see its outgoing address as "foo".

    The default rewrite rule removes username and password from addresses
    before they are transmitted.
    """
    self._check(pn_messenger_rewrite(self._mng, unicode2utf8(pattern), unicode2utf8(address)))

  def selectable(self):
    return Selectable.wrap(pn_messenger_selectable(self._mng))

  @property
  def deadline(self):
    tstamp = pn_messenger_deadline(self._mng)
    # A falsy timestamp is reported as None.
    if tstamp:
      return millis2secs(tstamp)
    else:
      return None
801
802 -class Message(object):
803 """The L{Message} class is a mutable holder of message content. 804 805 @ivar instructions: delivery instructions for the message 806 @type instructions: dict 807 @ivar annotations: infrastructure defined message annotations 808 @type annotations: dict 809 @ivar properties: application defined message properties 810 @type properties: dict 811 @ivar body: message body 812 @type body: bytes | unicode | dict | list | int | long | float | UUID 813 """ 814 815 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY 816
def __init__(self, body=None, **kwargs):
  """
  @param body: the initial value of the body attribute
  @param kwargs: Message property name/value pairs to initialise the Message
  """
  self._msg = pn_message()
  self._id = Data(pn_message_id(self._msg))
  self._correlation_id = Data(pn_message_correlation_id(self._msg))
  self.instructions = None
  self.annotations = None
  self.properties = None
  self.body = body
  for k,v in _compat.iteritems(kwargs):
    getattr(self, k)          # Raise exception if it's not a valid attribute.
    setattr(self, k, v)
831
def __del__(self):
  """
  Free the underlying message object, if one was created.
  """
  # _msg is missing if __init__ failed before assigning it.
  if hasattr(self, "_msg"):
    pn_message_free(self._msg)
    del self._msg
836
def _check(self, err):
  """
  Translate a return code into a result or an exception.

  Non-negative codes are returned unchanged. A negative code raises the
  exception class registered for it in EXCEPTIONS, or L{MessageException}
  when none is registered.
  """
  if err < 0:
    exc = EXCEPTIONS.get(err, MessageException)
    raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
  else:
    return err
843
def _pre_encode(self):
  """
  Copy the instructions, annotations, properties and body attributes
  into the corresponding sections of the underlying message. Each
  section is cleared first and is left empty when the attribute is None.
  """
  inst = Data(pn_message_instructions(self._msg))
  ann = Data(pn_message_annotations(self._msg))
  props = Data(pn_message_properties(self._msg))
  body = Data(pn_message_body(self._msg))

  inst.clear()
  if self.instructions is not None:
    inst.put_object(self.instructions)
  ann.clear()
  if self.annotations is not None:
    ann.put_object(self.annotations)
  props.clear()
  if self.properties is not None:
    props.put_object(self.properties)
  body.clear()
  if self.body is not None:
    body.put_object(self.body)
862
def _post_decode(self):
  """
  Refresh the instructions, annotations, properties and body attributes
  from the corresponding sections of the underlying message. A section
  for which next() returns a false value sets its attribute to None.
  """
  inst = Data(pn_message_instructions(self._msg))
  ann = Data(pn_message_annotations(self._msg))
  props = Data(pn_message_properties(self._msg))
  body = Data(pn_message_body(self._msg))

  if inst.next():
    self.instructions = inst.get_object()
  else:
    self.instructions = None
  if ann.next():
    self.annotations = ann.get_object()
  else:
    self.annotations = None
  if props.next():
    self.properties = props.get_object()
  else:
    self.properties = None
  if body.next():
    self.body = body.get_object()
  else:
    self.body = None
885
def clear(self):
  """
  Clears the contents of the L{Message}. All fields will be reset to
  their default values.
  """
  pn_message_clear(self._msg)
  self.instructions = None
  self.annotations = None
  self.properties = None
  self.body = None
896
def _is_inferred(self):
  return pn_message_is_inferred(self._msg)

def _set_inferred(self, value):
  # The value is coerced to bool before being stored.
  self._check(pn_message_set_inferred(self._msg, bool(value)))

inferred = property(_is_inferred, _set_inferred, doc="""
The inferred flag for a message indicates how the message content
is encoded into AMQP sections. If inferred is true then binary and
list values in the body of the message will be encoded as AMQP DATA
and AMQP SEQUENCE sections, respectively. If inferred is false,
then all values in the body of the message will be encoded as AMQP
VALUE sections regardless of their type.
""")
def _is_durable(self):
  return pn_message_is_durable(self._msg)

def _set_durable(self, value):
  # The value is coerced to bool before being stored.
  self._check(pn_message_set_durable(self._msg, bool(value)))

durable = property(_is_durable, _set_durable,
                   doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")
def _get_priority(self):
  return pn_message_get_priority(self._msg)

def _set_priority(self, value):
  self._check(pn_message_set_priority(self._msg, value))

priority = property(_get_priority, _set_priority,
                    doc="""
The priority of the message.
""")
def _get_ttl(self):
  # Stored in milliseconds, exposed in seconds.
  return millis2secs(pn_message_get_ttl(self._msg))

def _set_ttl(self, value):
  self._check(pn_message_set_ttl(self._msg, secs2millis(value)))

ttl = property(_get_ttl, _set_ttl,
               doc="""
The time to live of the message measured in seconds. Expired messages
may be dropped.
""")
def _is_first_acquirer(self):
  return pn_message_is_first_acquirer(self._msg)

def _set_first_acquirer(self, value):
  # The value is coerced to bool before being stored.
  self._check(pn_message_set_first_acquirer(self._msg, bool(value)))

first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
                          doc="""
True iff the recipient is the first to acquire the message.
""")
def _get_delivery_count(self):
  return pn_message_get_delivery_count(self._msg)

def _set_delivery_count(self, value):
  self._check(pn_message_set_delivery_count(self._msg, value))

delivery_count = property(_get_delivery_count, _set_delivery_count,
                          doc="""
The number of delivery attempts made for this message.
""")
def _get_id(self):
  """Return the object currently held by the self._id data holder."""
  holder = self._id
  return holder.get_object()

def _set_id(self, value):
  """
  Rewind self._id and put value into it.

  A value whose exact type is one of _compat.INT_TYPES is wrapped in
  ulong first; subclasses of those types are stored unchanged.
  """
  is_plain_int = type(value) in _compat.INT_TYPES
  if is_plain_int:
    value = ulong(value)
  holder = self._id
  holder.rewind()
  holder.put_object(value)

id = property(fget=_get_id, fset=_set_id,
              doc="""
The id of the message.
""")
def _get_user_id(self):
  """Return the value of pn_message_get_user_id for this message."""
  creator = pn_message_get_user_id(self._msg)
  return creator

def _set_user_id(self, value):
  """Store value as the user id; the status goes to _check."""
  status = pn_message_set_user_id(self._msg, value)
  self._check(status)

user_id = property(fget=_get_user_id, fset=_set_user_id,
                   doc="""
The user id of the message creator.
""")
def _get_address(self):
  """Return the stored address after passing it through utf82unicode."""
  raw = pn_message_get_address(self._msg)
  return utf82unicode(raw)

def _set_address(self, value):
  """Pass value through unicode2utf8 and store it as the address."""
  status = pn_message_set_address(self._msg, unicode2utf8(value))
  self._check(status)

address = property(fget=_get_address, fset=_set_address,
                   doc="""
The address of the message.
""")
def _get_subject(self):
  """Return the stored subject after passing it through utf82unicode."""
  raw = pn_message_get_subject(self._msg)
  return utf82unicode(raw)

def _set_subject(self, value):
  """Pass value through unicode2utf8 and store it as the subject."""
  status = pn_message_set_subject(self._msg, unicode2utf8(value))
  self._check(status)

subject = property(fget=_get_subject, fset=_set_subject,
                   doc="""
The subject of the message.
""")
def _get_reply_to(self):
  """Return the stored reply-to after passing it through utf82unicode."""
  raw = pn_message_get_reply_to(self._msg)
  return utf82unicode(raw)

def _set_reply_to(self, value):
  """Pass value through unicode2utf8 and store it as the reply-to."""
  status = pn_message_set_reply_to(self._msg, unicode2utf8(value))
  self._check(status)

reply_to = property(fget=_get_reply_to, fset=_set_reply_to,
                    doc="""
The reply-to address for the message.
""")
def _get_correlation_id(self):
  """Return the object held by the self._correlation_id data holder."""
  holder = self._correlation_id
  return holder.get_object()

def _set_correlation_id(self, value):
  """
  Rewind self._correlation_id and put value into it.

  A value whose exact type is one of _compat.INT_TYPES is wrapped in
  ulong first; subclasses of those types are stored unchanged.
  """
  is_plain_int = type(value) in _compat.INT_TYPES
  if is_plain_int:
    value = ulong(value)
  holder = self._correlation_id
  holder.rewind()
  holder.put_object(value)

correlation_id = property(fget=_get_correlation_id, fset=_set_correlation_id,
                          doc="""
The correlation-id for the message.
""")
def _get_content_type(self):
  """
  Return the stored content type as a symbol.

  NOTE(review): the utf82unicode result is handed straight to symbol();
  if that helper can yield None the result would be the text "None" --
  confirm against utf82unicode.
  """
  raw = pn_message_get_content_type(self._msg)
  text = utf82unicode(raw)
  return symbol(text)

def _set_content_type(self, value):
  """Pass value through unicode2utf8 and store it as the content type."""
  status = pn_message_set_content_type(self._msg, unicode2utf8(value))
  self._check(status)

content_type = property(fget=_get_content_type, fset=_set_content_type,
                        doc="""
The content-type of the message.
""")
def _get_content_encoding(self):
  """
  Return the stored content encoding as a symbol.

  NOTE(review): the utf82unicode result is handed straight to symbol();
  if that helper can yield None the result would be the text "None" --
  confirm against utf82unicode.
  """
  raw = pn_message_get_content_encoding(self._msg)
  text = utf82unicode(raw)
  return symbol(text)

def _set_content_encoding(self, value):
  """Pass value through unicode2utf8 and store it as the content encoding."""
  status = pn_message_set_content_encoding(self._msg, unicode2utf8(value))
  self._check(status)

content_encoding = property(fget=_get_content_encoding,
                            fset=_set_content_encoding,
                            doc="""
The content-encoding of the message.
""")
def _get_expiry_time(self):
  """Return the stored expiry time after passing it through millis2secs."""
  millis = pn_message_get_expiry_time(self._msg)
  return millis2secs(millis)

def _set_expiry_time(self, value):
  """Pass value through secs2millis and store it as the expiry time."""
  status = pn_message_set_expiry_time(self._msg, secs2millis(value))
  self._check(status)

expiry_time = property(fget=_get_expiry_time, fset=_set_expiry_time,
                       doc="""
The expiry time of the message.
""")
def _get_creation_time(self):
  """Return the stored creation time after passing it through millis2secs."""
  millis = pn_message_get_creation_time(self._msg)
  return millis2secs(millis)

def _set_creation_time(self, value):
  """Pass value through secs2millis and store it as the creation time."""
  status = pn_message_set_creation_time(self._msg, secs2millis(value))
  self._check(status)

creation_time = property(fget=_get_creation_time, fset=_set_creation_time,
                         doc="""
The creation time of the message.
""")
def _get_group_id(self):
  """Return the stored group id after passing it through utf82unicode."""
  raw = pn_message_get_group_id(self._msg)
  return utf82unicode(raw)

def _set_group_id(self, value):
  """Pass value through unicode2utf8 and store it as the group id."""
  status = pn_message_set_group_id(self._msg, unicode2utf8(value))
  self._check(status)

group_id = property(fget=_get_group_id, fset=_set_group_id,
                    doc="""
The group id of the message.
""")
def _get_group_sequence(self):
  """Return the value of pn_message_get_group_sequence for this message."""
  position = pn_message_get_group_sequence(self._msg)
  return position

def _set_group_sequence(self, value):
  """Store value as the group sequence; the status goes to _check."""
  status = pn_message_set_group_sequence(self._msg, value)
  self._check(status)

group_sequence = property(fget=_get_group_sequence, fset=_set_group_sequence,
                          doc="""
The sequence of the message within its group.
""")
def _get_reply_to_group_id(self):
  """Return the stored reply-to group id, passed through utf82unicode."""
  raw = pn_message_get_reply_to_group_id(self._msg)
  return utf82unicode(raw)

def _set_reply_to_group_id(self, value):
  """Pass value through unicode2utf8 and store it as the reply-to group id."""
  status = pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value))
  self._check(status)

reply_to_group_id = property(fget=_get_reply_to_group_id,
                             fset=_set_reply_to_group_id,
                             doc="""
The group-id for any replies.
""")
def encode(self):
  """
  Encode the message and return the encoded data.

  self._pre_encode() runs first. pn_message_encode is then tried with a
  buffer size of 16 that doubles each time it reports PN_OVERFLOW; the
  first non-overflow status is handed to self._check before the data is
  returned.
  """
  self._pre_encode()
  capacity = 16
  status, payload = pn_message_encode(self._msg, capacity)
  while status == PN_OVERFLOW:
    capacity *= 2
    status, payload = pn_message_encode(self._msg, capacity)
  self._check(status)
  return payload
1127
def decode(self, data):
  """
  Decode data into this message.

  The pn_message_decode status is handed to self._check, after which
  self._post_decode() runs.

  @param data: the encoded message content
  """
  status = pn_message_decode(self._msg, data)
  self._check(status)
  self._post_decode()
1131
def send(self, sender, tag=None):
  """
  Encode this message and stream it over sender as a new delivery.

  The delivery is created with tag, or with sender.delivery_tag() when
  tag is falsy. After the encoded message is streamed the sender is
  advanced, and the delivery is settled immediately when the sender's
  snd_settle_mode equals Link.SND_SETTLED.

  @param sender: the sending link
  @param tag: optional delivery tag
  @return: the delivery that was created
  """
  delivery_tag = tag or sender.delivery_tag()
  delivery = sender.delivery(delivery_tag)
  payload = self.encode()
  sender.stream(payload)
  sender.advance()
  presettled = sender.snd_settle_mode == Link.SND_SETTLED
  if presettled:
    delivery.settle()
  return delivery
1140
def recv(self, link):
  """
  Receive and decode the content of the link's current delivery.

  None is returned when the link is a sender, when it has no current
  delivery, or when the current delivery is still partial. Otherwise the
  pending bytes are read into the delivery's encoded attribute, the link
  is advanced, the message is decoded and the delivery is returned.

  @type link: Link
  @param link: the link to receive a message from
  @return: the delivery associated with the decoded message (or None)
  """
  if link.is_sender:
    return None
  delivery = link.current
  if not delivery or delivery.partial:
    return None
  delivery.encoded = link.recv(delivery.pending)
  link.advance()
  # A pre-settling sender has already forgotten the delivery, so there
  # is nothing to gain from keeping it unsettled on this side.
  if link.remote_snd_settle_mode == Link.SND_SETTLED:
    delivery.settle()
  self.decode(delivery.encoded)
  return delivery
1165
def __repr2__(self):
  """
  Build a "Message(name=value, ...)" string from a fixed list of
  attributes, leaving out every attribute whose value is falsy.
  """
  names = ("inferred", "address", "reply_to", "durable", "ttl",
           "priority", "first_acquirer", "delivery_count", "id",
           "correlation_id", "user_id", "group_id", "group_sequence",
           "reply_to_group_id", "instructions", "annotations",
           "properties", "body")
  # The generator reads each attribute exactly once, in order.
  pairs = ((name, getattr(self, name)) for name in names)
  shown = ["%s=%r" % (name, current) for name, current in pairs if current]
  return "Message(%s)" % ", ".join(shown)
1176
def __repr__(self):
  """
  Return the text pn_inspect produces for the underlying message.

  A temporary pn_string receives the output. It is released in a
  finally clause so it is not leaked when pn_inspect or pn_string_get
  raises. The pn_inspect status is handed to self._check only after the
  text has been extracted and the temporary freed.
  """
  tmp = pn_string(None)
  try:
    err = pn_inspect(self._msg, tmp)
    result = pn_string_get(tmp)
  finally:
    pn_free(tmp)
  self._check(err)
  return result
1184
class Subscription(object):
  """Holds a subscription implementation handle and exposes its address."""

  def __init__(self, impl):
    """
    @param impl: the underlying subscription handle, stored unchanged
    """
    self._impl = impl

  @property
  def address(self):
    """The value pn_subscription_address reports for the stored handle."""
    handle = self._impl
    return pn_subscription_address(handle)
# Sentinel that lets Selectable.fileno tell "no argument" apart from None.
_DEFAULT = object()

class Selectable(Wrapper):
  """Wrapper exposing the pn_selectable_* calls for one implementation."""

  @staticmethod
  def wrap(impl):
    """Return a Selectable for impl, or None when impl is None."""
    if impl is not None:
      return Selectable(impl)
    return None

  def __init__(self, impl):
    Wrapper.__init__(self, impl, pn_selectable_attachments)

  def _init(self):
    """No per-instance initialisation is performed."""
    pass

  def fileno(self, fd = _DEFAULT):
    """
    Get or set the file descriptor.

    Without an argument the pn_selectable_get_fd value is returned.
    Passing None stores PN_INVALID_SOCKET; any other value is stored as
    given. Both setting forms return None.
    """
    if fd is _DEFAULT:
      return pn_selectable_get_fd(self._impl)
    target = PN_INVALID_SOCKET if fd is None else fd
    pn_selectable_set_fd(self._impl, target)

  def _is_reading(self):
    state = pn_selectable_is_reading(self._impl)
    return state

  def _set_reading(self, val):
    wanted = bool(val)
    pn_selectable_set_reading(self._impl, wanted)

  reading = property(fget=_is_reading, fset=_set_reading)

  def _is_writing(self):
    state = pn_selectable_is_writing(self._impl)
    return state

  def _set_writing(self, val):
    wanted = bool(val)
    pn_selectable_set_writing(self._impl, wanted)

  writing = property(fget=_is_writing, fset=_set_writing)

  def _get_deadline(self):
    """Return the deadline via millis2secs, or None when it is falsy."""
    millis = pn_selectable_get_deadline(self._impl)
    if not millis:
      return None
    return millis2secs(millis)

  def _set_deadline(self, deadline):
    """Pass deadline through secs2millis and store it."""
    millis = secs2millis(deadline)
    pn_selectable_set_deadline(self._impl, millis)

  deadline = property(fget=_get_deadline, fset=_set_deadline)

  def readable(self):
    """Invoke pn_selectable_readable on the implementation."""
    pn_selectable_readable(self._impl)

  def writable(self):
    """Invoke pn_selectable_writable on the implementation."""
    pn_selectable_writable(self._impl)

  def expired(self):
    """Invoke pn_selectable_expired on the implementation."""
    pn_selectable_expired(self._impl)

  def _is_registered(self):
    state = pn_selectable_is_registered(self._impl)
    return state

  def _set_registered(self, registered):
    # Unlike reading/writing, the value is forwarded without bool().
    pn_selectable_set_registered(self._impl, registered)

  registered = property(fget=_is_registered, fset=_set_registered,
                        doc="""
The registered property may be get/set by an I/O polling system to
indicate whether the fd has been registered or not.
""")

  @property
  def is_terminal(self):
    """The value pn_selectable_is_terminal reports."""
    state = pn_selectable_is_terminal(self._impl)
    return state

  def terminate(self):
    """Invoke pn_selectable_terminate on the implementation."""
    pn_selectable_terminate(self._impl)

  def release(self):
    """Invoke pn_selectable_release on the implementation."""
    pn_selectable_release(self._impl)
class DataException(ProtonException):
  """
  Root of the Data exception hierarchy. Exceptions raised by the Data
  class are meant to extend this class.
  """
1284
class UnmappedType:
  """Carries a message in its msg attribute and shows it in repr()."""

  def __init__(self, msg):
    self.msg = msg

  def __repr__(self):
    # self.msg is the direct right-hand operand of %, exactly as before.
    text = "UnmappedType(%s)" % self.msg
    return text
1292
class ulong(long):
  """long subclass whose repr is wrapped as ulong(...)."""

  def __repr__(self):
    inner = long.__repr__(self)
    return "ulong(%s)" % inner
1297
class timestamp(long):
  """long subclass whose repr is wrapped as timestamp(...)."""

  def __repr__(self):
    inner = long.__repr__(self)
    return "timestamp(%s)" % inner
1302
class symbol(unicode):
  """unicode subclass whose repr is wrapped as symbol(...)."""

  def __repr__(self):
    inner = unicode.__repr__(self)
    return "symbol(%s)" % inner
1307
class char(unicode):
  """unicode subclass whose repr is wrapped as char(...)."""

  def __repr__(self):
    inner = unicode.__repr__(self)
    return "char(%s)" % inner
1312
class byte(int):
  """int subclass whose repr is wrapped as byte(...)."""

  def __repr__(self):
    inner = int.__repr__(self)
    return "byte(%s)" % inner
1317
class short(int):
  """int subclass whose repr is wrapped as short(...)."""

  def __repr__(self):
    inner = int.__repr__(self)
    return "short(%s)" % inner
1322
class int32(int):
  """int subclass whose repr is wrapped as int32(...)."""

  def __repr__(self):
    inner = int.__repr__(self)
    return "int32(%s)" % inner
1327
class ubyte(int):
  """int subclass whose repr is wrapped as ubyte(...)."""

  def __repr__(self):
    inner = int.__repr__(self)
    return "ubyte(%s)" % inner
1332
class ushort(int):
  """int subclass whose repr is wrapped as ushort(...)."""

  def __repr__(self):
    inner = int.__repr__(self)
    return "ushort(%s)" % inner
1337
class uint(long):
  """long subclass whose repr is wrapped as uint(...)."""

  def __repr__(self):
    inner = long.__repr__(self)
    return "uint(%s)" % inner
1342
class float32(float):
  """float subclass whose repr is wrapped as float32(...)."""

  def __repr__(self):
    inner = float.__repr__(self)
    return "float32(%s)" % inner
1347
class decimal32(int):
  """int subclass whose repr is wrapped as decimal32(...)."""

  def __repr__(self):
    inner = int.__repr__(self)
    return "decimal32(%s)" % inner
1352
class decimal64(long):
  """long subclass whose repr is wrapped as decimal64(...)."""

  def __repr__(self):
    inner = long.__repr__(self)
    return "decimal64(%s)" % inner
1357
class decimal128(bytes):
  """bytes subclass whose repr is wrapped as decimal128(...)."""

  def __repr__(self):
    inner = bytes.__repr__(self)
    return "decimal128(%s)" % inner
1362
class Described(object):
  """
  Pairs a descriptor with the value it describes.

  Two Described objects are equal when both their descriptors and their
  values are equal; a Described never equals an object of another type.
  """

  def __init__(self, descriptor, value):
    """
    @param descriptor: the descriptor, stored unchanged
    @param value: the described value, stored unchanged
    """
    self.descriptor = descriptor
    self.value = value

  def __repr__(self):
    return "Described(%r, %r)" % (self.descriptor, self.value)

  def __eq__(self, o):
    if isinstance(o, Described):
      return self.descriptor == o.descriptor and self.value == o.value
    else:
      return False

  def __ne__(self, o):
    # Python 2 does not derive != from __eq__; without this, two equal
    # Described objects would also compare as unequal.
    return not self.__eq__(o)
# Marker used as the descriptor of arrays that carry no descriptor.
UNDESCRIBED = Constant("UNDESCRIBED")
class Array(object):
  """
  An array value: a descriptor, an element type and the elements.

  Iterating an Array yields its elements. Two Arrays are equal when
  their descriptors, types and element tuples are all equal; an Array
  never equals an object of another type.
  """

  def __init__(self, descriptor, type, *elements):
    """
    @param descriptor: the array descriptor, stored unchanged
    @param type: the element type, stored unchanged
    @param elements: the elements, kept as a tuple
    """
    self.descriptor = descriptor
    self.type = type
    self.elements = elements

  def __iter__(self):
    return iter(self.elements)

  def __repr__(self):
    if self.elements:
      els = ", %s" % (", ".join(map(repr, self.elements)))
    else:
      els = ""
    return "Array(%r, %r%s)" % (self.descriptor, self.type, els)

  def __eq__(self, o):
    if isinstance(o, Array):
      return self.descriptor == o.descriptor and \
          self.type == o.type and self.elements == o.elements
    else:
      return False

  def __ne__(self, o):
    # Python 2 does not derive != from __eq__; without this, two equal
    # Arrays would also compare as unequal.
    return not self.__eq__(o)
1403
class Data:
  """
  The L{Data} class provides an interface for decoding, extracting,
  creating, and encoding arbitrary AMQP data. A L{Data} object
  contains a tree of AMQP values. Leaf nodes in this tree correspond
  to scalars in the AMQP type system such as L{ints<INT>} or
  L{strings<STRING>}. Non-leaf nodes in this tree correspond to
  compound values in the AMQP type system such as L{lists<LIST>},
  L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
  The root node of the tree is the L{Data} object itself and can have
  an arbitrary number of children.

  A L{Data} object maintains the notion of the current sibling node
  and a current parent node. Siblings are ordered within their parent.
  Values are accessed and/or added by using the L{next}, L{prev},
  L{enter}, and L{exit} methods to navigate to the desired location in
  the tree and using the supplied variety of put_*/get_* methods to
  access or add a value of the desired type.

  The put_* methods will always add a value I{after} the current node
  in the tree. If the current node has a next sibling the put_* method
  will overwrite the value on this node. If there is no current node
  or the current node has no next sibling then one will be added. The
  put_* methods always set the added/modified node to the current
  node. The get_* methods read the value of the current node and do
  not change which node is current.

  The following types of scalar values are supported:

   - L{NULL}
   - L{BOOL}
   - L{UBYTE}
   - L{BYTE}
   - L{USHORT}
   - L{SHORT}
   - L{UINT}
   - L{INT}
   - L{CHAR}
   - L{ULONG}
   - L{LONG}
   - L{TIMESTAMP}
   - L{FLOAT}
   - L{DOUBLE}
   - L{DECIMAL32}
   - L{DECIMAL64}
   - L{DECIMAL128}
   - L{UUID}
   - L{BINARY}
   - L{STRING}
   - L{SYMBOL}

  The following types of compound values are supported:

   - L{DESCRIBED}
   - L{ARRAY}
   - L{LIST}
   - L{MAP}
  """

  NULL = PN_NULL; "A null value."
  BOOL = PN_BOOL; "A boolean value."
  UBYTE = PN_UBYTE; "An unsigned byte value."
  BYTE = PN_BYTE; "A signed byte value."
  USHORT = PN_USHORT; "An unsigned short value."
  SHORT = PN_SHORT; "A short value."
  UINT = PN_UINT; "An unsigned int value."
  INT = PN_INT; "A signed int value."
  CHAR = PN_CHAR; "A character value."
  ULONG = PN_ULONG; "An unsigned long value."
  LONG = PN_LONG; "A signed long value."
  TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
  FLOAT = PN_FLOAT; "A float value."
  DOUBLE = PN_DOUBLE; "A double value."
  DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
  DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
  DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
  UUID = PN_UUID; "A UUID value."
  BINARY = PN_BINARY; "A binary string."
  STRING = PN_STRING; "A unicode string."
  SYMBOL = PN_SYMBOL; "A symbolic string."
  DESCRIBED = PN_DESCRIBED; "A described value."
  ARRAY = PN_ARRAY; "An array value."
  LIST = PN_LIST; "A list value."
  MAP = PN_MAP; "A map value."

  # Maps each type code above to its lower-case name.
  type_names = {
    NULL: "null",
    BOOL: "bool",
    BYTE: "byte",
    UBYTE: "ubyte",
    SHORT: "short",
    USHORT: "ushort",
    INT: "int",
    UINT: "uint",
    CHAR: "char",
    LONG: "long",
    ULONG: "ulong",
    TIMESTAMP: "timestamp",
    FLOAT: "float",
    DOUBLE: "double",
    DECIMAL32: "decimal32",
    DECIMAL64: "decimal64",
    DECIMAL128: "decimal128",
    UUID: "uuid",
    BINARY: "binary",
    STRING: "string",
    SYMBOL: "symbol",
    DESCRIBED: "described",
    ARRAY: "array",
    LIST: "list",
    MAP: "map"
    }

  @classmethod
  def type_name(cls, type):
    """
    Return the lower-case name registered in type_names for a type code.

    The method previously had no cls parameter, so the class itself was
    bound to the only argument and any call passing a type code failed
    with a TypeError.

    @param type: one of the type code constants of this class
    @raise KeyError: if type is not a key of type_names
    """
    return cls.type_names[type]
1512
def __init__(self, capacity=16):
  """
  Create a Data object.

  When the exact type of capacity is one of _compat.INT_TYPES a new
  pn_data of that capacity is allocated and flagged to be freed by
  __del__. Any other value is stored as the underlying data handle
  itself and is never freed by this object.

  @param capacity: an initial capacity, or an existing data handle
  """
  owns_handle = type(capacity) in _compat.INT_TYPES
  if owns_handle:
    self._data = pn_data(capacity)
  else:
    self._data = capacity
  self._free = owns_handle
1520
def __del__(self):
  """
  Free the underlying data handle if this object owns it.

  _free is read with getattr because __init__ can fail before either
  attribute is assigned (for example when pn_data raises); a plain
  self._free would then raise AttributeError during finalization.
  """
  if getattr(self, "_free", False) and hasattr(self, "_data"):
    pn_data_free(self._data)
    # Dropping the attribute makes a second __del__ call a no-op.
    del self._data
1525
def _check(self, err):
  """
  Return err unchanged when it is not negative, otherwise raise.

  The exception class is looked up in EXCEPTIONS by error code, with
  DataException as the fallback, and its message combines the code with
  the error text of the underlying data handle.

  @param err: a status code returned by a pn_data_* call
  """
  if err >= 0:
    return err
  exc_type = EXCEPTIONS.get(err, DataException)
  detail = pn_error_text(pn_data_error(self._data))
  raise exc_type("[%s]: %s" % (err, detail))
1532
def clear(self):
  """
  Clear the data object by calling pn_data_clear on its handle.
  """
  handle = self._data
  pn_data_clear(handle)
1538
def rewind(self):
  """
  Clear the current node and make the root node the parent. The cleared
  position lies _before_ the first node, so a following next() call
  moves to the first node.
  """
  handle = self._data
  assert handle is not None
  pn_data_rewind(handle)
1547
def next(self):
  """
  Move to the next sibling of the current node and return its type.
  When there is no next sibling the current node is left unchanged and
  None is returned.
  """
  if pn_data_next(self._data):
    return self.type()
  return None
1559
def prev(self):
  """
  Move to the previous sibling of the current node and return its type.
  When there is no previous sibling the current node is left unchanged
  and None is returned.
  """
  if pn_data_prev(self._data):
    return self.type()
  return None
1571
def enter(self):
  """
  Make the current node the parent and clear the current node. The
  cleared position lies _before_ the first child, so a following next()
  call moves to the first child.

  @return: the result of pn_data_enter
  """
  entered = pn_data_enter(self._data)
  return entered
1579
def exit(self):
  """
  Make the parent node the current node, and the parent's own parent
  the new parent.

  @return: the result of pn_data_exit
  """
  left = pn_data_exit(self._data)
  return left
1586
def lookup(self, name):
  """
  Return the result of pn_data_lookup for name.

  @param name: the name handed to pn_data_lookup
  """
  found = pn_data_lookup(self._data, name)
  return found
1589
def narrow(self):
  """Call pn_data_narrow on the underlying data handle."""
  handle = self._data
  pn_data_narrow(handle)
1592
def widen(self):
  """Call pn_data_widen on the underlying data handle."""
  handle = self._data
  pn_data_widen(handle)
1595
def type(self):
  """
  Return the type of the current node, or None when pn_data_type
  reports -1.
  """
  code = pn_data_type(self._data)
  return None if code == -1 else code
1605
def encoded_size(self):
  """
  Return the number of bytes needed to encode the data in AMQP format.
  """
  needed = pn_data_encoded_size(self._data)
  return needed
1611
def encode(self):
  """
  Return the data encoded in AMQP format.

  Encoding starts with a buffer size of 1024 that doubles every time
  pn_data_encode reports PN_OVERFLOW. Any other negative status is
  handed to self._check.
  """
  capacity = 1024
  while True:
    status, encoded = pn_data_encode(self._data, capacity)
    if status == PN_OVERFLOW:
      capacity *= 2
      continue
    if status >= 0:
      return encoded
    self._check(status)
1625
def decode(self, encoded):
  """
  Decode the first value from the supplied AMQP data and return the
  number of bytes consumed.

  @type encoded: binary
  @param encoded: AMQP encoded binary data
  """
  consumed = pn_data_decode(self._data, encoded)
  return self._check(consumed)
1635
def put_list(self):
  """
  Put a list value. Fill in the elements by entering the list node and
  putting the element values.

    >>> data = Data()
    >>> data.put_list()
    >>> data.enter()
    >>> data.put_int(1)
    >>> data.put_int(2)
    >>> data.put_int(3)
    >>> data.exit()
  """
  status = pn_data_put_list(self._data)
  self._check(status)
1650
def put_map(self):
  """
  Put a map value. Fill in the entries by entering the map node and
  putting alternating keys and values.

    >>> data = Data()
    >>> data.put_map()
    >>> data.enter()
    >>> data.put_string("key")
    >>> data.put_string("value")
    >>> data.exit()
  """
  status = pn_data_put_map(self._data)
  self._check(status)
1664
def put_array(self, described, element_type):
  """
  Put an array value. Fill in the elements by entering the array node
  and putting the element values, which must all be of the given
  element type. For a described array the first child value is the
  descriptor, which may be of any type.

    >>> data = Data()
    >>>
    >>> data.put_array(False, Data.INT)
    >>> data.enter()
    >>> data.put_int(1)
    >>> data.put_int(2)
    >>> data.put_int(3)
    >>> data.exit()
    >>>
    >>> data.put_array(True, Data.DOUBLE)
    >>> data.enter()
    >>> data.put_symbol("array-descriptor")
    >>> data.put_double(1.1)
    >>> data.put_double(1.2)
    >>> data.put_double(1.3)
    >>> data.exit()

  @type described: bool
  @param described: specifies whether the array is described
  @type element_type: int
  @param element_type: the type of the array elements
  """
  status = pn_data_put_array(self._data, described, element_type)
  self._check(status)
1696
def put_described(self):
  """
  Put a described value. A described node has two children, the
  descriptor and the value, which are supplied by entering the node and
  putting them.

    >>> data = Data()
    >>> data.put_described()
    >>> data.enter()
    >>> data.put_symbol("value-descriptor")
    >>> data.put_string("the value")
    >>> data.exit()
  """
  status = pn_data_put_described(self._data)
  self._check(status)
1711
def put_null(self):
  """
  Put a null value.
  """
  status = pn_data_put_null(self._data)
  self._check(status)
1717
def put_bool(self, b):
  """
  Put a boolean value.

  @param b: a boolean value
  """
  status = pn_data_put_bool(self._data, b)
  self._check(status)
1725
def put_ubyte(self, ub):
  """
  Put an unsigned byte value.

  @param ub: an integral value
  """
  status = pn_data_put_ubyte(self._data, ub)
  self._check(status)
1733
def put_byte(self, b):
  """
  Put a signed byte value.

  @param b: an integral value
  """
  status = pn_data_put_byte(self._data, b)
  self._check(status)
1741
def put_ushort(self, us):
  """
  Put an unsigned short value.

  @param us: an integral value
  """
  status = pn_data_put_ushort(self._data, us)
  self._check(status)
1749
def put_short(self, s):
  """
  Put a signed short value.

  @param s: an integral value
  """
  status = pn_data_put_short(self._data, s)
  self._check(status)
1757
def put_uint(self, ui):
  """
  Put an unsigned int value.

  @param ui: an integral value
  """
  status = pn_data_put_uint(self._data, ui)
  self._check(status)
1765
def put_int(self, i):
  """
  Put a signed int value.

  @param i: an integral value
  """
  status = pn_data_put_int(self._data, i)
  self._check(status)
1773
def put_char(self, c):
  """
  Put a char value; the character is converted with ord() first.

  @param c: a single character
  """
  code_point = ord(c)
  status = pn_data_put_char(self._data, code_point)
  self._check(status)
1781
def put_ulong(self, ul):
  """
  Put an unsigned long value.

  @param ul: an integral value
  """
  status = pn_data_put_ulong(self._data, ul)
  self._check(status)
1789
def put_long(self, l):
  """
  Put a signed long value.

  @param l: an integral value
  """
  status = pn_data_put_long(self._data, l)
  self._check(status)
1797
def put_timestamp(self, t):
  """
  Put a timestamp value.

  @param t: an integral value
  """
  status = pn_data_put_timestamp(self._data, t)
  self._check(status)
1805
def put_float(self, f):
  """
  Put a float value.

  @param f: a floating point value
  """
  status = pn_data_put_float(self._data, f)
  self._check(status)
1813
def put_double(self, d):
  """
  Put a double value.

  @param d: a floating point value
  """
  status = pn_data_put_double(self._data, d)
  self._check(status)
1821
def put_decimal32(self, d):
  """
  Put a decimal32 value.

  @param d: a decimal32 value
  """
  status = pn_data_put_decimal32(self._data, d)
  self._check(status)
1829
def put_decimal64(self, d):
  """
  Put a decimal64 value.

  @param d: a decimal64 value
  """
  status = pn_data_put_decimal64(self._data, d)
  self._check(status)
1837
def put_decimal128(self, d):
  """
  Put a decimal128 value.

  @param d: a decimal128 value
  """
  status = pn_data_put_decimal128(self._data, d)
  self._check(status)
1845
def put_uuid(self, u):
  """
  Put a UUID value; the bytes attribute of u is what gets stored.

  @param u: a uuid value
  """
  raw = u.bytes
  status = pn_data_put_uuid(self._data, raw)
  self._check(status)
1853
def put_binary(self, b):
  """
  Put a binary value.

  @type b: binary
  @param b: a binary value
  """
  status = pn_data_put_binary(self._data, b)
  self._check(status)
1862
def put_memoryview(self, mv):
  """Put the bytes of a python memoryview as an AMQP binary value."""
  content = mv.tobytes()
  self.put_binary(content)
1866
def put_buffer(self, buff):
  """Put a python buffer object, converted with bytes(), as AMQP binary."""
  content = bytes(buff)
  self.put_binary(content)
1870
def put_string(self, s):
  """
  Put a unicode value; it is encoded as utf8 before being stored.

  @type s: unicode
  @param s: a unicode value
  """
  encoded = s.encode("utf8")
  status = pn_data_put_string(self._data, encoded)
  self._check(status)
1879
def put_symbol(self, s):
  """
  Put a symbolic value; the name is encoded as ascii before being stored.

  @type s: string
  @param s: the symbol name
  """
  encoded = s.encode('ascii')
  status = pn_data_put_symbol(self._data, encoded)
  self._check(status)
1888
def get_list(self):
  """
  Return the number of elements if the current node is a list, zero
  otherwise. The elements are reached by entering the list.

    >>> count = data.get_list()
    >>> data.enter()
    >>> for i in range(count):
    ...   type = data.next()
    ...   if type == Data.STRING:
    ...     print data.get_string()
    ...   elif type == ...:
    ...     ...
    >>> data.exit()
  """
  count = pn_data_get_list(self._data)
  return count
1906
def get_map(self):
  """
  Return the number of child elements if the current node is a map,
  zero otherwise. The key value pairs are reached by entering the map.

    >>> count = data.get_map()
    >>> data.enter()
    >>> for i in range(count/2):
    ...   type = data.next()
    ...   if type == Data.STRING:
    ...     print data.get_string()
    ...   elif type == ...:
    ...     ...
    >>> data.exit()
  """
  count = pn_data_get_map(self._data)
  return count
1924
def get_array(self):
  """
  Return (count, described, element type) for the current node if it is
  an array, otherwise (0, False, None). An element type reported as -1
  is returned as None. The array data is reached by entering the array.

    >>> # read an array of strings with a symbolic descriptor
    >>> count, described, type = data.get_array()
    >>> data.enter()
    >>> data.next()
    >>> print "Descriptor:", data.get_symbol()
    >>> for i in range(count):
    ...    data.next()
    ...    print "Element:", data.get_string()
    >>> data.exit()
  """
  handle = self._data
  count = pn_data_get_array(handle)
  described = pn_data_is_array_described(handle)
  element_type = pn_data_get_array_type(handle)
  return count, described, (None if element_type == -1 else element_type)
1948
def is_described(self):
  """
  Tell whether the current node is a described value. The descriptor
  and the value are reached by entering the described value.

    >>> # read a symbolically described string
    >>> assert data.is_described() # will error if the current node is not described
    >>> data.enter()
    >>> data.next()
    >>> print data.get_symbol()
    >>> data.next()
    >>> print data.get_string()
    >>> data.exit()
  """
  described = pn_data_is_described(self._data)
  return described
1964
def is_null(self):
  """
  Tell whether the current node is a null.
  """
  null = pn_data_is_null(self._data)
  return null
1970
def get_bool(self):
  """
  Return the value of the current node if it is a boolean, False
  otherwise.
  """
  value = pn_data_get_bool(self._data)
  return value
1977
def get_ubyte(self):
  """
  Return the value of the current node, wrapped as ubyte, if it is an
  unsigned byte; 0 otherwise.
  """
  raw = pn_data_get_ubyte(self._data)
  return ubyte(raw)
1984
def get_byte(self):
  """
  Return the value of the current node, wrapped as byte, if it is a
  signed byte; 0 otherwise.
  """
  raw = pn_data_get_byte(self._data)
  return byte(raw)
1991
def get_ushort(self):
  """
  Return the value of the current node, wrapped as ushort, if it is an
  unsigned short; 0 otherwise.
  """
  raw = pn_data_get_ushort(self._data)
  return ushort(raw)
1998
def get_short(self):
  """
  Return the value of the current node, wrapped as short, if it is a
  signed short; 0 otherwise.
  """
  raw = pn_data_get_short(self._data)
  return short(raw)
2005
def get_uint(self):
  """
  Return the value of the current node, wrapped as uint, if it is an
  unsigned int; 0 otherwise.
  """
  raw = pn_data_get_uint(self._data)
  return uint(raw)
2012
def get_int(self):
  """
  Return the value of the current node, wrapped as int32, if it is a
  signed int; 0 otherwise.
  """
  raw = pn_data_get_int(self._data)
  return int32(raw)
2019
def get_char(self):
  """
  Return the value of the current node as a char. The code reported by
  pn_data_get_char is converted with _compat.unichar first.
  """
  code_point = pn_data_get_char(self._data)
  return char(_compat.unichar(code_point))
2026
def get_ulong(self):
  """
  Return the value of the current node, wrapped as ulong, if it is an
  unsigned long; 0 otherwise.
  """
  raw = pn_data_get_ulong(self._data)
  return ulong(raw)
2033
def get_long(self):
  """
  Return the value of the current node, converted with long(), if it is
  a signed long; 0 otherwise.
  """
  raw = pn_data_get_long(self._data)
  return long(raw)
2040
def get_timestamp(self):
  """
  Return the value of the current node, wrapped as timestamp, if it is
  a timestamp; 0 otherwise.
  """
  raw = pn_data_get_timestamp(self._data)
  return timestamp(raw)
2047
def get_float(self):
  """
  Return the value of the current node, wrapped as float32, if it is a
  float; 0 otherwise.
  """
  raw = pn_data_get_float(self._data)
  return float32(raw)
2054
def get_double(self):
  """
  Return the value of the current node if it is a double, 0 otherwise.
  """
  value = pn_data_get_double(self._data)
  return value
# XXX: the raw value still needs converting
def get_decimal32(self):
  """
  Return the value of the current node, wrapped as decimal32, if it is
  a decimal32; 0 otherwise.
  """
  raw = pn_data_get_decimal32(self._data)
  return decimal32(raw)
# XXX: the raw value still needs converting
def get_decimal64(self):
  """
  Return the value of the current node, wrapped as decimal64, if it is
  a decimal64; 0 otherwise.
  """
  raw = pn_data_get_decimal64(self._data)
  return decimal64(raw)
# XXX: the raw value still needs converting
def get_decimal128(self):
  """
  Return the value of the current node, wrapped as decimal128, if it is
  a decimal128; 0 otherwise.
  """
  raw = pn_data_get_decimal128(self._data)
  return decimal128(raw)
2085
def get_uuid(self):
  """
  Return the value of the current node as a uuid.UUID if it is a UUID,
  None otherwise.
  """
  if pn_data_type(self._data) != Data.UUID:
    return None
  raw = pn_data_get_uuid(self._data)
  return uuid.UUID(bytes=raw)
2095
def get_binary(self):
  """
  Return the value of the current node if it is binary, "" otherwise.
  """
  value = pn_data_get_binary(self._data)
  return value
2102
def get_string(self):
  """
  Return the value of the current node, decoded from utf8, if it is a
  string; "" otherwise.
  """
  raw = pn_data_get_string(self._data)
  return raw.decode("utf8")
2109
def get_symbol(self):
  """
  Return the value of the current node, decoded from ascii and wrapped
  as symbol, if it is a symbol; "" otherwise.
  """
  raw = pn_data_get_symbol(self._data)
  return symbol(raw.decode('ascii'))
2116
def copy(self, src):
  """
  Call pn_data_copy with this object's handle and src's handle; the
  status is handed to self._check.

  @param src: another object exposing a _data handle
  """
  status = pn_data_copy(self._data, src._data)
  self._check(status)
2119
def format(self):
  """
  Return the text pn_data_format produces for the data.

  Formatting starts with a buffer size of 16 that doubles every time
  PN_OVERFLOW is reported; the first non-overflow status is handed to
  self._check before the text is returned.
  """
  capacity = 16
  status, text = pn_data_format(self._data, capacity)
  while status == PN_OVERFLOW:
    capacity *= 2
    status, text = pn_data_format(self._data, capacity)
  self._check(status)
  return text
2130
def dump(self):
  """
  Dump this data object by calling pn_data_dump on it.
  """
  data = self._data
  pn_data_dump(data)
2133
def put_dict(self, d):
  """
  Put the dict d as a map node: the map is added and entered, every
  key and its value are written in turn with put_object, and the map
  is exited even if writing an entry raises.
  """
  self.put_map()
  self.enter()
  try:
    for entry in d.items():
      # each entry is (key, value): the key is written first
      for part in entry:
        self.put_object(part)
  finally:
    self.exit()
2143
def get_dict(self):
  """
  Read the children of the current node as alternating keys and
  values and return them as a dict. A trailing key with no value is
  mapped to None. Returns None if the node cannot be entered.
  """
  if not self.enter():
    return None
  entries = {}
  try:
    while self.next():
      key = self.get_object()
      entries[key] = self.get_object() if self.next() else None
  finally:
    self.exit()
  return entries
2158
def put_sequence(self, s):
  """
  Put the iterable s as a list node: the list is added and entered,
  each element is written with put_object, and the list is exited even
  if writing an element raises.
  """
  self.put_list()
  self.enter()
  try:
    for element in s:
      self.put_object(element)
  finally:
    self.exit()
2167
def get_sequence(self):
  """
  Read every child of the current node with get_object and return
  them as a list. Returns None if the node cannot be entered.
  """
  if not self.enter():
    return None
  items = []
  try:
    while self.next():
      items.append(self.get_object())
  finally:
    self.exit()
  return items
2177
def get_py_described(self):
  """
  Read the first two children of the current node as a descriptor and
  a value and return them as a Described object. Returns None if the
  node cannot be entered.
  """
  if not self.enter():
    return None
  try:
    self.next()
    descriptor = self.get_object()
    self.next()
    value = self.get_object()
  finally:
    self.exit()
  return Described(descriptor, value)
2188
def put_py_described(self, d):
  """
  Put the Described object d: a described node is added and entered,
  d.descriptor and then d.value are written with put_object, and the
  node is exited even if writing raises.
  """
  self.put_described()
  self.enter()
  try:
    for part in (d.descriptor, d.value):
      self.put_object(part)
  finally:
    self.exit()
2197
def get_py_array(self):
  """
  If the current node is an array, return an Array object
  representing the array and its contents. Otherwise return None.
  This is a convenience wrapper around get_array, enter, etc.
  """
  _count, described, element_type = self.get_array()
  if element_type is None:
    return None
  if not self.enter():
    return None
  try:
    if described:
      # a described array carries its descriptor as the first child
      self.next()
      descriptor = self.get_object()
    else:
      descriptor = UNDESCRIBED
    elements = []
    while self.next():
      elements.append(self.get_object())
  finally:
    self.exit()
  return Array(descriptor, element_type, *elements)
2220
def put_py_array(self, a):
  """
  Put the Array object a: an array node of type a.type is added and
  entered, the descriptor is written first when a.descriptor is not
  UNDESCRIBED, then every element of a.elements is written. The node
  is exited even if writing raises.
  """
  has_descriptor = a.descriptor != UNDESCRIBED
  self.put_array(has_descriptor, a.type)
  self.enter()
  try:
    if has_descriptor:
      self.put_object(a.descriptor)
    for element in a.elements:
      self.put_object(element)
  finally:
    self.exit()

# Dispatch table used by put_object: maps the exact class of a Python
# value to the method that writes it.
put_mappings = {
  None.__class__: lambda s, _: s.put_null(),
  bool: put_bool,
  ubyte: put_ubyte,
  ushort: put_ushort,
  uint: put_uint,
  ulong: put_ulong,
  byte: put_byte,
  short: put_short,
  int32: put_int,
  long: put_long,
  float32: put_float,
  float: put_double,
  decimal32: put_decimal32,
  decimal64: put_decimal64,
  decimal128: put_decimal128,
  char: put_char,
  timestamp: put_timestamp,
  uuid.UUID: put_uuid,
  bytes: put_binary,
  unicode: put_string,
  symbol: put_symbol,
  list: put_sequence,
  tuple: put_sequence,
  dict: put_dict,
  Described: put_py_described,
  Array: put_py_array
  }
# for python 3.x, long is merely an alias for int, but for python 2.x
# we need to add an explicit int since it is a different type
if int not in put_mappings:
  put_mappings[int] = put_int
# Python >=3.0 has 'memoryview', <=2.5 has 'buffer', >=2.6 has both.
try: put_mappings[memoryview] = put_memoryview
except NameError: pass
try: put_mappings[buffer] = put_buffer
except NameError: pass
# Dispatch table used by get_object: maps the type of the current node
# to the method that reads it.
get_mappings = {
  NULL: lambda s: None,
  BOOL: get_bool,
  BYTE: get_byte,
  UBYTE: get_ubyte,
  SHORT: get_short,
  USHORT: get_ushort,
  INT: get_int,
  UINT: get_uint,
  CHAR: get_char,
  LONG: get_long,
  ULONG: get_ulong,
  TIMESTAMP: get_timestamp,
  FLOAT: get_float,
  DOUBLE: get_double,
  DECIMAL32: get_decimal32,
  DECIMAL64: get_decimal64,
  DECIMAL128: get_decimal128,
  UUID: get_uuid,
  BINARY: get_binary,
  STRING: get_string,
  SYMBOL: get_symbol,
  DESCRIBED: get_py_described,
  ARRAY: get_py_array,
  LIST: get_sequence,
  MAP: get_dict
  }

def put_object(self, obj):
  """
  Write obj using the method registered in put_mappings for its exact
  class. A class with no entry raises KeyError.
  """
  self.put_mappings[obj.__class__](self, obj)
2302
def get_object(self):
  """
  Read the current node using the method registered in get_mappings
  for its type. Returns None when there is no type, and an
  UnmappedType built from the type's string form when no reader is
  registered for it.
  """
  kind = self.type()
  if kind is None:
    return None
  reader = self.get_mappings.get(kind)
  if not reader:
    return UnmappedType(str(kind))
  return reader(self)
2311
class ConnectionException(ProtonException):
  """
  Default exception raised by Connection._check for a negative error
  code that has no entry in EXCEPTIONS.
  """
  pass
2314
class Endpoint(object):
  """
  Behaviour shared by endpoint wrappers: the local and remote state
  constants, the local/remote error condition, and the handler stored
  in the endpoint's attachments record.

  Subclasses must provide _get_cond_impl and _get_remote_cond_impl.
  The handler and transport properties also rely on the subclass
  supplying _impl, _get_attachments and connection.
  """

  LOCAL_UNINIT = PN_LOCAL_UNINIT
  REMOTE_UNINIT = PN_REMOTE_UNINIT
  LOCAL_ACTIVE = PN_LOCAL_ACTIVE
  REMOTE_ACTIVE = PN_REMOTE_ACTIVE
  LOCAL_CLOSED = PN_LOCAL_CLOSED
  REMOTE_CLOSED = PN_REMOTE_CLOSED

  def _init(self):
    """Start with no local condition set."""
    self.condition = None

  def _update_cond(self):
    """Copy self.condition into the implementation's local condition."""
    obj2cond(self.condition, self._get_cond_impl())

  @property
  def remote_condition(self):
    """The remote condition as a Condition object, or None if not set."""
    return cond2obj(self._get_remote_cond_impl())

  # the following must be provided by subclasses
  def _get_cond_impl(self):
    # An explicit raise is used instead of "assert False": assert
    # statements are removed under python -O, which would make this
    # silently return None.
    raise NotImplementedError("Subclass must override this!")

  def _get_remote_cond_impl(self):
    raise NotImplementedError("Subclass must override this!")

  def _reactor_on_error(self):
    """
    Return the on_error delegate of the reactor associated with this
    endpoint, or None when there is no reactor.
    """
    # imported here rather than at module level, as in the original code
    from . import reactor
    ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
    if ractor:
      return ractor.on_error_delegate()
    return None

  def _get_handler(self):
    """Return the handler stored in the attachments record, wrapped."""
    on_error = self._reactor_on_error()
    record = self._get_attachments()
    return WrappedHandler.wrap(pn_record_get_handler(record), on_error)

  def _set_handler(self, handler):
    """Store handler in the attachments record."""
    on_error = self._reactor_on_error()
    impl = _chandler(handler, on_error)
    record = self._get_attachments()
    pn_record_set_handler(record, impl)
    # drop the reference held on impl now that the record has it
    pn_decref(impl)

  handler = property(_get_handler, _set_handler)

  @property
  def transport(self):
    """The transport of the connection this endpoint belongs to."""
    return self.connection.transport
2368
class Condition:
  """
  An error condition made of a name, an optional description and an
  optional info value.
  """

  def __init__(self, name, description=None, info=None):
    self.name = name
    self.description = description
    self.info = info

  def __repr__(self):
    # only truthy fields are shown, in name/description/info order
    shown = [repr(x) for x in (self.name, self.description, self.info) if x]
    return "Condition(%s)" % ", ".join(shown)

  def __eq__(self, o):
    """Equal to another Condition with the same name, description and info."""
    if not isinstance(o, Condition): return False
    return self.name == o.name and \
        self.description == o.description and \
        self.info == o.info

  def __ne__(self, o):
    # Python 2 does not derive != from ==; without this, two equal
    # Conditions would still compare unequal with "!=".
    return not self.__eq__(o)
2386
def obj2cond(obj, cond):
  """
  Clear the condition implementation cond, then, if obj is truthy,
  copy its name (as str), description and info into it. The info is
  only written when obj.info is truthy.
  """
  pn_condition_clear(cond)
  if not obj:
    return
  pn_condition_set_name(cond, str(obj.name))
  pn_condition_set_description(cond, obj.description)
  info = Data(pn_condition_info(cond))
  if obj.info:
    info.put_object(obj.info)
2395
def cond2obj(cond):
  """
  Return a Condition built from the name, description and info of the
  condition implementation cond, or None if cond is not set.
  """
  if not pn_condition_is_set(cond):
    return None
  return Condition(pn_condition_get_name(cond),
                   pn_condition_get_description(cond),
                   dat2obj(pn_condition_info(cond)))
2403
def dat2obj(dimpl):
  """
  Return the first object held by the data implementation dimpl, read
  via a Data wrapper that is rewound before and after the read.
  Returns None when dimpl is falsy.
  """
  if not dimpl:
    return None
  data = Data(dimpl)
  data.rewind()
  data.next()
  first = data.get_object()
  data.rewind()
  return first
2412
def obj2dat(obj, dimpl):
  """
  Write obj into the data implementation dimpl through a Data wrapper.
  Nothing is written when obj is None.
  """
  if obj is None:
    return
  Data(dimpl).put_object(obj)
2417
def secs2millis(secs):
  """Convert seconds to milliseconds, returned as a long."""
  millis = secs*1000
  return long(millis)
2420
def millis2secs(millis):
  """Convert milliseconds to seconds, returned as a float."""
  as_float = float(millis)
  return as_float/1000.0
2423
def timeout2millis(secs):
  """
  Convert a timeout in seconds to milliseconds. A timeout of None is
  represented by PN_MILLIS_MAX.
  """
  if secs is not None:
    return secs2millis(secs)
  return PN_MILLIS_MAX
2427
def millis2timeout(millis):
  """
  Convert a timeout in milliseconds to seconds. PN_MILLIS_MAX is
  returned as None.
  """
  if millis != PN_MILLIS_MAX:
    return millis2secs(millis)
  return None
2431
def unicode2utf8(string):
  """Some Proton APIs expect a null terminated string. Convert python text
  types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
  This method will throw if the string cannot be converted.

  None is returned unchanged. On Python 2 a unicode value is encoded to
  UTF-8 and a str is returned as-is. On other versions a str is encoded
  to UTF-8 and decoded again, and bytes are decoded from UTF-8, so the
  result is a str. Any other type raises TypeError.
  """
  if string is None:
    return None
  if _compat.IS_PY2:
    if isinstance(string, unicode):
      return string.encode('utf-8')
    if isinstance(string, str):
      return string
  else:
    # encoding a str results in bytes, which are then handled like any
    # other bytes value below
    candidate = string.encode('utf-8') if isinstance(string, str) else string
    if isinstance(candidate, bytes):
      return candidate.decode('utf-8')
  raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
2452
def utf82unicode(string):
  """Convert C strings returned from proton-c into python unicode.

  None and values that are already text are returned unchanged; binary
  values are decoded from UTF-8. Any other type raises TypeError.
  """
  if string is None:
    return None
  if isinstance(string, _compat.TEXT_TYPES):
    # already unicode
    return string
  if isinstance(string, _compat.BINARY_TYPES):
    return string.decode('utf8')
  raise TypeError("Unrecognized string type")
2464
2465 -class Connection(Wrapper, Endpoint):
2466 """ 2467 A representation of an AMQP connection 2468 """ 2469 2470 @staticmethod
def wrap(impl):
  """Return a Connection wrapping impl, or None when impl is None."""
  return None if impl is None else Connection(impl)
2476
def __init__(self, impl = pn_connection):
  """
  Initialise the wrapper from impl (pn_connection by default), using
  pn_connection_attachments as the attachments accessor.
  """
  attachments = pn_connection_attachments
  Wrapper.__init__(self, impl, attachments)
2479
def _init(self):
  """
  Run the Endpoint initialisation, then start with no offered
  capabilities, desired capabilities or properties.
  """
  Endpoint._init(self)
  self.offered_capabilities = None
  self.desired_capabilities = None
  self.properties = None
2485
def _get_attachments(self):
  """Return the attachments record of the underlying connection."""
  record = pn_connection_attachments(self._impl)
  return record
2488 2489 @property
def connection(self):
  """A connection is its own connection: returns self."""
  return self
2492 2493 @property
def transport(self):
  """The Transport wrapping this connection's transport implementation."""
  impl = pn_connection_transport(self._impl)
  return Transport.wrap(impl)
2496
2497 - def _check(self, err):
2498 if err < 0: 2499 exc = EXCEPTIONS.get(err, ConnectionException) 2500 raise exc("[%s]: %s" % (err, pn_connection_error(self._impl))) 2501 else: 2502 return err
2503
def _get_cond_impl(self):
  """Return the local condition implementation of the connection."""
  cond = pn_connection_condition(self._impl)
  return cond
2506
def _get_remote_cond_impl(self):
  """Return the remote condition implementation of the connection."""
  cond = pn_connection_remote_condition(self._impl)
  return cond
2509
def collect(self, collector):
  """
  Register collector with the connection via pn_connection_collect.

  Passing None passes None through to pn_connection_collect. Otherwise
  the collector's implementation is passed and a weak reference to the
  collector is kept in self._collector.
  """
  if collector is None:
    # weakref.ref(None) raises TypeError, so no reference is taken
    # here; any previously stored self._collector is left as it is.
    pn_connection_collect(self._impl, None)
    return
  pn_connection_collect(self._impl, collector._impl)
  self._collector = weakref.ref(collector)
2516
def _get_container(self):
  """Return the connection's container name as unicode."""
  raw = pn_connection_get_container(self._impl)
  return utf82unicode(raw)
def _set_container(self, name):
  """Set the connection's container name, passed through unicode2utf8."""
  encoded = unicode2utf8(name)
  return pn_connection_set_container(self._impl, encoded)
2521 2522 container = property(_get_container, _set_container) 2523
def _get_hostname(self):
  """Return the connection's hostname as unicode."""
  raw = pn_connection_get_hostname(self._impl)
  return utf82unicode(raw)
def _set_hostname(self, name):
  """Set the connection's hostname, passed through unicode2utf8."""
  encoded = unicode2utf8(name)
  return pn_connection_set_hostname(self._impl, encoded)
2528 2529 hostname = property(_get_hostname, _set_hostname, 2530 doc=""" 2531 Set the name of the host (either fully qualified or relative) to which this 2532 connection is connecting to. This information may be used by the remote 2533 peer to determine the correct back-end service to connect the client to. 2534 This value will be sent in the Open performative, and will be used by SSL 2535 and SASL layers to identify the peer. 2536 """) 2537
def _get_user(self):
  """Return the connection's user as unicode."""
  raw = pn_connection_get_user(self._impl)
  return utf82unicode(raw)
def _set_user(self, name):
  """Set the connection's user, passed through unicode2utf8."""
  encoded = unicode2utf8(name)
  return pn_connection_set_user(self._impl, encoded)
2542 2543 user = property(_get_user, _set_user) 2544
2545 - def _get_password(self):
2546 return None
def _set_password(self, name):
  """Set the connection's password, passed through unicode2utf8."""
  encoded = unicode2utf8(name)
  return pn_connection_set_password(self._impl, encoded)
2549 2550 password = property(_get_password, _set_password) 2551 2552 @property
def remote_container(self):
  """The container identifier specified by the remote peer for this connection."""
  container = pn_connection_remote_container(self._impl)
  return container
2556 2557 @property
def remote_hostname(self):
  """The hostname specified by the remote peer for this connection."""
  hostname = pn_connection_remote_hostname(self._impl)
  return hostname
2561 2562 @property
2564 """The capabilities offered by the remote peer for this connection.""" 2565 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
2566 2567 @property
2569 """The capabilities desired by the remote peer for this connection.""" 2570 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
2571 2572 @property
def remote_properties(self):
  """The properties specified by the remote peer for this connection."""
  data = pn_connection_remote_properties(self._impl)
  return dat2obj(data)
2576
def open(self):
  """
  Opens the connection.

  In more detail, this moves the local state of the connection to
  the ACTIVE state and triggers an open frame to be sent to the
  peer. A connection is fully active once both peers have opened it.

  The offered capabilities, desired capabilities and properties set on
  this object are written to the connection before it is opened.
  """
  impl = self._impl
  obj2dat(self.offered_capabilities, pn_connection_offered_capabilities(impl))
  obj2dat(self.desired_capabilities, pn_connection_desired_capabilities(impl))
  obj2dat(self.properties, pn_connection_properties(impl))
  pn_connection_open(impl)
2591
def close(self):
  """
  Closes the connection.

  In more detail, this moves the local state of the connection to
  the CLOSED state and triggers a close frame to be sent to the
  peer. A connection is fully closed once both peers have closed it.

  The local condition is written to the connection before closing.
  """
  self._update_cond()
  pn_connection_close(self._impl)
  has_policy = hasattr(self, '_session_policy')
  if has_policy:
    # break circular ref
    del self._session_policy
2605 2606 @property
def state(self):
  """
  The state of the connection as a bit field. The state has a local
  and a remote component. Each of these can be in one of three
  states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
  against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
  REMOTE_ACTIVE and REMOTE_CLOSED.
  """
  bits = pn_connection_state(self._impl)
  return bits
2616
def session(self):
  """
  Returns a new session on this connection. Raises SessionException
  if pn_session returns None.
  """
  ssn = pn_session(self._impl)
  if ssn is not None:
    return Session(ssn)
  raise(SessionException("Session allocation failed."))
2626
def session_head(self, mask):
  """
  Return the Session wrapping the result of pn_session_head for the
  given state mask, or None if there is none.
  """
  head = pn_session_head(self._impl, mask)
  return Session.wrap(head)
2629 2632 2633 @property
def work_head(self):
  """
  The Delivery wrapping the result of pn_work_head for this
  connection, or None if there is none.
  """
  head = pn_work_head(self._impl)
  return Delivery.wrap(head)
2636 2637 @property
def error(self):
  """The error code of the connection's error object."""
  err = pn_connection_error(self._impl)
  return pn_error_code(err)
2640
def free(self):
  """Release the underlying connection via pn_connection_release."""
  impl = self._impl
  pn_connection_release(impl)
2643
class SessionException(ProtonException):
  """
  Raised by Connection.session when no session could be allocated,
  and by the Transport channel_max setter when the change is refused.
  """
  pass
2646
class Session(Wrapper, Endpoint):
  """
  Wrapper around a session implementation, as returned by
  Connection.session().
  """

  @staticmethod
  def wrap(impl):
    """Return a Session wrapping impl, or None when impl is None."""
    return None if impl is None else Session(impl)

  def __init__(self, impl):
    Wrapper.__init__(self, impl, pn_session_attachments)

  def _get_attachments(self):
    """Return the attachments record of the underlying session."""
    record = pn_session_attachments(self._impl)
    return record

  def _get_cond_impl(self):
    """Return the local condition implementation of the session."""
    cond = pn_session_condition(self._impl)
    return cond

  def _get_remote_cond_impl(self):
    """Return the remote condition implementation of the session."""
    cond = pn_session_remote_condition(self._impl)
    return cond

  def _get_incoming_capacity(self):
    capacity = pn_session_get_incoming_capacity(self._impl)
    return capacity

  def _set_incoming_capacity(self, capacity):
    impl = self._impl
    pn_session_set_incoming_capacity(impl, capacity)

  incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)

  def _get_outgoing_window(self):
    window = pn_session_get_outgoing_window(self._impl)
    return window

  def _set_outgoing_window(self, window):
    impl = self._impl
    pn_session_set_outgoing_window(impl, window)

  outgoing_window = property(_get_outgoing_window, _set_outgoing_window)

  @property
  def outgoing_bytes(self):
    """The value reported by pn_session_outgoing_bytes."""
    count = pn_session_outgoing_bytes(self._impl)
    return count

  @property
  def incoming_bytes(self):
    """The value reported by pn_session_incoming_bytes."""
    count = pn_session_incoming_bytes(self._impl)
    return count

  def open(self):
    """Open the session."""
    impl = self._impl
    pn_session_open(impl)

  def close(self):
    """Write the local condition to the session, then close it."""
    self._update_cond()
    pn_session_close(self._impl)

  def next(self, mask):
    """
    Return the Session wrapping the result of pn_session_next for the
    given state mask, or None if there is none.
    """
    following = pn_session_next(self._impl, mask)
    return Session.wrap(following)

  @property
  def state(self):
    """The value reported by pn_session_state."""
    bits = pn_session_state(self._impl)
    return bits

  @property
  def connection(self):
    """The Connection this session belongs to."""
    impl = pn_session_connection(self._impl)
    return Connection.wrap(impl)

  def sender(self, name):
    """Create a Sender with the given name on this session."""
    link = pn_sender(self._impl, unicode2utf8(name))
    return Sender(link)

  def receiver(self, name):
    """Create a Receiver with the given name on this session."""
    link = pn_receiver(self._impl, unicode2utf8(name))
    return Receiver(link)

  def free(self):
    """Free the underlying session via pn_session_free."""
    impl = self._impl
    pn_session_free(impl)
2718
class LinkException(ProtonException):
  """
  Default exception raised by Terminus._check for a negative error
  code that has no entry in EXCEPTIONS.
  """
  pass
2721 2914
class Terminus(object):
  """
  Wrapper around a terminus implementation. Setters pass the returned
  code to _check, which raises for negative values.
  """

  # terminus types
  UNSPECIFIED = PN_UNSPECIFIED
  SOURCE = PN_SOURCE
  TARGET = PN_TARGET
  COORDINATOR = PN_COORDINATOR

  # durability values
  NONDURABLE = PN_NONDURABLE
  CONFIGURATION = PN_CONFIGURATION
  DELIVERIES = PN_DELIVERIES

  # distribution modes
  DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
  DIST_MODE_COPY = PN_DIST_MODE_COPY
  DIST_MODE_MOVE = PN_DIST_MODE_MOVE

  # expiry policies
  EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
  EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
  EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
  EXPIRE_NEVER = PN_EXPIRE_NEVER

  def __init__(self, impl):
    self._impl = impl

  def _check(self, err):
    """
    Return err unchanged when it is not negative. A negative err raises
    the exception mapped to it in EXCEPTIONS (LinkException by default).
    """
    if err < 0:
      exc = EXCEPTIONS.get(err, LinkException)
      raise exc("[%s]" % err)
    else:
      return err

  def _get_type(self):
    return pn_terminus_get_type(self._impl)
  def _set_type(self, type):
    self._check(pn_terminus_set_type(self._impl, type))
  type = property(_get_type, _set_type)

  def _get_address(self):
    """The address that identifies the source or target node"""
    return utf82unicode(pn_terminus_get_address(self._impl))
  def _set_address(self, address):
    self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
  address = property(_get_address, _set_address)

  def _get_durability(self):
    return pn_terminus_get_durability(self._impl)
  def _set_durability(self, durability):
    # the parameter used to be called "seconds", although the value is
    # a durability setting rather than a duration
    self._check(pn_terminus_set_durability(self._impl, durability))
  durability = property(_get_durability, _set_durability)

  def _get_expiry_policy(self):
    return pn_terminus_get_expiry_policy(self._impl)
  def _set_expiry_policy(self, policy):
    # the parameter used to be called "seconds", although the value is
    # an expiry policy rather than a duration
    self._check(pn_terminus_set_expiry_policy(self._impl, policy))
  expiry_policy = property(_get_expiry_policy, _set_expiry_policy)

  def _get_timeout(self):
    return pn_terminus_get_timeout(self._impl)
  def _set_timeout(self, seconds):
    self._check(pn_terminus_set_timeout(self._impl, seconds))
  timeout = property(_get_timeout, _set_timeout)

  def _is_dynamic(self):
    """Indicates whether the source or target node was dynamically
    created"""
    return pn_terminus_is_dynamic(self._impl)
  def _set_dynamic(self, dynamic):
    self._check(pn_terminus_set_dynamic(self._impl, dynamic))
  dynamic = property(_is_dynamic, _set_dynamic)

  def _get_distribution_mode(self):
    return pn_terminus_get_distribution_mode(self._impl)
  def _set_distribution_mode(self, mode):
    self._check(pn_terminus_set_distribution_mode(self._impl, mode))
  distribution_mode = property(_get_distribution_mode, _set_distribution_mode)

  @property
  def properties(self):
    """Properties of a dynamic source or target."""
    return Data(pn_terminus_properties(self._impl))

  @property
  def capabilities(self):
    """Capabilities of the source or target."""
    return Data(pn_terminus_capabilities(self._impl))

  @property
  def outcomes(self):
    """The terminus outcomes, wrapped in a Data object."""
    return Data(pn_terminus_outcomes(self._impl))

  @property
  def filter(self):
    """A filter on a source allows the set of messages transferred over
    the link to be restricted"""
    return Data(pn_terminus_filter(self._impl))

  def copy(self, src):
    """Copy the terminus src into this one via pn_terminus_copy."""
    self._check(pn_terminus_copy(self._impl, src._impl))
3012
class Sender(Link):
  """
  A link over which messages are sent.
  """

  def offered(self, n):
    """Pass n to pn_link_offered for this link."""
    impl = self._impl
    pn_link_offered(impl, n)

  def stream(self, data):
    """
    Send specified data as part of the current delivery

    @type data: binary
    @param data: data to send
    """
    sent = pn_link_send(self._impl, data)
    return self._check(sent)

  def send(self, obj, tag=None):
    """
    Send specified object over this sender; the object is expected to
    have a send() method on it that takes the sender and an optional
    tag as arguments.

    Where the object is a Message, this will send the message over
    this link, creating a new delivery for the purpose.
    """
    if not hasattr(obj, 'send'):
      # treat object as bytes
      return self.stream(obj)
    return obj.send(self, tag=tag)

  def delivery_tag(self):
    """
    Return the next delivery tag: "1", "2", "3", ... The generator is
    created on first use and kept in self.tag_generator.
    """
    if not hasattr(self, 'tag_generator'):
      def simple_tags():
        number = 1
        while True:
          yield str(number)
          number += 1
      self.tag_generator = simple_tags()
    return next(self.tag_generator)
3054
class Receiver(Link):
  """
  A link over which messages are received.
  """

  def flow(self, n):
    """Increases the credit issued to the remote sender by the specified number of messages."""
    impl = self._impl
    pn_link_flow(impl, n)

  def recv(self, limit):
    """
    Receive up to limit bytes via pn_link_recv. Returns None when the
    call reports PN_EOS; otherwise the returned code is passed to
    _check and the received binary is returned.
    """
    n, binary = pn_link_recv(self._impl, limit)
    if n == PN_EOS:
      return None
    self._check(n)
    return binary

  def drain(self, n):
    """Pass n to pn_link_drain for this link."""
    impl = self._impl
    pn_link_drain(impl, n)

  def draining(self):
    """Return the result of pn_link_draining for this link."""
    flag = pn_link_draining(self._impl)
    return flag
3077
class NamedInt(int):
  """
  An int that carries a name, used as its str() and repr(). Every
  instance created is recorded in the class's values dict under its
  integer value so it can be looked up again with get().
  """

  values = {}

  def __new__(cls, i, name):
    instance = int.__new__(cls, i)
    # a later instance with the same value replaces the earlier one
    cls.values[i] = instance
    return instance

  def __init__(self, i, name):
    self.name = name

  def __repr__(self):
    return self.name

  def __str__(self):
    return self.name

  @classmethod
  def get(cls, i):
    """Return the instance recorded for i, or i itself if there is none."""
    registry = cls.values
    return registry.get(i, i)
3099
class DispositionType(NamedInt):
  """
  NamedInt subclass with its own values dict, so that its instances
  are recorded separately from those of NamedInt.
  """
  values = {}
3102
class Disposition(object):
  """
  Wrapper around a disposition implementation.

  For a local disposition (local is true) the data, annotations and
  condition attributes are kept on this object and may be assigned.
  Otherwise they are read from the implementation and assigning them
  raises AttributeError.
  """

  RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
  ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
  REJECTED = DispositionType(PN_REJECTED, "REJECTED")
  RELEASED = DispositionType(PN_RELEASED, "RELEASED")
  MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")

  def __init__(self, impl, local):
    self._impl = impl
    self.local = local
    self._data = None
    self._condition = None
    self._annotations = None

  @property
  def type(self):
    """The disposition type, as a DispositionType where one is known."""
    code = pn_disposition_type(self._impl)
    return DispositionType.get(code)

  def _get_section_number(self):
    return pn_disposition_get_section_number(self._impl)
  def _set_section_number(self, n):
    pn_disposition_set_section_number(self._impl, n)
  section_number = property(_get_section_number, _set_section_number)

  def _get_section_offset(self):
    return pn_disposition_get_section_offset(self._impl)
  def _set_section_offset(self, n):
    pn_disposition_set_section_offset(self._impl, n)
  section_offset = property(_get_section_offset, _set_section_offset)

  def _get_failed(self):
    return pn_disposition_is_failed(self._impl)
  def _set_failed(self, b):
    pn_disposition_set_failed(self._impl, b)
  failed = property(_get_failed, _set_failed)

  def _get_undeliverable(self):
    return pn_disposition_is_undeliverable(self._impl)
  def _set_undeliverable(self, b):
    pn_disposition_set_undeliverable(self._impl, b)
  undeliverable = property(_get_undeliverable, _set_undeliverable)

  def _get_data(self):
    if not self.local:
      return dat2obj(pn_disposition_data(self._impl))
    return self._data
  def _set_data(self, obj):
    if not self.local:
      raise AttributeError("data attribute is read-only")
    self._data = obj
  data = property(_get_data, _set_data)

  def _get_annotations(self):
    if not self.local:
      return dat2obj(pn_disposition_annotations(self._impl))
    return self._annotations
  def _set_annotations(self, obj):
    if not self.local:
      raise AttributeError("annotations attribute is read-only")
    self._annotations = obj
  annotations = property(_get_annotations, _set_annotations)

  def _get_condition(self):
    if not self.local:
      return cond2obj(pn_disposition_condition(self._impl))
    return self._condition
  def _set_condition(self, obj):
    if not self.local:
      raise AttributeError("condition attribute is read-only")
    self._condition = obj
  condition = property(_get_condition, _set_condition)
3181
3182 -class Delivery(Wrapper):
3183 """ 3184 Tracks and/or records the delivery of a message over a link. 3185 """ 3186 3187 RECEIVED = Disposition.RECEIVED 3188 ACCEPTED = Disposition.ACCEPTED 3189 REJECTED = Disposition.REJECTED 3190 RELEASED = Disposition.RELEASED 3191 MODIFIED = Disposition.MODIFIED 3192 3193 @staticmethod
def wrap(impl):
  """Return a Delivery wrapping impl, or None when impl is None."""
  return None if impl is None else Delivery(impl)
3199
def __init__(self, impl):
  """
  Initialise the wrapper from impl, using pn_delivery_attachments as
  the attachments accessor.
  """
  attachments = pn_delivery_attachments
  Wrapper.__init__(self, impl, attachments)
3202
def _init(self):
  """
  Create the Disposition wrappers for the delivery: self.local is
  writable, self.remote is read-only.
  """
  impl = self._impl
  self.local = Disposition(pn_delivery_local(impl), True)
  self.remote = Disposition(pn_delivery_remote(impl), False)
3206 3207 @property
def tag(self):
  """The identifier for the delivery."""
  identifier = pn_delivery_tag(self._impl)
  return identifier
3211 3212 @property
def writable(self):
  """Returns true for an outgoing delivery to which data can now be written."""
  flag = pn_delivery_writable(self._impl)
  return flag
3216 3217 @property
def readable(self):
  """Returns true for an incoming delivery that has data to read."""
  flag = pn_delivery_readable(self._impl)
  return flag
3221 3222 @property
def updated(self):
  """Returns true if the state of the delivery has been updated
  (e.g. it has been settled and/or accepted, rejected etc)."""
  flag = pn_delivery_updated(self._impl)
  return flag
3227
def update(self, state):
  """
  Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.

  The data, annotations and condition held on self.local are written
  to the local disposition before the state is updated.
  """
  local = self.local
  obj2dat(local._data, pn_disposition_data(local._impl))
  obj2dat(local._annotations, pn_disposition_annotations(local._impl))
  obj2cond(local._condition, pn_disposition_condition(local._impl))
  pn_delivery_update(self._impl, state)
3236 3237 @property
def pending(self):
  """The value reported by pn_delivery_pending for this delivery."""
  amount = pn_delivery_pending(self._impl)
  return amount
3240 3241 @property
def partial(self):
  """
  Returns true for an incoming delivery if not all the data is
  yet available.
  """
  flag = pn_delivery_partial(self._impl)
  return flag
3248 3249 @property
def local_state(self):
  """Returns the local state of the delivery."""
  code = pn_delivery_local_state(self._impl)
  return DispositionType.get(code)
3253 3254 @property
def remote_state(self):
  """
  Returns the state of the delivery as indicated by the remote
  peer.
  """
  code = pn_delivery_remote_state(self._impl)
  return DispositionType.get(code)
3261 3262 @property
def settled(self):
  """
  Returns true if the delivery has been settled by the remote peer.
  """
  flag = pn_delivery_settled(self._impl)
  return flag
3268
def settle(self):
  """
  Settles the delivery locally. This indicates the application
  considers the delivery complete and does not wish to receive any
  further events about it. Every delivery should be settled locally.
  """
  impl = self._impl
  pn_delivery_settle(impl)
3276 3277 @property
def aborted(self):
  """Returns true if the delivery has been aborted."""
  flag = pn_delivery_aborted(self._impl)
  return flag
3281
def abort(self):
  """
  Aborts the delivery. This indicates the application wishes to
  invalidate any data that may have already been sent on this delivery.
  The delivery cannot be aborted after it has been completely delivered.
  """
  impl = self._impl
  pn_delivery_abort(impl)
3289 3290 @property
def work_next(self):
  """
  The Delivery wrapping the result of pn_work_next for this delivery,
  or None if there is none.
  """
  following = pn_work_next(self._impl)
  return Delivery.wrap(following)
3293 3294 @property 3300 3301 @property
def session(self):
  """
  Returns the session over which the delivery was sent or received.
  """
  link = self.link
  return link.session
3307 3308 @property
def connection(self):
  """
  Returns the connection over which the delivery was sent or received.
  """
  ssn = self.session
  return ssn.connection
3314 3315 @property
def transport(self):
  """The transport of the connection the delivery belongs to."""
  conn = self.connection
  return conn.transport
3318
class TransportException(ProtonException):
  """
  Default exception raised by Transport._check for a negative error
  code that has no entry in EXCEPTIONS. Also raised by the Transport
  constructor for an unrecognised mode.
  """
  pass
3321
class TraceAdapter:
  """
  Callable that adapts a tracer taking (Transport, message) to one
  called with a transport implementation and a message.
  """

  def __init__(self, tracer):
    self.tracer = tracer

  def __call__(self, trans_impl, message):
    transport = Transport.wrap(trans_impl)
    self.tracer(transport, message)
3329
3330 -class Transport(Wrapper):
3331 3332 TRACE_OFF = PN_TRACE_OFF 3333 TRACE_DRV = PN_TRACE_DRV 3334 TRACE_FRM = PN_TRACE_FRM 3335 TRACE_RAW = PN_TRACE_RAW 3336 3337 CLIENT = 1 3338 SERVER = 2 3339 3340 @staticmethod
def wrap(impl):
  """Return a Transport wrapping impl, or None when impl is None."""
  return None if impl is None else Transport(_impl=impl)
3346
def __init__(self, mode=None, _impl = pn_transport):
  """
  Initialise the wrapper from _impl (pn_transport by default).

  A mode of Transport.SERVER marks the transport as a server. A mode
  of None or Transport.CLIENT needs no further setup. Any other mode
  raises TransportException.
  """
  Wrapper.__init__(self, _impl, pn_transport_attachments)
  if mode == Transport.SERVER:
    pn_transport_set_server(self._impl)
  elif not (mode is None or mode==Transport.CLIENT):
    raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
3355
3356 - def _init(self):
3357 self._sasl = None 3358 self._ssl = None
3359
3360 - def _check(self, err):
3361 if err < 0: 3362 exc = EXCEPTIONS.get(err, TransportException) 3363 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl)))) 3364 else: 3365 return err
3366
def _set_tracer(self, tracer):
  """Install tracer on the transport, wrapped in a TraceAdapter."""
  adapter = TraceAdapter(tracer)
  pn_transport_set_pytracer(self._impl, adapter)
3369
def _get_tracer(self):
  """
  Return the tracer held by the installed adapter, or None when no
  adapter is installed.
  """
  adapter = pn_transport_get_pytracer(self._impl)
  return adapter.tracer if adapter else None
3376 3377 tracer = property(_get_tracer, _set_tracer, 3378 doc=""" 3379 A callback for trace logging. The callback is passed the transport and log message. 3380 """) 3381
def log(self, message):
  """Pass message to pn_transport_log for this transport."""
  impl = self._impl
  pn_transport_log(impl, message)
3384
def require_auth(self, bool):
  """Pass the flag to pn_transport_require_auth for this transport."""
  impl = self._impl
  pn_transport_require_auth(impl, bool)
3387 3388 @property
def authenticated(self):
  """The value reported by pn_transport_is_authenticated."""
  flag = pn_transport_is_authenticated(self._impl)
  return flag
3391
def require_encryption(self, bool):
  """Pass the flag to pn_transport_require_encryption for this transport."""
  impl = self._impl
  pn_transport_require_encryption(impl, bool)
3394 3395 @property
def encrypted(self):
  """The value reported by pn_transport_is_encrypted."""
  flag = pn_transport_is_encrypted(self._impl)
  return flag
3398 3399 @property
def user(self):
  """The value reported by pn_transport_get_user."""
  name = pn_transport_get_user(self._impl)
  return name
3402
def bind(self, connection):
  """Assign a connection to the transport"""
  err = pn_transport_bind(self._impl, connection._impl)
  self._check(err)
3406
def unbind(self):
  """Release the connection"""
  err = pn_transport_unbind(self._impl)
  self._check(err)
3410
def trace(self, n):
  """Pass n to pn_transport_trace for this transport."""
  impl = self._impl
  pn_transport_trace(impl, n)
3413
def tick(self, now):
  """Process any timed events (like heartbeat generation).
  now = seconds since epoch (float).

  now is converted to milliseconds for pn_transport_tick and the
  value it returns is converted back to seconds.
  """
  now_millis = secs2millis(now)
  result_millis = pn_transport_tick(self._impl, now_millis)
  return millis2secs(result_millis)
3419
def capacity(self):
  """
  Return the value reported by ``pn_transport_capacity``.

  Values at or above ``PN_EOS`` are returned as-is; anything lower goes
  through ``_check``.
  """
  available = pn_transport_capacity(self._impl)
  if available >= PN_EOS:
    return available
  return self._check(available)
3426
def push(self, binary):
  """
  Push ``binary`` into the transport via ``pn_transport_push``.

  The result is validated with ``_check``. Raises ``OverflowError`` when
  the checked result differs from ``len(binary)``.
  """
  accepted = self._check(pn_transport_push(self._impl, binary))
  offered = len(binary)
  if accepted != offered:
    raise OverflowError("unable to process all bytes: %s, %s" % (accepted, offered))
3431
def close_tail(self):
  """Call ``pn_transport_close_tail`` and validate its result with ``_check``."""
  rc = pn_transport_close_tail(self._impl)
  self._check(rc)
3434
def pending(self):
  """
  Return the value reported by ``pn_transport_pending``.

  Values at or above ``PN_EOS`` are returned as-is; anything lower goes
  through ``_check``.
  """
  waiting = pn_transport_pending(self._impl)
  if waiting >= PN_EOS:
    return waiting
  return self._check(waiting)
3441
def peek(self, size):
  """
  Return the data produced by ``pn_transport_peek`` for ``size``.

  Returns None when the call reports ``PN_EOS``. Any other status code is
  validated with ``_check`` before the data is returned.
  """
  status, data = pn_transport_peek(self._impl, size)
  if status == PN_EOS:
    return None
  self._check(status)
  return data
3449
def pop(self, size):
  """Pass ``size`` to ``pn_transport_pop``; its result is not inspected."""
  impl = self._impl
  pn_transport_pop(impl, size)
3452
def close_head(self):
  """Call ``pn_transport_close_head`` and validate its result with ``_check``."""
  rc = pn_transport_close_head(self._impl)
  self._check(rc)
@property
def closed(self):
  """The value reported by ``pn_transport_closed``."""
  is_closed = pn_transport_closed(self._impl)
  return is_closed
3459 3460 # AMQP 1.0 max-frame-size
def _get_max_frame_size(self):
  """Getter for ``max_frame_size``: reads ``pn_transport_get_max_frame``."""
  frame_limit = pn_transport_get_max_frame(self._impl)
  return frame_limit
3463
def _set_max_frame_size(self, value):
  """Setter for ``max_frame_size``: passes ``value`` to ``pn_transport_set_max_frame``."""
  impl = self._impl
  pn_transport_set_max_frame(impl, value)
max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
                          doc="""
Sets the maximum size for received frames (in bytes).
""")

@property
def remote_max_frame_size(self):
  """The value reported by ``pn_transport_get_remote_max_frame``."""
  remote_limit = pn_transport_get_remote_max_frame(self._impl)
  return remote_limit
3475
def _get_channel_max(self):
  """Getter for ``channel_max``: reads ``pn_transport_get_channel_max``."""
  highest_channel = pn_transport_get_channel_max(self._impl)
  return highest_channel
3478
def _set_channel_max(self, value):
  """
  Setter for ``channel_max``.

  Raises ``SessionException`` when ``pn_transport_set_channel_max``
  returns a truthy result.
  """
  rc = pn_transport_set_channel_max(self._impl, value)
  if rc:
    raise SessionException("Too late to change channel max.")
channel_max = property(_get_channel_max, _set_channel_max,
                       doc="""
Sets the maximum channel that may be used on the transport.
""")

@property
def remote_channel_max(self):
  """The value reported by ``pn_transport_remote_channel_max``."""
  remote_highest = pn_transport_remote_channel_max(self._impl)
  return remote_highest
3491 3492 # AMQP 1.0 idle-time-out
def _get_idle_timeout(self):
  """
  Getter for ``idle_timeout``.

  Reads ``pn_transport_get_idle_timeout`` and converts the result with
  ``millis2secs``.
  """
  timeout_ms = pn_transport_get_idle_timeout(self._impl)
  return millis2secs(timeout_ms)
3495
def _set_idle_timeout(self, sec):
  """
  Setter for ``idle_timeout``.

  Converts ``sec`` with ``secs2millis`` and passes the result to
  ``pn_transport_set_idle_timeout``.
  """
  timeout_ms = secs2millis(sec)
  pn_transport_set_idle_timeout(self._impl, timeout_ms)
3498 3499 idle_timeout = property(_get_idle_timeout, _set_idle_timeout, 3500 doc=""" 3501 The idle timeout of the connection (float, in seconds). 3502 """) 3503 3504 @property
3505 - def