debuggers.hg

view tools/python/xen/xend/server/channel.py @ 6698:652bd7876153

Remove python virq code.
Signed-off-by: Christian Limpach <Christian.Limpach@cl.cam.ac.uk>
author cl349@firebug.cl.cam.ac.uk
date Wed Sep 07 14:19:05 2005 +0000 (2005-09-07)
parents dd668f7527cb
children 0e2b1e04d4cb 7d0fb56b4a91
line source
1 #============================================================================
2 # This library is free software; you can redistribute it and/or
3 # modify it under the terms of version 2.1 of the GNU Lesser General Public
4 # License as published by the Free Software Foundation.
5 #
6 # This library is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9 # Lesser General Public License for more details.
10 #
11 # You should have received a copy of the GNU Lesser General Public
12 # License along with this library; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14 #============================================================================
15 # Copyright (C) 2004, 2005 Mike Wray <mike.wray@hp.com>
16 #============================================================================
18 import threading
19 import select
21 import xen.lowlevel.xc; xc = xen.lowlevel.xc.new()
22 from xen.lowlevel import xu
24 from xen.xend.XendLogging import log
26 from messages import *
28 DEBUG = 0
30 RESPONSE_TIMEOUT = 20.0
class EventChannel(dict):
    """An event channel between two domains.

    The instance is a dict holding the values returned by the low-level
    bind call plus 'dom1' and 'dom2'. The domains and ports are also
    available as the attributes dom1, dom2, port1 and port2.
    """

    def interdomain(cls, dom1, dom2, port1=0, port2=0):
        """Create an event channel between domains.

        @param dom1  first domain
        @param dom2  second domain
        @param port1 port in dom1 (default 0)
        @param port2 port in dom2 (default 0)
        @return EventChannel (None on error)
        """
        v = xc.evtchn_bind_interdomain(dom1=dom1, dom2=dom2,
                                       port1=port1, port2=port2)
        if v:
            v = cls(dom1, dom2, v)
        return v

    interdomain = classmethod(interdomain)

    def restoreFromDB(cls, db, dom1, dom2, port1=0, port2=0):
        """Create an event channel using db info if available.
        Inverse to saveToDB().

        Each value is read from the db when it is present and converts to
        an int; otherwise the corresponding argument is used.

        @param db    db
        @param dom1  default first domain
        @param dom2  default second domain
        @param port1 default port in dom1
        @param port2 default port in dom2
        @return EventChannel (None on error)
        """
        def stored(key, default):
            # Best effort: a missing or malformed entry falls back to the
            # caller's value. Exception (rather than a bare except) keeps
            # interpreter-exit requests from being swallowed here.
            try:
                return int(db[key].getData())
            except Exception:
                return default

        dom1 = stored('dom1', dom1)
        dom2 = stored('dom2', dom2)
        port1 = stored('port1', port1)
        port2 = stored('port2', port2)
        return cls.interdomain(dom1, dom2, port1=port1, port2=port2)

    restoreFromDB = classmethod(restoreFromDB)

    def __init__(self, dom1, dom2, d):
        """Build the channel from the dict returned by the bind call.

        @param dom1 first domain
        @param dom2 second domain
        @param d    dict of channel values ('port1', 'port2', ...);
                    it is copied, not modified
        """
        self.update(d)
        # Set the domains on ourselves so the caller's dict is left alone.
        self['dom1'] = dom1
        self['dom2'] = dom2
        self.dom1 = dom1
        self.dom2 = dom2
        self.port1 = self.get('port1')
        self.port2 = self.get('port2')

    def close(self):
        """Close the event channel.

        Both ends are closed. Errors from the low-level close are
        deliberately ignored so that one failing end does not prevent
        the other from being closed.
        """
        def evtchn_close(dom, port):
            try:
                xc.evtchn_close(dom=dom, port=port)
            except Exception:
                pass

        if DEBUG:
            print('EventChannel>close> %s' % (self,))
        evtchn_close(self.dom1, self.port1)
        evtchn_close(self.dom2, self.port2)

    def saveToDB(self, db, save=False):
        """Save the event channel to the db so it can be restored later,
        using restoreFromDB() on the class.

        @param db   db
        @param save passed through to db.saveDB()
        """
        db['dom1'] = str(self.dom1)
        db['dom2'] = str(self.dom2)
        db['port1'] = str(self.port1)
        db['port2'] = str(self.port2)
        db.saveDB(save=save)

    def sxpr(self):
        """Get the channel as a nested list: a tag followed by
        [name, value] pairs.
        """
        return ['event-channel',
                ['dom1',  self.dom1],
                ['port1', self.port1],
                ['dom2',  self.dom2],
                ['port2', self.port2]
                ]

    def __repr__(self):
        # %s rather than %d: a port is None when the bind result had no
        # entry for it, and %d would raise TypeError in that case.
        return ("<EventChannel dom1:%s:%s dom2:%s:%s>"
                % (self.dom1, self.port1, self.dom2, self.port2))
