debuggers.hg

view tools/remus/remus @ 21067:b4a1832a916f

Update Xen version to 4.0.0-rc6
author Keir Fraser <keir.fraser@citrix.com>
date Tue Mar 09 18:18:05 2010 +0000 (2010-03-09)
parents 62ce32978d2e
children 8559e324941f
line source
1 #!/usr/bin/env python
2 #
3 # This is a save process which also buffers outgoing I/O between
4 # rounds, so that external viewers never see anything that hasn't
5 # been committed at the backup
6 #
7 # TODO: fencing.
9 import optparse, os, re, select, signal, sys, time
10 from xen.remus import save, vm
11 from xen.xend import XendOptions
12 from xen.remus import netlink, qdisc, util
class CfgException(Exception):
    """Raised by Cfg.getargs() when the command line is unusable (e.g. no domain)."""
class Cfg(object):
    """Settings for one remus run, filled in from the command line.

    Attributes (defaults set in __init__, overridden by getargs()):
      domid     -- domain to protect; the first positional argument, kept as
                   the string given on the command line
      host      -- destination host for the checkpoint stream
      port      -- destination port (defaults to xend's relocation port)
      interval  -- checkpoint interval in milliseconds
      netbuffer -- buffer network output between checkpoints
      nobackup  -- set by --no-backup (benchmark option)
      timer     -- force a pause at each checkpoint interval
    """

    def __init__(self):
        # must be set
        self.domid = 0

        self.host = 'localhost'
        self.port = XendOptions.instance().get_xend_relocation_port()
        self.interval = 200
        self.netbuffer = True
        self.nobackup = False
        self.timer = False

        self.parser = self._makeparser()

    @staticmethod
    def _makeparser():
        'Build the OptionParser describing the remus command line.'
        parser = optparse.OptionParser()
        parser.usage = '%prog [options] domain [destination]'
        parser.add_option('-i', '--interval', dest='interval', type='int',
                          metavar='MS',
                          help='checkpoint every MS milliseconds')
        parser.add_option('-p', '--port', dest='port', type='int',
                          help='send stream to port PORT', metavar='PORT')
        parser.add_option('', '--no-net', dest='nonet', action='store_true',
                          help='run without net buffering (benchmark option)')
        parser.add_option('', '--timer', dest='timer', action='store_true',
                          help='force pause at checkpoint interval (experimental)')
        parser.add_option('', '--no-backup', dest='nobackup',
                          action='store_true',
                          help='prevent backup from starting up (benchmark '
                          'option)')
        return parser

    def usage(self):
        'Print the option help text.'
        self.parser.print_help()

    def getargs(self, argv=None):
        """Parse the command line into this Cfg.

        argv defaults to None, which makes optparse read sys.argv[1:];
        an explicit list may be passed instead.
        Raises CfgException if no domain is given.
        """
        opts, args = self.parser.parse_args(argv)

        if opts.interval:
            self.interval = opts.interval
        if opts.port:
            self.port = opts.port
        if opts.nonet:
            self.netbuffer = False
        if opts.timer:
            self.timer = True
        # --no-backup only takes effect if it is copied onto the Cfg here.
        if opts.nobackup:
            self.nobackup = True

        if not args:
            raise CfgException('Missing domain')
        self.domid = args[0]
        if len(args) > 1:
            self.host = args[1]
class ReplicatedDiskException(Exception):
    """Raised when a domain disk is not a remus-replicated tap disk."""
class BufferedDevice(object):
    """Base class for buffered devices.

    Subclasses override the checkpoint hooks they need; every hook
    defaults to a no-op.  uninstall() is part of the interface because
    the checkpoint driver calls it on every device when protection stops.
    """

    def postsuspend(self):
        'called after guest has suspended'
        pass

    def preresume(self):
        'called before guest resumes'
        pass

    def commit(self):
        'called when backup has acknowledged checkpoint reception'
        pass

    def uninstall(self):
        'called when protection stops, to undo any buffering that was set up'
        pass
