Module proton
[frames] | no frames]

Source Code for Module proton

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  #  
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  #  
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  The proton module defines a suite of APIs that implement the AMQP 1.0 
  22  protocol. 
  23   
  24  The proton APIs consist of the following classes: 
  25   
  26   - L{Messenger} -- A messaging endpoint. 
  27   - L{Message}   -- A class for creating and/or accessing AMQP message content. 
  28   - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded 
  29                    data. 
  30   
  31  """ 
  32   
  33  from cproton import * 
  34  try: 
  35    import uuid 
  36  except ImportError: 
  37    """ 
  38    No 'native' UUID support.  Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. 
  39    """ 
  40    import struct 
41 - class uuid:
42 - class UUID:
43 - def __init__(self, hex=None, bytes=None):
44 if [hex, bytes].count(None) != 1: 45 raise TypeError("need one of hex or bytes") 46 if bytes is not None: 47 self.bytes = bytes 48 elif hex is not None: 49 fields=hex.split("-") 50 fields[4:5] = [fields[4][:4], fields[4][4:]] 51 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
52
53 - def __cmp__(self, other):
54 if isinstance(other, uuid.UUID): 55 return cmp(self.bytes, other.bytes) 56 else: 57 return -1
58
59 - def __str__(self):
60 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
61
62 - def __repr__(self):
63 return "UUID(%r)" % str(self)
64
65 - def __hash__(self):
66 return self.bytes.__hash__()
67 68 import os, random, socket, time 69 rand = random.Random() 70 rand.seed((os.getpid(), time.time(), socket.gethostname()))
71 - def random_uuid():
72 bytes = [rand.randint(0, 255) for i in xrange(16)] 73 74 # From RFC4122, the version bits are set to 0100 75 bytes[7] &= 0x0F 76 bytes[7] |= 0x40 77 78 # From RFC4122, the top two bits of byte 8 get set to 01 79 bytes[8] &= 0x3F 80 bytes[8] |= 0x80 81 return "".join(map(chr, bytes))
82
83 - def uuid4():
84 return uuid.UUID(bytes=random_uuid())
85 86 try: 87 bytes() 88 except NameError: 89 bytes = str 90 91 API_LANGUAGE = "C" 92 IMPLEMENTATION_LANGUAGE = "C"
93 94 -class Constant(object):
95
96 - def __init__(self, name):
97 self.name = name
98
99 - def __repr__(self):
100 return self.name
101
102 -class ProtonException(Exception):
103 """ 104 The root of the proton exception hierarchy. All proton exception 105 classes derive from this exception. 106 """ 107 pass
108
109 -class Timeout(ProtonException):
110 """ 111 A timeout exception indicates that a blocking operation has timed 112 out. 113 """ 114 pass
115
116 -class Interrupt(ProtonException):
117 """ 118 An interrupt exception indicaes that a blocking operation was interrupted. 119 """ 120 pass
121
122 -class MessengerException(ProtonException):
123 """ 124 The root of the messenger exception hierarchy. All exceptions 125 generated by the messenger class derive from this exception. 126 """ 127 pass
128
129 -class MessageException(ProtonException):
130 """ 131 The MessageException class is the root of the message exception 132 hierarhcy. All exceptions generated by the Message class derive from 133 this exception. 134 """ 135 pass
136 137 EXCEPTIONS = { 138 PN_TIMEOUT: Timeout, 139 PN_INTR: Interrupt 140 } 141 142 PENDING = Constant("PENDING") 143 ACCEPTED = Constant("ACCEPTED") 144 REJECTED = Constant("REJECTED") 145 RELEASED = Constant("RELEASED") 146 ABORTED = Constant("ABORTED") 147 SETTLED = Constant("SETTLED") 148 149 STATUSES = { 150 PN_STATUS_ABORTED: ABORTED, 151 PN_STATUS_ACCEPTED: ACCEPTED, 152 PN_STATUS_REJECTED: REJECTED, 153 PN_STATUS_RELEASED: RELEASED, 154 PN_STATUS_PENDING: PENDING, 155 PN_STATUS_SETTLED: SETTLED, 156 PN_STATUS_UNKNOWN: None 157 } 158 159 AUTOMATIC = Constant("AUTOMATIC") 160 MANUAL = Constant("MANUAL")
161 162 -class Messenger(object):
163 """ 164 The L{Messenger} class defines a high level interface for sending 165 and receiving L{Messages<Message>}. Every L{Messenger} contains a 166 single logical queue of incoming messages and a single logical queue 167 of outgoing messages. These messages in these queues may be destined 168 for, or originate from, a variety of addresses. 169 170 The messenger interface is single-threaded. All methods 171 except one (L{interrupt}) are intended to be used from within 172 the messenger thread. 173 174 175 Address Syntax 176 ============== 177 178 An address has the following form:: 179 180 [ amqp[s]:// ] [user[:password]@] domain [/[name]] 181 182 Where domain can be one of:: 183 184 host | host:port | ip | ip:port | name 185 186 The following are valid examples of addresses: 187 188 - example.org 189 - example.org:1234 190 - amqp://example.org 191 - amqps://example.org 192 - example.org/incoming 193 - amqps://example.org/outgoing 194 - amqps://fred:trustno1@example.org 195 - 127.0.0.1:1234 196 - amqps://127.0.0.1:1234 197 198 Sending & Receiving Messages 199 ============================ 200 201 The L{Messenger} class works in conjuction with the L{Message} class. The 202 L{Message} class is a mutable holder of message content. 203 204 The L{put} method copies its L{Message} to the outgoing queue, and may 205 send queued messages if it can do so without blocking. The L{send} 206 method blocks until it has sent the requested number of messages, 207 or until a timeout interrupts the attempt. 208 209 210 >>> message = Message() 211 >>> for i in range(3): 212 ... message.address = "amqp://host/queue" 213 ... message.subject = "Hello World %i" % i 214 ... messenger.put(message) 215 >>> messenger.send() 216 217 Similarly, the L{recv} method receives messages into the incoming 218 queue, and may block as it attempts to receive the requested number 219 of messages, or until timeout is reached. It may receive fewer 220 than the requested number. The L{get} method pops the 221 eldest L{Message} off the incoming queue and copies it into the L{Message} 222 object that you supply. It will not block. 223 224 225 >>> message = Message() 226 >>> messenger.recv(10): 227 >>> while messenger.incoming > 0: 228 ... messenger.get(message) 229 ... print message.subject 230 Hello World 0 231 Hello World 1 232 Hello World 2 233 234 The blocking flag allows you to turn off blocking behavior entirely, 235 in which case L{send} and L{recv} will do whatever they can without 236 blocking, and then return. You can then look at the number 237 of incoming and outgoing messages to see how much outstanding work 238 still remains. 239 """ 240
241 - def __init__(self, name=None):
242 """ 243 Construct a new L{Messenger} with the given name. The name has 244 global scope. If a NULL name is supplied, a L{uuid.UUID} based 245 name will be chosen. 246 247 @type name: string 248 @param name: the name of the messenger or None 249 """ 250 self._mng = pn_messenger(name)
251
252 - def __del__(self):
253 """ 254 Destroy the L{Messenger}. This will close all connections that 255 are managed by the L{Messenger}. Call the L{stop} method before 256 destroying the L{Messenger}. 257 """ 258 if hasattr(self, "_mng"): 259 pn_messenger_free(self._mng) 260 del self._mng
261
262 - def _check(self, err):
263 if err < 0: 264 if (err == PN_INPROGRESS): 265 return 266 exc = EXCEPTIONS.get(err, MessengerException) 267 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng)))) 268 else: 269 return err
270 271 @property
272 - def name(self):
273 """ 274 The name of the L{Messenger}. 275 """ 276 return pn_messenger_name(self._mng)
277
278 - def _get_certificate(self):
279 return pn_messenger_get_certificate(self._mng)
280
281 - def _set_certificate(self, value):
282 self._check(pn_messenger_set_certificate(self._mng, value))
283 284 certificate = property(_get_certificate, _set_certificate, 285 doc=""" 286 Path to a certificate file for the L{Messenger}. This certificate is 287 used when the L{Messenger} accepts or establishes SSL/TLS connections. 288 This property must be specified for the L{Messenger} to accept 289 incoming SSL/TLS connections and to establish client authenticated 290 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS 291 connections do not require this property. 292 """) 293
294 - def _get_private_key(self):
295 return pn_messenger_get_private_key(self._mng)
296
297 - def _set_private_key(self, value):
298 self._check(pn_messenger_set_private_key(self._mng, value))
299 300 private_key = property(_get_private_key, _set_private_key, 301 doc=""" 302 Path to a private key file for the L{Messenger's<Messenger>} 303 certificate. This property must be specified for the L{Messenger} to 304 accept incoming SSL/TLS connections and to establish client 305 authenticated outgoing SSL/TLS connection. Non client authenticated 306 SSL/TLS connections do not require this property. 307 """) 308
309 - def _get_password(self):
310 return pn_messenger_get_password(self._mng)
311
312 - def _set_password(self, value):
313 self._check(pn_messenger_set_password(self._mng, value))
314 315 password = property(_get_password, _set_password, 316 doc=""" 317 This property contains the password for the L{Messenger.private_key} 318 file, or None if the file is not encrypted. 319 """) 320
321 - def _get_trusted_certificates(self):
322 return pn_messenger_get_trusted_certificates(self._mng)
323
324 - def _set_trusted_certificates(self, value):
325 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
326 327 trusted_certificates = property(_get_trusted_certificates, 328 _set_trusted_certificates, 329 doc=""" 330 A path to a database of trusted certificates for use in verifying the 331 peer on an SSL/TLS connection. If this property is None, then the peer 332 will not be verified. 333 """) 334
335 - def _get_timeout(self):
336 t = pn_messenger_get_timeout(self._mng) 337 if t == -1: 338 return None 339 else: 340 return float(t)/1000
341
342 - def _set_timeout(self, value):
343 if value is None: 344 t = -1 345 else: 346 t = long(1000*value) 347 self._check(pn_messenger_set_timeout(self._mng, t))
348 349 timeout = property(_get_timeout, _set_timeout, 350 doc=""" 351 The timeout property contains the default timeout for blocking 352 operations performed by the L{Messenger}. 353 """) 354
355 - def _is_blocking(self):
356 return pn_messenger_is_blocking(self._mng)
357
358 - def _set_blocking(self, b):
359 self._check(pn_messenger_set_blocking(self._mng, b))
360 361 blocking = property(_is_blocking, _set_blocking, 362 doc=""" 363 Enable or disable blocking behavior during L{Message} sending 364 and receiving. This affects every blocking call, with the 365 exception of L{work}. Currently, the affected calls are 366 L{send}, L{recv}, and L{stop}. 367 """) 368
369 - def _get_incoming_window(self):
370 return pn_messenger_get_incoming_window(self._mng)
371
372 - def _set_incoming_window(self, window):
373 self._check(pn_messenger_set_incoming_window(self._mng, window))
374 375 incoming_window = property(_get_incoming_window, _set_incoming_window, 376 doc=""" 377 The incoming tracking window for the messenger. The messenger will 378 track the remote status of this many incoming deliveries after they 379 have been accepted or rejected. Defaults to zero. 380 381 L{Messages<Message>} enter this window only when you take them into your application 382 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>} 383 without explicitly accepting or rejecting the oldest message, then the 384 message that passes beyond the edge of the incoming window will be assigned 385 the default disposition of its link. 386 """) 387
388 - def _get_outgoing_window(self):
389 return pn_messenger_get_outgoing_window(self._mng)
390
391 - def _set_outgoing_window(self, window):
392 self._check(pn_messenger_set_outgoing_window(self._mng, window))
393 394 outgoing_window = property(_get_outgoing_window, _set_outgoing_window, 395 doc=""" 396 The outgoing tracking window for the messenger. The messenger will 397 track the remote status of this many outgoing deliveries after calling 398 send. Defaults to zero. 399 400 A L{Message} enters this window when you call the put() method with the 401 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1 402 times, status information will no longer be available for the 403 first message. 404 """) 405
406 - def start(self):
407 """ 408 Currently a no-op placeholder. 409 For future compatibility, do not L{send} or L{recv} messages 410 before starting the L{Messenger}. 411 """ 412 self._check(pn_messenger_start(self._mng))
413
414 - def stop(self):
415 """ 416 Transitions the L{Messenger} to an inactive state. An inactive 417 L{Messenger} will not send or receive messages from its internal 418 queues. A L{Messenger} should be stopped before being discarded to 419 ensure a clean shutdown handshake occurs on any internally managed 420 connections. 421 """ 422 self._check(pn_messenger_stop(self._mng))
423 424 @property
425 - def stopped(self):
426 """ 427 Returns true iff a L{Messenger} is in the stopped state. 428 This function does not block. 429 """ 430 return pn_messenger_stopped(self._mng)
431
432 - def subscribe(self, source):
433 """ 434 Subscribes the L{Messenger} to messages originating from the 435 specified source. The source is an address as specified in the 436 L{Messenger} introduction with the following addition. If the 437 domain portion of the address begins with the '~' character, the 438 L{Messenger} will interpret the domain as host/port, bind to it, 439 and listen for incoming messages. For example "~0.0.0.0", 440 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any 441 local interface and listen for incoming messages with the last 442 variant only permitting incoming SSL connections. 443 444 @type source: string 445 @param source: the source of messages to subscribe to 446 """ 447 sub_impl = pn_messenger_subscribe(self._mng, source) 448 if not sub_impl: 449 self._check(PN_ERR) 450 return Subscription(sub_impl)
451
452 - def put(self, message):
453 """ 454 Places the content contained in the message onto the outgoing 455 queue of the L{Messenger}. This method will never block, however 456 it will send any unblocked L{Messages<Message>} in the outgoing 457 queue immediately and leave any blocked L{Messages<Message>} 458 remaining in the outgoing queue. The L{send} call may be used to 459 block until the outgoing queue is empty. The L{outgoing} property 460 may be used to check the depth of the outgoing queue. 461 462 When the content in a given L{Message} object is copied to the outgoing 463 message queue, you may then modify or discard the L{Message} object 464 without having any impact on the content in the outgoing queue. 465 466 This method returns an outgoing tracker for the L{Message}. The tracker 467 can be used to determine the delivery status of the L{Message}. 468 469 @type message: Message 470 @param message: the message to place in the outgoing queue 471 @return: a tracker 472 """ 473 message._pre_encode() 474 self._check(pn_messenger_put(self._mng, message._msg)) 475 return pn_messenger_outgoing_tracker(self._mng)
476
477 - def status(self, tracker):
478 """ 479 Gets the last known remote state of the delivery associated with 480 the given tracker. 481 482 @type tracker: tracker 483 @param tracker: the tracker whose status is to be retrieved 484 485 @return: one of None, PENDING, REJECTED, or ACCEPTED 486 """ 487 disp = pn_messenger_status(self._mng, tracker); 488 return STATUSES.get(disp, disp)
489
490 - def buffered(self, tracker):
491 """ 492 Checks if the delivery associated with the given tracker is still 493 waiting to be sent. 494 495 @type tracker: tracker 496 @param tracker: the tracker whose status is to be retrieved 497 498 @return true if delivery is still buffered 499 """ 500 return pn_messenger_buffered(self._mng, tracker);
501
502 - def settle(self, tracker=None):
503 """ 504 Frees a L{Messenger} from tracking the status associated with a given 505 tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up 506 to the most recent will be settled. 507 """ 508 if tracker is None: 509 tracker = pn_messenger_outgoing_tracker(self._mng) 510 flags = PN_CUMULATIVE 511 else: 512 flags = 0 513 self._check(pn_messenger_settle(self._mng, tracker, flags))
514
515 - def send(self, n=-1):
516 """ 517 This call will block until the indicated number of L{messages<Message>} 518 have been sent, or until the operation times out. If n is -1 this call will 519 block until all outgoing L{messages<Message>} have been sent. If n is 0 then 520 this call will send whatever it can without blocking. 521 """ 522 self._check(pn_messenger_send(self._mng, n))
523
524 - def recv(self, n=None):
525 """ 526 Receives up to I{n} L{messages<Message>} into the incoming queue. If no value 527 for I{n} is supplied, this call will receive as many L{messages<Message>} as it 528 can buffer internally. If the L{Messenger} is in blocking mode, this 529 call will block until at least one L{Message} is available in the 530 incoming queue. 531 """ 532 if n is None: 533 n = -1 534 self._check(pn_messenger_recv(self._mng, n))
535
536 - def work(self, timeout=None):
537 """ 538 Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}. 539 This will block for the indicated timeout. 540 This method may also do I/O work other than sending and receiving 541 L{messages<Message>}. For example, closing connections after messenger.L{stop}() 542 has been called. 543 """ 544 if timeout is None: 545 t = -1 546 else: 547 t = long(1000*timeout) 548 err = pn_messenger_work(self._mng, t) 549 if (err == PN_TIMEOUT): 550 return False 551 else: 552 self._check(err) 553 return True
554 555 @property
556 - def receiving(self):
557 return pn_messenger_receiving(self._mng)
558
559 - def interrupt(self):
560 """ 561 The L{Messenger} interface is single-threaded. 562 This is the only L{Messenger} function intended to be called 563 from outside of the L{Messenger} thread. 564 Call this from a non-messenger thread to interrupt 565 a L{Messenger} that is blocking. 566 This will cause any in-progress blocking call to throw 567 the L{Interrupt} exception. If there is no currently blocking 568 call, then the next blocking call will be affected, even if it 569 is within the same thread that interrupt was called from. 570 """ 571 self._check(pn_messenger_interrupt(self._mng))
572
573 - def get(self, message=None):
574 """ 575 Moves the message from the head of the incoming message queue into 576 the supplied message object. Any content in the message will be 577 overwritten. 578 579 A tracker for the incoming L{Message} is returned. The tracker can 580 later be used to communicate your acceptance or rejection of the 581 L{Message}. 582 583 If None is passed in for the L{Message} object, the L{Message} 584 popped from the head of the queue is discarded. 585 586 @type message: Message 587 @param message: the destination message object 588 @return: a tracker 589 """ 590 if message is None: 591 impl = None 592 else: 593 impl = message._msg 594 self._check(pn_messenger_get(self._mng, impl)) 595 if message is not None: 596 message._post_decode() 597 return pn_messenger_incoming_tracker(self._mng)
598
599 - def accept(self, tracker=None):
600 """ 601 Signal the sender that you have acted on the L{Message} 602 pointed to by the tracker. If no tracker is supplied, 603 then all messages that have been returned by the L{get} 604 method are accepted, except those that have already been 605 auto-settled by passing beyond your incoming window size. 606 607 @type tracker: tracker 608 @param tracker: a tracker as returned by get 609 """ 610 if tracker is None: 611 tracker = pn_messenger_incoming_tracker(self._mng) 612 flags = PN_CUMULATIVE 613 else: 614 flags = 0 615 self._check(pn_messenger_accept(self._mng, tracker, flags))
616
617 - def reject(self, tracker=None):
618 """ 619 Rejects the L{Message} indicated by the tracker. If no tracker 620 is supplied, all messages that have been returned by the L{get} 621 method are rejected, except those that have already been auto-settled 622 by passing beyond your outgoing window size. 623 624 @type tracker: tracker 625 @param tracker: a tracker as returned by get 626 """ 627 if tracker is None: 628 tracker = pn_messenger_incoming_tracker(self._mng) 629 flags = PN_CUMULATIVE 630 else: 631 flags = 0 632 self._check(pn_messenger_reject(self._mng, tracker, flags))
633 634 @property
635 - def outgoing(self):
636 """ 637 The outgoing queue depth. 638 """ 639 return pn_messenger_outgoing(self._mng)
640 641 @property
642 - def incoming(self):
643 """ 644 The incoming queue depth. 645 """ 646 return pn_messenger_incoming(self._mng)
647
648 - def route(self, pattern, address):
649 """ 650 Adds a routing rule to a L{Messenger's<Messenger>} internal routing table. 651 652 The route procedure may be used to influence how a L{Messenger} will 653 internally treat a given address or class of addresses. Every call 654 to the route procedure will result in L{Messenger} appending a routing 655 rule to its internal routing table. 656 657 Whenever a L{Message} is presented to a L{Messenger} for delivery, it 658 will match the address of this message against the set of routing 659 rules in order. The first rule to match will be triggered, and 660 instead of routing based on the address presented in the message, 661 the L{Messenger} will route based on the address supplied in the rule. 662 663 The pattern matching syntax supports two types of matches, a '%' 664 will match any character except a '/', and a '*' will match any 665 character including a '/'. 666 667 A routing address is specified as a normal AMQP address, however it 668 may additionally use substitution variables from the pattern match 669 that triggered the rule. 670 671 Any message sent to "foo" will be routed to "amqp://foo.com": 672 673 >>> messenger.route("foo", "amqp://foo.com"); 674 675 Any message sent to "foobar" will be routed to 676 "amqp://foo.com/bar": 677 678 >>> messenger.route("foobar", "amqp://foo.com/bar"); 679 680 Any message sent to bar/<path> will be routed to the corresponding 681 path within the amqp://bar.com domain: 682 683 >>> messenger.route("bar/*", "amqp://bar.com/$1"); 684 685 Route all L{messages<Message>} over TLS: 686 687 >>> messenger.route("amqp:*", "amqps:$1") 688 689 Supply credentials for foo.com: 690 691 >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1"); 692 693 Supply credentials for all domains: 694 695 >>> messenger.route("amqp://*", "amqp://user:password@$1"); 696 697 Route all addresses through a single proxy while preserving the 698 original destination: 699 700 >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); 701 702 Route any address through a single broker: 703 704 >>> messenger.route("*", "amqp://user:password@broker/$1"); 705 """ 706 self._check(pn_messenger_route(self._mng, pattern, address))
707
708 - def rewrite(self, pattern, address):
709 """ 710 Similar to route(), except that the destination of 711 the L{Message} is determined before the message address is rewritten. 712 713 The outgoing address is only rewritten after routing has been 714 finalized. If a message has an outgoing address of 715 "amqp://0.0.0.0:5678", and a rewriting rule that changes its 716 outgoing address to "foo", it will still arrive at the peer that 717 is listening on "amqp://0.0.0.0:5678", but when it arrives there, 718 the receiver will see its outgoing address as "foo". 719 720 The default rewrite rule removes username and password from addresses 721 before they are transmitted. 722 """ 723 self._check(pn_messenger_rewrite(self._mng, pattern, address))
724
725 -class Message(object):
726 """ 727 The L{Message} class is a mutable holder of message content. 728 729 @ivar instructions: delivery instructions for the message 730 @type instructions: dict 731 @ivar annotations: infrastructure defined message annotations 732 @type annotations: dict 733 @ivar properties: application defined message properties 734 @type properties: dict 735 @ivar body: message body 736 @type body: bytes | unicode | dict | list | int | long | float | UUID 737 """ 738 739 DATA = PN_DATA 740 TEXT = PN_TEXT 741 AMQP = PN_AMQP 742 JSON = PN_JSON 743 744 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY 745
746 - def __init__(self):
747 self._msg = pn_message() 748 self._id = Data(pn_message_id(self._msg)) 749 self._correlation_id = Data(pn_message_correlation_id(self._msg)) 750 self.instructions = None 751 self.annotations = None 752 self.properties = None 753 self.body = None
754
755 - def __del__(self):
756 if hasattr(self, "_msg"): 757 pn_message_free(self._msg) 758 del self._msg
759
760 - def _check(self, err):
761 if err < 0: 762 exc = EXCEPTIONS.get(err, MessageException) 763 raise exc("[%s]: %s" % (err, pn_message_error(self._msg))) 764 else: 765 return err
766
767 - def _pre_encode(self):
768 inst = Data(pn_message_instructions(self._msg)) 769 ann = Data(pn_message_annotations(self._msg)) 770 props = Data(pn_message_properties(self._msg)) 771 body = Data(pn_message_body(self._msg)) 772 773 inst.clear() 774 if self.instructions is not None: 775 inst.put_object(self.instructions) 776 ann.clear() 777 if self.annotations is not None: 778 ann.put_object(self.annotations) 779 props.clear() 780 if self.properties is not None: 781 props.put_object(self.properties) 782 body.clear() 783 if self.body is not None: 784 body.put_object(self.body)
785
786 - def _post_decode(self):
787 inst = Data(pn_message_instructions(self._msg)) 788 ann = Data(pn_message_annotations(self._msg)) 789 props = Data(pn_message_properties(self._msg)) 790 body = Data(pn_message_body(self._msg)) 791 792 if inst.next(): 793 self.instructions = inst.get_object() 794 else: 795 self.instructions = None 796 if ann.next(): 797 self.annotations = ann.get_object() 798 else: 799 self.annotations = None 800 if props.next(): 801 self.properties = props.get_object() 802 else: 803 self.properties = None 804 if body.next(): 805 self.body = body.get_object() 806 else: 807 self.body = None
808
809 - def clear(self):
810 """ 811 Clears the contents of the L{Message}. All fields will be reset to 812 their default values. 813 """ 814 pn_message_clear(self._msg) 815 self.instructions = None 816 self.annotations = None 817 self.properties = None 818 self.body = None
819
820 - def _is_inferred(self):
821 return pn_message_is_inferred(self._msg)
822
823 - def _set_inferred(self, value):
824 self._check(pn_message_set_inferred(self._msg, bool(value)))
825 826 inferred = property(_is_inferred, _set_inferred) 827
828 - def _is_durable(self):
829 return pn_message_is_durable(self._msg)
830
831 - def _set_durable(self, value):
832 self._check(pn_message_set_durable(self._msg, bool(value)))
833 834 durable = property(_is_durable, _set_durable, 835 doc=""" 836 The durable property indicates that the message should be held durably 837 by any intermediaries taking responsibility for the message. 838 """) 839
840 - def _get_priority(self):
841 return pn_message_get_priority(self._msg)
842
843 - def _set_priority(self, value):
844 self._check(pn_message_set_priority(self._msg, value))
845 846 priority = property(_get_priority, _set_priority, 847 doc=""" 848 The priority of the message. 849 """) 850
851 - def _get_ttl(self):
852 return pn_message_get_ttl(self._msg)
853
854 - def _set_ttl(self, value):
855 self._check(pn_message_set_ttl(self._msg, value))
856 857 ttl = property(_get_ttl, _set_ttl, 858 doc=""" 859 The time to live of the message measured in milliseconds. Expired 860 messages may be dropped. 861 """) 862
863 - def _is_first_acquirer(self):
864 return pn_message_is_first_acquirer(self._msg)
865
866 - def _set_first_acquirer(self, value):
867 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
868 869 first_acquirer = property(_is_first_acquirer, _set_first_acquirer, 870 doc=""" 871 True iff the recipient is the first to acquire the message. 872 """) 873
874 - def _get_delivery_count(self):
875 return pn_message_get_delivery_count(self._msg)
876
877 - def _set_delivery_count(self, value):
878 self._check(pn_message_set_delivery_count(self._msg, value))
879 880 delivery_count = property(_get_delivery_count, _set_delivery_count, 881 doc=""" 882 The number of delivery attempts made for this message. 883 """) 884 885
886 - def _get_id(self):
887 return self._id.get_object()
888 - def _set_id(self, value):
889 if type(value) in (int, long): 890 value = ulong(value) 891 self._id.rewind() 892 self._id.put_object(value)
893 id = property(_get_id, _set_id, 894 doc=""" 895 The id of the message. 896 """) 897
898 - def _get_user_id(self):
899 return pn_message_get_user_id(self._msg)
900
901 - def _set_user_id(self, value):
902 self._check(pn_message_set_user_id(self._msg, value))
903 904 user_id = property(_get_user_id, _set_user_id, 905 doc=""" 906 The user id of the message creator. 907 """) 908
909 - def _get_address(self):
910 return pn_message_get_address(self._msg)
911
912 - def _set_address(self, value):
913 self._check(pn_message_set_address(self._msg, value))
914 915 address = property(_get_address, _set_address, 916 doc=""" 917 The address of the message. 918 """) 919
920 - def _get_subject(self):
921 return pn_message_get_subject(self._msg)
922
923 - def _set_subject(self, value):
924 self._check(pn_message_set_subject(self._msg, value))
925 926 subject = property(_get_subject, _set_subject, 927 doc=""" 928 The subject of the message. 929 """) 930
931 - def _get_reply_to(self):
932 return pn_message_get_reply_to(self._msg)
933
934 - def _set_reply_to(self, value):
935 self._check(pn_message_set_reply_to(self._msg, value))
936 937 reply_to = property(_get_reply_to, _set_reply_to, 938 doc=""" 939 The reply-to address for the message. 940 """) 941
942 - def _get_correlation_id(self):
943 return self._correlation_id.get_object()
944 - def _set_correlation_id(self, value):
945 if type(value) in (int, long): 946 value = ulong(value) 947 self._correlation_id.rewind() 948 self._correlation_id.put_object(value)
949 950 correlation_id = property(_get_correlation_id, _set_correlation_id, 951 doc=""" 952 The correlation-id for the message. 953 """) 954
955 - def _get_content_type(self):
956 return pn_message_get_content_type(self._msg)
957
958 - def _set_content_type(self, value):
959 self._check(pn_message_set_content_type(self._msg, value))
960 961 content_type = property(_get_content_type, _set_content_type, 962 doc=""" 963 The content-type of the message. 964 """) 965
966 - def _get_content_encoding(self):
967 return pn_message_get_content_encoding(self._msg)
968
969 - def _set_content_encoding(self, value):
970 self._check(pn_message_set_content_encoding(self._msg, value))
971 972 content_encoding = property(_get_content_encoding, _set_content_encoding, 973 doc=""" 974 The content-encoding of the message. 975 """) 976
977 - def _get_expiry_time(self):
978 return pn_message_get_expiry_time(self._msg)
979
980 - def _set_expiry_time(self, value):
981 self._check(pn_message_set_expiry_time(self._msg, value))
982 983 expiry_time = property(_get_expiry_time, _set_expiry_time, 984 doc=""" 985 The expiry time of the message. 986 """) 987
988 - def _get_creation_time(self):
989 return pn_message_get_creation_time(self._msg)
990
991 - def _set_creation_time(self, value):
992 self._check(pn_message_set_creation_time(self._msg, value))
993 994 creation_time = property(_get_creation_time, _set_creation_time, 995 doc=""" 996 The creation time of the message. 997 """) 998
999 - def _get_group_id(self):
1000 return pn_message_get_group_id(self._msg)
1001
1002 - def _set_group_id(self, value):
1003 self._check(pn_message_set_group_id(self._msg, value))
1004 1005 group_id = property(_get_group_id, _set_group_id, 1006 doc=""" 1007 The group id of the message. 1008 """) 1009
1010 - def _get_group_sequence(self):
1011 return pn_message_get_group_sequence(self._msg)
1012
1013 - def _set_group_sequence(self, value):
1014 self._check(pn_message_set_group_sequence(self._msg, value))
1015 1016 group_sequence = property(_get_group_sequence, _set_group_sequence, 1017 doc=""" 1018 The sequence of the message within its group. 1019 """) 1020
1021 - def _get_reply_to_group_id(self):
1022 return pn_message_get_reply_to_group_id(self._msg)
1023
1024 - def _set_reply_to_group_id(self, value):
1025 self._check(pn_message_set_reply_to_group_id(self._msg, value))
1026 1027 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id, 1028 doc=""" 1029 The group-id for any replies. 1030 """) 1031 1032 # XXX
1033 - def _get_format(self):
1034 return pn_message_get_format(self._msg)
1035
1036 - def _set_format(self, value):
1037 self._check(pn_message_set_format(self._msg, value))
1038 1039 format = property(_get_format, _set_format, 1040 doc=""" 1041 The format of the message. 1042 """) 1043
1044 - def encode(self):
1045 self._pre_encode() 1046 sz = 16 1047 while True: 1048 err, data = pn_message_encode(self._msg, sz) 1049 if err == PN_OVERFLOW: 1050 sz *= 2 1051 continue 1052 else: 1053 self._check(err) 1054 return data
1055
1056 - def decode(self, data):
1057 self._check(pn_message_decode(self._msg, data, len(data))) 1058 self._post_decode()
1059
1060 - def load(self, data):
1061 self._check(pn_message_load(self._msg, data))
1062
1063 - def save(self):
1064 sz = 16 1065 while True: 1066 err, data = pn_message_save(self._msg, sz) 1067 if err == PN_OVERFLOW: 1068 sz *= 2 1069 continue 1070 else: 1071 self._check(err) 1072 return data
1073
1074 - def __repr2__(self):
1075 props = [] 1076 for attr in ("inferred", "address", "reply_to", "durable", "ttl", 1077 "priority", "first_acquirer", "delivery_count", "id", 1078 "correlation_id", "user_id", "group_id", "group_sequence", 1079 "reply_to_group_id", "instructions", "annotations", 1080 "properties", "body"): 1081 value = getattr(self, attr) 1082 if value: props.append("%s=%r" % (attr, value)) 1083 return "Message(%s)" % ", ".join(props)
1084
1085 - def __repr__(self):
1086 tmp = pn_string(None) 1087 err = pn_inspect(self._msg, tmp) 1088 result = pn_string_get(tmp) 1089 pn_free(tmp) 1090 self._check(err) 1091 return result
1092
1093 -class Subscription(object):
1094
1095 - def __init__(self, impl):
1096 self._impl = impl
1097 1098 @property
1099 - def address(self):
1100 return pn_subscription_address(self._impl)
1101
1102 -class DataException(ProtonException):
1103 """ 1104 The DataException class is the root of the Data exception hierarchy. 1105 All exceptions raised by the Data class extend this exception. 1106 """ 1107 pass
1108
1109 -class UnmappedType:
1110
1111 - def __init__(self, msg):
1112 self.msg = msg
1113
1114 - def __repr__(self):
1115 return "UnmappedType(%s)" % self.msg
1116
1117 -class ulong(long):
1118
1119 - def __repr__(self):
1120 return "ulong(%s)" % long.__repr__(self)
1121
1122 -class timestamp(long):
1123
1124 - def __repr__(self):
1125 return "timestamp(%s)" % long.__repr__(self)
1126
1127 -class symbol(unicode):
1128
1129 - def __repr__(self):
1130 return "symbol(%s)" % unicode.__repr__(self)
1131
1132 -class char(unicode):
1133
1134 - def __repr__(self):
1135 return "char(%s)" % unicode.__repr__(self)
1136
1137 -class Described(object):
1138
1139 - def __init__(self, descriptor, value):
1140 self.descriptor = descriptor 1141 self.value = value
1142
1143 - def __repr__(self):
1144 return "Described(%r, %r)" % (self.descriptor, self.value)
1145
1146 - def __eq__(self, o):
1147 if isinstance(o, Described): 1148 return self.descriptor == o.descriptor and self.value == o.value 1149 else: 1150 return False
1151 1152 UNDESCRIBED = Constant("UNDESCRIBED")
1153 1154 -class Array(object):
1155
1156 - def __init__(self, descriptor, type, *elements):
1157 self.descriptor = descriptor 1158 self.type = type 1159 self.elements = elements
1160
1161 - def __repr__(self):
1162 if self.elements: 1163 els = ", %s" % (", ".join(map(repr, self.elements))) 1164 else: 1165 els = "" 1166 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
1167
1168 - def __eq__(self, o):
1169 if isinstance(o, Array): 1170 return self.descriptor == o.descriptor and \ 1171 self.type == o.type and self.elements == o.elements 1172 else: 1173 return False
1174
1175 -class Data:
1176 """ 1177 The L{Data} class provides an interface for decoding, extracting, 1178 creating, and encoding arbitrary AMQP data. A L{Data} object 1179 contains a tree of AMQP values. Leaf nodes in this tree correspond 1180 to scalars in the AMQP type system such as L{ints<INT>} or 1181 L{strings<STRING>}. Non-leaf nodes in this tree correspond to 1182 compound values in the AMQP type system such as L{lists<LIST>}, 1183 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}. 1184 The root node of the tree is the L{Data} object itself and can have 1185 an arbitrary number of children. 1186 1187 A L{Data} object maintains the notion of the current sibling node 1188 and a current parent node. Siblings are ordered within their parent. 1189 Values are accessed and/or added by using the L{next}, L{prev}, 1190 L{enter}, and L{exit} methods to navigate to the desired location in 1191 the tree and using the supplied variety of put_*/get_* methods to 1192 access or add a value of the desired type. 1193 1194 The put_* methods will always add a value I{after} the current node 1195 in the tree. If the current node has a next sibling the put_* method 1196 will overwrite the value on this node. If there is no current node 1197 or the current node has no next sibling then one will be added. The 1198 put_* methods always set the added/modified node to the current 1199 node. The get_* methods read the value of the current node and do 1200 not change which node is current. 1201 1202 The following types of scalar values are supported: 1203 1204 - L{NULL} 1205 - L{BOOL} 1206 - L{UBYTE} 1207 - L{USHORT} 1208 - L{SHORT} 1209 - L{UINT} 1210 - L{INT} 1211 - L{ULONG} 1212 - L{LONG} 1213 - L{FLOAT} 1214 - L{DOUBLE} 1215 - L{BINARY} 1216 - L{STRING} 1217 - L{SYMBOL} 1218 1219 The following types of compound values are supported: 1220 1221 - L{DESCRIBED} 1222 - L{ARRAY} 1223 - L{LIST} 1224 - L{MAP} 1225 """ 1226 1227 NULL = PN_NULL; "A null value." 1228 BOOL = PN_BOOL; "A boolean value." 1229 UBYTE = PN_UBYTE; "An unsigned byte value." 1230 BYTE = PN_BYTE; "A signed byte value." 1231 USHORT = PN_USHORT; "An unsigned short value." 1232 SHORT = PN_SHORT; "A short value." 1233 UINT = PN_UINT; "An unsigned int value." 1234 INT = PN_INT; "A signed int value." 1235 CHAR = PN_CHAR; "A character value." 1236 ULONG = PN_ULONG; "An unsigned long value." 1237 LONG = PN_LONG; "A signed long value." 1238 TIMESTAMP = PN_TIMESTAMP; "A timestamp value." 1239 FLOAT = PN_FLOAT; "A float value." 1240 DOUBLE = PN_DOUBLE; "A double value." 1241 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value." 1242 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value." 1243 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value." 1244 UUID = PN_UUID; "A UUID value." 1245 BINARY = PN_BINARY; "A binary string." 1246 STRING = PN_STRING; "A unicode string." 1247 SYMBOL = PN_SYMBOL; "A symbolic string." 1248 DESCRIBED = PN_DESCRIBED; "A described value." 1249 ARRAY = PN_ARRAY; "An array value." 1250 LIST = PN_LIST; "A list value." 1251 MAP = PN_MAP; "A map value." 1252 1253 type_names = { 1254 NULL: "null", 1255 BOOL: "bool", 1256 BYTE: "byte", 1257 UBYTE: "ubyte", 1258 SHORT: "short", 1259 USHORT: "ushort", 1260 INT: "int", 1261 UINT: "uint", 1262 CHAR: "char", 1263 LONG: "long", 1264 ULONG: "ulong", 1265 TIMESTAMP: "timestamp", 1266 FLOAT: "float", 1267 DOUBLE: "double", 1268 DECIMAL32: "decimal32", 1269 DECIMAL64: "decimal64", 1270 DECIMAL128: "decimal128", 1271 UUID: "uuid", 1272 BINARY: "binary", 1273 STRING: "string", 1274 SYMBOL: "symbol", 1275 DESCRIBED: "described", 1276 ARRAY: "array", 1277 LIST: "list", 1278 MAP: "map" 1279 } 1280 1281 @classmethod
1282 - def type_name(type): return Data.type_names[type]
1283
1284 - def __init__(self, capacity=16):
1285 if type(capacity) in (int, long): 1286 self._data = pn_data(capacity) 1287 self._free = True 1288 else: 1289 self._data = capacity 1290 self._free = False
1291
1292 - def __del__(self):
1293 if self._free and hasattr(self, "_data"): 1294 pn_data_free(self._data) 1295 del self._data
1296
1297 - def _check(self, err):
1298 if err < 0: 1299 exc = EXCEPTIONS.get(err, DataException) 1300 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data)))) 1301 else: 1302 return err
1303
1304 - def clear(self):
1305 """ 1306 Clears the data object. 1307 """ 1308 pn_data_clear(self._data)
1309
1310 - def rewind(self):
1311 """ 1312 Clears current node and sets the parent to the root node. Clearing the 1313 current node sets it _before_ the first node, calling next() will advance to 1314 the first node. 1315 """ 1316 pn_data_rewind(self._data)
1317
1318 - def next(self):
1319 """ 1320 Advances the current node to its next sibling and returns its 1321 type. If there is no next sibling the current node remains 1322 unchanged and None is returned. 1323 """ 1324 found = pn_data_next(self._data) 1325 if found: 1326 return self.type() 1327 else: 1328 return None
1329
1330 - def prev(self):
1331 """ 1332 Advances the current node to its previous sibling and returns its 1333 type. If there is no previous sibling the current node remains 1334 unchanged and None is returned. 1335 """ 1336 found = pn_data_prev(self._data) 1337 if found: 1338 return self.type() 1339 else: 1340 return None
1341
1342 - def enter(self):
1343 """ 1344 Sets the parent node to the current node and clears the current node. 1345 Clearing the current node sets it _before_ the first child, 1346 call next() advances to the first child. 1347 """ 1348 return pn_data_enter(self._data)
1349
1350 - def exit(self):
1351 """ 1352 Sets the current node to the parent node and the parent node to 1353 its own parent. 1354 """ 1355 return pn_data_exit(self._data)
1356
1357 - def lookup(self, name):
1358 return pn_data_lookup(self._data, name)
1359
1360 - def narrow(self):
1361 pn_data_narrow(self._data)
1362
1363 - def widen(self):
1364 pn_data_widen(self._data)
1365
1366 - def type(self):
1367 """ 1368 Returns the type of the current node. 1369 """ 1370 dtype = pn_data_type(self._data) 1371 if dtype == -1: 1372 return None 1373 else: 1374 return dtype
1375
1376 - def encode(self):
1377 """ 1378 Returns a representation of the data encoded in AMQP format. 1379 """ 1380 size = 1024 1381 while True: 1382 cd, enc = pn_data_encode(self._data, size) 1383 if cd == PN_OVERFLOW: 1384 size *= 2 1385 elif cd >= 0: 1386 return enc 1387 else: 1388 self._check(cd)
1389
1390 - def decode(self, encoded):
1391 """ 1392 Decodes the first value from supplied AMQP data and returns the 1393 number of bytes consumed. 1394 1395 @type encoded: binary 1396 @param encoded: AMQP encoded binary data 1397 """ 1398 return self._check(pn_data_decode(self._data, encoded))
1399
1400 - def put_list(self):
1401 """ 1402 Puts a list value. Elements may be filled by entering the list 1403 node and putting element values. 1404 1405 >>> data = Data() 1406 >>> data.put_list() 1407 >>> data.enter() 1408 >>> data.put_int(1) 1409 >>> data.put_int(2) 1410 >>> data.put_int(3) 1411 >>> data.exit() 1412 """ 1413 self._check(pn_data_put_list(self._data))
1414
1415 - def put_map(self):
1416 """ 1417 Puts a map value. Elements may be filled by entering the map node 1418 and putting alternating key value pairs. 1419 1420 >>> data = Data() 1421 >>> data.put_map() 1422 >>> data.enter() 1423 >>> data.put_string("key") 1424 >>> data.put_string("value") 1425 >>> data.exit() 1426 """ 1427 self._check(pn_data_put_map(self._data))
1428
1429 - def put_array(self, described, element_type):
1430 """ 1431 Puts an array value. Elements may be filled by entering the array 1432 node and putting the element values. The values must all be of the 1433 specified array element type. If an array is described then the 1434 first child value of the array is the descriptor and may be of any 1435 type. 1436 1437 >>> data = Data() 1438 >>> 1439 >>> data.put_array(False, Data.INT) 1440 >>> data.enter() 1441 >>> data.put_int(1) 1442 >>> data.put_int(2) 1443 >>> data.put_int(3) 1444 >>> data.exit() 1445 >>> 1446 >>> data.put_array(True, Data.DOUBLE) 1447 >>> data.enter() 1448 >>> data.put_symbol("array-descriptor") 1449 >>> data.put_double(1.1) 1450 >>> data.put_double(1.2) 1451 >>> data.put_double(1.3) 1452 >>> data.exit() 1453 1454 @type described: bool 1455 @param described: specifies whether the array is described 1456 @type element_type: int 1457 @param element_type: the type of the array elements 1458 """ 1459 self._check(pn_data_put_array(self._data, described, element_type))
1460
1461 - def put_described(self):
1462 """ 1463 Puts a described value. A described node has two children, the 1464 descriptor and the value. These are specified by entering the node 1465 and putting the desired values. 1466 1467 >>> data = Data() 1468 >>> data.put_described() 1469 >>> data.enter() 1470 >>> data.put_symbol("value-descriptor") 1471 >>> data.put_string("the value") 1472 >>> data.exit() 1473 """ 1474 self._check(pn_data_put_described(self._data))
1475
1476 - def put_null(self):
1477 """ 1478 Puts a null value. 1479 """ 1480 self._check(pn_data_put_null(self._data))
1481
1482 - def put_bool(self, b):
1483 """ 1484 Puts a boolean value. 1485 1486 @param b: a boolean value 1487 """ 1488 self._check(pn_data_put_bool(self._data, b))
1489
1490 - def put_ubyte(self, ub):
1491 """ 1492 Puts an unsigned byte value. 1493 1494 @param ub: an integral value 1495 """ 1496 self._check(pn_data_put_ubyte(self._data, ub))
1497
1498 - def put_byte(self, b):
1499 """ 1500 Puts a signed byte value. 1501 1502 @param b: an integral value 1503 """ 1504 self._check(pn_data_put_byte(self._data, b))
1505
1506 - def put_ushort(self, us):
1507 """ 1508 Puts an unsigned short value. 1509 1510 @param us: an integral value. 1511 """ 1512 self._check(pn_data_put_ushort(self._data, us))
1513
1514 - def put_short(self, s):
1515 """ 1516 Puts a signed short value. 1517 1518 @param s: an integral value 1519 """ 1520 self._check(pn_data_put_short(self._data, s))
1521
1522 - def put_uint(self, ui):
1523 """ 1524 Puts an unsigned int value. 1525 1526 @param ui: an integral value 1527 """ 1528 self._check(pn_data_put_uint(self._data, ui))
1529
1530 - def put_int(self, i):
1531 """ 1532 Puts a signed int value. 1533 1534 @param i: an integral value 1535 """ 1536 self._check(pn_data_put_int(self._data, i))
1537
1538 - def put_char(self, c):
1539 """ 1540 Puts a char value. 1541 1542 @param c: a single character 1543 """ 1544 self._check(pn_data_put_char(self._data, ord(c)))
1545
1546 - def put_ulong(self, ul):
1547 """ 1548 Puts an unsigned long value. 1549 1550 @param ul: an integral value 1551 """ 1552 self._check(pn_data_put_ulong(self._data, ul))
1553
1554 - def put_long(self, l):
1555 """ 1556 Puts a signed long value. 1557 1558 @param l: an integral value 1559 """ 1560 self._check(pn_data_put_long(self._data, l))
1561
1562 - def put_timestamp(self, t):
1563 """ 1564 Puts a timestamp value. 1565 1566 @param t: an integral value 1567 """ 1568 self._check(pn_data_put_timestamp(self._data, t))
1569
1570 - def put_float(self, f):
1571 """ 1572 Puts a float value. 1573 1574 @param f: a floating point value 1575 """ 1576 self._check(pn_data_put_float(self._data, f))
1577
1578 - def put_double(self, d):
1579 """ 1580 Puts a double value. 1581 1582 @param d: a floating point value. 1583 """ 1584 self._check(pn_data_put_double(self._data, d))
1585
1586 - def put_decimal32(self, d):
1587 """ 1588 Puts a decimal32 value. 1589 1590 @param d: a decimal32 value 1591 """ 1592 self._check(pn_data_put_decimal32(self._data, d))
1593
1594 - def put_decimal64(self, d):
1595 """ 1596 Puts a decimal64 value. 1597 1598 @param d: a decimal64 value 1599 """ 1600 self._check(pn_data_put_decimal64(self._data, d))
1601
1602 - def put_decimal128(self, d):
1603 """ 1604 Puts a decimal128 value. 1605 1606 @param d: a decimal128 value 1607 """ 1608 self._check(pn_data_put_decimal128(self._data, d))
1609
1610 - def put_uuid(self, u):
1611 """ 1612 Puts a UUID value. 1613 1614 @param u: a uuid value 1615 """ 1616 self._check(pn_data_put_uuid(self._data, u.bytes))
1617
1618 - def put_binary(self, b):
1619 """ 1620 Puts a binary value. 1621 1622 @type b: binary 1623 @param b: a binary value 1624 """ 1625 self._check(pn_data_put_binary(self._data, b))
1626
1627 - def put_string(self, s):
1628 """ 1629 Puts a unicode value. 1630 1631 @type s: unicode 1632 @param s: a unicode value 1633 """ 1634 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1635
1636 - def put_symbol(self, s):
1637 """ 1638 Puts a symbolic value. 1639 1640 @type s: string 1641 @param s: the symbol name 1642 """ 1643 self._check(pn_data_put_symbol(self._data, s))
1644
1645 - def get_list(self):
1646 """ 1647 If the current node is a list, return the number of elements, 1648 otherwise return zero. List elements can be accessed by entering 1649 the list. 1650 1651 >>> count = data.get_list() 1652 >>> data.enter() 1653 >>> for i in range(count): 1654 ... type = data.next() 1655 ... if type == Data.STRING: 1656 ... print data.get_string() 1657 ... elif type == ...: 1658 ... ... 1659 >>> data.exit() 1660 """ 1661 return pn_data_get_list(self._data)
1662
1663 - def get_map(self):
1664 """ 1665 If the current node is a map, return the number of child elements, 1666 otherwise return zero. Key value pairs can be accessed by entering 1667 the map. 1668 1669 >>> count = data.get_map() 1670 >>> data.enter() 1671 >>> for i in range(count/2): 1672 ... type = data.next() 1673 ... if type == Data.STRING: 1674 ... print data.get_string() 1675 ... elif type == ...: 1676 ... ... 1677 >>> data.exit() 1678 """ 1679 return pn_data_get_map(self._data)
1680
1681 - def get_array(self):
1682 """ 1683 If the current node is an array, return a tuple of the element 1684 count, a boolean indicating whether the array is described, and 1685 the type of each element, otherwise return (0, False, None). Array 1686 data can be accessed by entering the array. 1687 1688 >>> # read an array of strings with a symbolic descriptor 1689 >>> count, described, type = data.get_array() 1690 >>> data.enter() 1691 >>> data.next() 1692 >>> print "Descriptor:", data.get_symbol() 1693 >>> for i in range(count): 1694 ... data.next() 1695 ... print "Element:", data.get_string() 1696 >>> data.exit() 1697 """ 1698 count = pn_data_get_array(self._data) 1699 described = pn_data_is_array_described(self._data) 1700 type = pn_data_get_array_type(self._data) 1701 if type == -1: 1702 type = None 1703 return count, described, type
1704
1705 - def is_described(self):
1706 """ 1707 Checks if the current node is a described value. The descriptor 1708 and value may be accessed by entering the described value. 1709 1710 >>> # read a symbolically described string 1711 >>> assert data.is_described() # will error if the current node is not described 1712 >>> data.enter() 1713 >>> print data.get_symbol() 1714 >>> print data.get_string() 1715 >>> data.exit() 1716 """ 1717 return pn_data_is_described(self._data)
1718
1719 - def is_null(self):
1720 """ 1721 Checks if the current node is a null. 1722 """ 1723 return pn_data_is_null(self._data)
1724
1725 - def get_bool(self):
1726 """ 1727 If the current node is a boolean, returns its value, returns False 1728 otherwise. 1729 """ 1730 return pn_data_get_bool(self._data)
1731
1732 - def get_ubyte(self):
1733 """ 1734 If the current node is an unsigned byte, returns its value, 1735 returns 0 otherwise. 1736 """ 1737 return pn_data_get_ubyte(self._data)
1738
1739 - def get_byte(self):
1740 """ 1741 If the current node is a signed byte, returns its value, returns 0 1742 otherwise. 1743 """ 1744 return pn_data_get_byte(self._data)
1745
1746 - def get_ushort(self):
1747 """ 1748 If the current node is an unsigned short, returns its value, 1749 returns 0 otherwise. 1750 """ 1751 return pn_data_get_ushort(self._data)
1752
1753 - def get_short(self):
1754 """ 1755 If the current node is a signed short, returns its value, returns 1756 0 otherwise. 1757 """ 1758 return pn_data_get_short(self._data)
1759
1760 - def get_uint(self):
1761 """ 1762 If the current node is an unsigned int, returns its value, returns 1763 0 otherwise. 1764 """ 1765 return pn_data_get_uint(self._data)
1766
1767 - def get_int(self):
1768 """ 1769 If the current node is a signed int, returns its value, returns 0 1770 otherwise. 1771 """ 1772 return pn_data_get_int(self._data)
1773
1774 - def get_char(self):
1775 """ 1776 If the current node is a char, returns its value, returns 0 1777 otherwise. 1778 """ 1779 return char(unichr(pn_data_get_char(self._data)))
1780
1781 - def get_ulong(self):
1782 """ 1783 If the current node is an unsigned long, returns its value, 1784 returns 0 otherwise. 1785 """ 1786 return ulong(pn_data_get_ulong(self._data))
1787
1788 - def get_long(self):
1789 """ 1790 If the current node is an signed long, returns its value, returns 1791 0 otherwise. 1792 """ 1793 return pn_data_get_long(self._data)
1794
1795 - def get_timestamp(self):
1796 """ 1797 If the current node is a timestamp, returns its value, returns 0 1798 otherwise. 1799 """ 1800 return timestamp(pn_data_get_timestamp(self._data))
1801
1802 - def get_float(self):
1803 """ 1804 If the current node is a float, returns its value, raises 0 1805 otherwise. 1806 """ 1807 return pn_data_get_float(self._data)
1808
1809 - def get_double(self):
1810 """ 1811 If the current node is a double, returns its value, returns 0 1812 otherwise. 1813 """ 1814 return pn_data_get_double(self._data)
1815 1816 # XXX: need to convert
1817 - def get_decimal32(self):
1818 """ 1819 If the current node is a decimal32, returns its value, returns 0 1820 otherwise. 1821 """ 1822 return pn_data_get_decimal32(self._data)
1823 1824 # XXX: need to convert
1825 - def get_decimal64(self):
1826 """ 1827 If the current node is a decimal64, returns its value, returns 0 1828 otherwise. 1829 """ 1830 return pn_data_get_decimal64(self._data)
1831 1832 # XXX: need to convert
1833 - def get_decimal128(self):
1834 """ 1835 If the current node is a decimal128, returns its value, returns 0 1836 otherwise. 1837 """ 1838 return pn_data_get_decimal128(self._data)
1839
1840 - def get_uuid(self):
1841 """ 1842 If the current node is a UUID, returns its value, returns None 1843 otherwise. 1844 """ 1845 if pn_data_type(self._data) == Data.UUID: 1846 return uuid.UUID(bytes=pn_data_get_uuid(self._data)) 1847 else: 1848 return None
1849
1850 - def get_binary(self):
1851 """ 1852 If the current node is binary, returns its value, returns "" 1853 otherwise. 1854 """ 1855 return pn_data_get_binary(self._data)
1856
1857 - def get_string(self):
1858 """ 1859 If the current node is a string, returns its value, returns "" 1860 otherwise. 1861 """ 1862 return pn_data_get_string(self._data).decode("utf8")
1863
1864 - def get_symbol(self):
1865 """ 1866 If the current node is a symbol, returns its value, returns "" 1867 otherwise. 1868 """ 1869 return symbol(pn_data_get_symbol(self._data))
1870
1871 - def copy(self, src):
1872 self._check(pn_data_copy(self._data, src._data))
1873
1874 - def format(self):
1875 sz = 16 1876 while True: 1877 err, result = pn_data_format(self._data, sz) 1878 if err == PN_OVERFLOW: 1879 sz *= 2 1880 continue 1881 else: 1882 self._check(err) 1883 return result
1884
1885 - def dump(self):
1886 pn_data_dump(self._data)
1887
1888 - def put_dict(self, d):
1889 self.put_map() 1890 self.enter() 1891 try: 1892 for k, v in d.items(): 1893 self.put_object(k) 1894 self.put_object(v) 1895 finally: 1896 self.exit()
1897
1898 - def get_dict(self):
1899 if self.enter(): 1900 try: 1901 result = {} 1902 while self.next(): 1903 k = self.get_object() 1904 if self.next(): 1905 v = self.get_object() 1906 else: 1907 v = None 1908 result[k] = v 1909 finally: 1910 self.exit() 1911 return result
1912
1913 - def put_sequence(self, s):
1914 self.put_list() 1915 self.enter() 1916 try: 1917 for o in s: 1918 self.put_object(o) 1919 finally: 1920 self.exit()
1921
1922 - def get_sequence(self):
1923 if self.enter(): 1924 try: 1925 result = [] 1926 while self.next(): 1927 result.append(self.get_object()) 1928 finally: 1929 self.exit() 1930 return result
1931
1932 - def get_py_described(self):
1933 if self.enter(): 1934 try: 1935 self.next() 1936 descriptor = self.get_object() 1937 self.next() 1938 value = self.get_object() 1939 finally: 1940 self.exit() 1941 return Described(descriptor, value)
1942
1943 - def put_py_described(self, d):
1944 self.put_described() 1945 self.enter() 1946 try: 1947 self.put_object(d.descriptor) 1948 self.put_object(d.value) 1949 finally: 1950 self.exit()
1951
1952 - def get_py_array(self):
1953 """ 1954 If the current node is an array, return an Array object 1955 representing the array and its contents. Otherwise return None. 1956 This is a convenience wrapper around get_array, enter, etc. 1957 """ 1958 1959 count, described, type = self.get_array() 1960 if type is None: return None 1961 if self.enter(): 1962 try: 1963 if described: 1964 self.next() 1965 descriptor = self.get_object() 1966 else: 1967 descriptor = UNDESCRIBED 1968 elements = [] 1969 while self.next(): 1970 elements.append(self.get_object()) 1971 finally: 1972 self.exit() 1973 return Array(descriptor, type, *elements)
1974
1975 - def put_py_array(self, a):
1976 described = a.descriptor != UNDESCRIBED 1977 self.put_array(described, a.type) 1978 self.enter() 1979 try: 1980 if described: 1981 self.put_object(a.descriptor) 1982 for e in a.elements: 1983 self.put_object(e) 1984 finally: 1985 self.exit()
1986 1987 put_mappings = { 1988 None.__class__: lambda s, _: s.put_null(), 1989 bool: put_bool, 1990 dict: put_dict, 1991 list: put_sequence, 1992 tuple: put_sequence, 1993 unicode: put_string, 1994 bytes: put_binary, 1995 symbol: put_symbol, 1996 int: put_long, 1997 char: put_char, 1998 long: put_long, 1999 ulong: put_ulong, 2000 timestamp: put_timestamp, 2001 float: put_double, 2002 uuid.UUID: put_uuid, 2003 Described: put_py_described, 2004 Array: put_py_array 2005 } 2006 get_mappings = { 2007 NULL: lambda s: None, 2008 BOOL: get_bool, 2009 BYTE: get_byte, 2010 UBYTE: get_ubyte, 2011 SHORT: get_short, 2012 USHORT: get_ushort, 2013 INT: get_int, 2014 UINT: get_uint, 2015 CHAR: get_char, 2016 LONG: get_long, 2017 ULONG: get_ulong, 2018 TIMESTAMP: get_timestamp, 2019 FLOAT: get_float, 2020 DOUBLE: get_double, 2021 DECIMAL32: get_decimal32, 2022 DECIMAL64: get_decimal64, 2023 DECIMAL128: get_decimal128, 2024 UUID: get_uuid, 2025 BINARY: get_binary, 2026 STRING: get_string, 2027 SYMBOL: get_symbol, 2028 DESCRIBED: get_py_described, 2029 ARRAY: get_py_array, 2030 LIST: get_sequence, 2031 MAP: get_dict 2032 } 2033 2034
2035 - def put_object(self, obj):
2036 putter = self.put_mappings[obj.__class__] 2037 putter(self, obj)
2038
2039 - def get_object(self):
2040 type = self.type() 2041 if type is None: return None 2042 getter = self.get_mappings.get(type) 2043 if getter: 2044 return getter(self) 2045 else: 2046 return UnmappedType(str(type))
2047
2048 -class ConnectionException(ProtonException):
2049 pass
2050
2051 -class Endpoint(object):
2052 2053 LOCAL_UNINIT = PN_LOCAL_UNINIT 2054 REMOTE_UNINIT = PN_REMOTE_UNINIT 2055 LOCAL_ACTIVE = PN_LOCAL_ACTIVE 2056 REMOTE_ACTIVE = PN_REMOTE_ACTIVE 2057 LOCAL_CLOSED = PN_LOCAL_CLOSED 2058 REMOTE_CLOSED = PN_REMOTE_CLOSED 2059
2060 - def __init__(self):
2061 self.condition = None
2062
2063 - def _update_cond(self):
2064 obj2cond(self.condition, self._get_cond_impl())
2065 2066 @property
2067 - def remote_condition(self):
2068 return cond2obj(self._get_remote_cond_impl())
2069 2070 # the following must be provided by subclasses
2071 - def _get_cond_impl(self):
2072 assert False, "Subclass must override this!"
2073
2074 - def _get_remote_cond_impl(self):
2075 assert False, "Subclass must override this!"
2076
2077 -class Condition:
2078
2079 - def __init__(self, name, description=None, info=None):
2080 self.name = name 2081 self.description = description 2082 self.info = info
2083
2084 - def __repr__(self):
2085 return "Condition(%s)" % ", ".join([repr(x) for x in 2086 (self.name, self.description, self.info) 2087 if x])
2088
2089 - def __eq__(self, o):
2090 if not isinstance(o, Condition): return False 2091 return self.name == o.name and \ 2092 self.description == o.description and \ 2093 self.info == o.info
2094
2095 -def obj2cond(obj, cond):
2096 pn_condition_clear(cond) 2097 if obj: 2098 pn_condition_set_name(cond, str(obj.name)) 2099 pn_condition_set_description(cond, obj.description) 2100 info = Data(pn_condition_info(cond)) 2101 if obj.info: 2102 info.put_object(obj.info)
2103
2104 -def cond2obj(cond):
2105 if pn_condition_is_set(cond): 2106 return Condition(pn_condition_get_name(cond), 2107 pn_condition_get_description(cond), 2108 dat2obj(pn_condition_info(cond))) 2109 else: 2110 return None
2111
2112 -def dat2obj(dimpl):
2113 d = Data(dimpl) 2114 d.rewind() 2115 d.next() 2116 obj = d.get_object() 2117 d.rewind() 2118 return obj
2119
2120 -def obj2dat(obj, dimpl):
2121 if obj is not None: 2122 d = Data(dimpl) 2123 d.put_object(obj)
2124
2125 -def wrap_connection(conn):
2126 if not conn: return None 2127 ctx = pn_connection_get_context(conn) 2128 if ctx: return ctx 2129 wrapper = Connection(_conn=conn) 2130 return wrapper
2131
2132 -class Connection(Endpoint):
2133
2134 - def __init__(self, _conn=None):
2135 Endpoint.__init__(self) 2136 if _conn: 2137 self._conn = _conn 2138 else: 2139 self._conn = pn_connection() 2140 pn_connection_set_context(self._conn, self) 2141 self.offered_capabilities = None 2142 self.desired_capabilities = None 2143 self.properties = None
2144
2145 - def __del__(self):
2146 if hasattr(self, "_conn"): 2147 pn_connection_free(self._conn) 2148 del self._conn
2149
2150 - def _check(self, err):
2151 if err < 0: 2152 exc = EXCEPTIONS.get(err, ConnectionException) 2153 raise exc("[%s]: %s" % (err, pn_connection_error(self._conn))) 2154 else: 2155 return err
2156
2157 - def _get_cond_impl(self):
2158 return pn_connection_condition(self._conn)
2159
2160 - def _get_remote_cond_impl(self):
2161 return pn_connection_remote_condition(self._conn)
2162
2163 - def _get_container(self):
2164 return pn_connection_get_container(self._conn)
2165 - def _set_container(self, name):
2166 return pn_connection_set_container(self._conn, name)
2167 2168 container = property(_get_container, _set_container) 2169
2170 - def _get_hostname(self):
2171 return pn_connection_get_hostname(self._conn)
2172 - def _set_hostname(self, name):
2173 return pn_connection_set_hostname(self._conn, name)
2174 2175 hostname = property(_get_hostname, _set_hostname) 2176 2177 @property
2178 - def remote_container(self):
2179 return pn_connection_remote_container(self._conn)
2180 2181 @property
2182 - def remote_hostname(self):
2183 return pn_connection_remote_hostname(self._conn)
2184 2185 @property
2187 return dat2obj(pn_connection_remote_offered_capabilities(self._conn))
2188 2189 @property
2191 return dat2obj(pn_connection_remote_desired_capabilities(self._conn))
2192 2193 @property
2194 - def remote_properties(self):
2195 return dat2obj(pn_connection_remote_properties(self._conn))
2196
2197 - def open(self):
2198 obj2dat(self.offered_capabilities, 2199 pn_connection_offered_capabilities(self._conn)) 2200 obj2dat(self.desired_capabilities, 2201 pn_connection_desired_capabilities(self._conn)) 2202 obj2dat(self.properties, pn_connection_properties(self._conn)) 2203 pn_connection_open(self._conn)
2204
2205 - def close(self):
2206 self._update_cond() 2207 pn_connection_close(self._conn)
2208 2209 @property
2210 - def state(self):
2211 return pn_connection_state(self._conn)
2212
2213 - def session(self):
2214 return wrap_session(pn_session(self._conn))
2215
2216 - def session_head(self, mask):
2217 return wrap_session(pn_session_head(self._conn, mask))
2218 2221 2222 @property
2223 - def work_head(self):
2224 return wrap_delivery(pn_work_head(self._conn))
2225 2226 @property
2227 - def error(self):
2228 return pn_error_code(pn_connection_error(self._conn))
2229
2230 -class SessionException(ProtonException):
2231 pass
2232
2233 -def wrap_session(ssn):
2234 if ssn is None: return None 2235 ctx = pn_session_get_context(ssn) 2236 if ctx: 2237 return ctx 2238 else: 2239 wrapper = Session(ssn) 2240 pn_session_set_context(ssn, wrapper) 2241 return wrapper
2242
2243 -class Session(Endpoint):
2244
2245 - def __init__(self, ssn):
2246 Endpoint.__init__(self) 2247 self._ssn = ssn
2248
2249 - def __del__(self):
2250 if hasattr(self, "_ssn"): 2251 pn_session_free(self._ssn) 2252 del self._ssn
2253
2254 - def _get_cond_impl(self):
2255 return pn_session_condition(self._ssn)
2256
2257 - def _get_remote_cond_impl(self):
2258 return pn_session_remote_condition(self._ssn)
2259
2260 - def _get_incoming_capacity(self):
2261 return pn_session_get_incoming_capacity(self._ssn)
2262
2263 - def _set_incoming_capacity(self, capacity):
2264 pn_session_set_incoming_capacity(self._ssn, capacity)
2265 2266 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity) 2267 2268 @property
2269 - def outgoing_bytes(self):
2270 return pn_session_outgoing_bytes(self._ssn)
2271 2272 @property
2273 - def incoming_bytes(self):
2274 return pn_session_incoming_bytes(self._ssn)
2275
2276 - def open(self):
2277 pn_session_open(self._ssn)
2278
2279 - def close(self):
2280 self._update_cond() 2281 pn_session_close(self._ssn)
2282
2283 - def next(self, mask):
2284 return wrap_session(pn_session_next(self._ssn, mask))
2285 2286 @property
2287 - def state(self):
2288 return pn_session_state(self._ssn)
2289 2290 @property
2291 - def connection(self):
2292 return wrap_connection(pn_session_connection(self._ssn))
2293
2294 - def sender(self, name):
2295 return wrap_link(pn_sender(self._ssn, name))
2296
2297 - def receiver(self, name):
2298 return wrap_link(pn_receiver(self._ssn, name))
2299
2300 -class LinkException(ProtonException):
2301 pass
2302 2315 2440
2441 -class Terminus(object):
2442 2443 UNSPECIFIED = PN_UNSPECIFIED 2444 SOURCE = PN_SOURCE 2445 TARGET = PN_TARGET 2446 COORDINATOR = PN_COORDINATOR 2447 2448 NONDURABLE = PN_NONDURABLE 2449 CONFIGURATION = PN_CONFIGURATION 2450 DELIVERIES = PN_DELIVERIES 2451 2452 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED 2453 DIST_MODE_COPY = PN_DIST_MODE_COPY 2454 DIST_MODE_MOVE = PN_DIST_MODE_MOVE 2455
2456 - def __init__(self, impl):
2457 self._impl = impl
2458
2459 - def _check(self, err):
2460 if err < 0: 2461 exc = EXCEPTIONS.get(err, LinkException) 2462 raise exc("[%s]" % err) 2463 else: 2464 return err
2465
2466 - def _get_type(self):
2467 return pn_terminus_get_type(self._impl)
2468 - def _set_type(self, type):
2469 self._check(pn_terminus_set_type(self._impl, type))
2470 type = property(_get_type, _set_type) 2471
2472 - def _get_address(self):
2473 return pn_terminus_get_address(self._impl)
2474 - def _set_address(self, address):
2475 self._check(pn_terminus_set_address(self._impl, address))
2476 address = property(_get_address, _set_address) 2477
2478 - def _get_durability(self):
2479 return pn_terminus_get_durability(self._impl)
2480 - def _set_durability(self, seconds):
2481 self._check(pn_terminus_set_durability(self._impl, seconds))
2482 durability = property(_get_durability, _set_durability) 2483
2484 - def _get_expiry_policy(self):
2485 return pn_terminus_get_expiry_policy(self._impl)
2486 - def _set_expiry_policy(self, seconds):
2487 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2488 expiry_policy = property(_get_expiry_policy, _set_expiry_policy) 2489
2490 - def _get_timeout(self):
2491 return pn_terminus_get_timeout(self._impl)
2492 - def _set_timeout(self, seconds):
2493 self._check(pn_terminus_set_timeout(self._impl, seconds))
2494 timeout = property(_get_timeout, _set_timeout) 2495
2496 - def _is_dynamic(self):
2497 return pn_terminus_is_dynamic(self._impl)
2498 - def _set_dynamic(self, dynamic):
2499 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2500 dynamic = property(_is_dynamic, _set_dynamic) 2501
2502 - def _get_distribution_mode(self):
2503 return pn_terminus_get_distribution_mode(self._impl)
2504 - def _set_distribution_mode(self, mode):
2505 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2506 distribution_mode = property(_get_distribution_mode, _set_distribution_mode) 2507 2508 @property
2509 - def properties(self):
2510 return Data(pn_terminus_properties(self._impl))
2511 2512 @property
2513 - def capabilities(self):
2514 return Data(pn_terminus_capabilities(self._impl))
2515 2516 @property
2517 - def outcomes(self):
2518 return Data(pn_terminus_outcomes(self._impl))
2519 2520 @property
2521 - def filter(self):
2522 return Data(pn_terminus_filter(self._impl))
2523
2524 - def copy(self, src):
2525 self._check(pn_terminus_copy(self._impl, src._impl))
2526
2527 2528 -class Sender(Link):
2529
2530 - def offered(self, n):
2531 pn_link_offered(self._link, n)
2532
2533 - def send(self, bytes):
2534 return self._check(pn_link_send(self._link, bytes))
2535
2536 -class Receiver(Link):
2537
2538 - def flow(self, n):
2539 pn_link_flow(self._link, n)
2540
2541 - def recv(self, limit):
2542 n, bytes = pn_link_recv(self._link, limit) 2543 if n == PN_EOS: 2544 return None 2545 else: 2546 self._check(n) 2547 return bytes
2548
2549 - def drain(self, n):
2550 pn_link_drain(self._link, n)
2551
2552 - def draining(self):
2553 return pn_link_draining(self._link)
2554
2555 -def wrap_delivery(dlv):
2556 if not dlv: return None 2557 ctx = pn_delivery_get_context(dlv) 2558 if ctx: return ctx 2559 wrapper = Delivery(dlv) 2560 pn_delivery_set_context(dlv, wrapper) 2561 return wrapper
2562
2563 -class Disposition(object):
2564 2565 RECEIVED = PN_RECEIVED 2566 ACCEPTED = PN_ACCEPTED 2567 REJECTED = PN_REJECTED 2568 RELEASED = PN_RELEASED 2569 MODIFIED = PN_MODIFIED 2570
2571 - def __init__(self, impl, local):
2572 self._impl = impl 2573 self.local = local 2574 self._data = None 2575 self._condition = None 2576 self._annotations = None
2577 2578 @property
2579 - def type(self):
2580 return pn_disposition_type(self._impl)
2581
2582 - def _get_section_number(self):
2583 return pn_disposition_get_section_number(self._impl)
2584 - def _set_section_number(self, n):
2585 pn_disposition_set_section_number(self._impl, n)
2586 section_number = property(_get_section_number, _set_section_number) 2587
2588 - def _get_section_offset(self):
2589 return pn_disposition_get_section_offset(self._impl)
2590 - def _set_section_offset(self, n):
2591 pn_disposition_set_section_offset(self._impl, n)
2592 section_offset = property(_get_section_offset, _set_section_offset) 2593
2594 - def _get_failed(self):
2595 return pn_disposition_is_failed(self._impl)
2596 - def _set_failed(self, b):
2597 pn_disposition_set_failed(self._impl, b)
2598 failed = property(_get_failed, _set_failed) 2599
2600 - def _get_undeliverable(self):
2601 return pn_disposition_is_undeliverable(self._impl)
2602 - def _set_undeliverable(self, b):
2603 pn_disposition_set_undeliverable(self._impl, b)
2604 undeliverable = property(_get_undeliverable, _set_undeliverable) 2605
2606 - def _get_data(self):
2607 if self.local: 2608 return self._data 2609 else: 2610 return dat2obj(pn_disposition_data(self._impl))
2611 - def _set_data(self, obj):
2612 if self.local: 2613 self._data = obj 2614 else: 2615 raise AttributeError("data attribute is read-only")
2616 data = property(_get_data, _set_data) 2617
2618 - def _get_annotations(self):
2619 if self.local: 2620 return self._annotations 2621 else: 2622 return dat2obj(pn_disposition_annotations(self._impl))
2623 - def _set_annotations(self, obj):
2624 if self.local: 2625 self._annotations = obj 2626 else: 2627 raise AttributeError("annotations attribute is read-only")
2628 annotations = property(_get_annotations, _set_annotations) 2629
2630 - def _get_condition(self):
2631 if self.local: 2632 return self._condition 2633 else: 2634 return cond2obj(pn_disposition_condition(self._impl))
2635 - def _set_condition(self, obj):
2636 if self.local: 2637 self._condition = obj 2638 else: 2639 raise AttributeError("condition attribute is read-only")
2640 condition = property(_get_condition, _set_condition)
2641
2642 -class Delivery(object):
2643 2644 RECEIVED = Disposition.RECEIVED 2645 ACCEPTED = Disposition.ACCEPTED 2646 REJECTED = Disposition.REJECTED 2647 RELEASED = Disposition.RELEASED 2648 MODIFIED = Disposition.MODIFIED 2649
2650 - def __init__(self, dlv):
2651 self._dlv = dlv 2652 self.local = Disposition(pn_delivery_local(self._dlv), True) 2653 self.remote = Disposition(pn_delivery_remote(self._dlv), False)
2654 2655 @property
2656 - def tag(self):
2657 return pn_delivery_tag(self._dlv)
2658 2659 @property
2660 - def writable(self):
2661 return pn_delivery_writable(self._dlv)
2662 2663 @property
2664 - def readable(self):
2665 return pn_delivery_readable(self._dlv)
2666 2667 @property
2668 - def updated(self):
2669 return pn_delivery_updated(self._dlv)
2670
2671 - def update(self, state):
2672 obj2dat(self.local._data, pn_disposition_data(self.local._impl)) 2673 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl)) 2674 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl)) 2675 pn_delivery_update(self._dlv, state)
2676 2677 @property
2678 - def pending(self):
2679 return pn_delivery_pending(self._dlv)
2680 2681 @property
2682 - def partial(self):
2683 return pn_delivery_partial(self._dlv)
2684 2685 @property
2686 - def local_state(self):
2687 return pn_delivery_local_state(self._dlv)
2688 2689 @property
2690 - def remote_state(self):
2691 return pn_delivery_remote_state(self._dlv)
2692 2693 @property
2694 - def settled(self):
2695 return pn_delivery_settled(self._dlv)
2696
2697 - def settle(self):
2698 pn_delivery_settle(self._dlv)
2699 2700 @property
2701 - def work_next(self):
2702 return wrap_delivery(pn_work_next(self._dlv))
2703 2704 @property
2707
2708 -class TransportException(ProtonException):
2709 pass
2710
2711 -class Transport(object):
2712 2713 TRACE_DRV = PN_TRACE_DRV 2714 TRACE_FRM = PN_TRACE_FRM 2715 TRACE_RAW = PN_TRACE_RAW 2716
2717 - def __init__(self, _trans=None):
2718 if not _trans: 2719 self._trans = pn_transport() 2720 else: 2721 self._shared_trans = True 2722 self._trans = _trans 2723 self._sasl = None 2724 self._ssl = None
2725
2726 - def __del__(self):
2727 if hasattr(self, "_trans"): 2728 if not hasattr(self, "_shared_trans"): 2729 pn_transport_free(self._trans) 2730 if hasattr(self, "_sasl") and self._sasl: 2731 # pn_transport_free deallocs the C sasl associated with the 2732 # transport, so erase the reference if a SASL object was used. 2733 self._sasl._sasl = None 2734 self._sasl = None 2735 if hasattr(self, "_ssl") and self._ssl: 2736 # ditto the owned c SSL object 2737 self._ssl._ssl = None 2738 self._ssl = None 2739 del self._trans
2740
2741 - def _check(self, err):
2742 if err < 0: 2743 exc = EXCEPTIONS.get(err, TransportException) 2744 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._trans)))) 2745 else: 2746 return err
2747
2748 - def bind(self, connection):
2749 self._check(pn_transport_bind(self._trans, connection._conn))
2750
2751 - def trace(self, n):
2752 pn_transport_trace(self._trans, n)
2753
2754 - def tick(self, now):
2755 """Process any timed events (like heartbeat generation). 2756 now = seconds since epoch (float). 2757 """ 2758 next = pn_transport_tick(self._trans, long(now * 1000)) 2759 return float(next) / 1000.0
2760
2761 - def capacity(self):
2762 c = pn_transport_capacity(self._trans) 2763 if c >= PN_EOS: 2764 return c 2765 else: 2766 return self._check(c)
2767
2768 - def push(self, bytes):
2769 self._check(pn_transport_push(self._trans, bytes))
2770
2771 - def close_tail(self):
2772 self._check(pn_transport_close_tail(self._trans))
2773
2774 - def pending(self):
2775 p = pn_transport_pending(self._trans) 2776 if p >= PN_EOS: 2777 return p 2778 else: 2779 return self._check(p)
2780
2781 - def peek(self, size):
2782 cd, out = pn_transport_peek(self._trans, size) 2783 if cd == PN_EOS: 2784 return None 2785 else: 2786 self._check(cd) 2787 return out
2788
2789 - def pop(self, size):
2790 pn_transport_pop(self._trans, size)
2791
2792 - def close_head(self):
2793 self._check(pn_transport_close_head(self._trans))
2794
2795 - def output(self, size):
2796 p = self.pending() 2797 if p < 0: 2798 return None 2799 else: 2800 out = self.peek(min(size, p)) 2801 self.pop(len(out)) 2802 return out
2803
2804 - def input(self, bytes):
2805 if not bytes: 2806 self.close_tail() 2807 return None 2808 else: 2809 c = self.capacity() 2810 if (c < 0): 2811 return None 2812 trimmed = bytes[:c] 2813 self.push(trimmed) 2814 return len(trimmed)
2815 2816 # AMQP 1.0 max-frame-size
2817 - def _get_max_frame_size(self):
2818 return pn_transport_get_max_frame(self._trans)
2819
2820 - def _set_max_frame_size(self, value):
2821 pn_transport_set_max_frame(self._trans, value)
2822 2823 max_frame_size = property(_get_max_frame_size, _set_max_frame_size, 2824 doc=""" 2825 Sets the maximum size for received frames (in bytes). 2826 """) 2827 2828 @property
2829 - def remote_max_frame_size(self):
2830 return pn_transport_get_remote_max_frame(self._trans)
2831 2832 # AMQP 1.0 idle-time-out
2833 - def _get_idle_timeout(self):
2834 msec = pn_transport_get_idle_timeout(self._trans) 2835 return float(msec)/1000.0
2836
2837 - def _set_idle_timeout(self, sec):
2838 pn_transport_set_idle_timeout(self._trans, long(sec * 1000))
2839 2840 idle_timeout = property(_get_idle_timeout, _set_idle_timeout, 2841 doc=""" 2842 The idle timeout of the connection (in milliseconds). 2843 """) 2844 2845 @property
2846 - def remote_idle_timeout(self):
2847 msec = pn_transport_get_remote_idle_timeout(self._trans) 2848 return float(msec)/1000.0
2849 2850 @property
2851 - def frames_output(self):
2852 return pn_transport_get_frames_output(self._trans)
2853 2854 @property
2855 - def frames_input(self):
2856 return pn_transport_get_frames_input(self._trans)
2857
2858 - def sasl(self):
2859 # SASL factory (singleton for this transport) 2860 if not self._sasl: 2861 self._sasl = SASL(self) 2862 return self._sasl
2863
2864 - def ssl(self, domain=None, session_details=None):
2865 # SSL factory (singleton for this transport) 2866 if not self._ssl: 2867 self._ssl = SSL(self, domain, session_details) 2868 return self._ssl
2869
2870 -class SASLException(TransportException):
2871 pass
2872
2873 -class SASL(object):
2874 2875 OK = PN_SASL_OK 2876 AUTH = PN_SASL_AUTH 2877
2878 - def __new__(cls, transport):
2879 """Enforce a singleton SASL object per Transport""" 2880 if not transport._sasl: 2881 obj = super(SASL, cls).__new__(cls) 2882 obj._sasl = pn_sasl(transport._trans) 2883 transport._sasl = obj 2884 return transport._sasl
2885
2886 - def _check(self, err):
2887 if err < 0: 2888 exc = EXCEPTIONS.get(err, SASLException) 2889 raise exc("[%s]" % (err)) 2890 else: 2891 return err
2892
2893 - def mechanisms(self, mechs):
2894 pn_sasl_mechanisms(self._sasl, mechs)
2895
2896 - def client(self):
2897 pn_sasl_client(self._sasl)
2898
2899 - def server(self):
2900 pn_sasl_server(self._sasl)
2901
2902 - def plain(self, user, password):
2903 pn_sasl_plain(self._sasl, user, password)
2904
2905 - def send(self, data):
2906 self._check(pn_sasl_send(self._sasl, data, len(data)))
2907
2908 - def recv(self):
2909 sz = 16 2910 while True: 2911 n, data = pn_sasl_recv(self._sasl, sz) 2912 if n == PN_OVERFLOW: 2913 sz *= 2 2914 continue 2915 elif n == PN_EOS: 2916 return None 2917 else: 2918 self._check(n) 2919 return data
2920 2921 @property
2922 - def outcome(self):
2923 outcome = pn_sasl_outcome(self._sasl) 2924 if outcome == PN_SASL_NONE: 2925 return None 2926 else: 2927 return outcome
2928
2929 - def done(self, outcome):
2930 pn_sasl_done(self._sasl, outcome)
2931 2932 STATE_CONF = PN_SASL_CONF 2933 STATE_IDLE = PN_SASL_IDLE 2934 STATE_STEP = PN_SASL_STEP 2935 STATE_PASS = PN_SASL_PASS 2936 STATE_FAIL = PN_SASL_FAIL 2937 2938 @property
2939 - def state(self):
2940 return pn_sasl_state(self._sasl)
2941
2942 2943 -class SSLException(TransportException):
2944 pass
2945
2946 -class SSLUnavailable(SSLException):
2947 pass
2948
2949 -class SSLDomain(object):
2950 2951 MODE_CLIENT = PN_SSL_MODE_CLIENT 2952 MODE_SERVER = PN_SSL_MODE_SERVER 2953 VERIFY_PEER = PN_SSL_VERIFY_PEER 2954 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME 2955 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER 2956
2957 - def __init__(self, mode):
2958 self._domain = pn_ssl_domain(mode) 2959 if self._domain is None: 2960 raise SSLUnavailable()
2961
2962 - def _check(self, err):
2963 if err < 0: 2964 exc = EXCEPTIONS.get(err, SSLException) 2965 raise exc("SSL failure.") 2966 else: 2967 return err
2968
2969 - def set_credentials(self, cert_file, key_file, password):
2970 return self._check( pn_ssl_domain_set_credentials(self._domain, 2971 cert_file, key_file, 2972 password) )
2973 - def set_trusted_ca_db(self, certificate_db):
2974 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain, 2975 certificate_db) )
2976 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
2977 return self._check( pn_ssl_domain_set_peer_authentication(self._domain, 2978 verify_mode, 2979 trusted_CAs) )
2980
2981 - def allow_unsecured_client(self):
2982 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
2983
2984 -class SSL(object):
2985
2986 - def _check(self, err):
2987 if err < 0: 2988 exc = EXCEPTIONS.get(err, SSLException) 2989 raise exc("SSL failure.") 2990 else: 2991 return err
2992
2993 - def __new__(cls, transport, domain, session_details=None):
2994 """Enforce a singleton SSL object per Transport""" 2995 if transport._ssl: 2996 # unfortunately, we've combined the allocation and the configuration in a 2997 # single step. So catch any attempt by the application to provide what 2998 # may be a different configuration than the original (hack) 2999 ssl = transport._ssl 3000 if (domain and (ssl._domain is not domain) or 3001 session_details and (ssl._session_details is not session_details)): 3002 raise SSLException("Cannot re-configure existing SSL object!") 3003 else: 3004 obj = super(SSL, cls).__new__(cls) 3005 obj._domain = domain 3006 obj._session_details = session_details 3007 session_id = None 3008 if session_details: 3009 session_id = session_details.get_session_id() 3010 obj._ssl = pn_ssl( transport._trans ) 3011 if obj._ssl is None: 3012 raise SSLUnavailable() 3013 pn_ssl_init( obj._ssl, domain._domain, session_id ) 3014 transport._ssl = obj 3015 return transport._ssl
3016
3017 - def cipher_name(self):
3018 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 ) 3019 if rc: 3020 return name 3021 return None
3022
3023 - def protocol_name(self):
3024 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 ) 3025 if rc: 3026 return name 3027 return None
3028 3029 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN 3030 RESUME_NEW = PN_SSL_RESUME_NEW 3031 RESUME_REUSED = PN_SSL_RESUME_REUSED 3032
3033 - def resume_status(self):
3034 return pn_ssl_resume_status( self._ssl )
3035
3036 - def _set_peer_hostname(self, hostname):
3037 self._check(pn_ssl_set_peer_hostname( self._ssl, hostname ))
3038 - def _get_peer_hostname(self):
3039 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 ) 3040 self._check(err) 3041 return name
3042 peer_hostname = property(_get_peer_hostname, _set_peer_hostname, 3043 doc=""" 3044 Manage the expected name of the remote peer. Used to authenticate the remote. 3045 """)
3046
3047 3048 -class SSLSessionDetails(object):
3049 """ Unique identifier for the SSL session. Used to resume previous session on a new 3050 SSL connection. 3051 """ 3052
3053 - def __init__(self, session_id):
3054 self._session_id = session_id
3055
3056 - def get_session_id(self):
3057 return self._session_id
3058
3059 3060 ### 3061 # Driver 3062 ### 3063 3064 -class DriverException(ProtonException):
3065 """ 3066 The DriverException class is the root of the driver exception hierarchy. 3067 """ 3068 pass
3069
3070 3071 -def wrap_connector(cxtr):
3072 if not cxtr: return None 3073 ctx = pn_connector_context(cxtr) 3074 if ctx: return ctx 3075 wrapper = Connector(_cxtr=cxtr) 3076 pn_connector_set_context(cxtr, wrapper) 3077 return wrapper
3078
3079 -class Connector(object):
3080 - def __init__(self, _cxtr):
3081 self._cxtr = _cxtr
3082
3083 - def next(self):
3084 return wrap_connector(pn_connector_next(self._cxtr))
3085
3086 - def process(self):
3087 pn_connector_process(self._cxtr)
3088
3089 - def listener(self):
3090 return wrap_listener(pn_connector_listener(self._cxtr))
3091
3092 - def sasl(self):
3093 ## seems easier just to grab the SASL associated with the transport: 3094 trans = self.transport 3095 if trans: 3096 return SASL(self.transport) 3097 return None
3098 3099 @property
3100 - def transport(self):
3101 trans = pn_connector_transport(self._cxtr) 3102 if trans: 3103 return Transport(trans) 3104 return None
3105
3106 - def close(self):
3107 return pn_connector_close(self._cxtr)
3108 3109 @property
3110 - def closed(self):
3111 return pn_connector_closed(self._cxtr)
3112
3113 - def _get_connection(self):
3114 return wrap_connection(pn_connector_connection(self._cxtr))
3115
3116 - def _set_connection(self, conn):
3117 pn_connector_set_connection(self._cxtr, conn._conn)
3118 3119 connection = property(_get_connection, _set_connection, 3120 doc=""" 3121 Associate a Connection with this Connector. 3122 """)
3123
3124 -def wrap_listener(lsnr):
3125 if not lsnr: return None 3126 ctx = pn_listener_context(lsnr) 3127 if ctx: return ctx 3128 wrapper = Listener(_lsnr=lsnr) 3129 pn_listener_set_context(lsnr, wrapper) 3130 return wrapper
3131
3132 -class Listener(object):
3133 - def __init__(self, _lsnr=None):
3134 self._lsnr = _lsnr
3135
3136 - def next(self):
3137 return wrap_listener(pn_listener_next(self._lsnr))
3138
3139 - def accept(self):
3140 cxtr = pn_listener_accept(self._lsnr) 3141 return wrap_connector(cxtr)
3142
3143 - def close(self):
3144 pn_listener_close(self._lsnr)
3145
3146 -class Driver(object):
3147 - def __init__(self):
3148 self._driver = pn_driver()
3149
3150 - def __del__(self):
3151 if hasattr(self, "_driver"): 3152 pn_driver_free(self._driver) 3153 del self._driver
3154
3155 - def wait(self, timeout_sec):
3156 if timeout_sec is None or timeout_sec < 0.0: 3157 t = -1 3158 else: 3159 t = long(1000*timeout_sec) 3160 return pn_driver_wait(self._driver, t)
3161
3162 - def wakeup(self):
3163 return pn_driver_wakeup(self._driver)
3164
3165 - def listener(self, host, port):
3166 return wrap_listener(pn_listener(self._driver, host, port, None))
3167
3168 - def pending_listener(self):
3169 return wrap_listener(pn_driver_listener(self._driver))
3170
3171 - def head_listener(self):
3172 return wrap_listener(pn_listener_head(self._driver))
3173
3174 - def connector(self, host, port):
3175 return wrap_connector(pn_connector(self._driver, host, port, None))
3176
3177 - def head_connector(self):
3178 return wrap_connector(pn_connector_head(self._driver))
3179
3180 - def pending_connector(self):
3181 return wrap_connector(pn_driver_connector(self._driver))
3182 3183 __all__ = [ 3184 "API_LANGUAGE", 3185 "IMPLEMENTATION_LANGUAGE", 3186 "ABORTED", 3187 "ACCEPTED", 3188 "AUTOMATIC", 3189 "PENDING", 3190 "MANUAL", 3191 "REJECTED", 3192 "RELEASED", 3193 "SETTLED", 3194 "UNDESCRIBED", 3195 "Array", 3196 "Condition", 3197 "Connection", 3198 "Connector", 3199 "Data", 3200 "Delivery", 3201 "Disposition", 3202 "Described", 3203 "Driver", 3204 "DriverException", 3205 "Endpoint", 3206 "Link", 3207 "Listener", 3208 "Message", 3209 "MessageException", 3210 "Messenger", 3211 "MessengerException", 3212 "ProtonException", 3213 "Receiver", 3214 "SASL", 3215 "Sender", 3216 "Session", 3217 "SSL", 3218 "SSLDomain", 3219 "SSLSessionDetails", 3220 "SSLUnavailable", 3221 "SSLException", 3222 "Terminus", 3223 "Timeout", 3224 "Interrupt", 3225 "Transport", 3226 "TransportException", 3227 "char", 3228 "symbol", 3229 "timestamp", 3230 "ulong" 3231 ] 3232