普通文本  |  149行  |  4.15 KB

"""The ABAT harness interface

The interface as required for ABAT.
"""

__author__ = """Copyright Andy Whitcroft 2006"""

from autotest_lib.client.bin import utils
import os, harness, time, re

def autobench_load(fn):
    disks = re.compile(r'^\s*DATS_FREE_DISKS\s*=(.*\S)\s*$')
    parts = re.compile(r'^\s*DATS_FREE_PARTITIONS\s*=(.*\S)\s*$')
    modules = re.compile(r'^\s*INITRD_MODULES\s*=(.*\S)\s*$')

    conf = {}

    try:
        fd = file(fn, "r")
    except:
        return conf
    for ln in fd.readlines():
        m = disks.match(ln)
        if m:
            val = m.groups()[0]
            conf['disks'] = val.strip('"').split()
        m = parts.match(ln)
        if m:
            val = m.groups()[0]
            conf['partitions'] = val.strip('"').split()
        m = modules.match(ln)
        if m:
            val = m.groups()[0]
            conf['modules'] = val.strip('"').split()
    fd.close()

    return conf


class harness_ABAT(harness.harness):
    """The ABAT server harness

    Properties:
            job
                    The job object for this job
    """

    def __init__(self, job, harness_args):
        """
                job
                        The job object for this job
        """
        self.setup(job)

        if 'ABAT_STATUS' in os.environ:
            self.status = file(os.environ['ABAT_STATUS'], "w")
        else:
            self.status = None


    def __send(self, msg):
        if self.status:
            msg = msg.rstrip()
            self.status.write(msg + "\n")
            self.status.flush()


    def __send_status(self, code, subdir, operation, msg):
        self.__send("STATUS %s %s %s %s" % (code, subdir, operation, msg))


    def __root_device(self):
        device = None
        root = re.compile(r'^\S*(/dev/\S+).*\s/\s*$')

        df = utils.system_output('df -lP')
        for line in df.split("\n"):
            m = root.match(line)
            if m:
                device = m.groups()[0]

        return device


    def run_start(self):
        """A run within this job is starting"""
        self.__send_status('GOOD', '----', '----', 'run starting')

        # Load up the autobench.conf if it exists.
        conf = autobench_load("/etc/autobench.conf")
        if 'partitions' in conf:
            self.job.config_set('partition.partitions',
                    conf['partitions'])

        # Search the boot loader configuration for the autobench entry,
        # and extract its args.
        args = None
        for entry in self.job.bootloader.get_entries().itervalues():
            if entry['title'].startswith('autobench'):
                args = entry.get('args')

        if args:
            args = re.sub(r'autobench_args:.*', '', args)
            args = re.sub(r'root=\S*', '', args)
            args += " root=" + self.__root_device()

            self.job.config_set('boot.default_args', args)

        # Turn off boot_once semantics.
        self.job.config_set('boot.set_default', True)

        # For RedHat installs we do not load up the module.conf
        # as they cannot be builtin.  Pass them as arguments.
        vendor = utils.get_os_vendor()
        if vendor in ['Red Hat', 'Fedora Core'] and 'modules' in conf:
            args = '--allow-missing'
            for mod in conf['modules']:
                args += " --with " + mod
            self.job.config_set('kernel.mkinitrd_extra_args', args)


    def run_reboot(self):
        """A run within this job is performing a reboot
           (expect continue following reboot)
        """
        self.__send("REBOOT")


    def run_complete(self):
        """A run within this job is completing (all done)"""
        self.__send("DONE")


    def test_status_detail(self, code, subdir, operation, msg, tag,
                           optional_fields):
        """A test within this job is completing (detail)"""

        # Send the first line with the status code as a STATUS message.
        lines = msg.split("\n")
        self.__send_status(code, subdir, operation, lines[0])


    def test_status(self, msg, tag):
        lines = msg.split("\n")

        # Send each line as a SUMMARY message.
        for line in lines:
            self.__send("SUMMARY :" + line)