debuggers.hg

view tools/python/xen/xend/XendMigrate.py @ 2628:98bdf2c88015

bitkeeper revision 1.1159.1.201 (41600e1fkVMoQU0dVgk1h6vT502hEg)

Merge
author iap10@labyrinth.cl.cam.ac.uk
date Sun Oct 03 14:35:11 2004 +0000 (2004-10-03)
parents f7b2e90dac20 05ae99de2d3f
children cc42f35f9597 4b6dc2da0d4c 41e31d1ac03f 5233708cfa46 8aa9d487a8dd
line source
1 # Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
3 import traceback
5 import errno
6 import sys
7 import socket
8 import time
10 from twisted.internet import reactor
11 from twisted.internet import defer
12 #defer.Deferred.debug = 1
13 from twisted.internet.protocol import Protocol
14 from twisted.internet.protocol import ClientFactory
16 import sxp
17 import XendDB
18 import EventServer; eserver = EventServer.instance()
19 from XendError import XendError
20 from XendLogging import log
# TCP port of the migrate/save daemon xfrd.
XFRD_PORT = 8002

# Transfer protocol version, announced to xfrd in the 'xfr.hello' message
# sent when a connection is made.
XFR_PROTO_MAJOR = 1
XFR_PROTO_MINOR = 0
class Xfrd(Protocol):
    """Protocol handler for a connection to the migration/save daemon xfrd.

    Bytes received from xfrd are fed to an S-expression parser, and every
    complete value is passed to the session object (xinfo) via its
    dispatch() method. Outgoing requests are written as S-expressions.
    """

    def __init__(self, xinfo):
        self.parser = sxp.Parser()
        self.xinfo = xinfo

    def connectionMade(self):
        # Protocol handshake first, then the session-specific request.
        self.request(['xfr.hello', XFR_PROTO_MAJOR, XFR_PROTO_MINOR])
        self.xinfo.request(self)

    def request(self, req):
        """Write the S-expression req to xfrd."""
        sxp.show(req, out=self.transport)

    def loseConnection(self):
        print('Xfrd>loseConnection>')
        self.transport.loseConnection()

    def connectionLost(self, reason):
        print('Xfrd>connectionLost> %s' % (reason,))
        self.xinfo.connectionLost(reason)

    def dataReceived(self, data):
        self.parser.input(data)
        # TCP may deliver several complete values in one chunk. Dispatch all
        # of them now; otherwise later values wait for more data, or are
        # dropped when at_eof() closes the connection below.
        while self.parser.ready():
            val = self.parser.get_val()
            self.xinfo.dispatch(self, val)
        if self.parser.at_eof():
            self.loseConnection()
class XfrdClientFactory(ClientFactory):
    """Factory for clients of the migration/save daemon xfrd.

    Every protocol instance it builds is an Xfrd bound to the same session
    object (xinfo). A failed connection attempt is reported to that session
    as an error.
    """

    def __init__(self, xinfo):
        # ClientFactory.__init__ is intentionally not called here.
        self.xinfo = xinfo

    def startedConnecting(self, connector):
        print('Started to connect self= %s connector= %s' % (self, connector))

    def buildProtocol(self, addr):
        print('buildProtocol> %s' % (addr,))
        protocol = Xfrd(self.xinfo)
        return protocol

    def clientConnectionLost(self, connector, reason):
        # Only traced: the Xfrd protocol forwards connectionLost to the session.
        print('clientConnectionLost> connector= %s reason= %s'
              % (connector, reason))

    def clientConnectionFailed(self, connector, reason):
        print('clientConnectionFailed> connector= %s reason= %s'
              % (connector, reason))
        self.xinfo.error(reason)
