Package proton :: Module _endpoints
[frames] | no frames]

Source Code for Module proton._endpoints

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  """ 
 21  The proton.endpoints module 
 22  """ 
 23   
 24  from __future__ import absolute_import 
 25   
 26  import weakref 
 27   
 28  from cproton import PN_CONFIGURATION, PN_COORDINATOR, PN_DELIVERIES, PN_DIST_MODE_COPY, PN_DIST_MODE_MOVE, \ 
 29      PN_DIST_MODE_UNSPECIFIED, PN_EOS, PN_EXPIRE_NEVER, PN_EXPIRE_WITH_CONNECTION, PN_EXPIRE_WITH_LINK, \ 
 30      PN_EXPIRE_WITH_SESSION, PN_LOCAL_ACTIVE, PN_LOCAL_CLOSED, PN_LOCAL_UNINIT, PN_NONDURABLE, PN_RCV_FIRST, \ 
 31      PN_RCV_SECOND, PN_REMOTE_ACTIVE, PN_REMOTE_CLOSED, PN_REMOTE_UNINIT, PN_SND_MIXED, PN_SND_SETTLED, PN_SND_UNSETTLED, \ 
 32      PN_SOURCE, PN_TARGET, PN_UNSPECIFIED, pn_connection, pn_connection_attachments, pn_connection_close, \ 
 33      pn_connection_collect, pn_connection_condition, pn_connection_desired_capabilities, pn_connection_error, \ 
 34      pn_connection_get_container, pn_connection_get_hostname, pn_connection_get_user, pn_connection_offered_capabilities, \ 
 35      pn_connection_open, pn_connection_properties, pn_connection_release, pn_connection_remote_condition, \ 
 36      pn_connection_remote_container, pn_connection_remote_desired_capabilities, pn_connection_remote_hostname, \ 
 37      pn_connection_remote_offered_capabilities, pn_connection_remote_properties, pn_connection_set_container, \ 
 38      pn_connection_set_hostname, pn_connection_set_password, pn_connection_set_user, pn_connection_state, \ 
 39      pn_connection_transport, pn_delivery, pn_error_code, pn_error_text, pn_link_advance, pn_link_attachments, \ 
 40      pn_link_available, pn_link_close, pn_link_condition, pn_link_credit, pn_link_current, pn_link_detach, pn_link_drain, \ 
 41      pn_link_drained, pn_link_draining, pn_link_error, pn_link_flow, pn_link_free, pn_link_get_drain, pn_link_head, \ 
 42      pn_link_is_receiver, pn_link_is_sender, pn_link_max_message_size, pn_link_name, pn_link_next, pn_link_offered, \ 
 43      pn_link_open, pn_link_queued, pn_link_rcv_settle_mode, pn_link_recv, pn_link_remote_condition, \ 
 44      pn_link_remote_max_message_size, pn_link_remote_rcv_settle_mode, pn_link_remote_snd_settle_mode, \ 
 45      pn_link_remote_source, pn_link_remote_target, pn_link_send, pn_link_session, pn_link_set_drain, \ 
 46      pn_link_set_max_message_size, pn_link_set_rcv_settle_mode, pn_link_set_snd_settle_mode, pn_link_snd_settle_mode, \ 
 47      pn_link_source, pn_link_state, pn_link_target, pn_link_unsettled, pn_receiver, pn_sender, pn_session, \ 
 48      pn_session_attachments, pn_session_close, pn_session_condition, pn_session_connection, pn_session_free, \ 
 49      pn_session_get_incoming_capacity, pn_session_get_outgoing_window, pn_session_head, pn_session_incoming_bytes, \ 
 50      pn_session_next, pn_session_open, pn_session_outgoing_bytes, pn_session_remote_condition, \ 
 51      pn_session_set_incoming_capacity, pn_session_set_outgoing_window, pn_session_state, pn_terminus_capabilities, \ 
 52      pn_terminus_copy, pn_terminus_filter, pn_terminus_get_address, pn_terminus_get_distribution_mode, \ 
 53      pn_terminus_get_durability, pn_terminus_get_expiry_policy, pn_terminus_get_timeout, pn_terminus_get_type, \ 
 54      pn_terminus_is_dynamic, pn_terminus_outcomes, pn_terminus_properties, pn_terminus_set_address, \ 
 55      pn_terminus_set_distribution_mode, pn_terminus_set_durability, pn_terminus_set_dynamic, \ 
 56      pn_terminus_set_expiry_policy, pn_terminus_set_timeout, pn_terminus_set_type, pn_work_head 
 57   
 58  from ._common import unicode2utf8, utf82unicode 
 59  from ._condition import cond2obj, obj2cond 
 60  from ._data import Data, dat2obj, obj2dat 
 61  from ._delivery import Delivery 
 62  from ._exceptions import ConnectionException, EXCEPTIONS, LinkException, SessionException 
 63  from ._transport import Transport 
 64  from ._wrapper import Wrapper 
