debuggers.hg

view tools/python/xen/xend/XendMigrate.py @ 4672:d781b9d08e80

bitkeeper revision 1.1327.2.4 (426918a34Af7gihN8mTkq-P3KrAZXg)

Remove twisted from save/migrate handling.
This needs to use threads, so add thread support for
http server requests.

Signed-off-by: Mike Wray <mike.wray@hp.com>
author mjw@wray-m-3.hpl.hp.com
date Fri Apr 22 15:30:43 2005 +0000 (2005-04-22)
parents 445b12a7221a
children e81b04e1c86a
line source
1 # Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
3 import traceback
4 import threading
6 import errno
7 import sys
8 import socket
9 import time
10 import types
12 from xen.web import reactor
13 from xen.web.protocol import Protocol, ClientFactory
15 import sxp
16 import XendDB
17 import EventServer; eserver = EventServer.instance()
18 from XendError import XendError
19 from XendLogging import log
# Port on which the migrate/save daemon xfrd listens.
XFRD_PORT = 8002

# Transfer protocol version announced in the 'xfr.hello' greeting:
# major number first, then minor number.
XFR_PROTO_MAJOR = 1
XFR_PROTO_MINOR = 0
class Xfrd(Protocol):
    """Protocol handler for a connection to the migration/save daemon xfrd.

    Requests are written to the transport as s-expressions. Data read
    from xfrd is parsed incrementally, and each complete message is handed
    to the session object (xinfo) for dispatch.
    """

    def __init__(self, xinfo):
        # Incremental s-expression parser for data read from xfrd.
        self.parser = sxp.Parser()
        # Session object that builds requests and handles replies.
        self.xinfo = xinfo

    def connectionMade(self, addr=None):
        """Send the hello and the session request, then run the read loop."""
        # Protocol version handshake first, then the session's own request.
        self.request(['xfr.hello', XFR_PROTO_MAJOR, XFR_PROTO_MINOR])
        self.xinfo.request(self)
        # Run the transport mainLoop which reads from the peer.
        self.transport.mainLoop()

    def request(self, req):
        """Write the s-expression req to xfrd."""
        sxp.show(req, out=self.transport)

    def loseConnection(self):
        """Close the connection to xfrd."""
        self.transport.loseConnection()

    def connectionLost(self, reason):
        """Tell the session the connection has gone so it can clean up."""
        self.xinfo.connectionLost(reason)

    def dataReceived(self, data):
        """Feed data to the parser and dispatch every complete message.

        One chunk may complete several messages. All of them are dispatched
        now rather than one per chunk, because the peer may be waiting for a
        reply to a later message before it sends anything more.
        """
        self.parser.input(data)
        # NOTE(review): relies on get_val() consuming the value it returns,
        # so that ready() eventually becomes false - confirm in sxp.Parser.
        while self.parser.ready():
            self.xinfo.dispatch(self, self.parser.get_val())
        if self.parser.at_eof():
            self.loseConnection()
class XfrdClientFactory(ClientFactory):
    """Factory for clients of the migration/save daemon xfrd.

    start() connects to the local xfrd and blocks until the connection has
    been lost or has failed. It then raises the session's error summary,
    if there is one.
    """

    def __init__(self, xinfo):
        # NOTE(review): ClientFactory.__init__ is not called (an earlier
        # revision had the call commented out) - confirm it is not needed.
        self.xinfo = xinfo
        # readyCond guards ready/err and is notified when the session ends.
        self.readyCond = threading.Condition()
        self.ready = False
        self.err = None

    def start(self):
        """Connect to xfrd on localhost and wait for the session to end.

        @return: 0 on success
        @raise XendError: summary of the errors the session recorded
        """
        print('XfrdClientFactory>start>')
        reactor.connectTCP('localhost', XFRD_PORT, self)
        # Acquire outside the protected region. With acquire() inside a
        # try/finally, a failed acquire would release an unheld lock and
        # mask the original error.
        with self.readyCond:
            while not self.ready:
                self.readyCond.wait()
        print('XfrdClientFactory>start> err= %s' % (self.err,))
        if self.err:
            raise self.err
        return 0

    def notifyReady(self):
        """Mark the session finished, capture its errors and wake start()."""
        with self.readyCond:
            self.ready = True
            self.err = self.xinfo.error_summary()
            self.readyCond.notify()

    def startedConnecting(self, connector):
        """Nothing to do when a connection attempt begins."""
        pass

    def buildProtocol(self, addr):
        """Create the Xfrd protocol handler for the new connection."""
        return Xfrd(self.xinfo)

    def clientConnectionLost(self, connector, reason):
        print('XfrdClientFactory>clientConnectionLost> %s' % (reason,))
        self.notifyReady()

    def clientConnectionFailed(self, connector, reason):
        print('XfrdClientFactory>clientConnectionFailed> %s' % (reason,))
        # Record the failure on the session so start() raises it.
        self.xinfo.error(reason)
        self.notifyReady()
