Package proton :: Module _handlers
[frames] | no frames]

Source Code for Module proton._handlers

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  from __future__ import absolute_import 
 21   
 22  import errno 
 23  import logging 
 24  import socket 
 25  import time 
 26  import weakref 
 27   
 28  from ._condition import Condition 
 29  from ._delivery import Delivery 
 30  from ._endpoints import Endpoint 
 31  from ._events import Event, Handler, _dispatch 
 32  from ._exceptions import ProtonException 
 33  from ._io import IO 
 34  from ._message import Message 
 35  from ._selectable import Selectable 
 36  from ._transport import Transport 
 37  from ._url import Url 
 38   
 39  log = logging.getLogger("proton") 
40 41 42 -class OutgoingMessageHandler(Handler):
43 """ 44 A utility for simpler and more intuitive handling of delivery 45 events related to outgoing i.e. sent messages. 46 """ 47
def __init__(self, auto_settle=True, delegate=None):
    """
    Store the handler configuration.

    :param auto_settle: flag consulted by ``on_delivery`` before it
        settles a delivery locally.
    :param delegate: optional object the ``on_*`` callbacks of this
        handler are forwarded to; ``None`` disables forwarding.
    """
    self.delegate = delegate
    self.auto_settle = auto_settle
51 57
def on_delivery(self, event):
    """
    Map an updated delivery on a sender link onto the matching
    ``on_accepted`` / ``on_rejected`` / ``on_released`` /
    ``on_settled`` callback, then settle it when ``auto_settle`` is set.
    """
    delivery = event.delivery
    if not (delivery.link.is_sender and delivery.updated):
        return

    outcome = delivery.remote_state
    if outcome == Delivery.ACCEPTED:
        self.on_accepted(event)
    elif outcome == Delivery.REJECTED:
        self.on_rejected(event)
    elif outcome in (Delivery.RELEASED, Delivery.MODIFIED):
        # RELEASED and MODIFIED are both reported as "released".
        self.on_released(event)

    if delivery.settled:
        self.on_settled(event)
    if self.auto_settle:
        delivery.settle()
71
def on_sendable(self, event):
    """
    Called when the sender link has credit and messages can
    therefore be transferred. Forwarded to the delegate, if any.
    """
    target = self.delegate
    if target is None:
        return
    _dispatch(target, 'on_sendable', event)
79
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message.
    Forwarded to the delegate, if any.
    """
    target = self.delegate
    if target is None:
        return
    _dispatch(target, 'on_accepted', event)
86
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message.
    Forwarded to the delegate, if any.
    """
    target = self.delegate
    if target is None:
        return
    _dispatch(target, 'on_rejected', event)
93
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. Note
    that this may be in response to either the RELEASE or MODIFIED
    state as defined by the AMQP specification. Forwarded to the
    delegate, if any.
    """
    target = self.delegate
    if target is None:
        return
    _dispatch(target, 'on_released', event)
102
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing
    message. This is the point at which it should never be
    retransmitted. Forwarded to the delegate, if any.
    """
    target = self.delegate
    if target is None:
        return
    _dispatch(target, 'on_settled', event)
111
def recv_msg(delivery):
    """
    Read the pending bytes of ``delivery`` from its link, decode them
    into a new ``Message``, advance the link and return the message.
    """
    link = delivery.link
    message = Message()
    encoded = link.recv(delivery.pending)
    message.decode(encoded)
    link.advance()
    return message
