Package proton :: Module utils
[frames] | no frames]

Source Code for Module proton.utils

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import collections, socket, time, threading 
 20   
 21  from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message 
 22  from proton import ProtonException, Timeout, Url 
 23  from proton.reactor import Container 
 24  from proton.handlers import MessagingHandler, IncomingMessageHandler 
 25  from cproton import pn_reactor_collector, pn_collector_release 
 57   
class SendException(ProtonException):
    """
    Signals that a send request finished in an exceptional state/condition.

    @ivar state: The state value handed to the constructor.
    """

    def __init__(self, state):
        # Only the state is recorded here; no message text is composed.
        self.state = state
64
def _is_settled(delivery):
    """
    Tell whether a delivery counts as settled.

    A delivery counts as settled when its own C{settled} flag is truthy, or
    when the link it belongs to uses the C{Link.SND_SETTLED} settle mode.
    """
    settled = delivery.settled
    if settled:
        return settled
    return delivery.link.snd_settle_mode == Link.SND_SETTLED
67
class BlockingSender(BlockingLink):
    """
    Sender wrapper whose L{send} blocks until the delivery is settled.
    """

    def __init__(self, connection, sender):
        """
        Wrap C{sender} and verify that the peer agreed on the requested target.

        @param connection: The owning connection wrapper.
        @param sender: The sender link to wrap.
        @raise LinkException: If a target address was requested and the
            remote target address differs from it.
        """
        super(BlockingSender, self).__init__(connection, sender)
        requested = self.link.target
        if not (requested and requested.address):
            # No particular target address was asked for; nothing to verify.
            return
        if requested.address == self.link.remote_target.address:
            return
        # This may be followed by a detach, which may contain an error
        # condition, so wait a little...
        self._waitForClose()
        # ...but close ourselves if the peer does not.
        self.link.close()
        raise LinkException("Failed to open sender %s, target does not match" % self.link.name)

    def send(self, msg, timeout=False, error_states=None):
        """
        Send C{msg} and block until the delivery is settled.

        @param msg: The message to send.
        @param timeout: Passed through to the connection's C{wait()}.
        @param error_states: Remote delivery states that should raise.
            When C{None}, C{Delivery.REJECTED} and C{Delivery.RELEASED} are used.
        @return: The delivery created for the message.
        @raise SendException: If the remote state is one of the error states.
        """
        delivery = self.link.send(msg)
        description = "Sending on sender %s" % self.link.name
        self.connection.wait(lambda: _is_settled(delivery), msg=description, timeout=timeout)
        if delivery.link.snd_settle_mode != Link.SND_SETTLED:
            delivery.settle()
        if error_states is None:
            failures = [Delivery.REJECTED, Delivery.RELEASED]
        else:
            failures = error_states
        outcome = delivery.remote_state
        if outcome in failures:
            raise SendException(outcome)
        return delivery
89
class Fetcher(MessagingHandler):
    """
    Handler that queues received messages so a blocking caller can pop them.

    @ivar incoming: Deque of (message, delivery) pairs not yet popped.
    @ivar unsettled: Deque of popped deliveries that still need settling.
    """

    def __init__(self, connection, prefetch):
        """
        @param connection: The connection wrapper; its C{closing} flag and
            container are consulted by the event callbacks.
        @param prefetch: Forwarded to the base handler together with
            C{auto_accept=False}.
        """
        super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
        self.connection = connection
        self.incoming = collections.deque()
        self.unsettled = collections.deque()

    def on_message(self, event):
        """Queue the received message together with its delivery."""
        entry = (event.message, event.delivery)
        self.incoming.append(entry)
        # Wake up the wait() loop to handle the message.
        self.connection.container.yield_()

    # NOTE(review): the listing this code was taken from skips original source
    # lines 100-106 at this point; whatever was defined there is not shown.

    def on_connection_error(self, event):
        """Raise L{ConnectionClosed} unless the connection is already closing."""
        if self.connection.closing:
            return
        raise ConnectionClosed(event.connection)

    @property
    def has_message(self):
        """Number of queued messages (truthy when at least one is waiting)."""
        return len(self.incoming)

    def pop(self):
        """
        Remove and return the oldest queued message.

        If its delivery is not yet settled, the delivery is remembered so
        that a later L{settle} call can act on it.
        """
        entry = self.incoming.popleft()
        message, delivery = entry
        if not delivery.settled:
            self.unsettled.append(delivery)
        return message

    def settle(self, state=None):
        """
        Settle the oldest remembered delivery.

        @param state: Optional delivery state to apply before settling.
        """
        delivery = self.unsettled.popleft()
        if state:
            delivery.update(state)
        delivery.settle()