class SuspendHandler:
    """Wait for a domain suspend to complete, fail, or time out.

    The outcome is decided by whichever happens first: a
    'xend.domain.suspended' event for self.vmid, a 'xend.domain.died'
    event for self.vmid, or the timeout. Later notifications are ignored.
    """

    def __init__(self, xinfo, vmid, timeout):
        """
        @param xinfo: session that requested the suspend
        @param vmid: domain id, compared with element 1 of event values
        @param timeout: seconds to wait before giving up
        """
        self.xinfo = xinfo
        self.vmid = vmid
        self.timeout = timeout
        # Guards ready/err and wakes start() once the outcome is known.
        self.readyCond = threading.Condition()
        self.ready = False
        self.err = None

    def start(self):
        """Block until the suspend completes.

        @raise XendError: if the domain died or the suspend timed out
        """
        self.subscribe(on=True)
        timer = None
        try:
            timer = reactor.callLater(self.timeout, self.onTimeout)
            with self.readyCond:
                while not self.ready:
                    self.readyCond.wait()
        finally:
            # Drop the subscriptions and the timer on every exit path, so
            # no stale callbacks stay registered if arming or waiting fails.
            self.subscribe(on=False)
            if timer is not None:
                timer.cancel()
        if self.err:
            raise XendError(self.err)

    def notifyReady(self, err=None):
        """Record the outcome and wake start(); only the first call counts.

        @param err: error message, or None for success
        """
        with self.readyCond:
            if not self.ready:
                self.ready = True
                self.err = err
                self.readyCond.notify()

    def subscribe(self, on=True):
        """Subscribe (on=True) or unsubscribe (on=False) the event handlers.

        'suspended' events tell us when the suspend completes; 'died'
        events tell us if the domain died instead.
        """
        action = eserver.subscribe if on else eserver.unsubscribe
        action('xend.domain.suspended', self.onSuspended)
        action('xend.domain.died', self.onDied)

    def onSuspended(self, e, v):
        if v[1] != self.vmid:
            return
        print('SuspendHandler>onSuspended> %s %s' % (e, v))
        self.notifyReady()

    def onDied(self, e, v):
        if v[1] != self.vmid:
            return
        print('SuspendHandler>onDied> %s %s' % (e, v))
        self.notifyReady('Domain %s died while suspending' % self.vmid)

    def onTimeout(self):
        print('SuspendHandler>onTimeout>')
        self.notifyReady('Domain %s suspend timed out' % self.vmid)