118
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.
    """
    pass
125
class Release(ProtonException):
    """
    An exception that indicates a message should be released.
    """
    pass
132
133 134 -class Acking(object):
def accept(self, delivery):
    """
    Accepts a received message by settling it with the ACCEPTED state.

    Note that this method cannot currently be used in combination
    with transactions.
    """
    outcome = Delivery.ACCEPTED
    self.settle(delivery, outcome)
143
def reject(self, delivery):
    """
    Rejects a received message that is considered invalid or
    unprocessable by settling it with the REJECTED state.
    """
    outcome = Delivery.REJECTED
    self.settle(delivery, outcome)
150
def release(self, delivery, delivered=True):
    """
    Releases a received message, making it available at the source
    for any (other) interested receiver. The ``delivered``
    parameter indicates whether this should be considered a
    delivery attempt (and the delivery count updated) or not:
    a true value settles with MODIFIED, a false one with RELEASED.
    """
    outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, outcome)
162
163 - def settle(self, delivery, state=None):
167
class IncomingMessageHandler(Handler, Acking):
    """
    A utility for simpler and more intuitive handling of delivery
    events related to incoming i.e. received messages.
    """

    def __init__(self, auto_accept=True, delegate=None):
        """
        :param auto_accept: when true, ``on_delivery`` updates and
            settles received deliveries itself.
        :param delegate: optional object the ``on_*`` callbacks are
            forwarded to.
        """
        self.auto_accept = auto_accept
        self.delegate = delegate

    def on_delivery(self, event):
        """
        Handle a delivery event on a receiver link: report aborted
        deliveries, decode complete messages and pass them to
        ``on_message``, or report settlement via ``on_settled``.
        """
        delivery = event.delivery
        if not delivery.link.is_receiver:
            return

        def finish(state):
            # Record the local outcome, then settle the delivery.
            delivery.update(state)
            delivery.settle()

        if delivery.aborted:
            self.on_aborted(event)
            delivery.settle()
            return

        if delivery.readable and not delivery.partial:
            event.message = recv_msg(delivery)
            if event.link.state & Endpoint.LOCAL_CLOSED:
                # The link is already closed locally: the message is
                # not passed to on_message.
                if self.auto_accept:
                    finish(Delivery.RELEASED)
                return
            try:
                self.on_message(event)
                if self.auto_accept:
                    finish(Delivery.ACCEPTED)
            except Reject:
                finish(Delivery.REJECTED)
            except Release:
                finish(Delivery.MODIFIED)
            return

        if delivery.updated and delivery.settled:
            self.on_settled(event)

    def on_message(self, event):
        """
        Called when a message is received. The message itself can be
        obtained as a property on the event. For the purpose of
        referring to this message in further actions (e.g. if
        explicitly accepting it), the ``delivery`` should be used, also
        obtainable via a property on the event.
        """
        target = self.delegate
        if target is None:
            return
        _dispatch(target, 'on_message', event)

    def on_settled(self, event):
        """Forward ``on_settled`` to the delegate, if any."""
        target = self.delegate
        if target is None:
            return
        _dispatch(target, 'on_settled', event)

    def on_aborted(self, event):
        """Forward ``on_aborted`` to the delegate, if any."""
        target = self.delegate
        if target is None:
            return
        _dispatch(target, 'on_aborted', event)
224
225 226 -class EndpointStateHandler(Handler):
227 """ 228 A utility that exposes 'endpoint' events i.e. the open/close for 229 links, sessions and connections in a more intuitive manner. A 230 XXX_opened method will be called when both local and remote peers 231 have opened the link, session or connection. This can be used to 232 confirm a locally initiated action for example. A XXX_opening 233 method will be called when the remote peer has requested an open 234 that was not initiated locally. By default this will simply open 235 locally, which then triggers the XXX_opened call. The same applies 236 to close. 237 """ 238
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    :param peer_close_is_error: when true and no delegate is set, the
        ``on_*_closing`` callbacks report a peer-initiated close through
        the matching ``on_*_error`` callback.
    :param delegate: optional object the ``on_*`` callbacks are
        forwarded to.
    """
    self.peer_close_is_error = peer_close_is_error
    self.delegate = delegate
242 243 @classmethod
def is_local_open(cls, endpoint):
    """Return ``endpoint.state`` masked with ``Endpoint.LOCAL_ACTIVE`` (truthy when that flag is set)."""
    flags = endpoint.state
    return flags & Endpoint.LOCAL_ACTIVE
246 247 @classmethod
def is_local_uninitialised(cls, endpoint):
    """Return ``endpoint.state`` masked with ``Endpoint.LOCAL_UNINIT`` (truthy when that flag is set)."""
    flags = endpoint.state
    return flags & Endpoint.LOCAL_UNINIT
