Package proton :: Module _io
[frames] | no frames]

Source Code for Module proton._io

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  from __future__ import absolute_import 
 21   
 22  import errno 
 23  import socket 
 24  import select 
 25  import time 
 26   
 27  from ._compat import socket_errno 
 28   
 29  PN_INVALID_SOCKET = -1 
30 31 -class IO(object):
32 33 @staticmethod
34 - def _setupsocket(s):
35 s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True) 36 s.setblocking(False)
37 38 @staticmethod
39 - def close(s):
40 s.close()
41 42 @staticmethod
43 - def listen(host, port):
44 s = socket.socket() 45 IO._setupsocket(s) 46 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 47 s.bind((host, port)) 48 s.listen(10) 49 return s
50 51 @staticmethod
52 - def accept(s):
53 n = s.accept() 54 IO._setupsocket(n[0]) 55 return n
56 57 @staticmethod
58 - def connect(addr):
59 s = socket.socket(addr[0], addr[1], addr[2]) 60 IO._setupsocket(s) 61 try: 62 s.connect(addr[4]) 63 except socket.error as e: 64 if socket_errno(e) not in (errno.EINPROGRESS, errno.EWOULDBLOCK, errno.EAGAIN): 65 raise 66 return s
67 68 @staticmethod
69 - def select(*args, **kwargs):
70 return select.select(*args, **kwargs)
71 72 @staticmethod
73 - def sleep(t):
74 time.sleep(t) 75 return
76
77 - class Selector(object):
78
79 - def __init__(self):
80 self._selectables = set() 81 self._reading = set() 82 self._writing = set() 83 self._deadline = None
84
85 - def add(self, selectable):
86 self._selectables.add(selectable) 87 if selectable.reading: 88 self._reading.add(selectable) 89 if selectable.writing: 90 self._writing.add(selectable) 91 if selectable.deadline: 92 if self._deadline is None: 93 self._deadline = selectable.deadline 94 else: 95 self._deadline = min(selectable.deadline, self._deadline)
96
97 - def remove(self, selectable):
98 self._selectables.discard(selectable) 99 self._reading.discard(selectable) 100 self._writing.discard(selectable) 101 self.update_deadline()
102 103 @property
104 - def selectables(self):
105 return len(self._selectables)
106
107 - def update_deadline(self):
108 for sel in self._selectables: 109 if sel.deadline: 110 if self._deadline is None: 111 self._deadline = sel.deadline 112 else: 113 self._deadline = min(sel.deadline, self._deadline)
114
115 - def update(self, selectable):
116 self._reading.discard(selectable) 117 self._writing.discard(selectable) 118 if selectable.reading: 119 self._reading.add(selectable) 120 if selectable.writing: 121 self._writing.add(selectable) 122 self.update_deadline()
123
124 - def select(self, timeout):
125 126 def select_inner(timeout): 127 # This inner select adds the writing fds to the exception fd set 128 # because Windows returns connected fds in the exception set not the 129 # writable set 130 r = self._reading 131 w = self._writing 132 133 now = time.time() 134 135 # No timeout or deadline 136 if timeout is None and self._deadline is None: 137 return IO.select(r, w, w) 138 139 if timeout is None: 140 t = max(0, self._deadline - now) 141 return IO.select(r, w, w, t) 142 143 if self._deadline is None: 144 return IO.select(r, w, w, timeout) 145 146 t = max(0, min(timeout, self._deadline - now)) 147 if len(r)==0 and len(w)==0: 148 if t > 0: IO.sleep(t) 149 return ([],[],[]) 150 151 return IO.select(r, w, w, t)
152 153 # Need to allow for signals interrupting us on Python 2 154 # In this case the signal handler could have messed up our internal state 155 # so don't retry just return with no handles. 156 try: 157 r, w, ex = select_inner(timeout) 158 except select.error as e: 159 if socket_errno(e) != errno.EINTR: 160 raise 161 r, w, ex = ([], [], []) 162 163 # For windows non blocking connect we get exception not writable so add exceptions to writable 164 w += ex 165 166 # Calculate timed out selectables 167 now = time.time() 168 t = [s for s in self._selectables if s.deadline and now > s.deadline] 169 self._deadline = None 170 self.update_deadline() 171 return r, w, t
172