debuggers.hg

view tools/python/xen/web/connection.py @ 4655:a838a908e38e

bitkeeper revision 1.1327.2.2 (4267a9b3MhPpljnjQ5IbfLdzcW2K3w)

Remove twisted from the HTTP server and replace with a
threaded server. Add classes to provide tcp and unix servers
using threads instead of twisted. Remove use of twisted from
the consoles, event server and HTTP resources

Signed-off-by: Mike Wray <mike.wray@hp.com>
author mjw@wray-m-3.hpl.hp.com
date Thu Apr 21 13:25:07 2005 +0000 (2005-04-21)
parents
children d781b9d08e80
line source
1 import sys
2 import threading
3 import select
4 import socket
6 from errno import EAGAIN, EINTR, EWOULDBLOCK
8 """General classes to support server and client sockets, without
9 specifying what kind of socket they are. There are subclasses
10 for TCP and unix-domain sockets (see tcp.py and unix.py).
11 """
13 """We make sockets non-blocking so that operations like accept()
14 don't block. We also select on a timeout. Otherwise we have no way
15 of getting the threads to shut down.
16 """
# Timeout, in seconds, passed to every select() call in this module so that
# the polling threads wake up regularly and can notice a request to stop.
SELECT_TIMEOUT = 2.0
class SocketServerConnection:
    """An accepted connection to a server.

    Couples one accepted socket with the protocol object that consumes its
    data.  run() starts a thread that polls the socket and hands whatever
    arrives to the protocol until the connection is lost.
    """

    def __init__(self, sock, protocol, addr, server):
        """Attach the protocol to this transport and announce the connection.

        sock     -- the accepted socket
        protocol -- handler object; setTransport(self) and
                    connectionMade(addr) are called on it immediately
        addr     -- address of the peer
        server   -- the listener that accepted the connection
        """
        self.sock = sock
        self.protocol = protocol
        self.addr = addr
        self.server = server
        # Maximum number of bytes requested from recv() per read.
        self.buffer_n = 1024
        # Polling thread; reset to None to ask the thread to stop.
        self.thread = None
        # True until connectionLost() has been delivered to the protocol.
        self.connected = True
        protocol.setTransport(self)
        protocol.connectionMade(addr)

    def run(self):
        """Start the thread that services this connection."""
        self.thread = threading.Thread(target=self.main)
        self.thread.start()

    def main(self):
        """Thread body: poll the socket until the connection is closed."""
        while self.thread:
            if self.select():
                break
            # loseConnection() may have been called while we were waiting.
            if not self.thread:
                break
            data = self.read()
            if data is None:
                # Nothing available yet; go round again.
                continue
            if data is True:
                break
            if self.dataReceived(data):
                break

    def select(self):
        """Wait up to SELECT_TIMEOUT seconds for the socket to be readable.

        Returns False when polling should continue (readable, timed out or
        a transient error) and True when the connection has been closed
        because of an error.
        """
        try:
            select.select([self.sock], [], [], SELECT_TIMEOUT)
        # select.select() raises select.error, which is unrelated to
        # socket.error on Python 2, so both have to be caught.
        except (select.error, socket.error) as ex:
            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
                return False
            self.loseConnection(ex)
            return True
        return False

    def read(self):
        """Read up to buffer_n bytes from the socket.

        Returns the data read, None if nothing could be read right now, or
        True if the connection was closed (end of stream or a hard error).
        """
        try:
            data = self.sock.recv(self.buffer_n)
        except socket.error as ex:
            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
                return None
            self.loseConnection(ex)
            return True
        if not data:
            # An empty read means the peer closed the connection.  Testing
            # truthiness works for both str and bytes results.
            self.loseConnection()
            return True
        return data

    def dataReceived(self, data):
        """Pass data to the protocol.

        Returns True if the polling loop should stop (no protocol, or the
        protocol raised and the connection was closed), otherwise False.
        """
        if not self.protocol:
            return True
        try:
            self.protocol.dataReceived(data)
        except SystemExit:
            raise
        except Exception as ex:
            # This class has no disconnect(); loseConnection() is the
            # method that tears the connection down.
            self.loseConnection(ex)
            return True
        return False

    def write(self, data):
        """Send data on the socket and return the value of socket.send()."""
        return self.sock.send(data)

    def loseConnection(self, reason=None):
        """Stop the polling thread, close the socket and tell the protocol.

        reason -- the exception that caused the close, or None
        """
        self.thread = None
        self.closeSocket(reason)
        self.closeProtocol(reason)

    def closeSocket(self, reason):
        """Close the socket, ignoring any error raised while closing."""
        try:
            self.sock.close()
        except SystemExit:
            raise
        except Exception:
            # Best effort: the connection is going away regardless.
            pass

    def closeProtocol(self, reason):
        """Deliver connectionLost(reason) to the protocol, at most once."""
        try:
            if self.connected:
                self.connected = False
                if self.protocol:
                    self.protocol.connectionLost(reason)
        except SystemExit:
            raise
        except Exception:
            # Best effort: a failing protocol must not stop the teardown.
            pass

    def getHost(self):
        """Return the local address of the socket."""
        return self.sock.getsockname()

    def getPeer(self):
        """Return the peer address given at construction."""
        return self.addr
