Package proton :: Module _utils
[frames | no frames]

Source Code for Module proton._utils

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  from __future__ import absolute_import 
 21   
 22  import collections 
 23  import time 
 24  import threading 
 25   
 26  from ._exceptions import ProtonException, ConnectionException, LinkException, Timeout 
 27  from ._delivery import Delivery 
 28  from ._endpoints import Endpoint, Link 
 29  from ._events import Handler 
 30  from ._url import Url 
 31   
 32  from ._reactor import Container 
 33  from ._handlers import MessagingHandler, IncomingMessageHandler 
 67   
class SendException(ProtonException):
    """
    Raised by a blocking send when the delivery ends up in a state the
    caller treats as a failure.

    @ivar state: The delivery state that triggered the exception.
    """

    def __init__(self, state):
        # Keep the offending delivery state available to the handler.
        self.state = state
76
77 78 -def _is_settled(delivery):
79 return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
80
class BlockingSender(BlockingLink):
    """
    Sending link whose L{send} blocks until the delivery is settled.
    """

    def __init__(self, connection, sender):
        """
        Wrap C{sender} and verify that the peer accepted the requested target.

        @param connection: The owning blocking connection.
        @param sender: The underlying sender link.
        @raise LinkException: If a target address was requested and the
            remote target address differs from it.
        """
        super(BlockingSender, self).__init__(connection, sender)
        requested = self.link.target
        if not (requested and requested.address):
            # No specific target address was requested: nothing to verify.
            return
        if requested.address == self.link.remote_target.address:
            return
        # A detach carrying an error condition may still be on its way,
        # so give the peer a moment to close first...
        self._waitForClose()
        # ...and close from our side in case it never does.
        self.link.close()
        raise LinkException("Failed to open sender %s, target does not match" % self.link.name)

    def send(self, msg, timeout=False, error_states=None):
        """
        Send C{msg} and block until the delivery is settled.

        @param msg: The message to send.
        @param timeout: Passed through to the connection's C{wait()}.
        @param error_states: Remote delivery states that should raise. When
            C{None}, L{Delivery.REJECTED} and L{Delivery.RELEASED} are used.
        @return: The delivery created for the message.
        @raise SendException: If the remote state is one of the error states.
        """
        delivery = self.link.send(msg)
        waiting_for = "Sending on sender %s" % self.link.name
        self.connection.wait(lambda: _is_settled(delivery), msg=waiting_for, timeout=timeout)
        # Settle locally unless the link runs in pre-settled mode.
        if delivery.link.snd_settle_mode != Link.SND_SETTLED:
            delivery.settle()
        if error_states is None:
            failure_states = [Delivery.REJECTED, Delivery.RELEASED]
        else:
            failure_states = error_states
        if delivery.remote_state in failure_states:
            raise SendException(delivery.remote_state)
        return delivery
104
class Fetcher(MessagingHandler):
    """
    Handler that queues incoming messages so they can be consumed
    synchronously.

    @ivar connection: The blocking connection this fetcher belongs to.
    @ivar incoming: Queue of (message, delivery) pairs not yet popped.
    @ivar unsettled: Queue of popped deliveries that still need settling.
    """

    def __init__(self, connection, prefetch):
        """
        @param connection: The owning blocking connection.
        @param prefetch: Forwarded to L{MessagingHandler} as C{prefetch}.
        """
        # auto_accept=False is passed on; presumably so that settlement is
        # left to explicit settle() calls -- confirm against MessagingHandler.
        super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
        self.connection = connection
        self.incoming = collections.deque()
        self.unsettled = collections.deque()

    def on_message(self, event):
        """Queue the received message together with its delivery."""
        entry = (event.message, event.delivery)
        self.incoming.append(entry)
        # Wake up the wait() loop so it can notice the new message.
        self.connection.container.yield_()

    # NOTE(review): the source listing skips its lines 117-121 at this point;
    # a handler may be missing here -- confirm against the upstream module.

    def on_connection_error(self, event):
        """
        Report a connection error unless the connection is already closing.

        @raise ConnectionClosed: If the owning connection is not closing.
        """
        if self.connection.closing:
            return
        raise ConnectionClosed(event.connection)

    @property
    def has_message(self):
        """Number of queued messages; zero (falsy) when none is waiting."""
        return len(self.incoming)

    def pop(self):
        """
        Remove and return the oldest queued message.

        A delivery that is not yet settled is remembered so that a later
        L{settle} call can act on it.
        """
        entry = self.incoming.popleft()
        message, delivery = entry
        if not delivery.settled:
            self.unsettled.append(delivery)
        return message

    def settle(self, state=None):
        """
        Settle the oldest unsettled delivery.

        @param state: Optional delivery state to apply before settling.
        """
        oldest = self.unsettled.popleft()
        if state:
            oldest.update(state)
        oldest.settle()
