#!/usr/bin/python
#
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


"""Script to calculate timing stats for suites.

This script measures nine stats for a suite run.
1. Net suite runtime.
2. Suite scheduling overhead.
3. Average scheduling overhead.
4. Average Queuing time.
5. Average Resetting time.
6. Average Provisioning time.
7. Average Running time.
8. Average Parsing time.
9. Average Gathering time.

When cron_mode is enabled, this script sends all stats except the first one
(net suite runtime) to Graphite, because the first one is already being
sent to Graphite by Autotest online.

Net suite runtime is the end-to-end time for a suite, from its beginning
to its end.
It is stored in the "duration" field of the "suite_runtime" type in
Elasticsearch (ES).

Suite scheduling overhead is defined by the average of DUT overheads.
A suite is composed of one or more jobs, and those jobs run on one or more
of the available DUTs.
A DUT overhead is defined by:
    DUT_i overhead = sum(net time for job_k - runtime for job_k
                         - runtime for special tasks of job_k)
    Job_k are the jobs run on DUT_i.
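
For example (illustrative numbers): if job_k's net time is 600 seconds, its
"Queued"/"Running" runtime is 450 seconds, and its special tasks take 100
seconds, then job_k contributes 600 - 450 - 100 = 50 seconds to DUT_i's
overhead.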

Net time for a job is the time from job_queued_time to hqe_finished_time.
job_queued_time is stored in the "queued_time" column of the "tko_jobs" table.
hqe_finished_time is stored in the "finished_on" column of the
"afe_host_queue_entries" table.
We do not use "job_finished_time" of "tko_jobs" because it is recorded
before gathering/parsing/archiving.
We do not use the hqe start time ("started_on" of "afe_host_queue_entries")
because it does not account for the lag from when a host is assigned to the
job until the scheduler sees the assignment.

Runtime for job_k is the sum of durations for the records of
"job_time_breakdown" type in ES that have "Queued" or "Running" status.
A job may have multiple "Queued" records when its test failed and was
retried.
We take into account only the last "Queued" record.

Runtime for special tasks of job_k is the sum of durations for the records
of "job_time_breakdown" type in ES that have "Resetting", "Provisioning",
"Gathering", or "Parsing" status.
We take into account only the records whose timestamp is larger than
the timestamp of the last "Queued" record.
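
Example usage (flags are defined in main() below; the script name and the
suite job id here are illustrative):
    python timing_stats.py -s 24 --bvtonly --verbose
    python timing_stats.py --suite 12345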
"""

import argparse
from datetime import datetime
from datetime import timedelta

import common
from autotest_lib.client.common_lib import host_queue_entry_states
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import models
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import utils
from autotest_lib.site_utils import job_overhead


_options = None

_hqes = host_queue_entry_states.Status
_states = [
        _hqes.QUEUED, _hqes.RESETTING, _hqes.PROVISIONING,
        _hqes.RUNNING, _hqes.GATHERING, _hqes.PARSING]


def mean(l):
    """
    Calculates the arithmetic mean of the numbers in a list.

    @param l: A list of numbers.
    @return: Arithmetic mean if the list is not empty.
             Otherwise, returns zero.
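
    Example: mean([1, 2, 3]) returns 2.0; mean([]) returns 0.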
    """
    return float(sum(l)) / len(l) if l else 0


def print_verbose(string, *args):
    """Prints the formatted string if the --verbose option is set."""
    if _options.verbose:
        print(string % args)


def get_nontask_runtime(job_id, dut, job_info_dict):
    """
    Get sum of durations for "Queued", "Running", "Parsing", and "Gathering"
    status records.
    job_info_dict will be modified in this function to store the duration
    for each status.

    @param job_id: The job id of interest.
    @param dut: Hostname of a DUT that the job ran on.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Tuple of sum of durations and the timestamp for the last
             Queued record.
    """
    results = autotest_es.query(
            fields_returned=['status', 'duration', 'time_recorded'],
            equality_constraints=[('_type', 'job_time_breakdown'),
                                  ('job_id', job_id),
                                  ('hostname', dut)],
            sort_specs=[{'time_recorded': 'desc'}])

    total = 0
    last_queued_timestamp = 0
    # There could be multiple "Queued" records.
    # Get the sum of durations for the records after the last "Queued"
    # record (including the last "Queued" record itself).
    # This exploits the fact that "results" are ordered in descending
    # order of time_recorded.
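    # Illustrative walk (hypothetical hits, newest first):
    #   Parsing=30, Gathering=20, Running=400, Queued=50, Running=10, Queued=5
    # The loop below sums 30 + 20 + 400 + 50 = 500 and breaks at the first
    # "Queued" hit it sees (the last one recorded), so the earlier retry's
    # records (Running=10, Queued=5) are excluded.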
    for hit in results.hits:
        job_info_dict[job_id][hit['status']] = float(hit['duration'])
        if hit['status'] == 'Queued':
            # The first "Queued" hit is the last one recorded because of the
            # descending order of "results".
            last_queued_timestamp = float(hit['time_recorded'])
            total += float(hit['duration'])
            break
        else:
            total += float(hit['duration'])
    return (total, last_queued_timestamp)


def get_tasks_runtime(task_list, dut, t_start, job_id, job_info_dict):
    """
    Get sum of durations for special tasks.
    job_info_dict will be modified in this function to store the duration
    for each special task.

    @param task_list: List of task ids.
    @param dut: Hostname of a DUT that the tasks ran on.
    @param t_start: Beginning timestamp.
    @param job_id: The job id that is related to the tasks.
                   This is used only for debugging purposes.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Sum of durations of the tasks.
    """
    t_start_epoch = time_utils.to_epoch_time(t_start)
    results = autotest_es.query(
            fields_returned=['status', 'task_id', 'duration'],
            equality_constraints=[('_type', 'job_time_breakdown'),
                                  ('hostname', dut)],
            range_constraints=[('time_recorded', t_start_epoch, None)],
            batch_constraints=[('task_id', task_list)])
    total = 0
    for hit in results.hits:
        total += float(hit['duration'])
        job_info_dict[job_id][hit['status']] = float(hit['duration'])
        print_verbose('Task %s for Job %s took %s',
                      hit['task_id'], job_id, hit['duration'])
    return total


def get_job_runtime(job_id, dut, job_info_dict):
    """
    Get sum of durations for the entries that are related to a job.
    job_info_dict will be modified in this function.

    @param job_id: The job id of interest.
    @param dut: Hostname of a DUT that the job ran on.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Total duration taken by a job.
    """
    total, t_last_queued = get_nontask_runtime(job_id, dut, job_info_dict)
    print_verbose('Job %s took %f, last Queued: %s',
                  job_id, total, t_last_queued)
    total += get_tasks_runtime(
            list(job_info_dict[job_id]['tasks']), dut, t_last_queued,
            job_id, job_info_dict)
    return total


def get_dut_overhead(dut, jobs, job_info_dict):
    """
    Calculates the scheduling overhead of a DUT.

    The scheduling overhead of a DUT is defined by the sum of scheduling
    overheads for the jobs that ran on the DUT.
    The scheduling overhead for a job is defined by the difference between
    net job runtime and real job runtime.
    job_info_dict will be modified in this function.

    @param dut: Hostname of a DUT.
    @param jobs: The list of jobs that ran on the DUT.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Scheduling overhead of a DUT as a floating point value,
             in seconds.
    """
    overheads = []
    for job_id in jobs:
        (t_start, t_end) = job_info_dict[job_id]['timestamps']
        runtime = get_job_runtime(job_id, dut, job_info_dict)
        overheads.append(t_end - t_start - runtime)
        print_verbose('Job: %s, Net runtime: %f, Real runtime: %f, '
                      'Overhead: %f', job_id, t_end - t_start, runtime,
                      t_end - t_start - runtime)
    return sum(overheads)


def get_child_jobs_info(suite_job_id, num_child_jobs, sanity_check):
    """
    Gets information about child jobs of a suite.

    @param suite_job_id: Job id of a suite.
    @param num_child_jobs: Number of child jobs of the suite.
    @param sanity_check: Do sanity check if True.
    @return: A tuple of (dictionary, list). The dictionary maps a DUT's
             hostname to the list of jobs that ran on that DUT. The list
             contains all child jobs of the suite.
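             For example (hypothetical job ids and hostnames):
                 ({'host1': [10, 11], 'host2': [12]}, [10, 11, 12])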
    """
    results = autotest_es.query(
            fields_returned=['job_id', 'hostname'],
            equality_constraints=[('_type', 'host_history'),
                                  ('parent_job_id', suite_job_id),
                                  ('status', 'Running'),])

    dut_jobs_dict = {}
    job_filter = set()
    for hit in results.hits:
        job_id = hit['job_id']
        dut = hit['hostname']
        if job_id in job_filter:
            continue
        job_list = dut_jobs_dict.setdefault(dut, [])
        job_list.append(job_id)
        job_filter.add(job_id)

    if sanity_check and len(job_filter) != num_child_jobs:
        print('WARNING: Mismatched number of child jobs for suite %d: '
              '%d != %d' % (suite_job_id, len(job_filter), num_child_jobs))
    return dut_jobs_dict, list(job_filter)


def get_job_timestamps(job_list, job_info_dict):
    """
    Get beginning time and ending time for each job.

    The beginning time of a job is the "queued_time" in the "tko_jobs" table.
    The ending time of a job is the "finished_on" in the
    "afe_host_queue_entries" table.
    job_info_dict will be modified in this function to store the timestamps.

    @param job_list: List of job ids.
    @param job_info_dict: Dictionary in which the timestamps for each job
                          will be stored.
    """
    tko = tko_models.Job.objects.filter(afe_job_id__in=job_list)
    hqe = models.HostQueueEntry.objects.filter(job_id__in=job_list)
    job_start = {}
    for t in tko:
        job_start[t.afe_job_id] = time_utils.to_epoch_time(t.queued_time)
    job_end = {}
    for h in hqe:
        job_end[h.job_id] = time_utils.to_epoch_time(h.finished_on)

    for job_id in job_list:
        info_dict = job_info_dict.setdefault(job_id, {})
        info_dict.setdefault('timestamps', (job_start[job_id], job_end[job_id]))


def get_job_tasks(job_list, job_info_dict):
    """
    Get task ids for each job.
    job_info_dict will be modified in this function to store the task ids.

    @param job_list: List of job ids.
    @param job_info_dict: Dictionary in which the task ids for each job will
                          be stored.
    """
    results = autotest_es.query(
            fields_returned=['job_id', 'task_id'],
            equality_constraints=[('_type', 'host_history')],
            batch_constraints=[('job_id', job_list)])
    for hit in results.hits:
        if 'task_id' in hit:
            info_dict = job_info_dict.setdefault(hit['job_id'], {})
            task_set = info_dict.setdefault('tasks', set())
            task_set.add(hit['task_id'])


def get_scheduling_overhead(suite_job_id, num_child_jobs, sanity_check=True):
    """
    Calculates a scheduling overhead.

    A scheduling overhead is defined by the average of DUT overheads
    for the DUTs that the child jobs of a suite ran on.

    @param suite_job_id: Job id of a suite.
    @param num_child_jobs: Number of child jobs of the suite.
    @param sanity_check: Do sanity check if True.
    @return: Dictionary storing the stats: 'suite_overhead', 'overhead',
             'num_duts', and the mean duration for each status in _states.
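             For example (hypothetical values):
                 {'suite_overhead': 120.5, 'overhead': 6.2, 'num_duts': 10,
                  'Queued': 45.0, 'Running': 310.3, ...}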
    """
    dut_jobs_dict, job_list = get_child_jobs_info(
            suite_job_id, num_child_jobs, sanity_check)
    job_info_dict = {}
    get_job_timestamps(job_list, job_info_dict)
    get_job_tasks(job_list, job_info_dict)

    dut_overheads = []
    avg_overhead = 0
    for dut, jobs in dut_jobs_dict.iteritems():
        print_verbose('Dut: %s, Jobs: %s', dut, jobs)
        overhead = get_dut_overhead(dut, jobs, job_info_dict)
        avg_overhead += overhead
        print_verbose('Dut overhead: %f', overhead)
        dut_overheads.append(overhead)

    if job_list:
        avg_overhead = avg_overhead / len(job_list)

    state_samples_dict = {}
    for info in job_info_dict.itervalues():
        for state in _states:
            if state in info:
                samples = state_samples_dict.setdefault(state, [])
                samples.append(info[state])

    # Build the result unconditionally so that "result" is always defined,
    # even when no state samples were collected.
    result = {state: mean(state_samples_dict[state])
              if state in state_samples_dict else 0
              for state in _states}
    result['suite_overhead'] = mean(dut_overheads)
    result['overhead'] = avg_overhead
    result['num_duts'] = len(dut_jobs_dict)
    return result


def print_suite_stats(suite_stats):
    """Prints out statistics for a suite to standard output."""
    print('suite_overhead: %(suite_overhead)f, overhead: %(overhead)f,' %
          suite_stats),
    for state in _states:
        if state in suite_stats:
            print('%s: %f,' % (state, suite_stats[state])),
    print('num_duts: %(num_duts)d' % suite_stats)


def analyze_suites(start_time, end_time):
    """
    Calculates timing stats (i.e., suite runtime, scheduling overhead)
    for the suites that finished within the given time range.

    @param start_time: Beginning timestamp.
    @param end_time: Ending timestamp.
    """
    print('Analyzing suites from %s to %s...' % (
          time_utils.epoch_time_to_date_string(start_time),
          time_utils.epoch_time_to_date_string(end_time)))

    if _options.bvtonly:
        batch_constraints = [
                ('suite_name', ['bvt-inline', 'bvt-cq', 'bvt-perbuild'])]
    else:
        batch_constraints = []

    start_time_epoch = time_utils.to_epoch_time(start_time)
    end_time_epoch = time_utils.to_epoch_time(end_time)
    results = autotest_es.query(
            fields_returned=['suite_name', 'suite_job_id', 'board', 'build',
                             'num_child_jobs', 'duration'],
            equality_constraints=[('_type', job_overhead.SUITE_RUNTIME_KEY),],
            range_constraints=[('time_recorded', start_time_epoch,
                                end_time_epoch)],
            sort_specs=[{'time_recorded': 'asc'}],
            batch_constraints=batch_constraints)
    print('Found %d suites' % (results.total))

    for hit in results.hits:
        suite_job_id = hit['suite_job_id']

        try:
            suite_name = hit['suite_name']
            num_child_jobs = int(hit['num_child_jobs'])
            suite_runtime = float(hit['duration'])

            print('Suite: %s (%s), Board: %s, Build: %s, Num child jobs: %d' % (
                    suite_name, suite_job_id, hit['board'], hit['build'],
                    num_child_jobs))

            suite_stats = get_scheduling_overhead(suite_job_id, num_child_jobs)
            print('Suite: %s (%s) runtime: %f,' % (
                    suite_name, suite_job_id, suite_runtime)),
            print_suite_stats(suite_stats)

            if _options.cron_mode:
                key = utils.get_data_key(
                        'suite_time_stats', suite_name, hit['build'],
                        hit['board'])
                autotest_stats.Timer(key).send('suite_runtime', suite_runtime)
                for stat, val in suite_stats.iteritems():
                    autotest_stats.Timer(key).send(stat, val)
        except Exception as e:
            print('ERROR: Exception raised while processing suite %s' %
                  suite_job_id)
            print(e)


def analyze_suite(suite_job_id):
    """Calculates and prints scheduling overhead stats for a single suite.

    @param suite_job_id: Job id of a suite.
    """
    suite_stats = get_scheduling_overhead(suite_job_id, 0, False)
    print('Suite (%s)' % suite_job_id),
    print_suite_stats(suite_stats)


def main():
    """main script."""
    parser = argparse.ArgumentParser(
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', dest='cron_mode', action='store_true',
                        help=('Run in a cron mode. Cron mode '
                              'sends calculated stat data to Graphite.'),
                        default=False)
    parser.add_argument('-s', type=int, dest='span',
                        help=('Number of hours for which stats should be '
                              'collected.'),
                        default=1)
    parser.add_argument('--bvtonly', dest='bvtonly', action='store_true',
                        help=('Gets bvt suites only (i.e., bvt-inline, '
                              'bvt-cq, bvt-perbuild).'),
                        default=False)
    parser.add_argument('--suite', type=int, dest='suite_job_id',
                        help=('Job id of a suite.'))
    parser.add_argument('--verbose', dest='verbose', action='store_true',
                        help=('Prints out more info.'),
                        default=False)
    global _options
    _options = parser.parse_args()

    if _options.suite_job_id:
        analyze_suite(_options.suite_job_id)
    else:
        end_time = time_utils.to_epoch_time(datetime.now())
        start_time = end_time - timedelta(hours=_options.span).total_seconds()
        analyze_suites(start_time, end_time)


if __name__ == '__main__':
    main()