250 251 @classmethod
def is_local_closed(cls, endpoint):
    """Return ``endpoint.state`` masked with ``Endpoint.LOCAL_CLOSED`` (truthy when that flag is set)."""
    flags = endpoint.state
    return flags & Endpoint.LOCAL_CLOSED
254 255 @classmethod
def is_remote_open(cls, endpoint):
    """Return ``endpoint.state`` masked with ``Endpoint.REMOTE_ACTIVE`` (truthy when that flag is set)."""
    flags = endpoint.state
    return flags & Endpoint.REMOTE_ACTIVE
258 259 @classmethod
def is_remote_closed(cls, endpoint):
    """Return ``endpoint.state`` masked with ``Endpoint.REMOTE_CLOSED`` (truthy when that flag is set)."""
    flags = endpoint.state
    return flags & Endpoint.REMOTE_CLOSED
262 263 @classmethod
def print_error(cls, endpoint, endpoint_type):
    """
    Log why ``endpoint`` was closed.

    Logs the remote condition's description (falling back to its
    name) when one is present; otherwise logs "<endpoint_type> closed
    by peer" if the endpoint is locally open but remotely closed.
    """
    condition = endpoint.remote_condition
    if condition:
        log.error(condition.description or condition.name)
        return
    if cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
        log.error("%s closed by peer" % endpoint_type)
269 278
def on_session_remote_close(self, event):
    """
    React to the peer closing a session: report an error, a completed
    close or a peer-initiated close, then close the session locally.
    """
    session = event.session
    if session.remote_condition:
        self.on_session_error(event)
    elif self.is_local_closed(session):
        self.on_session_closed(event)
    else:
        self.on_session_closing(event)
    session.close()
287
def on_connection_remote_close(self, event):
    """
    React to the peer closing the connection: report an error, a
    completed close or a peer-initiated close, then close the
    connection locally.
    """
    connection = event.connection
    condition = connection.remote_condition
    if condition:
        if condition.name == "amqp:connection:forced":
            # Treat this the same as just having the transport closed by the peer without
            # sending any events. Allow reconnection to happen transparently.
            return
        self.on_connection_error(event)
    elif self.is_local_closed(connection):
        self.on_connection_closed(event)
    else:
        self.on_connection_closing(event)
    connection.close()
300
def on_connection_local_open(self, event):
    """Report ``on_connection_opened`` if the remote end is already open."""
    if not self.is_remote_open(event.connection):
        return
    self.on_connection_opened(event)
304
def on_connection_remote_open(self, event):
    """
    Report ``on_connection_opened`` if the connection is already open
    locally; if it is locally uninitialised, report
    ``on_connection_opening`` and open it.
    """
    connection = event.connection
    if self.is_local_open(connection):
        self.on_connection_opened(event)
        return
    if self.is_local_uninitialised(connection):
        self.on_connection_opening(event)
        connection.open()
311
def on_session_local_open(self, event):
    """Report ``on_session_opened`` if the remote end is already open."""
    if not self.is_remote_open(event.session):
        return
    self.on_session_opened(event)
315
def on_session_remote_open(self, event):
    """
    Report ``on_session_opened`` if the session is already open
    locally; if it is locally uninitialised, report
    ``on_session_opening`` and open it.
    """
    session = event.session
    if self.is_local_open(session):
        self.on_session_opened(event)
        return
    if self.is_local_uninitialised(session):
        self.on_session_opening(event)
        session.open()
322 326 333
def on_connection_opened(self, event):
    """Forward ``on_connection_opened`` to the delegate, if any."""
    target = self.delegate
    if target is None:
        return
    _dispatch(target, 'on_connection_opened', event)
337
def on_session_opened(self, event):
    """Forward ``on_session_opened`` to the delegate, if any."""
    target = self.delegate
    if target is None:
        return
    _dispatch(target, 'on_session_opened', event)
341 345
def on_connection_opening(self, event):
    """Forward ``on_connection_opening`` to the delegate, if any."""
    target = self.delegate
    if target is None:
        return
    _dispatch(target, 'on_connection_opening', event)