class XfrdInfo:
    """Abstract class for info about a session with xfrd.

    Has subclasses for save, migrate and restore. Subclasses provide
    request(), set self.state and add their own xfr_*_ok handlers. This
    class handles the xfrd messages common to every session. It also
    remembers which domains the session suspended or paused, so that
    connectionLost() can clean them up.
    """

    # Suspend timeout (seconds). Suspending a domain can hang, so
    # xfr_vm_suspend() fails the suspend if it takes longer than this.
    timeout = 30

    def __init__(self):
        from xen.xend import XendDomain
        self.xd = XendDomain.instance()
        # Fired with the session result on success, errbacked on failure.
        self.deferred = defer.Deferred()
        # Domain ids suspended / paused by this session (dicts used as sets).
        self.suspended = {}
        self.paused = {}

    def vmconfig(self):
        """Return the config of domain self.src_dom as an S-expression
        string, or None if the domain is not found.

        self.src_dom is set by the subclasses that call this.
        """
        dominfo = self.xd.domain_get(self.src_dom)
        if not dominfo:
            return None
        return sxp.to_string(dominfo.sxpr())

    def error(self, err):
        """Mark the session as failed and errback the deferred (at most once)."""
        print('Error> %s' % (err,))
        self.state = 'error'
        if not self.deferred.called:
            print('Error> calling errback')
            self.deferred.errback(err)

    def dispatch(self, xfrd, val):
        """Route an S-expression received from xfrd to its handler.

        A message named 'xfr.foo' is handled by method xfr_foo, if it
        exists. Anything else goes to unknown(). A non-None handler result
        is sent back to xfrd. The result may be delivered later through a
        Deferred. If that Deferred fails, ['xfr.err', EINVAL] is sent and
        the session is failed.
        """
        def cbok(v):
            if v is None:
                return
            sxp.show(v, out=xfrd.transport)

        def cberr(err):
            sxp.show(['xfr.err', errno.EINVAL], out=xfrd.transport)
            self.error(err)

        # sxp.name() may return None for a value without a name; treat that
        # as an unknown message instead of crashing on None.replace().
        op = (sxp.name(val) or '').replace('.', '_')
        if op.startswith('xfr_'):
            fn = getattr(self, op, self.unknown)
        else:
            fn = self.unknown
        result = fn(xfrd, val)
        if isinstance(result, defer.Deferred):
            result.addCallback(cbok)
            result.addErrback(cberr)
        else:
            cbok(result)

    def unknown(self, xfrd, val):
        """Handle an unrecognised message by dropping the connection."""
        print('unknown> %s' % (val,))
        xfrd.loseConnection()
        return None

    def xfr_err(self, xfrd, val):
        """Handle ['xfr.err', code] from xfrd.

        A zero code indicates hello success. A non-zero code means the
        operation failed: the session is failed and the connection dropped.
        """
        print('xfr_err> %s' % (val,))
        v = sxp.child0(val)
        print('xfr_err> %s %s' % (type(v), v))
        try:
            err = int(v)
        except (TypeError, ValueError):
            # A malformed error reply is itself a failure.
            err = errno.EINVAL
        if not err:
            return None
        self.error(err)
        xfrd.loseConnection()
        return None

    def xfr_progress(self, xfrd, val):
        print('xfr_progress> %s' % (val,))
        return None

    def xfr_vm_destroy(self, xfrd, val):
        """Destroy the domain named in val; reply ['xfr.err', result]."""
        print('xfr_vm_destroy> %s' % (val,))
        try:
            vmid = sxp.child0(val)
            val = self.xd.domain_destroy(vmid)
            # The domain is gone: nothing left to undo for it on disconnect.
            self.paused.pop(vmid, None)
            self.suspended.pop(vmid, None)
        except Exception:
            val = errno.EINVAL
        return ['xfr.err', val]

    def xfr_vm_pause(self, xfrd, val):
        """Pause the domain named in val; reply ['xfr.err', result]."""
        print('xfr_vm_pause> %s' % (val,))
        try:
            vmid = sxp.child0(val)
            val = self.xd.domain_pause(vmid)
            self.paused[vmid] = 1
        except Exception:
            val = errno.EINVAL
        return ['xfr.err', val]

    def xfr_vm_unpause(self, xfrd, val):
        """Unpause the domain named in val; reply ['xfr.err', result]."""
        print('xfr_vm_unpause> %s' % (val,))
        try:
            vmid = sxp.child0(val)
            val = self.xd.domain_unpause(vmid)
            self.paused.pop(vmid, None)
        except Exception:
            val = errno.EINVAL
        return ['xfr.err', val]

    def xfr_vm_suspend(self, xfrd, val):
        """Suspend a domain. Suspending takes time, so we return
        a Deferred that is called when the suspend completes.
        Suspending can hang, so we set a timeout and fail if it
        takes too long.
        """
        print('xfr_vm_suspend> %s' % (val,))
        try:
            vmid = sxp.child0(val)
            d = defer.Deferred()
            # Subscribe to 'suspended' events to learn when the suspend
            # completes, and to 'died' events to learn if the domain died.
            # The timeout and error handler make sure the subscriptions are
            # removed if suspending hangs or fails.

            def onSuspended(e, v):
                print('xfr_vm_suspend>onSuspended> %s %s' % (e, v))
                if v[1] != vmid or d.called:
                    return
                subscribe(on=0)
                d.callback(v)

            def onDied(e, v):
                print('xfr_vm_suspend>onDied> %s %s' % (e, v))
                if v[1] != vmid or d.called:
                    return
                d.errback(XendError('Domain died'))

            def subscribe(on=1):
                if on:
                    action = eserver.subscribe
                else:
                    action = eserver.unsubscribe
                action('xend.domain.suspended', onSuspended)
                action('xend.domain.died', onDied)

            def cberr(err):
                print('xfr_vm_suspend>cberr> %s' % (err,))
                subscribe(on=0)
                return err

            subscribe()
            try:
                self.xd.domain_shutdown(vmid, reason='suspend')
            except:
                # The Deferred will never fire, so cberr would never remove
                # the subscriptions; remove them here before propagating.
                subscribe(on=0)
                raise
            self.suspended[vmid] = 1
            d.addErrback(cberr)
            d.setTimeout(self.timeout)
            return d
        except Exception:
            err = sys.exc_info()[1]
            print('xfr_vm_suspend> Exception %s' % (err,))
            traceback.print_exc()
            return ['xfr.err', errno.EINVAL]

    def connectionLost(self, reason=None):
        """Best-effort cleanup when the xfrd connection closes.

        Destroys every domain this session suspended and unpauses every
        domain it paused. Failures are ignored.
        """
        print('XfrdInfo>connectionLost> %s' % (reason,))
        for vmid in list(self.suspended):
            try:
                self.xd.domain_destroy(vmid)
            except Exception:
                pass
        for vmid in list(self.paused):
            try:
                self.xd.domain_unpause(vmid)
            except Exception:
                pass
        # Do the cleanup only once, even if called again.
        self.suspended.clear()
        self.paused.clear()
