普通文本  |  362行  |  12.88 KB

# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module for CrOS dynamic test suite generation and execution."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import contextlib
import itertools
import json
import logging
import os
import re
import time

from lucifer import autotest
from skylab_suite import cros_suite
from skylab_suite import swarming_lib


SKYLAB_DRONE_SWARMING_WORKER = '/opt/infra-tools/skylab_swarming_worker'
SKYLAB_SUITE_USER = 'skylab_suite_runner'
SKYLAB_TOOL = '/opt/infra-tools/skylab'

SUITE_WAIT_SLEEP_INTERVAL_SECONDS = 30

# See #5 in crbug.com/873886 for more details.
_NOT_SUPPORTED_DEPENDENCIES = ['skip_provision', 'cleanup-reboot', 'rpm',
                               'modem_repair']


def run(client, test_specs, suite_handler, dry_run=False):
    """Run a CrOS dynamic test suite.

    @param client: A swarming_lib.Client instance.
    @param test_specs: A list of cros_suite.TestSpec objects.
    @param suite_handler: A cros_suite.SuiteHandler object.
    @param dry_run: Whether to kick off dry runs of the tests.
    """
    assert isinstance(client, swarming_lib.Client)
    if suite_handler.suite_id:
        # Resume an existing suite.
        _resume_suite(client, test_specs, suite_handler, dry_run)
    else:
        # Make a new suite.
        _run_suite(test_specs, suite_handler, dry_run)


def _resume_suite(client, test_specs, suite_handler, dry_run=False):
    """Resume a suite and its child tasks by given suite id."""
    assert isinstance(client, swarming_lib.Client)
    suite_id = suite_handler.suite_id
    all_tasks = client.get_child_tasks(suite_id)
    not_yet_scheduled = _get_unscheduled_test_specs(
            test_specs, suite_handler, all_tasks)

    logging.info('Not yet scheduled test_specs: %r', not_yet_scheduled)
    _create_test_tasks(not_yet_scheduled, suite_handler, suite_id, dry_run)

    if suite_id is not None and suite_handler.should_wait():
        _wait_for_results(suite_handler, dry_run=dry_run)


def _get_unscheduled_test_specs(test_specs, suite_handler, all_tasks):
    not_yet_scheduled = []
    for test_spec in test_specs:
        if suite_handler.is_provision():
            # We cannot check bot_id because pending tasks do not have it yet.
            bot_id_tag = 'id:%s' % test_spec.bot_id
            tasks = [t for t in all_tasks if bot_id_tag in t['tags']]
        else:
            tasks = [t for t in all_tasks if t['name']==test_spec.test.name]

        if not tasks:
            not_yet_scheduled.append(test_spec)
            continue

        current_task = _get_current_task(tasks)
        test_task_id = (current_task['task_id'] if current_task
                        else tasks[0]['task_id'])
        remaining_retries = test_spec.test.job_retries - len(tasks)
        previous_retried_ids = [t['task_id'] for t in tasks
                                if t['task_id'] != test_task_id]
        suite_handler.add_test_by_task_id(
                test_task_id,
                cros_suite.TestHandlerSpec(
                        test_spec=test_spec,
                        remaining_retries=remaining_retries,
                        previous_retried_ids=previous_retried_ids))

    return not_yet_scheduled


def _get_current_task(tasks):
    """Get current running task.

    @param tasks: A list of task dicts including task_id, state, etc.

    @return a dict representing the current running task.
    """
    current_task = None
    for t in tasks:
        if t['state'] not in swarming_lib.TASK_FINISHED_STATUS:
            if current_task:
                raise ValueError(
                        'Parent task has 2 same running child tasks: %s, %s'
                        % (current_task['task_id'], t['task_id']))

            current_task = t

    return current_task


def _run_suite(test_specs, suite_handler, dry_run=False):
    """Make a new suite."""
    suite_id = os.environ.get('SWARMING_TASK_ID')
    if not suite_id:
        raise ValueError("Unable to determine suite's task id from env var "
                         "SWARMING_TASK_ID.")
    _create_test_tasks(test_specs, suite_handler, suite_id, dry_run)
    suite_handler.set_suite_id(suite_id)

    if suite_handler.should_wait():
        _wait_for_results(suite_handler, dry_run=dry_run)