class Endpoint(object):
    """
    Shared base for endpoint wrappers; Connection and Session in this
    module derive from it.

    It carries the local ``condition`` object and the event ``handler``,
    and exposes the peer's condition through ``remote_condition``.
    Subclasses supply the two ``_get_*cond_impl`` hooks.
    """

    # State bits re-exported from the C binding.
    LOCAL_UNINIT = PN_LOCAL_UNINIT
    REMOTE_UNINIT = PN_REMOTE_UNINIT
    LOCAL_ACTIVE = PN_LOCAL_ACTIVE
    REMOTE_ACTIVE = PN_REMOTE_ACTIVE
    LOCAL_CLOSED = PN_LOCAL_CLOSED
    REMOTE_CLOSED = PN_REMOTE_CLOSED

    def _init(self):
        """Reset the per-endpoint attributes: no condition, no handler."""
        self.condition = None
        self._handler = None

    def _update_cond(self):
        """Copy ``self.condition`` into the implementation's condition object."""
        obj2cond(self.condition, self._get_cond_impl())

    @property
    def remote_condition(self):
        """The remote condition, converted with ``cond2obj``."""
        return cond2obj(self._get_remote_cond_impl())

    # the following must be provided by subclasses
    def _get_cond_impl(self):
        """Return the implementation's local condition object."""
        # Raise instead of ``assert False``: asserts are removed under
        # ``python -O``, which would make this silently return None.
        raise NotImplementedError("Subclass must override this!")

    def _get_remote_cond_impl(self):
        """Return the implementation's remote condition object."""
        raise NotImplementedError("Subclass must override this!")

    def _get_handler(self):
        """Return the handler currently stored on this endpoint (may be None)."""
        return self._handler

    def _set_handler(self, handler):
        """
        Store ``handler`` on this endpoint.

        None clears the handler and a ``Handler`` instance is stored as is.
        Any other object is added to a freshly created ``Handler``, which
        is stored instead.
        """
        # TODO Hack This is here for some very odd (IMO) backwards compat behaviour
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from ._events import Handler
        if handler is None:
            self._handler = None
        elif isinstance(handler, Handler):
            self._handler = handler
        else:
            self._handler = Handler()
            self._handler.add(handler)

    handler = property(_get_handler, _set_handler)
