debuggers.hg

changeset 21290:0f403a63ef6b

Remus: use IFB for net buffer on newer kernels

IMQ does not work with ebtables on 2.6.31, and IFB is not a
third-party patch.

Signed-off-by: Brendan Cully <brendan@cs.ubc.ca>
author Keir Fraser <keir.fraser@citrix.com>
date Tue May 04 09:36:05 2010 +0100 (2010-05-04)
parents ca9519f09563
children 80aef59e8c8e
files tools/python/xen/remus/device.py tools/remus/remus
line diff
     1.1 --- a/tools/python/xen/remus/device.py	Tue May 04 09:35:42 2010 +0100
     1.2 +++ b/tools/python/xen/remus/device.py	Tue May 04 09:36:05 2010 +0100
     1.3 @@ -6,6 +6,9 @@ import os
     1.4  
     1.5  import netlink, qdisc, util
     1.6  
     1.7 +class ReplicatedDiskException(Exception): pass
     1.8 +class BufferedNICException(Exception): pass
     1.9 +
    1.10  class CheckpointedDevice(object):
    1.11      'Base class for buffered devices'
    1.12  
    1.13 @@ -21,8 +24,6 @@ class CheckpointedDevice(object):
    1.14          'called when backup has acknowledged checkpoint reception'
    1.15          pass
    1.16  
    1.17 -class ReplicatedDiskException(Exception): pass
    1.18 -
    1.19  class ReplicatedDisk(CheckpointedDevice):
    1.20      """
    1.21      Send a checkpoint message to a replicated disk while the domain
    1.22 @@ -62,36 +63,223 @@ class ReplicatedDisk(CheckpointedDevice)
    1.23          if msg != 'done':
    1.24              print 'Unknown message: %s' % msg
    1.25  
    1.26 -class BufferedNICException(Exception): pass
    1.27 +### Network
    1.28 +
    1.29 +# shared rtnl handle
    1.30 +_rth = None
    1.31 +def getrth():
    1.32 +    global _rth
    1.33 +
    1.34 +    if not _rth:
    1.35 +        _rth = netlink.rtnl()
    1.36 +    return _rth
    1.37 +
    1.38 +class Netbuf(object):
    1.39 +    "Proxy for netdev with a queueing discipline"
    1.40 +
    1.41 +    @staticmethod
    1.42 +    def devclass():
    1.43 +        "returns the name of this device class"
    1.44 +        return 'unknown'
    1.45 +
    1.46 +    @classmethod
    1.47 +    def available(cls):
    1.48 +        "returns True if this module can proxy the device"
    1.49 +        return cls._hasdev(cls.devclass())
    1.50 +
    1.51 +    def __init__(self, devname):
    1.52 +        self.devname = devname
    1.53 +        self.vif = None
    1.54 +
    1.55 +    # override in subclasses
    1.56 +    def install(self, vif):
    1.57 +        "set up proxy on device"
    1.58 +        raise BufferedNICException('unimplemented')
    1.59 +
    1.60 +    def uninstall(self):
    1.61 +        "remove proxy on device"
    1.62 +        raise BufferedNICException('unimplemented')
    1.63 +
    1.64 +    # protected
    1.65 +    @staticmethod
    1.66 +    def _hasdev(devclass):
    1.67 +        """check for existence of device, attempting to load kernel
    1.68 +        module if not present"""
    1.69 +        devname = '%s0' % devclass
    1.70 +        rth = getrth()
    1.71 +
    1.72 +        if rth.getlink(devname):
    1.73 +            return True
    1.74 +        if util.modprobe(devclass) and rth.getlink(devname):
    1.75 +            return True
    1.76 +
    1.77 +        return False
    1.78 +
    1.79 +class IFBBuffer(Netbuf):
    1.80 +    """Capture packets arriving on a VIF using an ingress filter and tc
    1.81 +    mirred action to forward them to an IFB device.
    1.82 +    """
    1.83 +
    1.84 +    @staticmethod
    1.85 +    def devclass():
    1.86 +        return 'ifb'
    1.87 +
    1.88 +    def install(self, vif):
    1.89 +        self.vif = vif
    1.90 +        # voodoo from http://www.linuxfoundation.org/collaborate/workgroups/networking/ifb#Typical_Usage
    1.91 +        util.runcmd('ip link set %s up' % self.devname)
    1.92 +        util.runcmd('tc qdisc add dev %s ingress' % vif.dev)
    1.93 +        util.runcmd('tc filter add dev %s parent ffff: proto ip pref 10 '
    1.94 +                    'u32 match u32 0 0 action mirred egress redirect '
    1.95 +                    'dev %s' % (vif.dev, self.devname))
    1.96 +
    1.97 +    def uninstall(self):
    1.98 +        util.runcmd('tc filter del dev %s parent ffff: proto ip pref 10 u32' \
    1.99 +                        % self.vif.dev)
   1.100 +        util.runcmd('tc qdisc del dev %s ingress' % self.vif.dev)
   1.101 +        util.runcmd('ip link set %s down' % self.devname)
   1.102 +
   1.103 +class IMQBuffer(Netbuf):
   1.104 +    """Redirect packets coming in on vif to an IMQ device."""
   1.105 +
   1.106 +    imqebt = '/usr/lib/xen/bin/imqebt'
   1.107 +
   1.108 +    @staticmethod
   1.109 +    def devclass():
   1.110 +        return 'imq'
   1.111 +
   1.112 +    def install(self, vif):
   1.113 +        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
   1.114 +        self.vif = vif
   1.115 +
   1.116 +        for mod in ['imq', 'ebt_imq']:
   1.117 +            util.runcmd(['modprobe', mod])
   1.118 +        util.runcmd("ip link set %s up" % self.devname)
   1.119 +        util.runcmd("%s -F FORWARD" % self.imqebt)
   1.120 +        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (self.imqebt, vif.dev, self.devname))
   1.121 +
   1.122 +    def uninstall(self):
   1.123 +        util.runcmd("%s -F FORWARD" % self.imqebt)
   1.124 +        util.runcmd('ip link set %s down' % self.devname)
   1.125 +
   1.126 +# in order of desirability
   1.127 +netbuftypes = [IFBBuffer, IMQBuffer]
   1.128 +
   1.129 +def selectnetbuf():
   1.130 +    "Find the best available buffer type"
   1.131 +    for driver in netbuftypes:
   1.132 +        if driver.available():
   1.133 +            return driver
   1.134 +
   1.135 +    raise BufferedNICException('no net buffer available')
   1.136 +
   1.137 +class Netbufpool(object):
   1.138 +    """Allocates/releases proxy netdevs (IMQ/IFB)
   1.139 +
   1.140 +    A file contains a list of entries of the form <device> <pid>\n
   1.141 +    To allocate a device, lock the file, then claim a new device if
   1.142 +    one is free. If there are no free devices, check each PID for liveness
   1.143 +    and take a device if the PID is dead, otherwise return failure.
   1.144 +    Add an entry to the file before releasing the lock.
   1.145 +    """
   1.146 +    def __init__(self, netbufclass):
   1.147 +        "Create a pool of Device"
   1.148 +        self.netbufclass = netbufclass
   1.149 +        self.path = '/var/run/remus/' + self.netbufclass.devclass()
   1.150 +
   1.151 +        self.devices = self.getdevs()
   1.152 +
   1.153 +        pooldir = os.path.dirname(self.path)
   1.154 +        if not os.path.exists(pooldir):
   1.155 +            os.makedirs(pooldir, 0755)
   1.156 +
   1.157 +    def get(self):
   1.158 +        "allocate a free device"
   1.159 +        def getfreedev(table):
   1.160 +            for dev in self.devices:
   1.161 +                if dev not in table or not util.checkpid(table[dev]):
   1.162 +                    return dev
   1.163 +
   1.164 +            return None
   1.165 +
   1.166 +        lock = util.Lock(self.path)
   1.167 +        table = self.load()
   1.168 +
   1.169 +        dev = getfreedev(table)
   1.170 +        if not dev:
   1.171 +            lock.unlock()
   1.172 +            raise BufferedNICException('no free devices')
   1.173 +        dev = self.netbufclass(dev)
   1.174 +
   1.175 +        table[dev.devname] = os.getpid()
   1.176 +
   1.177 +        self.save(table)
   1.178 +        lock.unlock()
   1.179 +
   1.180 +        return dev
   1.181 +
   1.182 +    def put(self, dev):
   1.183 +        "release claim on device"
   1.184 +        lock = util.Lock(self.path)
   1.185 +        table = self.load()
   1.186 +
   1.187 +        del table[dev.devname]
   1.188 +
   1.189 +        self.save(table)
   1.190 +        lock.unlock()
   1.191 +
   1.192 +    # private
   1.193 +    def load(self):
   1.194 +        """load and parse allocation table"""
   1.195 +        table = {}
   1.196 +        if not os.path.exists(self.path):
   1.197 +            return table
   1.198 +
   1.199 +        fd = open(self.path)
   1.200 +        for line in fd.readlines():
   1.201 +            iface, pid = line.strip().split()
   1.202 +            table[iface] = int(pid)
   1.203 +        fd.close()
   1.204 +        return table
   1.205 +
   1.206 +    def save(self, table):
   1.207 +        """write table to disk"""
   1.208 +        lines = ['%s %d\n' % (iface, table[iface]) for iface in sorted(table)]
   1.209 +        fd = open(self.path, 'w')
   1.210 +        fd.writelines(lines)
   1.211 +        fd.close()
   1.212 +
   1.213 +    def getdevs(self):
   1.214 +        """find all available devices of our device type"""
   1.215 +        ifaces = []
   1.216 +        for line in util.runcmd('ifconfig -a -s').splitlines():
   1.217 +            iface = line.split()[0]
   1.218 +            if iface.startswith(self.netbufclass.devclass()):
   1.219 +                ifaces.append(iface)
   1.220 +
   1.221 +        return ifaces
   1.222  
   1.223  class BufferedNIC(CheckpointedDevice):
   1.224      """
   1.225      Buffer a protected domain's network output between rounds so that
   1.226      nothing is issued that a failover might not know about.
   1.227      """
   1.228 -    # shared rtnetlink handle
   1.229 -    rth = None
   1.230  
   1.231 -    def __init__(self, domid):
   1.232 +    def __init__(self, vif):
   1.233          self.installed = False
   1.234 +        self.vif = vif
   1.235  
   1.236 -        if not self.rth:
   1.237 -            self.rth = netlink.rtnl()
   1.238 +        self.pool = Netbufpool(selectnetbuf())
   1.239 +        self.rth = getrth()
   1.240  
   1.241 -        self.devname = self._startimq(domid)
   1.242 -        dev = self.rth.getlink(self.devname)
   1.243 -        if not dev:
   1.244 -            raise BufferedNICException('could not find device %s' % self.devname)
   1.245 -        self.dev = dev['index']
   1.246 -        self.handle = qdisc.TC_H_ROOT
   1.247 -        self.q = qdisc.QueueQdisc()
   1.248 +        self.setup()
   1.249  
   1.250      def __del__(self):
   1.251          self.uninstall()
   1.252  
   1.253      def postsuspend(self):
   1.254          if not self.installed:
   1.255 -            self._setup()
   1.256 +            self.install()
   1.257  
   1.258          self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
   1.259  
   1.260 @@ -100,41 +288,53 @@ class BufferedNIC(CheckpointedDevice):
   1.261          the backup'''
   1.262          self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
   1.263  
   1.264 +    # private
   1.265      def _sendqmsg(self, action):
   1.266          self.q.action = action
   1.267 -        req = qdisc.changerequest(self.dev, self.handle, self.q)
   1.268 +        req = qdisc.changerequest(self.bufdevno, self.handle, self.q)
   1.269          self.rth.talk(req.pack())
   1.270 +        return True
   1.271 +
   1.272 +    def setup(self):
   1.273 +        """install Remus queue on VIF outbound traffic"""
   1.274 +        self.bufdev = self.pool.get()
   1.275 +
   1.276 +        devname = self.bufdev.devname
   1.277 +        bufdev = self.rth.getlink(devname)
   1.278 +        if not bufdev:
   1.279 +            raise BufferedNICException('could not find device %s' % devname)
   1.280  
   1.281 -    def _setup(self):
   1.282 -        q = self.rth.getqdisc(self.dev)
   1.283 +        self.bufdev.install(self.vif)
   1.284 +
   1.285 +        self.bufdevno = bufdev['index']
   1.286 +        self.handle = qdisc.TC_H_ROOT
   1.287 +        self.q = qdisc.QueueQdisc()
   1.288 +
   1.289 +        if not util.modprobe('sch_queue'):
   1.290 +            raise BufferedNICException('could not load sch_queue module')
   1.291 +
   1.292 +    def install(self):
   1.293 +        devname = self.bufdev.devname
   1.294 +        q = self.rth.getqdisc(self.bufdevno)
   1.295          if q:
   1.296              if q['kind'] == 'queue':
   1.297                  self.installed = True
   1.298                  return
   1.299              if q['kind'] != 'pfifo_fast':
   1.300                  raise BufferedNICException('there is already a queueing '
   1.301 -                                           'discipline on %s' % self.devname)
   1.302 +                                           'discipline on %s' % devname)
   1.303  
   1.304 -        print 'installing buffer on %s' % self.devname
   1.305 -        req = qdisc.addrequest(self.dev, self.handle, self.q)
   1.306 +        print ('installing buffer on %s... ' % devname),
   1.307 +        req = qdisc.addrequest(self.bufdevno, self.handle, self.q)
   1.308          self.rth.talk(req.pack())
   1.309          self.installed = True
   1.310 +        print 'done.'
   1.311  
   1.312      def uninstall(self):
   1.313          if self.installed:
   1.314 -            req = qdisc.delrequest(self.dev, self.handle)
   1.315 +            req = qdisc.delrequest(self.bufdevno, self.handle)
   1.316              self.rth.talk(req.pack())
   1.317              self.installed = False
   1.318  
   1.319 -    def _startimq(self, domid):
   1.320 -        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
   1.321 -        imqebt = '/usr/lib/xen/bin/imqebt'
   1.322 -        imqdev = 'imq0'
   1.323 -        vid = 'vif%d.0' % domid
   1.324 -        for mod in ['sch_queue', 'imq', 'ebt_imq']:
   1.325 -            util.runcmd(['modprobe', mod])
   1.326 -        util.runcmd("ip link set %s up" % (imqdev))
   1.327 -        util.runcmd("%s -F FORWARD" % (imqebt))
   1.328 -        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
   1.329 -
   1.330 -        return imqdev
   1.331 +        self.bufdev.uninstall()
   1.332 +        self.pool.put(self.bufdev)
     2.1 --- a/tools/remus/remus	Tue May 04 09:35:42 2010 +0100
     2.2 +++ b/tools/remus/remus	Tue May 04 09:36:05 2010 +0100
     2.3 @@ -9,7 +9,8 @@
     2.4  import optparse, os, re, select, signal, sys, time
     2.5  
     2.6  from xen.remus import save, util, vm
     2.7 -from xen.remus.device import ReplicatedDisk, BufferedNIC
     2.8 +from xen.remus.device import ReplicatedDisk, ReplicatedDiskException
     2.9 +from xen.remus.device import BufferedNIC, BufferedNICException
    2.10  from xen.xend import XendOptions
    2.11  
    2.12  class CfgException(Exception): pass
    2.13 @@ -115,7 +116,7 @@ def run(cfg):
    2.14  
    2.15      if cfg.netbuffer:
    2.16          for vif in dom.vifs:
    2.17 -            bufs.append(Netbuffer(dom.domid))
    2.18 +            bufs.append(BufferedNIC(vif))
    2.19  
    2.20      fd = save.MigrationSocket((cfg.host, cfg.port))
    2.21