Package proton
[frames] | no frames]

Source Code for Package proton

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  The proton module defines a suite of APIs that implement the AMQP 1.0 
  22  protocol. 
  23   
  24  The proton APIs consist of the following classes: 
  25   
  26   - L{Messenger} -- A messaging endpoint. 
  27   - L{Message}   -- A class for creating and/or accessing AMQP message content. 
  28   - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded 
  29                    data. 
  30   
  31  """ 
  32  from __future__ import absolute_import 
  33   
  34  from cproton import * 
  35  from .wrapper import Wrapper 
  36  from proton import _compat 
  37   
  38  import logging, weakref, socket, sys, threading 
  39   
  40  try: 
  41    handler = logging.NullHandler() 
  42  except AttributeError: 
class NullHandler(logging.Handler):
    """
    Stand-in for C{logging.NullHandler}, defined only when the running
    interpreter's logging module does not provide one. Every record is
    discarded and no lock is allocated.
    """

    def handle(self, record):
        """Discard the record without filtering or emitting it."""
        return None

    def emit(self, record):
        """Discard the record."""
        return None

    def createLock(self):
        """Skip lock creation; this handler performs no I/O to guard."""
        self.lock = None
52 53 handler = NullHandler() 54 55 log = logging.getLogger("proton") 56 log.addHandler(handler) 57 58 try: 59 import uuid
def generate_uuid():
    """Return a fresh random (version 4) UUID from the standard uuid module."""
    new_id = uuid.uuid4()
    return new_id
63 64 except ImportError: 65 """ 66 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. 67 """ 68 import struct
class uuid:
    """
    Minimal stand-in for the standard C{uuid} module, used only when that
    module cannot be imported. It exposes a C{UUID} type that is a
    compatible subset of the standard one.
    """

    class UUID:
        """
        A 16 byte UUID value built from either its hyphenated hex form or
        its raw bytes.
        """

        def __init__(self, hex=None, bytes=None):
            """
            @param hex: the UUID as five hyphen separated hex groups
                        (8-4-4-4-12 digits), or None
            @param bytes: the UUID as a 16 byte string, or None
            @raise TypeError: unless exactly one of hex or bytes is given
            """
            if [hex, bytes].count(None) != 1:
                raise TypeError("need one of hex or bytes")
            if bytes is not None:
                self.bytes = bytes
            elif hex is not None:
                fields = hex.split("-")
                # Split the trailing 12 digit group into 4 + 8 digits so the
                # six fields line up with the "!LHHHHL" struct layout.
                fields[4:5] = [fields[4][:4], fields[4][4:]]
                self.bytes = struct.pack("!LHHHHL", *[int(x, 16) for x in fields])

        def __cmp__(self, other):
            """
            Python 2 three-way comparison on the raw bytes. Anything that is
            not a UUID sorts after this object (-1 is returned).
            """
            if isinstance(other, uuid.UUID):
                # Equivalent to cmp(), which does not exist on Python 3.
                return (self.bytes > other.bytes) - (self.bytes < other.bytes)
            else:
                return -1

        # Python 3 ignores __cmp__, so equality must be spelled out with rich
        # comparisons; otherwise two UUIDs with identical bytes (and hence
        # identical hashes) would still compare unequal.
        def __eq__(self, other):
            """True iff other is a UUID holding the same bytes."""
            return isinstance(other, uuid.UUID) and self.bytes == other.bytes

        def __ne__(self, other):
            """Inverse of L{__eq__}."""
            return not self.__eq__(other)

        def __lt__(self, other):
            """Order UUIDs by their raw bytes."""
            if isinstance(other, uuid.UUID):
                return self.bytes < other.bytes
            return NotImplemented

        def __str__(self):
            """Return the canonical hyphenated lower-case hex form."""
            return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)

        def __repr__(self):
            return "UUID(%r)" % str(self)

        def __hash__(self):
            """Hash on the raw bytes so equal UUIDs share a hash."""
            return self.bytes.__hash__()
import os
import random
import time

# Private generator for the fallback UUID implementation. The seed mixes the
# process id, the current time and the host name. It is passed as a string
# because Random.seed() rejects tuples on Python 3.11 and later.
rand = random.Random()
rand.seed("%s:%s:%s" % (os.getpid(), time.time(), socket.gethostname()))
def random_uuid():
    """
    Return 16 random bytes laid out as an RFC 4122 version 4 UUID.

    The bytes are drawn from the module level C{rand} generator and the
    result is a byte string suitable for C{uuid.UUID(bytes=...)}.
    """
    data = [rand.randint(0, 255) for _ in range(16)]

    # From RFC4122, the version bits (high nibble of byte 6) are set to 0100
    data[6] &= 0x0F
    data[6] |= 0x40

    # From RFC4122, the top two bits of byte 8 (the variant) are set to 10
    data[8] &= 0x3F
    data[8] |= 0x80

    # struct.pack yields a byte string on both Python 2 and Python 3, whereas
    # "".join(map(chr, ...)) would produce text on Python 3.
    return struct.pack("16B", *data)
110
def uuid4():
    """Build a fallback C{uuid.UUID} from freshly generated random bytes."""
    raw = random_uuid()
    return uuid.UUID(bytes=raw)
113
def generate_uuid():
    """Return a new random UUID using the fallback L{uuid4} implementation."""
    new_id = uuid4()
    return new_id
#
# Hacks to provide Python2 <---> Python3 compatibility
#
# Each probe calls the builtin; when the name does not exist on the running
# interpreter it is aliased to the closest available type.
try:
    bytes()
except NameError:
    bytes = str
try:
    long()
except NameError:
    long = int
try:
    unicode()
except NameError:
    unicode = str


# Version of the underlying proton library, re-exported from cproton.
VERSION_MAJOR = PN_VERSION_MAJOR
VERSION_MINOR = PN_VERSION_MINOR
VERSION_POINT = PN_VERSION_POINT
VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT)
API_LANGUAGE = "C"
IMPLEMENTATION_LANGUAGE = "C"
class Constant(object):
    """A named sentinel object whose repr is simply its name."""

    def __init__(self, name):
        """
        @type name: string
        @param name: the text returned by repr() for this constant
        """
        self.name = name

    def __repr__(self):
        label = self.name
        return label
148
class ProtonException(Exception):
    """
    The root of the proton exception hierarchy. All proton exception
    classes derive from this exception.
    """
155
class Timeout(ProtonException):
    """
    A timeout exception indicates that a blocking operation has timed
    out.
    """
162
class Interrupt(ProtonException):
    """
    An interrupt exception indicates that a blocking operation was
    interrupted.
    """
168
class MessengerException(ProtonException):
    """
    The root of the messenger exception hierarchy. All exceptions
    generated by the messenger class derive from this exception.
    """
175
class MessageException(ProtonException):
    """
    The MessageException class is the root of the message exception
    hierarchy. All exceptions generated by the Message class derive from
    this exception.
    """
# Error codes that are raised as a dedicated exception class; any other
# negative code falls back to the caller's default exception type.
EXCEPTIONS = {
    PN_TIMEOUT: Timeout,
    PN_INTR: Interrupt,
}

# Python level delivery states.
PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
RELEASED = Constant("RELEASED")
MODIFIED = Constant("MODIFIED")
ABORTED = Constant("ABORTED")
SETTLED = Constant("SETTLED")

# Translation from the PN_STATUS_* codes to the constants above; an unknown
# status is reported as None.
STATUSES = {
    PN_STATUS_ABORTED: ABORTED,
    PN_STATUS_ACCEPTED: ACCEPTED,
    PN_STATUS_REJECTED: REJECTED,
    PN_STATUS_RELEASED: RELEASED,
    PN_STATUS_MODIFIED: MODIFIED,
    PN_STATUS_PENDING: PENDING,
    PN_STATUS_SETTLED: SETTLED,
    PN_STATUS_UNKNOWN: None,
}

AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")
class Messenger(object):
    """
    The L{Messenger} class defines a high level interface for sending
    and receiving L{Messages<Message>}. Every L{Messenger} contains a
    single logical queue of incoming messages and a single logical queue
    of outgoing messages. The messages in these queues may be destined
    for, or originate from, a variety of addresses.

    The messenger interface is single-threaded. All methods
    except one (L{interrupt}) are intended to be used from within
    the messenger thread.


    Address Syntax
    ==============

    An address has the following form::

      [ amqp[s]:// ] [user[:password]@] domain [/[name]]

    Where domain can be one of::

      host | host:port | ip | ip:port | name

    The following are valid examples of addresses:

     - example.org
     - example.org:1234
     - amqp://example.org
     - amqps://example.org
     - example.org/incoming
     - amqps://example.org/outgoing
     - amqps://fred:trustno1@example.org
     - 127.0.0.1:1234
     - amqps://127.0.0.1:1234

    Sending & Receiving Messages
    ============================

    The L{Messenger} class works in conjunction with the L{Message} class. The
    L{Message} class is a mutable holder of message content.

    The L{put} method copies its L{Message} to the outgoing queue, and may
    send queued messages if it can do so without blocking. The L{send}
    method blocks until it has sent the requested number of messages,
    or until a timeout interrupts the attempt.


      >>> message = Message()
      >>> for i in range(3):
      ...   message.address = "amqp://host/queue"
      ...   message.subject = "Hello World %i" % i
      ...   messenger.put(message)
      >>> messenger.send()

    Similarly, the L{recv} method receives messages into the incoming
    queue, and may block as it attempts to receive the requested number
    of messages, or until timeout is reached. It may receive fewer
    than the requested number. The L{get} method pops the
    eldest L{Message} off the incoming queue and copies it into the L{Message}
    object that you supply. It will not block.


      >>> message = Message()
      >>> messenger.recv(10)
      >>> while messenger.incoming > 0:
      ...   messenger.get(message)
      ...   print(message.subject)
      Hello World 0
      Hello World 1
      Hello World 2

    The blocking flag allows you to turn off blocking behavior entirely,
    in which case L{send} and L{recv} will do whatever they can without
    blocking, and then return. You can then look at the number
    of incoming and outgoing messages to see how much outstanding work
    still remains.
    """

    def __init__(self, name=None):
        """
        Construct a new L{Messenger} with the given name. The name has
        global scope. If a NULL name is supplied, a UUID based name will
        be chosen.

        @type name: string
        @param name: the name of the messenger or None

        """
        self._mng = pn_messenger(name)
        self._selectables = {}

    def __del__(self):
        """
        Destroy the L{Messenger}. This will close all connections that
        are managed by the L{Messenger}. Call the L{stop} method before
        destroying the L{Messenger}.
        """
        # _mng is missing when construction failed before it was assigned.
        if not hasattr(self, "_mng"):
            return
        pn_messenger_free(self._mng)
        del self._mng

    def _check(self, err):
        """
        Translate a return code from the underlying messenger.

        Non-negative codes are returned unchanged. PN_INPROGRESS yields
        None. Any other negative code raises the exception registered in
        EXCEPTIONS for it, or L{MessengerException} when none is registered.
        """
        if not err < 0:
            return err
        if err == PN_INPROGRESS:
            return
        exc_type = EXCEPTIONS.get(err, MessengerException)
        detail = pn_error_text(pn_messenger_error(self._mng))
        raise exc_type("[%s]: %s" % (err, detail))

    @property
    def name(self):
        """
        The name of the L{Messenger}.
        """
        return pn_messenger_name(self._mng)

    def _get_certificate(self):
        return pn_messenger_get_certificate(self._mng)

    def _set_certificate(self, value):
        self._check(pn_messenger_set_certificate(self._mng, value))

    certificate = property(_get_certificate, _set_certificate,
                           doc="""
    Path to a certificate file for the L{Messenger}. This certificate is
    used when the L{Messenger} accepts or establishes SSL/TLS connections.
    This property must be specified for the L{Messenger} to accept
    incoming SSL/TLS connections and to establish client authenticated
    outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
    connections do not require this property.
    """)

    def _get_private_key(self):
        return pn_messenger_get_private_key(self._mng)

    def _set_private_key(self, value):
        self._check(pn_messenger_set_private_key(self._mng, value))

    private_key = property(_get_private_key, _set_private_key,
                           doc="""
    Path to a private key file for the L{Messenger's<Messenger>}
    certificate. This property must be specified for the L{Messenger} to
    accept incoming SSL/TLS connections and to establish client
    authenticated outgoing SSL/TLS connection. Non client authenticated
    SSL/TLS connections do not require this property.
    """)

    def _get_password(self):
        return pn_messenger_get_password(self._mng)

    def _set_password(self, value):
        self._check(pn_messenger_set_password(self._mng, value))

    password = property(_get_password, _set_password,
                        doc="""
    This property contains the password for the L{Messenger.private_key}
    file, or None if the file is not encrypted.
    """)

    def _get_trusted_certificates(self):
        return pn_messenger_get_trusted_certificates(self._mng)

    def _set_trusted_certificates(self, value):
        self._check(pn_messenger_set_trusted_certificates(self._mng, value))

    trusted_certificates = property(_get_trusted_certificates,
                                    _set_trusted_certificates,
                                    doc="""
    A path to a database of trusted certificates for use in verifying the
    peer on an SSL/TLS connection. If this property is None, then the peer
    will not be verified.
    """)

    def _get_timeout(self):
        millis = pn_messenger_get_timeout(self._mng)
        # -1 is the underlying marker for "no timeout".
        if millis == -1:
            return None
        return millis2secs(millis)

    def _set_timeout(self, value):
        millis = -1 if value is None else secs2millis(value)
        self._check(pn_messenger_set_timeout(self._mng, millis))

    timeout = property(_get_timeout, _set_timeout,
                       doc="""
    The timeout property contains the default timeout for blocking
    operations performed by the L{Messenger}.
    """)

    def _is_blocking(self):
        return pn_messenger_is_blocking(self._mng)

    def _set_blocking(self, b):
        self._check(pn_messenger_set_blocking(self._mng, b))

    blocking = property(_is_blocking, _set_blocking,
                        doc="""
    Enable or disable blocking behavior during L{Message} sending
    and receiving. This affects every blocking call, with the
    exception of L{work}. Currently, the affected calls are
    L{send}, L{recv}, and L{stop}.
    """)

    def _is_passive(self):
        return pn_messenger_is_passive(self._mng)

    def _set_passive(self, b):
        self._check(pn_messenger_set_passive(self._mng, b))

    passive = property(_is_passive, _set_passive,
                       doc="""
    When passive is set to true, Messenger will not attempt to perform I/O
    internally. In this mode it is necessary to use the selectables API to
    drive any I/O needed to perform requested actions. In this mode
    Messenger will never block.
    """)

    def _get_incoming_window(self):
        return pn_messenger_get_incoming_window(self._mng)

    def _set_incoming_window(self, window):
        self._check(pn_messenger_set_incoming_window(self._mng, window))

    incoming_window = property(_get_incoming_window, _set_incoming_window,
                               doc="""
    The incoming tracking window for the messenger. The messenger will
    track the remote status of this many incoming deliveries after they
    have been accepted or rejected. Defaults to zero.

    L{Messages<Message>} enter this window only when you take them into your application
    using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
    without explicitly accepting or rejecting the oldest message, then the
    message that passes beyond the edge of the incoming window will be assigned
    the default disposition of its link.
    """)

    def _get_outgoing_window(self):
        return pn_messenger_get_outgoing_window(self._mng)

    def _set_outgoing_window(self, window):
        self._check(pn_messenger_set_outgoing_window(self._mng, window))

    outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
                               doc="""
    The outgoing tracking window for the messenger. The messenger will
    track the remote status of this many outgoing deliveries after calling
    send. Defaults to zero.

    A L{Message} enters this window when you call the put() method with the
    message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
    times, status information will no longer be available for the
    first message.
    """)

    def start(self):
        """
        Currently a no-op placeholder.
        For future compatibility, do not L{send} or L{recv} messages
        before starting the L{Messenger}.
        """
        self._check(pn_messenger_start(self._mng))

    def stop(self):
        """
        Transitions the L{Messenger} to an inactive state. An inactive
        L{Messenger} will not send or receive messages from its internal
        queues. A L{Messenger} should be stopped before being discarded to
        ensure a clean shutdown handshake occurs on any internally managed
        connections.
        """
        self._check(pn_messenger_stop(self._mng))

    @property
    def stopped(self):
        """
        Returns true iff a L{Messenger} is in the stopped state.
        This function does not block.
        """
        return pn_messenger_stopped(self._mng)

    def subscribe(self, source):
        """
        Subscribes the L{Messenger} to messages originating from the
        specified source. The source is an address as specified in the
        L{Messenger} introduction with the following addition. If the
        domain portion of the address begins with the '~' character, the
        L{Messenger} will interpret the domain as host/port, bind to it,
        and listen for incoming messages. For example "~0.0.0.0",
        "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
        local interface and listen for incoming messages with the last
        variant only permitting incoming SSL connections.

        @type source: string
        @param source: the source of messages to subscribe to
        @return: a Subscription wrapping the new subscription
        @raise MessengerException: if the subscription could not be created
        """
        sub_impl = pn_messenger_subscribe(self._mng, source)
        if sub_impl:
            return Subscription(sub_impl)
        # Surface the messenger's recorded error first; if it does not
        # raise, fall back to a generic failure.
        self._check(pn_error_code(pn_messenger_error(self._mng)))
        raise MessengerException("Cannot subscribe to %s"%source)

    def put(self, message):
        """
        Places the content contained in the message onto the outgoing
        queue of the L{Messenger}. This method will never block, however
        it will send any unblocked L{Messages<Message>} in the outgoing
        queue immediately and leave any blocked L{Messages<Message>}
        remaining in the outgoing queue. The L{send} call may be used to
        block until the outgoing queue is empty. The L{outgoing} property
        may be used to check the depth of the outgoing queue.

        When the content in a given L{Message} object is copied to the outgoing
        message queue, you may then modify or discard the L{Message} object
        without having any impact on the content in the outgoing queue.

        This method returns an outgoing tracker for the L{Message}. The tracker
        can be used to determine the delivery status of the L{Message}.

        @type message: Message
        @param message: the message to place in the outgoing queue
        @return: a tracker
        """
        message._pre_encode()
        self._check(pn_messenger_put(self._mng, message._msg))
        return pn_messenger_outgoing_tracker(self._mng)

    def status(self, tracker):
        """
        Gets the last known remote state of the delivery associated with
        the given tracker.

        @type tracker: tracker
        @param tracker: the tracker whose status is to be retrieved

        @return: one of PENDING, ACCEPTED, REJECTED, RELEASED, MODIFIED,
                 ABORTED or SETTLED, or None when the status is unknown; a
                 code with no entry in STATUSES is returned unchanged
        """
        disp = pn_messenger_status(self._mng, tracker)
        return STATUSES.get(disp, disp)

    def buffered(self, tracker):
        """
        Checks if the delivery associated with the given tracker is still
        waiting to be sent.

        @type tracker: tracker
        @param tracker: the tracker whose status is to be retrieved

        @return: true if delivery is still buffered
        """
        return pn_messenger_buffered(self._mng, tracker)

    def settle(self, tracker=None):
        """
        Frees a L{Messenger} from tracking the status associated with a given
        tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up
        to the most recent will be settled.
        """
        flags = 0
        if tracker is None:
            tracker = pn_messenger_outgoing_tracker(self._mng)
            flags = PN_CUMULATIVE
        self._check(pn_messenger_settle(self._mng, tracker, flags))

    def send(self, n=-1):
        """
        This call will block until the indicated number of L{messages<Message>}
        have been sent, or until the operation times out. If n is -1 this call will
        block until all outgoing L{messages<Message>} have been sent. If n is 0 then
        this call will send whatever it can without blocking.
        """
        self._check(pn_messenger_send(self._mng, n))

    def recv(self, n=None):
        """
        Receives up to I{n} L{messages<Message>} into the incoming queue. If no value
        for I{n} is supplied, this call will receive as many L{messages<Message>} as it
        can buffer internally. If the L{Messenger} is in blocking mode, this
        call will block until at least one L{Message} is available in the
        incoming queue.
        """
        limit = -1 if n is None else n
        self._check(pn_messenger_recv(self._mng, limit))

    def work(self, timeout=None):
        """
        Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}.
        This will block for the indicated timeout.
        This method may also do I/O work other than sending and receiving
        L{messages<Message>}. For example, closing connections after messenger.L{stop}()
        has been called.

        @return: False if the call timed out, True otherwise
        """
        millis = -1 if timeout is None else secs2millis(timeout)
        err = pn_messenger_work(self._mng, millis)
        if err == PN_TIMEOUT:
            return False
        self._check(err)
        return True

    @property
    def receiving(self):
        """
        The value reported by pn_messenger_receiving for this L{Messenger}.
        """
        return pn_messenger_receiving(self._mng)

    def interrupt(self):
        """
        The L{Messenger} interface is single-threaded.
        This is the only L{Messenger} function intended to be called
        from outside of the L{Messenger} thread.
        Call this from a non-messenger thread to interrupt
        a L{Messenger} that is blocking.
        This will cause any in-progress blocking call to throw
        the L{Interrupt} exception. If there is no currently blocking
        call, then the next blocking call will be affected, even if it
        is within the same thread that interrupt was called from.
        """
        self._check(pn_messenger_interrupt(self._mng))

    def get(self, message=None):
        """
        Moves the message from the head of the incoming message queue into
        the supplied message object. Any content in the message will be
        overwritten.

        A tracker for the incoming L{Message} is returned. The tracker can
        later be used to communicate your acceptance or rejection of the
        L{Message}.

        If None is passed in for the L{Message} object, the L{Message}
        popped from the head of the queue is discarded.

        @type message: Message
        @param message: the destination message object
        @return: a tracker
        """
        impl = None if message is None else message._msg
        self._check(pn_messenger_get(self._mng, impl))
        if message is not None:
            message._post_decode()
        return pn_messenger_incoming_tracker(self._mng)

    def accept(self, tracker=None):
        """
        Signal the sender that you have acted on the L{Message}
        pointed to by the tracker. If no tracker is supplied,
        then all messages that have been returned by the L{get}
        method are accepted, except those that have already been
        auto-settled by passing beyond your incoming window size.

        @type tracker: tracker
        @param tracker: a tracker as returned by get
        """
        flags = 0
        if tracker is None:
            tracker = pn_messenger_incoming_tracker(self._mng)
            flags = PN_CUMULATIVE
        self._check(pn_messenger_accept(self._mng, tracker, flags))

    def reject(self, tracker=None):
        """
        Rejects the L{Message} indicated by the tracker. If no tracker
        is supplied, all messages that have been returned by the L{get}
        method are rejected, except those that have already been auto-settled
        by passing beyond your incoming window size.

        @type tracker: tracker
        @param tracker: a tracker as returned by get
        """
        flags = 0
        if tracker is None:
            tracker = pn_messenger_incoming_tracker(self._mng)
            flags = PN_CUMULATIVE
        self._check(pn_messenger_reject(self._mng, tracker, flags))

    @property
    def outgoing(self):
        """
        The outgoing queue depth.
        """
        return pn_messenger_outgoing(self._mng)

    @property
    def incoming(self):
        """
        The incoming queue depth.
        """
        return pn_messenger_incoming(self._mng)

    def route(self, pattern, address):
        """
        Adds a routing rule to a L{Messenger's<Messenger>} internal routing table.

        The route procedure may be used to influence how a L{Messenger} will
        internally treat a given address or class of addresses. Every call
        to the route procedure will result in L{Messenger} appending a routing
        rule to its internal routing table.

        Whenever a L{Message} is presented to a L{Messenger} for delivery, it
        will match the address of this message against the set of routing
        rules in order. The first rule to match will be triggered, and
        instead of routing based on the address presented in the message,
        the L{Messenger} will route based on the address supplied in the rule.

        The pattern matching syntax supports two types of matches, a '%'
        will match any character except a '/', and a '*' will match any
        character including a '/'.

        A routing address is specified as a normal AMQP address, however it
        may additionally use substitution variables from the pattern match
        that triggered the rule.

        Any message sent to "foo" will be routed to "amqp://foo.com":

          >>> messenger.route("foo", "amqp://foo.com")

        Any message sent to "foobar" will be routed to
        "amqp://foo.com/bar":

          >>> messenger.route("foobar", "amqp://foo.com/bar")

        Any message sent to bar/<path> will be routed to the corresponding
        path within the amqp://bar.com domain:

          >>> messenger.route("bar/*", "amqp://bar.com/$1")

        Route all L{messages<Message>} over TLS:

          >>> messenger.route("amqp:*", "amqps:$1")

        Supply credentials for foo.com:

          >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1")

        Supply credentials for all domains:

          >>> messenger.route("amqp://*", "amqp://user:password@$1")

        Route all addresses through a single proxy while preserving the
        original destination:

          >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2")

        Route any address through a single broker:

          >>> messenger.route("*", "amqp://user:password@broker/$1")
        """
        self._check(pn_messenger_route(self._mng, unicode2utf8(pattern), unicode2utf8(address)))

    def rewrite(self, pattern, address):
        """
        Similar to route(), except that the destination of
        the L{Message} is determined before the message address is rewritten.

        The outgoing address is only rewritten after routing has been
        finalized. If a message has an outgoing address of
        "amqp://0.0.0.0:5678", and a rewriting rule that changes its
        outgoing address to "foo", it will still arrive at the peer that
        is listening on "amqp://0.0.0.0:5678", but when it arrives there,
        the receiver will see its outgoing address as "foo".

        The default rewrite rule removes username and password from addresses
        before they are transmitted.
        """
        self._check(pn_messenger_rewrite(self._mng, unicode2utf8(pattern), unicode2utf8(address)))

    def selectable(self):
        """
        Return the result of pn_messenger_selectable wrapped by
        Selectable.wrap.
        """
        return Selectable.wrap(pn_messenger_selectable(self._mng))

    @property
    def deadline(self):
        """
        The messenger's deadline converted with millis2secs, or None when
        the underlying call reports a false value such as zero.
        """
        tstamp = pn_messenger_deadline(self._mng)
        return millis2secs(tstamp) if tstamp else None
801
802 -class Message(object):
803 """The L{Message} class is a mutable holder of message content. 804 805 @ivar instructions: delivery instructions for the message 806 @type instructions: dict 807 @ivar annotations: infrastructure defined message annotations 808 @type annotations: dict 809 @ivar properties: application defined message properties 810 @type properties: dict 811 @ivar body: message body 812 @type body: bytes | unicode | dict | list | int | long | float | UUID 813 """ 814 815 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY 816
def __init__(self, body=None, **kwargs):
    """
    Create a message holding the given body.

    @param body: initial message body, or None
    @param kwargs: Message property name/value pairs to initialise the Message
    @raise AttributeError: if a keyword does not name an existing attribute
    """
    self._msg = pn_message()
    self._id = Data(pn_message_id(self._msg))
    self._correlation_id = Data(pn_message_correlation_id(self._msg))
    self.instructions = None
    self.annotations = None
    self.properties = None
    self.body = body
    for attr_name, attr_value in _compat.iteritems(kwargs):
        # Reading the attribute first makes an unknown name fail loudly
        # instead of silently creating a new attribute.
        getattr(self, attr_name)
        setattr(self, attr_name, attr_value)
831
def __del__(self):
    """Free the underlying message, if one was ever created."""
    # _msg is missing when construction failed before it was assigned.
    if not hasattr(self, "_msg"):
        return
    pn_message_free(self._msg)
    del self._msg
836
def _check(self, err):
    """
    Translate a return code from the underlying message.

    Non-negative codes are returned unchanged. A negative code raises the
    exception registered in EXCEPTIONS for it, or L{MessageException}
    when none is registered.
    """
    if not err < 0:
        return err
    exc_type = EXCEPTIONS.get(err, MessageException)
    detail = pn_error_text(pn_message_error(self._msg))
    raise exc_type("[%s]: %s" % (err, detail))
843
def _check_property_keys(self):
    """
    Validate and normalise the keys of the application properties.

    Every key must be a bytes, str or unicode instance. Keys that are
    bytes instances are replaced in place by their _compat.bin2str form.

    @raise MessageException: if a key is of any other type
    """
    # Iterate over a snapshot of the keys: entries are popped and re-inserted
    # inside the loop, and changing a dict while iterating its live key view
    # raises RuntimeError on Python 3.
    for k in list(self.properties.keys()):
        if not isinstance(k, (bytes, str, unicode)):
            raise MessageException('Application property key is not unicode string: key=%s %s' % (str(k), type(k)))
        if isinstance(k, bytes):
            self.properties[_compat.bin2str(k)] = self.properties.pop(k)
850
def _pre_encode(self):
    """
    Copy the Python level sections (instructions, annotations, properties
    and body) into the underlying message before it is encoded. Each
    section is cleared first and left empty when its attribute is None.
    """
    inst = Data(pn_message_instructions(self._msg))
    ann = Data(pn_message_annotations(self._msg))
    props = Data(pn_message_properties(self._msg))
    body = Data(pn_message_body(self._msg))

    sections = (
        (inst, "instructions"),
        (ann, "annotations"),
        (props, "properties"),
        (body, "body"),
    )
    for section, attr in sections:
        section.clear()
        if getattr(self, attr) is None:
            continue
        if attr == "properties":
            # Property keys are validated/normalised before being written.
            self._check_property_keys()
        section.put_object(getattr(self, attr))
870
def _post_decode(self):
    """
    Refresh the Python level sections (instructions, annotations,
    properties and body) from the underlying message. A section with no
    content is stored as None.
    """
    inst = Data(pn_message_instructions(self._msg))
    ann = Data(pn_message_annotations(self._msg))
    props = Data(pn_message_properties(self._msg))
    body = Data(pn_message_body(self._msg))

    sections = (
        (inst, "instructions"),
        (ann, "annotations"),
        (props, "properties"),
        (body, "body"),
    )
    for section, attr in sections:
        # next() reports whether the section holds a value to read.
        if section.next():
            setattr(self, attr, section.get_object())
        else:
            setattr(self, attr, None)
893
def clear(self):
    """
    Clears the contents of the L{Message}. All fields will be reset to
    their default values.
    """
    pn_message_clear(self._msg)
    # The Python level sections are not held by the underlying message
    # between encode/decode calls, so they are reset here as well.
    self.instructions = self.annotations = self.properties = self.body = None
904
def _is_inferred(self):
    """Getter for the C{inferred} property: read the flag from the underlying message."""
    flag = pn_message_is_inferred(self._msg)
    return flag
907
def _set_inferred(self, value):
    """Setter for the C{inferred} property; the value is coerced to bool."""
    rc = pn_message_set_inferred(self._msg, bool(value))
    self._check(rc)
910 911 inferred = property(_is_inferred, _set_inferred, doc=""" 912 The inferred flag for a message indicates how the message content 913 is encoded into AMQP sections. If inferred is true then binary and 914 list values in the body of the message will be encoded as AMQP DATA 915 and AMQP SEQUENCE sections, respectively. If inferred is false, 916 then all values in the body of the message will be encoded as AMQP 917 VALUE sections regardless of their type. 918 """) 919
def _is_durable(self):
    """Getter for the C{durable} property: read the flag from the underlying message."""
    flag = pn_message_is_durable(self._msg)
    return flag
922
def _set_durable(self, value):
    """Setter for the C{durable} property; the value is coerced to bool."""
    rc = pn_message_set_durable(self._msg, bool(value))
    self._check(rc)
925 926 durable = property(_is_durable, _set_durable, 927 doc=""" 928 The durable property indicates that the message should be held durably 929 by any intermediaries taking responsibility for the message. 930 """) 931
def _get_priority(self):
    """Getter for the C{priority} property: read it from the underlying message."""
    level = pn_message_get_priority(self._msg)
    return level
934
def _set_priority(self, value):
    """Setter for the C{priority} property; a failure is raised through L{_check}."""
    rc = pn_message_set_priority(self._msg, value)
    self._check(rc)
937 938 priority = property(_get_priority, _set_priority, 939 doc=""" 940 The priority of the message. 941 """) 942
def _get_ttl(self):
    """Getter for the C{ttl} property: the stored value converted with millis2secs."""
    millis = pn_message_get_ttl(self._msg)
    return millis2secs(millis)
945
def _set_ttl(self, value):
    """Setter for the C{ttl} property: the value is converted with secs2millis before storing."""
    millis = secs2millis(value)
    self._check(pn_message_set_ttl(self._msg, millis))
948 949 ttl = property(_get_ttl, _set_ttl, 950 doc=""" 951 The time to live of the message measured in seconds. Expired messages 952 may be dropped. 953 """) 954
def _is_first_acquirer(self):
    """Getter for the C{first_acquirer} property: read the flag from the underlying message."""
    flag = pn_message_is_first_acquirer(self._msg)
    return flag
957
def _set_first_acquirer(self, value):
    """Setter for the C{first_acquirer} property; the value is coerced to bool."""
    rc = pn_message_set_first_acquirer(self._msg, bool(value))
    self._check(rc)
960 961 first_acquirer = property(_is_first_acquirer, _set_first_acquirer, 962 doc=""" 963 True iff the recipient is the first to acquire the message. 964 """) 965
def _get_delivery_count(self):
  """Return the delivery count read from the underlying message."""
  attempts = pn_message_get_delivery_count(self._msg)
  return attempts

def _set_delivery_count(self, value):
  """Store ``value`` as the delivery count and hand the status to ``_check``."""
  status = pn_message_set_delivery_count(self._msg, value)
  self._check(status)

delivery_count = property(_get_delivery_count, _set_delivery_count,
                          doc="""
The number of delivery attempts made for this message.
""")
def _get_id(self):
  """Return the object currently held by the ``_id`` data holder."""
  holder = self._id
  return holder.get_object()

def _set_id(self, value):
  """Store ``value`` in the ``_id`` data holder, wrapping plain ints as ulong."""
  # Exact-type test on purpose: only values whose type is listed in
  # _compat.INT_TYPES are wrapped; subclasses are stored unchanged.
  is_plain_int = type(value) in _compat.INT_TYPES
  boxed = ulong(value) if is_plain_int else value
  holder = self._id
  holder.rewind()
  holder.put_object(boxed)

id = property(_get_id, _set_id,
              doc="""
The id of the message.
""")
def _get_user_id(self):
  """Return the user id read from the underlying message."""
  creator = pn_message_get_user_id(self._msg)
  return creator

def _set_user_id(self, value):
  """Store ``value`` as the user id and hand the status code to ``_check``."""
  status = pn_message_set_user_id(self._msg, value)
  self._check(status)

user_id = property(_get_user_id, _set_user_id,
                   doc="""
The user id of the message creator.
""")
def _get_address(self):
  """Return the message address, passed through ``utf82unicode``."""
  raw = pn_message_get_address(self._msg)
  return utf82unicode(raw)

def _set_address(self, value):
  """Store ``value`` as the address after passing it through ``unicode2utf8``."""
  status = pn_message_set_address(self._msg, unicode2utf8(value))
  self._check(status)

address = property(_get_address, _set_address,
                   doc="""
The address of the message.
""")
def _get_subject(self):
  """Return the message subject, passed through ``utf82unicode``."""
  raw = pn_message_get_subject(self._msg)
  return utf82unicode(raw)

def _set_subject(self, value):
  """Store ``value`` as the subject after passing it through ``unicode2utf8``."""
  status = pn_message_set_subject(self._msg, unicode2utf8(value))
  self._check(status)

subject = property(_get_subject, _set_subject,
                   doc="""
The subject of the message.
""")
def _get_reply_to(self):
  """Return the reply-to address, passed through ``utf82unicode``."""
  raw = pn_message_get_reply_to(self._msg)
  return utf82unicode(raw)

def _set_reply_to(self, value):
  """Store ``value`` as the reply-to address via ``unicode2utf8``."""
  status = pn_message_set_reply_to(self._msg, unicode2utf8(value))
  self._check(status)

reply_to = property(_get_reply_to, _set_reply_to,
                    doc="""
The reply-to address for the message.
""")
def _get_correlation_id(self):
  """Return the object currently held by the ``_correlation_id`` data holder."""
  holder = self._correlation_id
  return holder.get_object()

def _set_correlation_id(self, value):
  """Store ``value`` in ``_correlation_id``, wrapping plain ints as ulong."""
  # Exact-type test on purpose: only values whose type is listed in
  # _compat.INT_TYPES are wrapped; subclasses are stored unchanged.
  is_plain_int = type(value) in _compat.INT_TYPES
  boxed = ulong(value) if is_plain_int else value
  holder = self._correlation_id
  holder.rewind()
  holder.put_object(boxed)

correlation_id = property(_get_correlation_id, _set_correlation_id,
                          doc="""
The correlation-id for the message.
""")
def _get_content_type(self):
  """Return the content type as a ``symbol`` built from the decoded value."""
  raw = pn_message_get_content_type(self._msg)
  return symbol(utf82unicode(raw))

def _set_content_type(self, value):
  """Store ``value`` as the content type via ``unicode2utf8``."""
  status = pn_message_set_content_type(self._msg, unicode2utf8(value))
  self._check(status)

content_type = property(_get_content_type, _set_content_type,
                        doc="""
The content-type of the message.
""")
def _get_content_encoding(self):
  """Return the content encoding as a ``symbol`` built from the decoded value."""
  raw = pn_message_get_content_encoding(self._msg)
  return symbol(utf82unicode(raw))

def _set_content_encoding(self, value):
  """Store ``value`` as the content encoding via ``unicode2utf8``."""
  status = pn_message_set_content_encoding(self._msg, unicode2utf8(value))
  self._check(status)

content_encoding = property(_get_content_encoding, _set_content_encoding,
                            doc="""
The content-encoding of the message.
""")
def _get_expiry_time(self):
  """Return the expiry time, passed through ``millis2secs``."""
  raw = pn_message_get_expiry_time(self._msg)
  return millis2secs(raw)

def _set_expiry_time(self, value):
  """Store ``value`` as the expiry time via ``secs2millis``."""
  status = pn_message_set_expiry_time(self._msg, secs2millis(value))
  self._check(status)

expiry_time = property(_get_expiry_time, _set_expiry_time,
                       doc="""
The expiry time of the message.
""")
def _get_creation_time(self):
  """Return the creation time, passed through ``millis2secs``."""
  raw = pn_message_get_creation_time(self._msg)
  return millis2secs(raw)

def _set_creation_time(self, value):
  """Store ``value`` as the creation time via ``secs2millis``."""
  status = pn_message_set_creation_time(self._msg, secs2millis(value))
  self._check(status)

creation_time = property(_get_creation_time, _set_creation_time,
                         doc="""
The creation time of the message.
""")
def _get_group_id(self):
  """Return the group id, passed through ``utf82unicode``."""
  raw = pn_message_get_group_id(self._msg)
  return utf82unicode(raw)

def _set_group_id(self, value):
  """Store ``value`` as the group id via ``unicode2utf8``."""
  status = pn_message_set_group_id(self._msg, unicode2utf8(value))
  self._check(status)

group_id = property(_get_group_id, _set_group_id,
                    doc="""
The group id of the message.
""")
def _get_group_sequence(self):
  """Return the group sequence read from the underlying message."""
  position = pn_message_get_group_sequence(self._msg)
  return position

def _set_group_sequence(self, value):
  """Store ``value`` as the group sequence and hand the status to ``_check``."""
  status = pn_message_set_group_sequence(self._msg, value)
  self._check(status)

group_sequence = property(_get_group_sequence, _set_group_sequence,
                          doc="""
The sequence of the message within its group.
""")
def _get_reply_to_group_id(self):
  """Return the reply-to group id, passed through ``utf82unicode``."""
  raw = pn_message_get_reply_to_group_id(self._msg)
  return utf82unicode(raw)

def _set_reply_to_group_id(self, value):
  """Store ``value`` as the reply-to group id via ``unicode2utf8``."""
  status = pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value))
  self._check(status)

reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
                             doc="""
The group-id for any replies.
""")
def encode(self):
  """
  Run ``_pre_encode`` and return the encoded form of the message.

  Encoding starts with a 16 byte buffer; the size is doubled and the
  call retried for as long as the native encoder reports PN_OVERFLOW.
  Any other status code is handed to ``_check`` before returning.
  """
  self._pre_encode()
  capacity = 16
  status, payload = pn_message_encode(self._msg, capacity)
  while status == PN_OVERFLOW:
    capacity *= 2
    status, payload = pn_message_encode(self._msg, capacity)
  self._check(status)
  return payload
1135
def decode(self, data):
  """Decode ``data`` into the underlying message, then run ``_post_decode``."""
  status = pn_message_decode(self._msg, data)
  self._check(status)
  self._post_decode()
1139
def send(self, sender, tag=None):
  """
  Encode this message and stream it over ``sender`` as a new delivery.

  @param sender: the link used to create the delivery and stream the bytes
  @param tag: delivery tag; when falsy, ``sender.delivery_tag()`` supplies one
  @return: the delivery created for this message
  """
  if not tag:
    tag = sender.delivery_tag()
  delivery = sender.delivery(tag)
  sender.stream(self.encode())
  sender.advance()
  presettled = sender.snd_settle_mode == Link.SND_SETTLED
  if presettled:
    delivery.settle()
  return delivery
1148
def recv(self, link):
  """
  Read the current delivery from ``link`` and decode it into this message.

  Returns None without touching the link when the link is a sender,
  when it has no current delivery, or when the current delivery is
  still partial. Otherwise the delivery is returned after its content
  has been decoded.

  @type link: Link
  @param link: the link to receive a message from
  @return: the delivery associated with the decoded message (or None)
  """
  if link.is_sender:
    return None
  delivery = link.current
  if not delivery:
    return None
  if delivery.partial:
    return None
  delivery.encoded = link.recv(delivery.pending)
  link.advance()
  # the sender has already forgotten about the delivery, so we might
  # as well too
  if link.remote_snd_settle_mode == Link.SND_SETTLED:
    delivery.settle()
  self.decode(delivery.encoded)
  return delivery
1173
def __repr2__(self):
  """Return ``Message(...)`` listing every truthy attribute as ``name=repr``."""
  names = ("inferred", "address", "reply_to", "durable", "ttl",
           "priority", "first_acquirer", "delivery_count", "id",
           "correlation_id", "user_id", "group_id", "group_sequence",
           "reply_to_group_id", "instructions", "annotations",
           "properties", "body")
  shown = []
  for name in names:
    current = getattr(self, name)
    if not current:
      continue
    shown.append("%s=%r" % (name, current))
  return "Message(%s)" % ", ".join(shown)
1184
def __repr__(self):
  """Return the text produced by ``pn_inspect`` for the underlying message."""
  scratch = pn_string(None)
  status = pn_inspect(self._msg, scratch)
  text = pn_string_get(scratch)
  # The scratch string is released before the status is checked.
  pn_free(scratch)
  self._check(status)
  return text
1192
class Subscription(object):
  """Holds a native subscription handle and exposes its address."""

  def __init__(self, impl):
    """Remember ``impl`` as the wrapped handle."""
    self._impl = impl

  @property
  def address(self):
    """The value returned by ``pn_subscription_address`` for the handle."""
    handle = self._impl
    return pn_subscription_address(handle)
# Sentinel that lets fileno() tell "no argument given" apart from None.
_DEFAULT = object()

class Selectable(Wrapper):
  """Wrapper exposing the pn_selectable_* calls for a native selectable."""

  @staticmethod
  def wrap(impl):
    """Return a Selectable around ``impl``, or None when ``impl`` is None."""
    return None if impl is None else Selectable(impl)

  def __init__(self, impl):
    Wrapper.__init__(self, impl, pn_selectable_attachments)

  def _init(self):
    pass

  def fileno(self, fd = _DEFAULT):
    """
    Get or set the file descriptor.

    Called without an argument it returns the current descriptor.
    Passing None stores PN_INVALID_SOCKET; any other value is stored
    as given. Nothing is returned when setting.
    """
    if fd is _DEFAULT:
      return pn_selectable_get_fd(self._impl)
    target = PN_INVALID_SOCKET if fd is None else fd
    pn_selectable_set_fd(self._impl, target)

  def _is_reading(self):
    flag = pn_selectable_is_reading(self._impl)
    return flag

  def _set_reading(self, val):
    wanted = bool(val)
    pn_selectable_set_reading(self._impl, wanted)

  reading = property(_is_reading, _set_reading)

  def _is_writing(self):
    flag = pn_selectable_is_writing(self._impl)
    return flag

  def _set_writing(self, val):
    wanted = bool(val)
    pn_selectable_set_writing(self._impl, wanted)

  writing = property(_is_writing, _set_writing)

  def _get_deadline(self):
    """Return the deadline via ``millis2secs``, or None when it is falsy."""
    raw = pn_selectable_get_deadline(self._impl)
    return millis2secs(raw) if raw else None

  def _set_deadline(self, deadline):
    millis = secs2millis(deadline)
    pn_selectable_set_deadline(self._impl, millis)

  deadline = property(_get_deadline, _set_deadline)

  def readable(self):
    """Invoke ``pn_selectable_readable`` on the wrapped handle."""
    handle = self._impl
    pn_selectable_readable(handle)

  def writable(self):
    """Invoke ``pn_selectable_writable`` on the wrapped handle."""
    handle = self._impl
    pn_selectable_writable(handle)

  def expired(self):
    """Invoke ``pn_selectable_expired`` on the wrapped handle."""
    handle = self._impl
    pn_selectable_expired(handle)

  def _is_registered(self):
    flag = pn_selectable_is_registered(self._impl)
    return flag

  def _set_registered(self, registered):
    handle = self._impl
    pn_selectable_set_registered(handle, registered)

  registered = property(_is_registered, _set_registered,
                        doc="""
The registered property may be get/set by an I/O polling system to
indicate whether the fd has been registered or not.
""")

  @property
  def is_terminal(self):
    flag = pn_selectable_is_terminal(self._impl)
    return flag

  def terminate(self):
    """Invoke ``pn_selectable_terminate`` on the wrapped handle."""
    handle = self._impl
    pn_selectable_terminate(handle)

  def release(self):
    """Invoke ``pn_selectable_release`` on the wrapped handle."""
    handle = self._impl
    pn_selectable_release(handle)
1285
class DataException(ProtonException):
  """
  Root of the Data exception hierarchy: every exception raised by the
  Data class extends this one.
  """
1292
class UnmappedType:
  """Simple holder for a message describing a type that has no mapping."""

  def __init__(self, msg):
    """Remember ``msg`` so that it can be shown by ``repr``."""
    self.msg = msg

  def __repr__(self):
    """Return ``UnmappedType(<msg>)``."""
    # Wrap the argument in a 1-tuple: a bare tuple on the right of ``%``
    # is unpacked as the argument list, which raises TypeError (or drops
    # the parentheses) whenever ``msg`` itself is a tuple.
    return "UnmappedType(%s)" % (self.msg,)
1300
class ulong(long):
  """``long`` subclass tagging a value as an AMQP unsigned long."""

  def __repr__(self):
    plain = long.__repr__(self)
    return "ulong(%s)" % plain
1305
class timestamp(long):
  """``long`` subclass tagging a value as an AMQP timestamp."""

  def __repr__(self):
    plain = long.__repr__(self)
    return "timestamp(%s)" % plain
1310
class symbol(unicode):
  """``unicode`` subclass tagging a value as an AMQP symbol."""

  def __repr__(self):
    plain = unicode.__repr__(self)
    return "symbol(%s)" % plain
1315
class char(unicode):
  """``unicode`` subclass tagging a value as an AMQP char."""

  def __repr__(self):
    plain = unicode.__repr__(self)
    return "char(%s)" % plain
1320
class byte(int):
  """``int`` subclass tagging a value as an AMQP signed byte."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "byte(%s)" % plain
1325
class short(int):
  """``int`` subclass tagging a value as an AMQP signed short."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "short(%s)" % plain
1330
class int32(int):
  """``int`` subclass tagging a value as an AMQP signed int."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "int32(%s)" % plain
1335
class ubyte(int):
  """``int`` subclass tagging a value as an AMQP unsigned byte."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "ubyte(%s)" % plain
1340
class ushort(int):
  """``int`` subclass tagging a value as an AMQP unsigned short."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "ushort(%s)" % plain
1345
class uint(long):
  """``long`` subclass tagging a value as an AMQP unsigned int."""

  def __repr__(self):
    plain = long.__repr__(self)
    return "uint(%s)" % plain
1350
class float32(float):
  """``float`` subclass tagging a value as an AMQP float."""

  def __repr__(self):
    plain = float.__repr__(self)
    return "float32(%s)" % plain
1355
class decimal32(int):
  """``int`` subclass tagging a value as an AMQP decimal32."""

  def __repr__(self):
    plain = int.__repr__(self)
    return "decimal32(%s)" % plain
1360
class decimal64(long):
  """``long`` subclass tagging a value as an AMQP decimal64."""

  def __repr__(self):
    plain = long.__repr__(self)
    return "decimal64(%s)" % plain
1365
class decimal128(bytes):
  """``bytes`` subclass tagging a value as an AMQP decimal128."""

  def __repr__(self):
    plain = bytes.__repr__(self)
    return "decimal128(%s)" % plain
1370
class Described(object):
  """Pairs a descriptor with the value it describes."""

  def __init__(self, descriptor, value):
    """
    @param descriptor: the descriptor of the value
    @param value: the described value
    """
    self.descriptor = descriptor
    self.value = value

  def __repr__(self):
    return "Described(%r, %r)" % (self.descriptor, self.value)

  def __eq__(self, o):
    """Equal only to another Described with equal descriptor and value."""
    if isinstance(o, Described):
      return self.descriptor == o.descriptor and self.value == o.value
    else:
      return False

  def __ne__(self, o):
    """Inverse of ``__eq__``."""
    # Python 2 does not derive != from ==; without this method two equal
    # Described objects would still compare as "not equal".
    return not self.__eq__(o)
# Sentinel used as the descriptor of an array that has no descriptor.
UNDESCRIBED = Constant("UNDESCRIBED")
class Array(object):
  """A descriptor, an element type and the elements of an array."""

  def __init__(self, descriptor, type, *elements):
    """
    @param descriptor: the descriptor stored for the array
    @param type: the type stored for the elements
    @param elements: the elements, kept as a tuple
    """
    self.descriptor = descriptor
    self.type = type
    self.elements = elements

  def __iter__(self):
    return iter(self.elements)

  def __repr__(self):
    if self.elements:
      els = ", %s" % (", ".join(map(repr, self.elements)))
    else:
      els = ""
    return "Array(%r, %r%s)" % (self.descriptor, self.type, els)

  def __eq__(self, o):
    """Equal only to another Array with equal descriptor, type and elements."""
    if isinstance(o, Array):
      return self.descriptor == o.descriptor and \
          self.type == o.type and self.elements == o.elements
    else:
      return False

  def __ne__(self, o):
    """Inverse of ``__eq__``."""
    # Python 2 does not derive != from ==; without this method two equal
    # Array objects would still compare as "not equal".
    return not self.__eq__(o)
1411
1412 -class Data:
1413 """ 1414 The L{Data} class provides an interface for decoding, extracting, 1415 creating, and encoding arbitrary AMQP data. A L{Data} object 1416 contains a tree of AMQP values. Leaf nodes in this tree correspond 1417 to scalars in the AMQP type system such as L{ints<INT>} or 1418 L{strings<STRING>}. Non-leaf nodes in this tree correspond to 1419 compound values in the AMQP type system such as L{lists<LIST>}, 1420 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}. 1421 The root node of the tree is the L{Data} object itself and can have 1422 an arbitrary number of children. 1423 1424 A L{Data} object maintains the notion of the current sibling node 1425 and a current parent node. Siblings are ordered within their parent. 1426 Values are accessed and/or added by using the L{next}, L{prev}, 1427 L{enter}, and L{exit} methods to navigate to the desired location in 1428 the tree and using the supplied variety of put_*/get_* methods to 1429 access or add a value of the desired type. 1430 1431 The put_* methods will always add a value I{after} the current node 1432 in the tree. If the current node has a next sibling the put_* method 1433 will overwrite the value on this node. If there is no current node 1434 or the current node has no next sibling then one will be added. The 1435 put_* methods always set the added/modified node to the current 1436 node. The get_* methods read the value of the current node and do 1437 not change which node is current. 1438 1439 The following types of scalar values are supported: 1440 1441 - L{NULL} 1442 - L{BOOL} 1443 - L{UBYTE} 1444 - L{USHORT} 1445 - L{SHORT} 1446 - L{UINT} 1447 - L{INT} 1448 - L{ULONG} 1449 - L{LONG} 1450 - L{FLOAT} 1451 - L{DOUBLE} 1452 - L{BINARY} 1453 - L{STRING} 1454 - L{SYMBOL} 1455 1456 The following types of compound values are supported: 1457 1458 - L{DESCRIBED} 1459 - L{ARRAY} 1460 - L{LIST} 1461 - L{MAP} 1462 """ 1463 1464 NULL = PN_NULL; "A null value." 
1465 BOOL = PN_BOOL; "A boolean value." 1466 UBYTE = PN_UBYTE; "An unsigned byte value." 1467 BYTE = PN_BYTE; "A signed byte value." 1468 USHORT = PN_USHORT; "An unsigned short value." 1469 SHORT = PN_SHORT; "A short value." 1470 UINT = PN_UINT; "An unsigned int value." 1471 INT = PN_INT; "A signed int value." 1472 CHAR = PN_CHAR; "A character value." 1473 ULONG = PN_ULONG; "An unsigned long value." 1474 LONG = PN_LONG; "A signed long value." 1475 TIMESTAMP = PN_TIMESTAMP; "A timestamp value." 1476 FLOAT = PN_FLOAT; "A float value." 1477 DOUBLE = PN_DOUBLE; "A double value." 1478 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value." 1479 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value." 1480 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value." 1481 UUID = PN_UUID; "A UUID value." 1482 BINARY = PN_BINARY; "A binary string." 1483 STRING = PN_STRING; "A unicode string." 1484 SYMBOL = PN_SYMBOL; "A symbolic string." 1485 DESCRIBED = PN_DESCRIBED; "A described value." 1486 ARRAY = PN_ARRAY; "An array value." 1487 LIST = PN_LIST; "A list value." 1488 MAP = PN_MAP; "A map value." 1489 1490 type_names = { 1491 NULL: "null", 1492 BOOL: "bool", 1493 BYTE: "byte", 1494 UBYTE: "ubyte", 1495 SHORT: "short", 1496 USHORT: "ushort", 1497 INT: "int", 1498 UINT: "uint", 1499 CHAR: "char", 1500 LONG: "long", 1501 ULONG: "ulong", 1502 TIMESTAMP: "timestamp", 1503 FLOAT: "float", 1504 DOUBLE: "double", 1505 DECIMAL32: "decimal32", 1506 DECIMAL64: "decimal64", 1507 DECIMAL128: "decimal128", 1508 UUID: "uuid", 1509 BINARY: "binary", 1510 STRING: "string", 1511 SYMBOL: "symbol", 1512 DESCRIBED: "described", 1513 ARRAY: "array", 1514 LIST: "list", 1515 MAP: "map" 1516 } 1517 1518 @classmethod
def type_name(cls, amqp_type):
  """
  Return the name registered in ``type_names`` for ``amqp_type``.

  The method is decorated as a classmethod, so the class arrives as the
  first argument; the type code to look up is the second.

  @param amqp_type: one of the type codes used as keys of ``type_names``
  @raise KeyError: if ``amqp_type`` is not a key of ``type_names``
  """
  return cls.type_names[amqp_type]
1520
def __init__(self, capacity=16):
  """
  Create a Data object.

  When the exact type of ``capacity`` is listed in _compat.INT_TYPES a
  new native data object of that capacity is allocated and marked as
  owned (``_free`` is True). Any other value is stored as the native
  data object itself and is not marked as owned.
  """
  owns_data = type(capacity) in _compat.INT_TYPES
  if owns_data:
    self._data = pn_data(capacity)
  else:
    self._data = capacity
  self._free = owns_data
1528
def __del__(self):
  """Free the native data object if this instance owns one."""
  # Read _free defensively: when __init__ fails before setting it, the
  # finalizer still runs and must not raise AttributeError.
  if getattr(self, "_free", False) and hasattr(self, "_data"):
    pn_data_free(self._data)
    del self._data
1533
def _check(self, err):
  """
  Return ``err`` unchanged when it is non-negative.

  A negative code raises the exception class registered for it in
  EXCEPTIONS (DataException when there is none), carrying the code and
  the error text of the underlying data object.
  """
  if err >= 0:
    return err
  exc_type = EXCEPTIONS.get(err, DataException)
  detail = pn_error_text(pn_data_error(self._data))
  raise exc_type("[%s]: %s" % (err, detail))
1540
def clear(self):
  """Clear the data object."""
  native = self._data
  pn_data_clear(native)
1546
def rewind(self):
  """
  Clear the current node and make the root node the parent.

  Clearing the current node places it _before_ the first node, so a
  following call to next() advances to the first node.
  """
  native = self._data
  assert native is not None
  pn_data_rewind(native)
1555
def next(self):
  """
  Advance the current node to its next sibling and return its type.

  When there is no next sibling the current node stays where it is and
  None is returned.
  """
  if not pn_data_next(self._data):
    return None
  return self.type()
1567
def prev(self):
  """
  Move the current node to its previous sibling and return its type.

  When there is no previous sibling the current node stays where it is
  and None is returned.
  """
  if not pn_data_prev(self._data):
    return None
  return self.type()
1579
def enter(self):
  """
  Make the current node the parent node and clear the current node.

  Clearing the current node places it _before_ the first child, so a
  following call to next() advances to the first child.
  """
  entered = pn_data_enter(self._data)
  return entered
1587
def exit(self):
  """
  Make the parent node the current node, and the parent's own parent
  the new parent node.
  """
  exited = pn_data_exit(self._data)
  return exited
1594
def lookup(self, name):
  """Return the result of ``pn_data_lookup`` for ``name``."""
  found = pn_data_lookup(self._data, name)
  return found
1597
def narrow(self):
  """Invoke ``pn_data_narrow`` on the underlying data object."""
  native = self._data
  pn_data_narrow(native)
1600
def widen(self):
  """Invoke ``pn_data_widen`` on the underlying data object."""
  native = self._data
  pn_data_widen(native)
1603
def type(self):
  """
  Return the type of the current node, or None when the native call
  reports -1.
  """
  code = pn_data_type(self._data)
  return None if code == -1 else code
1613
def encoded_size(self):
  """Return the size in bytes needed to encode the data in AMQP format."""
  needed = pn_data_encoded_size(self._data)
  return needed
1619
def encode(self):
  """
  Return a representation of the data encoded in AMQP format.

  Encoding starts with a 1024 byte buffer; the size is doubled and the
  call retried whenever the native encoder reports PN_OVERFLOW. Any
  other negative status is handed to ``_check``.
  """
  capacity = 1024
  while True:
    status, encoded = pn_data_encode(self._data, capacity)
    if status == PN_OVERFLOW:
      capacity *= 2
      continue
    if status >= 0:
      return encoded
    self._check(status)
1633
def decode(self, encoded):
  """
  Decode the first value from the supplied AMQP data and return the
  number of bytes consumed.

  @type encoded: binary
  @param encoded: AMQP encoded binary data
  """
  consumed = pn_data_decode(self._data, encoded)
  return self._check(consumed)
1643
def put_list(self):
  """
  Put a list value. Enter the list node and put the element values to
  fill it.

    >>> data = Data()
    >>> data.put_list()
    >>> data.enter()
    >>> data.put_int(1)
    >>> data.put_int(2)
    >>> data.put_int(3)
    >>> data.exit()
  """
  status = pn_data_put_list(self._data)
  self._check(status)
1658
def put_map(self):
  """
  Put a map value. Enter the map node and put alternating keys and
  values to fill it.

    >>> data = Data()
    >>> data.put_map()
    >>> data.enter()
    >>> data.put_string("key")
    >>> data.put_string("value")
    >>> data.exit()
  """
  status = pn_data_put_map(self._data)
  self._check(status)
1672
def put_array(self, described, element_type):
  """
  Put an array value. Enter the array node and put the element values
  to fill it; they must all be of the given element type. In a
  described array the first child is the descriptor, which may be of
  any type.

    >>> data = Data()
    >>>
    >>> data.put_array(False, Data.INT)
    >>> data.enter()
    >>> data.put_int(1)
    >>> data.put_int(2)
    >>> data.put_int(3)
    >>> data.exit()
    >>>
    >>> data.put_array(True, Data.DOUBLE)
    >>> data.enter()
    >>> data.put_symbol("array-descriptor")
    >>> data.put_double(1.1)
    >>> data.put_double(1.2)
    >>> data.put_double(1.3)
    >>> data.exit()

  @type described: bool
  @param described: specifies whether the array is described
  @type element_type: int
  @param element_type: the type of the array elements
  """
  status = pn_data_put_array(self._data, described, element_type)
  self._check(status)
1704
def put_described(self):
  """
  Put a described value. The node has two children, the descriptor and
  the value; enter the node and put both.

    >>> data = Data()
    >>> data.put_described()
    >>> data.enter()
    >>> data.put_symbol("value-descriptor")
    >>> data.put_string("the value")
    >>> data.exit()
  """
  status = pn_data_put_described(self._data)
  self._check(status)
1719
def put_null(self):
  """Put a null value."""
  status = pn_data_put_null(self._data)
  self._check(status)
1725
def put_bool(self, b):
  """
  Put a boolean value.

  @param b: a boolean value
  """
  status = pn_data_put_bool(self._data, b)
  self._check(status)
1733
def put_ubyte(self, ub):
  """
  Put an unsigned byte value.

  @param ub: an integral value
  """
  status = pn_data_put_ubyte(self._data, ub)
  self._check(status)
1741
def put_byte(self, b):
  """
  Put a signed byte value.

  @param b: an integral value
  """
  status = pn_data_put_byte(self._data, b)
  self._check(status)
1749
def put_ushort(self, us):
  """
  Put an unsigned short value.

  @param us: an integral value
  """
  status = pn_data_put_ushort(self._data, us)
  self._check(status)
1757
def put_short(self, s):
  """
  Put a signed short value.

  @param s: an integral value
  """
  status = pn_data_put_short(self._data, s)
  self._check(status)
1765
def put_uint(self, ui):
  """
  Put an unsigned int value.

  @param ui: an integral value
  """
  status = pn_data_put_uint(self._data, ui)
  self._check(status)
1773
def put_int(self, i):
  """
  Put a signed int value.

  @param i: an integral value
  """
  status = pn_data_put_int(self._data, i)
  self._check(status)
1781
def put_char(self, c):
  """
  Put a char value; the character is converted with ``ord`` first.

  @param c: a single character
  """
  code_point = ord(c)
  status = pn_data_put_char(self._data, code_point)
  self._check(status)
1789
def put_ulong(self, ul):
  """
  Put an unsigned long value.

  @param ul: an integral value
  """
  status = pn_data_put_ulong(self._data, ul)
  self._check(status)
1797
def put_long(self, l):
  """
  Put a signed long value.

  @param l: an integral value
  """
  status = pn_data_put_long(self._data, l)
  self._check(status)
1805
def put_timestamp(self, t):
  """
  Put a timestamp value.

  @param t: an integral value
  """
  status = pn_data_put_timestamp(self._data, t)
  self._check(status)
1813
def put_float(self, f):
  """
  Put a float value.

  @param f: a floating point value
  """
  status = pn_data_put_float(self._data, f)
  self._check(status)
1821
def put_double(self, d):
  """
  Put a double value.

  @param d: a floating point value
  """
  status = pn_data_put_double(self._data, d)
  self._check(status)
1829
def put_decimal32(self, d):
  """
  Put a decimal32 value.

  @param d: a decimal32 value
  """
  status = pn_data_put_decimal32(self._data, d)
  self._check(status)
1837
def put_decimal64(self, d):
  """
  Put a decimal64 value.

  @param d: a decimal64 value
  """
  status = pn_data_put_decimal64(self._data, d)
  self._check(status)
1845
def put_decimal128(self, d):
  """
  Put a decimal128 value.

  @param d: a decimal128 value
  """
  status = pn_data_put_decimal128(self._data, d)
  self._check(status)
1853
def put_uuid(self, u):
  """
  Put a UUID value; the ``bytes`` attribute of ``u`` is what is stored.

  @param u: a uuid value
  """
  raw = u.bytes
  status = pn_data_put_uuid(self._data, raw)
  self._check(status)
1861
def put_binary(self, b):
  """
  Put a binary value.

  @type b: binary
  @param b: a binary value
  """
  status = pn_data_put_binary(self._data, b)
  self._check(status)
1870
def put_memoryview(self, mv):
  """Put a python memoryview object as an AMQP binary value"""
  payload = mv.tobytes()
  self.put_binary(payload)
1874
def put_buffer(self, buff):
  """Put a python buffer object as an AMQP binary value"""
  payload = bytes(buff)
  self.put_binary(payload)
1878
def put_string(self, s):
  """
  Put a unicode value; it is encoded as utf8 before being stored.

  @type s: unicode
  @param s: a unicode value
  """
  encoded = s.encode("utf8")
  status = pn_data_put_string(self._data, encoded)
  self._check(status)
1887
def put_symbol(self, s):
  """
  Put a symbolic value; it is encoded as ascii before being stored.

  @type s: string
  @param s: the symbol name
  """
  encoded = s.encode('ascii')
  status = pn_data_put_symbol(self._data, encoded)
  self._check(status)
1896
def get_list(self):
  """
  Return the number of elements if the current node is a list, zero
  otherwise. Enter the list to access its elements.

    >>> count = data.get_list()
    >>> data.enter()
    >>> for i in range(count):
    ...   type = data.next()
    ...   if type == Data.STRING:
    ...     print(data.get_string())
    ...   elif type == ...:
    ...     ...
    >>> data.exit()
  """
  count = pn_data_get_list(self._data)
  return count
1914
def get_map(self):
  """
  Return the number of child elements if the current node is a map,
  zero otherwise. Enter the map to access its key value pairs.

    >>> count = data.get_map()
    >>> data.enter()
    >>> for i in range(count/2):
    ...   type = data.next()
    ...   if type == Data.STRING:
    ...     print(data.get_string())
    ...   elif type == ...:
    ...     ...
    >>> data.exit()
  """
  count = pn_data_get_map(self._data)
  return count
1932
def get_array(self):
  """
  Return a tuple of the element count, a boolean telling whether the
  array is described, and the element type if the current node is an
  array; otherwise return (0, False, None). Enter the array to access
  its data.

    >>> # read an array of strings with a symbolic descriptor
    >>> count, described, type = data.get_array()
    >>> data.enter()
    >>> data.next()
    >>> print("Descriptor:", data.get_symbol())
    >>> for i in range(count):
    ...    data.next()
    ...    print("Element:", data.get_string())
    >>> data.exit()
  """
  count = pn_data_get_array(self._data)
  described = pn_data_is_array_described(self._data)
  element_type = pn_data_get_array_type(self._data)
  # The native call reports -1 when there is no element type.
  return count, described, (None if element_type == -1 else element_type)
1956
def is_described(self):
  """
  Check whether the current node is a described value. Enter the
  described value to access its descriptor and value.

    >>> # read a symbolically described string
    >>> assert data.is_described() # will error if the current node is not described
    >>> data.enter()
    >>> data.next()
    >>> print(data.get_symbol())
    >>> data.next()
    >>> print(data.get_string())
    >>> data.exit()
  """
  answer = pn_data_is_described(self._data)
  return answer
1972
def is_null(self):
  """Check whether the current node is a null."""
  answer = pn_data_is_null(self._data)
  return answer
1978
def get_bool(self):
  """
  Return the value of the current node if it is a boolean, False
  otherwise.
  """
  value = pn_data_get_bool(self._data)
  return value
1985
def get_ubyte(self):
  """
  Return the value of the current node if it is an unsigned byte, 0
  otherwise. The result is wrapped in ``ubyte``.
  """
  raw = pn_data_get_ubyte(self._data)
  return ubyte(raw)
1992
def get_byte(self):
  """
  Return the value of the current node if it is a signed byte, 0
  otherwise. The result is wrapped in ``byte``.
  """
  raw = pn_data_get_byte(self._data)
  return byte(raw)
1999
def get_ushort(self):
  """
  Return the value of the current node if it is an unsigned short, 0
  otherwise. The result is wrapped in ``ushort``.
  """
  raw = pn_data_get_ushort(self._data)
  return ushort(raw)
2006
def get_short(self):
  """
  Return the value of the current node if it is a signed short, 0
  otherwise. The result is wrapped in ``short``.
  """
  raw = pn_data_get_short(self._data)
  return short(raw)
2013
def get_uint(self):
  """
  Return the value of the current node if it is an unsigned int, 0
  otherwise. The result is wrapped in ``uint``.
  """
  raw = pn_data_get_uint(self._data)
  return uint(raw)
2020
def get_int(self):
  """
  Return the value of the current node if it is a signed int, 0
  otherwise. The result is wrapped in ``int32``.
  """
  raw = pn_data_get_int(self._data)
  return int32(raw)
2027
def get_char(self):
  """
  Return the value of the current node if it is a char, 0 otherwise.
  The native value is converted with ``_compat.unichar`` and wrapped
  in ``char``.
  """
  code_point = pn_data_get_char(self._data)
  return char(_compat.unichar(code_point))
2034
def get_ulong(self):
  """
  Return the value of the current node if it is an unsigned long, 0
  otherwise. The result is wrapped in ``ulong``.
  """
  raw = pn_data_get_ulong(self._data)
  return ulong(raw)
2041
def get_long(self):
  """
  Return the value of the current node if it is a signed long, 0
  otherwise. The result is converted with ``long``.
  """
  raw = pn_data_get_long(self._data)
  return long(raw)
2048
def get_timestamp(self):
  """
  Return the value of the current node if it is a timestamp, 0
  otherwise. The result is wrapped in ``timestamp``.
  """
  raw = pn_data_get_timestamp(self._data)
  return timestamp(raw)
2055
def get_float(self):
  """
  Return the value of the current node if it is a float, 0 otherwise.
  The result is wrapped in ``float32``.
  """
  raw = pn_data_get_float(self._data)
  return float32(raw)
2062
def get_double(self):
  """
  Return the value of the current node if it is a double, 0 otherwise.
  """
  value = pn_data_get_double(self._data)
  return value

# XXX: need to convert
def get_decimal32(self):
  """
  Return the value of the current node if it is a decimal32, 0
  otherwise. The result is wrapped in ``decimal32``.
  """
  raw = pn_data_get_decimal32(self._data)
  return decimal32(raw)

# XXX: need to convert
def get_decimal64(self):
  """
  Return the value of the current node if it is a decimal64, 0
  otherwise. The result is wrapped in ``decimal64``.
  """
  raw = pn_data_get_decimal64(self._data)
  return decimal64(raw)

# XXX: need to convert
def get_decimal128(self):
  """
  Return the value of the current node if it is a decimal128, 0
  otherwise. The result is wrapped in ``decimal128``.
  """
  raw = pn_data_get_decimal128(self._data)
  return decimal128(raw)
2093
def get_uuid(self):
  """
  Return the value of the current node as a UUID if it is one, None
  otherwise.
  """
  if pn_data_type(self._data) != Data.UUID:
    return None
  raw = pn_data_get_uuid(self._data)
  return uuid.UUID(bytes=raw)
2103
def get_binary(self):
  """
  Return the value of the current node if it is binary, "" otherwise.
  """
  value = pn_data_get_binary(self._data)
  return value
2110
def get_string(self):
  """
  Return the value of the current node if it is a string, "" otherwise.
  The native bytes are decoded as utf8.
  """
  raw = pn_data_get_string(self._data)
  return raw.decode("utf8")
2117
def get_symbol(self):
  """
  If the current node is a symbol, returns its value, returns ""
  otherwise.  The raw value is decoded as ASCII and wrapped in a
  L{symbol}.
  """
  raw = pn_data_get_symbol(self._data)
  text = raw.decode('ascii')
  return symbol(text)
2124
def copy(self, src):
  """
  Copy the contents of another Data object into this one.

  @param src: the object whose _data is handed to pn_data_copy as the
    source
  @raise: whatever self._check raises for the code pn_data_copy returns
  """
  status = pn_data_copy(self._data, src._data)
  self._check(status)
2127
def format(self):
  """
  Return the result of pn_data_format for this object.

  The call starts with a size of 16 and is retried with the size
  doubled for as long as it reports PN_OVERFLOW.  Any other code is
  passed through self._check before the result is returned.
  """
  size = 16
  err, text = pn_data_format(self._data, size)
  while err == PN_OVERFLOW:
    size *= 2
    err, text = pn_data_format(self._data, size)
  self._check(err)
  return text
2138
def dump(self):
  """
  Hand this object's data to pn_data_dump.  Nothing is returned.
  """
  data = self._data
  pn_data_dump(data)
2141
def put_dict(self, d):
  """
  Put a python mapping as a map node.

  A map node is started and entered, then every key is put followed by
  its value.  The map is always exited again, even if putting one of
  the members raises.

  @param d: object providing items() that yields (key, value) pairs
  """
  self.put_map()
  self.enter()
  try:
    for key, val in d.items():
      for member in (key, val):
        self.put_object(member)
  finally:
    self.exit()
2151
def get_dict(self):
  """
  Read the current node's children as alternating keys and values and
  return them as a dict.

  A trailing key with no following node is mapped to None.  Returns
  None if the node cannot be entered.  The node is always exited once
  it has been entered.
  """
  if not self.enter():
    return None
  try:
    mapping = {}
    while self.next():
      key = self.get_object()
      mapping[key] = self.get_object() if self.next() else None
  finally:
    self.exit()
  return mapping
2166
def put_sequence(self, s):
  """
  Put an iterable as a list node.

  A list node is started and entered, each element is put in iteration
  order, and the list is exited again even if an element fails.

  @param s: iterable of objects accepted by put_object
  """
  self.put_list()
  self.enter()
  try:
    for element in s:
      self.put_object(element)
  finally:
    self.exit()
2175
def get_sequence(self):
  """
  Read every child of the current node with get_object and return the
  values as a list.  Returns None if the node cannot be entered.  The
  node is always exited once it has been entered.
  """
  if not self.enter():
    return None
  try:
    items = []
    while self.next():
      items.append(self.get_object())
  finally:
    self.exit()
  return items
2185
def get_py_described(self):
  """
  Read the first two children of the current node as a descriptor and
  a value and return them as a L{Described}.  Returns None if the node
  cannot be entered.  The node is always exited once it has been
  entered.
  """
  if not self.enter():
    return None
  try:
    self.next()
    descriptor = self.get_object()
    self.next()
    value = self.get_object()
  finally:
    self.exit()
  return Described(descriptor, value)
2196
def put_py_described(self, d):
  """
  Put a described value: a described node is started and entered, the
  descriptor is put first and the value second, then the node is exited
  (also when putting fails).

  @param d: object with descriptor and value attributes
  """
  self.put_described()
  self.enter()
  try:
    for part in (d.descriptor, d.value):
      self.put_object(part)
  finally:
    self.exit()
2205
def get_py_array(self):
  """
  If the current node is an array, return an Array object
  representing the array and its contents. Otherwise return None.
  This is a convenience wrapper around get_array, enter, etc.
  """
  # The element count reported by get_array is not needed here.
  _count, described, elem_type = self.get_array()
  if elem_type is None:
    return None
  if not self.enter():
    return None
  try:
    descriptor = UNDESCRIBED
    if described:
      # a described array carries its descriptor as the first child
      self.next()
      descriptor = self.get_object()
    elements = []
    while self.next():
      elements.append(self.get_object())
  finally:
    self.exit()
  return Array(descriptor, elem_type, *elements)
2228
def put_py_array(self, a):
  """
  Put an array object: an array node is started with the array's type,
  entered, filled with the descriptor (only when the array has one)
  followed by every element, and exited again even if putting fails.

  @param a: object with descriptor, type and elements attributes
  """
  has_descriptor = a.descriptor != UNDESCRIBED
  self.put_array(has_descriptor, a.type)
  self.enter()
  try:
    if has_descriptor:
      self.put_object(a.descriptor)
    for element in a.elements:
      self.put_object(element)
  finally:
    self.exit()
# Dispatch table used by put_object: maps the class of a python object
# to the method that encodes it.  put_object looks entries up with
# obj.__class__.
put_mappings = {
  None.__class__: lambda s, _: s.put_null(),
  bool: put_bool,
  ubyte: put_ubyte,
  ushort: put_ushort,
  uint: put_uint,
  ulong: put_ulong,
  byte: put_byte,
  short: put_short,
  int32: put_int,
  long: put_long,
  float32: put_float,
  float: put_double,
  decimal32: put_decimal32,
  decimal64: put_decimal64,
  decimal128: put_decimal128,
  char: put_char,
  timestamp: put_timestamp,
  uuid.UUID: put_uuid,
  bytes: put_binary,
  unicode: put_string,
  symbol: put_symbol,
  list: put_sequence,
  tuple: put_sequence,
  dict: put_dict,
  Described: put_py_described,
  Array: put_py_array
  }
# for python 3.x, long is merely an alias for int, but for python 2.x
# we need to add an explicit int since it is a different type
if int not in put_mappings:
  put_mappings[int] = put_int
# Python >=3.0 has 'memoryview', <=2.5 has 'buffer', >=2.6 has both.
# A NameError simply means the builtin does not exist on this
# interpreter, so the corresponding entry is left out.
try: put_mappings[memoryview] = put_memoryview
except NameError: pass
try: put_mappings[buffer] = put_buffer
except NameError: pass
# Dispatch table used by get_object: maps the type code reported by
# self.type() to the method that decodes the current node.
get_mappings = {
  NULL: lambda s: None,
  BOOL: get_bool,
  BYTE: get_byte,
  UBYTE: get_ubyte,
  SHORT: get_short,
  USHORT: get_ushort,
  INT: get_int,
  UINT: get_uint,
  CHAR: get_char,
  LONG: get_long,
  ULONG: get_ulong,
  TIMESTAMP: get_timestamp,
  FLOAT: get_float,
  DOUBLE: get_double,
  DECIMAL32: get_decimal32,
  DECIMAL64: get_decimal64,
  DECIMAL128: get_decimal128,
  UUID: get_uuid,
  BINARY: get_binary,
  STRING: get_string,
  SYMBOL: get_symbol,
  DESCRIBED: get_py_described,
  ARRAY: get_py_array,
  LIST: get_sequence,
  MAP: get_dict
  }
def put_object(self, obj):
  """
  Put an arbitrary python object, choosing the put method from
  put_mappings according to the object's class.

  The exact class of the object is looked up first.  If it has no
  entry, the classes it inherits from are tried in method resolution
  order, so an instance of a subclass of a supported type (for example
  a dict subclass) is encoded like that type instead of being rejected.

  @param obj: the object to encode
  @raise KeyError: if neither the object's class nor any of its base
    classes has an entry in put_mappings
  """
  cls = obj.__class__
  putter = self.put_mappings.get(cls)
  if putter is None:
    # old-style python 2 classes have no __mro__, hence the default
    for base in getattr(cls, "__mro__", ()):
      putter = self.put_mappings.get(base)
      if putter is not None:
        break
    else:
      raise KeyError(cls)
  putter(self, obj)
2310
def get_object(self):
  """
  Decode the current node into a python object.

  Returns None when self.type() reports no type.  Otherwise the getter
  registered for that type in get_mappings is called; a type without a
  getter yields an L{UnmappedType} built from the type's string form.
  """
  node_type = self.type()
  if node_type is None:
    return None
  getter = self.get_mappings.get(node_type)
  if not getter:
    return UnmappedType(str(node_type))
  return getter(self)
2319
class ConnectionException(ProtonException):
  """
  The exception Connection._check falls back to when a negative error
  code has no more specific entry in EXCEPTIONS.
  """
2322
class Endpoint(object):
  """
  Behaviour shared by the endpoint classes (Connection and Session both
  derive from it): the state bit constants, the local and remote error
  conditions, and access to the handler stored in the endpoint's
  attachments record.

  Subclasses must supply _get_cond_impl and _get_remote_cond_impl; the
  code here also relies on them providing _impl, _get_attachments and a
  connection attribute.
  """

  LOCAL_UNINIT = PN_LOCAL_UNINIT
  REMOTE_UNINIT = PN_REMOTE_UNINIT
  LOCAL_ACTIVE = PN_LOCAL_ACTIVE
  REMOTE_ACTIVE = PN_REMOTE_ACTIVE
  LOCAL_CLOSED = PN_LOCAL_CLOSED
  REMOTE_CLOSED = PN_REMOTE_CLOSED

  def _init(self):
    """Start out without a local error condition."""
    self.condition = None

  def _update_cond(self):
    """Write self.condition into the endpoint's local condition impl."""
    target = self._get_cond_impl()
    obj2cond(self.condition, target)

  @property
  def remote_condition(self):
    """The remote condition as a Condition, or None when none is set."""
    remote = self._get_remote_cond_impl()
    return cond2obj(remote)

  # the following must be provided by subclasses
  def _get_cond_impl(self):
    assert False, "Subclass must override this!"

  def _get_remote_cond_impl(self):
    assert False, "Subclass must override this!"

  def _get_handler(self):
    """
    Return the handler stored in the attachments record, wrapped
    together with the owning reactor's error delegate (None when the
    endpoint has no reactor).
    """
    # imported here rather than at module level, as in the original
    from . import reactor
    owner = reactor.Reactor.wrap(pn_object_reactor(self._impl))
    on_error = owner.on_error_delegate() if owner else None
    record = self._get_attachments()
    return WrappedHandler.wrap(pn_record_get_handler(record), on_error)

  def _set_handler(self, handler):
    """
    Convert handler with _chandler and store it in the attachments
    record.
    """
    from . import reactor
    owner = reactor.Reactor.wrap(pn_object_reactor(self._impl))
    on_error = owner.on_error_delegate() if owner else None
    impl = _chandler(handler, on_error)
    record = self._get_attachments()
    pn_record_set_handler(record, impl)
    # NOTE(review): presumably the record now holds its own reference,
    # so ours is dropped here -- confirm against the C API.
    pn_decref(impl)

  handler = property(_get_handler, _set_handler)

  @property
  def transport(self):
    """The transport of the connection this endpoint belongs to."""
    conn = self.connection
    return conn.transport
2376
class Condition:
  """
  An error condition made of a name, an optional description and
  optional additional info.

  Two conditions compare equal when all three fields are equal.
  """

  def __init__(self, name, description=None, info=None):
    """
    @param name: the name of the condition
    @param description: optional description of the condition
    @param info: optional additional information
    """
    self.name = name
    self.description = description
    self.info = info

  def __repr__(self):
    # fields that are empty or None are left out of the representation
    return "Condition(%s)" % ", ".join([repr(x) for x in
                                        (self.name, self.description, self.info)
                                        if x])

  def __eq__(self, o):
    if not isinstance(o, Condition): return False
    return self.name == o.name and \
        self.description == o.description and \
        self.info == o.info

  def __ne__(self, o):
    # Python 2 does not derive != from ==; without this two equal
    # conditions would still compare as "not equal".
    return not self.__eq__(o)
2394
def obj2cond(obj, cond):
  """
  Copy a python condition object into a condition impl.

  The impl is always cleared first.  If obj is true, its name (as a
  str) and description are set, and its info is put into the impl's
  info data when it is non-empty.

  @param obj: object with name, description and info attributes, or a
    false value to just clear the impl
  @param cond: the condition impl to write to
  """
  pn_condition_clear(cond)
  if not obj:
    return
  pn_condition_set_name(cond, str(obj.name))
  pn_condition_set_description(cond, obj.description)
  info_data = Data(pn_condition_info(cond))
  if obj.info:
    info_data.put_object(obj.info)
2403
def cond2obj(cond):
  """
  Build a L{Condition} from a condition impl.

  @param cond: the condition impl to read
  @return: a Condition with the impl's name, description and decoded
    info, or None when pn_condition_is_set reports that it is not set
  """
  if not pn_condition_is_set(cond):
    return None
  name = pn_condition_get_name(cond)
  description = pn_condition_get_description(cond)
  info = dat2obj(pn_condition_info(cond))
  return Condition(name, description, info)
2411
def dat2obj(dimpl):
  """
  Decode the first node of a data impl into a python object.

  The data is rewound before reading and again afterwards.

  @param dimpl: the data impl to read
  @return: the decoded object, or None when dimpl is false
  """
  if not dimpl:
    return None
  data = Data(dimpl)
  data.rewind()
  data.next()
  decoded = data.get_object()
  data.rewind()
  return decoded
2420
def obj2dat(obj, dimpl):
  """
  Put a python object into a data impl.  Nothing happens when obj is
  None.

  @param obj: the object to encode, or None
  @param dimpl: the data impl to write to
  """
  if obj is None:
    return
  Data(dimpl).put_object(obj)
2425
def secs2millis(secs):
  """
  Convert seconds to milliseconds.

  @param secs: a number of seconds
  @return: secs multiplied by 1000, converted with long()
  """
  millis = secs * 1000
  return long(millis)
2428
def millis2secs(millis):
  """
  Convert milliseconds to seconds.

  @param millis: a number of milliseconds
  @return: the same duration in seconds, as a float
  """
  as_float = float(millis)
  return as_float / 1000.0
2431
def timeout2millis(secs):
  """
  Convert a timeout in seconds to milliseconds.

  @param secs: the timeout in seconds, or None
  @return: PN_MILLIS_MAX when secs is None, otherwise secs converted by
    secs2millis
  """
  return PN_MILLIS_MAX if secs is None else secs2millis(secs)
2435
def millis2timeout(millis):
  """
  Convert a timeout in milliseconds to seconds.

  @param millis: the timeout in milliseconds
  @return: None when millis equals PN_MILLIS_MAX, otherwise millis
    converted by millis2secs
  """
  return None if millis == PN_MILLIS_MAX else millis2secs(millis)
2439
def unicode2utf8(string):
  """Some Proton APIs expect a null terminated string. Convert python text
  types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
  This method will throw if the string cannot be converted.

  None is passed through unchanged.  On python 2, unicode is encoded to
  UTF-8 and str is returned as is.  On python 3, str is encoded to
  UTF-8 and, like bytes input, decoded back to str.

  @raise TypeError: if the argument is none of the types above
  """
  if string is None:
    return None
  if _compat.IS_PY2:
    if isinstance(string, unicode):
      return string.encode('utf-8')
    if isinstance(string, str):
      return string
  else:
    # encoding text yields bytes; decoding them again is what makes
    # invalid input raise
    candidate = string.encode('utf-8') if isinstance(string, str) else string
    if isinstance(candidate, bytes):
      return candidate.decode('utf-8')
  raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
2460
def utf82unicode(string):
  """Convert C strings returned from proton-c into python unicode.

  None and values that already are text are returned unchanged; binary
  values are decoded as UTF-8.

  @raise TypeError: if the argument is neither text nor binary
  """
  if string is None:
    return None
  if isinstance(string, _compat.TEXT_TYPES):
    # already unicode
    return string
  if isinstance(string, _compat.BINARY_TYPES):
    return string.decode('utf8')
  raise TypeError("Unrecognized string type")
2472
class Connection(Wrapper, Endpoint):
  """
  A representation of an AMQP connection
  """

  @staticmethod
  def wrap(impl):
    """Wrap an existing connection impl; None is passed through."""
    if impl is None:
      return None
    else:
      return Connection(impl)

  def __init__(self, impl = pn_connection):
    Wrapper.__init__(self, impl, pn_connection_attachments)

  def _init(self):
    Endpoint._init(self)
    # values written to the connection impl by open()
    self.offered_capabilities = None
    self.desired_capabilities = None
    self.properties = None

  def _get_attachments(self):
    return pn_connection_attachments(self._impl)

  @property
  def connection(self):
    """A connection is its own connection."""
    return self

  @property
  def transport(self):
    """The wrapped result of pn_connection_transport for this connection."""
    return Transport.wrap(pn_connection_transport(self._impl))

  def _check(self, err):
    """
    Return err when it is not negative, otherwise raise the exception
    registered for it in EXCEPTIONS (ConnectionException when there is
    none).
    """
    if err < 0:
      exc = EXCEPTIONS.get(err, ConnectionException)
      # pn_connection_error yields an error object (the error property
      # passes it to pn_error_code); report its text rather than the
      # object itself, as Transport._check does for its own error.
      raise exc("[%s]: %s" % (err, pn_error_text(pn_connection_error(self._impl))))
    else:
      return err

  def _get_cond_impl(self):
    return pn_connection_condition(self._impl)

  def _get_remote_cond_impl(self):
    return pn_connection_remote_condition(self._impl)

  def collect(self, collector):
    """
    Attach a collector to the connection, or detach the current one.

    @param collector: object whose _impl is handed to
      pn_connection_collect, or None to pass None instead
    """
    if collector is None:
      # weakref.ref(None) raises TypeError, so no reference is recorded
      # when detaching
      pn_connection_collect(self._impl, None)
    else:
      pn_connection_collect(self._impl, collector._impl)
      self._collector = weakref.ref(collector)

  def _get_container(self):
    return utf82unicode(pn_connection_get_container(self._impl))
  def _set_container(self, name):
    return pn_connection_set_container(self._impl, unicode2utf8(name))

  container = property(_get_container, _set_container)

  def _get_hostname(self):
    return utf82unicode(pn_connection_get_hostname(self._impl))
  def _set_hostname(self, name):
    return pn_connection_set_hostname(self._impl, unicode2utf8(name))

  hostname = property(_get_hostname, _set_hostname,
                      doc="""
Set the name of the host (either fully qualified or relative) to which this
connection is connecting to.  This information may be used by the remote
peer to determine the correct back-end service to connect the client to.
This value will be sent in the Open performative, and will be used by SSL
and SASL layers to identify the peer.
""")

  def _get_user(self):
    return utf82unicode(pn_connection_get_user(self._impl))
  def _set_user(self, name):
    return pn_connection_set_user(self._impl, unicode2utf8(name))

  user = property(_get_user, _set_user)

  def _get_password(self):
    # the password can be set but is never handed back
    return None
  def _set_password(self, name):
    return pn_connection_set_password(self._impl, unicode2utf8(name))

  password = property(_get_password, _set_password)

  @property
  def remote_container(self):
    """The container identifier specified by the remote peer for this connection."""
    return pn_connection_remote_container(self._impl)

  @property
  def remote_hostname(self):
    """The hostname specified by the remote peer for this connection."""
    return pn_connection_remote_hostname(self._impl)

  # NOTE(review): the "def" lines of the next two properties are missing
  # from the listing this file was recovered from; the names are taken
  # from the C functions they call -- confirm against upstream.
  @property
  def remote_offered_capabilities(self):
    """The capabilities offered by the remote peer for this connection."""
    return dat2obj(pn_connection_remote_offered_capabilities(self._impl))

  @property
  def remote_desired_capabilities(self):
    """The capabilities desired by the remote peer for this connection."""
    return dat2obj(pn_connection_remote_desired_capabilities(self._impl))

  @property
  def remote_properties(self):
    """The properties specified by the remote peer for this connection."""
    return dat2obj(pn_connection_remote_properties(self._impl))

  def open(self):
    """
    Opens the connection.

    In more detail, this moves the local state of the connection to
    the ACTIVE state and triggers an open frame to be sent to the
    peer. A connection is fully active once both peers have opened it.
    """
    # publish the locally configured values before opening
    obj2dat(self.offered_capabilities,
            pn_connection_offered_capabilities(self._impl))
    obj2dat(self.desired_capabilities,
            pn_connection_desired_capabilities(self._impl))
    obj2dat(self.properties, pn_connection_properties(self._impl))
    pn_connection_open(self._impl)

  def close(self):
    """
    Closes the connection.

    In more detail, this moves the local state of the connection to
    the CLOSED state and triggers a close frame to be sent to the
    peer. A connection is fully closed once both peers have closed it.
    """
    self._update_cond()
    pn_connection_close(self._impl)
    if hasattr(self, '_session_policy'):
      # break circular ref
      del self._session_policy

  @property
  def state(self):
    """
    The state of the connection as a bit field. The state has a local
    and a remote component. Each of these can be in one of three
    states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
    against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
    REMOTE_ACTIVE and REMOTE_CLOSED.
    """
    return pn_connection_state(self._impl)

  def session(self):
    """
    Returns a new session on this connection.

    @raise SessionException: if pn_session returns None
    """
    ssn = pn_session(self._impl)
    if ssn is None:
      raise(SessionException("Session allocation failed."))
    else:
      return Session(ssn)

  def session_head(self, mask):
    return Session.wrap(pn_session_head(self._impl, mask))

  # NOTE(review): the recovered listing skips original lines 2638-2639
  # here; a definition may be missing at this point -- confirm against
  # upstream.

  @property
  def work_head(self):
    return Delivery.wrap(pn_work_head(self._impl))

  @property
  def error(self):
    """The code of the connection's error object."""
    return pn_error_code(pn_connection_error(self._impl))

  def free(self):
    pn_connection_release(self._impl)
2651
class SessionException(ProtonException):
  """
  Raised for session related failures, e.g. by Connection.session when
  no session could be allocated.
  """
2654
class Session(Wrapper, Endpoint):
  """
  Python wrapper around a session impl.  Sessions are created from a
  connection and in turn create senders and receivers.
  """

  @staticmethod
  def wrap(impl):
    """Wrap an existing session impl; None is passed through."""
    return None if impl is None else Session(impl)

  def __init__(self, impl):
    Wrapper.__init__(self, impl, pn_session_attachments)

  def _get_attachments(self):
    return pn_session_attachments(self._impl)

  def _get_cond_impl(self):
    return pn_session_condition(self._impl)

  def _get_remote_cond_impl(self):
    return pn_session_remote_condition(self._impl)

  def _get_incoming_capacity(self):
    capacity = pn_session_get_incoming_capacity(self._impl)
    return capacity

  def _set_incoming_capacity(self, capacity):
    pn_session_set_incoming_capacity(self._impl, capacity)

  incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)

  def _get_outgoing_window(self):
    window = pn_session_get_outgoing_window(self._impl)
    return window

  def _set_outgoing_window(self, window):
    pn_session_set_outgoing_window(self._impl, window)

  outgoing_window = property(_get_outgoing_window, _set_outgoing_window)

  @property
  def outgoing_bytes(self):
    """The value reported by pn_session_outgoing_bytes."""
    return pn_session_outgoing_bytes(self._impl)

  @property
  def incoming_bytes(self):
    """The value reported by pn_session_incoming_bytes."""
    return pn_session_incoming_bytes(self._impl)

  def open(self):
    """Open the session."""
    pn_session_open(self._impl)

  def close(self):
    """Publish the local condition, then close the session."""
    self._update_cond()
    pn_session_close(self._impl)

  def next(self, mask):
    """
    Return the wrapped result of pn_session_next for the given state
    mask, or None when there is none.
    """
    following = pn_session_next(self._impl, mask)
    return Session.wrap(following)

  @property
  def state(self):
    """The value reported by pn_session_state."""
    return pn_session_state(self._impl)

  @property
  def connection(self):
    """The connection this session belongs to."""
    owner = pn_session_connection(self._impl)
    return Connection.wrap(owner)

  def sender(self, name):
    """Create a L{Sender} with the given name on this session."""
    encoded = unicode2utf8(name)
    return Sender(pn_sender(self._impl, encoded))

  def receiver(self, name):
    """Create a L{Receiver} with the given name on this session."""
    encoded = unicode2utf8(name)
    return Receiver(pn_receiver(self._impl, encoded))

  def free(self):
    pn_session_free(self._impl)
2726
class LinkException(ProtonException):
  """
  Raised for link related failures; Terminus._check falls back to it
  when a negative error code has no entry in EXCEPTIONS.
  """
2729 2922
class Terminus(object):
  """
  Python wrapper around a terminus impl, i.e. the source or target of a
  link.  Simple settings are exposed as read/write properties; the
  properties, capabilities, outcomes and filter are exposed as L{Data}
  objects.
  """

  UNSPECIFIED = PN_UNSPECIFIED
  SOURCE = PN_SOURCE
  TARGET = PN_TARGET
  COORDINATOR = PN_COORDINATOR

  NONDURABLE = PN_NONDURABLE
  CONFIGURATION = PN_CONFIGURATION
  DELIVERIES = PN_DELIVERIES

  DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
  DIST_MODE_COPY = PN_DIST_MODE_COPY
  DIST_MODE_MOVE = PN_DIST_MODE_MOVE

  EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
  EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
  EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
  EXPIRE_NEVER = PN_EXPIRE_NEVER

  def __init__(self, impl):
    self._impl = impl

  def _check(self, err):
    """
    Return err when it is not negative, otherwise raise the exception
    registered for it in EXCEPTIONS (LinkException when there is none).
    """
    if err < 0:
      exc = EXCEPTIONS.get(err, LinkException)
      raise exc("[%s]" % err)
    return err

  def _get_type(self):
    return pn_terminus_get_type(self._impl)
  def _set_type(self, type):
    status = pn_terminus_set_type(self._impl, type)
    self._check(status)
  type = property(_get_type, _set_type)

  def _get_address(self):
    """The address that identifies the source or target node"""
    raw = pn_terminus_get_address(self._impl)
    return utf82unicode(raw)
  def _set_address(self, address):
    status = pn_terminus_set_address(self._impl, unicode2utf8(address))
    self._check(status)
  address = property(_get_address, _set_address)

  def _get_durability(self):
    return pn_terminus_get_durability(self._impl)
  def _set_durability(self, seconds):
    status = pn_terminus_set_durability(self._impl, seconds)
    self._check(status)
  durability = property(_get_durability, _set_durability)

  def _get_expiry_policy(self):
    return pn_terminus_get_expiry_policy(self._impl)
  def _set_expiry_policy(self, seconds):
    status = pn_terminus_set_expiry_policy(self._impl, seconds)
    self._check(status)
  expiry_policy = property(_get_expiry_policy, _set_expiry_policy)

  def _get_timeout(self):
    return pn_terminus_get_timeout(self._impl)
  def _set_timeout(self, seconds):
    status = pn_terminus_set_timeout(self._impl, seconds)
    self._check(status)
  timeout = property(_get_timeout, _set_timeout)

  def _is_dynamic(self):
    """Indicates whether the source or target node was dynamically
    created"""
    return pn_terminus_is_dynamic(self._impl)
  def _set_dynamic(self, dynamic):
    status = pn_terminus_set_dynamic(self._impl, dynamic)
    self._check(status)
  dynamic = property(_is_dynamic, _set_dynamic)

  def _get_distribution_mode(self):
    return pn_terminus_get_distribution_mode(self._impl)
  def _set_distribution_mode(self, mode):
    status = pn_terminus_set_distribution_mode(self._impl, mode)
    self._check(status)
  distribution_mode = property(_get_distribution_mode, _set_distribution_mode)

  @property
  def properties(self):
    """Properties of a dynamic source or target."""
    return Data(pn_terminus_properties(self._impl))

  @property
  def capabilities(self):
    """Capabilities of the source or target."""
    return Data(pn_terminus_capabilities(self._impl))

  @property
  def outcomes(self):
    return Data(pn_terminus_outcomes(self._impl))

  @property
  def filter(self):
    """A filter on a source allows the set of messages transfered over
    the link to be restricted"""
    return Data(pn_terminus_filter(self._impl))

  def copy(self, src):
    """Copy the settings of another terminus into this one."""
    status = pn_terminus_copy(self._impl, src._impl)
    self._check(status)
3020
class Sender(Link):
  """
  A link over which messages are sent.
  """

  def offered(self, n):
    pn_link_offered(self._impl, n)

  def stream(self, data):
    """
    Send specified data as part of the current delivery

    @type data: binary
    @param data: data to send
    """
    status = pn_link_send(self._impl, data)
    return self._check(status)

  def send(self, obj, tag=None):
    """
    Send specified object over this sender; the object is expected to
    have a send() method on it that takes the sender and an optional
    tag as arguments.

    Where the object is a Message, this will send the message over
    this link, creating a new delivery for the purpose.
    """
    if not hasattr(obj, 'send'):
      # treat object as bytes
      return self.stream(obj)
    return obj.send(self, tag=tag)

  def delivery_tag(self):
    """
    Return the next delivery tag for this sender: "1", "2", "3", ...
    The generator producing them is created on first use.
    """
    if not hasattr(self, 'tag_generator'):
      def simple_tags():
        n = 0
        while True:
          n += 1
          yield str(n)
      self.tag_generator = simple_tags()
    return next(self.tag_generator)
3062
class Receiver(Link):
  """
  A link over which messages are received.
  """

  def flow(self, n):
    """Increases the credit issued to the remote sender by the specified number of messages."""
    pn_link_flow(self._impl, n)

  def recv(self, limit):
    """
    Receive data on this link.

    @param limit: passed to pn_link_recv
    @return: the received binary, or None when pn_link_recv reports
      PN_EOS
    @raise: whatever self._check raises for an error code
    """
    status, binary = pn_link_recv(self._impl, limit)
    if status == PN_EOS:
      return None
    self._check(status)
    return binary

  def drain(self, n):
    pn_link_drain(self._impl, n)

  def draining(self):
    return pn_link_draining(self._impl)
3085
class NamedInt(int):
  """
  An int that carries a name, which is used as its str() and repr().

  Every instance is registered in the class level values dict under the
  integer it was created from, so that get() can map a plain integer
  back to its named instance.
  """

  values = {}

  def __new__(cls, i, name):
    instance = super(NamedInt, cls).__new__(cls, i)
    cls.values[i] = instance
    return instance

  def __init__(self, i, name):
    self.name = name

  def __repr__(self):
    return self.name

  def __str__(self):
    return self.name

  @classmethod
  def get(cls, i):
    """Return the instance registered for i, or i itself if there is none."""
    try:
      return cls.values[i]
    except KeyError:
      return i
3107
class DispositionType(NamedInt):
  """
  Named integers for disposition types.
  """
  # own registry, separate from the one defined on NamedInt
  values = {}
3110
class Disposition(object):
  """
  Python wrapper around a disposition impl.

  For a local disposition the data, annotations and condition are kept
  on the python object and can be assigned.  For a remote disposition
  they are read from the impl and are read-only.
  """

  RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
  ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
  REJECTED = DispositionType(PN_REJECTED, "REJECTED")
  RELEASED = DispositionType(PN_RELEASED, "RELEASED")
  MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")

  def __init__(self, impl, local):
    """
    @param impl: the disposition impl to wrap
    @param local: true for a local disposition, false for a remote one
    """
    self._impl = impl
    self.local = local
    self._data = None
    self._condition = None
    self._annotations = None

  @property
  def type(self):
    """
    The disposition type, as a L{DispositionType} when one is
    registered for the value and as the plain value otherwise.
    """
    raw = pn_disposition_type(self._impl)
    return DispositionType.get(raw)

  def _get_section_number(self):
    return pn_disposition_get_section_number(self._impl)
  def _set_section_number(self, n):
    pn_disposition_set_section_number(self._impl, n)
  section_number = property(_get_section_number, _set_section_number)

  def _get_section_offset(self):
    return pn_disposition_get_section_offset(self._impl)
  def _set_section_offset(self, n):
    pn_disposition_set_section_offset(self._impl, n)
  section_offset = property(_get_section_offset, _set_section_offset)

  def _get_failed(self):
    return pn_disposition_is_failed(self._impl)
  def _set_failed(self, b):
    pn_disposition_set_failed(self._impl, b)
  failed = property(_get_failed, _set_failed)

  def _get_undeliverable(self):
    return pn_disposition_is_undeliverable(self._impl)
  def _set_undeliverable(self, b):
    pn_disposition_set_undeliverable(self._impl, b)
  undeliverable = property(_get_undeliverable, _set_undeliverable)

  def _get_data(self):
    if not self.local:
      return dat2obj(pn_disposition_data(self._impl))
    return self._data
  def _set_data(self, obj):
    if not self.local:
      raise AttributeError("data attribute is read-only")
    self._data = obj
  data = property(_get_data, _set_data)

  def _get_annotations(self):
    if not self.local:
      return dat2obj(pn_disposition_annotations(self._impl))
    return self._annotations
  def _set_annotations(self, obj):
    if not self.local:
      raise AttributeError("annotations attribute is read-only")
    self._annotations = obj
  annotations = property(_get_annotations, _set_annotations)

  def _get_condition(self):
    if not self.local:
      return cond2obj(pn_disposition_condition(self._impl))
    return self._condition
  def _set_condition(self, obj):
    if not self.local:
      raise AttributeError("condition attribute is read-only")
    self._condition = obj
  condition = property(_get_condition, _set_condition)
3189
class Delivery(Wrapper):
  """
  Tracks and/or records the delivery of a message over a link.
  """

  RECEIVED = Disposition.RECEIVED
  ACCEPTED = Disposition.ACCEPTED
  REJECTED = Disposition.REJECTED
  RELEASED = Disposition.RELEASED
  MODIFIED = Disposition.MODIFIED

  @staticmethod
  def wrap(impl):
    """Wrap an existing delivery impl; None is passed through."""
    return None if impl is None else Delivery(impl)

  def __init__(self, impl):
    Wrapper.__init__(self, impl, pn_delivery_attachments)

  def _init(self):
    # the local disposition is writable, the remote one is read-only
    self.local = Disposition(pn_delivery_local(self._impl), True)
    self.remote = Disposition(pn_delivery_remote(self._impl), False)

  @property
  def tag(self):
    """The identifier for the delivery."""
    return pn_delivery_tag(self._impl)

  @property
  def writable(self):
    """Returns true for an outgoing delivery to which data can now be written."""
    return pn_delivery_writable(self._impl)

  @property
  def readable(self):
    """Returns true for an incoming delivery that has data to read."""
    return pn_delivery_readable(self._impl)

  @property
  def updated(self):
    """Returns true if the state of the delivery has been updated
    (e.g. it has been settled and/or accepted, rejected etc)."""
    return pn_delivery_updated(self._impl)

  def update(self, state):
    """
    Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.
    """
    # copy what was assigned to the local disposition into its impl
    # before the new state is applied
    local = self.local
    obj2dat(local._data, pn_disposition_data(local._impl))
    obj2dat(local._annotations, pn_disposition_annotations(local._impl))
    obj2cond(local._condition, pn_disposition_condition(local._impl))
    pn_delivery_update(self._impl, state)

  @property
  def pending(self):
    return pn_delivery_pending(self._impl)

  @property
  def partial(self):
    """
    Returns true for an incoming delivery if not all the data is
    yet available.
    """
    return pn_delivery_partial(self._impl)

  @property
  def local_state(self):
    """Returns the local state of the delivery."""
    raw = pn_delivery_local_state(self._impl)
    return DispositionType.get(raw)

  @property
  def remote_state(self):
    """
    Returns the state of the delivery as indicated by the remote
    peer.
    """
    raw = pn_delivery_remote_state(self._impl)
    return DispositionType.get(raw)

  @property
  def settled(self):
    """
    Returns true if the delivery has been settled by the remote peer.
    """
    return pn_delivery_settled(self._impl)

  def settle(self):
    """
    Settles the delivery locally. This indicates the application
    considers the delivery complete and does not wish to receive any
    further events about it. Every delivery should be settled locally.
    """
    pn_delivery_settle(self._impl)

  @property
  def aborted(self):
    """Returns true if the delivery has been aborted."""
    return pn_delivery_aborted(self._impl)

  def abort(self):
    """
    Aborts the delivery.  This indicates the application wishes to
    invalidate any data that may have already been sent on this delivery.
    The delivery cannot be aborted after it has been completely delivered.
    """
    pn_delivery_abort(self._impl)

  @property
  def work_next(self):
    return Delivery.wrap(pn_work_next(self._impl))

  # NOTE(review): the recovered listing skips original lines 3303-3307
  # here, leaving only a bare "@property".  Presumably this was the
  # "link" property that session below relies on -- restore it from
  # upstream.

  @property
  def session(self):
    """
    Returns the session over which the delivery was sent or received.
    """
    return self.link.session

  @property
  def connection(self):
    """
    Returns the connection over which the delivery was sent or received.
    """
    return self.session.connection

  @property
  def transport(self):
    return self.connection.transport
3326
class TransportException(ProtonException):
  """
  Raised for transport related failures; Transport._check falls back to
  it when a negative error code has no entry in EXCEPTIONS.
  """
3329
class TraceAdapter:
  """
  Adapts a python tracer callback: when called with a transport impl
  and a message, the tracer receives the wrapped L{Transport} and the
  message.
  """

  def __init__(self, tracer):
    self.tracer = tracer

  def __call__(self, trans_impl, message):
    transport = Transport.wrap(trans_impl)
    self.tracer(transport, message)
3337
3338 -class Transport(Wrapper):
3339 3340 TRACE_OFF = PN_TRACE_OFF 3341 TRACE_DRV = PN_TRACE_DRV 3342 TRACE_FRM = PN_TRACE_FRM 3343 TRACE_RAW = PN_TRACE_RAW 3344 3345 CLIENT = 1 3346 SERVER = 2 3347 3348 @staticmethod
def wrap(impl):
  """Wrap an existing transport impl; None is passed through."""
  return None if impl is None else Transport(_impl=impl)
3354
def __init__(self, mode=None, _impl = pn_transport):
  """
  @param mode: Transport.SERVER to put the transport in server mode;
    None or Transport.CLIENT to leave it as it is
  @param _impl: passed on to Wrapper.__init__ (defaults to pn_transport)
  @raise TransportException: for any other mode
  """
  Wrapper.__init__(self, _impl, pn_transport_attachments)
  if mode == Transport.SERVER:
    pn_transport_set_server(self._impl)
    return
  if not (mode is None or mode==Transport.CLIENT):
    raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
3363
def _init(self):
  """Start out with neither a SASL nor an SSL object attached."""
  self._sasl = self._ssl = None
3367
def _check(self, err):
  """
  Return err when it is not negative, otherwise raise the exception
  registered for it in EXCEPTIONS (TransportException when there is
  none), with the text of the transport's error in the message.
  """
  if err < 0:
    exc = EXCEPTIONS.get(err, TransportException)
    detail = pn_error_text(pn_transport_error(self._impl))
    raise exc("[%s]: %s" % (err, detail))
  return err
3374
def _set_tracer(self, tracer):
  """Install tracer on the transport, wrapped in a L{TraceAdapter}."""
  adapter = TraceAdapter(tracer)
  pn_transport_set_pytracer(self._impl, adapter)
3377
def _get_tracer(self):
  """
  Return the tracer of the adapter installed on the transport, or None
  when no adapter is installed.
  """
  adapter = pn_transport_get_pytracer(self._impl)
  return adapter.tracer if adapter else None
3384 3385 tracer = property(_get_tracer, _set_tracer, 3386 doc=""" 3387 A callback for trace logging. The callback is passed the transport and log message. 3388 """) 3389
def log(self, message):
  """Pass message to pn_transport_log for this transport."""
  impl = self._impl
  pn_transport_log(impl, message)
3392
def require_auth(self, bool):
  """Pass the given flag to pn_transport_require_auth."""
  impl = self._impl
  pn_transport_require_auth(impl, bool)
3395 3396 @property
def authenticated(self):
  """The value reported by pn_transport_is_authenticated."""
  flag = pn_transport_is_authenticated(self._impl)
  return flag
3399
def require_encryption(self, bool):
  """Pass the given flag to pn_transport_require_encryption."""
  impl = self._impl
  pn_transport_require_encryption(impl, bool)
3402 3403 @property
def encrypted(self):
  """The value reported by pn_transport_is_encrypted."""
  flag = pn_transport_is_encrypted(self._impl)
  return flag
3406 3407 @property
def user(self):
  """The value reported by pn_transport_get_user."""
  name = pn_transport_get_user(self._impl)
  return name
3410
def bind(self, connection):
  """Assign a connection to the transport"""
  status = pn_transport_bind(self._impl, connection._impl)
  self._check(status)
3414
def unbind(self):
  """Release the connection"""
  status = pn_transport_unbind(self._impl)
  self._check(status)
3418
def trace(self, n):
  """Pass *n* to pn_transport_trace for this transport.

  NOTE(review): *n* is presumably a trace-level flag value -- confirm
  against the C API documentation.
  """
  impl = self._impl
  pn_transport_trace(impl, n)
3421
def tick(self, now):
  """Process any timed events (like heartbeat generation).

  now = seconds since epoch (float). It is converted with secs2millis
  before being handed to pn_transport_tick, and the value that call
  returns is converted back with millis2secs.
  """
  impl = self._impl
  now_ms = secs2millis(now)
  result_ms = pn_transport_tick(impl, now_ms)
  return millis2secs(result_ms)
3427
def capacity(self):
  """Return the value reported by pn_transport_capacity.

  Values at or above PN_EOS are returned as they are; anything lower is
  routed through _check, which raises for a negative code.
  """
  reported = pn_transport_capacity(self._impl)
  return reported if reported >= PN_EOS else self._check(reported)
3434
def push(self, binary):
  """Push *binary* into the transport, insisting that all of it is taken.

  The result of pn_transport_push goes through _check first (raising for
  a negative code). If the resulting count differs from len(binary) an
  OverflowError is raised.
  """
  accepted = self._check(pn_transport_push(self._impl, binary))
  offered = len(binary)
  if accepted != offered:
    raise OverflowError("unable to process all bytes: %s, %s" % (accepted, offered))
3439
def close_tail(self):
  """Call pn_transport_close_tail and run its status through _check."""
  status = pn_transport_close_tail(self._impl)
  self._check(status)
3442
def pending(self):
  """Return the value reported by pn_transport_pending.

  Values at or above PN_EOS are returned as they are; anything lower is
  routed through _check, which raises for a negative code.
  """
  reported = pn_transport_pending(self._impl)
  return reported if reported >= PN_EOS else self._check(reported)
3449
def peek(self, size):
  """Return the data pn_transport_peek yields for *size*, or None at PN_EOS.

  A status other than PN_EOS is run through _check (raising for a
  negative code) before the data is returned.
  """
  status, data = pn_transport_peek(self._impl, size)
  if status == PN_EOS:
    return None
  self._check(status)
  return data
3457
def pop(self, size):
  """Call pn_transport_pop with *size*; nothing is returned or checked."""
  impl = self._impl
  pn_transport_pop(impl, size)
3460
def close_head(self):
  """Call pn_transport_close_head and run its status through _check."""
  status = pn_transport_close_head(self._impl)
  self._check(status)

@property
def closed(self):
  """Read-only: the result of pn_transport_closed for this transport."""
  impl = self._impl
  return pn_transport_closed(impl)
3467 3468 # AMQP 1.0 max-frame-size
def _get_max_frame_size(self):
  """Getter: return the result of pn_transport_get_max_frame."""
  impl = self._impl
  return pn_transport_get_max_frame(impl)
3471
def _set_max_frame_size(self, value):
  """Setter: pass *value* to pn_transport_set_max_frame."""
  impl = self._impl
  pn_transport_set_max_frame(impl, value)

# Expose the max-frame accessor pair as a read/write attribute.
max_frame_size = property(fget=_get_max_frame_size, fset=_set_max_frame_size,
                          doc="""
Sets the maximum size for received frames (in bytes).
""")

@property
def remote_max_frame_size(self):
  """Read-only: the result of pn_transport_get_remote_max_frame."""
  impl = self._impl
  return pn_transport_get_remote_max_frame(impl)
3483
def _get_channel_max(self):
  """Getter: return the result of pn_transport_get_channel_max."""
  impl = self._impl
  return pn_transport_get_channel_max(impl)
3486
def _set_channel_max(self, value):
  """Setter: pass *value* to pn_transport_set_channel_max.

  A truthy return from that call is treated as failure and raises
  SessionException.
  """
  rejected = pn_transport_set_channel_max(self._impl, value)
  if rejected:
    raise SessionException("Too late to change channel max.")
3490 3491 channel_max = property(_get_channel_max, _set_channel_max, 3492 doc=""" 3493 Sets the maximum channel that may be used on the transport. 3494 """) 3495 3496 @property
3497 - def remote_channel_max(self):