class SocketListener:
    """A server socket, running listen in a thread.

    Accepts connections and runs a thread for each one.  Subclasses supply
    the actual socket by implementing createSocket().
    """

    def __init__(self, factory, backlog=None):
        """Remember the factory and the listen backlog.

        factory -- object providing doStart(), doStop() and
                   buildProtocol(addr)
        backlog -- value passed to socket.listen(); defaults to 5
        """
        if backlog is None:
            backlog = 5
        self.factory = factory
        self.sock = None
        self.backlog = backlog
        # Accept thread; reset to None to ask the thread to stop.
        self.thread = None

    def createSocket(self):
        """Create and return the listening socket (subclass hook)."""
        raise NotImplementedError()

    def acceptConnection(self, sock, protocol, addr):
        """Build the connection object for a newly accepted socket."""
        return SocketServerConnection(sock, protocol, addr, self)

    def startListening(self):
        """Create the socket, start listening and start the accept thread.

        Raises IOError if the listener already has a socket or a thread.
        """
        if self.sock or self.thread:
            raise IOError("already listening")
        sock = self.createSocket()
        try:
            sock.setblocking(0)
            sock.listen(self.backlog)
        except Exception:
            # Do not leak the socket, and do not leave a half-initialised
            # one behind that would make a retry fail with
            # "already listening".
            try:
                sock.close()
            except Exception:
                pass
            raise
        self.sock = sock
        self.run()

    def stopListening(self, reason=None):
        """Stop accepting connections and shut the listener down."""
        self.loseConnection(reason)

    def run(self):
        """Start the factory and the thread that accepts connections."""
        self.factory.doStart()
        self.thread = threading.Thread(target=self.main)
        self.thread.start()

    def main(self):
        """Thread body: accept connections until the listener is closed."""
        while self.thread:
            if self.select():
                break
            if self.accept():
                break

    def select(self):
        """Wait up to SELECT_TIMEOUT seconds for a pending connection.

        Returns False when the loop should continue and True when the
        listener has been closed because of an error.
        """
        try:
            select.select([self.sock], [], [], SELECT_TIMEOUT)
        # select.select() raises select.error, which is unrelated to
        # socket.error on Python 2, so both have to be caught.
        except (select.error, socket.error) as ex:
            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
                return False
            self.loseConnection(ex)
            return True
        return False

    def accept(self):
        """Accept one pending connection, if there is one.

        Returns False when the loop should continue (including when no
        connection was pending) and True when the listener has been closed.
        """
        try:
            (sock, addr) = self.sock.accept()
            # Connections are polled with select(), so they must not block.
            sock.setblocking(0)
            return self.accepted(sock, addr)
        except socket.error as ex:
            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
                return False
            self.loseConnection(ex)
            return True

    def accepted(self, sock, addr):
        """Hand a newly accepted socket to a protocol from the factory.

        If the factory returns no protocol the listener is shut down and
        True is returned; otherwise the connection thread is started and
        False is returned.
        """
        protocol = self.factory.buildProtocol(addr)
        if protocol is None:
            # Nobody will ever service this socket, so close it rather
            # than leaking the descriptor.
            try:
                sock.close()
            except Exception:
                pass
            self.loseConnection()
            return True
        connection = self.acceptConnection(sock, protocol, addr)
        connection.run()
        return False

    def loseConnection(self, reason=None):
        """Stop the accept thread, close the socket and stop the factory.

        reason -- the exception that caused the close, or None
        """
        self.thread = None
        self.closeSocket(reason)
        self.closeFactory(reason)

    def closeSocket(self, reason):
        """Close the listening socket, ignoring errors while closing."""
        try:
            if self.sock:
                self.sock.close()
        except SystemExit:
            raise
        except Exception:
            # Best effort: the listener is going away regardless.
            pass

    def closeFactory(self, reason):
        """Call doStop() on the factory, ignoring any error it raises."""
        try:
            self.factory.doStop()
        except SystemExit:
            raise
        except Exception:
            # Best effort: a failing factory must not stop the teardown.
            pass
