# Copyright (c) 2017 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import collections import json import logging import numpy import operator import os import re import time import urllib import urllib2 from autotest_lib.client.bin import utils from autotest_lib.client.common_lib import error from autotest_lib.client.common_lib import lsbrelease_utils from autotest_lib.client.common_lib.cros import retry from autotest_lib.client.cros.power import power_status from autotest_lib.client.cros.power import power_utils _HTML_CHART_STR = ''' <!DOCTYPE html> <html> <head> <script type="text/javascript" src="https://www.gstatic.com/charts/loader.js"> </script> <script type="text/javascript"> google.charts.load('current', {{'packages':['corechart']}}); google.charts.setOnLoadCallback(drawChart); function drawChart() {{ var data = google.visualization.arrayToDataTable([ {data} ]); var unit = '{unit}'; var options = {{ width: 1600, height: 1200, lineWidth: 1, legend: {{ position: 'top', maxLines: 3 }}, vAxis: {{ viewWindow: {{min: 0}}, title: '{type} ({unit})' }}, hAxis: {{ viewWindow: {{min: 0}}, title: 'time (second)' }}, }}; var element = document.getElementById('{type}'); var chart; if (unit == 'percent') {{ options['isStacked'] = true; chart = new google.visualization.SteppedAreaChart(element); }} else {{ chart = new google.visualization.LineChart(element); }} chart.draw(data, options); }} </script> </head> <body> <div id="{type}"></div> </body> </html> ''' class BaseDashboard(object): """Base class that implements method for prepare and upload data to power dashboard. """ def __init__(self, logger, testname, start_ts=None, resultsdir=None, uploadurl=None): """Create BaseDashboard objects. Args: logger: object that store the log. This will get convert to dictionary by self._convert() testname: name of current test start_ts: timestamp of when test started in seconds since epoch resultsdir: directory to save the power json uploadurl: url to upload power data """ self._logger = logger self._testname = testname self._start_ts = start_ts if start_ts else time.time() self._resultsdir = resultsdir self._uploadurl = uploadurl def _create_powerlog_dict(self, raw_measurement): """Create powerlog dictionary from raw measurement data Data format in go/power-dashboard-data. Args: raw_measurement: dictionary contains raw measurement data Returns: A dictionary of powerlog """ powerlog_dict = { 'format_version': 5, 'timestamp': self._start_ts, 'test': self._testname, 'dut': self._create_dut_info_dict(raw_measurement['data'].keys()), 'power': raw_measurement, } return powerlog_dict def _create_dut_info_dict(self, power_rails): """Create a dictionary that contain information of the DUT. MUST be implemented in subclass. Args: power_rails: list of measured power rails Returns: DUT info dictionary """ raise NotImplementedError def _save_json(self, powerlog_dict, resultsdir, filename='power_log.json'): """Convert powerlog dict to human readable formatted JSON and append to <resultsdir>/<filename>. Args: powerlog_dict: dictionary of power data resultsdir: directory to save formatted JSON object filename: filename to append to """ if not os.path.exists(resultsdir): raise error.TestError('resultsdir %s does not exist.' % resultsdir) filename = os.path.join(resultsdir, filename) json_str = json.dumps(powerlog_dict, indent=4, separators=(',', ': '), ensure_ascii=False) json_str = utils.strip_non_printable(json_str) with file(filename, 'a') as f: f.write(json_str) def _save_html(self, powerlog_dict, resultsdir, filename='power_log.html'): """Convert powerlog dict to chart in HTML page and append to <resultsdir>/<filename>. Note that this results in multiple HTML objects in one file but Chrome can render all of it in one page. Args: powerlog_dict: dictionary of power data resultsdir: directory to save HTML page filename: filename to append to """ # Create dict from type to sorted list of rail names. rail_type = collections.defaultdict(list) for r, t in powerlog_dict['power']['type'].iteritems(): rail_type[t].append(r) for t in rail_type: rail_type[t] = sorted(rail_type[t]) html_str = '' row_indent = ' ' * 12 for t in rail_type: data_str_list = [] # Generate rail name data string. header = ['time'] + rail_type[t] header_str = row_indent + "['" + "', '".join(header) + "']" data_str_list.append(header_str) # Generate measurements data string. for i in range(powerlog_dict['power']['sample_count']): row = [str(i * powerlog_dict['power']['sample_duration'])] for r in rail_type[t]: row.append(str(powerlog_dict['power']['data'][r][i])) row_str = row_indent + '[' + ', '.join(row) + ']' data_str_list.append(row_str) data_str = ',\n'.join(data_str_list) unit = powerlog_dict['power']['unit'][rail_type[t][0]] html_str += _HTML_CHART_STR.format(data=data_str, unit=unit, type=t) if not os.path.exists(resultsdir): raise error.TestError('resultsdir %s does not exist.' % resultsdir) filename = os.path.join(resultsdir, filename) with file(filename, 'a') as f: f.write(html_str) def _upload(self, powerlog_dict, uploadurl): """Convert powerlog dict to minimal size JSON and upload to dashboard. Args: powerlog_dict: dictionary of power data uploadurl: url to upload the power data """ json_str = json.dumps(powerlog_dict, ensure_ascii=False) data_obj = {'data': utils.strip_non_printable(json_str)} encoded = urllib.urlencode(data_obj) req = urllib2.Request(uploadurl, encoded) @retry.retry(urllib2.URLError, blacklist=[urllib2.HTTPError], timeout_min=5.0, delay_sec=1, backoff=2) def _do_upload(): urllib2.urlopen(req) _do_upload() def _create_checkpoint_dict(self): """Create dictionary for checkpoint. @returns a dictionary of tags to their corresponding intervals in the following format: { tag1: [(start1, end1), (start2, end2), ...], tag2: [(start3, end3), (start4, end4), ...], ... } """ raise NotImplementedError def _tag_with_checkpoint(self, power_dict): """Tag power_dict with checkpoint data. This function translates the checkpoint intervals into a list of tags for each data point. @param power_dict: a dictionary with power data; assume this dictionary has attributes 'sample_count' and 'sample_duration'. """ checkpoint_dict = self._create_checkpoint_dict() # Create list of check point event tuple. # Tuple format: (checkpoint_name:str, event_time:float, is_start:bool) checkpoint_event_list = [] for name, intervals in checkpoint_dict.iteritems(): for start, finish in intervals: checkpoint_event_list.append((name, start, True)) checkpoint_event_list.append((name, finish, False)) checkpoint_event_list = sorted(checkpoint_event_list, key=operator.itemgetter(1)) # Add dummy check point at 1e9 seconds. checkpoint_event_list.append(('dummy', 1e9, True)) interval_set = set() event_index = 0 checkpoint_list = [] for i in range(power_dict['sample_count']): curr_time = i * power_dict['sample_duration'] # Process every checkpoint event until current point of time while checkpoint_event_list[event_index][1] <= curr_time: name, _, is_start = checkpoint_event_list[event_index] if is_start: interval_set.add(name) else: interval_set.discard(name) event_index += 1 checkpoint_list.append(list(interval_set)) power_dict['checkpoint'] = checkpoint_list def _convert(self): """Convert data from self._logger object to raw power measurement dictionary. MUST be implemented in subclass. Return: raw measurement dictionary """ raise NotImplementedError def upload(self): """Upload powerlog to dashboard and save data to results directory. """ raw_measurement = self._convert() if raw_measurement is None: return powerlog_dict = self._create_powerlog_dict(raw_measurement) if self._resultsdir is not None: self._save_json(powerlog_dict, self._resultsdir) self._save_html(powerlog_dict, self._resultsdir) if self._uploadurl is not None: self._upload(powerlog_dict, self._uploadurl) class ClientTestDashboard(BaseDashboard): """Dashboard class for autotests that run on client side. """ def __init__(self, logger, testname, start_ts=None, resultsdir=None, uploadurl=None, note=''): """Create BaseDashboard objects. Args: logger: object that store the log. This will get convert to dictionary by self._convert() testname: name of current test start_ts: timestamp of when test started in seconds since epoch resultsdir: directory to save the power json uploadurl: url to upload power data note: note for current test run """ super(ClientTestDashboard, self).__init__(logger, testname, start_ts, resultsdir, uploadurl) self._note = note def _create_dut_info_dict(self, power_rails): """Create a dictionary that contain information of the DUT. Args: power_rails: list of measured power rails Returns: DUT info dictionary """ board = utils.get_board() platform = utils.get_platform() if not platform.startswith(board): board += '_' + platform if power_utils.has_hammer(): board += '_hammer' dut_info_dict = { 'board': board, 'version': { 'hw': utils.get_hardware_revision(), 'milestone': lsbrelease_utils.get_chromeos_release_milestone(), 'os': lsbrelease_utils.get_chromeos_release_version(), 'channel': lsbrelease_utils.get_chromeos_channel(), 'firmware': utils.get_firmware_version(), 'ec': utils.get_ec_version(), 'kernel': utils.get_kernel_version(), }, 'sku': { 'cpu': utils.get_cpu_name(), 'memory_size': utils.get_mem_total_gb(), 'storage_size': utils.get_disk_size_gb(utils.get_root_device()), 'display_resolution': utils.get_screen_resolution(), }, 'ina': { 'version': 0, 'ina': power_rails, }, 'note': self._note, } if power_utils.has_battery(): status = power_status.get_status() if status.battery: # Round the battery size to nearest tenth because it is # fluctuated for platform without battery nominal voltage data. dut_info_dict['sku']['battery_size'] = round( status.battery[0].energy_full_design, 1) dut_info_dict['sku']['battery_shutdown_percent'] = \ power_utils.get_low_battery_shutdown_percent() return dut_info_dict class MeasurementLoggerDashboard(ClientTestDashboard): """Dashboard class for power_status.MeasurementLogger. """ def __init__(self, logger, testname, resultsdir=None, uploadurl=None, note=''): super(MeasurementLoggerDashboard, self).__init__(logger, testname, None, resultsdir, uploadurl, note) self._unit = None self._type = None self._padded_domains = None def _create_powerlog_dict(self, raw_measurement): """Create powerlog dictionary from raw measurement data Data format in go/power-dashboard-data. Args: raw_measurement: dictionary contains raw measurement data Returns: A dictionary of powerlog """ powerlog_dict = \ super(MeasurementLoggerDashboard, self)._create_powerlog_dict( raw_measurement) # Using start time of the logger as the timestamp of powerlog dict. powerlog_dict['timestamp'] = self._logger.times[0] return powerlog_dict def _create_padded_domains(self): """Pad the domains name for dashboard to make the domain name better sorted in alphabetical order""" pass def _create_checkpoint_dict(self): """Create dictionary for checkpoint. """ start_time = self._logger.times[0] return self._logger._checkpoint_logger.convert_relative(start_time) def _convert(self): """Convert data from power_status.MeasurementLogger object to raw power measurement dictionary. Return: raw measurement dictionary or None if no readings """ if len(self._logger.readings) == 0: logging.warn('No readings in logger ... ignoring') return None power_dict = collections.defaultdict(dict, { 'sample_count': len(self._logger.readings) - 1, 'sample_duration': 0, 'average': dict(), 'data': dict(), }) if power_dict['sample_count'] > 1: total_duration = self._logger.times[-1] - self._logger.times[0] power_dict['sample_duration'] = \ 1.0 * total_duration / power_dict['sample_count'] self._create_padded_domains() for i, domain_readings in enumerate(zip(*self._logger.readings)): if self._padded_domains: domain = self._padded_domains[i] else: domain = self._logger.domains[i] # Remove first item because that is the log before the test begin. power_dict['data'][domain] = domain_readings[1:] power_dict['average'][domain] = \ numpy.average(power_dict['data'][domain]) if self._unit: power_dict['unit'][domain] = self._unit if self._type: power_dict['type'][domain] = self._type self._tag_with_checkpoint(power_dict) return power_dict class PowerLoggerDashboard(MeasurementLoggerDashboard): """Dashboard class for power_status.PowerLogger. """ def __init__(self, logger, testname, resultsdir=None, uploadurl=None, note=''): if uploadurl is None: uploadurl = 'http://chrome-power.appspot.com/rapl' super(PowerLoggerDashboard, self).__init__(logger, testname, resultsdir, uploadurl, note) self._unit = 'watt' self._type = 'power' class TempLoggerDashboard(MeasurementLoggerDashboard): """Dashboard class for power_status.PowerLogger. """ def __init__(self, logger, testname, resultsdir=None, uploadurl=None, note=''): if uploadurl is None: uploadurl = 'http://chrome-power.appspot.com/rapl' super(TempLoggerDashboard, self).__init__(logger, testname, resultsdir, uploadurl, note) self._unit = 'celsius' self._type = 'temperature' class SimplePowerLoggerDashboard(ClientTestDashboard): """Dashboard class for simple system power measurement taken and publishing it to the dashboard. """ def __init__(self, duration_secs, power_watts, testname, start_ts, resultsdir=None, uploadurl=None, note=''): if uploadurl is None: uploadurl = 'http://chrome-power.appspot.com/rapl' super(SimplePowerLoggerDashboard, self).__init__( None, testname, start_ts, resultsdir, uploadurl, note) self._unit = 'watt' self._type = 'power' self._duration_secs = duration_secs self._power_watts = power_watts self._testname = testname def _convert(self): """Convert vbat to raw power measurement dictionary. Return: raw measurement dictionary """ power_dict = { 'sample_count': 1, 'sample_duration': self._duration_secs, 'average': {'system': self._power_watts}, 'data': {'system': [self._power_watts]}, 'unit': {'system': self._unit}, 'type': {'system': self._type}, 'checkpoint': [[self._testname]], } return power_dict class CPUStatsLoggerDashboard(MeasurementLoggerDashboard): """Dashboard class for power_status.CPUStatsLogger. """ def __init__(self, logger, testname, resultsdir=None, uploadurl=None, note=''): if uploadurl is None: uploadurl = 'http://chrome-power.appspot.com/rapl' super(CPUStatsLoggerDashboard, self).__init__( logger, testname, resultsdir, uploadurl, note) @staticmethod def _split_domain(domain): """Return domain_type and domain_name for given domain. Example: Split ................... to ........... and ....... cpuidle_C1E-SKL cpuidle C1E-SKL cpuidle_0_3_C0 cpuidle_0_3 C0 cpupkg_C0_C1 cpupkg C0_C1 cpufreq_0_3_1512000 cpufreq_0_3 1512000 Args: domain: cpu stat domain name to split Return: tuple of domain_type and domain_name """ # Regex explanation # .*? matches type non-greedily (cpuidle) # (?:_\d+)* matches cpu part, ?: makes it not a group (_0_1_2_3) # .* matches name greedily (C0_C1) return re.match(r'(.*?(?:_\d+)*)_(.*)', domain).groups() def _convert(self): power_dict = super(CPUStatsLoggerDashboard, self)._convert() remove_rail = [] for rail in power_dict['data']: if rail.startswith('wavg_cpu'): power_dict['type'][rail] = 'cpufreq_wavg' power_dict['unit'][rail] = 'kilohertz' elif rail.startswith('wavg_gpu'): power_dict['type'][rail] = 'gpufreq_wavg' power_dict['unit'][rail] = 'megahertz' else: # Remove all aggregate stats, only 'non-c0' and 'non-C0_C1' now if self._split_domain(rail)[1].startswith('non'): remove_rail.append(rail) continue power_dict['type'][rail] = self._split_domain(rail)[0] power_dict['unit'][rail] = 'percent' for rail in remove_rail: del power_dict['data'][rail] del power_dict['average'][rail] return power_dict def _create_padded_domains(self): """Padded number in the domain name with dot to make it sorted alphabetically. Example: cpuidle_C1-SKL, cpuidle_C1E-SKL, cpuidle_C2-SKL, cpuidle_C10-SKL will be changed to cpuidle_C.1-SKL, cpuidle_C.1E-SKL, cpuidle_C.2-SKL, cpuidle_C10-SKL which make it in alphabetically order. """ longest = collections.defaultdict(int) searcher = re.compile(r'\d+') number_strs = [] splitted_domains = \ [self._split_domain(domain) for domain in self._logger.domains] for domain_type, domain_name in splitted_domains: result = searcher.search(domain_name) if not result: number_strs.append('') continue number_str = result.group(0) number_strs.append(number_str) longest[domain_type] = max(longest[domain_type], len(number_str)) self._padded_domains = [] for i in range(len(self._logger.domains)): if not number_strs[i]: self._padded_domains.append(self._logger.domains[i]) continue domain_type, domain_name = splitted_domains[i] formatter_component = '{:.>%ds}' % longest[domain_type] # Change "cpuidle_C1E-SKL" to "cpuidle_C{:.>2s}E-SKL" formatter_str = domain_type + '_' + \ searcher.sub(formatter_component, domain_name, count=1) # Run "cpuidle_C{:_>2s}E-SKL".format("1") to get "cpuidle_C.1E-SKL" self._padded_domains.append(formatter_str.format(number_strs[i]))