108
class Connection(Wrapper, Endpoint):
    """
    A representation of an AMQP connection
    """

    @staticmethod
    def wrap(impl):
        """Return a Connection wrapping ``impl``, or None when ``impl`` is None."""
        if impl is None:
            return None
        else:
            return Connection(impl)

    def __init__(self, impl=pn_connection):
        """
        Hand ``impl`` to ``Wrapper`` together with ``pn_connection_attachments``.

        The default for ``impl`` is the ``pn_connection`` factory itself.
        """
        Wrapper.__init__(self, impl, pn_connection_attachments)

    def _init(self):
        """Set the connection attributes to their initial (empty) values."""
        Endpoint._init(self)
        self.offered_capabilities = None
        self.desired_capabilities = None
        self.properties = None
        self.url = None
        self._acceptor = None

    def _get_attachments(self):
        """Return the attachments record of the underlying connection."""
        return pn_connection_attachments(self._impl)

    @property
    def connection(self):
        """The connection itself."""
        return self

    @property
    def transport(self):
        """The ``Transport`` wrapping ``pn_connection_transport`` for this connection."""
        return Transport.wrap(pn_connection_transport(self._impl))

    def _check(self, err):
        """
        Return ``err`` unchanged when it is non-negative.

        A negative ``err`` raises the exception class registered for that
        code in ``EXCEPTIONS`` (``ConnectionException`` when none is
        registered), with the connection's error text in the message.
        """
        if err < 0:
            exc = EXCEPTIONS.get(err, ConnectionException)
            # pn_connection_error() yields an error object, not a string;
            # extract its text so the message is readable.
            raise exc("[%s]: %s" % (err, pn_error_text(pn_connection_error(self._impl))))
        return err

    def _get_cond_impl(self):
        """Return the connection's local condition object."""
        return pn_connection_condition(self._impl)

    def _get_remote_cond_impl(self):
        """Return the connection's remote condition object."""
        return pn_connection_remote_condition(self._impl)

    def collect(self, collector):
        """
        Register ``collector`` with the connection through
        ``pn_connection_collect``; passing None registers no collector.

        Only a weak reference to the collector is kept on this object.
        """
        if collector is None:
            pn_connection_collect(self._impl, None)
            # weakref.ref(None) raises TypeError, so record the absence
            # of a collector directly.
            self._collector = None
        else:
            pn_connection_collect(self._impl, collector._impl)
            self._collector = weakref.ref(collector)

    def _get_container(self):
        """Return the container name, converted with ``utf82unicode``."""
        return utf82unicode(pn_connection_get_container(self._impl))

    def _set_container(self, name):
        """Set the container name, converted with ``unicode2utf8``."""
        pn_connection_set_container(self._impl, unicode2utf8(name))

    container = property(_get_container, _set_container)

    def _get_hostname(self):
        """Return the hostname, converted with ``utf82unicode``."""
        return utf82unicode(pn_connection_get_hostname(self._impl))

    def _set_hostname(self, name):
        """Set the hostname, converted with ``unicode2utf8``."""
        pn_connection_set_hostname(self._impl, unicode2utf8(name))

    hostname = property(_get_hostname, _set_hostname,
                        doc="""
Set the name of the host (either fully qualified or relative) to which this
connection is connecting. This information may be used by the remote
peer to determine the correct back-end service to connect the client to.
This value will be sent in the Open performative, and will be used by SSL
and SASL layers to identify the peer.
""")

    def _get_user(self):
        """Return the user name, converted with ``utf82unicode``."""
        return utf82unicode(pn_connection_get_user(self._impl))

    def _set_user(self, name):
        """Set the user name, converted with ``unicode2utf8``."""
        pn_connection_set_user(self._impl, unicode2utf8(name))

    user = property(_get_user, _set_user)

    def _get_password(self):
        """Always return None: the password cannot be read back here."""
        return None

    def _set_password(self, name):
        """Set the password, converted with ``unicode2utf8``."""
        pn_connection_set_password(self._impl, unicode2utf8(name))

    password = property(_get_password, _set_password)

    @property
    def remote_container(self):
        """The container identifier specified by the remote peer for this connection."""
        return pn_connection_remote_container(self._impl)

    @property
    def remote_hostname(self):
        """The hostname specified by the remote peer for this connection."""
        return pn_connection_remote_hostname(self._impl)

    # NOTE(review): the ``def`` lines of the next two properties were lost
    # in the extracted listing; the names are restored from the wrapped C
    # calls - confirm against the real module.
    @property
    def remote_offered_capabilities(self):
        """The capabilities offered by the remote peer for this connection."""
        return dat2obj(pn_connection_remote_offered_capabilities(self._impl))

    @property
    def remote_desired_capabilities(self):
        """The capabilities desired by the remote peer for this connection."""
        return dat2obj(pn_connection_remote_desired_capabilities(self._impl))

    @property
    def remote_properties(self):
        """The properties specified by the remote peer for this connection."""
        return dat2obj(pn_connection_remote_properties(self._impl))

    @property
    def connected_address(self):
        """``str(self.url)`` when ``url`` is truthy, otherwise ``url`` itself."""
        return self.url and str(self.url)

    def open(self):
        """
        Opens the connection.

        In more detail, this moves the local state of the connection to
        the ACTIVE state and triggers an open frame to be sent to the
        peer. A connection is fully active once both peers have opened it.
        """
        # Push the Python-side capabilities and properties into the
        # implementation before opening.
        obj2dat(self.offered_capabilities,
                pn_connection_offered_capabilities(self._impl))
        obj2dat(self.desired_capabilities,
                pn_connection_desired_capabilities(self._impl))
        obj2dat(self.properties, pn_connection_properties(self._impl))
        pn_connection_open(self._impl)

    def close(self):
        """
        Closes the connection.

        In more detail, this moves the local state of the connection to
        the CLOSED state and triggers a close frame to be sent to the
        peer. A connection is fully closed once both peers have closed it.
        """
        self._update_cond()
        pn_connection_close(self._impl)
        if hasattr(self, '_session_policy'):
            # break circular ref
            del self._session_policy

    @property
    def state(self):
        """
        The state of the connection as a bit field. The state has a local
        and a remote component. Each of these can be in one of three
        states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
        against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
        REMOTE_ACTIVE and REMOTE_CLOSED.
        """
        return pn_connection_state(self._impl)

    def session(self):
        """
        Returns a new session on this connection.

        Raises ``SessionException`` when ``pn_session`` returns None.
        """
        ssn = pn_session(self._impl)
        if ssn is None:
            raise SessionException("Session allocation failed.")
        return Session(ssn)

    def session_head(self, mask):
        """
        Return the ``Session`` wrapping ``pn_session_head(impl, mask)``, or
        None when that call returns None.
        """
        # NOTE(review): ``mask`` is presumably a state bit mask - confirm.
        return Session.wrap(pn_session_head(self._impl, mask))

    # NOTE(review): original source lines 285-286 are missing from the
    # extracted listing at this point; whatever was defined there is not
    # reproduced here.

    @property
    def work_head(self):
        """The ``Delivery`` wrapping ``pn_work_head`` for this connection."""
        return Delivery.wrap(pn_work_head(self._impl))

    @property
    def error(self):
        """The error code of the connection's error object."""
        return pn_error_code(pn_connection_error(self._impl))

    def free(self):
        """Release the underlying connection with ``pn_connection_release``."""
        pn_connection_release(self._impl)