def eventChannel(dom1, dom2, port1=0, port2=0):
    """Convenience wrapper around EventChannel.interdomain().

    @param dom1  first domain
    @param dom2  second domain
    @param port1 port in dom1 (default 0)
    @param port2 port in dom2 (default 0)
    @return EventChannel (None on error)
    """
    channel = EventChannel.interdomain(dom1, dom2,
                                       port1=port1, port2=port2)
    return channel
def eventChannelClose(evtchn):
    """Close an event channel, ignoring a false value (e.g. None).

    @param evtchn event channel, or a false value to do nothing
    """
    if evtchn:
        evtchn.close()
class ChannelFactory:
    """Factory for creating control channels.
    Maintains a table of channels, and can run a thread that reads the
    notifier and dispatches incoming messages to the channels.
    """

    # Channels indexed by key: (dom, local_port, remote_port).
    channels = None

    # Reader thread. None when not running; setting it to None is also
    # the signal for the reader to stop.
    thread = None

    # Notifier read by the thread to learn which port was notified.
    notifier = None

    def __init__(self):
        """Constructor - do not use. Use the channelFactory function."""
        self.channels = {}
        self.notifier = xu.notifier()

    def start(self):
        """Fork a thread to read messages.
        Does nothing if the thread has already been started.
        """
        if self.thread: return
        self.thread = threading.Thread(name="ChannelFactory",
                                       target=self.main)
        self.thread.setDaemon(True)
        self.thread.start()

    def stop(self):
        """Signal the thread to stop.
        The thread notices the next time it checks, it is not joined.
        """
        self.thread = None

    def main(self):
        """Main routine for the thread.
        Reads the notifier and dispatches to channels.
        """
        while True:
            if self.thread is None: return
            port = self.notifier.read()
            if port:
                self.msgReceived(port)
            else:
                # Nothing pending: wait (at most a second, so a stop
                # request is noticed) for the notifier to become readable.
                select.select([self.notifier], [], [], 1.0)

    def msgReceived(self, port):
        """Poll every channel for one response and one request.

        Responses are handed to their waiters in this thread; requests
        are dispatched to the channel in a new thread each.

        @param port notified port, only used for the warning logged when
                    no channel had a message
        """
        # We run the message handlers in their own threads.
        # Note we use keyword args to lambda to save the values -
        # otherwise lambda will use the variables, which will get
        # assigned by the loop and the lambda will get the changed values.
        received = 0
        # Iterate over snapshots: channels may be added or removed by
        # other threads while we are dispatching.
        for chan in list(self.channels.values()):
            if self.thread is None: return
            msg = chan.readResponse()
            if msg:
                received += 1
                chan.responseReceived(msg)
        for chan in list(self.channels.values()):
            if self.thread is None: return
            msg = chan.readRequest()
            if msg:
                received += 1
                self.runInThread(lambda chan=chan, msg=msg: chan.requestReceived(msg))
        if port and received == 0:
            log.warning("Port %s notified, but no messages found", port)

    def runInThread(self, thunk):
        """Run a callable with no arguments in a new daemon thread."""
        thread = threading.Thread(target = thunk)
        thread.setDaemon(True)
        thread.start()

    def newChannel(self, dom, local_port, remote_port):
        """Create a new channel and add it to the table.

        @return channel
        """
        return self.addChannel(Channel(self, dom, local_port, remote_port))

    def addChannel(self, channel):
        """Add a channel, indexed by its key.

        @return channel
        """
        self.channels[channel.getKey()] = channel
        return channel

    def delChannel(self, channel):
        """Remove the channel.

        The channel is looked up by its key first. A channel that has
        already been marked closed reports its ports as -1, so its key no
        longer matches the one it was stored under; in that case the table
        is searched for the channel object itself.
        """
        key = channel.getKey()
        if key in self.channels:
            del self.channels[key]
            return
        for (k, c) in list(self.channels.items()):
            if c is channel:
                del self.channels[k]

    def getChannel(self, dom, local_port, remote_port):
        """Get the channel with the given domain and ports (if any).
        """
        key = (dom, local_port, remote_port)
        return self.channels.get(key)

    def findChannel(self, dom, local_port=0, remote_port=0):
        """Find a channel. Ports given as zero are wildcards.

        @param dom         domain
        @param local_port  local port, or 0 for any
        @param remote_port remote port, or 0 for any
        @return channel, or None if there is no match
        """
        chan = self.getChannel(dom, local_port, remote_port)
        if chan: return chan
        if local_port and remote_port:
            # Both ports given: only an exact match will do.
            return None
        for c in list(self.channels.values()):
            if c.dom != dom: continue
            if local_port and local_port != c.getLocalPort(): continue
            if remote_port and remote_port != c.getRemotePort(): continue
            return c
        return None

    def openChannel(self, dom, local_port=0, remote_port=0):
        """Get a matching existing channel (see findChannel), or
        create a new one if there is none.

        @return channel
        """
        chan = self.findChannel(dom, local_port=local_port,
                                remote_port=remote_port)
        if chan:
            return chan
        chan = self.newChannel(dom, local_port, remote_port)
        return chan

    def createPort(self, dom, local_port=0, remote_port=0):
        """Create a port for a channel to the given domain.
        If only the domain is specified, a new channel with new port ids is
        created.  If one port id is specified and the given port id is in use,
        the other port id is filled.  If one port id is specified and the
        given port id is not in use, a new channel is created with one port
        id equal to the given id and a new id for the other end.  If both
        port ids are specified, a port is reconnected using the given port
        ids.

        @param dom: domain
        @param local_port: local port id to use
        @type  local_port: int
        @param remote_port: remote port id to use
        @type  remote_port: int
        @return: port object
        """
        return xu.port(dom, local_port=local_port, remote_port=remote_port)

    def restoreFromDB(self, db, dom, local, remote):
        """Create a channel using ports restored from the db (if available).
        Otherwise use the given ports. This is the inverse operation to
        saveToDB() on a channel.

        @param db db
        @param dom  domain the channel connects to
        @param local default local port
        @param remote default remote port
        @return channel, or None if it could not be opened
        """
        # A missing or malformed db entry falls back to the given port.
        try:
            local_port = int(db['local_port'])
        except Exception:
            local_port = local
        try:
            remote_port = int(db['remote_port'])
        except Exception:
            remote_port = remote
        try:
            return self.openChannel(dom, local_port, remote_port)
        except Exception:
            # Callers get None as before, but leave a trace of the failure.
            log.warning("Could not open channel to domain %s with ports %s:%s",
                        dom, local_port, remote_port)
            return None
