Module proton
[frames] | no frames]

Source Code for Module proton

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  #  
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  #  
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  The proton module defines a suite of APIs that implement the AMQP 1.0 
  22  protocol. 
  23   
  24  The proton APIs consist of the following classes: 
  25   
  26   - L{Messenger} -- A messaging endpoint. 
  27   - L{Message}   -- A class for creating and/or accessing AMQP message content. 
  28   - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded 
  29                    data. 
  30   
  31  """ 
  32   
  33  from cproton import * 
  34  try: 
  35    import uuid 
  36  except ImportError: 
  37    """ 
  38    No 'native' UUID support.  Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. 
  39    """ 
  40    import struct 
41 - class uuid:
42 - class UUID:
43 - def __init__(self, hex=None, bytes=None):
44 if [hex, bytes].count(None) != 1: 45 raise TypeErrror("need one of hex or bytes") 46 if bytes is not None: 47 self.bytes = bytes 48 elif hex is not None: 49 fields=hex.split("-") 50 fields[4:5] = [fields[4][:4], fields[4][4:]] 51 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
52
53 - def __cmp__(self, other):
54 if isinstance(other, uuid.UUID): 55 return cmp(self.bytes, other.bytes) 56 else: 57 return -1
58
59 - def __str__(self):
60 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
61
62 - def __repr__(self):
63 return "UUID(%r)" % str(self)
64
65 - def __hash__(self):
66 return self.bytes.__hash__()
67 68 import os, random, socket, time 69 rand = random.Random() 70 rand.seed((os.getpid(), time.time(), socket.gethostname()))
71 - def random_uuid():
72 bytes = [rand.randint(0, 255) for i in xrange(16)] 73 74 # From RFC4122, the version bits are set to 0100 75 bytes[7] &= 0x0F 76 bytes[7] |= 0x40 77 78 # From RFC4122, the top two bits of byte 8 get set to 01 79 bytes[8] &= 0x3F 80 bytes[8] |= 0x80 81 return "".join(map(chr, bytes))
82
83 - def uuid4():
84 return uuid.UUID(bytes=random_uuid())

# Interpreters without a bytes builtin get str under that name instead.
try:
  bytes()
except NameError:
  bytes = str

# Language identifiers published as module constants; both are "C" here.
API_LANGUAGE = IMPLEMENTATION_LANGUAGE = "C"

class Constant(object):
  """
  A named marker value whose C{repr} is exactly its name.
  """

  def __init__(self, name):
    """
    @type name: string
    @param name: the text returned by C{repr} for this constant
    """
    self.name = name

  def __repr__(self):
    label = self.name
    return label
101
class ProtonException(Exception):
  """
  Base class of the proton exception hierarchy. Every exception class
  defined by proton derives from it.
  """
108
class Timeout(ProtonException):
  """
  Raised to report that a blocking operation has timed out.
  """
115
class Interrupt(ProtonException):
  """
  Raised to report that a blocking operation was interrupted.
  """
121
class MessengerException(ProtonException):
  """
  Base class of the messenger exception hierarchy. Every exception
  generated by the messenger class derives from it.
  """
128
class MessageException(ProtonException):
  """
  Base class of the message exception hierarchy. Every exception
  generated by the Message class derives from it.
  """

# Delivery states as reported to callers.
PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")

# NOTE(review): no use of these two is visible in this part of the file.
AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")

# Error codes that map onto a dedicated exception class; any other negative
# code falls back to the default supplied by the class doing the lookup.
EXCEPTIONS = {
  PN_TIMEOUT: Timeout,
  PN_INTR: Interrupt
  }

# Translation from the library's status codes to the constants above.
STATUSES = {
  PN_STATUS_ACCEPTED: ACCEPTED,
  PN_STATUS_REJECTED: REJECTED,
  PN_STATUS_PENDING: PENDING,
  PN_STATUS_UNKNOWN: None
  }

class Messenger(object):
  """
  The L{Messenger} class defines a high level interface for sending
  and receiving L{Messages<Message>}. Every L{Messenger} contains a
  single logical queue of incoming messages and a single logical queue
  of outgoing messages. The messages in these queues may be destined
  for, or originate from, a variety of addresses.

  Address Syntax
  ==============

  An address has the following form::

    [ amqp[s]:// ] [user[:password]@] domain [/[name]]

  Where domain can be one of::

    host | host:port | ip | ip:port | name

  The following are valid examples of addresses:

   - example.org
   - example.org:1234
   - amqp://example.org
   - amqps://example.org
   - example.org/incoming
   - amqps://example.org/outgoing
   - amqps://fred:trustno1@example.org
   - 127.0.0.1:1234
   - amqps://127.0.0.1:1234

  Sending & Receiving Messages
  ============================

  The L{Messenger} class works in conjunction with the L{Message}
  class. The L{Message} class is a mutable holder of message content.
  The L{put} method will encode the content in a given L{Message}
  object into the outgoing message queue leaving that L{Message}
  object free to be modified or discarded without having any impact on
  the content in the outgoing queue.

    >>> message = Message()
    >>> for i in range(3):
    ...   message.address = "amqp://host/queue"
    ...   message.subject = "Hello World %i" % i
    ...   messenger.put(message)
    >>> messenger.send()

  Similarly, the L{get} method will decode the content in the incoming
  message queue into the supplied L{Message} object.

    >>> message = Message()
    >>> messenger.recv(10)
    >>> while messenger.incoming > 0:
    ...   messenger.get(message)
    ...   print message.subject
    Hello World 0
    Hello World 1
    Hello World 2
  """

  def __init__(self, name=None):
    """
    Construct a new L{Messenger} with the given name. The name has
    global scope. If no name is supplied, a L{uuid.UUID} based name
    will be chosen.

    @type name: string
    @param name: the name of the messenger or None
    """
    self._mng = pn_messenger(name)

  def __del__(self):
    # Nothing to release when construction never got as far as _mng.
    if not hasattr(self, "_mng"):
      return
    pn_messenger_free(self._mng)
    del self._mng

  def _check(self, err):
    """
    Pass non-negative results through and turn negative ones into
    exceptions. PN_INPROGRESS is tolerated and yields None. Any other
    negative code raises the class registered in EXCEPTIONS, or
    L{MessengerException} when none is registered.
    """
    if not err < 0:
      return err
    if err == PN_INPROGRESS:
      return None
    exc_type = EXCEPTIONS.get(err, MessengerException)
    detail = pn_error_text(pn_messenger_error(self._mng))
    raise exc_type("[%s]: %s" % (err, detail))

  @property
  def name(self):
    """
    The name of the L{Messenger}.
    """
    return pn_messenger_name(self._mng)

  def _get_certificate(self):
    return pn_messenger_get_certificate(self._mng)

  def _set_certificate(self, value):
    self._check(pn_messenger_set_certificate(self._mng, value))

  certificate = property(_get_certificate, _set_certificate,
                         doc="""
Path to a certificate file for the L{Messenger}. This certificate is
used when the L{Messenger} accepts or establishes SSL/TLS connections.
This property must be specified for the L{Messenger} to accept
incoming SSL/TLS connections and to establish client authenticated
outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
connections do not require this property.
""")

  def _get_private_key(self):
    return pn_messenger_get_private_key(self._mng)

  def _set_private_key(self, value):
    self._check(pn_messenger_set_private_key(self._mng, value))

  private_key = property(_get_private_key, _set_private_key,
                         doc="""
Path to a private key file for the L{Messenger's<Messenger>}
certificate. This property must be specified for the L{Messenger} to
accept incoming SSL/TLS connections and to establish client
authenticated outgoing SSL/TLS connection. Non client authenticated
SSL/TLS connections do not require this property.
""")

  def _get_password(self):
    return pn_messenger_get_password(self._mng)

  def _set_password(self, value):
    self._check(pn_messenger_set_password(self._mng, value))

  password = property(_get_password, _set_password,
                      doc="""
This property contains the password for the L{Messenger.private_key}
file, or None if the file is not encrypted.
""")

  def _get_trusted_certificates(self):
    return pn_messenger_get_trusted_certificates(self._mng)

  def _set_trusted_certificates(self, value):
    self._check(pn_messenger_set_trusted_certificates(self._mng, value))

  trusted_certificates = property(_get_trusted_certificates,
                                  _set_trusted_certificates,
                                  doc="""
A path to a database of trusted certificates for use in verifying the
peer on an SSL/TLS connection. If this property is None, then the peer
will not be verified.
""")

  def _get_timeout(self):
    # The library reports milliseconds, with -1 standing for "no timeout".
    millis = pn_messenger_get_timeout(self._mng)
    if millis == -1:
      return None
    return float(millis)/1000

  def _set_timeout(self, value):
    # Seconds in, whole milliseconds out; None is stored as -1.
    if value is None:
      millis = -1
    else:
      millis = long(1000*value)
    self._check(pn_messenger_set_timeout(self._mng, millis))

  timeout = property(_get_timeout, _set_timeout,
                     doc="""
The timeout property contains the default timeout for blocking
operations performed by the L{Messenger}.
""")

  def _is_blocking(self):
    return pn_messenger_is_blocking(self._mng)

  def _set_blocking(self, b):
    self._check(pn_messenger_set_blocking(self._mng, b))

  blocking = property(_is_blocking, _set_blocking)

  def _get_incoming_window(self):
    return pn_messenger_get_incoming_window(self._mng)

  def _set_incoming_window(self, window):
    self._check(pn_messenger_set_incoming_window(self._mng, window))

  incoming_window = property(_get_incoming_window, _set_incoming_window,
                             doc="""
The incoming tracking window for the messenger. The messenger will
track the remote status of this many incoming deliveries after they
have been accepted or rejected. Defaults to zero.
""")

  def _get_outgoing_window(self):
    return pn_messenger_get_outgoing_window(self._mng)

  def _set_outgoing_window(self, window):
    self._check(pn_messenger_set_outgoing_window(self._mng, window))

  outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
                             doc="""
The outgoing tracking window for the messenger. The messenger will
track the remote status of this many outgoing deliveries after calling
send. Defaults to zero.
""")

  def start(self):
    """
    Transitions the L{Messenger} to an active state. A L{Messenger} is
    initially created in an inactive state. When inactive a
    L{Messenger} will not send or receive messages from its internal
    queues. A L{Messenger} must be started before calling L{send} or
    L{recv}.
    """
    self._check(pn_messenger_start(self._mng))

  def stop(self):
    """
    Transitions the L{Messenger} to an inactive state. An inactive
    L{Messenger} will not send or receive messages from its internal
    queues. A L{Messenger} should be stopped before being discarded to
    ensure a clean shutdown handshake occurs on any internally managed
    connections.
    """
    self._check(pn_messenger_stop(self._mng))

  @property
  def stopped(self):
    # Value reported by pn_messenger_stopped for this messenger.
    return pn_messenger_stopped(self._mng)

  def subscribe(self, source):
    """
    Subscribes the L{Messenger} to messages originating from the
    specified source. The source is an address as specified in the
    L{Messenger} introduction with the following addition. If the
    domain portion of the address begins with the '~' character, the
    L{Messenger} will interpret the domain as host/port, bind to it,
    and listen for incoming messages. For example "~0.0.0.0",
    "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
    local interface and listen for incoming messages with the last
    variant only permitting incoming SSL connections.

    @type source: string
    @param source: the source of messages to subscribe to
    """
    # A false result means the subscription failed; report it as PN_ERR.
    if not pn_messenger_subscribe(self._mng, source):
      self._check(PN_ERR)

  def put(self, message):
    """
    Places the content contained in the message onto the outgoing
    queue of the L{Messenger}. This method will never block, however
    it will send any unblocked L{Messages<Message>} in the outgoing
    queue immediately and leave any blocked L{Messages<Message>}
    remaining in the outgoing queue. The L{send} call may be used to
    block until the outgoing queue is empty. The L{outgoing} property
    may be used to check the depth of the outgoing queue.

    @type message: Message
    @param message: the message to place in the outgoing queue
    @return: a tracker
    """
    message._pre_encode()
    self._check(pn_messenger_put(self._mng, message._msg))
    return pn_messenger_outgoing_tracker(self._mng)

  def status(self, tracker):
    """
    Gets the last known remote state of the delivery associated with
    the given tracker.

    @type tracker: tracker
    @param tracker: the tracker whose status is to be retrieved

    @return: one of None, PENDING, REJECTED, or ACCEPTED
    """
    disposition = pn_messenger_status(self._mng, tracker)
    # Codes missing from STATUSES are handed back untranslated.
    return STATUSES.get(disposition, disposition)

  def settle(self, tracker=None):
    """
    Settles the delivery identified by the tracker. Without a tracker
    the current outgoing tracker is used together with the
    PN_CUMULATIVE flag.

    @type tracker: tracker
    @param tracker: a tracker as returned by put, or None
    """
    flags = 0
    if tracker is None:
      tracker = pn_messenger_outgoing_tracker(self._mng)
      flags = PN_CUMULATIVE
    self._check(pn_messenger_settle(self._mng, tracker, flags))

  def send(self, n=-1):
    """
    Blocks until the outgoing queue is empty or the operation times
    out. The L{timeout} property controls how long a L{Messenger} will
    block before timing out.
    """
    self._check(pn_messenger_send(self._mng, n))

  def recv(self, n=None):
    """
    Receives up to I{n} messages into the incoming queue of the
    L{Messenger}. If I{n} is not specified, L{Messenger} will receive as many
    messages as it can buffer internally. This method will block until at least
    one message is available or the operation times out.
    """
    limit = n
    if limit is None:
      limit = -1
    self._check(pn_messenger_recv(self._mng, limit))

  def work(self, timeout=None):
    """
    Calls pn_messenger_work with the timeout converted from seconds to
    milliseconds (-1 when None).

    @return: False if the call reported PN_TIMEOUT, True otherwise
    """
    if timeout is None:
      millis = -1
    else:
      millis = long(1000*timeout)
    err = pn_messenger_work(self._mng, millis)
    if err == PN_TIMEOUT:
      return False
    self._check(err)
    return True

  def interrupt(self):
    # Thin wrapper over pn_messenger_interrupt; failures raise via _check.
    self._check(pn_messenger_interrupt(self._mng))

  def get(self, message=None):
    """
    Moves the message from the head of the incoming message queue into
    the supplied message object. Any content in the message will be
    overwritten.

    @type message: Message
    @param message: the destination message object
    @return: a tracker
    """
    impl = None
    if message is not None:
      impl = message._msg
    self._check(pn_messenger_get(self._mng, impl))
    if message is not None:
      message._post_decode()
    return pn_messenger_incoming_tracker(self._mng)

  def accept(self, tracker=None):
    """
    Accepts messages retrieved from the incoming message queue.

    @type tracker: tracker
    @param tracker: a tracker as returned by get
    """
    flags = 0
    if tracker is None:
      tracker = pn_messenger_incoming_tracker(self._mng)
      flags = PN_CUMULATIVE
    self._check(pn_messenger_accept(self._mng, tracker, flags))

  def reject(self, tracker=None):
    """
    Rejects messages retrieved from the incoming message queue.

    @type tracker: tracker
    @param tracker: a tracker as returned by get
    """
    flags = 0
    if tracker is None:
      tracker = pn_messenger_incoming_tracker(self._mng)
      flags = PN_CUMULATIVE
    self._check(pn_messenger_reject(self._mng, tracker, flags))

  @property
  def outgoing(self):
    """
    The outgoing queue depth.
    """
    return pn_messenger_outgoing(self._mng)

  @property
  def incoming(self):
    """
    The incoming queue depth.
    """
    return pn_messenger_incoming(self._mng)

  def route(self, pattern, address):
    # Hands pattern and address to pn_messenger_route; errors raise via _check.
    self._check(pn_messenger_route(self._mng, pattern, address))

  def rewrite(self, pattern, address):
    # Hands pattern and address to pn_messenger_rewrite; errors raise via _check.
    self._check(pn_messenger_rewrite(self._mng, pattern, address))
542
class Message(object):
  """
  The L{Message} class is a mutable holder of message content.

  @ivar instructions: delivery instructions for the message
  @type instructions: dict
  @ivar annotations: infrastructure defined message annotations
  @type annotations: dict
  @ivar properties: application defined message properties
  @type properties: dict
  @ivar body: message body
  @type body: bytes | unicode | dict | list | int | long | float | UUID
  """

  DATA = PN_DATA
  TEXT = PN_TEXT
  AMQP = PN_AMQP
  JSON = PN_JSON

  DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY

  def __init__(self):
    """
    Allocate the underlying message, wrap its id and correlation id in
    L{Data} objects and start with empty instructions, annotations,
    properties and body.
    """
    self._msg = pn_message()
    self._id = Data(pn_message_id(self._msg))
    self._correlation_id = Data(pn_message_correlation_id(self._msg))
    self.instructions = None
    self.annotations = None
    self.properties = None
    self.body = None

  def __del__(self):
    # _msg is missing when pn_message() itself failed in __init__.
    if hasattr(self, "_msg"):
      pn_message_free(self._msg)
      del self._msg

  def _check(self, err):
    """
    Return non-negative results unchanged. A negative code raises the
    class registered for it in EXCEPTIONS, or L{MessageException} when
    none is registered.
    """
    if err < 0:
      exc = EXCEPTIONS.get(err, MessageException)
      # pn_message_error yields an error object, not text; extract the text
      # the same way Messenger._check does for pn_messenger_error.
      raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
    else:
      return err

  def _pre_encode(self):
    """
    Copy instructions, annotations, properties and body from this
    object into the matching sections of the underlying message. Each
    section is cleared first and left empty when the attribute is None.
    """
    inst = Data(pn_message_instructions(self._msg))
    ann = Data(pn_message_annotations(self._msg))
    props = Data(pn_message_properties(self._msg))
    body = Data(pn_message_body(self._msg))

    inst.clear()
    if self.instructions is not None:
      inst.put_object(self.instructions)
    ann.clear()
    if self.annotations is not None:
      ann.put_object(self.annotations)
    props.clear()
    if self.properties is not None:
      props.put_object(self.properties)
    body.clear()
    if self.body is not None:
      body.put_object(self.body)

  def _post_decode(self):
    """
    Read instructions, annotations, properties and body back from the
    underlying message. A section with no value becomes None.
    """
    inst = Data(pn_message_instructions(self._msg))
    ann = Data(pn_message_annotations(self._msg))
    props = Data(pn_message_properties(self._msg))
    body = Data(pn_message_body(self._msg))

    if inst.next():
      self.instructions = inst.get_object()
    else:
      self.instructions = None
    if ann.next():
      self.annotations = ann.get_object()
    else:
      self.annotations = None
    if props.next():
      self.properties = props.get_object()
    else:
      self.properties = None
    if body.next():
      self.body = body.get_object()
    else:
      self.body = None

  def clear(self):
    """
    Clears the contents of the L{Message}. All fields will be reset to
    their default values.
    """
    pn_message_clear(self._msg)
    self.instructions = None
    self.annotations = None
    self.properties = None
    self.body = None

  def _is_inferred(self):
    return pn_message_is_inferred(self._msg)

  def _set_inferred(self, value):
    self._check(pn_message_set_inferred(self._msg, bool(value)))

  inferred = property(_is_inferred, _set_inferred)

  def _is_durable(self):
    return pn_message_is_durable(self._msg)

  def _set_durable(self, value):
    self._check(pn_message_set_durable(self._msg, bool(value)))

  durable = property(_is_durable, _set_durable,
                     doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")

  def _get_priority(self):
    return pn_message_get_priority(self._msg)

  def _set_priority(self, value):
    self._check(pn_message_set_priority(self._msg, value))

  priority = property(_get_priority, _set_priority,
                      doc="""
The priority of the message.
""")

  def _get_ttl(self):
    return pn_message_get_ttl(self._msg)

  def _set_ttl(self, value):
    self._check(pn_message_set_ttl(self._msg, value))

  ttl = property(_get_ttl, _set_ttl,
                 doc="""
The time to live of the message measured in milliseconds. Expired
messages may be dropped.
""")

  def _is_first_acquirer(self):
    return pn_message_is_first_acquirer(self._msg)

  def _set_first_acquirer(self, value):
    self._check(pn_message_set_first_acquirer(self._msg, bool(value)))

  first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
                            doc="""
True iff the recipient is the first to acquire the message.
""")

  def _get_delivery_count(self):
    return pn_message_get_delivery_count(self._msg)

  def _set_delivery_count(self, value):
    self._check(pn_message_set_delivery_count(self._msg, value))

  delivery_count = property(_get_delivery_count, _set_delivery_count,
                            doc="""
The number of delivery attempts made for this message.
""")


  def _get_id(self):
    return self._id.get_object()
  def _set_id(self, value):
    # Plain python integers are stored as ulong values.
    if type(value) in (int, long):
      value = ulong(value)
    self._id.rewind()
    self._id.put_object(value)
  id = property(_get_id, _set_id,
                doc="""
The id of the message.
""")

  def _get_user_id(self):
    return pn_message_get_user_id(self._msg)

  def _set_user_id(self, value):
    self._check(pn_message_set_user_id(self._msg, value))

  user_id = property(_get_user_id, _set_user_id,
                     doc="""
The user id of the message creator.
""")

  def _get_address(self):
    return pn_message_get_address(self._msg)

  def _set_address(self, value):
    self._check(pn_message_set_address(self._msg, value))

  address = property(_get_address, _set_address,
                     doc="""
The address of the message.
""")

  def _get_subject(self):
    return pn_message_get_subject(self._msg)

  def _set_subject(self, value):
    self._check(pn_message_set_subject(self._msg, value))

  subject = property(_get_subject, _set_subject,
                     doc="""
The subject of the message.
""")

  def _get_reply_to(self):
    return pn_message_get_reply_to(self._msg)

  def _set_reply_to(self, value):
    self._check(pn_message_set_reply_to(self._msg, value))

  reply_to = property(_get_reply_to, _set_reply_to,
                      doc="""
The reply-to address for the message.
""")

  def _get_correlation_id(self):
    return self._correlation_id.get_object()
  def _set_correlation_id(self, value):
    # Plain python integers are stored as ulong values.
    if type(value) in (int, long):
      value = ulong(value)
    self._correlation_id.rewind()
    self._correlation_id.put_object(value)

  correlation_id = property(_get_correlation_id, _set_correlation_id,
                            doc="""
The correlation-id for the message.
""")

  def _get_content_type(self):
    return pn_message_get_content_type(self._msg)

  def _set_content_type(self, value):
    self._check(pn_message_set_content_type(self._msg, value))

  content_type = property(_get_content_type, _set_content_type,
                          doc="""
The content-type of the message.
""")

  def _get_content_encoding(self):
    return pn_message_get_content_encoding(self._msg)

  def _set_content_encoding(self, value):
    self._check(pn_message_set_content_encoding(self._msg, value))

  content_encoding = property(_get_content_encoding, _set_content_encoding,
                              doc="""
The content-encoding of the message.
""")

  def _get_expiry_time(self):
    return pn_message_get_expiry_time(self._msg)

  def _set_expiry_time(self, value):
    self._check(pn_message_set_expiry_time(self._msg, value))

  expiry_time = property(_get_expiry_time, _set_expiry_time,
                         doc="""
The expiry time of the message.
""")

  def _get_creation_time(self):
    return pn_message_get_creation_time(self._msg)

  def _set_creation_time(self, value):
    self._check(pn_message_set_creation_time(self._msg, value))

  creation_time = property(_get_creation_time, _set_creation_time,
                           doc="""
The creation time of the message.
""")

  def _get_group_id(self):
    return pn_message_get_group_id(self._msg)

  def _set_group_id(self, value):
    self._check(pn_message_set_group_id(self._msg, value))

  group_id = property(_get_group_id, _set_group_id,
                      doc="""
The group id of the message.
""")

  def _get_group_sequence(self):
    return pn_message_get_group_sequence(self._msg)

  def _set_group_sequence(self, value):
    self._check(pn_message_set_group_sequence(self._msg, value))

  group_sequence = property(_get_group_sequence, _set_group_sequence,
                            doc="""
The sequence of the message within its group.
""")

  def _get_reply_to_group_id(self):
    return pn_message_get_reply_to_group_id(self._msg)

  def _set_reply_to_group_id(self, value):
    self._check(pn_message_set_reply_to_group_id(self._msg, value))

  reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
                               doc="""
The group-id for any replies.
""")

  # XXX
  def _get_format(self):
    return pn_message_get_format(self._msg)

  def _set_format(self, value):
    self._check(pn_message_set_format(self._msg, value))

  format = property(_get_format, _set_format,
                    doc="""
The format of the message.
""")

  def encode(self):
    """
    Encode the message and return the encoded data. The output buffer
    starts at 16 bytes and is doubled for as long as the library
    reports PN_OVERFLOW.
    """
    self._pre_encode()
    sz = 16
    while True:
      err, data = pn_message_encode(self._msg, sz)
      if err == PN_OVERFLOW:
        sz *= 2
        continue
      else:
        self._check(err)
        return data

  def decode(self, data):
    """
    Decode the given encoded data into this message and refresh
    instructions, annotations, properties and body from the result.
    """
    self._check(pn_message_decode(self._msg, data, len(data)))
    self._post_decode()

  def load(self, data):
    # Hands data to pn_message_load; errors raise via _check.
    self._check(pn_message_load(self._msg, data))

  def save(self):
    """
    Return the data produced by pn_message_save. The buffer starts at
    16 bytes and is doubled for as long as the library reports
    PN_OVERFLOW.
    """
    sz = 16
    while True:
      err, data = pn_message_save(self._msg, sz)
      if err == PN_OVERFLOW:
        sz *= 2
        continue
      else:
        self._check(err)
        return data

  def __repr2__(self):
    # Alternative textual form built in python: lists every truthy field.
    props = []
    for attr in ("inferred", "address", "reply_to", "durable", "ttl",
                 "priority", "first_acquirer", "delivery_count", "id",
                 "correlation_id", "user_id", "group_id", "group_sequence",
                 "reply_to_group_id", "instructions", "annotations",
                 "properties", "body"):
      value = getattr(self, attr)
      if value: props.append("%s=%r" % (attr, value))
    return "Message(%s)" % ", ".join(props)

  def __repr__(self):
    """
    Return the text pn_inspect produces for the underlying message.
    """
    tmp = pn_string(None)
    err = pn_inspect(self._msg, tmp)
    result = pn_string_get(tmp)
    # Release the scratch string before _check gets a chance to raise.
    pn_free(tmp)
    self._check(err)
    return result
910
class DataException(ProtonException):
  """
  Base class of the Data exception hierarchy. Every exception raised
  by the Data class extends it.
  """
917
class UnmappedType:
  """
  Carries a message in C{msg} and shows it in its C{repr}.
  NOTE(review): presumably marks a value that has no python mapping;
  no use is visible in this part of the file.
  """

  def __init__(self, msg):
    self.msg = msg

  def __repr__(self):
    template = "UnmappedType(%s)"
    return template % self.msg
925
class ulong(long):
  """
  A C{long} whose C{repr} is wrapped in C{ulong(...)}; it is otherwise
  an ordinary long.
  """

  def __repr__(self):
    inner = long.__repr__(self)
    return "ulong(%s)" % inner
930
class timestamp(long):
  """
  A C{long} whose C{repr} is wrapped in C{timestamp(...)}; it is
  otherwise an ordinary long.
  """

  def __repr__(self):
    inner = long.__repr__(self)
    return "timestamp(%s)" % inner
935
class symbol(unicode):
  """
  A C{unicode} string whose C{repr} is wrapped in C{symbol(...)}; it is
  otherwise an ordinary unicode string.
  """

  def __repr__(self):
    inner = unicode.__repr__(self)
    return "symbol(%s)" % inner
940
class char(unicode):
  """
  A C{unicode} string whose C{repr} is wrapped in C{char(...)}; it is
  otherwise an ordinary unicode string.
  """

  def __repr__(self):
    inner = unicode.__repr__(self)
    return "char(%s)" % inner
945
class Described(object):
  """
  A value paired with a descriptor.

  @ivar descriptor: the descriptor
  @ivar value: the described value
  """

  def __init__(self, descriptor, value):
    self.descriptor = descriptor
    self.value = value

  def __repr__(self):
    return "Described(%r, %r)" % (self.descriptor, self.value)

  def __eq__(self, o):
    """
    Two L{Described} objects are equal when both descriptor and value
    are equal; anything else is unequal.
    """
    if isinstance(o, Described):
      return self.descriptor == o.descriptor and self.value == o.value
    else:
      return False

  def __ne__(self, o):
    # Python 2 does not derive != from __eq__, so without this two equal
    # objects would also compare as "not equal".
    return not self.__eq__(o)

# NOTE(review): presumably the descriptor given to an Array that has none;
# no use is visible in this part of the file -- confirm against callers.
UNDESCRIBED = Constant("UNDESCRIBED")

class Array(object):
  """
  A sequence of elements together with a descriptor and an element type.

  @ivar descriptor: the descriptor given at construction
  @ivar type: the element type given at construction
  @ivar elements: tuple of the remaining constructor arguments
  """

  def __init__(self, descriptor, type, *elements):
    self.descriptor = descriptor
    self.type = type
    self.elements = elements

  def __repr__(self):
    if self.elements:
      els = ", %s" % (", ".join(map(repr, self.elements)))
    else:
      els = ""
    return "Array(%r, %r%s)" % (self.descriptor, self.type, els)

  def __eq__(self, o):
    """
    Two L{Array} objects are equal when descriptor, type and elements
    are all equal; anything else is unequal.
    """
    if isinstance(o, Array):
      return self.descriptor == o.descriptor and \
          self.type == o.type and self.elements == o.elements
    else:
      return False

  def __ne__(self, o):
    # Python 2 does not derive != from __eq__, so without this two equal
    # arrays would also compare as "not equal".
    return not self.__eq__(o)
983
984 -class Data:
"""
The L{Data} class provides an interface for decoding, extracting,
creating, and encoding arbitrary AMQP data. A L{Data} object
contains a tree of AMQP values. Leaf nodes in this tree correspond
to scalars in the AMQP type system such as L{ints<INT>} or
L{strings<STRING>}. Non-leaf nodes in this tree correspond to
compound values in the AMQP type system such as L{lists<LIST>},
L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
The root node of the tree is the L{Data} object itself and can have
an arbitrary number of children.

A L{Data} object maintains the notion of the current sibling node
and a current parent node. Siblings are ordered within their parent.
Values are accessed and/or added by using the L{next}, L{prev},
L{enter}, and L{exit} methods to navigate to the desired location in
the tree and using the supplied variety of put_*/get_* methods to
access or add a value of the desired type.

The put_* methods will always add a value I{after} the current node
in the tree. If the current node has a next sibling the put_* method
will overwrite the value on this node. If there is no current node
or the current node has no next sibling then one will be added. The
put_* methods always set the added/modified node to the current
node. The get_* methods read the value of the current node and do
not change which node is current.

The following types of scalar values are supported:

 - L{NULL}
 - L{BOOL}
 - L{UBYTE}
 - L{BYTE}
 - L{USHORT}
 - L{SHORT}
 - L{UINT}
 - L{INT}
 - L{CHAR}
 - L{ULONG}
 - L{LONG}
 - L{TIMESTAMP}
 - L{FLOAT}
 - L{DOUBLE}
 - L{DECIMAL32}
 - L{DECIMAL64}
 - L{DECIMAL128}
 - L{UUID}
 - L{BINARY}
 - L{STRING}
 - L{SYMBOL}

The following types of compound values are supported:

 - L{DESCRIBED}
 - L{ARRAY}
 - L{LIST}
 - L{MAP}
"""

NULL = PN_NULL; "A null value."
BOOL = PN_BOOL; "A boolean value."
UBYTE = PN_UBYTE; "An unsigned byte value."
BYTE = PN_BYTE; "A signed byte value."
USHORT = PN_USHORT; "An unsigned short value."
SHORT = PN_SHORT; "A short value."
UINT = PN_UINT; "An unsigned int value."
INT = PN_INT; "A signed int value."
CHAR = PN_CHAR; "A character value."
ULONG = PN_ULONG; "An unsigned long value."
LONG = PN_LONG; "A signed long value."
TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
FLOAT = PN_FLOAT; "A float value."
DOUBLE = PN_DOUBLE; "A double value."
DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
UUID = PN_UUID; "A UUID value."
BINARY = PN_BINARY; "A binary string."
STRING = PN_STRING; "A unicode string."
SYMBOL = PN_SYMBOL; "A symbolic string."
DESCRIBED = PN_DESCRIBED; "A described value."
ARRAY = PN_ARRAY; "An array value."
LIST = PN_LIST; "A list value."
MAP = PN_MAP; "A map value."

# Human readable name for every type code defined above.
type_names = {
  NULL: "null",
  BOOL: "bool",
  BYTE: "byte",
  UBYTE: "ubyte",
  SHORT: "short",
  USHORT: "ushort",
  INT: "int",
  UINT: "uint",
  CHAR: "char",
  LONG: "long",
  ULONG: "ulong",
  TIMESTAMP: "timestamp",
  FLOAT: "float",
  DOUBLE: "double",
  DECIMAL32: "decimal32",
  DECIMAL64: "decimal64",
  DECIMAL128: "decimal128",
  UUID: "uuid",
  BINARY: "binary",
  STRING: "string",
  SYMBOL: "symbol",
  DESCRIBED: "described",
  ARRAY: "array",
  LIST: "list",
  MAP: "map"
  }

@classmethod
def type_name(cls, type):
  """
  Returns the human readable name of an AMQP type code.

  The method is a classmethod, so the class is passed implicitly as
  the first argument; without a dedicated C{cls} parameter any call
  of the form C{Data.type_name(code)} fails with a TypeError.

  @param type: one of the type constants defined on this class
  @return: the matching entry of L{type_names}
  @raise KeyError: if the code has no entry in L{type_names}
  """
  return cls.type_names[type]
1092
def __init__(self, capacity=16):
  """
  Creates a new data object or wraps an existing one.

  @param capacity: either an integral initial capacity, in which case
    a new underlying object is allocated with pn_data() and released
    again by __del__, or an already existing underlying data object,
    which is wrapped and never freed by this wrapper
  """
  # isinstance instead of an exact type() match so that subclasses of
  # int/long are treated as a capacity as well.
  if isinstance(capacity, (int, long)):
    self._data = pn_data(capacity)
    self._free = True
  else:
    self._data = capacity
    self._free = False
1100
1101 - def __del__(self):
1102 if self._free and hasattr(self, "_data"): 1103 pn_data_free(self._data) 1104 del self._data
1105
1106 - def _check(self, err):
1107 if err < 0: 1108 exc = EXCEPTIONS.get(err, DataException) 1109 raise exc("[%s]: %s" % (err, pn_data_error(self._data))) 1110 else: 1111 return err
1112
def clear(self):
  """
  Clears the data object by delegating to pn_data_clear.
  """
  impl = self._data
  pn_data_clear(impl)
1118
def rewind(self):
  """
  Resets navigation: the parent becomes the root node and the current
  node is cleared, i.e. positioned _before_ the first node, so that a
  following call to next() moves to the first node.
  """
  impl = self._data
  pn_data_rewind(impl)
1126
def next(self):
  """
  Moves the current node forward to its next sibling.

  @return: the type of the new current node, or None when there is no
    next sibling (the current node is then left unchanged)
  """
  if pn_data_next(self._data):
    return self.type()
  return None
1138
def prev(self):
  """
  Moves the current node back to its previous sibling.

  @return: the type of the new current node, or None when there is no
    previous sibling (the current node is then left unchanged)
  """
  if pn_data_prev(self._data):
    return self.type()
  return None
1150
def enter(self):
  """
  Makes the current node the parent node and clears the current node,
  which positions it _before_ the first child; calling next()
  afterwards moves to the first child.

  @return: the result of pn_data_enter (used by callers in this class
    as a truth value telling whether the node could be entered)
  """
  entered = pn_data_enter(self._data)
  return entered
1158
def exit(self):
  """
  Makes the parent node the current node and moves the parent up to
  its own parent.

  @return: the result of pn_data_exit
  """
  left = pn_data_exit(self._data)
  return left
1165
def lookup(self, name):
  """
  Delegates to pn_data_lookup with the given name.

  @param name: passed through unchanged to pn_data_lookup
  @return: whatever pn_data_lookup returns (presumably whether a node
    with that name was found -- confirm against the C API)
  """
  outcome = pn_data_lookup(self._data, name)
  return outcome
1168
def narrow(self):
  """
  Delegates to pn_data_narrow on the underlying data object.
  """
  impl = self._data
  pn_data_narrow(impl)
1171
def widen(self):
  """
  Delegates to pn_data_widen on the underlying data object.
  """
  impl = self._data
  pn_data_widen(impl)
1174
def type(self):
  """
  Reports the type of the current node.

  @return: the type code of the current node, or None when
    pn_data_type reports -1
  """
  code = pn_data_type(self._data)
  if code != -1:
    return code
  return None
1184
def encode(self):
  """
  Produces the AMQP encoded representation of the data.

  Encoding starts with a 1024 byte buffer; whenever the encoder
  reports PN_OVERFLOW the buffer size is doubled and encoding is
  retried.

  @return: the encoded data
  @raise DataException: (or the mapped exception) for any other
    negative status code
  """
  capacity = 1024
  while True:
    status, encoded = pn_data_encode(self._data, capacity)
    if status == PN_OVERFLOW:
      capacity *= 2
      continue
    if status >= 0:
      return encoded
    self._check(status)
1198
def decode(self, encoded):
  """
  Decodes the first value found in the supplied AMQP data.

  @type encoded: binary
  @param encoded: AMQP encoded binary data
  @return: the number of bytes consumed
  @raise DataException: (or the mapped exception) when decoding fails
  """
  consumed = pn_data_decode(self._data, encoded)
  return self._check(consumed)
1208
def put_list(self):
  """
  Puts a list value. The elements are supplied by entering the list
  node and putting the element values.

    >>> data = Data()
    >>> data.put_list()
    >>> data.enter()
    >>> data.put_int(1)
    >>> data.put_int(2)
    >>> data.put_int(3)
    >>> data.exit()
  """
  status = pn_data_put_list(self._data)
  self._check(status)
1223
def put_map(self):
  """
  Puts a map value. The entries are supplied by entering the map node
  and putting alternating keys and values.

    >>> data = Data()
    >>> data.put_map()
    >>> data.enter()
    >>> data.put_string("key")
    >>> data.put_string("value")
    >>> data.exit()
  """
  status = pn_data_put_map(self._data)
  self._check(status)
1237
def put_array(self, described, element_type):
  """
  Puts an array value. The elements are supplied by entering the
  array node and putting values, all of which must be of the given
  element type. For a described array the first child is the
  descriptor, which may be of any type.

    >>> data = Data()
    >>>
    >>> data.put_array(False, Data.INT)
    >>> data.enter()
    >>> data.put_int(1)
    >>> data.put_int(2)
    >>> data.put_int(3)
    >>> data.exit()
    >>>
    >>> data.put_array(True, Data.DOUBLE)
    >>> data.enter()
    >>> data.put_symbol("array-descriptor")
    >>> data.put_double(1.1)
    >>> data.put_double(1.2)
    >>> data.put_double(1.3)
    >>> data.exit()

  @type described: bool
  @param described: specifies whether the array is described
  @type element_type: int
  @param element_type: the type of the array elements
  """
  status = pn_data_put_array(self._data, described, element_type)
  self._check(status)
1269
def put_described(self):
  """
  Puts a described value. A described node has exactly two children,
  the descriptor followed by the value, which are supplied by
  entering the node and putting them.

    >>> data = Data()
    >>> data.put_described()
    >>> data.enter()
    >>> data.put_symbol("value-descriptor")
    >>> data.put_string("the value")
    >>> data.exit()
  """
  status = pn_data_put_described(self._data)
  self._check(status)
1284
def put_null(self):
  """
  Puts a null value.
  """
  status = pn_data_put_null(self._data)
  self._check(status)
1290
def put_bool(self, b):
  """
  Puts a boolean value.

  @param b: a boolean value
  """
  status = pn_data_put_bool(self._data, b)
  self._check(status)
1298
def put_ubyte(self, ub):
  """
  Puts an unsigned byte value.

  @param ub: an integral value
  """
  status = pn_data_put_ubyte(self._data, ub)
  self._check(status)
1306
def put_byte(self, b):
  """
  Puts a signed byte value.

  @param b: an integral value
  """
  status = pn_data_put_byte(self._data, b)
  self._check(status)
1314
def put_ushort(self, us):
  """
  Puts an unsigned short value.

  @param us: an integral value
  """
  status = pn_data_put_ushort(self._data, us)
  self._check(status)
1322
def put_short(self, s):
  """
  Puts a signed short value.

  @param s: an integral value
  """
  status = pn_data_put_short(self._data, s)
  self._check(status)
1330
def put_uint(self, ui):
  """
  Puts an unsigned int value.

  @param ui: an integral value
  """
  status = pn_data_put_uint(self._data, ui)
  self._check(status)
1338
def put_int(self, i):
  """
  Puts a signed int value.

  @param i: an integral value
  """
  status = pn_data_put_int(self._data, i)
  self._check(status)
1346
def put_char(self, c):
  """
  Puts a char value. The character is converted to its code point
  with ord() before being handed to the underlying library.

  @param c: a single character
  """
  code_point = ord(c)
  self._check(pn_data_put_char(self._data, code_point))
1354
def put_ulong(self, ul):
  """
  Puts an unsigned long value.

  @param ul: an integral value
  """
  status = pn_data_put_ulong(self._data, ul)
  self._check(status)
1362
def put_long(self, l):
  """
  Puts a signed long value.

  @param l: an integral value
  """
  status = pn_data_put_long(self._data, l)
  self._check(status)
1370
def put_timestamp(self, t):
  """
  Puts a timestamp value.

  @param t: an integral value
  """
  status = pn_data_put_timestamp(self._data, t)
  self._check(status)
1378
def put_float(self, f):
  """
  Puts a float value.

  @param f: a floating point value
  """
  status = pn_data_put_float(self._data, f)
  self._check(status)
1386
def put_double(self, d):
  """
  Puts a double value.

  @param d: a floating point value
  """
  status = pn_data_put_double(self._data, d)
  self._check(status)
1394
def put_decimal32(self, d):
  """
  Puts a decimal32 value.

  @param d: a decimal32 value
  """
  status = pn_data_put_decimal32(self._data, d)
  self._check(status)
1402
def put_decimal64(self, d):
  """
  Puts a decimal64 value.

  @param d: a decimal64 value
  """
  status = pn_data_put_decimal64(self._data, d)
  self._check(status)
1410
def put_decimal128(self, d):
  """
  Puts a decimal128 value.

  @param d: a decimal128 value
  """
  status = pn_data_put_decimal128(self._data, d)
  self._check(status)
1418
def put_uuid(self, u):
  """
  Puts a UUID value. Only the C{bytes} attribute of the argument is
  passed on to the underlying library.

  @param u: a uuid value
  """
  raw = u.bytes
  self._check(pn_data_put_uuid(self._data, raw))
1426
def put_binary(self, b):
  """
  Puts a binary value.

  @type b: binary
  @param b: a binary value
  """
  status = pn_data_put_binary(self._data, b)
  self._check(status)
1435
def put_string(self, s):
  """
  Puts a unicode value. The text is encoded as UTF-8 before being
  handed to the underlying library.

  @type s: unicode
  @param s: a unicode value
  """
  utf8_bytes = s.encode("utf8")
  self._check(pn_data_put_string(self._data, utf8_bytes))
1444
def put_symbol(self, s):
  """
  Puts a symbolic value.

  @type s: string
  @param s: the symbol name
  """
  status = pn_data_put_symbol(self._data, s)
  self._check(status)
1453
def get_list(self):
  """
  Returns the number of elements if the current node is a list and
  zero otherwise. The elements themselves are reached by entering
  the list.

    >>> count = data.get_list()
    >>> data.enter()
    >>> for i in range(count):
    ...   type = data.next()
    ...   if type == Data.STRING:
    ...     print data.get_string()
    ...   elif type == ...:
    ...     ...
    >>> data.exit()
  """
  count = pn_data_get_list(self._data)
  return count
1471
def get_map(self):
  """
  Returns the number of child elements if the current node is a map
  and zero otherwise. Keys and values are reached by entering the
  map.

    >>> count = data.get_map()
    >>> data.enter()
    >>> for i in range(count/2):
    ...   type = data.next()
    ...   if type == Data.STRING:
    ...     print data.get_string()
    ...   elif type == ...:
    ...     ...
    >>> data.exit()
  """
  count = pn_data_get_map(self._data)
  return count
1489
def get_array(self):
  """
  Describes the array at the current node.

    >>> # read an array of strings with a symbolic descriptor
    >>> count, described, type = data.get_array()
    >>> data.enter()
    >>> data.next()
    >>> print "Descriptor:", data.get_symbol()
    >>> for i in range(count):
    ...    data.next()
    ...    print "Element:", data.get_string()
    >>> data.exit()

  @return: a tuple of the element count, a boolean telling whether
    the array is described, and the element type; the element type
    is None when the underlying library reports -1 for it, and the
    whole tuple is (0, False, None) if the current node is not an
    array
  """
  impl = self._data
  count = pn_data_get_array(impl)
  described = pn_data_is_array_described(impl)
  element_type = pn_data_get_array_type(impl)
  if element_type == -1:
    element_type = None
  return count, described, element_type
1513
def is_described(self):
  """
  Tells whether the current node is a described value. Descriptor
  and value are reached by entering the described value.

    >>> # read a symbolically described string
    >>> assert data.is_described() # will error if the current node is not described
    >>> data.enter()
    >>> print data.get_symbol()
    >>> print data.get_string()
    >>> data.exit()
  """
  described = pn_data_is_described(self._data)
  return described
1527
def is_null(self):
  """
  Checks if the current node is a null.

  The outcome of the check is now actually returned; previously the
  result was computed and dropped, so the method always yielded None
  and callers could never observe a positive answer.

  @return: the result of pn_data_get_null after passing it through
    L{_check}
  @raise DataException: (or the mapped exception) if the underlying
    call reports a negative status code
  """
  return self._check(pn_data_get_null(self._data))
1533
def get_bool(self):
  """
  Returns the value of the current node if it is a boolean, False
  otherwise.
  """
  value = pn_data_get_bool(self._data)
  return value
1540
def get_ubyte(self):
  """
  Returns the value of the current node if it is an unsigned byte,
  0 otherwise.
  """
  value = pn_data_get_ubyte(self._data)
  return value
1547
def get_byte(self):
  """
  Returns the value of the current node if it is a signed byte,
  0 otherwise.
  """
  value = pn_data_get_byte(self._data)
  return value
1554
def get_ushort(self):
  """
  Returns the value of the current node if it is an unsigned short,
  0 otherwise.
  """
  value = pn_data_get_ushort(self._data)
  return value
1561
def get_short(self):
  """
  Returns the value of the current node if it is a signed short,
  0 otherwise.
  """
  value = pn_data_get_short(self._data)
  return value
1568
def get_uint(self):
  """
  Returns the value of the current node if it is an unsigned int,
  0 otherwise.
  """
  value = pn_data_get_uint(self._data)
  return value
1575
def get_int(self):
  """
  Returns the value of the current node if it is a signed int,
  0 otherwise.
  """
  value = pn_data_get_int(self._data)
  return value
1582
def get_char(self):
  """
  Returns the value of the current node if it is a char, 0 otherwise.
  The code point reported by the underlying library is converted with
  unichr() and wrapped in the module's C{char} type.
  """
  code_point = pn_data_get_char(self._data)
  return char(unichr(code_point))
1589
def get_ulong(self):
  """
  Returns the value of the current node if it is an unsigned long,
  0 otherwise. The result is wrapped in the module's C{ulong} type.
  """
  raw = pn_data_get_ulong(self._data)
  return ulong(raw)
1596
def get_long(self):
  """
  Returns the value of the current node if it is a signed long,
  0 otherwise.
  """
  value = pn_data_get_long(self._data)
  return value
1603
def get_timestamp(self):
  """
  Returns the value of the current node if it is a timestamp,
  0 otherwise. The result is wrapped in the module's C{timestamp}
  type.
  """
  raw = pn_data_get_timestamp(self._data)
  return timestamp(raw)
1610
def get_float(self):
  """
  Returns the value of the current node if it is a float, 0
  otherwise.
  """
  value = pn_data_get_float(self._data)
  return value
1617
def get_double(self):
  """
  Returns the value of the current node if it is a double, 0
  otherwise.
  """
  value = pn_data_get_double(self._data)
  return value
1624 1625 # XXX: need to convert
def get_decimal32(self):
  """
  Returns the value of the current node if it is a decimal32, 0
  otherwise. The value is handed back exactly as the underlying
  library reports it, without conversion to a Python decimal type.
  """
  raw = pn_data_get_decimal32(self._data)
  return raw
1632 1633 # XXX: need to convert
def get_decimal64(self):
  """
  Returns the value of the current node if it is a decimal64, 0
  otherwise. The value is handed back exactly as the underlying
  library reports it, without conversion to a Python decimal type.
  """
  raw = pn_data_get_decimal64(self._data)
  return raw
1640 1641 # XXX: need to convert
def get_decimal128(self):
  """
  Returns the value of the current node if it is a decimal128, 0
  otherwise. The value is handed back exactly as the underlying
  library reports it, without conversion to a Python decimal type.
  """
  raw = pn_data_get_decimal128(self._data)
  return raw
1648
def get_uuid(self):
  """
  Returns the value of the current node as a C{uuid.UUID} if it is a
  UUID, None otherwise.
  """
  if pn_data_type(self._data) != Data.UUID:
    return None
  raw = pn_data_get_uuid(self._data)
  return uuid.UUID(bytes=raw)
1658
def get_binary(self):
  """
  Returns the value of the current node if it is binary, ""
  otherwise.
  """
  value = pn_data_get_binary(self._data)
  return value
1665
def get_string(self):
  """
  Returns the value of the current node if it is a string, ""
  otherwise. The bytes reported by the underlying library are decoded
  as UTF-8.
  """
  utf8_bytes = pn_data_get_string(self._data)
  return utf8_bytes.decode("utf8")
1672
def get_symbol(self):
  """
  Returns the value of the current node if it is a symbol, ""
  otherwise. The result is wrapped in the module's C{symbol} type.
  """
  raw = pn_data_get_symbol(self._data)
  return symbol(raw)
1679
def copy(self, src):
  """
  Copies the content of another data object into this one by
  delegating to pn_data_copy.

  @param src: the L{Data} object to copy from
  @raise DataException: (or the mapped exception) if the copy fails
  """
  status = pn_data_copy(self._data, src._data)
  self._check(status)
1682
def format(self):
  """
  Returns the result of pn_data_format for this data object.

  Formatting starts with a 16 byte buffer; whenever PN_OVERFLOW is
  reported the buffer size is doubled and formatting is retried.

  @raise DataException: (or the mapped exception) for any other
    negative status code
  """
  capacity = 16
  while True:
    status, text = pn_data_format(self._data, capacity)
    if status != PN_OVERFLOW:
      self._check(status)
      return text
    capacity *= 2
1693
def dump(self):
  """
  Delegates to pn_data_dump on the underlying data object.
  """
  impl = self._data
  pn_data_dump(impl)
1696
def put_dict(self, d):
  """
  Puts a map value filled from a Python dictionary. Every key and
  every value is encoded with L{put_object}; the map node is exited
  again even if encoding an entry fails.

  @param d: the dictionary to encode
  """
  self.put_map()
  self.enter()
  try:
    for key, value in d.items():
      self.put_object(key)
      self.put_object(value)
  finally:
    self.exit()
1706
def get_dict(self):
  """
  Reads the children of the current node as alternating keys and
  values and returns them as a Python dictionary. A trailing key
  without a value is mapped to None. The node is exited again even if
  decoding a child fails.

  @return: the dictionary, or None if the current node cannot be
    entered
  """
  if not self.enter():
    return None
  try:
    mapping = {}
    while self.next():
      key = self.get_object()
      value = None
      if self.next():
        value = self.get_object()
      mapping[key] = value
  finally:
    self.exit()
  return mapping
1721
def put_sequence(self, s):
  """
  Puts a list value filled from a Python iterable. Every item is
  encoded with L{put_object}; the list node is exited again even if
  encoding an item fails.

  @param s: the iterable whose items are encoded
  """
  self.put_list()
  self.enter()
  try:
    for item in s:
      self.put_object(item)
  finally:
    self.exit()
1730
def get_sequence(self):
  """
  Reads all children of the current node with L{get_object} and
  returns them as a Python list. The node is exited again even if
  decoding a child fails.

  @return: the list, or None if the current node cannot be entered
  """
  if not self.enter():
    return None
  try:
    items = []
    while self.next():
      items.append(self.get_object())
  finally:
    self.exit()
  return items
1740
def get_py_described(self):
  """
  Reads the first two children of the current node, the descriptor
  and the value, and returns them wrapped in a C{Described} object.
  The node is exited again even if decoding fails.

  @return: the C{Described} object, or None if the current node
    cannot be entered
  """
  if not self.enter():
    return None
  try:
    self.next()
    descriptor = self.get_object()
    self.next()
    value = self.get_object()
  finally:
    self.exit()
  return Described(descriptor, value)
1751
def put_py_described(self, d):
  """
  Puts a described value taken from an object with C{descriptor} and
  C{value} attributes. Both are encoded with L{put_object}; the
  described node is exited again even if encoding fails.

  @param d: the described value to encode
  """
  self.put_described()
  self.enter()
  try:
    for part in (d.descriptor, d.value):
      self.put_object(part)
  finally:
    self.exit()
1760
def get_py_array(self):
  """
  Convenience wrapper around get_array, enter, etc. that returns the
  array at the current node, including its contents, as an C{Array}
  object.

  @return: the C{Array} object; None if the current node is not an
    array or cannot be entered
  """
  element_type = self.get_array()[2]
  described = self.get_array()[1]
  if element_type is None:
    return None
  if not self.enter():
    return None
  try:
    descriptor = UNDESCRIBED
    if described:
      # The first child of a described array is its descriptor.
      self.next()
      descriptor = self.get_object()
    elements = []
    while self.next():
      elements.append(self.get_object())
  finally:
    self.exit()
  return Array(descriptor, element_type, *elements)
1783
def put_py_array(self, a):
  """
  Puts an array value taken from an object with C{descriptor},
  C{type} and C{elements} attributes. The array counts as described
  unless its descriptor equals UNDESCRIBED, in which case no
  descriptor is written. The array node is exited again even if
  encoding fails.

  @param a: the array to encode
  """
  has_descriptor = a.descriptor != UNDESCRIBED
  self.put_array(has_descriptor, a.type)
  self.enter()
  try:
    if has_descriptor:
      self.put_object(a.descriptor)
    for element in a.elements:
      self.put_object(element)
  finally:
    self.exit()

# Dispatch table consulted by put_object: maps the Python class of a
# value to the function that encodes it. The entries are the plain
# functions defined above and are invoked as putter(self, obj).
put_mappings = {
  None.__class__: lambda s, _: s.put_null(),
  bool: put_bool,
  dict: put_dict,
  list: put_sequence,
  tuple: put_sequence,
  unicode: put_string,
  bytes: put_binary,
  symbol: put_symbol,
  # Both int and long are encoded with put_long.
  int: put_long,
  char: put_char,
  long: put_long,
  ulong: put_ulong,
  timestamp: put_timestamp,
  float: put_double,
  uuid.UUID: put_uuid,
  Described: put_py_described,
  Array: put_py_array
  }
# Dispatch table consulted by get_object: maps the type code of the
# current node to the function that decodes it, invoked as getter(self).
get_mappings = {
  NULL: lambda s: None,
  BOOL: get_bool,
  BYTE: get_byte,
  UBYTE: get_ubyte,
  SHORT: get_short,
  USHORT: get_ushort,
  INT: get_int,
  UINT: get_uint,
  CHAR: get_char,
  LONG: get_long,
  ULONG: get_ulong,
  TIMESTAMP: get_timestamp,
  FLOAT: get_float,
  DOUBLE: get_double,
  DECIMAL32: get_decimal32,
  DECIMAL64: get_decimal64,
  DECIMAL128: get_decimal128,
  UUID: get_uuid,
  BINARY: get_binary,
  STRING: get_string,
  SYMBOL: get_symbol,
  DESCRIBED: get_py_described,
  ARRAY: get_py_array,
  LIST: get_sequence,
  MAP: get_dict
  }

def put_object(self, obj):
  """
  Puts an arbitrary Python object, choosing the put_* function from
  L{put_mappings} according to the class of the object.

  An exact match on the class always wins. If there is none, the
  base classes are tried in method resolution order, so that
  instances of subclasses of a mapped type (for example a dict
  subclass) are encoded like their nearest mapped base class instead
  of being rejected.

  @param obj: the object to encode
  @raise KeyError: if neither the class of the object nor any of its
    base classes has an entry in L{put_mappings}
  """
  cls = obj.__class__
  putter = self.put_mappings.get(cls)
  if putter is None:
    # Old-style classes have no __mro__; they simply get no fallback.
    for base in getattr(cls, "__mro__", ())[1:]:
      putter = self.put_mappings.get(base)
      if putter is not None:
        break
  if putter is None:
    raise KeyError(cls)
  putter(self, obj)
1847
def get_object(self):
  """
  Reads the current node as a Python object, choosing the get_*
  function from L{get_mappings} according to the type of the node.

  @return: None if there is no current type, the decoded value if the
    type has an entry in L{get_mappings}, and otherwise an
    C{UnmappedType} built from the string form of the type code
  """
  node_type = self.type()
  if node_type is None:
    return None
  getter = self.get_mappings.get(node_type)
  if not getter:
    return UnmappedType(str(node_type))
  return getter(self)
1856
class ConnectionException(ProtonException):
  """
  Default exception raised by L{Connection} for negative status codes
  that have no more specific exception class registered.
  """
1859
class Endpoint(object):
  """
  Common base of the endpoint wrappers. Subclasses supply
  C{_get_cond_impl} and C{_get_remote_cond_impl}, which return the
  underlying local and remote condition objects.
  """

  LOCAL_UNINIT = PN_LOCAL_UNINIT
  REMOTE_UNINIT = PN_REMOTE_UNINIT
  LOCAL_ACTIVE = PN_LOCAL_ACTIVE
  REMOTE_ACTIVE = PN_REMOTE_ACTIVE
  LOCAL_CLOSED = PN_LOCAL_CLOSED
  REMOTE_CLOSED = PN_REMOTE_CLOSED

  def __init__(self):
    """
    Starts out without a local condition.
    """
    self.condition = None

  def _update_cond(self):
    """
    Writes the C{condition} attribute into the underlying local
    condition object.
    """
    target = self._get_cond_impl()
    obj2cond(self.condition, target)

  @property
  def remote_condition(self):
    """
    The remote condition as a L{Condition}, or None if none is set.
    """
    remote = self._get_remote_cond_impl()
    return cond2obj(remote)
1878
class Condition:
  """
  A named condition with an optional description and optional
  additional information.
  """

  def __init__(self, name, description=None, info=None):
    """
    @param name: the name of the condition
    @param description: optional description
    @param info: optional additional information
    """
    self.name = name
    self.description = description
    self.info = info

  def __repr__(self):
    # Fields with a false value (None, empty string, ...) are omitted.
    return "Condition(%s)" % ", ".join([repr(x) for x in
                                        (self.name, self.description, self.info)
                                        if x])

  def __eq__(self, o):
    """
    Two conditions are equal when name, description and info are all
    equal; anything that is not a Condition is never equal.
    """
    if not isinstance(o, Condition): return False
    return self.name == o.name and \
        self.description == o.description and \
        self.info == o.info

  def __ne__(self, o):
    """
    Inverse of L{__eq__}. Python 2 does not derive != from ==, so
    without this method two equal conditions would still compare as
    different under the != operator.
    """
    return not self.__eq__(o)
1896
def obj2cond(obj, cond):
  """
  Writes a condition object into an underlying condition.

  The underlying condition is always cleared first. If C{obj} is
  true, its name (converted with str()) and description are set and,
  if its info is true, the info is encoded into the condition's info
  data.

  @param obj: an object with name, description and info attributes,
    or a false value to only clear the condition
  @param cond: the underlying condition object to update
  """
  pn_condition_clear(cond)
  if not obj:
    return
  pn_condition_set_name(cond, str(obj.name))
  pn_condition_set_description(cond, obj.description)
  info_data = Data(pn_condition_info(cond))
  if obj.info:
    info_data.put_object(obj.info)
1905
def cond2obj(cond):
  """
  Converts an underlying condition into a L{Condition}.

  @param cond: the underlying condition object
  @return: a L{Condition} built from the name, description and decoded
    info of C{cond}, or None if pn_condition_is_set reports that the
    condition is not set
  """
  if not pn_condition_is_set(cond):
    return None
  name = pn_condition_get_name(cond)
  description = pn_condition_get_description(cond)
  info = dat2obj(pn_condition_info(cond))
  return Condition(name, description, info)
1913
def dat2obj(dimpl):
  """
  Decodes the first value of an underlying data object.

  The data is rewound, advanced to its first node, read with
  L{Data.get_object} and rewound again.

  @param dimpl: the underlying data object
  @return: the decoded Python object
  """
  data = Data(dimpl)
  data.rewind()
  data.next()
  decoded = data.get_object()
  data.rewind()
  return decoded
1921
def obj2dat(obj, dimpl):
  """
  Encodes a Python object into an underlying data object with
  L{Data.put_object}. Nothing happens when C{obj} is None.

  @param obj: the object to encode, or None
  @param dimpl: the underlying data object to write to
  """
  if obj is None:
    return
  Data(dimpl).put_object(obj)
1926
def wrap_connection(conn):
  """
  Returns the L{Connection} wrapper for an underlying connection.

  @param conn: the underlying connection, may be a false value
  @return: None for a false C{conn}; otherwise the context already
    attached to the connection if it is true, or else a freshly
    created L{Connection} wrapping it
  """
  if not conn:
    return None
  existing = pn_connection_get_context(conn)
  if not existing:
    return Connection(_conn=conn)
  return existing
1933
1934 -class Connection(Endpoint):
1935
def __init__(self, _conn=None):
  """
  Wraps the given underlying connection, or creates a new one with
  pn_connection() when none (or a false value) is given, and
  registers this wrapper as the context of the connection.

  @param _conn: an existing underlying connection to wrap
  """
  Endpoint.__init__(self)
  self._conn = _conn or pn_connection()
  pn_connection_set_context(self._conn, self)
  self.offered_capabilities = None
  self.desired_capabilities = None
  self.properties = None
1946
def __del__(self):
  """
  Frees the underlying connection, if the wrapper still holds one.
  """
  if not hasattr(self, "_conn"):
    return
  pn_connection_free(self._conn)
  del self._conn
1951
1952 - def _check(self, err):
1953 if err < 0: 1954 exc = EXCEPTIONS.get(err, ConnectionException) 1955 raise exc("[%s]: %s" % (err, pn_connection_error(self._conn))) 1956 else: 1957 return err
1958
def _get_cond_impl(self):
  """
  Returns the underlying local condition of the connection.
  """
  cond = pn_connection_condition(self._conn)
  return cond
1961
def _get_remote_cond_impl(self):
  """
  Returns the underlying remote condition of the connection.
  """
  cond = pn_connection_remote_condition(self._conn)
  return cond
1964
def _get_container(self):
  """
  Getter of the C{container} property.
  """
  name = pn_connection_get_container(self._conn)
  return name
def _set_container(self, name):
  """
  Setter of the C{container} property.

  @param name: the container name to set
  @return: the result of pn_connection_set_container
  """
  result = pn_connection_set_container(self._conn, name)
  return result
1969 1970 container = property(_get_container, _set_container) 1971
def _get_hostname(self):
  """
  Getter of the C{hostname} property.
  """
  name = pn_connection_get_hostname(self._conn)
  return name
def _set_hostname(self, name):
  """
  Setter of the C{hostname} property.

  @param name: the hostname to set
  @return: the result of pn_connection_set_hostname
  """
  result = pn_connection_set_hostname(self._conn, name)
  return result
1976 1977 hostname = property(_get_hostname, _set_hostname) 1978 1979 @property
def remote_container(self):
  """
  The container name reported by pn_connection_remote_container.
  """
  name = pn_connection_remote_container(self._conn)
  return name
1982 1983 @property
def remote_hostname(self):
  """
  The hostname reported by pn_connection_remote_hostname.
  """
  name = pn_connection_remote_hostname(self._conn)
  return name
1986 1987 @property
1989 return dat2obj(pn_connection_remote_offered_capabilities(self._conn))
1990 1991 @property
1993 return dat2obj(pn_connection_remote_desired_capabilities(self._conn))
1994 1995 @property
def remote_properties(self):
  """
  The remote connection properties, decoded into a Python object
  with dat2obj.
  """
  props = pn_connection_remote_properties(self._conn)
  return dat2obj(props)
1998
def open(self):
  """
  Opens the connection. Before that, the offered capabilities, the
  desired capabilities and the properties set on this wrapper are
  encoded, in that order, into the matching data objects of the
  underlying connection; attributes that are None are skipped by
  obj2dat.
  """
  conn = self._conn
  outgoing = (
    (self.offered_capabilities, pn_connection_offered_capabilities),
    (self.desired_capabilities, pn_connection_desired_capabilities),
    (self.properties, pn_connection_properties),
    )
  for value, accessor in outgoing:
    obj2dat(value, accessor(conn))
  pn_connection_open(conn)
2006
def close(self):
  """
  Closes the connection after writing the local C{condition}
  attribute into the underlying condition object.
  """
  self._update_cond()
  conn = self._conn
  pn_connection_close(conn)
2010 2011 @property
def state(self):
  """
  The state reported by pn_connection_state.
  """
  current = pn_connection_state(self._conn)
  return current
2014 2015 @property
def writable(self):
  """
  The value reported by pn_connection_writable.
  """
  value = pn_connection_writable(self._conn)
  return value
2018
def session(self):
  """
  Creates a session on this connection with pn_session and returns
  its L{Session} wrapper.
  """
  ssn = pn_session(self._conn)
  return wrap_session(ssn)
2021
def session_head(self, mask):
  """
  Returns the wrapped result of pn_session_head for the given mask.

  @param mask: passed through unchanged to pn_session_head
    (presumably a combination of the endpoint state flags -- confirm)
  @return: a L{Session} wrapper, or None if there is no such session
  """
  ssn = pn_session_head(self._conn, mask)
  return wrap_session(ssn)
2024 2027 2028 @property
def work_head(self):
  """
  The wrapped result of pn_work_head for this connection: a
  L{Delivery} wrapper, or None if there is none.
  """
  dlv = pn_work_head(self._conn)
  return wrap_delivery(dlv)
2031 2032 @property
def error(self):
  """
  The error code extracted with pn_error_code from the error object
  of the connection.
  """
  err = pn_connection_error(self._conn)
  return pn_error_code(err)
2035
class SessionException(ProtonException):
  """
  Exception type for session related failures.
  """
2038
def wrap_session(ssn):
  """
  Returns the L{Session} wrapper for an underlying session.

  @param ssn: the underlying session, or None
  @return: None when C{ssn} is None; otherwise the context already
    attached to the session if it is true, or else a new L{Session}
    wrapper, which is also registered as the context of the session
  """
  if ssn is None:
    return None
  existing = pn_session_get_context(ssn)
  if existing:
    return existing
  created = Session(ssn)
  pn_session_set_context(ssn, created)
  return created
2048
class Session(Endpoint):
  """
  Wrapper around an underlying session object.
  """

  def __init__(self, ssn):
    """
    @param ssn: the underlying session to wrap
    """
    Endpoint.__init__(self)
    self._ssn = ssn

  def __del__(self):
    """
    Frees the underlying session, if the wrapper still holds one.
    """
    if not hasattr(self, "_ssn"):
      return
    pn_session_free(self._ssn)
    del self._ssn

  def _get_cond_impl(self):
    """
    Returns the underlying local condition of the session.
    """
    cond = pn_session_condition(self._ssn)
    return cond

  def _get_remote_cond_impl(self):
    """
    Returns the underlying remote condition of the session.
    """
    cond = pn_session_remote_condition(self._ssn)
    return cond

  def _get_incoming_capacity(self):
    """
    Getter of the C{incoming_capacity} property.
    """
    capacity = pn_session_get_incoming_capacity(self._ssn)
    return capacity

  def _set_incoming_capacity(self, capacity):
    """
    Setter of the C{incoming_capacity} property.
    """
    ssn = self._ssn
    pn_session_set_incoming_capacity(ssn, capacity)

  incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)

  @property
  def outgoing_bytes(self):
    """
    The value reported by pn_session_outgoing_bytes.
    """
    count = pn_session_outgoing_bytes(self._ssn)
    return count

  @property
  def incoming_bytes(self):
    """
    The value reported by pn_session_incoming_bytes.
    """
    count = pn_session_incoming_bytes(self._ssn)
    return count

  def open(self):
    """
    Opens the session.
    """
    ssn = self._ssn
    pn_session_open(ssn)

  def close(self):
    """
    Closes the session after writing the local C{condition} attribute
    into the underlying condition object.
    """
    self._update_cond()
    ssn = self._ssn
    pn_session_close(ssn)

  def next(self, mask):
    """
    Returns the wrapped result of pn_session_next for the given mask.

    @param mask: passed through unchanged to pn_session_next
    @return: a L{Session} wrapper, or None if there is none
    """
    following = pn_session_next(self._ssn, mask)
    return wrap_session(following)

  @property
  def state(self):
    """
    The state reported by pn_session_state.
    """
    current = pn_session_state(self._ssn)
    return current

  @property
  def connection(self):
    """
    The L{Connection} wrapper of the connection this session belongs
    to.
    """
    conn = pn_session_connection(self._ssn)
    return wrap_connection(conn)

  def sender(self, name):
    """
    Creates a sender with pn_sender and returns it wrapped by
    wrap_link.

    @param name: the name passed to pn_sender
    """
    link = pn_sender(self._ssn, name)
    return wrap_link(link)

  def receiver(self, name):
    """
    Creates a receiver with pn_receiver and returns it wrapped by
    wrap_link.

    @param name: the name passed to pn_receiver
    """
    link = pn_receiver(self._ssn, name)
    return wrap_link(link)
2105
class LinkException(ProtonException):
  """
  Default exception raised for negative status codes of link and
  terminus operations that have no more specific exception class
  registered.
  """
2108 2121 2239

class Terminus(object):
  """
  Wrapper around an underlying terminus object. Scalar settings are
  exposed as read/write properties; properties, capabilities,
  outcomes and filter are exposed as L{Data} objects wrapping the
  underlying data.
  """

  UNSPECIFIED = PN_UNSPECIFIED
  SOURCE = PN_SOURCE
  TARGET = PN_TARGET
  COORDINATOR = PN_COORDINATOR

  NONDURABLE = PN_NONDURABLE
  CONFIGURATION = PN_CONFIGURATION
  DELIVERIES = PN_DELIVERIES

  DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
  DIST_MODE_COPY = PN_DIST_MODE_COPY
  DIST_MODE_MOVE = PN_DIST_MODE_MOVE

  def __init__(self, impl):
    """
    @param impl: the underlying terminus object to wrap
    """
    self._impl = impl

  def _check(self, err):
    """
    Turns a negative status code into a raised exception.

    @param err: status code returned by one of the terminus calls
    @return: C{err} unchanged when it is not negative
    @raise LinkException: when C{err} is negative, unless EXCEPTIONS
      registers a more specific exception class for it
    """
    if err >= 0:
      return err
    exc_class = EXCEPTIONS.get(err, LinkException)
    raise exc_class("[%s]" % err)

  def _get_type(self):
    kind = pn_terminus_get_type(self._impl)
    return kind
  def _set_type(self, type):
    status = pn_terminus_set_type(self._impl, type)
    self._check(status)
  type = property(_get_type, _set_type)

  def _get_address(self):
    addr = pn_terminus_get_address(self._impl)
    return addr
  def _set_address(self, address):
    status = pn_terminus_set_address(self._impl, address)
    self._check(status)
  address = property(_get_address, _set_address)

  def _get_durability(self):
    level = pn_terminus_get_durability(self._impl)
    return level
  def _set_durability(self, seconds):
    # NOTE(review): the parameter is named "seconds" but carries the
    # durability value handed to pn_terminus_set_durability.
    status = pn_terminus_set_durability(self._impl, seconds)
    self._check(status)
  durability = property(_get_durability, _set_durability)

  def _get_expiry_policy(self):
    policy = pn_terminus_get_expiry_policy(self._impl)
    return policy
  def _set_expiry_policy(self, seconds):
    # NOTE(review): the parameter is named "seconds" but carries the
    # policy value handed to pn_terminus_set_expiry_policy.
    status = pn_terminus_set_expiry_policy(self._impl, seconds)
    self._check(status)
  expiry_policy = property(_get_expiry_policy, _set_expiry_policy)

  def _get_timeout(self):
    limit = pn_terminus_get_timeout(self._impl)
    return limit
  def _set_timeout(self, seconds):
    status = pn_terminus_set_timeout(self._impl, seconds)
    self._check(status)
  timeout = property(_get_timeout, _set_timeout)

  def _is_dynamic(self):
    flag = pn_terminus_is_dynamic(self._impl)
    return flag
  def _set_dynamic(self, dynamic):
    status = pn_terminus_set_dynamic(self._impl, dynamic)
    self._check(status)
  dynamic = property(_is_dynamic, _set_dynamic)

  def _get_distribution_mode(self):
    mode = pn_terminus_get_distribution_mode(self._impl)
    return mode
  def _set_distribution_mode(self, mode):
    status = pn_terminus_set_distribution_mode(self._impl, mode)
    self._check(status)
  distribution_mode = property(_get_distribution_mode, _set_distribution_mode)

  @property
  def properties(self):
    """
    The terminus properties as a L{Data} object.
    """
    impl = pn_terminus_properties(self._impl)
    return Data(impl)

  @property
  def capabilities(self):
    """
    The terminus capabilities as a L{Data} object.
    """
    impl = pn_terminus_capabilities(self._impl)
    return Data(impl)

  @property
  def outcomes(self):
    """
    The terminus outcomes as a L{Data} object.
    """
    impl = pn_terminus_outcomes(self._impl)
    return Data(impl)

  @property
  def filter(self):
    """
    The terminus filter as a L{Data} object.
    """
    impl = pn_terminus_filter(self._impl)
    return Data(impl)

  def copy(self, src):
    """
    Copies the settings of another terminus into this one by
    delegating to pn_terminus_copy.

    @param src: the L{Terminus} to copy from
    @raise LinkException: (or the mapped exception) if the copy fails
    """
    status = pn_terminus_copy(self._impl, src._impl)
    self._check(status)
2326

class Sender(Link):
  """
  The sending flavour of a link.
  """

  def offered(self, n):
    """
    Delegates to pn_link_offered.

    @param n: passed through unchanged to pn_link_offered
    """
    link = self._link
    pn_link_offered(link, n)

  def send(self, bytes):
    """
    Sends data over the link with pn_link_send.

    @param bytes: the data to send
    @return: the non-negative result of pn_link_send
    @raise LinkException: presumably (via the inherited C{_check},
      which is not visible here) for a negative result -- confirm
    """
    sent = pn_link_send(self._link, bytes)
    return self._check(sent)

  def drained(self):
    """
    Delegates to pn_link_drained.
    """
    link = self._link
    pn_link_drained(link)
2338
class Receiver(Link):
  """
  The receiving flavour of a link.
  """

  def flow(self, n):
    """
    Delegates to pn_link_flow.

    @param n: passed through unchanged to pn_link_flow
    """
    link = self._link
    pn_link_flow(link, n)

  def recv(self, limit):
    """
    Receives data from the link with pn_link_recv.

    @param limit: passed through unchanged to pn_link_recv
    @return: the received data, or None when PN_EOS is reported
    """
    status, payload = pn_link_recv(self._link, limit)
    if status == PN_EOS:
      return None
    self._check(status)
    return payload

  def drain(self, n):
    """
    Delegates to pn_link_drain.

    @param n: passed through unchanged to pn_link_drain
    """
    link = self._link
    pn_link_drain(link, n)
2354
def wrap_delivery(dlv):
  """
  Returns the L{Delivery} wrapper for an underlying delivery.

  @param dlv: the underlying delivery, may be a false value
  @return: None for a false C{dlv}; otherwise the context already
    attached to the delivery if it is true, or else a new L{Delivery}
    wrapper, which is also registered as the context of the delivery
  """
  if not dlv:
    return None
  existing = pn_delivery_get_context(dlv)
  if existing:
    return existing
  created = Delivery(dlv)
  pn_delivery_set_context(dlv, created)
  return created
2362
class Disposition(object):
  """
  Wrapper around an underlying disposition object.

  For a local disposition the C{data}, C{annotations} and
  C{condition} attributes are plain Python values kept on the
  wrapper; for a remote disposition they are decoded from the
  underlying object on every read and cannot be assigned.
  """

  RECEIVED = PN_RECEIVED
  ACCEPTED = PN_ACCEPTED
  REJECTED = PN_REJECTED
  RELEASED = PN_RELEASED
  MODIFIED = PN_MODIFIED

  def __init__(self, impl, local):
    """
    @param impl: the underlying disposition object
    @param local: true for a local disposition, false for a remote one
    """
    self._impl = impl
    self.local = local
    self._data = None
    self._condition = None
    self._annotations = None

  @property
  def type(self):
    """
    The value reported by pn_disposition_type.
    """
    kind = pn_disposition_type(self._impl)
    return kind

  def _get_section_number(self):
    number = pn_disposition_get_section_number(self._impl)
    return number
  def _set_section_number(self, n):
    impl = self._impl
    pn_disposition_set_section_number(impl, n)
  section_number = property(_get_section_number, _set_section_number)

  def _get_section_offset(self):
    offset = pn_disposition_get_section_offset(self._impl)
    return offset
  def _set_section_offset(self, n):
    impl = self._impl
    pn_disposition_set_section_offset(impl, n)
  section_offset = property(_get_section_offset, _set_section_offset)

  def _get_failed(self):
    flag = pn_disposition_is_failed(self._impl)
    return flag
  def _set_failed(self, b):
    impl = self._impl
    pn_disposition_set_failed(impl, b)
  failed = property(_get_failed, _set_failed)

  def _get_undeliverable(self):
    flag = pn_disposition_is_undeliverable(self._impl)
    return flag
  def _set_undeliverable(self, b):
    impl = self._impl
    pn_disposition_set_undeliverable(impl, b)
  undeliverable = property(_get_undeliverable, _set_undeliverable)

  def _get_data(self):
    if not self.local:
      return dat2obj(pn_disposition_data(self._impl))
    return self._data
  def _set_data(self, obj):
    if not self.local:
      raise AttributeError("data attribute is read-only")
    self._data = obj
  data = property(_get_data, _set_data)

  def _get_annotations(self):
    if not self.local:
      return dat2obj(pn_disposition_annotations(self._impl))
    return self._annotations
  def _set_annotations(self, obj):
    if not self.local:
      raise AttributeError("annotations attribute is read-only")
    self._annotations = obj
  annotations = property(_get_annotations, _set_annotations)

  def _get_condition(self):
    if not self.local:
      return cond2obj(pn_disposition_condition(self._impl))
    return self._condition
  def _set_condition(self, obj):
    if not self.local:
      raise AttributeError("condition attribute is read-only")
    self._condition = obj
  condition = property(_get_condition, _set_condition)
2441
class Delivery(object):
  """
  Wrapper around a delivery handle used with the pn_delivery_* functions.

  Each instance carries two L{Disposition} wrappers: local (created with
  its local flag set) and remote (created with the flag cleared).
  """

  RECEIVED = Disposition.RECEIVED
  ACCEPTED = Disposition.ACCEPTED
  REJECTED = Disposition.REJECTED
  RELEASED = Disposition.RELEASED
  MODIFIED = Disposition.MODIFIED

  def __init__(self, dlv):
    """
    @param dlv: the delivery handle to wrap.
    """
    self._dlv = dlv
    local_impl = pn_delivery_local(dlv)
    self.local = Disposition(local_impl, True)
    remote_impl = pn_delivery_remote(dlv)
    self.remote = Disposition(remote_impl, False)

  @property
  def tag(self):
    """Result of pn_delivery_tag for this delivery."""
    dlv = self._dlv
    return pn_delivery_tag(dlv)

  @property
  def writable(self):
    """Result of pn_delivery_writable for this delivery."""
    dlv = self._dlv
    return pn_delivery_writable(dlv)

  @property
  def readable(self):
    """Result of pn_delivery_readable for this delivery."""
    dlv = self._dlv
    return pn_delivery_readable(dlv)

  @property
  def updated(self):
    """Result of pn_delivery_updated for this delivery."""
    dlv = self._dlv
    return pn_delivery_updated(dlv)

  def update(self, state):
    """
    Copy the values cached on the local disposition into the underlying
    disposition, then call pn_delivery_update with the given state.

    @param state: the value passed on to pn_delivery_update.
    """
    local = self.local
    impl = local._impl
    obj2dat(local._data, pn_disposition_data(impl))
    obj2dat(local._annotations, pn_disposition_annotations(impl))
    obj2cond(local._condition, pn_disposition_condition(impl))
    pn_delivery_update(self._dlv, state)

  @property
  def pending(self):
    """Result of pn_delivery_pending for this delivery."""
    dlv = self._dlv
    return pn_delivery_pending(dlv)

  @property
  def partial(self):
    """Result of pn_delivery_partial for this delivery."""
    dlv = self._dlv
    return pn_delivery_partial(dlv)

  @property
  def local_state(self):
    """Result of pn_delivery_local_state for this delivery."""
    dlv = self._dlv
    return pn_delivery_local_state(dlv)

  @property
  def remote_state(self):
    """Result of pn_delivery_remote_state for this delivery."""
    dlv = self._dlv
    return pn_delivery_remote_state(dlv)

  @property
  def settled(self):
    """Result of pn_delivery_settled for this delivery."""
    dlv = self._dlv
    return pn_delivery_settled(dlv)

  def settle(self):
    """Call pn_delivery_settle on this delivery."""
    dlv = self._dlv
    pn_delivery_settle(dlv)

  @property
  def work_next(self):
    """The wrapped result of pn_work_next for this delivery."""
    following = pn_work_next(self._dlv)
    return wrap_delivery(following)
2503 2504 @property
2507
class TransportException(ProtonException):
  """
  Exception type L{Transport} raises for a negative error code that has
  no entry in EXCEPTIONS.
  """
2510
class Transport(object):
  """
  Wrapper around a transport handle used with the pn_transport_* functions.

  Results that are routed through L{_check} raise an exception when the
  underlying call returns a negative error code.
  """

  TRACE_DRV = PN_TRACE_DRV
  TRACE_FRM = PN_TRACE_FRM
  TRACE_RAW = PN_TRACE_RAW

  def __init__(self, _trans=None):
    """
    Create a new transport, or wrap an existing handle.

    @param _trans: an existing transport handle.  When omitted, a new
      handle is created with pn_transport() and freed by __del__; a
      handle supplied by the caller is marked as shared and never freed
      by this object.
    """
    if not _trans:
      self._trans = pn_transport()
    else:
      self._shared_trans = True
      self._trans = _trans

  def __del__(self):
    """Free the handle unless it was supplied by the caller."""
    # hasattr guards against __init__ having failed before _trans was set.
    if hasattr(self, "_trans"):
      if not hasattr(self, "_shared_trans"):
        pn_transport_free(self._trans)
      del self._trans

  def _check(self, err):
    """
    Return err unchanged when it is non-negative.

    @raise TransportException: (or the type registered for err in
      EXCEPTIONS) when err is negative; the message carries the code and
      the transport's error text.
    """
    if err < 0:
      exc = EXCEPTIONS.get(err, TransportException)
      raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._trans))))
    else:
      return err

  def bind(self, connection):
    """Bind this transport to connection via pn_transport_bind."""
    self._check(pn_transport_bind(self._trans, connection._conn))

  def trace(self, n):
    """Pass n to pn_transport_trace (see the TRACE_* class constants)."""
    pn_transport_trace(self._trans, n)

  def tick(self, now):
    """Return the result of pn_transport_tick for the given time."""
    return pn_transport_tick(self._trans, now)

  def capacity(self):
    """
    Return the result of pn_transport_capacity.

    Values at or above PN_EOS are returned as-is; anything lower is
    treated as an error and raised through _check.
    """
    c = pn_transport_capacity(self._trans)
    if c >= PN_EOS:
      return c
    else:
      return self._check(c)

  def push(self, bytes):
    """Hand bytes to pn_transport_push, raising on a negative result."""
    self._check(pn_transport_push(self._trans, bytes))

  def close_tail(self):
    """Call pn_transport_close_tail, raising on a negative result."""
    self._check(pn_transport_close_tail(self._trans))

  def pending(self):
    """
    Return the result of pn_transport_pending.

    Values at or above PN_EOS are returned as-is; anything lower is
    treated as an error and raised through _check.
    """
    p = pn_transport_pending(self._trans)
    if p >= PN_EOS:
      return p
    else:
      return self._check(p)

  def peek(self, size):
    """
    Return the output obtained from pn_transport_peek for up to size
    bytes, or None when the call reports PN_EOS.
    """
    cd, out = pn_transport_peek(self._trans, size)
    if cd == PN_EOS:
      return None
    else:
      self._check(cd)
      return out

  def pop(self, size):
    """Pass size to pn_transport_pop."""
    pn_transport_pop(self._trans, size)

  def close_head(self):
    """Call pn_transport_close_head, raising on a negative result."""
    self._check(pn_transport_close_head(self._trans))

  def output(self, size):
    """
    Peek at most size pending bytes, pop them, and return them.

    @return: the bytes taken from the transport, or None when pending()
      is negative or peek() reports end-of-stream.
    """
    p = self.pending()
    if p < 0:
      return None
    out = self.peek(min(size, p))
    if out is None:
      # peek() signals end-of-stream with None; calling len() on it
      # would raise TypeError and there is nothing to pop.
      return None
    self.pop(len(out))
    return out

  def input(self, bytes):
    """
    Push as much of bytes as the current capacity allows.

    @return: the number of bytes pushed, or None when bytes is empty (in
      which case the tail is closed) or when capacity() is negative.
    """
    if not bytes:
      self.close_tail()
      return None
    else:
      c = self.capacity()
      if (c < 0):
        return None
      trimmed = bytes[:c]
      self.push(trimmed)
      return len(trimmed)

  # AMQP 1.0 max-frame-size
  def _get_max_frame_size(self):
    """Read the value via pn_transport_get_max_frame."""
    return pn_transport_get_max_frame(self._trans)

  def _set_max_frame_size(self, value):
    """Pass value to pn_transport_set_max_frame."""
    pn_transport_set_max_frame(self._trans, value)

  max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
                            doc="""
Sets the maximum size for received frames (in bytes).
""")

  @property
  def remote_max_frame_size(self):
    """Result of pn_transport_get_remote_max_frame."""
    return pn_transport_get_remote_max_frame(self._trans)

  # AMQP 1.0 idle-time-out
  def _get_idle_timeout(self):
    """Read the value via pn_transport_get_idle_timeout."""
    return pn_transport_get_idle_timeout(self._trans)

  def _set_idle_timeout(self, value):
    """Pass value to pn_transport_set_idle_timeout."""
    pn_transport_set_idle_timeout(self._trans, value)

  idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
                          doc="""
The idle timeout of the connection (in milliseconds).
""")

  @property
  def remote_idle_timeout(self):
    """Result of pn_transport_get_remote_idle_timeout."""
    return pn_transport_get_remote_idle_timeout(self._trans)

  @property
  def frames_output(self):
    """Result of pn_transport_get_frames_output."""
    return pn_transport_get_frames_output(self._trans)

  @property
  def frames_input(self):
    """Result of pn_transport_get_frames_input."""
    return pn_transport_get_frames_input(self._trans)
2640
class SASLException(TransportException):
  """
  Exception type L{SASL} raises for a negative error code that has no
  entry in EXCEPTIONS.
  """
2643
class SASL(object):
  """
  Wrapper around the SASL handle obtained from a transport with pn_sasl.
  """

  # Outcome codes.
  OK = PN_SASL_OK
  AUTH = PN_SASL_AUTH

  # Values reported by the state property.
  STATE_CONF = PN_SASL_CONF
  STATE_IDLE = PN_SASL_IDLE
  STATE_STEP = PN_SASL_STEP
  STATE_PASS = PN_SASL_PASS
  STATE_FAIL = PN_SASL_FAIL

  def __init__(self, transport):
    """
    @param transport: the L{Transport} whose handle is passed to pn_sasl.
    """
    trans = transport._trans
    self._sasl = pn_sasl(trans)

  def _check(self, err):
    """
    Return err unchanged when it is non-negative.

    @raise SASLException: (or the type registered for err in EXCEPTIONS)
      when err is negative.
    """
    if err < 0:
      exc_type = EXCEPTIONS.get(err, SASLException)
      raise exc_type("[%s]" % (err))
    return err

  def mechanisms(self, mechs):
    """Pass mechs to pn_sasl_mechanisms."""
    sasl = self._sasl
    pn_sasl_mechanisms(sasl, mechs)

  def client(self):
    """Call pn_sasl_client on this handle."""
    sasl = self._sasl
    pn_sasl_client(sasl)

  def server(self):
    """Call pn_sasl_server on this handle."""
    sasl = self._sasl
    pn_sasl_server(sasl)

  def plain(self, user, password):
    """Pass user and password to pn_sasl_plain."""
    sasl = self._sasl
    pn_sasl_plain(sasl, user, password)

  def send(self, data):
    """Send data with pn_sasl_send, raising on a negative result."""
    size = len(data)
    self._check(pn_sasl_send(self._sasl, data, size))

  def recv(self):
    """
    Receive data with pn_sasl_recv.

    The call starts with a 16 byte size and doubles it for as long as
    PN_OVERFLOW is reported.

    @return: the received data, or None when PN_EOS is reported.
    """
    size = 16
    n, data = pn_sasl_recv(self._sasl, size)
    while n == PN_OVERFLOW:
      size *= 2
      n, data = pn_sasl_recv(self._sasl, size)
    if n == PN_EOS:
      return None
    self._check(n)
    return data

  @property
  def outcome(self):
    """Result of pn_sasl_outcome, with PN_SASL_NONE mapped to None."""
    result = pn_sasl_outcome(self._sasl)
    return None if result == PN_SASL_NONE else result

  def done(self, outcome):
    """Pass outcome to pn_sasl_done."""
    sasl = self._sasl
    pn_sasl_done(sasl, outcome)

  @property
  def state(self):
    """Result of pn_sasl_state (compare with the STATE_* constants)."""
    sasl = self._sasl
    return pn_sasl_state(sasl)
2707
class SSLException(TransportException):
  """
  Exception type L{SSLDomain} and L{SSL} raise for a negative error code
  that has no entry in EXCEPTIONS.
  """
2711
class SSLUnavailable(SSLException):
  """
  Raised by L{SSLDomain} and L{SSL} when the underlying pn_ssl_domain or
  pn_ssl call returns None.
  """
2714
class SSLDomain(object):
  """
  Wrapper around the handle returned by pn_ssl_domain.

  Every configuration method returns the (non-negative) result code of
  the underlying call and raises through L{_check} on a negative one.
  """

  MODE_CLIENT = PN_SSL_MODE_CLIENT
  MODE_SERVER = PN_SSL_MODE_SERVER
  VERIFY_PEER = PN_SSL_VERIFY_PEER
  VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
  ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER

  def __init__(self, mode):
    """
    @param mode: value passed to pn_ssl_domain (see the MODE_* constants).
    @raise SSLUnavailable: if pn_ssl_domain returns None.
    """
    domain = pn_ssl_domain(mode)
    self._domain = domain
    if domain is None:
      raise SSLUnavailable()

  def _check(self, err):
    """
    Return err unchanged when it is non-negative.

    @raise SSLException: (or the type registered for err in EXCEPTIONS)
      when err is negative.
    """
    if err < 0:
      exc_type = EXCEPTIONS.get(err, SSLException)
      raise exc_type("SSL failure.")
    return err

  def set_credentials(self, cert_file, key_file, password):
    """Pass the arguments to pn_ssl_domain_set_credentials."""
    rc = pn_ssl_domain_set_credentials(self._domain, cert_file, key_file,
                                       password)
    return self._check(rc)

  def set_trusted_ca_db(self, certificate_db):
    """Pass certificate_db to pn_ssl_domain_set_trusted_ca_db."""
    rc = pn_ssl_domain_set_trusted_ca_db(self._domain, certificate_db)
    return self._check(rc)

  def set_peer_authentication(self, verify_mode, trusted_CAs=None):
    """Pass the arguments to pn_ssl_domain_set_peer_authentication."""
    rc = pn_ssl_domain_set_peer_authentication(self._domain, verify_mode,
                                               trusted_CAs)
    return self._check(rc)

  def allow_unsecured_client(self):
    """Call pn_ssl_domain_allow_unsecured_client on this domain."""
    rc = pn_ssl_domain_allow_unsecured_client(self._domain)
    return self._check(rc)
2749
class SSL(object):
  """
  Wrapper around the SSL handle obtained from a transport with pn_ssl.
  """

  # Values reported by resume_status().
  RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
  RESUME_NEW = PN_SSL_RESUME_NEW
  RESUME_REUSED = PN_SSL_RESUME_REUSED

  def __init__(self, transport, domain, session_details=None):
    """
    @param transport: the L{Transport} whose handle is passed to pn_ssl.
    @param domain: the L{SSLDomain} whose handle is passed to pn_ssl_init.
    @param session_details: optional object providing get_session_id();
      when omitted, None is passed as the session id.
    @raise SSLUnavailable: if pn_ssl returns None.
    """
    if session_details:
      session_id = session_details.get_session_id()
    else:
      session_id = None
    ssl = pn_ssl( transport._trans )
    self._ssl = ssl
    if ssl is None:
      raise SSLUnavailable()
    # NOTE(review): the result of pn_ssl_init is not inspected here.
    pn_ssl_init( ssl, domain._domain, session_id )

  def _check(self, err):
    """
    Return err unchanged when it is non-negative.

    @raise SSLException: (or the type registered for err in EXCEPTIONS)
      when err is negative.
    """
    if err < 0:
      exc_type = EXCEPTIONS.get(err, SSLException)
      raise exc_type("SSL failure.")
    return err

  def cipher_name(self):
    """
    Return the name from pn_ssl_get_cipher_name (128 byte limit), or
    None when the call reports a false result.
    """
    rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
    return name if rc else None

  def protocol_name(self):
    """
    Return the name from pn_ssl_get_protocol_name (128 byte limit), or
    None when the call reports a false result.
    """
    rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
    return name if rc else None

  def resume_status(self):
    """Result of pn_ssl_resume_status (compare with RESUME_* constants)."""
    ssl = self._ssl
    return pn_ssl_resume_status( ssl )

  def _set_peer_hostname(self, hostname):
    """Pass hostname to pn_ssl_set_peer_hostname, raising on failure."""
    rc = pn_ssl_set_peer_hostname( self._ssl, hostname )
    self._check(rc)

  def _get_peer_hostname(self):
    """Read the name via pn_ssl_get_peer_hostname (1024 byte limit)."""
    err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
    self._check(err)
    return name

  peer_hostname = property(fget=_get_peer_hostname, fset=_set_peer_hostname,
                           doc="""
Manage the expected name of the remote peer.  Used to authenticate the remote.
""")
2797
class SSLSessionDetails(object):
  """
  Holder for the unique identifier of an SSL session.  The identifier is
  used to resume a previous session on a new SSL connection.
  """

  def __init__(self, session_id):
    """
    @param session_id: the identifier to hand back from get_session_id().
    """
    self._session_id = session_id

  def get_session_id(self):
    """Return the identifier given at construction time."""
    stored = self._session_id
    return stored
2809
2810 2811 ### 2812 # Driver 2813 ### 2814 2815 -class DriverException(ProtonException):
2816 """ 2817 The DriverException class is the root of the driver exception hierarchy. 2818 """ 2819 pass
2820
def wrap_connector(cxtr):
  """
  Return the L{Connector} wrapper for a connector handle.

  A wrapper already stored as the handle's context is reused; otherwise
  a new one is created and stored there.

  @return: the wrapper, or None when cxtr is false.
  """
  if not cxtr:
    return None
  existing = pn_connector_context(cxtr)
  if existing:
    return existing
  created = Connector(_cxtr=cxtr)
  pn_connector_set_context(cxtr, created)
  return created
2829
class Connector(object):
  """
  Wrapper around a connector handle used with the pn_connector_* functions.
  """

  def __init__(self, _cxtr):
    """
    @param _cxtr: the connector handle to wrap.
    """
    self._cxtr = _cxtr

  def next(self):
    """Return the wrapped result of pn_connector_next."""
    return wrap_connector(pn_connector_next(self._cxtr))

  def process(self):
    """Call pn_connector_process on this connector."""
    pn_connector_process(self._cxtr)

  def listener(self):
    """Return the wrapped result of pn_connector_listener."""
    return wrap_listener(pn_connector_listener(self._cxtr))

  def sasl(self):
    """
    Return a L{SASL} built on this connector's transport, or None when
    the connector has no transport.
    """
    ## seems easier just to grab the SASL associated with the transport:
    trans = self.transport
    if trans:
      # Reuse the wrapper fetched above rather than evaluating the
      # transport property (and creating a wrapper) a second time.
      return SASL(trans)
    return None

  @property
  def transport(self):
    """
    A L{Transport} wrapping the result of pn_connector_transport, or
    None when that result is false.
    """
    trans = pn_connector_transport(self._cxtr)
    if trans:
      return Transport(trans)
    return None

  def close(self):
    """Return the result of pn_connector_close."""
    return pn_connector_close(self._cxtr)

  @property
  def closed(self):
    """Result of pn_connector_closed for this connector."""
    return pn_connector_closed(self._cxtr)

  def _get_connection(self):
    """Return the wrapped result of pn_connector_connection."""
    return wrap_connection(pn_connector_connection(self._cxtr))

  def _set_connection(self, conn):
    """Pass the handle of conn to pn_connector_set_connection."""
    pn_connector_set_connection(self._cxtr, conn._conn)

  connection = property(_get_connection, _set_connection,
                        doc="""
Associate a Connection with this Connector.
""")
2874
def wrap_listener(lsnr):
  """
  Return the L{Listener} wrapper for a listener handle.

  A wrapper already stored as the handle's context is reused; otherwise
  a new one is created and stored there.

  @return: the wrapper, or None when lsnr is false.
  """
  if not lsnr:
    return None
  existing = pn_listener_context(lsnr)
  if existing:
    return existing
  created = Listener(_lsnr=lsnr)
  pn_listener_set_context(lsnr, created)
  return created
2882
class Listener(object):
  """
  Wrapper around a listener handle used with the pn_listener_* functions.
  """

  def __init__(self, _lsnr=None):
    """
    @param _lsnr: the listener handle to wrap (None by default).
    """
    self._lsnr = _lsnr

  def next(self):
    """Return the wrapped result of pn_listener_next."""
    following = pn_listener_next(self._lsnr)
    return wrap_listener(following)

  def accept(self):
    """Return the wrapped connector produced by pn_listener_accept."""
    return wrap_connector(pn_listener_accept(self._lsnr))

  def close(self):
    """Call pn_listener_close on this listener."""
    lsnr = self._lsnr
    pn_listener_close(lsnr)
2896
class Driver(object):
  """
  Wrapper around the driver handle created by pn_driver().

  Listener and connector handles obtained from the driver are returned
  wrapped via wrap_listener and wrap_connector.
  """

  def __init__(self):
    """Create the underlying handle with pn_driver()."""
    self._driver = pn_driver()

  def __del__(self):
    """Free the handle with pn_driver_free, if one was ever created."""
    # _driver is missing when __init__ did not complete.
    if not hasattr(self, "_driver"):
      return
    pn_driver_free(self._driver)
    del self._driver

  def wait(self, timeout):
    """Return the result of pn_driver_wait for the given timeout."""
    driver = self._driver
    return pn_driver_wait(driver, timeout)

  def wakeup(self):
    """Return the result of pn_driver_wakeup."""
    driver = self._driver
    return pn_driver_wakeup(driver)

  def listener(self, host, port):
    """Create a listener with pn_listener (context None) and wrap it."""
    lsnr = pn_listener(self._driver, host, port, None)
    return wrap_listener(lsnr)

  def pending_listener(self):
    """Return the wrapped result of pn_driver_listener."""
    lsnr = pn_driver_listener(self._driver)
    return wrap_listener(lsnr)

  def head_listener(self):
    """Return the wrapped result of pn_listener_head."""
    lsnr = pn_listener_head(self._driver)
    return wrap_listener(lsnr)

  def connector(self, host, port):
    """Create a connector with pn_connector (context None) and wrap it."""
    cxtr = pn_connector(self._driver, host, port, None)
    return wrap_connector(cxtr)

  def head_connector(self):
    """Return the wrapped result of pn_connector_head."""
    cxtr = pn_connector_head(self._driver)
    return wrap_connector(cxtr)

  def pending_connector(self):
    """Return the wrapped result of pn_driver_connector."""
    cxtr = pn_driver_connector(self._driver)
    return wrap_connector(cxtr)
# Names exported by "from proton import *".  SASLException and
# SSLException are listed alongside the other exception types so that
# star-importers can catch them by name.
__all__ = [
  "API_LANGUAGE",
  "IMPLEMENTATION_LANGUAGE",
  "ACCEPTED",
  "AUTOMATIC",
  "PENDING",
  "MANUAL",
  "REJECTED",
  "UNDESCRIBED",
  "Array",
  "Condition",
  "Connection",
  "Connector",
  "Data",
  "Delivery",
  "Disposition",
  "Described",
  "Driver",
  "DriverException",
  "Endpoint",
  "Link",
  "Listener",
  "Message",
  "MessageException",
  "Messenger",
  "MessengerException",
  "ProtonException",
  "Receiver",
  "SASL",
  "SASLException",
  "Sender",
  "Session",
  "SSL",
  "SSLDomain",
  "SSLException",
  "SSLSessionDetails",
  "SSLUnavailable",
  "Terminus",
  "Timeout",
  "Interrupt",
  "Transport",
  "TransportException",
  "char",
  "symbol",
  "timestamp",
  "ulong"
  ]