普通文本  |  661行  |  26.09 KB

#
# Copyright 2008 Google Inc. All Rights Reserved.

"""
The job module contains the objects and methods used to
manage jobs in Autotest.

The valid actions are:
list:    lists job(s)
create:  create a job
abort:   abort job(s)
stat:    detailed listing of job(s)

The common options are:

See topic_common.py for a High Level Design and Algorithm.
"""

import getpass, os, pwd, re, socket, sys
from autotest_lib.cli import topic_common, action_common
from autotest_lib.client.common_lib import control_data


class job(topic_common.atest):
    """Job class
    atest job [create|clone|list|stat|abort] <options>"""
    usage_action = '[create|clone|list|stat|abort]'
    topic = msg_topic = 'job'
    msg_items = '<job_ids>'


    def _convert_status(self, results):
        for result in results:
            total = sum(result['status_counts'].values())
            status = ['%s=%s(%.1f%%)' % (key, val, 100.0*float(val)/total)
                      for key, val in result['status_counts'].iteritems()]
            status.sort()
            result['status_counts'] = ', '.join(status)


    def backward_compatibility(self, action, argv):
        """ 'job create --clone' became 'job clone --id' """
        if action == 'create':
            for option in ['-l', '--clone']:
                if option in argv:
                    argv[argv.index(option)] = '--id'
                    action = 'clone'
        return action


class job_help(job):
    """Just here to get the atest logic working.
    Usage is set by its parent"""
    pass


class job_list_stat(action_common.atest_list, job):
    def __init__(self):
        super(job_list_stat, self).__init__()

        self.topic_parse_info = topic_common.item_parse_info(
            attribute_name='jobs',
            use_leftover=True)


    def __split_jobs_between_ids_names(self):
        job_ids = []
        job_names = []

        # Sort between job IDs and names
        for job_id in self.jobs:
            if job_id.isdigit():
                job_ids.append(job_id)
            else:
                job_names.append(job_id)
        return (job_ids, job_names)


    def execute_on_ids_and_names(self, op, filters={},
                                 check_results={'id__in': 'id',
                                                'name__in': 'id'},
                                 tag_id='id__in', tag_name='name__in'):
        if not self.jobs:
            # Want everything
            return super(job_list_stat, self).execute(op=op, filters=filters)

        all_jobs = []
        (job_ids, job_names) = self.__split_jobs_between_ids_names()

        for items, tag in [(job_ids, tag_id),
                          (job_names, tag_name)]:
            if items:
                new_filters = filters.copy()
                new_filters[tag] = items
                jobs = super(job_list_stat,
                             self).execute(op=op,
                                           filters=new_filters,
                                           check_results=check_results)
                all_jobs.extend(jobs)

        return all_jobs


class job_list(job_list_stat):
    """atest job list [<jobs>] [--all] [--running] [--user <username>]"""
    def __init__(self):
        super(job_list, self).__init__()
        self.parser.add_option('-a', '--all', help='List jobs for all '
                               'users.', action='store_true', default=False)
        self.parser.add_option('-r', '--running', help='List only running '
                               'jobs', action='store_true')
        self.parser.add_option('-u', '--user', help='List jobs for given '
                               'user', type='string')


    def parse(self):
        options, leftover = super(job_list, self).parse()
        self.all = options.all
        self.data['running'] = options.running
        if options.user:
            if options.all:
                self.invalid_syntax('Only specify --all or --user, not both.')
            else:
                self.data['owner'] = options.user
        elif not options.all and not self.jobs:
            self.data['owner'] = getpass.getuser()

        return options, leftover


    def execute(self):
        return self.execute_on_ids_and_names(op='get_jobs_summary',
                                             filters=self.data)


    def output(self, results):
        keys = ['id', 'owner', 'name', 'status_counts']
        if self.verbose:
            keys.extend(['priority', 'control_type', 'created_on'])
        self._convert_status(results)
        super(job_list, self).output(results, keys)