349
def on_session_opening(self, event):
    """Forward ``on_session_opening`` to the delegate, if any."""
    target = self.delegate
    if target is None:
        return
    _dispatch(target, 'on_session_opening', event)
353 357
def on_connection_error(self, event):
    """
    Forward ``on_connection_error`` to the delegate; without a
    delegate, log the error via ``print_error``.
    """
    if self.delegate is None:
        self.print_error(event.connection, "connection")
    else:
        _dispatch(self.delegate, 'on_connection_error', event)
363
def on_session_error(self, event):
    """
    Forward ``on_session_error`` to the delegate; without a delegate,
    log the error via ``print_error`` and close the connection.
    """
    if self.delegate is None:
        self.print_error(event.session, "session")
        event.connection.close()
    else:
        _dispatch(self.delegate, 'on_session_error', event)
370 377
def on_connection_closed(self, event):
    """Forward ``on_connection_closed`` to the delegate, if any."""
    target = self.delegate
    if target is None:
        return
    _dispatch(target, 'on_connection_closed', event)
381
def on_session_closed(self, event):
    """Forward ``on_session_closed`` to the delegate, if any."""
    target = self.delegate
    if target is None:
        return
    _dispatch(target, 'on_session_closed', event)
385 389
def on_connection_closing(self, event):
    """
    Forward ``on_connection_closing`` to the delegate; without a
    delegate, treat the close as an error when
    ``peer_close_is_error`` is set.
    """
    if self.delegate is not None:
        _dispatch(self.delegate, 'on_connection_closing', event)
        return
    if self.peer_close_is_error:
        self.on_connection_error(event)
395
def on_session_closing(self, event):
    """
    Forward ``on_session_closing`` to the delegate; without a
    delegate, treat the close as an error when
    ``peer_close_is_error`` is set.
    """
    if self.delegate is not None:
        _dispatch(self.delegate, 'on_session_closing', event)
        return
    if self.peer_close_is_error:
        self.on_session_error(event)
401 407
def on_transport_tail_closed(self, event):
    """Handle a closed transport tail exactly like ``on_transport_closed``."""
    handle = self.on_transport_closed
    handle(event)
410
def on_transport_closed(self, event):
    """
    Dispatch ``on_disconnected`` to the delegate when the event has a
    connection that is still open locally.
    """
    if self.delegate is None:
        return
    connection = event.connection
    if connection and self.is_local_open(connection):
        _dispatch(self.delegate, 'on_disconnected', event)
414
415 416 -class MessagingHandler(Handler, Acking):
417 """ 418 A general purpose handler that makes the proton-c events somewhat 419 simpler to deal with and/or avoids repetitive tasks for common use 420 cases. 421 """ 422
def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
    """
    Build the chain of helper handlers that delegate back to this
    object.

    :param prefetch: window passed to a ``FlowController``; a falsy
        value means no flow controller is installed.
    :param auto_accept: passed to ``IncomingMessageHandler``.
    :param auto_settle: passed to ``OutgoingMessageHandler``.
    :param peer_close_is_error: passed to ``EndpointStateHandler``.
    """
    children = []
    if prefetch:
        children.append(FlowController(prefetch))
    # Each helper is given a weak proxy of this handler as its delegate.
    children.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self)))
    children.append(IncomingMessageHandler(auto_accept, weakref.proxy(self)))
    children.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self)))
    self.handlers = children
    self.fatal_conditions = ["amqp:unauthorized-access"]
431
def on_transport_error(self, event):
    """
    Called when some error is encountered with the transport over
    which the AMQP connection is to be established. This includes
    authentication errors as well as socket errors.

    Logs the transport condition (name, description and, when present,
    info) and closes the connection if the condition name is listed in
    ``self.fatal_conditions``.
    """
    condition = event.transport.condition
    if not condition:
        # Log through the module's "proton" logger like every other
        # message in this module, rather than through the root logger.
        log.error("Unspecified transport error")
        return

    if condition.info:
        log.error("%s: %s: %s" % (condition.name, condition.description, condition.info))
    else:
        log.error("%s: %s" % (condition.name, condition.description))
    if condition.name in self.fatal_conditions:
        event.connection.close()