class ReplicatedDisk(BufferedDevice):
    """
    Send a checkpoint message to a replicated disk while the domain
    is paused between epochs.

    The disk is driven through two FIFOs in FIFODIR: 'flush' is written to
    the control FIFO after the guest suspends, and a reply (expected to be
    'done') is read from the '<name>.msg' FIFO at commit time.
    """
    FIFODIR = '/var/run/tap'

    def __init__(self, disk):
        # Look up the disk, make sure it is tap:remus, and open the FIFOs
        # used to request commits.  ctlfd starts as None so uninstall()
        # (also run from __del__) is safe if construction fails below.
        self.ctlfd = None

        uname = disk.uname
        if not uname.startswith('tap:remus:') and not uname.startswith('tap:tapdisk:remus:'):
            raise ReplicatedDiskException('Disk is not replicated: %s' %
                                          str(disk))

        # FIFO name: the 'remus...' part of the uname that precedes a '|',
        # with ':' replaced by '_'.
        match = re.match(r"tap:.*(remus.*)\|", uname)
        if match is None:
            # Fail loudly with a clear message.  ReplicatedDiskException is
            # deliberately not used: run() catches it and would silently
            # leave a replicated disk unbuffered.
            raise ValueError('cannot derive remus FIFO name from disk uname: %s'
                             % uname)
        fifo = match.group(1).replace(':', '_')
        absfifo = os.path.join(self.FIFODIR, fifo)
        absmsgfifo = absfifo + '.msg'

        self.installed = False
        self.ctlfd = open(absfifo, 'w+b')
        self.msgfd = open(absmsgfifo, 'r+b')

    def __del__(self):
        self.uninstall()

    def setup(self):
        'Record that setup has happened (no message is sent).'
        self.installed = True

    def uninstall(self):
        'Close the control FIFO; safe to call more than once.'
        if self.ctlfd:
            self.ctlfd.close()
            self.ctlfd = None

    def postsuspend(self):
        'Ask the disk to checkpoint by writing "flush" to the control FIFO.'
        if not self.installed:
            self.setup()

        os.write(self.ctlfd.fileno(), 'flush')

    def commit(self):
        'Read up to 4 bytes from the message FIFO; warn unless they are "done".'
        msg = os.read(self.msgfd.fileno(), 4)
        if msg != 'done':
            print('Unknown message: %s' % msg)
class NetbufferException(Exception):
    """Raised when network output buffering cannot be set up on a device."""