def channelFactory():
    """Get the shared ChannelFactory, creating it on first use.
    Use this instead of the class constructor.

    @return the single ChannelFactory instance, kept in the module
            global 'inst'
    """
    global inst
    if 'inst' not in globals():
        inst = ChannelFactory()
    return inst
class Channel:
    """Control channel to a domain.
    Maintains a list of device handlers to dispatch requests to, based
    on the request type.
    """

    def __init__(self, factory, dom, local_port, remote_port):
        """Create the channel and its port.

        @param factory     channel factory, used to create the port and
                           to deregister the channel on close
        @param dom         domain (converted to int)
        @param local_port  local port id
        @param remote_port remote port id
        """
        self.factory = factory
        self.dom = int(dom)
        # Registered device handlers.
        self.devs = []
        # Handlers indexed by the message types they handle.
        self.devs_by_type = {}
        self.port = self.factory.createPort(self.dom,
                                            local_port=local_port,
                                            remote_port=remote_port)
        self.closed = False
        # Queue of waiters for responses to requests.
        self.queue = ResponseQueue(self)
        # Make sure the port will deliver all the messages.
        self.port.register(TYPE_WILDCARD)

    def _trace(self, label, msg=None, full=False):
        """Print a debug trace line when DEBUG is set.

        @param label text identifying the caller
        @param msg   message to print after the label (optional)
        @param full  pass all=True to printMsg
        """
        if not DEBUG:
            return
        if msg is None:
            print('%s %s' % (label, self))
            return
        # Label and message go on one line: write the label without a
        # newline and let printMsg finish the line.
        import sys
        sys.stdout.write('%s %s ' % (label, self))
        if full:
            printMsg(msg, all=True)
        else:
            printMsg(msg)

    def saveToDB(self, db, save=False):
        """Save the channel ports to the db so the channel can be restored later,
        using restoreFromDB() on the factory.
        Does nothing if the channel is closed.

        @param db   db
        @param save passed through to db.saveDB()
        """
        if self.closed: return
        db['local_port'] = str(self.getLocalPort())
        db['remote_port'] = str(self.getRemotePort())
        db.saveDB(save=save)

    def getKey(self):
        """Get the channel key: (dom, local port, remote port).
        The ports are -1 once the channel is closed.
        """
        return (self.dom, self.getLocalPort(), self.getRemotePort())

    def sxpr(self):
        """Get the channel as a nested list: a tag followed by
        [name, value] pairs.
        """
        val = ['channel']
        val.append(['domain', self.dom])
        if self.port:
            val.append(['local_port', self.port.local_port])
            val.append(['remote_port', self.port.remote_port])
        return val

    def close(self):
        """Close the channel.

        Removes the channel from the factory, tells every registered
        device handler that the channel is lost, and closes the port.
        Closing an already closed channel does nothing.
        """
        self._trace('Channel>close>')
        if self.closed: return
        # Deregister before marking the channel closed: once closed,
        # getKey() reports the ports as -1 and no longer matches the key
        # the factory stored the channel under.
        self.factory.delChannel(self)
        self.closed = True
        for d in self.devs[:]:
            d.lostChannel(self)
        self.devs = []
        self.devs_by_type = {}
        if self.port:
            self.port.close()
            #self.port = None

    def getDomain(self):
        """Get the domain the channel connects to."""
        return self.dom

    def getLocalPort(self):
        """Get the local port.

        @return: local port (-1 if the channel is closed)
        @rtype:  int
        """
        if self.closed: return -1
        return self.port.local_port

    def getRemotePort(self):
        """Get the remote port.

        @return: remote port (-1 if the channel is closed)
        @rtype:  int
        """
        if self.closed: return -1
        return self.port.remote_port

    def __repr__(self):
        return ('<Channel dom=%d ports=%d:%d>'
                % (self.dom,
                   self.getLocalPort(),
                   self.getRemotePort()))

    def registerDevice(self, types, dev):
        """Register a device message handler.
        Does nothing if the channel is closed.

        @param types: message types handled
        @type  types: array of ints
        @param dev:   device handler
        """
        if self.closed: return
        self.devs.append(dev)
        for ty in types:
            self.devs_by_type[ty] = dev

    def deregisterDevice(self, dev):
        """Remove the registration for a device handler.

        @param dev: device handler
        """
        if dev in self.devs:
            self.devs.remove(dev)
        # Collect the types first: the dict must not change size while
        # it is being iterated.
        types = [ ty for (ty, d) in list(self.devs_by_type.items()) if d == dev ]
        for ty in types:
            del self.devs_by_type[ty]

    def getDevice(self, type):
        """Get the handler for a message type.

        @param type: message type
        @type  type: int
        @return: controller or None
        @rtype:  device handler
        """
        return self.devs_by_type.get(type)

    def requestReceived(self, msg):
        """A request has been received on the channel.
        Dispatch it to the device handler registered for its type.
        If there is no handler, or the handler reports that it did not
        respond, the message is written back as the response.
        """
        self._trace('Channel>requestReceived>', msg)
        (ty, subty) = getMessageType(msg)
        responded = False
        dev = self.getDevice(ty)
        if dev:
            responded = dev.requestReceived(msg, ty, subty)
        else:
            self._trace("Channel>requestReceived> No device handler", msg)
        if not responded:
            self.writeResponse(msg)

    def writeRequest(self, msg):
        """Write a request to the channel.

        @return 1 if written, -1 if the channel is closed
        """
        self._trace('Channel>writeRequest>', msg, full=True)
        if self.closed: return -1
        self.port.write_request(msg)
        return 1

    def writeResponse(self, msg):
        """Write a response to the channel.

        @return 1, or -1 if the channel is closed
        """
        self._trace('Channel>writeResponse>', msg, full=True)
        # The port object is kept after close(), so test the closed flag
        # (as writeRequest does) rather than relying on self.port alone.
        if self.closed: return -1
        if self.port:
            self.port.write_response(msg)
        return 1

    def readRequest(self):
        """Read a request from the channel.
        Called internally.

        @return request message, None if the channel is closed
        """
        if self.closed:
            val = None
        else:
            val = self.port.read_request()
        return val

    def readResponse(self):
        """Read a response from the channel.
        Called internally.

        @return response message, None if the channel is closed
        """
        if self.closed:
            val = None
        else:
            val = self.port.read_response()
        if val:
            self._trace('Channel>readResponse>', val, full=True)
        return val

    def requestResponse(self, msg, timeout=None):
        """Write a request and wait for a response.
        Raises IOError on timeout, or if the channel is closed.

        @param msg request message
        @param timeout timeout in seconds (None uses RESPONSE_TIMEOUT,
                       0 or less is forever)
        @return response message
        """
        if self.closed:
            raise IOError("closed")
        if timeout is None:
            timeout = RESPONSE_TIMEOUT
        elif timeout <= 0:
            timeout = None
        return self.queue.call(msg, timeout)

    def responseReceived(self, msg):
        """A response has been received, look for a waiter to
        give it to.
        Called internally.
        """
        self._trace('Channel>responseReceived>', msg)
        self.queue.response(getMessageId(msg), msg)