class XendMigrateInfo(XfrdInfo):
    """Representation of a migrate in progress and its interaction with xfrd."""

    def __init__(self, xid, dominfo, host, port, live):
        XfrdInfo.__init__(self)
        self.xid = xid
        self.dominfo = dominfo
        self.state = 'begin'
        self.src_host = socket.gethostname()
        self.src_dom = dominfo.id
        self.dst_host = host
        self.dst_port = port
        # Domain id on the destination; known only after xfr.migrate.ok.
        self.dst_dom = None
        self.live = live
        self.start = 0

    def sxpr(self):
        """Return an S-expression describing this migrate session."""
        result = ['migrate',
                  ['id', self.xid],
                  ['state', self.state],
                  ['live', self.live]]
        result.append(['src', ['host', self.src_host],
                       ['domain', self.src_dom]])
        sxpr_dst = ['dst', ['host', self.dst_host]]
        if self.dst_dom is not None:
            sxpr_dst.append(['domain', self.dst_dom])
        result.append(sxpr_dst)
        return result

    def request(self, xfrd):
        """Send the xfr.migrate request, or drop the connection if the
        source domain's config cannot be found."""
        vmconfig = self.vmconfig()
        if not vmconfig:
            xfrd.loseConnection()
            return
        log.info('Migrate BEGIN: ' + str(self.sxpr()))
        eserver.inject('xend.domain.migrate',
                       [self.dominfo.name, self.dominfo.id,
                        "begin", self.sxpr()])
        xfrd.request(['xfr.migrate',
                      self.src_dom,
                      vmconfig,
                      self.dst_host,
                      self.dst_port,
                      self.live])

    # NOTE: an xfr_vm_suspend override that, for a localhost destination,
    # destroyed the source domain's devices early (restart_cancel, cleanup,
    # destroy_console) after the suspend completed is currently disabled.

    def xfr_migrate_ok(self, xfrd, val):
        """Handle ['xfr.migrate.ok', dom]: record the destination domain,
        destroy the source domain and fire the session deferred."""
        dom = int(sxp.child0(val))
        self.state = 'ok'
        self.dst_dom = dom
        try:
            self.xd.domain_destroy(self.src_dom)
        except Exception:
            # The migrate itself succeeded. A failure to tear down the
            # source must not leave the caller's deferred unfired.
            print('xfr_migrate_ok> error destroying source domain %s'
                  % (self.src_dom,))
            traceback.print_exc()
        if not self.deferred.called:
            self.deferred.callback(self)

    def connectionLost(self, reason=None):
        """Clean up, log the outcome, fail the session unless it reached
        state 'ok', and announce the final state on the event server."""
        print('XfrdMigrateInfo>connectionLost> %s' % (reason,))
        XfrdInfo.connectionLost(self, reason)
        if self.state == 'ok':
            log.info('Migrate OK: ' + str(self.sxpr()))
        else:
            self.state = 'error'
            self.error(XendError("migrate failed"))
            log.info('Migrate ERROR: ' + str(self.sxpr()))
        eserver.inject('xend.domain.migrate',
                       [self.dominfo.name, self.dominfo.id,
                        self.state, self.sxpr()])