449
def on_connection_error(self, event):
    """
    Called when the peer closes the connection with an error condition.
    The default implementation logs the error.
    """
    report = EndpointStateHandler.print_error
    report(event.connection, "connection")
455
def on_session_error(self, event):
    """
    Called when the peer closes the session with an error condition.
    The default implementation logs the error and closes the connection.
    """
    report = EndpointStateHandler.print_error
    report(event.session, "session")
    event.connection.close()
462 469
def on_reactor_init(self, event):
    """
    Called when the event loop - the reactor - starts.

    If the reactor has a ``subclass`` attribute, the reactor is also
    exposed on the event under that class's lower-cased name before
    ``on_start`` is invoked.
    """
    reactor = event.reactor
    if hasattr(reactor, 'subclass'):
        alias = reactor.subclass.__name__.lower()
        setattr(event, alias, reactor)
    self.on_start(event)
477
def on_start(self, event):
    """
    Called when the event loop starts (invoked from
    ``on_reactor_init``). Does nothing by default; override to
    perform start-up work.
    """
    pass
483
def on_connection_closed(self, event):
    """
    Called when the connection is closed. Does nothing by default.
    """
    pass
489
def on_session_closed(self, event):
    """
    Called when the session is closed. Does nothing by default.
    """
    pass
495 501
def on_connection_closing(self, event):
    """
    Called when the peer initiates the closing of the connection.
    Does nothing by default.
    """
    pass
507
def on_session_closing(self, event):
    """
    Called when the peer initiates the closing of the session.
    Does nothing by default.
    """
    pass
513 519
def on_disconnected(self, event):
    """
    Called when the socket is disconnected. Does nothing by default.
    """
    pass
525
def on_sendable(self, event):
    """
    Called when the sender link has credit and messages can
    therefore be transferred. Does nothing by default.
    """
    pass
532
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message.
    Does nothing by default.
    """
    pass
538
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message.
    Does nothing by default.
    """
    pass
544
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. Note
    that this may be in response to either the RELEASE or MODIFIED
    state as defined by the AMQP specification. Does nothing by
    default.
    """
    pass
552
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing
    message. This is the point at which it should never be
    retransmitted. Does nothing by default.
    """
    pass
560
def on_message(self, event):
    """
    Called when a message is received. The message itself can be
    obtained as a property on the event. For the purpose of
    referring to this message in further actions (e.g. if
    explicitly accepting it), the ``delivery`` should be used, also
    obtainable via a property on the event. Does nothing by default.
    """
    pass
570
class TransactionHandler(object):
    """
    The interface for transaction handlers, i.e. objects that want to
    be notified of state changes related to a transaction.

    Every callback is a no-op here; subclasses override the ones they
    are interested in.
    """

    def on_transaction_declared(self, event):
        """Hook for the "transaction declared" notification."""
        pass

    def on_transaction_committed(self, event):
        """Hook for the "transaction committed" notification."""
        pass

    def on_transaction_aborted(self, event):
        """Hook for the "transaction aborted" notification."""
        pass

    def on_transaction_declare_failed(self, event):
        """Hook for the "transaction declare failed" notification."""
        pass

    def on_transaction_commit_failed(self, event):
        """Hook for the "transaction commit failed" notification."""
        pass
592
class TransactionalClientHandler(MessagingHandler, TransactionHandler):
    """
    An extension to the MessagingHandler for applications using
    transactions.
    """

    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
        """
        Same parameters as the parent initialiser, except that
        ``auto_accept`` defaults to ``False`` here.
        """
        parent = super(TransactionalClientHandler, self)
        parent.__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)

    def accept(self, delivery, transaction=None):
        """
        Accept ``delivery`` through ``transaction`` when one is given,
        otherwise through the inherited ``accept``.
        """
        if not transaction:
            super(TransactionalClientHandler, self).accept(delivery)
            return
        transaction.accept(delivery)
608
609 610 -class FlowController(Handler):
def __init__(self, window=1024):
    """
    :param window: credit level that ``_flow`` tops receiver links up to.
    """
    self._drained = 0
    self._window = window