class Netbuffer(BufferedDevice):
    """
    Buffer a protected domain's network output between rounds so that
    nothing is issued that a failover might not know about.

    Traffic arriving from vif<domid>.0 is redirected into the IMQ device
    imq0 (see _startimq).  A 'queue' qdisc on that device is sent a
    checkpoint action after each suspend and a release action once the
    backup has acknowledged the checkpoint.
    """
    # rtnetlink handle shared by every Netbuffer; created on first use.
    rth = None

    def __init__(self, domid):
        # domid must be an integer: _startimq formats it with %d.
        self.installed = False

        # Assign on the class rather than on self, so the handle really is
        # shared between instances instead of one socket per Netbuffer.
        if Netbuffer.rth is None:
            Netbuffer.rth = netlink.rtnl()

        self.devname = self._startimq(domid)
        dev = self.rth.getlink(self.devname)
        if not dev:
            raise NetbufferException('could not find device %s' % self.devname)
        self.dev = dev['index']
        self.handle = qdisc.TC_H_ROOT
        self.q = qdisc.QueueQdisc()

    def __del__(self):
        self.uninstall()

    def postsuspend(self):
        'Install the qdisc on first use, then send it a checkpoint action.'
        if not self.installed:
            self._setup()

        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)

    def commit(self):
        '''Called when checkpoint has been acknowledged by
        the backup; sends TC_QUEUE_RELEASE (presumably releasing the
        output held back for that checkpoint).'''
        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)

    def _sendqmsg(self, action):
        'Send a qdisc change request carrying the given queue action.'
        self.q.action = action
        req = qdisc.changerequest(self.dev, self.handle, self.q)
        self.rth.talk(req.pack())

    def _setup(self):
        """Install the queue qdisc on the device.

        An existing 'queue' qdisc is adopted as-is; any qdisc other than
        the default pfifo_fast is refused with NetbufferException.
        """
        q = self.rth.getqdisc(self.dev)
        if q:
            if q['kind'] == 'queue':
                self.installed = True
                return
            if q['kind'] != 'pfifo_fast':
                raise NetbufferException('there is already a queueing '
                                         'discipline on %s' % self.devname)

        print('installing buffer on %s' % self.devname)
        req = qdisc.addrequest(self.dev, self.handle, self.q)
        self.rth.talk(req.pack())
        self.installed = True

    def uninstall(self):
        'Delete the root qdisc if it was installed or adopted; safe to repeat.'
        if self.installed:
            req = qdisc.delrequest(self.dev, self.handle)
            self.rth.talk(req.pack())
            self.installed = False

    def _startimq(self, domid):
        """Redirect vif<domid>.0 traffic into imq0 and return 'imq0'.

        Loads the sch_queue/imq/ebt_imq modules, brings imq0 up, runs
        'imqebt -F FORWARD' (presumably flushing that chain) and appends a
        rule sending traffic from vif<domid>.0 to imq0.
        """
        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
        imqebt = '/usr/lib/xen/bin/imqebt'
        imqdev = 'imq0'
        vid = 'vif%d.0' % domid
        for mod in ['sch_queue', 'imq', 'ebt_imq']:
            util.runcmd(['modprobe', mod])
        util.runcmd("ip link set %s up" % (imqdev))
        util.runcmd("%s -F FORWARD" % (imqebt))
        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))

        return imqdev
class SignalException(Exception):
    """Raised from a signal handler; args[0] is the signal number."""
