debuggers.hg

view tools/python/xen/web/connection.py @ 4672:d781b9d08e80

bitkeeper revision 1.1327.2.4 (426918a34Af7gihN8mTkq-P3KrAZXg)

Remove twisted from save/migrate handling.
This needs to use threads, so add thread support for
http server requests.

Signed-off-by: Mike Wray <mike.wray@hp.com>
author mjw@wray-m-3.hpl.hp.com
date Fri Apr 22 15:30:43 2005 +0000 (2005-04-22)
parents a838a908e38e
children 16efdf7bbd57
line source
1 import sys
2 import threading
3 import select
4 import socket
6 from errno import EAGAIN, EINTR, EWOULDBLOCK
8 """General classes to support server and client sockets, without
9 specifying what kind of socket they are. There are subclasses
10 for TCP and unix-domain sockets (see tcp.py and unix.py).
11 """
13 """We make sockets non-blocking so that operations like accept()
14 don't block. We also select on a timeout. Otherwise we have no way
15 of getting the threads to shutdown.
16 """
17 SELECT_TIMEOUT = 2.0
19 class SocketServerConnection:
20 """An accepted connection to a server.
21 """
23 def __init__(self, sock, protocol, addr, server):
24 self.sock = sock
25 self.protocol = protocol
26 self.addr = addr
27 self.server = server
28 self.buffer_n = 1024
29 self.thread = None
30 self.connected = True
31 protocol.setTransport(self)
32 protocol.connectionMade(addr)
34 def run(self):
35 self.thread = threading.Thread(target=self.main)
36 #self.thread.setDaemon(True)
37 self.thread.start()
39 def main(self):
40 while True:
41 if not self.thread: break
42 if self.select(): break
43 if not self.thread: break
44 data = self.read()
45 if data is None: continue
46 if data is True: break
47 if self.dataReceived(data): break
49 def select(self):
50 try:
51 select.select([self.sock], [], [], SELECT_TIMEOUT)
52 return False
53 except socket.error, ex:
54 if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
55 return False
56 else:
57 self.loseConnection(ex)
58 return True
60 def read(self):
61 try:
62 data = self.sock.recv(self.buffer_n)
63 if data == '':
64 self.loseConnection()
65 return True
66 return data
67 except socket.error, ex:
68 if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
69 return None
70 else:
71 self.loseConnection(ex)
72 return True
74 def dataReceived(self, data):
75 if not self.connected:
76 return True
77 if not self.protocol:
78 return True
79 try:
80 self.protocol.dataReceived(data)
81 except SystemExit:
82 raise
83 except Exception, ex:
84 self.loseConnection(ex)
85 return True
86 return False
88 def write(self, data):
89 self.sock.send(data)
91 def loseConnection(self, reason=None):
92 self.thread = None
93 self.closeSocket(reason)
94 self.closeProtocol(reason)
96 def closeSocket(self, reason):
97 try:
98 self.sock.close()
99 except SystemExit:
100 raise
101 except:
102 pass
104 def closeProtocol(self, reason):
105 try:
106 if self.connected:
107 self.connected = False
108 if self.protocol:
109 self.protocol.connectionLost(reason)
110 except SystemExit:
111 raise
112 except:
113 pass
115 def getHost(self):
116 return self.sock.getsockname()
118 def getPeer(self):
119 return self.addr
121 class SocketListener:
122 """A server socket, running listen in a thread.
123 Accepts connections and runs a thread for each one.
124 """
126 def __init__(self, factory, backlog=None):
127 if backlog is None:
128 backlog = 5
129 self.factory = factory
130 self.sock = None
131 self.backlog = backlog
132 self.thread = None
134 def createSocket(self):
135 raise NotImplementedError()
137 def acceptConnection(self, sock, protocol, addr):
138 return SocketServerConnection(sock, protocol, addr, self)
140 def startListening(self):
141 if self.sock or self.thread:
142 raise IOError("already listening")
143 self.sock = self.createSocket()
144 self.sock.setblocking(0)
145 self.sock.listen(self.backlog)
146 self.run()
148 def stopListening(self, reason=None):
149 self.loseConnection(reason)
151 def run(self):
152 self.factory.doStart()
153 self.thread = threading.Thread(target=self.main)
154 #self.thread.setDaemon(True)
155 self.thread.start()
157 def main(self):
158 while True:
159 if not self.thread: break
160 if self.select(): break
161 if self.accept(): break
163 def select(self):
164 try:
165 select.select([self.sock], [], [], SELECT_TIMEOUT)
166 return False
167 except socket.error, ex:
168 if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
169 return False
170 else:
171 self.loseConnection(ex)
172 return True
174 def accept(self):
175 try:
176 (sock, addr) = self.sock.accept()
177 sock.setblocking(0)
178 return self.accepted(sock, addr)
179 except socket.error, ex:
180 if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
181 return False
182 else:
183 self.loseConnection(ex)
184 return True
186 def accepted(self, sock, addr):
187 protocol = self.factory.buildProtocol(addr)
188 if protocol is None:
189 self.loseConnection()
190 return True
191 connection = self.acceptConnection(sock, protocol, addr)
192 connection.run()
193 return False
195 def loseConnection(self, reason=None):
196 self.thread = None
197 self.closeSocket(reason)
198 self.closeFactory(reason)
200 def closeSocket(self, reason):
201 try:
202 self.sock.close()
203 except SystemExit:
204 raise
205 except Exception, ex:
206 pass
208 def closeFactory(self, reason):
209 try:
210 self.factory.doStop()
211 except SystemExit:
212 raise
213 except:
214 pass
216 class SocketClientConnection:
217 """A connection to a server from a client.
219 Call connectionMade() on the protocol in a thread when connected.
220 It is completely up to the protocol what to do.
221 """
223 def __init__(self, connector):
224 self.addr = None
225 self.connector = connector
226 self.buffer_n = 1024
227 self.connected = False
229 def createSocket (self):
230 raise NotImplementedError()
232 def write(self, data):
233 if self.sock:
234 return self.sock.send(data)
235 else:
236 return 0
238 def connect(self, timeout):
239 #todo: run a timer to cancel on timeout?
240 try:
241 sock = self.createSocket()
242 sock.connect(self.addr)
243 self.sock = sock
244 self.connected = True
245 self.protocol = self.connector.buildProtocol(self.addr)
246 self.protocol.setTransport(self)
247 except SystemExit:
248 raise
249 except Exception, ex:
250 self.connector.connectionFailed(ex)
251 return False
253 self.thread = threading.Thread(target=self.main)
254 #self.thread.setDaemon(True)
255 self.thread.start()
256 return True
258 def main(self):
259 try:
260 # Call the protocol in a thread.
261 # Up to it what to do.
262 self.protocol.connectionMade(self.addr)
263 except SystemExit:
264 raise
265 except Exception, ex:
266 self.loseConnection(ex)
268 def mainLoop(self):
269 # Something a protocol could call.
270 while True:
271 if not self.thread: break
272 if self.select(): break
273 if not self.thread: break
274 data = self.read()
275 if data is None: continue
276 if data is True: break
277 if self.dataReceived(data): break
279 def select(self):
280 try:
281 select.select([self.sock], [], [], SELECT_TIMEOUT)
282 return False
283 except socket.error, ex:
284 if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
285 return False
286 else:
287 self.loseConnection(ex)
288 return True
290 def read(self):
291 try:
292 data = self.sock.recv(self.buffer_n)
293 return data
294 except socket.error, ex:
295 if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
296 return None
297 else:
298 self.loseConnection(ex)
299 return True
301 def dataReceived(self, data):
302 if not self.protocol:
303 return True
304 try:
305 self.protocol.dataReceived(data)
306 except SystemExit:
307 raise
308 except Exception, ex:
309 self.loseConnection(ex)
310 return True
311 return False
313 def loseConnection(self, reason=None):
314 self.thread = None
315 self.closeSocket(reason)
316 self.closeProtocol(reason)
317 self.closeConnector(reason)
319 def closeSocket(self, reason):
320 try:
321 if self.sock:
322 self.sock.close()
323 except SystemExit:
324 raise
325 except:
326 pass
328 def closeProtocol(self, reason):
329 try:
330 if self.connected:
331 self.connected = False
332 if self.protocol:
333 self.protocol.connectionLost(reason)
334 except SystemExit:
335 raise
336 except:
337 pass
338 self.protocol = None
340 def closeConnector(self, reason):
341 try:
342 self.connector.connectionLost(reason)
343 except SystemExit:
344 raise
345 except:
346 pass
348 class SocketConnector:
349 """A client socket. Connects to a server and runs the client protocol
350 in a thread.
351 """
353 def __init__(self, factory):
354 self.factoryStarted = False
355 self.clientLost = False
356 self.clientFailed = False
357 self.factory = factory
358 self.state = "disconnected"
359 self.transport = None
361 def getDestination(self):
362 raise NotImplementedError()
364 def connectTransport(self):
365 raise NotImplementedError()
367 def connect(self):
368 if self.state != "disconnected":
369 raise socket.error(EINVAL, "cannot connect in state " + self.state)
370 self.state = "connecting"
371 self.clientLost = False
372 self.clientFailed = False
373 if not self.factoryStarted:
374 self.factoryStarted = True
375 self.factory.doStart()
376 self.factory.startedConnecting(self)
377 self.connectTransport()
378 self.state = "connected"
380 def stopConnecting(self):
381 if self.state != "connecting":
382 return
383 self.state = "disconnected"
384 self.transport.disconnect()
386 def buildProtocol(self, addr):
387 return self.factory.buildProtocol(addr)
389 def connectionLost(self, reason=None):
390 if not self.clientLost:
391 self.clientLost = True
392 self.factory.clientConnectionLost(self, reason)
394 def connectionFailed(self, reason=None):
395 if not self.clientFailed:
396 self.clientFailed = True
397 self.factory.clientConnectionFailed(self, reason)