142
class BlockingReceiver(BlockingLink):
    """
    Receiving link whose L{receive} blocks until a message is available.

    @ivar fetcher: The L{Fetcher} queueing messages for this receiver, or
        C{None} when the caller supplied its own handler.
    @ivar container: Reference to the connection's container (see L{__del__}).
    """

    def __init__(self, connection, receiver, fetcher, credit=1):
        """
        Wrap C{receiver} and verify that the peer accepted the requested source.

        @param connection: The owning blocking connection.
        @param receiver: The underlying receiver link.
        @param fetcher: L{Fetcher} used to queue messages, or C{None}.
        @param credit: Initial credit to issue; nothing is issued when falsy.
        @raise LinkException: If a source address was requested and the
            remote source address differs from it.
        """
        super(BlockingReceiver, self).__init__(connection, receiver)
        source = self.link.source
        if source and source.address and source.address != self.link.remote_source.address:
            # this may be followed by a detach, which may contain an error condition, so wait a little...
            self._waitForClose()
            # ...but close ourselves if peer does not
            self.link.close()
            raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
        if credit:
            receiver.flow(credit)
        self.fetcher = fetcher
        self.container = connection.container

    def __del__(self):
        self.fetcher = None
        # The next line causes a core dump if the Proton-C reactor finalizes
        # first. The self.container reference prevents out of order reactor
        # finalization. It may not be set if exception in BlockingLink.__init__
        if hasattr(self, "container"):
            self.link.handler = None  # implicit call to reactor

    def receive(self, timeout=False):
        """
        Block until a message is available and return it.

        @param timeout: Passed through to the connection's C{wait()}.
        @return: The oldest queued message.
        @raise Exception: If this receiver was created with a caller-supplied
            handler and therefore has no fetcher.
        """
        if not self.fetcher:
            raise Exception("Can't call receive on this receiver as a handler was provided")
        # Make sure the peer is allowed to send at least one message.
        if not self.link.credit:
            self.link.flow(1)
        self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name,
                             timeout=timeout)
        return self.fetcher.pop()

    def accept(self):
        """Settle the oldest unsettled delivery as accepted."""
        self.settle(Delivery.ACCEPTED)

    def reject(self):
        """Settle the oldest unsettled delivery as rejected."""
        self.settle(Delivery.REJECTED)

    def release(self, delivered=True):
        """
        Settle the oldest unsettled delivery as released.

        @param delivered: When true the delivery is settled with
            L{Delivery.MODIFIED}, otherwise with L{Delivery.RELEASED}.
        """
        if delivered:
            self.settle(Delivery.MODIFIED)
        else:
            self.settle(Delivery.RELEASED)

    def settle(self, state=None):
        """
        Settle the oldest unsettled delivery, optionally applying C{state}.

        @param state: Delivery state to apply before settling, or C{None}.
        @raise Exception: If this receiver has no fetcher.
        """
        if not self.fetcher:
            raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
        self.fetcher.settle(state)
190
class LinkDetached(LinkException):
    """
    Raised to report that a link was closed.

    @ivar link: The link that was closed.
    @ivar condition: Name of the remote error condition, or C{None} when the
        link carries no remote condition.
    """

    def __init__(self, link):
        self.link = link
        # Describe the link from the point of view of its direction.
        if link.is_sender:
            description = "sender %s to %s closed" % (link.name, link.target.address)
        else:
            description = "receiver %s from %s closed" % (link.name, link.source.address)
        # Append the reason, and record the condition name when there is one.
        if link.remote_condition:
            reason = " due to: %s" % link.remote_condition
            self.condition = link.remote_condition.name
        else:
            reason = " by peer"
            self.condition = None
        super(LinkDetached, self).__init__(description + reason)