class job_stat(job_list_stat):
    """atest job stat <job>"""
    usage_action = 'stat'

    def __init__(self):
        super(job_stat, self).__init__()
        self.parser.add_option('-f', '--control-file',
                               help='Display the control file',
                               action='store_true', default=False)
        self.parser.add_option('-N', '--list-hosts',
                               help='Display only a list of hosts',
                               action='store_true')
        self.parser.add_option('-s', '--list-hosts-status',
                               help='Display only the hosts in these statuses '
                               'for a job.', action='store')


    def parse(self):
        status_list = topic_common.item_parse_info(
                attribute_name='status_list',
                inline_option='list_hosts_status')
        options, leftover = super(job_stat, self).parse([status_list],
                                                        req_items='jobs')

        if not self.jobs:
            self.invalid_syntax('Must specify at least one job.')

        self.show_control_file = options.control_file
        self.list_hosts = options.list_hosts

        if self.list_hosts and self.status_list:
            self.invalid_syntax('--list-hosts is implicit when using '
                                '--list-hosts-status.')
        if len(self.jobs) > 1 and (self.list_hosts or self.status_list):
            self.invalid_syntax('--list-hosts and --list-hosts-status should '
                                'only be used on a single job.')

        return options, leftover


    def _merge_results(self, summary, qes):
        hosts_status = {}
        for qe in qes:
            if qe['host']:
                job_id = qe['job']['id']
                hostname = qe['host']['hostname']
                hosts_status.setdefault(job_id,
                                        {}).setdefault(qe['status'],
                                                       []).append(hostname)

        for job in summary:
            job_id = job['id']
            if hosts_status.has_key(job_id):
                this_job = hosts_status[job_id]
                job['hosts'] = ' '.join(' '.join(host) for host in
                                        this_job.itervalues())
                host_per_status = ['%s="%s"' %(status, ' '.join(host))
                                   for status, host in this_job.iteritems()]
                job['hosts_status'] = ', '.join(host_per_status)
                if self.status_list:
                    statuses = set(s.lower() for s in self.status_list)
                    all_hosts = [s for s in host_per_status if s.split('=',
                                 1)[0].lower() in statuses]
                    job['hosts_selected_status'] = '\n'.join(all_hosts)
            else:
                job['hosts_status'] = ''

            if not job.get('hosts'):
                self.generic_error('Job has unassigned meta-hosts, '
                                   'try again shortly.')

        return summary


    def execute(self):
        summary = self.execute_on_ids_and_names(op='get_jobs_summary')

        # Get the real hostnames
        qes = self.execute_on_ids_and_names(op='get_host_queue_entries',
                                            check_results={},
                                            tag_id='job__in',
                                            tag_name='job__name__in')

        self._convert_status(summary)

        return self._merge_results(summary, qes)


    def output(self, results):
        if self.list_hosts:
            keys = ['hosts']
        elif self.status_list:
            keys = ['hosts_selected_status']
        elif not self.verbose:
            keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status']
        else:
            keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status',
                    'owner', 'control_type',  'synch_count', 'created_on',
                    'run_verify', 'reboot_before', 'reboot_after',
                    'parse_failed_repair']

        if self.show_control_file:
            keys.append('control_file')

        super(job_stat, self).output(results, keys)