class XendSaveInfo(XfrdInfo):
    """Representation of a save in progress and its interaction with xfrd."""

    def __init__(self, xid, dominfo, file):
        XfrdInfo.__init__(self)
        self.xid = xid
        self.dominfo = dominfo
        self.state = 'begin'
        self.src_dom = dominfo.id
        self.file = file
        self.start = 0

    def sxpr(self):
        """Return an S-expression describing this save session."""
        return ['save',
                ['id', self.xid],
                ['state', self.state],
                ['domain', self.src_dom],
                ['file', self.file]]

    def request(self, xfrd):
        """Send the xfr.save request, or drop the connection if the source
        domain's config cannot be found."""
        # Build the config once and reuse it for both the trace and the
        # request; building it serialises the whole domain description.
        vmconfig = self.vmconfig()
        print('***request> %s' % (vmconfig,))
        if not vmconfig:
            xfrd.loseConnection()
            return
        print('***request> begin')
        log.info('Save BEGIN: ' + str(self.sxpr()))
        eserver.inject('xend.domain.save',
                       [self.dominfo.name, self.dominfo.id,
                        "begin", self.sxpr()])
        xfrd.request(['xfr.save', self.src_dom, vmconfig, self.file])

    def xfr_save_ok(self, xfrd, val):
        """Handle xfr.save.ok: destroy the source domain and fire the
        session deferred."""
        self.state = 'ok'
        try:
            self.xd.domain_destroy(self.src_dom)
        except Exception:
            # The save itself succeeded. A failure to tear down the domain
            # must not leave the caller's deferred unfired.
            print('xfr_save_ok> error destroying source domain %s'
                  % (self.src_dom,))
            traceback.print_exc()
        if not self.deferred.called:
            self.deferred.callback(self)

    def connectionLost(self, reason=None):
        """Clean up, log the outcome, fail the session unless it reached
        state 'ok', and announce the final state on the event server."""
        print('XfrdSaveInfo>connectionLost> %s' % (reason,))
        XfrdInfo.connectionLost(self, reason)
        if self.state == 'ok':
            log.info('Save OK: ' + str(self.sxpr()))
        else:
            self.state = 'error'
            self.error(XendError("save failed"))
            log.info('Save ERROR: ' + str(self.sxpr()))
        eserver.inject('xend.domain.save',
                       [self.dominfo.name, self.dominfo.id,
                        self.state, self.sxpr()])