class XfrdInfo:
    """Abstract class for info about a session with xfrd.
    Has subclasses for save, migrate and restore.

    dispatch() routes each message from xfrd to the method named after
    it, with dots replaced by underscores (e.g. 'xfr.err' -> xfr_err).
    A non-empty return value from that method is sent back to xfrd.
    """

    # Suspend timeout (seconds).
    # We set a timeout because suspending a domain can hang.
    timeout = 30

    def __init__(self):
        from xen.xend import XendDomain
        self.xd = XendDomain.instance()
        # Domains this session suspended / paused, keyed by vmid. Entries
        # still present when the connection ends are undone in
        # connectionLost().
        self.suspended = {}
        self.paused = {}
        self.state = 'init'
        # List of errors encountered.
        self.errors = []

    def vmconfig(self):
        """Return the config of domain self.src_dom as an s-expression
        string, or None if the domain is not found.

        Uses self.src_dom, which subclasses are expected to set.
        """
        dominfo = self.xd.domain_get(self.src_dom)
        if dominfo:
            return sxp.to_string(dominfo.sxpr())
        return None

    def add_error(self, err):
        """Add an error to the error list (duplicates are ignored).
        Returns the error added.
        """
        if err not in self.errors:
            self.errors.append(err)
        return err

    def error_summary(self, msg=None):
        """Get a XendError summarising the errors (if any).

        @param msg: prefix for the summary message (default "errors")
        @return: XendError, or None if no errors were recorded
        """
        if not self.errors:
            return None
        if msg is None:
            msg = "errors"
        return XendError(msg + ': ' + ', '.join(map(str, self.errors)))

    def get_errors(self):
        """Get the list of errors."""
        return self.errors

    def error(self, err):
        """Put the session in the 'error' state and record err."""
        print('XfrdInfo>error> %s' % (err,))
        self.state = 'error'
        self.add_error(err)

    def dispatch(self, xfrd, val):
        """Handle one message val received from xfrd.

        Messages whose name does not start with 'xfr.', or that have no
        handler, go to unknown(). If the handler (or sending its reply)
        raises, the error is recorded and ['xfr.err', EINVAL] is sent.
        """
        print('XfrdInfo>dispatch> %s' % (val,))
        op = sxp.name(val).replace('.', '_')
        if op.startswith('xfr_'):
            fn = getattr(self, op, self.unknown)
        else:
            fn = self.unknown
        try:
            reply = fn(xfrd, val)
            if reply:
                sxp.show(reply, out=xfrd.transport)
        except Exception as err:
            print('XfrdInfo>dispatch> error: %s' % (err,))
            # Record the error before replying. If the failure came from the
            # transport itself, the error reply can fail too; that must not
            # escape from here or hide the original error.
            self.error(err)
            try:
                sxp.show(['xfr.err', errno.EINVAL], out=xfrd.transport)
            except Exception as send_err:
                self.error(send_err)

    def unknown(self, xfrd, val):
        """Handle an unrecognised message by dropping the connection."""
        xfrd.loseConnection()
        return None

    def xfr_err(self, xfrd, val):
        """Handle an 'xfr.err' message carrying an integer code.

        An error with code zero indicates hello success. A non-zero code
        means the operation failed, so record it and drop the connection.
        """
        print('XfrdInfo>xfr_err> %s' % (val,))
        err = int(sxp.child0(val))
        if not err:
            return None
        self.error("transfer daemon (xfrd) error: " + str(err))
        xfrd.loseConnection()
        return None

    def xfr_progress(self, xfrd, val):
        """Progress messages need no reply."""
        return None

    def xfr_vm_destroy(self, xfrd, val):
        """Destroy the domain named in val and forget any pause/suspend."""
        try:
            vmid = sxp.child0(val)
            val = self.xd.domain_destroy(vmid)
            # The domain is gone: nothing left for connectionLost() to undo.
            self.paused.pop(vmid, None)
            self.suspended.pop(vmid, None)
        except StandardError as err:
            self.add_error("vm_destroy failed")
            self.add_error(err)
            val = errno.EINVAL
        return ['xfr.err', val]

    def xfr_vm_pause(self, xfrd, val):
        """Pause the domain named in val and remember it for cleanup."""
        try:
            vmid = sxp.child0(val)
            val = self.xd.domain_pause(vmid)
            self.paused[vmid] = 1
        except StandardError as err:
            self.add_error("vm_pause failed")
            self.add_error(err)
            val = errno.EINVAL
        return ['xfr.err', val]

    def xfr_vm_unpause(self, xfrd, val):
        """Unpause the domain named in val."""
        try:
            vmid = sxp.child0(val)
            val = self.xd.domain_unpause(vmid)
            self.paused.pop(vmid, None)
        except StandardError as err:
            self.add_error("vm_unpause failed")
            self.add_error(err)
            val = errno.EINVAL
        return ['xfr.err', val]

    def xfr_vm_suspend(self, xfrd, val):
        """Suspend a domain.
        Suspending can hang, so we set a timeout and fail if it
        takes too long.
        """
        try:
            vmid = sxp.child0(val)
            h = SuspendHandler(self, vmid, self.timeout)
            val = self.xd.domain_shutdown(vmid, reason='suspend')
            # Recorded before waiting, so connectionLost() destroys the
            # domain even if the suspend then fails or times out.
            self.suspended[vmid] = 1
            # NOTE(review): SuspendHandler.start() subscribes to the suspend
            # events only after domain_shutdown() has been issued. A
            # 'suspended' event delivered in that window would presumably be
            # missed and end as a timeout - confirm how events are delivered.
            h.start()
            print('xfr_vm_suspend> suspended %s' % (vmid,))
        except Exception as err:
            print('xfr_vm_suspend> err %s' % (err,))
            self.add_error("suspend failed")
            self.add_error(err)
            traceback.print_exc()
            val = errno.EINVAL
        return ['xfr.err', val]

    def connectionLost(self, reason=None):
        """Undo this session's side effects when the xfrd connection ends.

        Domains still marked suspended are destroyed and domains still
        marked paused are unpaused. This is best-effort: each failure is
        reported and the remaining domains are still processed.
        """
        print('XfrdInfo>connectionLost> %s' % (reason,))
        for vmid in self.suspended:
            try:
                self.xd.domain_destroy(vmid)
            except Exception as err:
                print('XfrdInfo>connectionLost> destroy %s failed: %s'
                      % (vmid, err))
        for vmid in self.paused:
            try:
                self.xd.domain_unpause(vmid)
            except Exception as err:
                print('XfrdInfo>connectionLost> unpause %s failed: %s'
                      % (vmid, err))
