# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2016, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from math import isnan

import numpy as np
import pandas as pd

from bart.common.Utils import area_under_curve
from energy_model import EnergyModel, EnergyModelCapacityError
from perf_analysis import PerfAnalysis
from test import LisaTest, experiment_test
from trace import Trace
from unittest import SkipTest

WORKLOAD_PERIOD_MS = 16
SET_IS_BIG_LITTLE = True
SET_INITIAL_TASK_UTIL = True

class _EnergyModelTest(LisaTest):
    """
    "Abstract" base class for generic EAS tests using the EnergyModel class

    Subclasses should provide a .workloads member to populate the 'wloads'
    field of the experiments_conf for the Executor. A set of helper methods
    are provided for making assertions about behaviour, most importantly the
    _test* methods which make assertions in a generic way.
    """

    test_conf = {
        "ftrace" : {
            "events" : [
                "sched_overutilized",
                "sched_energy_diff",
                "sched_load_avg_task",
                "sched_load_avg_cpu",
                "sched_migrate_task",
                "sched_switch",
                "cpu_frequency",
                "cpu_idle",
                "cpu_capacity",
            ],
        },
        "modules": ["cgroups"],
    }

    negative_slack_allowed_pct = 15
    """Percentage of RT-App task activations with negative slack allowed"""

    energy_est_threshold_pct = 5
    """
    Allowed margin for error in estimated energy cost for task placement,
    compared to optimal placement.
    """

    @classmethod
    def setUpClass(cls, *args, **kwargs):
        super(_EnergyModelTest, cls).runExperiments(*args, **kwargs)

    @classmethod
    def _getExperimentsConf(cls, test_env):
        if not test_env.nrg_model:
            try:
                test_env.nrg_model = EnergyModel.from_target(test_env.target)
            except Exception as e:
                raise SkipTest(
                    'This test requires an EnergyModel for the platform. '
                    'Either provide one manually or ensure it can be read '
                    'from the filesystem: {}'.format(e))

        conf = {
            'tag' : 'energy_aware',
            'flags' : ['ftrace', 'freeze_userspace'],
            'sched_features' : 'ENERGY_AWARE',
        }

        if 'cpufreq' in test_env.target.modules:
            available_govs = test_env.target.cpufreq.list_governors(0)
            if 'schedutil' in available_govs:
                conf['cpufreq'] = {'governor' : 'schedutil'}
            elif 'sched' in available_govs:
                conf['cpufreq'] = {'governor' : 'sched'}

        return {
            'wloads' : cls.workloads,
            'confs' : [conf],
        }

    @classmethod
    def _experimentsInit(cls, *args, **kwargs):
        super(_EnergyModelTest, cls)._experimentsInit(*args, **kwargs)

        if SET_IS_BIG_LITTLE:
            # This flag doesn't exist on mainline-integration kernels, so
            # don't worry if the file isn't present (hence verify=False)
            cls.target.write_value(
                "/proc/sys/kernel/sched_is_big_little", 1, verify=False)

        if SET_INITIAL_TASK_UTIL:
            # This flag doesn't exist on all kernels, so don't worry if the
            # file isn't present (hence verify=False)
            cls.target.write_value(
                "/proc/sys/kernel/sched_initial_task_util", 1024, verify=False)

    def get_task_utils_df(self, experiment):
        """
        Get a DataFrame with the *expected* utilization of each task over time

        :param experiment: The :class:`Experiment` to examine
        :returns: A Pandas DataFrame with a column for each task, showing how
                  the utilization of that task varies over time
        """
        util_scale = self.te.nrg_model.capacity_scale

        transitions = {}
        def add_transition(time, task, util):
            if time not in transitions:
                transitions[time] = {task: util}
            else:
                transitions[time][task] = util

        # First we'll build a dict D {time: {task_name: util}} where D[t][n]
        # is the expected utilization of task n from time t.
        for task, params in experiment.wload.params['profile'].iteritems():
            time = self.get_start_time(experiment) + params['delay']
            add_transition(time, task, 0)
            for _ in range(params.get('loops', 1)):
                for phase in params['phases']:
                    util = (phase.duty_cycle_pct * util_scale / 100.)
                    add_transition(time, task, util)
                    time += phase.duration_s
            add_transition(time, task, 0)

        index = sorted(transitions.keys())
        df = pd.DataFrame([transitions[k] for k in index], index=index)
        return df.fillna(method='ffill')

    def get_task_cpu_df(self, experiment):
        """
        Get a DataFrame mapping task names to the CPU they ran on

        Use the sched_switch trace event to find which CPU each task ran on.
        Does not reflect idleness - tasks not running are shown as running on
        the last CPU they woke on.

        :param experiment: The :class:`Experiment` to examine
        :returns: A Pandas DataFrame with a column for each task, showing the
                  CPU that the task was "on" at each moment in time
        """
        tasks = experiment.wload.tasks.keys()
        trace = self.get_trace(experiment)

        df = trace.ftrace.sched_switch.data_frame[['next_comm', '__cpu']]
        df = df[df['next_comm'].isin(tasks)]
        df = df.pivot(index=df.index, columns='next_comm').fillna(method='ffill')
        cpu_df = df['__cpu']
        # Drop consecutive duplicates
        cpu_df = cpu_df[(cpu_df.shift(+1) != cpu_df).any(axis=1)]
        return cpu_df

    def _sort_power_df_columns(self, df):
        """
        Helper method to re-order the columns of a power DataFrame

        This has no significance for code, but when examining DataFrames by
        hand they are easier to understand if the columns are in a logical
        order.
        """
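        # The energy model is a tree of nodes (e.g. clusters containing
        # CPUs); each node's .cpus tuple is also the key used for the
        # corresponding column of the power DataFrames built below, so
        # ordering the columns by a walk of that tree groups related CPUs
        # together.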
""" node_cpus = [node.cpus for node in self.te.nrg_model.root.iter_nodes()] return pd.DataFrame(df, columns=[c for c in node_cpus if c in df]) def get_power_df(self, experiment): """ Considering only the task placement, estimate power usage over time Examine a trace and use :meth:EnergyModel.estimate_from_cpu_util to get a DataFrame showing the estimated power usage over time. This assumes perfect cpuidle and cpufreq behaviour. :param experiment: The :class:Experiment to examine :returns: A Pandas DataFrame with a column node in the energy model (keyed with a tuple of the CPUs contained by that node) Shows the estimated power over time. """ task_cpu_df = self.get_task_cpu_df(experiment) task_utils_df = self.get_task_utils_df(experiment) tasks = experiment.wload.tasks.keys() # Create a combined DataFrame with the utilization of a task and the CPU # it was running on at each moment. Looks like: # utils cpus # task_wmig0 task_wmig1 task_wmig0 task_wmig1 # 2.375056 102.4 102.4 NaN NaN # 2.375105 102.4 102.4 2.0 NaN df = pd.concat([task_utils_df, task_cpu_df], axis=1, keys=['utils', 'cpus']) df = df.sort_index().fillna(method='ffill') nrg_model = self.executor.te.nrg_model # Now make a DataFrame with the estimated power at each moment. def est_power(row): cpu_utils = [0 for cpu in nrg_model.cpus] for task in tasks: cpu = row['cpus'][task] util = row['utils'][task] if not isnan(cpu): cpu_utils[int(cpu)] += util power = nrg_model.estimate_from_cpu_util(cpu_utils) columns = power.keys() return pd.Series([power[c] for c in columns], index=columns) return self._sort_power_df_columns(df.apply(est_power, axis=1)) def get_expected_power_df(self, experiment): """ Estimate *optimal* power usage over time Examine a trace and use :meth:get_optimal_placements and :meth:EnergyModel.estimate_from_cpu_util to get a DataFrame showing the estimated power usage over time under ideal EAS behaviour. :param experiment: The :class:Experiment to examine :returns: A Pandas DataFrame with a column each node in the energy model (keyed with a tuple of the CPUs contained by that node) and a "power" column with the sum of other columns. Shows the estimated *optimal* power over time. """ task_utils_df = self.get_task_utils_df(experiment) nrg_model = self.te.nrg_model def exp_power(row): task_utils = row.to_dict() expected_utils = nrg_model.get_optimal_placements(task_utils) power = nrg_model.estimate_from_cpu_util(expected_utils[0]) columns = power.keys() return pd.Series([power[c] for c in columns], index=columns) return self._sort_power_df_columns( task_utils_df.apply(exp_power, axis=1)) def _test_slack(self, experiment, tasks): """ Assert that the RTApp workload was given enough performance Use :class:PerfAnalysis to find instances where the experiment's RT-App workload wasn't able to complete its activations (i.e. its reported "slack" was negative). Assert that this happened less that ``negative_slack_allowed_pct`` percent of the time. :meth:_test_task_placement asserts that estimated energy usage was low. That will pass for runs where too *little* energy was used, compromising performance. This method provides a separate test to counteract that problem. """ pa = PerfAnalysis(experiment.out_dir) for task in tasks: slack = pa.df(task)["Slack"] bad_activations_pct = len(slack[slack < 0]) * 100. 
/ len(slack) if bad_activations_pct > self.negative_slack_allowed_pct: raise AssertionError("task {} missed {}% of activations".format( task, bad_activations_pct)) def _test_task_placement(self, experiment, tasks): """ Test that task placement was energy-efficient Use :meth:get_expected_power_df and :meth:get_power_df to estimate optimal and observed power usage for task placements of the experiment's workload. Assert that the observed power does not exceed the optimal power by more than 20%. """ exp_power = self.get_expected_power_df(experiment) est_power = self.get_power_df(experiment) exp_energy = area_under_curve(exp_power.sum(axis=1), method='rect') est_energy = area_under_curve(est_power.sum(axis=1), method='rect') msg = 'Estimated {} bogo-Joules to run workload, expected {}'.format( est_energy, exp_energy) threshold = exp_energy * (1 + (self.energy_est_threshold_pct / 100.)) self.assertLess(est_energy, threshold, msg=msg) class OneSmallTask(_EnergyModelTest): """ Test EAS for a single 20% task over 2 seconds """ workloads = { 'one_small' : { 'type' : 'rt-app', 'conf' : { 'class' : 'periodic', 'params' : { 'duty_cycle_pct': 20, 'duration_s': 2, 'period_ms': WORKLOAD_PERIOD_MS, }, 'tasks' : 1, 'prefix' : 'many', }, }, } @experiment_test def test_slack(self, experiment, tasks): self._test_slack(experiment, tasks) @experiment_test def test_task_placement(self, experiment, tasks): self._test_task_placement(experiment, tasks) class ThreeSmallTasks(_EnergyModelTest): """ Test EAS for 3 20% tasks over 2 seconds """ # The energy estimation for this test is probably not very accurate and this # isn't a very realistic workload. It doesn't really matter if we pick an # "ideal" task placement for this workload, we just want to avoid using big # CPUs in a big.LITTLE system. So use a larger energy threshold that # hopefully prevents too much use of big CPUs but otherwise is flexible in # allocation of LITTLEs. 
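    # Hence the threshold below is 20%, rather than the 5% default inherited
    # from _EnergyModelTest.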
    energy_est_threshold_pct = 20

    workloads = {
        'three_small' : {
            'type' : 'rt-app',
            'conf' : {
                'class' : 'periodic',
                'params' : {
                    'duty_cycle_pct': 20,
                    'duration_s': 2,
                    'period_ms': WORKLOAD_PERIOD_MS,
                },
                'tasks' : 3,
                'prefix' : 'many',
            },
        },
    }

    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)

    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)

class TwoBigTasks(_EnergyModelTest):
    """
    Test EAS for 2 80% tasks over 2 seconds
    """
    workloads = {
        'two_big' : {
            'type' : 'rt-app',
            'conf' : {
                'class' : 'periodic',
                'params' : {
                    'duty_cycle_pct': 80,
                    'duration_s': 2,
                    'period_ms': WORKLOAD_PERIOD_MS,
                },
                'tasks' : 2,
                'prefix' : 'many',
            },
        },
    }

    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)

    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)

class TwoBigThreeSmall(_EnergyModelTest):
    """
    Test EAS for 2 70% tasks and 3 10% tasks over 2 seconds
    """
    workloads = {
        'two_big_three_small' : {
            'type' : 'rt-app',
            'conf' : {
                'class' : 'profile',
                'params' : {
                    'large' : {
                        'kind' : 'Periodic',
                        'params' : {
                            'duty_cycle_pct': 70,
                            'duration_s': 2,
                            'period_ms': WORKLOAD_PERIOD_MS,
                        },
                        'tasks' : 2,
                    },
                    'small' : {
                        'kind' : 'Periodic',
                        'params' : {
                            'duty_cycle_pct': 10,
                            'duration_s': 2,
                            'period_ms': WORKLOAD_PERIOD_MS,
                        },
                        'tasks' : 3,
                    },
                },
            },
        },
    }

    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)

    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)

class RampUp(_EnergyModelTest):
    """
    Test EAS for a task ramping from 5% up to 70% over 2 seconds
    """
    workloads = {
        "ramp_up" : {
            "type": "rt-app",
            "conf" : {
                "class"  : "profile",
                "params" : {
                    "r5_10-60" : {
                        "kind"   : "Ramp",
                        "params" : {
                            'period_ms': WORKLOAD_PERIOD_MS,
                            "start_pct" : 5,
                            "end_pct"   : 70,
                            "delta_pct" : 5,
                            "time_s"    : 2,
                        },
                    },
                },
            },
        },
    }

    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)

    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)

class RampDown(_EnergyModelTest):
    """
    Test EAS for a task ramping from 70% down to 5% over 2 seconds
    """
    # The main purpose of this test is to ensure that, as it reduces in load,
    # a task is migrated from big to LITTLE CPUs on a big.LITTLE system.
    # This migration naturally happens some time _after_ it could possibly be
    # done, since there must be some hysteresis to avoid a performance cost.
    # Therefore allow a larger energy usage threshold.
    energy_est_threshold_pct = 15

    workloads = {
        "ramp_down" : {
            "type": "rt-app",
            "conf" : {
                "class"  : "profile",
                "params" : {
                    "r5_10-60" : {
                        "kind"   : "Ramp",
                        "params" : {
                            'period_ms': WORKLOAD_PERIOD_MS,
                            "start_pct" : 70,
                            "end_pct"   : 5,
                            "delta_pct" : 5,
                            "time_s"    : 2,
                        },
                    },
                },
            },
        },
    }

    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)

    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)

class EnergyModelWakeMigration(_EnergyModelTest):
    """
    Test EAS for tasks alternating between 10% and 50%
    """
    workloads = {
        'em_wake_migration' : {
            'type' : 'rt-app',
            'conf' : {
                'class' : 'profile',
                'params' : {
                    'wmig' : {
                        'kind' : 'Step',
                        'params' : {
                            "period_ms" : WORKLOAD_PERIOD_MS,
                            'start_pct': 10,
                            'end_pct': 50,
                            'time_s': 2,
                            'loops': 2
                        },
                        # Create one task for each big CPU
                        'tasks' : 'big',
                    },
                },
            },
        },
    }

    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)

    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)
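
# A minimal sketch of how a further scenario could be added, following the
# same pattern as the classes above: subclass _EnergyModelTest, provide a
# 'workloads' dict for the Executor, and wrap the generic assertions with
# @experiment_test. The class name and workload parameters below are
# illustrative only and are not part of the test suite.
#
# class OneBigTask(_EnergyModelTest):
#     """
#     Test EAS for a single 80% task over 2 seconds
#     """
#     workloads = {
#         'one_big' : {
#             'type' : 'rt-app',
#             'conf' : {
#                 'class' : 'periodic',
#                 'params' : {
#                     'duty_cycle_pct': 80,
#                     'duration_s': 2,
#                     'period_ms': WORKLOAD_PERIOD_MS,
#                 },
#                 'tasks' : 1,
#                 'prefix' : 'many',
#             },
#         },
#     }
#
#     @experiment_test
#     def test_slack(self, experiment, tasks):
#         self._test_slack(experiment, tasks)
#
#     @experiment_test
#     def test_task_placement(self, experiment, tasks):
#         self._test_task_placement(experiment, tasks)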