debuggers.hg
changeset 21286:321dddf767e2
Remus: move device handling into its own module
Signed-off-by: Brendan Cully <brendan@cs.ubc.ca>
Signed-off-by: Brendan Cully <brendan@cs.ubc.ca>
author | Keir Fraser <keir.fraser@citrix.com> |
---|---|
date | Tue May 04 09:34:23 2010 +0100 (2010-05-04) |
parents | 8559e324941f |
children | d6bd61a5a0fc |
files | tools/python/xen/remus/device.py tools/remus/remus |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/tools/python/xen/remus/device.py Tue May 04 09:34:23 2010 +0100 1.3 @@ -0,0 +1,140 @@ 1.4 +# Remus device interface 1.5 +# 1.6 +# Coordinates with devices at suspend, resume, and commit hooks 1.7 + 1.8 +import os 1.9 + 1.10 +import netlink, qdisc, util 1.11 + 1.12 +class CheckpointedDevice(object): 1.13 + 'Base class for buffered devices' 1.14 + 1.15 + def postsuspend(self): 1.16 + 'called after guest has suspended' 1.17 + pass 1.18 + 1.19 + def preresume(self): 1.20 + 'called before guest resumes' 1.21 + pass 1.22 + 1.23 + def commit(self): 1.24 + 'called when backup has acknowledged checkpoint reception' 1.25 + pass 1.26 + 1.27 +class ReplicatedDiskException(Exception): pass 1.28 + 1.29 +class ReplicatedDisk(CheckpointedDevice): 1.30 + """ 1.31 + Send a checkpoint message to a replicated disk while the domain 1.32 + is paused between epochs. 1.33 + """ 1.34 + FIFODIR = '/var/run/tap' 1.35 + 1.36 + def __init__(self, disk): 1.37 + # look up disk, make sure it is tap:buffer, and set up socket 1.38 + # to request commits. 1.39 + self.ctlfd = None 1.40 + 1.41 + if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'): 1.42 + raise ReplicatedDiskException('Disk is not replicated: %s' % 1.43 + str(disk)) 1.44 + fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_') 1.45 + absfifo = os.path.join(self.FIFODIR, fifo) 1.46 + absmsgfifo = absfifo + '.msg' 1.47 + 1.48 + self.installed = False 1.49 + self.ctlfd = open(absfifo, 'w+b') 1.50 + self.msgfd = open(absmsgfifo, 'r+b') 1.51 + 1.52 + def __del__(self): 1.53 + self.uninstall() 1.54 + 1.55 + def uninstall(self): 1.56 + if self.ctlfd: 1.57 + self.ctlfd.close() 1.58 + self.ctlfd = None 1.59 + 1.60 + def postsuspend(self): 1.61 + os.write(self.ctlfd.fileno(), 'flush') 1.62 + 1.63 + def commit(self): 1.64 + msg = os.read(self.msgfd.fileno(), 4) 1.65 + if msg != 'done': 1.66 + print 'Unknown message: %s' % msg 1.67 + 1.68 +class BufferedNICException(Exception): pass 1.69 + 1.70 +class BufferedNIC(CheckpointedDevice): 1.71 + """ 1.72 + Buffer a protected domain's network output between rounds so that 1.73 + nothing is issued that a failover might not know about. 1.74 + """ 1.75 + # shared rtnetlink handle 1.76 + rth = None 1.77 + 1.78 + def __init__(self, domid): 1.79 + self.installed = False 1.80 + 1.81 + if not self.rth: 1.82 + self.rth = netlink.rtnl() 1.83 + 1.84 + self.devname = self._startimq(domid) 1.85 + dev = self.rth.getlink(self.devname) 1.86 + if not dev: 1.87 + raise BufferedNICException('could not find device %s' % self.devname) 1.88 + self.dev = dev['index'] 1.89 + self.handle = qdisc.TC_H_ROOT 1.90 + self.q = qdisc.QueueQdisc() 1.91 + 1.92 + def __del__(self): 1.93 + self.uninstall() 1.94 + 1.95 + def postsuspend(self): 1.96 + if not self.installed: 1.97 + self._setup() 1.98 + 1.99 + self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT) 1.100 + 1.101 + def commit(self): 1.102 + '''Called when checkpoint has been acknowledged by 1.103 + the backup''' 1.104 + self._sendqmsg(qdisc.TC_QUEUE_RELEASE) 1.105 + 1.106 + def _sendqmsg(self, action): 1.107 + self.q.action = action 1.108 + req = qdisc.changerequest(self.dev, self.handle, self.q) 1.109 + self.rth.talk(req.pack()) 1.110 + 1.111 + def _setup(self): 1.112 + q = self.rth.getqdisc(self.dev) 1.113 + if q: 1.114 + if q['kind'] == 'queue': 1.115 + self.installed = True 1.116 + return 1.117 + if q['kind'] != 'pfifo_fast': 1.118 + raise BufferedNICException('there is already a queueing ' 1.119 + 'discipline on %s' % self.devname) 1.120 + 1.121 + print 'installing buffer on %s' % self.devname 1.122 + req = qdisc.addrequest(self.dev, self.handle, self.q) 1.123 + self.rth.talk(req.pack()) 1.124 + self.installed = True 1.125 + 1.126 + def uninstall(self): 1.127 + if self.installed: 1.128 + req = qdisc.delrequest(self.dev, self.handle) 1.129 + self.rth.talk(req.pack()) 1.130 + self.installed = False 1.131 + 1.132 + def _startimq(self, domid): 1.133 + # stopgap hack to set up IMQ for an interface. Wrong in many ways. 1.134 + imqebt = '/usr/lib/xen/bin/imqebt' 1.135 + imqdev = 'imq0' 1.136 + vid = 'vif%d.0' % domid 1.137 + for mod in ['sch_queue', 'imq', 'ebt_imq']: 1.138 + util.runcmd(['modprobe', mod]) 1.139 + util.runcmd("ip link set %s up" % (imqdev)) 1.140 + util.runcmd("%s -F FORWARD" % (imqebt)) 1.141 + util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev)) 1.142 + 1.143 + return imqdev
2.1 --- a/tools/remus/remus Tue May 04 09:31:13 2010 +0100 2.2 +++ b/tools/remus/remus Tue May 04 09:34:23 2010 +0100 2.3 @@ -7,9 +7,10 @@ 2.4 # TODO: fencing. 2.5 2.6 import optparse, os, re, select, signal, sys, time 2.7 -from xen.remus import save, vm 2.8 + 2.9 +from xen.remus import save, util, vm 2.10 +from xen.remus.device import ReplicatedDisk, BufferedNIC 2.11 from xen.xend import XendOptions 2.12 -from xen.remus import netlink, qdisc, util 2.13 2.14 class CfgException(Exception): pass 2.15 2.16 @@ -58,139 +59,6 @@ class Cfg(object): 2.17 if (len(args) > 1): 2.18 self.host = args[1] 2.19 2.20 -class ReplicatedDiskException(Exception): pass 2.21 - 2.22 -class BufferedDevice(object): 2.23 - 'Base class for buffered devices' 2.24 - 2.25 - def postsuspend(self): 2.26 - 'called after guest has suspended' 2.27 - pass 2.28 - 2.29 - def preresume(self): 2.30 - 'called before guest resumes' 2.31 - pass 2.32 - 2.33 - def commit(self): 2.34 - 'called when backup has acknowledged checkpoint reception' 2.35 - pass 2.36 - 2.37 -class ReplicatedDisk(BufferedDevice): 2.38 - """ 2.39 - Send a checkpoint message to a replicated disk while the domain 2.40 - is paused between epochs. 2.41 - """ 2.42 - FIFODIR = '/var/run/tap' 2.43 - 2.44 - def __init__(self, disk): 2.45 - # look up disk, make sure it is tap:buffer, and set up socket 2.46 - # to request commits. 2.47 - self.ctlfd = None 2.48 - 2.49 - if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'): 2.50 - raise ReplicatedDiskException('Disk is not replicated: %s' % 2.51 - str(disk)) 2.52 - fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_') 2.53 - absfifo = os.path.join(self.FIFODIR, fifo) 2.54 - absmsgfifo = absfifo + '.msg' 2.55 - 2.56 - self.installed = False 2.57 - self.ctlfd = open(absfifo, 'w+b') 2.58 - self.msgfd = open(absmsgfifo, 'r+b') 2.59 - 2.60 - def __del__(self): 2.61 - self.uninstall() 2.62 - 2.63 - def uninstall(self): 2.64 - if self.ctlfd: 2.65 - self.ctlfd.close() 2.66 - self.ctlfd = None 2.67 - 2.68 - def postsuspend(self): 2.69 - os.write(self.ctlfd.fileno(), 'flush') 2.70 - 2.71 - def commit(self): 2.72 - msg = os.read(self.msgfd.fileno(), 4) 2.73 - if msg != 'done': 2.74 - print 'Unknown message: %s' % msg 2.75 - 2.76 -class NetbufferException(Exception): pass 2.77 - 2.78 -class Netbuffer(BufferedDevice): 2.79 - """ 2.80 - Buffer a protected domain's network output between rounds so that 2.81 - nothing is issued that a failover might not know about. 2.82 - """ 2.83 - # shared rtnetlink handle 2.84 - rth = None 2.85 - 2.86 - def __init__(self, domid): 2.87 - self.installed = False 2.88 - 2.89 - if not self.rth: 2.90 - self.rth = netlink.rtnl() 2.91 - 2.92 - self.devname = self._startimq(domid) 2.93 - dev = self.rth.getlink(self.devname) 2.94 - if not dev: 2.95 - raise NetbufferException('could not find device %s' % self.devname) 2.96 - self.dev = dev['index'] 2.97 - self.handle = qdisc.TC_H_ROOT 2.98 - self.q = qdisc.QueueQdisc() 2.99 - 2.100 - def __del__(self): 2.101 - self.uninstall() 2.102 - 2.103 - def postsuspend(self): 2.104 - if not self.installed: 2.105 - self._setup() 2.106 - 2.107 - self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT) 2.108 - 2.109 - def commit(self): 2.110 - '''Called when checkpoint has been acknowledged by 2.111 - the backup''' 2.112 - self._sendqmsg(qdisc.TC_QUEUE_RELEASE) 2.113 - 2.114 - def _sendqmsg(self, action): 2.115 - self.q.action = action 2.116 - req = qdisc.changerequest(self.dev, self.handle, self.q) 2.117 - self.rth.talk(req.pack()) 2.118 - 2.119 - def _setup(self): 2.120 - q = self.rth.getqdisc(self.dev) 2.121 - if q: 2.122 - if q['kind'] == 'queue': 2.123 - self.installed = True 2.124 - return 2.125 - if q['kind'] != 'pfifo_fast': 2.126 - raise NetbufferException('there is already a queueing ' 2.127 - 'discipline on %s' % self.devname) 2.128 - 2.129 - print 'installing buffer on %s' % self.devname 2.130 - req = qdisc.addrequest(self.dev, self.handle, self.q) 2.131 - self.rth.talk(req.pack()) 2.132 - self.installed = True 2.133 - 2.134 - def uninstall(self): 2.135 - if self.installed: 2.136 - req = qdisc.delrequest(self.dev, self.handle) 2.137 - self.rth.talk(req.pack()) 2.138 - self.installed = False 2.139 - 2.140 - def _startimq(self, domid): 2.141 - # stopgap hack to set up IMQ for an interface. Wrong in many ways. 2.142 - imqebt = '/usr/lib/xen/bin/imqebt' 2.143 - imqdev = 'imq0' 2.144 - vid = 'vif%d.0' % domid 2.145 - for mod in ['sch_queue', 'imq', 'ebt_imq']: 2.146 - util.runcmd(['modprobe', mod]) 2.147 - util.runcmd("ip link set %s up" % (imqdev)) 2.148 - util.runcmd("%s -F FORWARD" % (imqebt)) 2.149 - util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev)) 2.150 - 2.151 - return imqdev 2.152 - 2.153 class SignalException(Exception): pass 2.154 2.155 def run(cfg):