class SocketClientConnection:
    """A connection to a server from a client.

    Call connectionMade() on the protocol in a thread when connected.
    It is completely up to the protocol what to do; mainLoop() is provided
    for protocols that want a conventional read loop.  Subclasses supply
    the socket by implementing createSocket(); connect() uses self.addr as
    the destination.
    """

    def __init__(self, connector):
        """Create an unconnected transport owned by connector.

        connector -- object providing buildProtocol(addr),
                     connectionLost(reason) and connectionFailed(reason)
        """
        self.addr = None
        self.connector = connector
        # Maximum number of bytes requested from recv() per read.
        self.buffer_n = 1024
        self.connected = False
        # Set by connect(); defined here so that methods called before a
        # successful connect() see None instead of raising AttributeError.
        self.sock = None
        self.protocol = None
        self.thread = None

    def createSocket(self):
        """Create and return the client socket (subclass hook)."""
        raise NotImplementedError()

    def write(self, data):
        """Send data; returns the value of socket.send(), or 0 if there is
        no socket."""
        if self.sock:
            return self.sock.send(data)
        return 0

    def connect(self, timeout):
        """Connect to self.addr and start the protocol in a thread.

        timeout -- currently unused
        Returns True on success.  On failure the socket is closed,
        connector.connectionFailed() is called with the exception and
        False is returned.
        """
        #todo: run a timer to cancel on timeout?
        sock = None
        try:
            sock = self.createSocket()
            sock.connect(self.addr)
            self.sock = sock
            self.connected = True
            self.protocol = self.connector.buildProtocol(self.addr)
            self.protocol.setTransport(self)
        except SystemExit:
            raise
        except Exception as ex:
            # Undo the partial setup so the socket is not leaked and the
            # object does not claim to be connected.
            if sock is not None:
                try:
                    sock.close()
                except Exception:
                    pass
            self.sock = None
            self.connected = False
            self.protocol = None
            self.connector.connectionFailed(ex)
            return False

        self.thread = threading.Thread(target=self.main)
        self.thread.start()
        return True

    def main(self):
        """Thread body: notify the protocol that the connection is up."""
        try:
            # Call the protocol in a thread.
            # Up to it what to do.
            self.protocol.connectionMade(self.addr)
        except SystemExit:
            raise
        except Exception as ex:
            self.disconnect(ex)

    def mainLoop(self):
        """Read loop a protocol may call: poll the socket and deliver data
        to dataReceived() until the connection is closed."""
        while self.thread:
            if self.select():
                break
            # disconnect() may have been called while we were waiting.
            if not self.thread:
                break
            data = self.read()
            if data is None:
                # Nothing available yet; go round again.
                continue
            if data is True:
                break
            if self.dataReceived(data):
                break

    def select(self):
        """Wait up to SELECT_TIMEOUT seconds for the socket to be readable.

        Returns False when polling should continue and True when the
        connection has been closed because of an error.
        """
        try:
            select.select([self.sock], [], [], SELECT_TIMEOUT)
        # select.select() raises select.error, which is unrelated to
        # socket.error on Python 2, so both have to be caught.
        except (select.error, socket.error) as ex:
            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
                return False
            self.disconnect(ex)
            return True
        return False

    def read(self):
        """Read up to buffer_n bytes from the socket.

        Returns the data read, None if nothing could be read right now, or
        True if the connection was closed (end of stream or a hard error).
        """
        try:
            data = self.sock.recv(self.buffer_n)
        except socket.error as ex:
            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
                return None
            self.disconnect(ex)
            return True
        if not data:
            # An empty read means the server closed the connection.
            # Without this check mainLoop() would spin, delivering empty
            # data to the protocol forever.
            self.disconnect()
            return True
        return data

    def dataReceived(self, data):
        """Pass data to the protocol.

        Returns True if the read loop should stop (no protocol, or the
        protocol raised and the connection was closed), otherwise False.
        """
        if not self.protocol:
            return True
        try:
            self.protocol.dataReceived(data)
        except SystemExit:
            raise
        except Exception as ex:
            self.disconnect(ex)
            return True
        return False

    def disconnect(self, reason=None):
        """Stop the read loop, close the socket and notify the protocol
        and the connector.

        reason -- the exception that caused the close, or None
        """
        self.thread = None
        self.closeSocket(reason)
        self.closeProtocol(reason)
        self.closeConnector(reason)

    def closeSocket(self, reason):
        """Close the socket if there is one, ignoring errors."""
        try:
            if self.sock:
                self.sock.close()
        except SystemExit:
            raise
        except Exception:
            # Best effort: the connection is going away regardless.
            pass

    def closeProtocol(self, reason):
        """Deliver connectionLost(reason) to the protocol at most once,
        then drop the reference to the protocol."""
        try:
            if self.connected:
                self.connected = False
                if self.protocol:
                    self.protocol.connectionLost(reason)
        except SystemExit:
            raise
        except Exception:
            # Best effort: a failing protocol must not stop the teardown.
            pass
        self.protocol = None

    def closeConnector(self, reason):
        """Call connectionLost(reason) on the connector, ignoring errors."""
        try:
            self.connector.connectionLost(reason)
        except SystemExit:
            raise
        except Exception:
            # Best effort: a failing connector must not stop the teardown.
            pass
