debuggers.hg

changeset 21286:321dddf767e2

Remus: move device handling into its own module

Signed-off-by: Brendan Cully <brendan@cs.ubc.ca>
author Keir Fraser <keir.fraser@citrix.com>
date Tue May 04 09:34:23 2010 +0100 (2010-05-04)
parents 8559e324941f
children d6bd61a5a0fc
files tools/python/xen/remus/device.py tools/remus/remus
line diff
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/tools/python/xen/remus/device.py	Tue May 04 09:34:23 2010 +0100
     1.3 @@ -0,0 +1,140 @@
     1.4 +# Remus device interface
     1.5 +#
     1.6 +# Coordinates with devices at suspend, resume, and commit hooks
     1.7 +
     1.8 +import os
     1.9 +
    1.10 +import netlink, qdisc, util
    1.11 +
    1.12 +class CheckpointedDevice(object):
    1.13 +    'Base class for buffered devices'
    1.14 +
    1.15 +    def postsuspend(self):
    1.16 +        'called after guest has suspended'
    1.17 +        pass
    1.18 +
    1.19 +    def preresume(self):
    1.20 +        'called before guest resumes'
    1.21 +        pass
    1.22 +
    1.23 +    def commit(self):
    1.24 +        'called when backup has acknowledged checkpoint reception'
    1.25 +        pass
    1.26 +
    1.27 +class ReplicatedDiskException(Exception): pass
    1.28 +
    1.29 +class ReplicatedDisk(CheckpointedDevice):
    1.30 +    """
    1.31 +    Send a checkpoint message to a replicated disk while the domain
    1.32 +    is paused between epochs.
    1.33 +    """
    1.34 +    FIFODIR = '/var/run/tap'
    1.35 +
    1.36 +    def __init__(self, disk):
    1.37 +        # look up disk, make sure it is tap:buffer, and set up socket
    1.38 +        # to request commits.
    1.39 +        self.ctlfd = None
    1.40 +
    1.41 +        if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'):
    1.42 +            raise ReplicatedDiskException('Disk is not replicated: %s' %
    1.43 +                                        str(disk))
    1.44 +        fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_')
    1.45 +        absfifo = os.path.join(self.FIFODIR, fifo)
    1.46 +        absmsgfifo = absfifo + '.msg'
    1.47 +
    1.48 +        self.installed = False
    1.49 +        self.ctlfd = open(absfifo, 'w+b')
    1.50 +        self.msgfd = open(absmsgfifo, 'r+b')
    1.51 +
    1.52 +    def __del__(self):
    1.53 +        self.uninstall()
    1.54 +
    1.55 +    def uninstall(self):
    1.56 +        if self.ctlfd:
    1.57 +            self.ctlfd.close()
    1.58 +            self.ctlfd = None
    1.59 +
    1.60 +    def postsuspend(self):
    1.61 +        os.write(self.ctlfd.fileno(), 'flush')
    1.62 +
    1.63 +    def commit(self):
    1.64 +        msg = os.read(self.msgfd.fileno(), 4)
    1.65 +        if msg != 'done':
    1.66 +            print 'Unknown message: %s' % msg
    1.67 +
    1.68 +class BufferedNICException(Exception): pass
    1.69 +
    1.70 +class BufferedNIC(CheckpointedDevice):
    1.71 +    """
    1.72 +    Buffer a protected domain's network output between rounds so that
    1.73 +    nothing is issued that a failover might not know about.
    1.74 +    """
    1.75 +    # shared rtnetlink handle
    1.76 +    rth = None
    1.77 +
    1.78 +    def __init__(self, domid):
    1.79 +        self.installed = False
    1.80 +
    1.81 +        if not self.rth:
    1.82 +            self.rth = netlink.rtnl()
    1.83 +
    1.84 +        self.devname = self._startimq(domid)
    1.85 +        dev = self.rth.getlink(self.devname)
    1.86 +        if not dev:
    1.87 +            raise BufferedNICException('could not find device %s' % self.devname)
    1.88 +        self.dev = dev['index']
    1.89 +        self.handle = qdisc.TC_H_ROOT
    1.90 +        self.q = qdisc.QueueQdisc()
    1.91 +
    1.92 +    def __del__(self):
    1.93 +        self.uninstall()
    1.94 +
    1.95 +    def postsuspend(self):
    1.96 +        if not self.installed:
    1.97 +            self._setup()
    1.98 +
    1.99 +        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
   1.100 +
   1.101 +    def commit(self):
   1.102 +        '''Called when checkpoint has been acknowledged by
   1.103 +        the backup'''
   1.104 +        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
   1.105 +
   1.106 +    def _sendqmsg(self, action):
   1.107 +        self.q.action = action
   1.108 +        req = qdisc.changerequest(self.dev, self.handle, self.q)
   1.109 +        self.rth.talk(req.pack())
   1.110 +
   1.111 +    def _setup(self):
   1.112 +        q = self.rth.getqdisc(self.dev)
   1.113 +        if q:
   1.114 +            if q['kind'] == 'queue':
   1.115 +                self.installed = True
   1.116 +                return
   1.117 +            if q['kind'] != 'pfifo_fast':
   1.118 +                raise BufferedNICException('there is already a queueing '
   1.119 +                                           'discipline on %s' % self.devname)
   1.120 +
   1.121 +        print 'installing buffer on %s' % self.devname
   1.122 +        req = qdisc.addrequest(self.dev, self.handle, self.q)
   1.123 +        self.rth.talk(req.pack())
   1.124 +        self.installed = True
   1.125 +
   1.126 +    def uninstall(self):
   1.127 +        if self.installed:
   1.128 +            req = qdisc.delrequest(self.dev, self.handle)
   1.129 +            self.rth.talk(req.pack())
   1.130 +            self.installed = False
   1.131 +
   1.132 +    def _startimq(self, domid):
   1.133 +        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
   1.134 +        imqebt = '/usr/lib/xen/bin/imqebt'
   1.135 +        imqdev = 'imq0'
   1.136 +        vid = 'vif%d.0' % domid
   1.137 +        for mod in ['sch_queue', 'imq', 'ebt_imq']:
   1.138 +            util.runcmd(['modprobe', mod])
   1.139 +        util.runcmd("ip link set %s up" % (imqdev))
   1.140 +        util.runcmd("%s -F FORWARD" % (imqebt))
   1.141 +        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
   1.142 +
   1.143 +        return imqdev
     2.1 --- a/tools/remus/remus	Tue May 04 09:31:13 2010 +0100
     2.2 +++ b/tools/remus/remus	Tue May 04 09:34:23 2010 +0100
     2.3 @@ -7,9 +7,10 @@
     2.4  # TODO: fencing.
     2.5  
     2.6  import optparse, os, re, select, signal, sys, time
     2.7 -from xen.remus import save, vm
     2.8 +
     2.9 +from xen.remus import save, util, vm
    2.10 +from xen.remus.device import ReplicatedDisk, BufferedNIC
    2.11  from xen.xend import XendOptions
    2.12 -from xen.remus import netlink, qdisc, util
    2.13  
    2.14  class CfgException(Exception): pass
    2.15  
    2.16 @@ -58,139 +59,6 @@ class Cfg(object):
    2.17          if (len(args) > 1):
    2.18              self.host = args[1]
    2.19  
    2.20 -class ReplicatedDiskException(Exception): pass
    2.21 -
    2.22 -class BufferedDevice(object):
    2.23 -    'Base class for buffered devices'
    2.24 -
    2.25 -    def postsuspend(self):
    2.26 -        'called after guest has suspended'
    2.27 -        pass
    2.28 -
    2.29 -    def preresume(self):
    2.30 -        'called before guest resumes'
    2.31 -        pass
    2.32 -
    2.33 -    def commit(self):
    2.34 -        'called when backup has acknowledged checkpoint reception'
    2.35 -        pass
    2.36 -
    2.37 -class ReplicatedDisk(BufferedDevice):
    2.38 -    """
    2.39 -    Send a checkpoint message to a replicated disk while the domain
    2.40 -    is paused between epochs.
    2.41 -    """
    2.42 -    FIFODIR = '/var/run/tap'
    2.43 -
    2.44 -    def __init__(self, disk):
    2.45 -        # look up disk, make sure it is tap:buffer, and set up socket
    2.46 -        # to request commits.
    2.47 -        self.ctlfd = None
    2.48 -
    2.49 -        if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'):
    2.50 -            raise ReplicatedDiskException('Disk is not replicated: %s' %
    2.51 -                                        str(disk))
    2.52 -        fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_')
    2.53 -        absfifo = os.path.join(self.FIFODIR, fifo)
    2.54 -        absmsgfifo = absfifo + '.msg'
    2.55 -
    2.56 -        self.installed = False
    2.57 -        self.ctlfd = open(absfifo, 'w+b')
    2.58 -        self.msgfd = open(absmsgfifo, 'r+b')
    2.59 -
    2.60 -    def __del__(self):
    2.61 -        self.uninstall()
    2.62 -
    2.63 -    def uninstall(self):
    2.64 -        if self.ctlfd:
    2.65 -            self.ctlfd.close()
    2.66 -            self.ctlfd = None
    2.67 -
    2.68 -    def postsuspend(self):
    2.69 -        os.write(self.ctlfd.fileno(), 'flush')
    2.70 -
    2.71 -    def commit(self):
    2.72 -        msg = os.read(self.msgfd.fileno(), 4)
    2.73 -        if msg != 'done':
    2.74 -            print 'Unknown message: %s' % msg
    2.75 -
    2.76 -class NetbufferException(Exception): pass
    2.77 -
    2.78 -class Netbuffer(BufferedDevice):
    2.79 -    """
    2.80 -    Buffer a protected domain's network output between rounds so that
    2.81 -    nothing is issued that a failover might not know about.
    2.82 -    """
    2.83 -    # shared rtnetlink handle
    2.84 -    rth = None
    2.85 -
    2.86 -    def __init__(self, domid):
    2.87 -        self.installed = False
    2.88 -
    2.89 -        if not self.rth:
    2.90 -            self.rth = netlink.rtnl()
    2.91 -
    2.92 -        self.devname = self._startimq(domid)
    2.93 -        dev = self.rth.getlink(self.devname)
    2.94 -        if not dev:
    2.95 -            raise NetbufferException('could not find device %s' % self.devname)
    2.96 -        self.dev = dev['index']
    2.97 -        self.handle = qdisc.TC_H_ROOT
    2.98 -        self.q = qdisc.QueueQdisc()
    2.99 -
   2.100 -    def __del__(self):
   2.101 -        self.uninstall()
   2.102 -
   2.103 -    def postsuspend(self):
   2.104 -        if not self.installed:
   2.105 -            self._setup()
   2.106 -
   2.107 -        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
   2.108 -
   2.109 -    def commit(self):
   2.110 -        '''Called when checkpoint has been acknowledged by
   2.111 -        the backup'''
   2.112 -        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
   2.113 -
   2.114 -    def _sendqmsg(self, action):
   2.115 -        self.q.action = action
   2.116 -        req = qdisc.changerequest(self.dev, self.handle, self.q)
   2.117 -        self.rth.talk(req.pack())
   2.118 -
   2.119 -    def _setup(self):
   2.120 -        q = self.rth.getqdisc(self.dev)
   2.121 -        if q:
   2.122 -            if q['kind'] == 'queue':
   2.123 -                self.installed = True
   2.124 -                return
   2.125 -            if q['kind'] != 'pfifo_fast':
   2.126 -                raise NetbufferException('there is already a queueing '
   2.127 -                                         'discipline on %s' % self.devname)
   2.128 -
   2.129 -        print 'installing buffer on %s' % self.devname
   2.130 -        req = qdisc.addrequest(self.dev, self.handle, self.q)
   2.131 -        self.rth.talk(req.pack())
   2.132 -        self.installed = True
   2.133 -
   2.134 -    def uninstall(self):
   2.135 -        if self.installed:
   2.136 -            req = qdisc.delrequest(self.dev, self.handle)
   2.137 -            self.rth.talk(req.pack())
   2.138 -            self.installed = False
   2.139 -
   2.140 -    def _startimq(self, domid):
   2.141 -        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
   2.142 -        imqebt = '/usr/lib/xen/bin/imqebt'
   2.143 -        imqdev = 'imq0'
   2.144 -        vid = 'vif%d.0' % domid
   2.145 -        for mod in ['sch_queue', 'imq', 'ebt_imq']:
   2.146 -            util.runcmd(['modprobe', mod])
   2.147 -        util.runcmd("ip link set %s up" % (imqdev))
   2.148 -        util.runcmd("%s -F FORWARD" % (imqebt))
   2.149 -        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
   2.150 -
   2.151 -        return imqdev
   2.152 -
   2.153  class SignalException(Exception): pass
   2.154  
   2.155  def run(cfg):