298
class Session(Wrapper, Endpoint):
    """
    Wrapper around a session of the underlying implementation.
    """

    @staticmethod
    def wrap(impl):
        """Return a Session wrapping ``impl``, or None when ``impl`` is None."""
        return None if impl is None else Session(impl)

    def __init__(self, impl):
        """Hand ``impl`` to ``Wrapper`` together with ``pn_session_attachments``."""
        Wrapper.__init__(self, impl, pn_session_attachments)

    def _get_attachments(self):
        """Return the attachments record of the underlying session."""
        return pn_session_attachments(self._impl)

    def _get_cond_impl(self):
        """Return the session's local condition object."""
        return pn_session_condition(self._impl)

    def _get_remote_cond_impl(self):
        """Return the session's remote condition object."""
        return pn_session_remote_condition(self._impl)

    def _get_incoming_capacity(self):
        """Read the incoming capacity from the underlying session."""
        return pn_session_get_incoming_capacity(self._impl)

    def _set_incoming_capacity(self, capacity):
        """Write ``capacity`` as the underlying session's incoming capacity."""
        pn_session_set_incoming_capacity(self._impl, capacity)

    incoming_capacity = property(_get_incoming_capacity,
                                 _set_incoming_capacity)

    def _get_outgoing_window(self):
        """Read the outgoing window from the underlying session."""
        return pn_session_get_outgoing_window(self._impl)

    def _set_outgoing_window(self, window):
        """Write ``window`` as the underlying session's outgoing window."""
        pn_session_set_outgoing_window(self._impl, window)

    outgoing_window = property(_get_outgoing_window,
                               _set_outgoing_window)

    @property
    def outgoing_bytes(self):
        """Value reported by ``pn_session_outgoing_bytes``."""
        return pn_session_outgoing_bytes(self._impl)

    @property
    def incoming_bytes(self):
        """Value reported by ``pn_session_incoming_bytes``."""
        return pn_session_incoming_bytes(self._impl)

    def open(self):
        """Open the underlying session."""
        pn_session_open(self._impl)

    def close(self):
        """Copy the local condition into the implementation, then close the session."""
        self._update_cond()
        pn_session_close(self._impl)

    def next(self, mask):
        """
        Return the ``Session`` wrapping ``pn_session_next(impl, mask)``, or
        None when that call returns None.
        """
        following = pn_session_next(self._impl, mask)
        return Session.wrap(following)

    @property
    def state(self):
        """Value reported by ``pn_session_state``."""
        return pn_session_state(self._impl)

    @property
    def connection(self):
        """The ``Connection`` this session belongs to."""
        owner = pn_session_connection(self._impl)
        return Connection.wrap(owner)

    @property
    def transport(self):
        """The transport of this session's connection."""
        return self.connection.transport

    def sender(self, name):
        """Create a ``Sender`` named ``name`` on this session."""
        link_impl = pn_sender(self._impl, unicode2utf8(name))
        return Sender(link_impl)

    def receiver(self, name):
        """Create a ``Receiver`` named ``name`` on this session."""
        link_impl = pn_receiver(self._impl, unicode2utf8(name))
        return Receiver(link_impl)

    def free(self):
        """Free the underlying session with ``pn_session_free``."""
        pn_session_free(self._impl)