class job_create_or_clone(action_common.atest_create, job):
    """Class containing the code common to the job create and clone actions"""
    msg_items = 'job_name'

    def __init__(self):
        super(job_create_or_clone, self).__init__()
        self.hosts = []
        self.data_item_key = 'name'
        self.parser.add_option('-p', '--priority', help='Job priority (low, '
                               'medium, high, urgent), default=medium',
                               type='choice', choices=('low', 'medium', 'high',
                               'urgent'), default='medium')
        self.parser.add_option('-b', '--labels',
                               help='Comma separated list of labels '
                               'to get machine list from.', default='')
        self.parser.add_option('-m', '--machine', help='List of machines to '
                               'run on')
        self.parser.add_option('-M', '--mlist',
                               help='File listing machines to use',
                               type='string', metavar='MACHINE_FLIST')
        self.parser.add_option('--one-time-hosts',
                               help='List of one time hosts')
        self.parser.add_option('-e', '--email',
                               help='A comma seperated list of '
                               'email addresses to notify of job completion',
                               default='')


    def _parse_hosts(self, args):
        """ Parses the arguments to generate a list of hosts and meta_hosts
        A host is a regular name, a meta_host is n*label or *label.
        These can be mixed on the CLI, and separated by either commas or
        spaces, e.g.: 5*Machine_Label host0 5*Machine_Label2,host2 """

        hosts = []
        meta_hosts = []

        for arg in args:
            for host in arg.split(','):
                if re.match('^[0-9]+[*]', host):
                    num, host = host.split('*', 1)
                    meta_hosts += int(num) * [host]
                elif re.match('^[*](\w*)', host):
                    meta_hosts += [re.match('^[*](\w*)', host).group(1)]
                elif host != '' and host not in hosts:
                    # Real hostname and not a duplicate
                    hosts.append(host)

        return (hosts, meta_hosts)


    def parse(self, parse_info=[]):
        host_info = topic_common.item_parse_info(attribute_name='hosts',
                                                 inline_option='machine',
                                                 filename_option='mlist')
        job_info = topic_common.item_parse_info(attribute_name='jobname',
                                                use_leftover=True)
        oth_info = topic_common.item_parse_info(attribute_name='one_time_hosts',
                                                inline_option='one_time_hosts')
        label_info = topic_common.item_parse_info(attribute_name='labels',
                                                  inline_option='labels')

        options, leftover = super(job_create_or_clone, self).parse(
                [host_info, job_info, oth_info, label_info] + parse_info,
                req_items='jobname')
        self.data = {}
        jobname = getattr(self, 'jobname')
        if len(jobname) > 1:
            self.invalid_syntax('Too many arguments specified, only expected '
                                'to receive job name: %s' % jobname)
        self.jobname = jobname[0]

        if options.priority:
            self.data['priority'] = options.priority.capitalize()

        if self.one_time_hosts:
            self.data['one_time_hosts'] = self.one_time_hosts

        if self.labels:
            label_hosts = self.execute_rpc(op='get_hosts',
                                           multiple_labels=self.labels)
            for host in label_hosts:
                self.hosts.append(host['hostname'])

        self.data['name'] = self.jobname

        (self.data['hosts'],
         self.data['meta_hosts']) = self._parse_hosts(self.hosts)

        self.data['email_list'] = options.email

        return options, leftover


    def create_job(self):
        job_id = self.execute_rpc(op='create_job', **self.data)
        return ['%s (id %s)' % (self.jobname, job_id)]


    def get_items(self):
        return [self.jobname]