126
class BlockingReceiver(BlockingLink):
    """
    Receiver wrapper whose L{receive} blocks until a message is available.

    @ivar fetcher: The L{Fetcher} that queues messages, or a false value
        when the caller supplied its own handler.
    @ivar container: Reference to the connection's container (see L{__del__}).
    """

    def __init__(self, connection, receiver, fetcher, credit=1):
        """
        Wrap C{receiver} and verify that the peer agreed on the requested source.

        @param connection: The owning connection wrapper.
        @param receiver: The receiver link to wrap.
        @param fetcher: The L{Fetcher} handling this link, or C{None}.
        @param credit: Initial credit to issue; nothing is issued when falsy.
        @raise LinkException: If a source address was requested and the
            remote source address differs from it.
        """
        super(BlockingReceiver, self).__init__(connection, receiver)
        if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
            # This may be followed by a detach, which may contain an error
            # condition, so wait a little...
            self._waitForClose()
            # ...but close ourselves if the peer does not.
            self.link.close()
            raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
        if credit:
            receiver.flow(credit)
        self.fetcher = fetcher
        self.container = connection.container

    def __del__(self):
        self.fetcher = None
        # The next line causes a core dump if the Proton-C reactor finalizes
        # first. The self.container reference prevents out of order reactor
        # finalization. It may not be set if exception in BlockingLink.__init__
        if hasattr(self, "container"):
            self.link.handler = None  # implicit call to reactor

    def receive(self, timeout=False):
        """
        Block until a message has been queued, then return it.

        @param timeout: Passed through to the connection's C{wait()}.
        @raise Exception: If this receiver was created with a user handler
            (i.e. it has no fetcher).
        """
        if not self.fetcher:
            raise Exception("Can't call receive on this receiver as a handler was provided")
        if not self.link.credit:
            # Make sure the peer is allowed to send at least one message.
            self.link.flow(1)
        self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout)
        return self.fetcher.pop()

    def accept(self):
        """Settle the next pending delivery with the C{Delivery.ACCEPTED} state."""
        self.settle(Delivery.ACCEPTED)

    def reject(self):
        """Settle the next pending delivery with the C{Delivery.REJECTED} state."""
        self.settle(Delivery.REJECTED)

    def release(self, delivered=True):
        """
        Settle the next pending delivery as released.

        @param delivered: When true the C{Delivery.MODIFIED} state is used,
            otherwise C{Delivery.RELEASED}.
        """
        if delivered:
            self.settle(Delivery.MODIFIED)
        else:
            self.settle(Delivery.RELEASED)

    def settle(self, state=None):
        """
        Settle the next pending delivery via the fetcher.

        @param state: Optional delivery state to apply before settling.
        @raise Exception: If this receiver was created with a user handler
            (i.e. it has no fetcher).
        """
        if not self.fetcher:
            raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
        self.fetcher.settle(state)
173
class LinkDetached(LinkException):
    """
    Raised to report that a link was closed.

    @ivar link: The link that was closed.
    @ivar condition: Name of the link's remote condition, or C{None} when
        the link has no remote condition.
    """

    def __init__(self, link):
        self.link = link
        # Pick the wording and the terminus according to the link direction.
        if link.is_sender:
            template, terminus = "sender %s to %s closed", link.target
        else:
            template, terminus = "receiver %s from %s closed", link.source
        txt = template % (link.name, terminus.address)
        remote_condition = link.remote_condition
        if remote_condition:
            txt += " due to: %s" % remote_condition
            self.condition = remote_condition.name
        else:
            txt += " by peer"
            self.condition = None
        super(LinkDetached, self).__init__(txt)