class Response:
    """Entry in the response queue.
    Holds the event a waiting thread blocks on, and the response
    message once it has arrived.
    """

    def __init__(self, mid):
        """
        @param mid id of the message a response is awaited for
        """
        self.ready = threading.Event()
        self.msg = None
        self.mid = mid

    def response(self, msg):
        """Wake the waiting thread.
        A true msg is delivered as the response; a false one (None)
        cancels the wait, which then fails with an IOError.
        """
        if not msg:
            # Cancellation is flagged by a negative message id.
            self.mid = -1
        else:
            self.msg = msg
        self.ready.set()

    def wait(self, timeout):
        """Block for up to 'timeout' seconds until a response is signalled.
        Returns the response; raises IOError if the wait was canceled
        or no response arrived in time.
        """
        self.ready.wait(timeout)
        canceled = self.mid < 0
        if canceled:
            raise IOError("wait canceled")
        reply = self.msg
        if reply is None:
            raise IOError("response timeout")
        return reply
556 class ResponseQueue:
557 """Response queue. Manages waiters for responses to messages.
558 """
560 def __init__(self, channel):
561 self.channel = channel
562 self.lock = threading.Lock()
563 self.responses = {}
565 def add(self, mid):
566 r = Response(mid)
567 self.responses[mid] = r
568 return r
570 def get(self, mid):
571 return self.responses.get(mid)
573 def remove(self, mid):
574 r = self.responses.get(mid)
575 if r:
576 del self.responses[mid]
577 return r
579 def response(self, mid, msg):
580 """Process a response - signals any waiter that a response
581 has arrived.
582 """
583 try:
584 self.lock.acquire()
585 r = self.remove(mid)
586 finally:
587 self.lock.release()
588 if r:
589 r.response(msg)
591 def call(self, msg, timeout):
592 """Send the message and wait for 'timeout' seconds for a response.
593 Returns the response.
594 Raises IOError on timeout.
595 """
596 mid = getMessageId(msg)
597 try:
598 self.lock.acquire()
599 r = self.add(mid)
600 finally:
601 self.lock.release()
602 self.channel.writeRequest(msg)
603 return r.wait(timeout)