class job_create(job_create_or_clone):
    """atest job create [--priority <Low|Medium|High|Urgent>]
    [--synch_count] [--control-file </path/to/cfile>]
    [--on-server] [--test <test1,test2>] [--kernel <http://kernel>]
    [--mlist </path/to/machinelist>] [--machine <host1 host2 host3>]
    [--labels <list of labels of machines to run on>]
    [--reboot_before <option>] [--reboot_after <option>]
    [--noverify] [--timeout <timeout>] [--max_runtime <max runtime>]
    [--one-time-hosts <hosts>] [--email <email>]
    [--dependencies <labels this job is dependent on>]
    [--atomic_group <atomic group name>] [--parse-failed-repair <option>]
    [--image <http://path/to/image>] [--require-ssp]
    job_name

    Creating a job is rather different from the other create operations,
    so it only uses the __init__() and output() from its superclass.
    """
    op_action = 'create'

    def __init__(self):
        super(job_create, self).__init__()
        self.ctrl_file_data = {}
        self.parser.add_option('-y', '--synch_count', type=int,
                               help='Number of machines to use per autoserv '
                                    'execution')
        self.parser.add_option('-f', '--control-file',
                               help='use this control file', metavar='FILE')
        self.parser.add_option('-s', '--server',
                               help='This is server-side job',
                               action='store_true', default=False)
        self.parser.add_option('-t', '--test',
                               help='List of tests to run')

        self.parser.add_option('-k', '--kernel', help='A comma separated list'
                               ' of kernel versions/URLs/filenames to run the'
                               ' job on')
        self.parser.add_option('--kernel-cmdline', help='A string that will be'
                               ' given as cmdline to the booted kernel(s)'
                               ' specified by the -k option')

        self.parser.add_option('-d', '--dependencies', help='Comma separated '
                               'list of labels this job is dependent on.',
                               default='')
        self.parser.add_option('-G', '--atomic_group', help='Name of an Atomic '
                               'Group to schedule this job on.',
                               default='')

        self.parser.add_option('-B', '--reboot_before',
                               help='Whether or not to reboot the machine '
                                    'before the job (never/if dirty/always)',
                               type='choice',
                               choices=('never', 'if dirty', 'always'))
        self.parser.add_option('-a', '--reboot_after',
                               help='Whether or not to reboot the machine '
                                    'after the job (never/if all tests passed/'
                                    'always)',
                               type='choice',
                               choices=('never', 'if all tests passed',
                                        'always'))

        self.parser.add_option('--parse-failed-repair',
                               help='Whether or not to parse failed repair '
                                    'results as part of the job',
                               type='choice',
                               choices=('true', 'false'))
        self.parser.add_option('-n', '--noverify',
                               help='Do not run verify for job',
                               default=False, action='store_true')
        self.parser.add_option('-o', '--timeout_mins',
                               help='Job timeout in minutes.',
                               metavar='TIMEOUT')
        self.parser.add_option('--max_runtime',
                               help='Job maximum runtime in minutes')

        self.parser.add_option('-i', '--image',
                               help='OS image to install before running the '
                                    'test.')
        self.parser.add_option('--require-ssp',
                               help='Require server-side packaging',
                               default=False, action='store_true')


    @staticmethod
    def _get_kernel_data(kernel_list, cmdline):
        # the RPC supports cmdline per kernel version in a dictionary
        kernels = []
        for version in re.split(r'[, ]+', kernel_list):
            if not version:
                continue
            kernel_info = {'version': version}
            if cmdline:
                kernel_info['cmdline'] = cmdline
            kernels.append(kernel_info)

        return kernels


    def parse(self):
        deps_info = topic_common.item_parse_info(attribute_name='dependencies',
                                                 inline_option='dependencies')
        options, leftover = super(job_create, self).parse(
                parse_info=[deps_info])

        if (len(self.hosts) == 0 and not self.one_time_hosts
            and not options.labels and not options.atomic_group):
            self.invalid_syntax('Must specify at least one machine '
                                'or an atomic group '
                                '(-m, -M, -b, -G or --one-time-hosts).')
        if not options.control_file and not options.test:
            self.invalid_syntax('Must specify either --test or --control-file'
                                ' to create a job.')
        if options.control_file and options.test:
            self.invalid_syntax('Can only specify one of --control-file or '
                                '--test, not both.')
        if options.kernel:
            self.ctrl_file_data['kernel'] = self._get_kernel_data(
                    options.kernel, options.kernel_cmdline)
        if options.control_file:
            try:
                control_file_f = open(options.control_file)
                try:
                    control_file_data = control_file_f.read()
                finally:
                    control_file_f.close()
            except IOError:
                self.generic_error('Unable to read from specified '
                                   'control-file: %s' % options.control_file)
            if options.kernel:
                # execute() will pass this to the AFE server to wrap this
                # control file up to include the kernel installation steps.
                self.ctrl_file_data['client_control_file'] = control_file_data
            else:
                self.data['control_file'] = control_file_data
        if options.test:
            if options.server:
                self.invalid_syntax('If you specify tests, then the '
                                    'client/server setting is implicit and '
                                    'cannot be overriden.')
            tests = [t.strip() for t in options.test.split(',') if t.strip()]
            self.ctrl_file_data['tests'] = tests

        if options.image:
            self.data['image'] = options.image

        if options.reboot_before:
            self.data['reboot_before'] = options.reboot_before.capitalize()
        if options.reboot_after:
            self.data['reboot_after'] = options.reboot_after.capitalize()
        if options.parse_failed_repair:
            self.data['parse_failed_repair'] = (
                options.parse_failed_repair == 'true')
        if options.noverify:
            self.data['run_verify'] = False
        if options.timeout_mins:
            self.data['timeout_mins'] = options.timeout_mins
        if options.max_runtime:
            self.data['max_runtime_mins'] = options.max_runtime

        if options.atomic_group:
            self.data['atomic_group_name'] = options.atomic_group

        self.data['dependencies'] = self.dependencies

        if options.synch_count:
            self.data['synch_count'] = options.synch_count
        if options.server:
            self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.SERVER
        else:
            self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.CLIENT

        self.data['require_ssp'] = options.require_ssp

        return options, leftover


    def execute(self):
        if self.ctrl_file_data:
            uploading_kernel = 'kernel' in self.ctrl_file_data
            if uploading_kernel:
                default_timeout = socket.getdefaulttimeout()
                socket.setdefaulttimeout(topic_common.UPLOAD_SOCKET_TIMEOUT)
                print 'Uploading Kernel: this may take a while...',
                sys.stdout.flush()
            try:
                cf_info = self.execute_rpc(op='generate_control_file',
                                           item=self.jobname,
                                           **self.ctrl_file_data)
            finally:
                if uploading_kernel:
                    socket.setdefaulttimeout(default_timeout)

            if uploading_kernel:
                print 'Done'
            self.data['control_file'] = cf_info['control_file']
            if 'synch_count' not in self.data:
                self.data['synch_count'] = cf_info['synch_count']
            if cf_info['is_server']:
                self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.SERVER
            else:
                self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.CLIENT

            # Get the union of the 2 sets of dependencies
            deps = set(self.data['dependencies'])
            deps = sorted(deps.union(cf_info['dependencies']))
            self.data['dependencies'] = list(deps)

        if 'synch_count' not in self.data:
            self.data['synch_count'] = 1

        return self.create_job()