189
class ConnectionClosed(ConnectionException):
    """
    Raised to report that a connection was closed.

    @ivar connection: The connection that was closed.
    @ivar condition: Name of the connection's remote condition, or C{None}
        when the connection has no remote condition.
    """

    def __init__(self, connection):
        self.connection = connection
        txt = "Connection %s closed" % connection.hostname
        remote_condition = connection.remote_condition
        if remote_condition:
            txt += " due to: %s" % remote_condition
            self.condition = remote_condition.name
        else:
            txt += " by peer"
            self.condition = None
        super(ConnectionClosed, self).__init__(txt)
202
class BlockingConnection(Handler):
    """
    A synchronous style connection wrapper.

    This object's implementation uses OS resources. To ensure they
    are released when the object is no longer in use, make sure that
    object operations are enclosed in a try block and that close() is
    always executed on exit.
    """

    def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
        """
        Open a connection to C{url} and block until the peer has responded.

        @param url: Address to connect to; parsed with L{Url} and its defaults.
        @param timeout: Default timeout for blocking operations. A falsy
            value (including 0) selects 60.
        @param container: Container to use; a new L{Container} when omitted.
        @param ssl_domain: Forwarded to the container's C{connect()}.
        @param heartbeat: Forwarded to the container's C{connect()}.
        @param kwargs: Extra keyword arguments forwarded to C{connect()}.
        """
        self.disconnected = False
        self.timeout = timeout or 60
        self.container = container or Container()
        self.container.timeout = self.timeout
        self.container.start()
        self.url = Url(url).defaults()
        self.conn = None
        self.closing = False
        failed = True
        try:
            self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
            self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
                      msg="Opening connection")
            failed = False
        finally:
            # Release the half-open connection if opening did not complete.
            if failed and self.conn:
                self.close()

    def create_sender(self, address, handler=None, name=None, options=None):
        """Create a sender on this connection and wrap it in a L{BlockingSender}."""
        return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))

    def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
        """
        Create a receiver on this connection and wrap it in a L{BlockingReceiver}.

        When no C{handler} is given, a L{Fetcher} is installed as the link's
        handler. When a C{handler} is given and C{credit} is C{None}, an
        initial credit of 1 is used.
        """
        prefetch = credit
        if handler:
            fetcher = None
            if prefetch is None:
                prefetch = 1
        else:
            fetcher = Fetcher(self, credit)
        return BlockingReceiver(
            self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)

    def close(self):
        """
        Close the connection and release the container.

        Calling this again after the first call is a no-op.
        """
        # TODO: provide stronger interrupt protection on cleanup. See PEP 419
        if self.closing:
            return
        self.closing = True
        self.container.errors = []
        try:
            if self.conn:
                self.conn.close()
                self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
                          msg="Closing connection")
        finally:
            # The try block tolerates a missing connection, so the cleanup
            # must too; otherwise an AttributeError here would skip the rest
            # of the teardown below.
            if self.conn is not None:
                self.conn.free()
            # Nothing left to block on. Allow reactor to clean up.
            self.run()
            self.conn = None
            self.container.global_handler = None  # break circular ref: container to cadapter.on_error
            pn_collector_release(pn_reactor_collector(self.container._impl))  # straggling event may keep reactor alive
            self.container = None

    def _is_closed(self):
        """Return a non-zero value if either end of the connection is closed."""
        return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)

    def run(self):
        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
        while self.container.process(): pass
        self.container.stop()
        self.container.process()

    def wait(self, condition, timeout=False, msg=None):
        """
        Call process until condition() is true.

        @param condition: Zero-argument callable polled between process() calls.
        @param timeout: C{False} selects this connection's default timeout;
            C{None} waits without a deadline; any other value is added to
            C{time.time()} to form the deadline.
        @param msg: Optional text appended to the timeout error message.
        @raise Timeout: If the deadline passes before the condition holds.
        @raise ConnectionException: If the transport was disconnected while
            the connection was not closed.
        """
        if timeout is False:
            timeout = self.timeout
        if timeout is None:
            while not condition() and not self.disconnected:
                self.container.process()
        else:
            # Temporarily apply the requested timeout to the container and
            # restore the previous value afterwards.
            container_timeout = self.container.timeout
            self.container.timeout = timeout
            try:
                deadline = time.time() + timeout
                while not condition() and not self.disconnected:
                    self.container.process()
                    if deadline < time.time():
                        txt = "Connection %s timed out" % self.url
                        if msg: txt += ": " + msg
                        raise Timeout(txt)
            finally:
                self.container.timeout = container_timeout
        if self.disconnected or self._is_closed():
            self.container.stop()
            self.conn.handler = None  # break cyclical reference
        if self.disconnected and not self._is_closed():
            raise ConnectionException(
                "Connection %s disconnected: %s" % (self.url, self.disconnected))

    # NOTE(review): the listing this code was taken from skips original source
    # lines 301-307 at this point; whatever was defined there is not shown.

    def on_connection_remote_close(self, event):
        """Close our end if still active; raise unless we initiated the close."""
        if event.connection.state & Endpoint.LOCAL_ACTIVE:
            event.connection.close()
            if not self.closing:
                raise ConnectionClosed(event.connection)

    def on_transport_tail_closed(self, event):
        self.on_transport_closed(event)

    def on_transport_head_closed(self, event):
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """Record the transport condition (or "unknown") so wait() stops looping."""
        self.disconnected = event.transport.condition or "unknown"