614 617 620 623
def on_delivery(self, event):
    """Re-evaluate the credit of the link the delivery arrived on."""
    link = event.link
    self._flow(link)
626
def _flow(self, link):
    """
    For a receiver link, add ``link.drained()`` to the running drained
    count and, while that count is zero, issue enough credit to bring
    the link back up to the configured window.
    """
    if not link.is_receiver:
        return
    self._drained += link.drained()
    if self._drained != 0:
        return
    shortfall = self._window - link.credit
    link.flow(shortfall)
633
634 635 -class Handshaker(Handler):
636 637 @staticmethod
def on_connection_remote_open(event):
    """Open the connection locally if it is still locally uninitialised."""
    connection = event.connection
    if not (connection.state & Endpoint.LOCAL_UNINIT):
        return
    connection.open()
642 643 @staticmethod
def on_session_remote_open(event):
    """
    Open the session locally if it is still locally uninitialised.

    ``state`` is read as a plain attribute, as in the sibling
    handlers; calling it (``ssn.state()``) would raise ``TypeError``
    because the value is an integer bit mask.
    """
    ssn = event.session
    if ssn.state & Endpoint.LOCAL_UNINIT:
        ssn.open()
648 649 @staticmethod 656 657 @staticmethod
def on_connection_remote_close(event):
    """Close the connection locally unless it is already locally closed."""
    connection = event.connection
    if connection.state & Endpoint.LOCAL_CLOSED:
        return
    connection.close()
662 663 @staticmethod
def on_session_remote_close(event):
    """Close the session locally unless it is already locally closed."""
    session = event.session
    if session.state & Endpoint.LOCAL_CLOSED:
        return
    session.close()
668 669 @staticmethod
674 675 676 # Back compatibility definitions 677 CFlowController = FlowController 678 CHandshaker = Handshaker
class PythonIO:
    """
    Reactor handler that keeps its own list of selectables and waits
    on them with ``IO.select`` whenever the reactor is quiesced. Events
    it does not handle itself are dispatched to an ``IOHandler``.
    """

    def __init__(self):
        self.selectables = []
        self.delegate = IOHandler()

    def on_unhandled(self, method, event):
        """Pass any event without a dedicated method on to the delegate."""
        event.dispatch(self.delegate)

    def on_selectable_init(self, event):
        """Start tracking the selectable carried as the event context."""
        selectable = event.context
        self.selectables.append(selectable)

    def on_selectable_updated(self, event):
        """Nothing to do: selectable state is re-read on every select."""
        pass

    def on_selectable_final(self, event):
        """Stop tracking and release a selectable once it is terminal."""
        selectable = event.context
        if not selectable.is_terminal:
            return
        self.selectables.remove(selectable)
        selectable.release()

    def on_reactor_quiesced(self, event):
        """
        Wait for I/O or the earliest deadline, then notify the ready
        and expired selectables and yield the reactor.
        """
        reactor = event.reactor
        # check if we are still quiesced, other handlers of
        # on_reactor_quiesced could have produced events to process
        if not reactor.quiesced:
            return

        readers = [sel for sel in self.selectables if sel.reading]
        writers = [sel for sel in self.selectables if sel.writing]
        deadlines = [sel.deadline for sel in self.selectables if sel.deadline]
        earliest = min(deadlines) if deadlines else None

        if earliest is None:
            timeout = reactor.timeout
        else:
            timeout = earliest - time.time()
        if timeout < 0:
            timeout = 0
        timeout = min(timeout, reactor.timeout)
        readable, writable, _ = IO.select(readers, writers, [], timeout)

        now = reactor.mark()

        for sel in readable:
            sel.readable()
        for sel in writable:
            sel.writable()
        for sel in self.selectables:
            if sel.deadline and now > sel.deadline:
                sel.expired()

        reactor.yield_()