class job_clone(job_create_or_clone):
    """atest job clone [--priority <Low|Medium|High|Urgent>]
    [--mlist </path/to/machinelist>] [--machine <host1 host2 host3>]
    [--labels <list of labels of machines to run on>]
    [--one-time-hosts <hosts>] [--email <email>]
    job_name

    Cloning a job is rather different from the other create operations,
    so it only uses the __init__() and output() from its superclass.
    """
    op_action = 'clone'
    usage_action = 'clone'

    def __init__(self):
        super(job_clone, self).__init__()
        self.parser.add_option('-i', '--id', help='Job id to clone',
                               default=False,
                               metavar='JOB_ID')
        self.parser.add_option('-r', '--reuse-hosts',
                               help='Use the exact same hosts as the '
                               'cloned job.',
                               action='store_true', default=False)


    def parse(self):
        options, leftover = super(job_clone, self).parse()

        self.clone_id = options.id
        self.reuse_hosts = options.reuse_hosts

        host_specified = self.hosts or self.one_time_hosts or options.labels
        if self.reuse_hosts and host_specified:
            self.invalid_syntax('Cannot specify hosts and reuse the same '
                                'ones as the cloned job.')

        if not (self.reuse_hosts or host_specified):
            self.invalid_syntax('Must reuse or specify at least one '
                                'machine (-r, -m, -M, -b or '
                                '--one-time-hosts).')

        return options, leftover


    def execute(self):
        clone_info = self.execute_rpc(op='get_info_for_clone',
                                      id=self.clone_id,
                                      preserve_metahosts=self.reuse_hosts)

        # Remove fields from clone data that cannot be reused
        for field in ('name', 'created_on', 'id', 'owner'):
            del clone_info['job'][field]

        # Also remove parameterized_job field, as the feature still is
        # incomplete, this tool does not attempt to support it for now,
        # it uses a different API function and it breaks create_job()
        if clone_info['job'].has_key('parameterized_job'):
            del clone_info['job']['parameterized_job']

        # Keyword args cannot be unicode strings
        self.data.update((str(key), val)
                         for key, val in clone_info['job'].iteritems())

        if self.reuse_hosts:
            # Convert host list from clone info that can be used for job_create
            for label, qty in clone_info['meta_host_counts'].iteritems():
                self.data['meta_hosts'].extend([label]*qty)

            self.data['hosts'].extend(host['hostname']
                                      for host in clone_info['hosts'])

        return self.create_job()


class job_abort(job, action_common.atest_delete):
    """atest job abort <job(s)>"""
    usage_action = op_action = 'abort'
    msg_done = 'Aborted'

    def parse(self):
        job_info = topic_common.item_parse_info(attribute_name='jobids',
                                                use_leftover=True)
        options, leftover = super(job_abort, self).parse([job_info],
                                                         req_items='jobids')


    def execute(self):
        data = {'job__id__in': self.jobids}
        self.execute_rpc(op='abort_host_queue_entries', **data)
        print 'Aborting jobs: %s' % ', '.join(self.jobids)


    def get_items(self):
        return self.jobids