375 580
class Sender(Link):
    """
    A link over which messages are sent.
    """

    def offered(self, n):
        """Pass ``n`` to ``pn_link_offered`` for this link."""
        pn_link_offered(self._impl, n)

    def stream(self, data):
        """
        Send specified data as part of the current delivery

        @type data: binary
        @param data: data to send
        """
        result = pn_link_send(self._impl, data)
        return self._check(result)

    def send(self, obj, tag=None):
        """
        Send specified object over this sender; the object is expected to
        have a send() method on it that takes the sender and an optional
        tag as arguments.

        Where the object is a Message, this will send the message over
        this link, creating a new delivery for the purpose.
        """
        if not hasattr(obj, 'send'):
            # treat object as bytes
            return self.stream(obj)
        return obj.send(self, tag=tag)

    def delivery_tag(self):
        """
        Return the next tag from this sender's ``tag_generator``.

        When no generator has been set on the sender yet, one producing
        "1", "2", "3", ... is installed first.
        """
        if not hasattr(self, 'tag_generator'):
            def counting_tags():
                number = 0
                while True:
                    number += 1
                    yield str(number)

            self.tag_generator = counting_tags()
        return next(self.tag_generator)
624
class Receiver(Link):
    """
    A link over which messages are received.
    """

    def flow(self, n):
        """Increases the credit issued to the remote sender by the specified number of messages."""
        pn_link_flow(self._impl, n)

    def recv(self, limit):
        """
        Call ``pn_link_recv`` with ``limit`` and return the data it yields.

        Returns None when the call reports ``PN_EOS``. Any other status is
        passed through ``self._check`` before the data is returned.
        """
        status, payload = pn_link_recv(self._impl, limit)
        if status == PN_EOS:
            return None
        self._check(status)
        return payload

    def drain(self, n):
        """Pass ``n`` to ``pn_link_drain`` for this link."""
        pn_link_drain(self._impl, n)

    def draining(self):
        """Return what ``pn_link_draining`` reports for this link."""
        return pn_link_draining(self._impl)