class SocketConnector:
    """A client socket. Connects to a server and runs the client protocol
    in a thread.

    Subclasses implement getDestination() and connectTransport().
    """

    def __init__(self, factory):
        """Create a connector in the "disconnected" state.

        factory -- object providing doStart(), doStop(),
                   startedConnecting() and buildProtocol(addr)
        """
        # True once factory.doStart() has been called.
        self.factoryStarted = False
        self.factory = factory
        self.state = "disconnected"
        # Not assigned anywhere in this class.
        # NOTE(review): presumably set by the subclass's connectTransport();
        # confirm against the subclasses.
        self.transport = None

    def getDestination(self):
        """Return the address to connect to (subclass hook)."""
        raise NotImplementedError()

    def connectTransport(self):
        """Create the transport and start connecting (subclass hook)."""
        raise NotImplementedError()

    def connect(self):
        """Start connecting.

        Starts the factory on first use, calls its startedConnecting()
        and then connectTransport().
        Raises socket.error(EINVAL, ...) unless the state is
        "disconnected".
        """
        if self.state != "disconnected":
            # EINVAL is not among the names the file imports from errno,
            # so bring it into scope here.
            from errno import EINVAL
            raise socket.error(EINVAL, "cannot connect in state " + self.state)
        self.state = "connecting"
        if not self.factoryStarted:
            self.factoryStarted = True
            self.factory.doStart()
        self.factory.startedConnecting()
        self.connectTransport()

    def stopConnecting(self):
        """Abandon a connection attempt; does nothing unless the state is
        "connecting"."""
        if self.state != "connecting":
            return
        self.state = "disconnected"
        # The transport may not have been created yet.
        if self.transport is not None:
            self.transport.disconnect()

    def buildProtocol(self, addr):
        """Return a protocol for addr, as built by the factory."""
        return self.factory.buildProtocol(addr)

    # NOTE(review): neither callback below changes self.state, so after a
    # failed or lost connection the connector stays in "connecting" and
    # connect() cannot be called again.  Confirm whether that is intended.

    def connectionLost(self, reason=None):
        """Called when the connection is lost: stop the factory."""
        self.factory.doStop()

    def connectionFailed(self, reason=None):
        """Called when the connection attempt fails: stop the factory."""
        self.factory.doStop()