class XendMigrateInfo(XfrdInfo):
    """Representation of a migrate in-progress and its interaction with xfrd.
    """

    def __init__(self, xid, dominfo, host, port, live=0, resource=0):
        """
        @param xid: session id
        @param dominfo: info for the domain to migrate
        @param host: destination host
        @param port: destination port
        @param live: passed through to xfrd in the 'xfr.migrate' request
        @param resource: passed through to xfrd in the 'xfr.migrate' request
        """
        XfrdInfo.__init__(self)
        self.xid = xid
        self.dominfo = dominfo
        self.state = 'begin'
        self.src_host = socket.gethostname()
        self.src_dom = dominfo.id
        self.dst_host = host
        self.dst_port = port
        # Domain id on the destination; set when xfrd reports success.
        self.dst_dom = None
        self.live = live
        self.resource = resource
        self.start = 0

    def sxpr(self):
        """Return the session state as an s-expression."""
        sxpr = ['migrate',
                ['id', self.xid],
                ['state', self.state],
                ['live', self.live],
                ['resource', self.resource]]
        sxpr.append(['src', ['host', self.src_host], ['domain', self.src_dom]])
        sxpr_dst = ['dst', ['host', self.dst_host]]
        if self.dst_dom:
            sxpr_dst.append(['domain', self.dst_dom])
        sxpr.append(sxpr_dst)
        return sxpr

    def request(self, xfrd):
        """Send the migrate request to xfrd.

        If the domain's config cannot be found, the session fails instead.
        """
        vmconfig = self.vmconfig()
        if not vmconfig:
            self.error(XendError("vm config not found"))
            xfrd.loseConnection()
            return
        log.info('Migrate BEGIN: %s' % str(self.sxpr()))
        eserver.inject('xend.domain.migrate',
                       [self.dominfo.name, self.dominfo.id, "begin", self.sxpr()])
        xfrd.request(['xfr.migrate',
                      self.src_dom,
                      vmconfig,
                      self.dst_host,
                      self.dst_port,
                      self.live,
                      self.resource])

    def xfr_migrate_ok(self, xfrd, val):
        """Handle 'xfr.migrate.ok': record the destination domain id and
        destroy the source domain."""
        dom = int(sxp.child0(val))
        self.state = 'ok'
        self.dst_dom = dom
        self.xd.domain_destroy(self.src_dom)
        # The source domain is gone. Forget it, as xfr_vm_destroy does, so
        # connectionLost() does not try to destroy or unpause it again.
        # NOTE(review): assumes src_dom has the same type as the vmids
        # received from xfrd; if not, these are harmless no-ops - confirm.
        self.suspended.pop(self.src_dom, None)
        self.paused.pop(self.src_dom, None)

    def connectionLost(self, reason=None):
        """Finish the session: undo leftovers, then log and announce the
        outcome."""
        print('XendMigrateInfo>connectionLost> %s' % (reason,))
        XfrdInfo.connectionLost(self, reason)
        if self.state == 'ok':
            log.info('Migrate OK: ' + str(self.sxpr()))
        else:
            self.state = 'error'
            self.error("migrate failed")
            log.info('Migrate ERROR: ' + str(self.sxpr()))
        eserver.inject('xend.domain.migrate',
                       [self.dominfo.name, self.dominfo.id, self.state, self.sxpr()])