741
# For C style IO handler need to implement Selector
class IOHandler(Handler):
    """
    Handler that performs the socket I/O for the reactor: it registers
    selectables with an ``IO.Selector``, waits on them when the reactor
    is quiesced, and moves bytes between sockets and their transports.
    """

    def __init__(self):
        self._selector = IO.Selector()

    def on_selectable_init(self, event):
        """Register a new selectable and bump the reactor's selectable count."""
        s = event.selectable
        self._selector.add(s)
        s._reactor._selectables += 1

    def on_selectable_updated(self, event):
        """Tell the selector that the selectable's interest set changed."""
        s = event.selectable
        self._selector.update(s)

    def on_selectable_final(self, event):
        """Unregister and release a selectable that has finished."""
        s = event.selectable
        self._selector.remove(s)
        s._reactor._selectables -= 1
        s.release()

    def on_reactor_quiesced(self, event):
        """
        Block in the selector for up to the reactor timeout, then
        notify the readable, writable and expired selectables and
        yield the reactor.
        """
        r = event.reactor

        if not r.quiesced:
            return

        # NOTE(review): the value read here is never used; the access is
        # kept in case the property has side effects - confirm and drop.
        d = r.timer_deadline
        readable, writable, expired = self._selector.select(r.timeout)

        # The returned timestamp is not needed here, only the call itself.
        r.mark()

        for s in readable:
            s.readable()
        for s in writable:
            s.writable()
        for s in expired:
            s.expired()

        r.yield_()

    def on_selectable_readable(self, event):
        """Read as many bytes as the transport can take and push them into it."""
        s = event.selectable
        t = s._transport

        # If we're an acceptor we can't have a transport
        # and we don't want to do anything here in any case
        if not t:
            return

        capacity = t.capacity()
        if capacity > 0:
            try:
                b = s.recv(capacity)
                if len(b) > 0:
                    t.push(b)
                else:
                    # EOF handling
                    self.on_selectable_error(event)
            except socket.error as e:
                # TODO: What's the error handling to be here?
                log.error("Couldn't recv: %r" % e)
                t.close_tail()

        # Always update as we may have gone to not reading or from
        # not writing to writing when processing the incoming bytes
        r = s._reactor
        self.update(t, s, r.now)

    def on_selectable_writable(self, event):
        """Write the transport's pending output to the socket."""
        s = event.selectable
        t = s._transport

        # If we're an acceptor we can't have a transport
        # and we don't want to do anything here in any case
        if not t:
            return

        pending = t.pending()
        if pending > 0:

            try:
                n = s.send(t.peek(pending))
                t.pop(n)
            except socket.error as e:
                log.error("Couldn't send: %r" % e)
                # TODO: Error? or actually an exception
                t.close_head()

        # Only refresh the selectable when the amount of pending output changed.
        newpending = t.pending()
        if newpending != pending:
            r = s._reactor
            self.update(t, s, r.now)

    def on_selectable_error(self, event):
        """Close both directions of the transport and terminate the selectable."""
        s = event.selectable
        t = s._transport

        t.close_head()
        t.close_tail()
        s.terminate()
        s.update()

    def on_selectable_expired(self, event):
        """Refresh the selectable after its deadline passed."""
        s = event.selectable
        t = s._transport
        r = s._reactor

        self.update(t, s, r.now)

    def on_connection_local_open(self, event):
        """Bind a fresh transport to a connection that is remotely uninitialised."""
        c = event.connection
        if not c.state & Endpoint.REMOTE_UNINIT:
            return

        t = Transport()
        # It seems perverse, but the C code ignores bind errors too!
        # and this is required or you get errors because Connector() has already
        # bound the transport and connection!
        t.bind_nothrow(c)

    def on_connection_bound(self, event):
        """
        Start an outgoing socket connection for a newly bound
        connection, unless it came from an acceptor.
        """
        c = event.connection
        t = event.transport

        reactor = c._reactor

        # link the new transport to its reactor:
        t._reactor = reactor

        if c._acceptor:
            # this connection was created by the acceptor.  There is already a
            # socket assigned to this connection.  Nothing needs to be done.
            return

        url = c.url or Url(c.hostname)
        url.defaults()

        host = url.host
        port = int(url.port)

        # Credentials from the URL are only used when the connection has no user.
        if not c.user:
            user = url.username
            if user:
                c.user = user
            password = url.password
            if password:
                c.password = password

        addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)

        # Try first possible address
        log.debug("Connect trying first transport address: %s", addrs[0])
        sock = IO.connect(addrs[0])

        # At this point we need to arrange to be called back when the socket is writable
        connector = ConnectSelectable(sock, reactor, addrs[1:], t, self)
        connector.collect(reactor._collector)
        connector.writing = True
        connector.push_event(connector, Event.SELECTABLE_INIT)

        # TODO: Don't understand why we need this now - how can we get PN_TRANSPORT until the connection succeeds?
        t._selectable = None

    @staticmethod
    def update(transport, selectable, now):
        """
        Synchronise ``selectable`` with the state of ``transport``: set
        its reading/writing interest from the transport's capacity and
        pending output, set its deadline from ``transport.tick(now)``
        and push the update.

        Errors from ``capacity()``/``pending()`` are tolerated; if the
        transport is closed at that point the selectable is terminated.
        """
        try:
            capacity = transport.capacity()
            selectable.reading = capacity > 0
        except Exception:
            # Narrowed from a bare ``except`` so that KeyboardInterrupt
            # and SystemExit are no longer swallowed here.
            if transport.closed:
                selectable.terminate()
        try:
            pending = transport.pending()
            selectable.writing = pending > 0
        except Exception:
            if transport.closed:
                selectable.terminate()
        selectable.deadline = transport.tick(now)
        selectable.update()

    def on_transport(self, event):
        """Refresh the transport's selectable, if it has a live one."""
        t = event.transport
        r = t._reactor
        s = t._selectable
        if s and not s.is_terminal:
            self.update(t, s, r.now)

    def on_transport_closed(self, event):
        """Terminate the transport's selectable, if still live, and unbind the transport."""
        t = event.transport
        r = t._reactor
        s = t._selectable
        if s and not s.is_terminal:
            s.terminate()
            r.update(s)
        t.unbind()