206
class ConnectionClosed(ConnectionException):
    """
    Raised to report that a connection was closed.

    @ivar connection: The connection that was closed.
    @ivar condition: Name of the remote error condition, or C{None} when the
        connection carries no remote condition.
    """

    def __init__(self, connection):
        self.connection = connection
        description = "Connection %s closed" % connection.hostname
        # Append the reason, and record the condition name when there is one.
        if connection.remote_condition:
            reason = " due to: %s" % connection.remote_condition
            self.condition = connection.remote_condition.name
        else:
            reason = " by peer"
            self.condition = None
        super(ConnectionClosed, self).__init__(description + reason)
219
class BlockingConnection(Handler):
    """
    A synchronous style connection wrapper.

    This object's implementation uses OS resources.  To ensure they
    are released when the object is no longer in use, make sure that
    object operations are enclosed in a try block and that close() is
    always executed on exit.

    @ivar disconnected: C{False} until the transport closes, then the
        transport condition (or the string "unknown").
    @ivar closing: C{True} once L{close} has been entered.
    """

    def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
        """
        Connect to C{url} and block until the peer has responded to the open.

        @param url: Address to connect to; parsed with L{Url}.
        @param timeout: Default timeout for blocking operations. A falsy
            value selects 60.
        @param container: Container to use; a new L{Container} is created
            when none is supplied.
        @param ssl_domain: Forwarded to the container's C{connect()}.
        @param heartbeat: Forwarded to the container's C{connect()}.
        @param kwargs: Extra keyword arguments forwarded to C{connect()}.
        """
        self.disconnected = False
        self.timeout = timeout or 60
        self.container = container or Container()
        self.container.timeout = self.timeout
        self.container.start()
        self.url = Url(url).defaults()
        self.conn = None
        self.closing = False
        failed = True
        try:
            self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False,
                                               heartbeat=heartbeat, **kwargs)
            self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
                      msg="Opening connection")
            failed = False
        finally:
            # Release the half-open connection if opening did not complete.
            if failed and self.conn:
                self.close()

    def create_sender(self, address, handler=None, name=None, options=None):
        """
        Create a L{BlockingSender} for C{address} on this connection.

        The remaining arguments are forwarded to the container's
        C{create_sender()}.
        """
        return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler,
                                                                 options=options))

    def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
        """
        Create a L{BlockingReceiver} for C{address} on this connection.

        When C{handler} is supplied no L{Fetcher} is created and the initial
        credit defaults to 1 if C{credit} is C{None}. Otherwise a L{Fetcher}
        built with C{credit} is used as the link handler and C{credit} is
        passed on unchanged as the initial credit.
        """
        prefetch = credit
        if handler:
            fetcher = None
            if prefetch is None:
                prefetch = 1
        else:
            fetcher = Fetcher(self, credit)
        return BlockingReceiver(
            self,
            self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher,
                                           options=options), fetcher, credit=prefetch)

    def close(self):
        """
        Close the connection and release the container.

        Calling it again after the first call is a no-op.
        """
        # TODO: provide stronger interrupt protection on cleanup.  See PEP 419
        if self.closing:
            return
        self.closing = True
        self.container.errors = []
        try:
            if self.conn:
                self.conn.close()
                self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
                          msg="Closing connection")
        finally:
            # Guarded like the branch above: conn may be None, in which case
            # there is nothing to free.
            if self.conn:
                self.conn.free()
            # Nothing left to block on.  Allow reactor to clean up.
            self.run()
            self.conn = None
            self.container.global_handler = None  # break circular ref: container to cadapter.on_error
            self.container.stop_events()
            self.container = None

    def _is_closed(self):
        """Return a truthy value when either end of the connection is closed."""
        return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)

    def run(self):
        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
        while self.container.process():
            pass
        self.container.stop()
        self.container.process()

    def wait(self, condition, timeout=False, msg=None):
        """
        Call process until condition() is true.

        @param condition: Callable polled between calls to the container's
            C{process()}.
        @param timeout: C{False} selects this connection's default timeout;
            C{None} waits with no deadline.
        @param msg: Optional text appended to the timeout error message.
        @raise Timeout: If the deadline passes before the loop exits.
        @raise ConnectionException: If the transport was disconnected and the
            connection is not closed.
        """
        if timeout is False:
            timeout = self.timeout
        if timeout is None:
            while not condition() and not self.disconnected:
                self.container.process()
        else:
            # Temporarily apply the requested timeout to the container.
            container_timeout = self.container.timeout
            self.container.timeout = timeout
            try:
                deadline = time.time() + timeout
                while not condition() and not self.disconnected:
                    self.container.process()
                    if deadline < time.time():
                        txt = "Connection %s timed out" % self.url
                        if msg:
                            txt += ": " + msg
                        raise Timeout(txt)
            finally:
                self.container.timeout = container_timeout
        if self.disconnected or self._is_closed():
            self.container.stop()
            self.conn.handler = None  # break cyclical reference
        if self.disconnected and not self._is_closed():
            raise ConnectionException(
                "Connection %s disconnected: %s" % (self.url, self.disconnected))

    # NOTE(review): the source listing skips its lines 324-328 at this point;
    # a handler may be missing here -- confirm against the upstream module.

    def on_connection_remote_close(self, event):
        """
        Close our end when the peer closes the connection.

        @raise ConnectionClosed: If the close was not initiated locally.
        """
        if event.connection.state & Endpoint.LOCAL_ACTIVE:
            event.connection.close()
            if not self.closing:
                raise ConnectionClosed(event.connection)

    def on_transport_tail_closed(self, event):
        """Treat a closed transport tail as a closed transport."""
        self.on_transport_closed(event)

    def on_transport_head_closed(self, event):
        """Treat a closed transport head as a closed transport."""
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """Record the transport condition (or "unknown") as the disconnect reason."""
        self.disconnected = event.transport.condition or "unknown"