class XendSaveInfo(XfrdInfo):
    """Representation of a save in-progress and its interaction with xfrd.
    """

    def __init__(self, xid, dominfo, file):
        """
        @param xid: session id
        @param dominfo: info for the domain to save
        @param file: destination file
        """
        XfrdInfo.__init__(self)
        self.xid = xid
        self.dominfo = dominfo
        self.state = 'begin'
        self.src_dom = dominfo.id
        self.file = file
        self.start = 0

    def sxpr(self):
        """Return the session state as an s-expression."""
        return ['save',
                ['id', self.xid],
                ['state', self.state],
                ['domain', self.src_dom],
                ['file', self.file]]

    def request(self, xfrd):
        """Send the save request to xfrd.

        If the domain's config cannot be found, the session fails instead.
        """
        vmconfig = self.vmconfig()
        if not vmconfig:
            self.error(XendError("vm config not found"))
            xfrd.loseConnection()
            return
        log.info('Save BEGIN: ' + str(self.sxpr()))
        eserver.inject('xend.domain.save',
                       [self.dominfo.name, self.dominfo.id,
                        "begin", self.sxpr()])
        xfrd.request(['xfr.save', self.src_dom, vmconfig, self.file])

    def xfr_save_ok(self, xfrd, val):
        """Handle 'xfr.save.ok': the save succeeded, so destroy the domain."""
        self.state = 'ok'
        self.xd.domain_destroy(self.src_dom)
        # The domain is gone. Forget it, as xfr_vm_destroy does, so
        # connectionLost() does not try to destroy or unpause it again.
        # NOTE(review): assumes src_dom has the same type as the vmids
        # received from xfrd; if not, these are harmless no-ops - confirm.
        self.suspended.pop(self.src_dom, None)
        self.paused.pop(self.src_dom, None)

    def connectionLost(self, reason=None):
        """Finish the session: undo leftovers, then log and announce the
        outcome."""
        print('XendSaveInfo>connectionLost> %s' % (reason,))
        XfrdInfo.connectionLost(self, reason)
        if self.state == 'ok':
            log.info('Save OK: ' + str(self.sxpr()))
        else:
            self.state = 'error'
            self.error("save failed")
            log.info('Save ERROR: ' + str(self.sxpr()))
        eserver.inject('xend.domain.save',
                       [self.dominfo.name, self.dominfo.id,
                        self.state, self.sxpr()])