939
class ConnectSelectable(Selectable):
    """
    Selectable that waits for an outgoing socket connection to
    complete, falling back to the remaining candidate addresses when a
    connection attempt is refused.
    """

    def __init__(self, sock, reactor, addrs, transport, iohandler):
        """
        :param sock: the socket on which the connect is in progress.
        :param reactor: reactor the selectable belongs to.
        :param addrs: remaining addresses to try if the connect is refused.
        :param transport: transport to attach once connected.
        :param iohandler: handler whose ``update`` is used on success.
        """
        super(ConnectSelectable, self).__init__(sock, reactor)
        self._transport = transport
        self._iohandler = iohandler
        self._addrs = addrs

    def readable(self):
        pass

    def writable(self):
        """
        Check the outcome of the connect: hand the socket over to a
        regular selectable on success, retry the next address on
        refusal, or record the failure on the transport and close it.
        """
        error = self._delegate.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        transport = self._transport

        if error == 0:
            log.debug("Connection succeeded")
            selectable = self._reactor.selectable(delegate=self._delegate)
            selectable._transport = transport
            transport._selectable = selectable
            self._iohandler.update(transport, selectable, transport._reactor.now)

            # Disassociate from the socket (which has been passed on)
            self._delegate = None
            self.terminate()
            self.update()
            return

        if error == errno.ECONNREFUSED:
            if len(self._addrs) > 0:
                candidate = self._addrs[0]
                log.debug("Connection refused: trying next transport address: %s", candidate)
                sock = IO.connect(candidate)
                self._addrs = self._addrs[1:]
                # Swap the refused socket for the new attempt.
                self._delegate.close()
                self._delegate = sock
                return
            log.debug("Connection refused, but tried all transport addresses")
            transport.condition = Condition("proton.pythonio", "Connection refused to all addresses")
        else:
            log.error("Couldn't connect: %s", error)
            transport.condition = Condition("proton.pythonio", "Connection error: %s" % error)

        # Connection failed for good: shut the transport and this selectable down.
        transport.close_tail()
        transport.close_head()
        self.terminate()
        self.update()
985