Package proton :: Module handlers
[frames] | no frames]

Source Code for Module proton.handlers

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import heapq, logging, os, re, socket, time, types, weakref 
 20   
 21  from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url 
 22  from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout 
 23  from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException 
 24  from select import select 
 25   
 26  log = logging.getLogger("proton") 
27 28 -class OutgoingMessageHandler(Handler):
29 """ 30 A utility for simpler and more intuitive handling of delivery 31 events related to outgoing i.e. sent messages. 32 """
33 - def __init__(self, auto_settle=True, delegate=None):
34 self.auto_settle = auto_settle 35 self.delegate = delegate
36 42
43 - def on_delivery(self, event):
44 dlv = event.delivery 45 if dlv.link.is_sender and dlv.updated: 46 if dlv.remote_state == Delivery.ACCEPTED: 47 self.on_accepted(event) 48 elif dlv.remote_state == Delivery.REJECTED: 49 self.on_rejected(event) 50 elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED: 51 self.on_released(event) 52 if dlv.settled: 53 self.on_settled(event) 54 if self.auto_settle: 55 dlv.settle()
56
57 - def on_sendable(self, event):
58 """ 59 Called when the sender link has credit and messages can 60 therefore be transferred. 61 """ 62 if self.delegate != None: 63 dispatch(self.delegate, 'on_sendable', event)
64
65 - def on_accepted(self, event):
66 """ 67 Called when the remote peer accepts an outgoing message. 68 """ 69 if self.delegate != None: 70 dispatch(self.delegate, 'on_accepted', event)
71
72 - def on_rejected(self, event):
73 """ 74 Called when the remote peer rejects an outgoing message. 75 """ 76 if self.delegate != None: 77 dispatch(self.delegate, 'on_rejected', event)
78
79 - def on_released(self, event):
80 """ 81 Called when the remote peer releases an outgoing message. Note 82 that this may be in response to either the RELEASE or MODIFIED 83 state as defined by the AMQP specification. 84 """ 85 if self.delegate != None: 86 dispatch(self.delegate, 'on_released', event)
87
88 - def on_settled(self, event):
89 """ 90 Called when the remote peer has settled the outgoing 91 message. This is the point at which it should never be 92 retransmitted. 93 """ 94 if self.delegate != None: 95 dispatch(self.delegate, 'on_settled', event)
96
97 -def recv_msg(delivery):
98 msg = Message() 99 msg.decode(delivery.link.recv(delivery.pending)) 100 delivery.link.advance() 101 return msg
102
103 -class Reject(ProtonException):
104 """ 105 An exception that indicate a message should be rejected 106 """ 107 pass
108
109 -class Release(ProtonException):
110 """ 111 An exception that indicate a message should be rejected 112 """ 113 pass
114
115 -class Acking(object):
116 - def accept(self, delivery):
117 """ 118 Accepts a received message. 119 120 Note that this method cannot currently be used in combination 121 with transactions. 122 """ 123 self.settle(delivery, Delivery.ACCEPTED)
124
125 - def reject(self, delivery):
126 """ 127 Rejects a received message that is considered invalid or 128 unprocessable. 129 """ 130 self.settle(delivery, Delivery.REJECTED)
131
132 - def release(self, delivery, delivered=True):
133 """ 134 Releases a received message, making it available at the source 135 for any (other) interested receiver. The ``delivered`` 136 parameter indicates whether this should be considered a 137 delivery attempt (and the delivery count updated) or not. 138 """ 139 if delivered: 140 self.settle(delivery, Delivery.MODIFIED) 141 else: 142 self.settle(delivery, Delivery.RELEASED)
143
144 - def settle(self, delivery, state=None):
148
149 -class IncomingMessageHandler(Handler, Acking):
150 """ 151 A utility for simpler and more intuitive handling of delivery 152 events related to incoming i.e. received messages. 153 """ 154
155 - def __init__(self, auto_accept=True, delegate=None):
156 self.delegate = delegate 157 self.auto_accept = auto_accept
158
159 - def on_delivery(self, event):
160 dlv = event.delivery 161 if not dlv.link.is_receiver: return 162 if dlv.aborted: 163 self.on_aborted(event) 164 dlv.settle() 165 elif dlv.readable and not dlv.partial: 166 event.message = recv_msg(dlv) 167 if event.link.state & Endpoint.LOCAL_CLOSED: 168 if self.auto_accept: 169 dlv.update(Delivery.RELEASED) 170 dlv.settle() 171 else: 172 try: 173 self.on_message(event) 174 if self.auto_accept: 175 dlv.update(Delivery.ACCEPTED) 176 dlv.settle() 177 except Reject: 178 dlv.update(Delivery.REJECTED) 179 dlv.settle() 180 except Release: 181 dlv.update(Delivery.MODIFIED) 182 dlv.settle() 183 elif dlv.updated and dlv.settled: 184 self.on_settled(event)
185
186 - def on_message(self, event):
187 """ 188 Called when a message is received. The message itself can be 189 obtained as a property on the event. For the purpose of 190 referring to this message in further actions (e.g. if 191 explicitly accepting it, the ``delivery`` should be used, also 192 obtainable via a property on the event. 193 """ 194 if self.delegate != None: 195 dispatch(self.delegate, 'on_message', event)
196
197 - def on_settled(self, event):
198 if self.delegate != None: 199 dispatch(self.delegate, 'on_settled', event)
200
201 - def on_aborted(self, event):
202 if self.delegate != None: 203 dispatch(self.delegate, 'on_aborted', event)
204
205 -class EndpointStateHandler(Handler):
206 """ 207 A utility that exposes 'endpoint' events i.e. the open/close for 208 links, sessions and connections in a more intuitive manner. A 209 XXX_opened method will be called when both local and remote peers 210 have opened the link, session or connection. This can be used to 211 confirm a locally initiated action for example. A XXX_opening 212 method will be called when the remote peer has requested an open 213 that was not initiated locally. By default this will simply open 214 locally, which then triggers the XXX_opened call. The same applies 215 to close. 216 """ 217
218 - def __init__(self, peer_close_is_error=False, delegate=None):
219 self.delegate = delegate 220 self.peer_close_is_error = peer_close_is_error
221 222 @classmethod
223 - def is_local_open(cls, endpoint):
224 return endpoint.state & Endpoint.LOCAL_ACTIVE
225 226 @classmethod
227 - def is_local_uninitialised(cls, endpoint):
228 return endpoint.state & Endpoint.LOCAL_UNINIT
229 230 @classmethod
231 - def is_local_closed(cls, endpoint):
232 return endpoint.state & Endpoint.LOCAL_CLOSED
233 234 @classmethod
235 - def is_remote_open(cls, endpoint):
236 return endpoint.state & Endpoint.REMOTE_ACTIVE
237 238 @classmethod
239 - def is_remote_closed(cls, endpoint):
240 return endpoint.state & Endpoint.REMOTE_CLOSED
241 242 @classmethod
243 - def print_error(cls, endpoint, endpoint_type):
244 if endpoint.remote_condition: 245 log.error(endpoint.remote_condition.description or endpoint.remote_condition.name) 246 elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint): 247 log.error("%s closed by peer" % endpoint_type)
248 257
258 - def on_session_remote_close(self, event):
259 if event.session.remote_condition: 260 self.on_session_error(event) 261 elif self.is_local_closed(event.session): 262 self.on_session_closed(event) 263 else: 264 self.on_session_closing(event) 265 event.session.close()
266
267 - def on_connection_remote_close(self, event):
268 if event.connection.remote_condition: 269 if event.connection.remote_condition.name == "amqp:connection:forced": 270 # Treat this the same as just having the transport closed by the peer without 271 # sending any events. Allow reconnection to happen transparently. 272 return 273 self.on_connection_error(event) 274 elif self.is_local_closed(event.connection): 275 self.on_connection_closed(event) 276 else: 277 self.on_connection_closing(event) 278 event.connection.close()
279
280 - def on_connection_local_open(self, event):
281 if self.is_remote_open(event.connection): 282 self.on_connection_opened(event)
283
284 - def on_connection_remote_open(self, event):
285 if self.is_local_open(event.connection): 286 self.on_connection_opened(event) 287 elif self.is_local_uninitialised(event.connection): 288 self.on_connection_opening(event) 289 event.connection.open()
290
291 - def on_session_local_open(self, event):
292 if self.is_remote_open(event.session): 293 self.on_session_opened(event)
294
295 - def on_session_remote_open(self, event):
296 if self.is_local_open(event.session): 297 self.on_session_opened(event) 298 elif self.is_local_uninitialised(event.session): 299 self.on_session_opening(event) 300 event.session.open()
301 305 312
313 - def on_connection_opened(self, event):
314 if self.delegate != None: 315 dispatch(self.delegate, 'on_connection_opened', event)
316
317 - def on_session_opened(self, event):
318 if self.delegate != None: 319 dispatch(self.delegate, 'on_session_opened', event)
320 324
325 - def on_connection_opening(self, event):
326 if self.delegate != None: 327 dispatch(self.delegate, 'on_connection_opening', event)
328
329 - def on_session_opening(self, event):
330 if self.delegate != None: 331 dispatch(self.delegate, 'on_session_opening', event)
332 336
337 - def on_connection_error(self, event):
338 if self.delegate != None: 339 dispatch(self.delegate, 'on_connection_error', event) 340 else: 341 self.log_error(event.connection, "connection")
342
343 - def on_session_error(self, event):
344 if self.delegate != None: 345 dispatch(self.delegate, 'on_session_error', event) 346 else: 347 self.log_error(event.session, "session") 348 event.connection.close()
349 356
357 - def on_connection_closed(self, event):
358 if self.delegate != None: 359 dispatch(self.delegate, 'on_connection_closed', event)
360
361 - def on_session_closed(self, event):
362 if self.delegate != None: 363 dispatch(self.delegate, 'on_session_closed', event)
364 368
369 - def on_connection_closing(self, event):
370 if self.delegate != None: 371 dispatch(self.delegate, 'on_connection_closing', event) 372 elif self.peer_close_is_error: 373 self.on_connection_error(event)
374
375 - def on_session_closing(self, event):
376 if self.delegate != None: 377 dispatch(self.delegate, 'on_session_closing', event) 378 elif self.peer_close_is_error: 379 self.on_session_error(event)
380 386
387 - def on_transport_tail_closed(self, event):
388 self.on_transport_closed(event)
389
390 - def on_transport_closed(self, event):
391 if self.delegate != None and event.connection and self.is_local_open(event.connection): 392 dispatch(self.delegate, 'on_disconnected', event)
393
394 -class MessagingHandler(Handler, Acking):
395 """ 396 A general purpose handler that makes the proton-c events somewhat 397 simpler to deal with and/or avoids repetitive tasks for common use 398 cases. 399 """
400 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
401 self.handlers = [] 402 if prefetch: 403 self.handlers.append(CFlowController(prefetch)) 404 self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self))) 405 self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self))) 406 self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self))) 407 self.fatal_conditions = ["amqp:unauthorized-access"]
408
409 - def on_transport_error(self, event):
410 """ 411 Called when some error is encountered with the transport over 412 which the AMQP connection is to be established. This includes 413 authentication errors as well as socket errors. 414 """ 415 if event.transport.condition: 416 if event.transport.condition.info: 417 log.error("%s: %s: %s" % (event.transport.condition.name, event.transport.condition.description, event.transport.condition.info)) 418 else: 419 log.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description)) 420 if event.transport.condition.name in self.fatal_conditions: 421 event.connection.close() 422 else: 423 logging.error("Unspecified transport error")
424
425 - def on_connection_error(self, event):
426 """ 427 Called when the peer closes the connection with an error condition. 428 """ 429 EndpointStateHandler.print_error(event.connection, "connection")
430
431 - def on_session_error(self, event):
432 """ 433 Called when the peer closes the session with an error condition. 434 """ 435 EndpointStateHandler.print_error(event.session, "session") 436 event.connection.close()
437 444
445 - def on_reactor_init(self, event):
446 """ 447 Called when the event loop - the reactor - starts. 448 """ 449 if hasattr(event.reactor, 'subclass'): 450 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor) 451 self.on_start(event)
452
453 - def on_start(self, event):
454 """ 455 Called when the event loop starts. (Just an alias for on_reactor_init) 456 """ 457 pass
458 - def on_connection_closed(self, event):
459 """ 460 Called when the connection is closed. 461 """ 462 pass
463 - def on_session_closed(self, event):
464 """ 465 Called when the session is closed. 466 """ 467 pass
473 - def on_connection_closing(self, event):
474 """ 475 Called when the peer initiates the closing of the connection. 476 """ 477 pass
478 - def on_session_closing(self, event):
479 """ 480 Called when the peer initiates the closing of the session. 481 """ 482 pass
488 - def on_disconnected(self, event):
489 """ 490 Called when the socket is disconnected. 491 """ 492 pass
493
494 - def on_sendable(self, event):
495 """ 496 Called when the sender link has credit and messages can 497 therefore be transferred. 498 """ 499 pass
500
501 - def on_accepted(self, event):
502 """ 503 Called when the remote peer accepts an outgoing message. 504 """ 505 pass
506
507 - def on_rejected(self, event):
508 """ 509 Called when the remote peer rejects an outgoing message. 510 """ 511 pass
512
513 - def on_released(self, event):
514 """ 515 Called when the remote peer releases an outgoing message. Note 516 that this may be in response to either the RELEASE or MODIFIED 517 state as defined by the AMQP specification. 518 """ 519 pass
520
521 - def on_settled(self, event):
522 """ 523 Called when the remote peer has settled the outgoing 524 message. This is the point at which it should never be 525 retransmitted. 526 """ 527 pass
528 - def on_message(self, event):
529 """ 530 Called when a message is received. The message itself can be 531 obtained as a property on the event. For the purpose of 532 referring to this message in further actions (e.g. if 533 explicitly accepting it, the ``delivery`` should be used, also 534 obtainable via a property on the event. 535 """ 536 pass
537
538 -class TransactionHandler(object):
539 """ 540 The interface for transaction handlers, i.e. objects that want to 541 be notified of state changes related to a transaction. 542 """
543 - def on_transaction_declared(self, event):
544 pass
545
546 - def on_transaction_committed(self, event):
547 pass
548
549 - def on_transaction_aborted(self, event):
550 pass
551
552 - def on_transaction_declare_failed(self, event):
553 pass
554
555 - def on_transaction_commit_failed(self, event):
556 pass
557
558 -class TransactionalClientHandler(MessagingHandler, TransactionHandler):
559 """ 560 An extension to the MessagingHandler for applications using 561 transactions. 562 """ 563
564 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
565 super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)
566
567 - def accept(self, delivery, transaction=None):
568 if transaction: 569 transaction.accept(delivery) 570 else: 571 super(TransactionalClientHandler, self).accept(delivery)
572 573 from proton import WrappedHandler 574 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
575 576 -class CFlowController(WrappedHandler):
577
578 - def __init__(self, window=1024):
579 WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
580
581 -class CHandshaker(WrappedHandler):
582
583 - def __init__(self):
584 WrappedHandler.__init__(self, pn_handshaker)
585
586 -class IOHandler(WrappedHandler):
587
588 - def __init__(self):
589 WrappedHandler.__init__(self, pn_iohandler)
590
591 -class PythonIO:
592
593 - def __init__(self):
594 self.selectables = [] 595 self.delegate = IOHandler()
596
597 - def on_unhandled(self, method, event):
598 event.dispatch(self.delegate)
599
600 - def on_selectable_init(self, event):
601 self.selectables.append(event.context)
602
603 - def on_selectable_updated(self, event):
604 pass
605
606 - def on_selectable_final(self, event):
607 sel = event.context 608 if sel.is_terminal: 609 self.selectables.remove(sel) 610 sel.release()
611
612 - def on_reactor_quiesced(self, event):
613 reactor = event.reactor 614 # check if we are still quiesced, other handlers of 615 # on_reactor_quiesced could have produced events to process 616 if not reactor.quiesced: return 617 618 reading = [] 619 writing = [] 620 deadline = None 621 for sel in self.selectables: 622 if sel.reading: 623 reading.append(sel) 624 if sel.writing: 625 writing.append(sel) 626 if sel.deadline: 627 if deadline is None: 628 deadline = sel.deadline 629 else: 630 deadline = min(sel.deadline, deadline) 631 632 if deadline is not None: 633 timeout = deadline - time.time() 634 else: 635 timeout = reactor.timeout 636 if (timeout < 0): timeout = 0 637 timeout = min(timeout, reactor.timeout) 638 readable, writable, _ = select(reading, writing, [], timeout) 639 640 reactor.mark() 641 642 now = time.time() 643 644 for s in readable: 645 s.readable() 646 for s in writable: 647 s.writable() 648 for s in self.selectables: 649 if s.deadline and now > s.deadline: 650 s.expired() 651 652 reactor.yield_()
653