class XendRestoreInfo(XfrdInfo):
    """Representation of a restore in-progress and its interaction with xfrd.
    """

    def __init__(self, xid, file):
        """
        @param xid: session id
        @param file: file to restore from
        """
        XfrdInfo.__init__(self)
        self.xid = xid
        self.state = 'begin'
        self.file = file
        # Info for the restored domain; set when xfrd reports success.
        self.dominfo = None

    def sxpr(self):
        """Return the session state as an s-expression."""
        return ['restore',
                ['id', self.xid],
                ['file', self.file]]

    def request(self, xfrd):
        """Announce the restore and send the request to xfrd."""
        log.info('restore BEGIN: ' + str(self.sxpr()))
        eserver.inject('xend.restore', ['begin', self.sxpr()])
        xfrd.request(['xfr.restore', self.file])

    def xfr_restore_ok(self, xfrd, val):
        """Handle 'xfr.restore.ok': record the restored domain.

        The domain info is kept on the session (self.dominfo). No Deferred
        is ever set on this object, so none is fired.
        """
        dom = int(sxp.child0(val))
        self.dominfo = self.xd.domain_get(dom)
        self.state = 'ok'

    def connectionLost(self, reason=None):
        """Finish the session: undo leftovers, then log and announce the
        outcome."""
        XfrdInfo.connectionLost(self, reason)
        if self.state == 'ok':
            log.info('Restore OK: ' + self.file)
        else:
            self.state = 'error'
            self.error("restore failed")
            log.info('Restore ERROR: ' + str(self.sxpr()))
        eserver.inject('xend.restore', [self.state, self.sxpr()])
class XendMigrate:
    """External api for interaction with xfrd for migrate, save and restore.
    Singleton.
    """
    # Use log for indications of begin/end/errors?
    # Need logging of: domain create/halt, migrate begin/end/fail
    # Log via event server?

    # Path handed to XendDB for the persisted session table.
    dbpath = "migrate"

    def __init__(self):
        self.db = XendDB.XendDB(self.dbpath)
        # Active session objects by id, and their persisted s-expressions.
        self.session = {}
        self.session_db = self.db.fetchall("")
        self.xid = 0
        # nextid() may be called from several request threads at once, and
        # 'self.xid += 1' is not atomic, so id allocation is serialised.
        self.xid_lock = threading.Lock()

    def nextid(self):
        """Allocate the next session id, as a decimal string."""
        with self.xid_lock:
            self.xid += 1
            return "%d" % self.xid

    def sync(self):
        """Write the whole session table to the database."""
        self.db.saveall("", self.session_db)

    def sync_session(self, xid):
        """Write the entry for session xid to the database."""
        self.db.save(xid, self.session_db[xid])

    def close(self):
        pass

    def _add_session(self, info):
        xid = info.xid
        self.session[xid] = info
        self.session_db[xid] = info.sxpr()
        self.sync_session(xid)

    def _delete_session(self, xid):
        if xid in self.session:
            del self.session[xid]
        if xid in self.session_db:
            del self.session_db[xid]
            self.db.delete(xid)

    def session_ls(self):
        """List the ids of the active sessions."""
        return self.session.keys()

    def sessions(self):
        """List the active session objects."""
        return self.session.values()

    def session_get(self, xid):
        """Get the session with id xid, or None."""
        return self.session.get(xid)

    def session_begin(self, info):
        """Add the session to the table and run it, blocking until it ends.
        The session is removed from the table when it finishes.

        @param info: session
        @return: 0 on success
        @raise XendError: summary of the errors the session recorded
        """
        self._add_session(info)
        try:
            return XfrdClientFactory(info).start()
        finally:
            self._delete_session(info.xid)

    def migrate_begin(self, dominfo, host, port=XFRD_PORT, live=0, resource=0):
        """Migrate a domain to another host.

        @param dominfo: domain info
        @param host: destination host
        @param port: destination port
        @return: 0 on success (see session_begin)
        """
        info = XendMigrateInfo(self.nextid(), dominfo, host, port, live, resource)
        return self.session_begin(info)

    def save_begin(self, dominfo, file):
        """Save a domain to file.

        @param dominfo: domain info
        @param file: destination file
        @return: 0 on success (see session_begin)
        """
        info = XendSaveInfo(self.nextid(), dominfo, file)
        return self.session_begin(info)

    def restore_begin(self, file):
        """Restore a domain from file.

        @param file: file to restore from
        @return: 0 on success (see session_begin)
        """
        info = XendRestoreInfo(self.nextid(), file)
        return self.session_begin(info)
def instance():
    """Return the XendMigrate singleton, creating it on first use."""
    global inst
    try:
        inst
    except NameError:
        # Only an unbound 'inst' means "not created yet"; anything else
        # (e.g. KeyboardInterrupt) must propagate.
        inst = XendMigrate()
    return inst