def _create_test_tasks(test_specs, suite_handler, suite_id, dry_run=False):
    """Create test tasks for a list of tests (TestSpecs).

    Given a list of TestSpec object, this function will schedule them on
    swarming one by one, and add them to the swarming_task_id-to-test map
    of suite_handler to keep monitoring them.

    @param test_specs: A list of cros_suite.TestSpec objects to schedule.
    @param suite_handler: A cros_suite.SuiteHandler object to monitor the
        test_specs' progress.
    @param suite_id: A string ID for a suite task, it's the parent task id for
        these to-be-scheduled test_specs.
    @param dry_run: Whether to kick off dry runs of the tests.
    """
    for test_spec in test_specs:
        test_task_id = _create_test_task(
                test_spec,
                suite_id=suite_id,
                is_provision=suite_handler.is_provision(),
                dry_run=dry_run)
        suite_handler.add_test_by_task_id(
                test_task_id,
                cros_suite.TestHandlerSpec(
                        test_spec=test_spec,
                        remaining_retries=test_spec.test.job_retries - 1,
                        previous_retried_ids=[]))


def _create_test_task(test_spec, suite_id=None,
                      is_provision=False, dry_run=False):
    """Create a test task for a given test spec.

    @param test_spec: A cros_suite.TestSpec object.
    @param suite_id: the suite task id of the test.
    @param dry_run: If true, don't actually create task.

    @return the swarming task id of this task.
    """
    logging.info('Creating task for test %s', test_spec.test.name)
    skylab_tool_path = os.environ.get('SKYLAB_TOOL', SKYLAB_TOOL)

    cmd = [
        skylab_tool_path, 'create-test',
        '-board', test_spec.board,
        '-image', test_spec.build,
        '-service-account-json', os.environ['SWARMING_CREDS'],
        ]
    if _is_dev():
        cmd += ['-dev']
    if test_spec.pool:
        # TODO(akeshet): Clean up this hack around pool name translation.
        autotest_pool_label = 'pool:%s' % test_spec.pool
        pool_dependency_value = swarming_lib.task_dependencies_from_labels(
            [autotest_pool_label])['label-pool']
        cmd += ['-pool', pool_dependency_value]

    if test_spec.model:
        cmd += ['-model', test_spec.model]
    if test_spec.quota_account:
        cmd += ['-qs-account', test_spec.quota_account]
    if test_spec.test.test_type.lower() == 'client':
        cmd += ['-client-test']

    tags = _compute_tags(test_spec.build, suite_id)
    dimensions = _compute_dimensions(
            test_spec.bot_id, test_spec.test.dependencies)
    keyvals_flat = _compute_job_keyvals_flat(test_spec.keyvals, suite_id)

    for tag in tags:
        cmd += ['-tag', tag]
    for keyval in keyvals_flat:
        cmd += ['-keyval', keyval]
    cmd += [test_spec.test.name]
    cmd += dimensions

    if dry_run:
        logging.info('Would have created task with command %s', cmd)
        return

    # TODO(akeshet): Avoid this late chromite import.
    cros_build_lib = autotest.chromite_load('cros_build_lib')
    result = cros_build_lib.RunCommand(cmd, capture_output=True)
    # TODO(akeshet): Use -json flag and json-parse output of the command instead
    # of regex matching to determine task_id.
    m = re.match('.*id=(.*)$', result.output)
    task_id = m.group(1)
    logging.info('Created task with id %s', task_id)
    return task_id


# TODO(akeshet): Eliminate the need for this, by either adding an explicit
# swarming_server argument to skylab tool, or having the tool respect the
# SWARMING_SERVER environment variable. See crbug.com/948774
def _is_dev():
    """Detect whether skylab tool should be invoked with -dev flag."""
    return 'chromium-swarm-dev' in os.environ['SWARMING_SERVER']

def _compute_tags(build, suite_id):
    tags = [
        'build:%s' % build,
    ]
    if suite_id is not None:
        tags += ['parent_task_id:%s' % suite_id]
    return tags


def _compute_dimensions(bot_id, dependencies):
    dimensions = []
    if bot_id:
        dimensions += ['id:%s' % bot_id]
    deps = _filter_unsupported_dependencies(dependencies)
    flattened_swarming_deps = sorted([
        '%s:%s' % (k, v) for
        k, v in swarming_lib.task_dependencies_from_labels(deps).items()
        ])
    dimensions += flattened_swarming_deps
    return dimensions


