普通文本  |  605行  |  21.8 KB

# Copyright (c) 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import json
import logging
import numpy
import operator
import os
import re
import time
import urllib
import urllib2

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import lsbrelease_utils
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.client.cros.power import power_status
from autotest_lib.client.cros.power import power_utils

<!DOCTYPE html>
<script type="text/javascript" src="https://www.gstatic.com/charts/loader.js">
<script type="text/javascript">
    google.charts.load('current', {{'packages':['corechart']}});
    function drawChart() {{
        var data = google.visualization.arrayToDataTable([
        var unit = '{unit}';
        var options = {{
            width: 1600,
            height: 1200,
            lineWidth: 1,
            legend: {{ position: 'top', maxLines: 3 }},
            vAxis: {{ viewWindow: {{min: 0}}, title: '{type} ({unit})' }},
            hAxis: {{ viewWindow: {{min: 0}}, title: 'time (second)' }},
        var element = document.getElementById('{type}');
        var chart;
        if (unit == 'percent') {{
            options['isStacked'] = true;
            chart = new google.visualization.SteppedAreaChart(element);
        }} else {{
            chart = new google.visualization.LineChart(element);
        chart.draw(data, options);
<div id="{type}"></div>

class BaseDashboard(object):
    """Base class that implements method for prepare and upload data to power

    def __init__(self, logger, testname, start_ts=None, resultsdir=None,
        """Create BaseDashboard objects.

            logger: object that store the log. This will get convert to
                    dictionary by self._convert()
            testname: name of current test
            start_ts: timestamp of when test started in seconds since epoch
            resultsdir: directory to save the power json
            uploadurl: url to upload power data
        self._logger = logger
        self._testname = testname
        self._start_ts = start_ts if start_ts else time.time()
        self._resultsdir = resultsdir
        self._uploadurl = uploadurl

    def _create_powerlog_dict(self, raw_measurement):
        """Create powerlog dictionary from raw measurement data
        Data format in go/power-dashboard-data.

            raw_measurement: dictionary contains raw measurement data

            A dictionary of powerlog
        powerlog_dict = {
            'format_version': 5,
            'timestamp': self._start_ts,
            'test': self._testname,
            'dut': self._create_dut_info_dict(raw_measurement['data'].keys()),
            'power': raw_measurement,

        return powerlog_dict

    def _create_dut_info_dict(self, power_rails):
        """Create a dictionary that contain information of the DUT.

        MUST be implemented in subclass.

            power_rails: list of measured power rails

            DUT info dictionary
        raise NotImplementedError

    def _save_json(self, powerlog_dict, resultsdir, filename='power_log.json'):
        """Convert powerlog dict to human readable formatted JSON and
        append to <resultsdir>/<filename>.

            powerlog_dict: dictionary of power data
            resultsdir: directory to save formatted JSON object
            filename: filename to append to
        if not os.path.exists(resultsdir):
            raise error.TestError('resultsdir %s does not exist.' % resultsdir)
        filename = os.path.join(resultsdir, filename)
        json_str = json.dumps(powerlog_dict, indent=4, separators=(',', ': '),
        json_str = utils.strip_non_printable(json_str)
        with file(filename, 'a') as f:

    def _save_html(self, powerlog_dict, resultsdir, filename='power_log.html'):
        """Convert powerlog dict to chart in HTML page and append to

        Note that this results in multiple HTML objects in one file but Chrome
        can render all of it in one page.

            powerlog_dict: dictionary of power data
            resultsdir: directory to save HTML page
            filename: filename to append to
        # Create dict from type to sorted list of rail names.
        rail_type = collections.defaultdict(list)
        for r, t in powerlog_dict['power']['type'].iteritems():
        for t in rail_type:
            rail_type[t] = sorted(rail_type[t])

        html_str = ''
        row_indent = ' ' * 12
        for t in rail_type:
            data_str_list = []

            # Generate rail name data string.
            header = ['time'] + rail_type[t]
            header_str = row_indent + "['" + "', '".join(header) + "']"

            # Generate measurements data string.
            for i in range(powerlog_dict['power']['sample_count']):
                row = [str(i * powerlog_dict['power']['sample_duration'])]
                for r in rail_type[t]:
                row_str = row_indent + '[' + ', '.join(row) + ']'

            data_str = ',\n'.join(data_str_list)
            unit = powerlog_dict['power']['unit'][rail_type[t][0]]
            html_str += _HTML_CHART_STR.format(data=data_str, unit=unit, type=t)

        if not os.path.exists(resultsdir):
            raise error.TestError('resultsdir %s does not exist.' % resultsdir)
        filename = os.path.join(resultsdir, filename)
        with file(filename, 'a') as f:

    def _upload(self, powerlog_dict, uploadurl):
        """Convert powerlog dict to minimal size JSON and upload to dashboard.

            powerlog_dict: dictionary of power data
            uploadurl: url to upload the power data
        json_str = json.dumps(powerlog_dict, ensure_ascii=False)
        data_obj = {'data': utils.strip_non_printable(json_str)}
        encoded = urllib.urlencode(data_obj)
        req = urllib2.Request(uploadurl, encoded)

        @retry.retry(urllib2.URLError, blacklist=[urllib2.HTTPError],
                     timeout_min=5.0, delay_sec=1, backoff=2)
        def _do_upload():


    def _create_checkpoint_dict(self):
        """Create dictionary for checkpoint.

        @returns a dictionary of tags to their corresponding intervals in the
                 following format:
                      tag1: [(start1, end1), (start2, end2), ...],
                      tag2: [(start3, end3), (start4, end4), ...],
        raise NotImplementedError

    def _tag_with_checkpoint(self, power_dict):
        """Tag power_dict with checkpoint data.

        This function translates the checkpoint intervals into a list of tags
        for each data point.

        @param power_dict: a dictionary with power data; assume this dictionary
                           has attributes 'sample_count' and 'sample_duration'.
        checkpoint_dict = self._create_checkpoint_dict()

        # Create list of check point event tuple.
        # Tuple format: (checkpoint_name:str, event_time:float, is_start:bool)
        checkpoint_event_list = []
        for name, intervals in checkpoint_dict.iteritems():
            for start, finish in intervals:
                checkpoint_event_list.append((name, start, True))
                checkpoint_event_list.append((name, finish, False))

        checkpoint_event_list = sorted(checkpoint_event_list,

        # Add dummy check point at 1e9 seconds.
        checkpoint_event_list.append(('dummy', 1e9, True))

        interval_set = set()
        event_index = 0
        checkpoint_list = []
        for i in range(power_dict['sample_count']):
            curr_time = i * power_dict['sample_duration']

            # Process every checkpoint event until current point of time
            while checkpoint_event_list[event_index][1] <= curr_time:
                name, _, is_start = checkpoint_event_list[event_index]
                if is_start:
                event_index += 1

        power_dict['checkpoint'] = checkpoint_list

    def _convert(self):
        """Convert data from self._logger object to raw power measurement

        MUST be implemented in subclass.

            raw measurement dictionary
        raise NotImplementedError

    def upload(self):
        """Upload powerlog to dashboard and save data to results directory.
        raw_measurement = self._convert()
        if raw_measurement is None:

        powerlog_dict = self._create_powerlog_dict(raw_measurement)
        if self._resultsdir is not None:
            self._save_json(powerlog_dict, self._resultsdir)
            self._save_html(powerlog_dict, self._resultsdir)
        if self._uploadurl is not None:
            self._upload(powerlog_dict, self._uploadurl)

class ClientTestDashboard(BaseDashboard):
    """Dashboard class for autotests that run on client side.

    def __init__(self, logger, testname, start_ts=None, resultsdir=None,
                 uploadurl=None, note=''):
        """Create BaseDashboard objects.

            logger: object that store the log. This will get convert to
                    dictionary by self._convert()
            testname: name of current test
            start_ts: timestamp of when test started in seconds since epoch
            resultsdir: directory to save the power json
            uploadurl: url to upload power data
            note: note for current test run
        super(ClientTestDashboard, self).__init__(logger, testname, start_ts,
                                                  resultsdir, uploadurl)
        self._note = note

    def _create_dut_info_dict(self, power_rails):
        """Create a dictionary that contain information of the DUT.

            power_rails: list of measured power rails

            DUT info dictionary
        board = utils.get_board()
        platform = utils.get_platform()

        if not platform.startswith(board):
            board += '_' + platform

        if power_utils.has_hammer():
            board += '_hammer'

        dut_info_dict = {
            'board': board,
            'version': {
                'hw': utils.get_hardware_revision(),
                'milestone': lsbrelease_utils.get_chromeos_release_milestone(),
                'os': lsbrelease_utils.get_chromeos_release_version(),
                'channel': lsbrelease_utils.get_chromeos_channel(),
                'firmware': utils.get_firmware_version(),
                'ec': utils.get_ec_version(),
                'kernel': utils.get_kernel_version(),
            'sku': {
                'cpu': utils.get_cpu_name(),
                'memory_size': utils.get_mem_total_gb(),
                'storage_size': utils.get_disk_size_gb(utils.get_root_device()),
                'display_resolution': utils.get_screen_resolution(),
            'ina': {
                'version': 0,
                'ina': power_rails,
            'note': self._note,

        if power_utils.has_battery():
            status = power_status.get_status()
            if status.battery:
                # Round the battery size to nearest tenth because it is
                # fluctuated for platform without battery nominal voltage data.
                dut_info_dict['sku']['battery_size'] = round(
                        status.battery[0].energy_full_design, 1)
                dut_info_dict['sku']['battery_shutdown_percent'] = \
        return dut_info_dict

class MeasurementLoggerDashboard(ClientTestDashboard):
    """Dashboard class for power_status.MeasurementLogger.

    def __init__(self, logger, testname, resultsdir=None, uploadurl=None,
        super(MeasurementLoggerDashboard, self).__init__(logger, testname, None,
                                                         resultsdir, uploadurl,
        self._unit = None
        self._type = None
        self._padded_domains = None

    def _create_powerlog_dict(self, raw_measurement):
        """Create powerlog dictionary from raw measurement data
        Data format in go/power-dashboard-data.

            raw_measurement: dictionary contains raw measurement data

            A dictionary of powerlog
        powerlog_dict = \
                super(MeasurementLoggerDashboard, self)._create_powerlog_dict(

        # Using start time of the logger as the timestamp of powerlog dict.
        powerlog_dict['timestamp'] = self._logger.times[0]

        return powerlog_dict

    def _create_padded_domains(self):
        """Pad the domains name for dashboard to make the domain name better
        sorted in alphabetical order"""

    def _create_checkpoint_dict(self):
        """Create dictionary for checkpoint.
        start_time = self._logger.times[0]
        return self._logger._checkpoint_logger.convert_relative(start_time)

    def _convert(self):
        """Convert data from power_status.MeasurementLogger object to raw
        power measurement dictionary.

            raw measurement dictionary or None if no readings
        if len(self._logger.readings) == 0:
            logging.warn('No readings in logger ... ignoring')
            return None

        power_dict = collections.defaultdict(dict, {
            'sample_count': len(self._logger.readings) - 1,
            'sample_duration': 0,
            'average': dict(),
            'data': dict(),
        if power_dict['sample_count'] > 1:
            total_duration = self._logger.times[-1] - self._logger.times[0]
            power_dict['sample_duration'] = \
                    1.0 * total_duration / power_dict['sample_count']

        for i, domain_readings in enumerate(zip(*self._logger.readings)):
            if self._padded_domains:
                domain = self._padded_domains[i]
                domain = self._logger.domains[i]
            # Remove first item because that is the log before the test begin.
            power_dict['data'][domain] = domain_readings[1:]
            power_dict['average'][domain] = \
            if self._unit:
                power_dict['unit'][domain] = self._unit
            if self._type:
                power_dict['type'][domain] = self._type

        return power_dict

class PowerLoggerDashboard(MeasurementLoggerDashboard):
    """Dashboard class for power_status.PowerLogger.

    def __init__(self, logger, testname, resultsdir=None, uploadurl=None,
        if uploadurl is None:
            uploadurl = 'http://chrome-power.appspot.com/rapl'
        super(PowerLoggerDashboard, self).__init__(logger, testname, resultsdir,
                                                   uploadurl, note)
        self._unit = 'watt'
        self._type = 'power'

class TempLoggerDashboard(MeasurementLoggerDashboard):
    """Dashboard class for power_status.PowerLogger.

    def __init__(self, logger, testname, resultsdir=None, uploadurl=None,
        if uploadurl is None:
            uploadurl = 'http://chrome-power.appspot.com/rapl'
        super(TempLoggerDashboard, self).__init__(logger, testname, resultsdir,
                                                  uploadurl, note)
        self._unit = 'celsius'
        self._type = 'temperature'

class SimplePowerLoggerDashboard(ClientTestDashboard):
    """Dashboard class for simple system power measurement taken and publishing
    it to the dashboard.

    def __init__(self, duration_secs, power_watts, testname, start_ts,
                 resultsdir=None, uploadurl=None, note=''):

        if uploadurl is None:
            uploadurl = 'http://chrome-power.appspot.com/rapl'
        super(SimplePowerLoggerDashboard, self).__init__(
                None, testname, start_ts, resultsdir, uploadurl, note)

        self._unit = 'watt'
        self._type = 'power'
        self._duration_secs = duration_secs
        self._power_watts = power_watts
        self._testname = testname

    def _convert(self):
        """Convert vbat to raw power measurement dictionary.

            raw measurement dictionary
        power_dict = {
            'sample_count': 1,
            'sample_duration': self._duration_secs,
            'average': {'system': self._power_watts},
            'data': {'system': [self._power_watts]},
            'unit': {'system': self._unit},
            'type': {'system': self._type},
            'checkpoint': [[self._testname]],
        return power_dict

class CPUStatsLoggerDashboard(MeasurementLoggerDashboard):
    """Dashboard class for power_status.CPUStatsLogger.

    def __init__(self, logger, testname, resultsdir=None, uploadurl=None,
        if uploadurl is None:
            uploadurl = 'http://chrome-power.appspot.com/rapl'
        super(CPUStatsLoggerDashboard, self).__init__(
                logger, testname, resultsdir, uploadurl, note)

    def _split_domain(domain):
        """Return domain_type and domain_name for given domain.

        Example: Split ................... to ........... and .......
                       cpuidle_C1E-SKL        cpuidle         C1E-SKL
                       cpuidle_0_3_C0         cpuidle_0_3     C0
                       cpupkg_C0_C1           cpupkg          C0_C1
                       cpufreq_0_3_1512000    cpufreq_0_3     1512000

            domain: cpu stat domain name to split

            tuple of domain_type and domain_name
        # Regex explanation
        # .*?           matches type non-greedily                 (cpuidle)
        # (?:_\d+)*     matches cpu part, ?: makes it not a group (_0_1_2_3)
        # .*            matches name greedily                     (C0_C1)
        return re.match(r'(.*?(?:_\d+)*)_(.*)', domain).groups()

    def _convert(self):
        power_dict = super(CPUStatsLoggerDashboard, self)._convert()
        remove_rail = []
        for rail in power_dict['data']:
            if rail.startswith('wavg_cpu'):
                power_dict['type'][rail] = 'cpufreq_wavg'
                power_dict['unit'][rail] = 'kilohertz'
            elif rail.startswith('wavg_gpu'):
                power_dict['type'][rail] = 'gpufreq_wavg'
                power_dict['unit'][rail] = 'megahertz'
                # Remove all aggregate stats, only 'non-c0' and 'non-C0_C1' now
                if self._split_domain(rail)[1].startswith('non'):
                power_dict['type'][rail] = self._split_domain(rail)[0]
                power_dict['unit'][rail] = 'percent'
        for rail in remove_rail:
            del power_dict['data'][rail]
            del power_dict['average'][rail]
        return power_dict

    def _create_padded_domains(self):
        """Padded number in the domain name with dot to make it sorted

        cpuidle_C1-SKL, cpuidle_C1E-SKL, cpuidle_C2-SKL, cpuidle_C10-SKL
        will be changed to
        cpuidle_C.1-SKL, cpuidle_C.1E-SKL, cpuidle_C.2-SKL, cpuidle_C10-SKL
        which make it in alphabetically order.
        longest = collections.defaultdict(int)
        searcher = re.compile(r'\d+')
        number_strs = []
        splitted_domains = \
                [self._split_domain(domain) for domain in self._logger.domains]
        for domain_type, domain_name in splitted_domains:
            result = searcher.search(domain_name)
            if not result:
            number_str = result.group(0)
            longest[domain_type] = max(longest[domain_type], len(number_str))

        self._padded_domains = []
        for i in range(len(self._logger.domains)):
            if not number_strs[i]:

            domain_type, domain_name = splitted_domains[i]
            formatter_component = '{:.>%ds}' % longest[domain_type]

            # Change "cpuidle_C1E-SKL" to "cpuidle_C{:.>2s}E-SKL"
            formatter_str = domain_type + '_' + \
                    searcher.sub(formatter_component, domain_name, count=1)

            # Run "cpuidle_C{:_>2s}E-SKL".format("1") to get "cpuidle_C.1E-SKL"