322
class AtomicCount(object):
    """Counter whose increments are serialized by a lock."""

    def __init__(self, start=0, step=1):
        """Thread-safe atomic counter. Start at start, increment by step."""
        self.count, self.step = start, step
        self.lock = threading.Lock()

    def next(self):
        """
        Get the next value.

        Adds C{step} to the counter and returns the new value. The lock is
        held through a C{with} block so it is released even if the addition
        raises.
        """
        with self.lock:
            self.count += self.step
            return self.count
336
class SyncRequestResponse(IncomingMessageHandler):
    """
    Implementation of the synchronous request-response (aka RPC) pattern.
    @ivar address: Address for all requests, may be None.
    @ivar connection: Connection for requests and responses.
    """

    # Shared by all instances: source of the correlation id for each request.
    correlation_id = AtomicCount()

    def __init__(self, connection, address=None):
        """
        Send requests and receive responses. A single instance can send many requests
        to the same or different addresses.

        @param connection: A L{BlockingConnection}
        @param address: Address for all requests.
            If not specified, each request must have the address property set.
            Successive messages may have different addresses.
        """
        super(SyncRequestResponse, self).__init__()
        self.connection = connection
        self.address = address
        self.sender = self.connection.create_sender(self.address)
        # dynamic=true generates a unique address dynamically for this receiver.
        # credit=1 because we want to receive 1 response message initially.
        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
        self.response = None

    def call(self, request):
        """
        Send a request message, wait for and return the response message.

        @param request: A L{proton.Message}. If this object was created
            without an address, the request's own address must be set.
        @raise ValueError: If neither this object nor the request has an address.
        """
        if not (self.address or request.address):
            raise ValueError("Request message has no address: %s" % request)
        request.reply_to = self.reply_to
        correlation_id = str(self.correlation_id.next())
        request.correlation_id = correlation_id
        self.sender.send(request)

        def response_arrived():
            # Only a response carrying our correlation id ends the wait.
            return self.response and (self.response.correlation_id == correlation_id)

        self.connection.wait(response_arrived, msg="Waiting for response")
        reply = self.response
        self.response = None  # Ready for next response.
        self.receiver.flow(1)  # Set up credit for the next response.
        return reply

    @property
    def reply_to(self):
        """Return the dynamic address of our receiver."""
        return self.receiver.remote_source.address

    def on_message(self, event):
        """Called when we receive a message for our receiver."""
        self.response = event.message
        # Wake up the wait() loop to handle the message.
        self.connection.container.yield_()
394