648
class Terminus(object):
    """
    Thin wrapper around a terminus object of the underlying implementation.

    Setters pass the C call's return code through ``_check``.
    """

    # Terminus types.
    UNSPECIFIED = PN_UNSPECIFIED
    SOURCE = PN_SOURCE
    TARGET = PN_TARGET
    COORDINATOR = PN_COORDINATOR

    # Durability values.
    NONDURABLE = PN_NONDURABLE
    CONFIGURATION = PN_CONFIGURATION
    DELIVERIES = PN_DELIVERIES

    # Distribution modes.
    DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
    DIST_MODE_COPY = PN_DIST_MODE_COPY
    DIST_MODE_MOVE = PN_DIST_MODE_MOVE

    # Expiry policies.
    EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
    EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
    EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
    EXPIRE_NEVER = PN_EXPIRE_NEVER

    def __init__(self, impl):
        """Keep a reference to the implementation object ``impl``."""
        self._impl = impl

    def _check(self, err):
        """
        Return ``err`` unchanged when it is non-negative.

        A negative ``err`` raises the exception class registered for that
        code in ``EXCEPTIONS`` (``LinkException`` when none is registered).
        """
        if err < 0:
            exc_type = EXCEPTIONS.get(err, LinkException)
            raise exc_type("[%s]" % err)
        return err

    def _get_type(self):
        """Read the terminus type."""
        return pn_terminus_get_type(self._impl)

    def _set_type(self, type):
        """Write the terminus type."""
        status = pn_terminus_set_type(self._impl, type)
        self._check(status)

    type = property(_get_type, _set_type)

    def _get_address(self):
        """The address that identifies the source or target node"""
        return utf82unicode(pn_terminus_get_address(self._impl))

    def _set_address(self, address):
        """Write the address, converted with ``unicode2utf8``."""
        status = pn_terminus_set_address(self._impl, unicode2utf8(address))
        self._check(status)

    address = property(_get_address, _set_address)

    def _get_durability(self):
        """Read the durability setting."""
        return pn_terminus_get_durability(self._impl)

    def _set_durability(self, seconds):
        """Write the durability setting."""
        # NOTE(review): despite its name, ``seconds`` is presumably one of
        # NONDURABLE / CONFIGURATION / DELIVERIES - confirm.
        status = pn_terminus_set_durability(self._impl, seconds)
        self._check(status)

    durability = property(_get_durability, _set_durability)

    def _get_expiry_policy(self):
        """Read the expiry policy."""
        return pn_terminus_get_expiry_policy(self._impl)

    def _set_expiry_policy(self, seconds):
        """Write the expiry policy."""
        # NOTE(review): despite its name, ``seconds`` is presumably one of
        # the EXPIRE_* constants - confirm.
        status = pn_terminus_set_expiry_policy(self._impl, seconds)
        self._check(status)

    expiry_policy = property(_get_expiry_policy, _set_expiry_policy)

    def _get_timeout(self):
        """Read the timeout."""
        return pn_terminus_get_timeout(self._impl)

    def _set_timeout(self, seconds):
        """Write the timeout."""
        status = pn_terminus_set_timeout(self._impl, seconds)
        self._check(status)

    timeout = property(_get_timeout, _set_timeout)

    def _is_dynamic(self):
        """Indicates whether the source or target node was dynamically
        created"""
        return pn_terminus_is_dynamic(self._impl)

    def _set_dynamic(self, dynamic):
        """Write the dynamic flag."""
        status = pn_terminus_set_dynamic(self._impl, dynamic)
        self._check(status)

    dynamic = property(_is_dynamic, _set_dynamic)

    def _get_distribution_mode(self):
        """Read the distribution mode."""
        return pn_terminus_get_distribution_mode(self._impl)

    def _set_distribution_mode(self, mode):
        """Write the distribution mode."""
        status = pn_terminus_set_distribution_mode(self._impl, mode)
        self._check(status)

    distribution_mode = property(_get_distribution_mode,
                                 _set_distribution_mode)

    @property
    def properties(self):
        """Properties of a dynamic source or target."""
        return Data(pn_terminus_properties(self._impl))

    @property
    def capabilities(self):
        """Capabilities of the source or target."""
        return Data(pn_terminus_capabilities(self._impl))

    @property
    def outcomes(self):
        """``Data`` wrapping ``pn_terminus_outcomes`` for this terminus."""
        return Data(pn_terminus_outcomes(self._impl))

    @property
    def filter(self):
        """A filter on a source allows the set of messages transferred over
        the link to be restricted"""
        return Data(pn_terminus_filter(self._impl))

    def copy(self, src):
        """Copy the terminus ``src`` into this one with ``pn_terminus_copy``."""
        status = pn_terminus_copy(self._impl, src._impl)
        self._check(status)
760