344
class AtomicCount(object):
    """
    Counter whose increments are serialized by a lock.

    @ivar count: The current value.
    @ivar step: Amount added on every call to L{next}.
    @ivar lock: Lock guarding updates of C{count}.
    """

    def __init__(self, start=0, step=1):
        """Thread-safe atomic counter. Start at start, increment by step."""
        self.count, self.step = start, step
        self.lock = threading.Lock()

    def next(self):
        """
        Get the next value.

        @return: The counter value after adding C{step}.
        """
        # The context manager releases the lock even if the addition raises,
        # so a failure cannot leave the counter permanently locked.
        with self.lock:
            self.count += self.step
            return self.count
359
class SyncRequestResponse(IncomingMessageHandler):
    """
    Synchronous request-response (RPC style) helper built on a blocking
    connection.

    @ivar address: Default address for requests, may be None.
    @ivar connection: Connection used for both requests and responses.
    """

    # Shared by all instances: source of correlation ids for requests.
    correlation_id = AtomicCount()

    def __init__(self, connection, address=None):
        """
        Set up the sender for requests and a dynamic receiver for responses.
        One instance can issue many requests, to one or to several addresses.

        @param connection: A L{BlockingConnection}
        @param address: Default address for requests.
            When omitted, every request must carry its own address.
        """
        super(SyncRequestResponse, self).__init__()
        self.connection = connection
        self.address = address
        self.sender = self.connection.create_sender(self.address)
        # dynamic=True requests a dynamically generated address for the
        # receiver; credit=1 allows exactly one response to arrive at first.
        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
        self.response = None

    def call(self, request):
        """
        Send a request message and block until the matching response arrives.

        @param request: A L{proton.Message}. If L{self.address} is not set,
            the request's own address must be set.
        @return: The response message whose correlation id matches the request.
        @raise ValueError: If neither this object nor the request has an address.
        """
        if not (self.address or request.address):
            raise ValueError("Request message has no address: %s" % request)
        request.reply_to = self.reply_to
        expected_id = str(self.correlation_id.next())
        request.correlation_id = expected_id
        self.sender.send(request)

        def response_arrived():
            reply = self.response
            return reply and (reply.correlation_id == expected_id)

        self.connection.wait(response_arrived, msg="Waiting for response")
        reply = self.response
        # Clear the slot and issue credit so the next response can arrive.
        self.response = None
        self.receiver.flow(1)
        return reply

    @property
    def reply_to(self):
        """Return the dynamic address of our receiver."""
        return self.receiver.remote_source.address

    def on_message(self, event):
        """Store the message received on our receiver as the pending response."""
        self.response = event.message
        # Wake up the wait() loop so it can examine the response.
        self.connection.container.yield_()
420