# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2016, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from math import isnan

import numpy as np
import pandas as pd

from bart.common.Utils import area_under_curve

from energy_model import EnergyModel, EnergyModelCapacityError
from perf_analysis import PerfAnalysis
from test import LisaTest, experiment_test
from trace import Trace
from unittest import SkipTest


WORKLOAD_PERIOD_MS = 16
SET_IS_BIG_LITTLE = True
SET_INITIAL_TASK_UTIL = True

class _EnergyModelTest(LisaTest):
    """
    "Abstract" base class for generic EAS tests using the EnergyModel class

    Subclasses should provide a .workloads member to populate the 'wloads' field
    of the experiments_conf for the Executor. A set of helper methods is
    provided for making assertions about behaviour, most importantly the _test*
    methods, which make assertions in a generic way.
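
    For example, a subclass might define something like this (an illustrative
    sketch only; see the concrete subclasses below for real workloads)::

        workloads = {
            'example' : {
                'type' : 'rt-app',
                'conf' : {
                    'class' : 'periodic',
                    'params' : {
                        'duty_cycle_pct': 20,
                        'duration_s': 2,
                        'period_ms': 16,
                    },
                    'tasks' : 1,
                    'prefix' : 'example',
                },
            },
        }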
    """

    test_conf = {
        "ftrace" : {
            "events" : [
                "sched_overutilized",
                "sched_energy_diff",
                "sched_load_avg_task",
                "sched_load_avg_cpu",
                "sched_migrate_task",
                "sched_switch",
                "cpu_frequency",
                "cpu_idle",
                "cpu_capacity",
            ],
        },
        "modules": ["cgroups"],
    }

    negative_slack_allowed_pct = 15
    """Percentage of RT-App task activations with negative slack allowed"""

    energy_est_threshold_pct = 5
    """
    Allowed margin of error in the estimated energy cost of the observed task
    placement, compared to the optimal placement.
    """

    @classmethod
    def setUpClass(cls, *args, **kwargs):
        super(_EnergyModelTest, cls).runExperiments(*args, **kwargs)

    @classmethod
    def _getExperimentsConf(cls, test_env):
        if not test_env.nrg_model:
            try:
                test_env.nrg_model = EnergyModel.from_target(test_env.target)
            except Exception as e:
                raise SkipTest(
                    'This test requires an EnergyModel for the platform. '
                    'Either provide one manually or ensure it can be read '
                    'from the filesystem: {}'.format(e))

        conf = {
            'tag' : 'energy_aware',
            'flags' : ['ftrace', 'freeze_userspace'],
            'sched_features' : 'ENERGY_AWARE',
        }

        if 'cpufreq' in test_env.target.modules:
            available_govs = test_env.target.cpufreq.list_governors(0)
            if 'schedutil' in available_govs:
                conf['cpufreq'] = {'governor' : 'schedutil'}
            elif 'sched' in available_govs:
                conf['cpufreq'] = {'governor' : 'sched'}

        return {
            'wloads' : cls.workloads,
            'confs' : [conf],
        }

    @classmethod
    def _experimentsInit(cls, *args, **kwargs):
        super(_EnergyModelTest, cls)._experimentsInit(*args, **kwargs)

        if SET_IS_BIG_LITTLE:
            # This flag doesn't exist on mainline-integration kernels, so
            # don't worry if the file isn't present (hence verify=False)
            cls.target.write_value(
                "/proc/sys/kernel/sched_is_big_little", 1, verify=False)

        if SET_INITIAL_TASK_UTIL:
            # This flag doesn't exist on all kernels, so don't worry if the file
            # isn't present (hence verify=False)
            cls.target.write_value(
                "/proc/sys/kernel/sched_initial_task_util", 1024, verify=False)


    def get_task_utils_df(self, experiment):
        """
        Get a DataFrame with the *expected* utilization of each task over time

        :param experiment: The :class:Experiment to examine
        :returns: A Pandas DataFrame with a column for each task, showing how
                  the utilization of that task varies over time
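
        For example (illustrative values only), a single 20% task on a platform
        with a ``capacity_scale`` of 1024 might produce something like::

                      task_example
            2.000000         204.8
            4.000000           0.0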
        """
        util_scale = self.te.nrg_model.capacity_scale

        transitions = {}
        def add_transition(time, task, util):
            if time not in transitions:
                transitions[time] = {task: util}
            else:
                transitions[time][task] = util

        # First we'll build a dict D {time: {task_name: util}} where D[t][n] is
        # the expected utilization of task n from time t.
        for task, params in experiment.wload.params['profile'].iteritems():
            time = self.get_start_time(experiment) + params['delay']
            add_transition(time, task, 0)
            for _ in range(params.get('loops', 1)):
                for phase in params['phases']:
                    util = (phase.duty_cycle_pct * util_scale / 100.)
                    add_transition(time, task, util)
                    time += phase.duration_s
            add_transition(time, task, 0)

        index = sorted(transitions.keys())
        df = pd.DataFrame([transitions[k] for k in index], index=index)
        return df.fillna(method='ffill')

    def get_task_cpu_df(self, experiment):
        """
        Get a DataFrame mapping task names to the CPU they ran on

        Use the sched_switch trace event to find which CPU each task ran
        on. Does not reflect idleness - tasks not running are shown as running
        on the last CPU they woke on.

        :param experiment: The :class:Experiment to examine
        :returns: A Pandas DataFrame with a column for each task, showing the
                  CPU that the task was "on" at each moment in time
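
        For example (illustrative values only)::

                      task_wmig0  task_wmig1
            2.375105         2.0         NaN
            2.381112         2.0         1.0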
        """
        tasks = experiment.wload.tasks.keys()
        trace = self.get_trace(experiment)

        df = trace.ftrace.sched_switch.data_frame[['next_comm', '__cpu']]
        df = df[df['next_comm'].isin(tasks)]
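        # Pivot so that each task gets its own column containing the CPU it was
        # switched in on, then forward-fill so every row shows the last-known
        # CPU for each task.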
        df = df.pivot(index=df.index, columns='next_comm').fillna(method='ffill')
        cpu_df = df['__cpu']
        # Drop consecutive duplicates
        cpu_df = cpu_df[(cpu_df.shift(+1) != cpu_df).any(axis=1)]
        return cpu_df

    def _sort_power_df_columns(self, df):
        """
        Helper method to re-order the columns of a power DataFrame

        This has no functional significance, but DataFrames are easier to read
        by hand when the columns are in a logical order.
        """
        node_cpus = [node.cpus for node in self.te.nrg_model.root.iter_nodes()]
        return pd.DataFrame(df, columns=[c for c in node_cpus if c in df])

    def get_power_df(self, experiment):
        """
        Considering only the task placement, estimate power usage over time

        Examine a trace and use :meth:EnergyModel.estimate_from_cpu_util to get
        a DataFrame showing the estimated power usage over time. This assumes
        perfect cpuidle and cpufreq behaviour.

        :param experiment: The :class:Experiment to examine
        :returns: A Pandas DataFrame with a column for each node in the energy
                  model (keyed with a tuple of the CPUs contained by that node),
                  showing the estimated power over time.
        """
        task_cpu_df = self.get_task_cpu_df(experiment)
        task_utils_df = self.get_task_utils_df(experiment)

        tasks = experiment.wload.tasks.keys()

        # Create a combined DataFrame with the utilization of a task and the CPU
        # it was running on at each moment. Looks like:
        #                       utils                  cpus
        #          task_wmig0 task_wmig1 task_wmig0 task_wmig1
        # 2.375056      102.4      102.4        NaN        NaN
        # 2.375105      102.4      102.4        2.0        NaN

        df = pd.concat([task_utils_df, task_cpu_df],
                       axis=1, keys=['utils', 'cpus'])
        df = df.sort_index().fillna(method='ffill')
        nrg_model = self.executor.te.nrg_model

        # Now make a DataFrame with the estimated power at each moment.
        def est_power(row):
            cpu_utils = [0 for cpu in nrg_model.cpus]
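            # Add up the expected utilization of the tasks on the CPU each was
            # running on at this instant.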
            for task in tasks:
                cpu = row['cpus'][task]
                util = row['utils'][task]
                if not isnan(cpu):
                    cpu_utils[int(cpu)] += util
            power = nrg_model.estimate_from_cpu_util(cpu_utils)
            columns = power.keys()
            return pd.Series([power[c] for c in columns], index=columns)
        return self._sort_power_df_columns(df.apply(est_power, axis=1))

    def get_expected_power_df(self, experiment):
        """
        Estimate *optimal* power usage over time

        Examine a trace and use :meth:get_optimal_placements and
        :meth:EnergyModel.estimate_from_cpu_util to get a DataFrame showing the
        estimated power usage over time under ideal EAS behaviour.

        :param experiment: The :class:Experiment to examine
        :returns: A Pandas DataFrame with a column for each node in the energy
                  model (keyed with a tuple of the CPUs contained by that node),
                  showing the estimated *optimal* power over time.
        """
        task_utils_df = self.get_task_utils_df(experiment)

        nrg_model = self.te.nrg_model

        def exp_power(row):
            task_utils = row.to_dict()
            expected_utils = nrg_model.get_optimal_placements(task_utils)
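            # get_optimal_placements may return several equally-optimal
            # placements; all of them yield the same estimated power, so just
            # use the first.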
            power = nrg_model.estimate_from_cpu_util(expected_utils[0])
            columns = power.keys()
            return pd.Series([power[c] for c in columns], index=columns)
        return self._sort_power_df_columns(
            task_utils_df.apply(exp_power, axis=1))

    def _test_slack(self, experiment, tasks):
        """
        Assert that the RTApp workload was given enough performance

        Use :class:PerfAnalysis to find instances where the experiment's RT-App
        workload wasn't able to complete its activations (i.e. its reported
        "slack" was negative). Assert that this happened less that
        ``negative_slack_allowed_pct`` percent of the time.

        :meth:_test_task_placement asserts that estimated energy usage was
        low. That will pass for runs where too *little* energy was used,
        compromising performance. This method provides a separate test to
        counteract that problem.
        """

        pa = PerfAnalysis(experiment.out_dir)
        for task in tasks:
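            # rt-app reports a "Slack" value for each activation: the time left
            # between the end of that activation's work and the start of the
            # next period. Negative slack means the activation overran.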
            slack = pa.df(task)["Slack"]

            bad_activations_pct = len(slack[slack < 0]) * 100. / len(slack)
            if bad_activations_pct > self.negative_slack_allowed_pct:
                raise AssertionError("task {} missed {}% of activations".format(
                    task, bad_activations_pct))

    def _test_task_placement(self, experiment, tasks):
        """
        Test that task placement was energy-efficient

        Use :meth:get_expected_power_df and :meth:get_power_df to estimate
        optimal and observed power usage for task placements of the experiment's
        workload. Assert that the estimated energy of the observed placement
        does not exceed that of the optimal placement by more than
        ``energy_est_threshold_pct`` percent.
        """
        exp_power = self.get_expected_power_df(experiment)
        est_power = self.get_power_df(experiment)

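        # Integrate the estimated power over time (rectangular approximation)
        # to get energy, in the same arbitrary "bogo-Joule" units for both the
        # observed and the optimal placement.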
        exp_energy = area_under_curve(exp_power.sum(axis=1), method='rect')
        est_energy = area_under_curve(est_power.sum(axis=1), method='rect')

        msg = 'Estimated {} bogo-Joules to run workload, expected {}'.format(
            est_energy, exp_energy)
        threshold = exp_energy * (1 + (self.energy_est_threshold_pct / 100.))
        self.assertLess(est_energy, threshold, msg=msg)

class OneSmallTask(_EnergyModelTest):
    """
    Test EAS for a single 20% task over 2 seconds
    """
    workloads = {
        'one_small' : {
            'type' : 'rt-app',
            'conf' : {
                'class' : 'periodic',
                'params' : {
                    'duty_cycle_pct': 20,
                    'duration_s': 2,
                    'period_ms': WORKLOAD_PERIOD_MS,
                },
                'tasks' : 1,
                'prefix' : 'many',
            },
        },
    }
    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)
    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)

class ThreeSmallTasks(_EnergyModelTest):
    """
    Test EAS for three 20% tasks over 2 seconds
    """

    # The energy estimation for this test is probably not very accurate and this
    # isn't a very realistic workload. It doesn't really matter whether we pick
    # an "ideal" task placement for this workload; we just want to avoid using
    # big CPUs in a big.LITTLE system. So use a larger energy threshold that
    # hopefully prevents too much use of big CPUs but otherwise is flexible in
    # the allocation of LITTLEs.
    energy_est_threshold_pct = 20

    workloads = {
        'three_small' : {
            'type' : 'rt-app',
            'conf' : {
                'class' : 'periodic',
                'params' : {
                    'duty_cycle_pct': 20,
                    'duration_s': 2,
                    'period_ms': WORKLOAD_PERIOD_MS,
                },
                'tasks' : 3,
                'prefix' : 'many',
            },
        },
    }
    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)
    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)

class TwoBigTasks(_EnergyModelTest):
    """
    Test EAS for two 80% tasks over 2 seconds
    """
    workloads = {
        'two_big' : {
            'type' : 'rt-app',
            'conf' : {
                'class' : 'periodic',
                'params' : {
                    'duty_cycle_pct': 80,
                    'duration_s': 2,
                    'period_ms': WORKLOAD_PERIOD_MS,
                },
                'tasks' : 2,
                'prefix' : 'many',
            },
        },
    }
    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)
    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)

class TwoBigThreeSmall(_EnergyModelTest):
    """
    Test EAS for two 70% tasks and three 10% tasks over 2 seconds
    """
    workloads = {
        'two_big_three_small' : {
            'type' : 'rt-app',
            'conf' : {
                'class' : 'profile',
                'params' : {
                    'large' : {
                        'kind' : 'Periodic',
                        'params' : {
                            'duty_cycle_pct': 70,
                            'duration_s': 2,
                            'period_ms': WORKLOAD_PERIOD_MS,
                        },
                        'tasks' : 2,
                    },
                    'small' : {
                        'kind' : 'Periodic',
                        'params' : {
                            'duty_cycle_pct': 10,
                            'duration_s': 2,
                            'period_ms': WORKLOAD_PERIOD_MS,
                        },
                        'tasks' : 3,
                    },
                },
            },
        },
    }
    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)
    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)

class RampUp(_EnergyModelTest):
    """
    Test EAS for a task ramping from 5% up to 70% over 2 seconds
    """
    workloads = {
        "ramp_up" : {
            "type": "rt-app",
            "conf" : {
                "class"  : "profile",
                "params"  : {
                    "r5_10-60" : {
                        "kind"   : "Ramp",
                        "params" : {
                            'period_ms': WORKLOAD_PERIOD_MS,
                            "start_pct" :  5,
                            "end_pct"   : 70,
                            "delta_pct" :  5,
                            "time_s"    :  2,
                         },
                    },
                },
            },
        },
    }

    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)
    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)

class RampDown(_EnergyModelTest):
    """
    Test EAS for a task ramping from 70% down to 5% over 2 seconds
    """

    # The main purpose of this test is to ensure that, as its load reduces, a
    # task is migrated from big to LITTLE CPUs on a big.LITTLE system. This
    # migration naturally happens some time _after_ it could first be done,
    # since some hysteresis is needed to avoid a performance cost. Therefore
    # allow a larger energy usage threshold.
    energy_est_threshold_pct = 15

    workloads = {
        "ramp_down" : {
            "type": "rt-app",
            "conf" : {
                "class"  : "profile",
                "params"  : {
                    "r5_10-60" : {
                        "kind"   : "Ramp",
                        "params" : {
                            'period_ms': WORKLOAD_PERIOD_MS,
                            "start_pct" : 70,
                            "end_pct"   :  5,
                            "delta_pct" :  5,
                            "time_s"    :  2,
                         },
                    },
                },
            },
        },
    }

    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)
    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)

class EnergyModelWakeMigration(_EnergyModelTest):
    """
    Test EAS for tasks alternating between 10% and 50%
    """
    workloads = {
        'em_wake_migration' : {
            'type' : 'rt-app',
            'conf' : {
                'class' : 'profile',
                'params' : {
                    'wmig' : {
                        'kind' : 'Step',
                        'params' : {
                            "period_ms" : WORKLOAD_PERIOD_MS,
                            'start_pct': 10,
                            'end_pct': 50,
                            'time_s': 2,
                            'loops': 2
                        },
                        # Create one task for each big cpu
                        'tasks' : 'big',
                    },
                },
            },
        },
    }
    @experiment_test
    def test_slack(self, experiment, tasks):
        self._test_slack(experiment, tasks)
    @experiment_test
    def test_task_placement(self, experiment, tasks):
        self._test_task_placement(experiment, tasks)