class XendRestoreInfo(XfrdInfo):
    """Representation of a restore in progress and its interaction with xfrd."""

    def __init__(self, xid, file):
        XfrdInfo.__init__(self)
        self.xid = xid
        self.state = 'begin'
        self.file = file

    def sxpr(self):
        """Return an S-expression describing this restore session."""
        return ['restore',
                ['id', self.xid],
                ['file', self.file]]

    def request(self, xfrd):
        """Ask xfrd to restore a domain from self.file."""
        print('***request> %s' % (self.file,))
        log.info('restore BEGIN: ' + str(self.sxpr()))
        xfrd.request(['xfr.restore', self.file])

    def xfr_restore_ok(self, xfrd, val):
        """Handle ['xfr.restore.ok', dom]: fire the session deferred with the
        restored domain's info (as returned by domain_get)."""
        dom = int(sxp.child0(val))
        dominfo = self.xd.domain_get(dom)
        self.state = 'ok'
        if not self.deferred.called:
            self.deferred.callback(dominfo)

    def connectionLost(self, reason=None):
        """Clean up, and fail the session if the connection closed before
        xfr.restore.ok was received.

        Without this, a connection dropped without an xfr.err reply would
        leave the deferred unfired, and the session would never be removed.
        """
        print('XfrdRestoreInfo>connectionLost> %s' % (reason,))
        XfrdInfo.connectionLost(self, reason)
        if self.state == 'ok':
            log.info('restore OK: ' + str(self.sxpr()))
        else:
            self.state = 'error'
            self.error(XendError("restore failed"))
            log.info('restore ERROR: ' + str(self.sxpr()))
class XendMigrate:
    """External API for interaction with xfrd for migrate, save and restore.

    Intended to be used as a singleton through instance().
    """
    # Open questions:
    #  - Use log for indications of begin/end/errors?
    #  - Need logging of: domain create/halt, migrate begin/end/fail.
    #  - Log via event server?

    dbpath = "migrate"

    def __init__(self):
        self.db = XendDB.XendDB(self.dbpath)
        # Live session objects, keyed by xid.
        self.session = {}
        # Persisted session S-expressions, keyed by xid.
        self.session_db = self.db.fetchall("")
        self.xid = 0

    def nextid(self):
        """Allocate the next session id, as a decimal string."""
        self.xid = self.xid + 1
        return str(self.xid)

    def sync(self):
        """Write every persisted session record back to the database."""
        self.db.saveall("", self.session_db)

    def sync_session(self, xid):
        """Write the persisted record of session xid to the database."""
        record = self.session_db[xid]
        print('sync_session> %s %s %s' % (type(xid), xid, record))
        self.db.save(xid, record)

    def close(self):
        pass

    def _add_session(self, info):
        key = info.xid
        self.session[key] = info
        self.session_db[key] = info.sxpr()
        self.sync_session(key)

    def _delete_session(self, xid):
        print('***_delete_session> %s' % (xid,))
        self.session.pop(xid, None)
        if xid in self.session_db:
            del self.session_db[xid]
            self.db.delete(xid)

    def session_ls(self):
        """Return the ids of the live sessions."""
        return self.session.keys()

    def sessions(self):
        """Return the live session objects."""
        return self.session.values()

    def session_get(self, xid):
        """Return the live session with id xid, or None."""
        return self.session.get(xid)

    def session_begin(self, info):
        """Add the session to the table and start it.

        The session is removed from the table when its deferred fires,
        whether it succeeds or fails.

        @param info: session
        @return: deferred
        """
        def remove_session(result):
            print('***cbremove> %s' % (result,))
            self._delete_session(info.xid)
            return result

        self._add_session(info)
        session_deferred = info.deferred
        session_deferred.addCallback(remove_session)
        session_deferred.addErrback(remove_session)
        reactor.connectTCP('localhost', XFRD_PORT, XfrdClientFactory(info))
        return session_deferred

    def migrate_begin(self, dominfo, host, port=XFRD_PORT, live=0):
        """Begin to migrate a domain to another host.

        @param dominfo: domain info
        @param host: destination host
        @param port: destination port
        @param live: live-migration flag passed on to xfrd
        @return: deferred
        """
        return self.session_begin(
            XendMigrateInfo(self.nextid(), dominfo, host, port, live))

    def save_begin(self, dominfo, file):
        """Begin saving a domain to file.

        @param dominfo: domain info
        @param file: destination file
        @return: deferred
        """
        return self.session_begin(XendSaveInfo(self.nextid(), dominfo, file))

    def restore_begin(self, file):
        """Begin restoring a domain from file.

        @param file: file to restore from
        @return: deferred
        """
        return self.session_begin(XendRestoreInfo(self.nextid(), file))
def instance():
    """Return the module-wide XendMigrate object, creating it on first use."""
    global inst
    try:
        return inst
    except NameError:
        # First call: the global has never been bound.
        inst = XendMigrate()
        return inst