def run(cfg):
    """Protect domain cfg.domid by checkpointing it to cfg.host:cfg.port.

    Wraps the domain's replicated disks and (when cfg.netbuffer is set)
    its network output in buffer objects, then drives a save.Saver whose
    hooks checkpoint and commit those buffers every epoch.  The buffers
    are uninstalled however checkpointing ends.  Terminates the process
    with sys.exit(): status 1 after save.CheckpointError, 0 after a normal
    stop, Ctrl-C or SIGTERM.  Other exceptions propagate after cleanup.
    """
    # Function object used as a mutable attribute bag shared by the nested
    # hooks below (they cannot rebind run()'s locals).
    closure = lambda: None
    closure.cmd = None
    # Set once stdin reaches EOF so getcommand() stops polling it.
    closure.stdin_eof = False

    def sigexception(signo, frame):
        # Turn the signal into an exception so checkpointing unwinds
        # through the cleanup at the end of run().
        raise SignalException(signo)

    def die():
        """Tear down the protected domain (debug hook chosen via getcommand)."""
        # I am not sure what the best way to die is. xm destroy is another option,
        # or we could attempt to trigger some instant reboot.
        # NOTE(review): 'eth2' is hard-coded; presumably the replication link
        # of the original test setup.
        print("dying...")
        print(util.runcmd(['sudo', 'ifdown', 'eth2']))
        # dangling imq0 handle on vif locks up the system
        for buf in bufs:
            buf.uninstall()
        print(util.runcmd(['sudo', 'xm', 'destroy', cfg.domid]))
        print(util.runcmd(['sudo', 'ifup', 'eth2']))

    def getcommand():
        """Get a command to execute while running.
        Commands include:
          s: die prior to postsuspend hook
          s2: die after postsuspend hook
          r: die prior to preresume hook
          r2: die after preresume hook
          c: die prior to commit hook
          c2: die after commit hook
        """
        if closure.stdin_eof:
            return
        r, w, x = select.select([sys.stdin], [], [], 0)
        if sys.stdin not in r:
            return

        line = sys.stdin.readline()
        if not line:
            # EOF (e.g. stdin is /dev/null): select() would report stdin as
            # readable on every call, so stop polling instead of printing
            # "unknown command" once per checkpoint.
            closure.stdin_eof = True
            return
        cmd = line.strip()
        if cmd not in ('s', 's2', 'r', 'r2', 'c', 'c2'):
            print("unknown command: %s" % cmd)
        closure.cmd = cmd

    signal.signal(signal.SIGTERM, sigexception)

    dom = vm.VM(cfg.domid)

    # set up I/O buffers
    bufs = []

    # disks must commit before network can be released
    for disk in dom.disks:
        try:
            bufs.append(ReplicatedDisk(disk))
        except ReplicatedDiskException:
            print(sys.exc_info()[1])

    # Netbuffer always redirects vif<domid>.0 into imq0 (Netbuffer._startimq),
    # so one buffer covers the domain; one per vif would stack duplicate
    # buffers on the same device.
    if cfg.netbuffer and list(dom.vifs):
        bufs.append(Netbuffer(dom.domid))

    fd = save.MigrationSocket((cfg.host, cfg.port))

    def postsuspend():
        'Begin external checkpointing after domain has paused'
        if not cfg.timer:
            # when not using a timer thread, sleep until now + interval
            closure.starttime = time.time()

        if closure.cmd == 's':
            die()

        for buf in bufs:
            buf.postsuspend()

        if closure.cmd == 's2':
            die()

    def preresume():
        'Complete external checkpointing before domain resumes'
        if closure.cmd == 'r':
            die()

        for buf in bufs:
            buf.preresume()

        if closure.cmd == 'r2':
            die()

    def commit():
        'Commit every buffer once the backup has acknowledged the checkpoint'
        if closure.cmd == 'c':
            die()

        sys.stderr.write("PROF: flushed memory at %0.6f\n" % time.time())

        for buf in bufs:
            buf.commit()

        if closure.cmd == 'c2':
            die()

        # Since the domain is running at this point, it's a good time to
        # check for control channel commands
        getcommand()

        if not cfg.timer:
            endtime = time.time()
            elapsed = (endtime - closure.starttime) * 1000

            if elapsed < cfg.interval:
                time.sleep((cfg.interval - elapsed) / 1000.0)

        # False ends checkpointing
        return True

    if cfg.timer:
        interval = cfg.interval
    else:
        interval = 0

    rc = 0

    checkpointer = save.Saver(cfg.domid, fd, postsuspend, preresume, commit,
                              interval)

    try:
        try:
            checkpointer.start()
        except save.CheckpointError:
            # sys.exc_info() rather than "except ... as e" keeps this valid
            # on old Python 2 releases as well as Python 3.
            print(sys.exc_info()[1])
            rc = 1
        except KeyboardInterrupt:
            pass
        except SignalException:
            print('*** signalled ***')
    finally:
        # Always remove the buffers, even if start() raised something
        # unexpected: a dangling imq0 handle on the vif locks up the system.
        for buf in bufs:
            buf.uninstall()

    if cfg.nobackup:
        # lame attempt to kill backup if protection is stopped deliberately.
        # It would be much better to move this into the heartbeat "protocol".
        print(util.runcmd(['sudo', '-u', os.getlogin(), 'ssh', cfg.host, 'sudo', 'xm', 'destroy', dom.name]))

    sys.exit(rc)
def main():
    """Entry point: parse the command line, then protect the domain.

    Prints the error and exits with status 1 if getargs() raises
    CfgException (after printing usage) or run() raises vm.VMException.
    """
    cfg = Cfg()
    try:
        cfg.getargs()
    except CfgException:
        # sys.exc_info() keeps this valid on old Python 2 and Python 3.
        print(str(sys.exc_info()[1]))
        cfg.usage()
        sys.exit(1)

    try:
        run(cfg)
    except vm.VMException:
        print(str(sys.exc_info()[1]))
        sys.exit(1)


if __name__ == '__main__':
    main()