def _compute_job_keyvals_flat(keyvals, suite_id):
    # Job keyvals calculation.
    job_keyvals = keyvals.copy()
    if suite_id is not None:
        # TODO(akeshet): Avoid this late autotest constants import.
        constants = autotest.load('server.cros.dynamic_suite.constants')
        job_keyvals[constants.PARENT_JOB_ID] = suite_id
    keyvals_flat = sorted(
        ['%s:%s' % (k, v) for k, v in job_keyvals.items()])
    return keyvals_flat


def _filter_unsupported_dependencies(dependencies):
    """Filter out Skylab-unsupported test dependencies, with a warning."""
    deps = []
    for dep in dependencies:
        if dep in _NOT_SUPPORTED_DEPENDENCIES:
            logging.warning('Dependency %s is not supported in skylab', dep)
        else:
            deps.append(dep)
    return deps


@contextlib.contextmanager
def disable_logging(logging_level):
    """Context manager for disabling logging of a given logging level."""
    try:
        logging.disable(logging_level)
        yield
    finally:
        logging.disable(logging.NOTSET)


def _loop_and_wait_forever(suite_handler, dry_run):
    """Wait for child tasks to finish or break."""
    for iterations in itertools.count(0):
        # Log progress every 300 seconds.
        no_logging = bool(iterations * SUITE_WAIT_SLEEP_INTERVAL_SECONDS % 300)
        with disable_logging(logging.INFO if no_logging else logging.NOTSET):
            suite_handler.handle_results(suite_handler.suite_id)
            if suite_handler.is_finished_waiting():
                break

        for t in suite_handler.retried_tasks:
            _retry_test(suite_handler, t['task_id'], dry_run=dry_run)

        time.sleep(SUITE_WAIT_SLEEP_INTERVAL_SECONDS)


def _wait_for_results(suite_handler, dry_run=False):
    """Wait for child tasks to finish and return their results.

    @param suite_handler: a cros_suite.SuiteHandler object.
    """
    timeout_util = autotest.chromite_load('timeout_util')
    try:
        with timeout_util.Timeout(suite_handler.timeout_mins * 60 -
                                  suite_handler.passed_mins * 60):
            _loop_and_wait_forever(suite_handler, dry_run)
    except timeout_util.TimeoutError:
        logging.error('Timeout in waiting for child tasks.')
        return

    logging.info('Finished to wait for child tasks.')


def _retry_test(suite_handler, task_id, dry_run=False):
    """Retry test for a suite.

    We will execute the following actions for retrying a test:
        1. Schedule the test.
        2. Add the test with the new swarming task id to the suite's
           retry handler, but reduce its remaining retries by 1.
        3. Reduce the suite-level max retries by 1.
        4. Remove prevous failed test from retry handler since it's not
           actively monitored by the suite.

    @param suite_handler: a cros_suite.SuiteHandler object.
    @param task_id: The swarming task id for the retried test.
    @param dry_run: Whether to retry a dry run of the test.
    """
    last_retry_spec = suite_handler.get_test_by_task_id(task_id)
    logging.info('Retrying test %s, remaining %d retries.',
                 last_retry_spec.test_spec.test.name,
                 last_retry_spec.remaining_retries - 1)
    retried_task_id = _create_test_task(
            last_retry_spec.test_spec,
            suite_id=suite_handler.suite_id,
            is_provision=suite_handler.is_provision(),
            dry_run=dry_run)
    previous_retried_ids = last_retry_spec.previous_retried_ids + [task_id]
    suite_handler.add_test_by_task_id(
            retried_task_id,
            cros_suite.TestHandlerSpec(
                    test_spec=last_retry_spec.test_spec,
                    remaining_retries=last_retry_spec.remaining_retries - 1,
                    previous_retried_ids=previous_retried_ids))
    suite_handler.set_max_retries(suite_handler.max_retries - 1)
    suite_handler.remove_test_by_task_id(task_id)


def _convert_dict_to_string(input_dict):
    """Convert dictionary to a string.

    @param input_dict: A dictionary.
    """
    for k, v in input_dict.iteritems():
        if isinstance(v, dict):
            input_dict[k] = _convert_dict_to_string(v)
        else:
            input_dict[k] = str(v)

    return json.dumps(input_dict)