普通文本  |  1083行  |  40.26 KB

# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Trace Parser Module """

import numpy as np
import os
import pandas as pd
import sys
import trappy
import json
import warnings
import operator
import logging

from analysis_register import AnalysisRegister
from collections import namedtuple
from devlib.utils.misc import memoized
from trappy.utils import listify, handle_duplicate_index


NON_IDLE_STATE = -1
ResidencyTime = namedtuple('ResidencyTime', ['total', 'active'])
ResidencyData = namedtuple('ResidencyData', ['label', 'residency'])

class Trace(object):
    """
    The Trace object is the LISA trace events parser.

    :param platform: a dictionary containing information about the target
        platform
    :type platform: dict or None

    :param data_dir: folder containing all trace data
    :type data_dir: str

    :param events: events to be parsed (everything in the trace by default)
    :type events: list(str)

    :param window: time window to consider when parsing the trace
    :type window: tuple(int, int)

    :param normalize_time: normalize trace time stamps
    :type normalize_time: bool

    :param trace_format: format of the trace. Possible values are:
        - FTrace
        - SysTrace
    :type trace_format: str

    :param plots_dir: directory where to save plots
    :type plots_dir: str

    :param plots_prefix: prefix for plots file names
    :type plots_prefix: str

    :param cgroup_info: add cgroup information for sanitization
      example:
        {
            'controller_ids': { 2: 'schedtune', 4: 'cpuset' },
            'cgroups': [ 'root', 'background', 'foreground' ],  # list of allowed cgroup names
        }
    :type cgroup_info: dict
    """

    def __init__(self, platform, data_dir, events,
                 tasks=None,
                 window=(0, None),
                 normalize_time=True,
                 trace_format='FTrace',
                 plots_dir=None,
                 plots_prefix='',
                 cgroup_info={}):

        # The platform used to run the experiments
        self.platform = platform or {}

        # TRAPpy Trace object
        self.ftrace = None

        # Trace format
        self.trace_format = trace_format

        # The time window used to limit trace parsing to
        self.window = window

        # Whether trace timestamps are normalized or not
        self.normalize_time = normalize_time

        # Dynamically registered TRAPpy events
        self.trappy_cls = {}

        # Maximum timespan for all collected events
        self.time_range = 0

        # Time the system was overutilzied
        self.overutilized_time = 0
        self.overutilized_prc = 0

        # List of events required by user
        self.events = []

        # List of events available in the parsed trace
        self.available_events = []

        # Cluster frequency coherency flag
        self.freq_coherency = True

        # Folder containing all trace data
        self.data_dir = None

        # Setup logging
        self._log = logging.getLogger('Trace')

        # Folder containing trace
        if not os.path.isdir(data_dir):
            self.data_dir = os.path.dirname(data_dir) or '.'
        else:
            self.data_dir = data_dir

        # By deafult, use the trace dir to save plots
        self.plots_dir = plots_dir
        if self.plots_dir is None:
            self.plots_dir = self.data_dir
        self.plots_prefix = plots_prefix

        # Cgroup info for sanitization
        self.cgroup_info = cgroup_info

        self.__registerTraceEvents(events) if events else None
        self.__parseTrace(data_dir, tasks, window, trace_format)

        # Minimum and Maximum x_time to use for all plots
        self.x_min = 0
        self.x_max = self.time_range

        # Reset x axis time range to full scale
        t_min = self.window[0]
        t_max = self.window[1]
        self.setXTimeRange(t_min, t_max)

        self.data_frame = TraceData()
        self._registerDataFrameGetters(self)

        # If we don't know the number of CPUs, check the trace for the
        # highest-numbered CPU that traced an event.
        if 'cpus_count' not in self.platform:
            max_cpu = max(int(self.data_frame.trace_event(e)['__cpu'].max())
                          for e in self.available_events)
            self.platform['cpus_count'] = max_cpu + 1

        self.analysis = AnalysisRegister(self)

    def _registerDataFrameGetters(self, module):
        """
        Internal utility function that looks up getter functions with a "_dfg_"
        prefix in their name and bounds them to the specified module.

        :param module: module to which the function is added
        :type module: class
        """
        self._log.debug('Registering [%s] local data frames', module)
        for func in dir(module):
            if not func.startswith('_dfg_'):
                continue
            dfg_name = func.replace('_dfg_', '')
            dfg_func = getattr(module, func)
            self._log.debug('   %s', dfg_name)
            setattr(self.data_frame, dfg_name, dfg_func)

    def setXTimeRange(self, t_min=None, t_max=None):
        """
        Set x axis time range to the specified values.

        :param t_min: lower bound
        :type t_min: int or float

        :param t_max: upper bound
        :type t_max: int or float
        """
        self.x_min = t_min if t_min is not None else self.start_time
        self.x_max = t_max if t_max is not None else self.start_time + self.time_range

        self._log.debug('Set plots time range to (%.6f, %.6f)[s]',
                       self.x_min, self.x_max)

    def __registerTraceEvents(self, events):
        """
        Save a copy of the parsed events.

        :param events: single event name or list of events names
        :type events: str or list(str)
        """
        if isinstance(events, basestring):
            self.events = events.split(' ')
        elif isinstance(events, list):
            self.events = events
        else:
            raise ValueError('Events must be a string or a list of strings')
        # Register devlib fake cpu_frequency events
        if 'cpu_frequency' in events:
            self.events.append('cpu_frequency_devlib')

    def __parseTrace(self, path, tasks, window, trace_format):
        """
        Internal method in charge of performing the actual parsing of the
        trace.

        :param path: path to the trace folder (or trace file)
        :type path: str

        :param tasks: filter data for the specified tasks only
        :type tasks: list(str)

        :param window: time window to consider when parsing the trace
        :type window: tuple(int, int)

        :param trace_format: format of the trace. Possible values are:
            - FTrace
            - SysTrace
        :type trace_format: str
        """
        self._log.debug('Loading [sched] events from trace in [%s]...', path)
        self._log.debug('Parsing events: %s', self.events if self.events else 'ALL')
        if trace_format.upper() == 'SYSTRACE' or path.endswith('html'):
            self._log.debug('Parsing SysTrace format...')
            trace_class = trappy.SysTrace
            self.trace_format = 'SysTrace'
        elif trace_format.upper() == 'FTRACE':
            self._log.debug('Parsing FTrace format...')
            trace_class = trappy.FTrace
            self.trace_format = 'FTrace'
        else:
            raise ValueError("Unknown trace format {}".format(trace_format))

        # If using normalized time, we should use
        # TRAPpy's `abs_window` instead of `window`
        window_kw = {}
        if self.normalize_time:
            window_kw['window'] = window
        else:
            window_kw['abs_window'] = window
        scope = 'custom' if self.events else 'all'
        self.ftrace = trace_class(path, scope=scope, events=self.events,
                                  normalize_time=self.normalize_time, **window_kw)

        # Load Functions profiling data
        has_function_stats = self._loadFunctionsStats(path)

        # Check for events available on the parsed trace
        self.__checkAvailableEvents()
        if len(self.available_events) == 0:
            if has_function_stats:
                self._log.info('Trace contains only functions stats')
                return
            raise ValueError('The trace does not contain useful events '
                             'nor function stats')

        # Index PIDs and Task names
        self.__loadTasksNames(tasks)

        self.__computeTimeSpan()

        # Sanitize cgroup info if any
        self._sanitize_CgroupAttachTask()

        # Setup internal data reference to interesting events/dataframes
        self._sanitize_SchedOverutilized()

        # Santization not possible if platform missing
        if not self.platform:
            # Setup internal data reference to interesting events/dataframes
            self._sanitize_SchedLoadAvgCpu()
            self._sanitize_SchedLoadAvgTask()
            self._sanitize_SchedCpuCapacity()
            self._sanitize_SchedBoostCpu()
            self._sanitize_SchedBoostTask()
            self._sanitize_SchedEnergyDiff()
            self._sanitize_CpuFrequency()

    def __checkAvailableEvents(self, key=""):
        """
        Internal method used to build a list of available events.

        :param key: key to be used for TRAPpy filtering
        :type key: str
        """
        for val in self.ftrace.get_filters(key):
            obj = getattr(self.ftrace, val)
            if len(obj.data_frame):
                self.available_events.append(val)
        self._log.debug('Events found on trace:')
        for evt in self.available_events:
            self._log.debug(' - %s', evt)

    def __loadTasksNames(self, tasks):
        """
        Try to load tasks names using one of the supported events.

        :param tasks: list of task names. If None, load all tasks found.
        :type tasks: list(str) or NoneType
        """
        def load(tasks, event, name_key, pid_key):
            df = self._dfg_trace_event(event)
            if tasks is None:
                tasks = df[name_key].unique()
            self._scanTasks(df, name_key=name_key, pid_key=pid_key)
            self._scanTgids(df)

        if 'sched_switch' in self.available_events:
            load(tasks, 'sched_switch', 'prev_comm', 'prev_pid')
            return

        if 'sched_load_avg_task' in self.available_events:
            load(tasks, 'sched_load_avg_task', 'comm', 'pid')
            return

        self._log.warning('Failed to load tasks names from trace events')

    def hasEvents(self, dataset):
        """
        Returns True if the specified event is present in the parsed trace,
        False otherwise.

        :param dataset: trace event name or list of trace events
        :type dataset: str or list(str)
        """
        if dataset in self.available_events:
            return True
        return False

    def __computeTimeSpan(self):
        """
        Compute time axis range, considering all the parsed events.
        """
        self.start_time = 0 if self.normalize_time else self.ftrace.basetime
        self.time_range = self.ftrace.get_duration()
        self._log.debug('Collected events spans a %.3f [s] time interval',
                       self.time_range)

        self.setXTimeRange(self.window[0], self.window[1])

    def _scanTgids(self, df):
        if not '__tgid' in df.columns:
            return
        df = df[['__pid', '__tgid']]
        df = df.drop_duplicates(keep='first').set_index('__pid')
        df.rename(columns = { '__pid': 'pid', '__tgid': 'tgid' },
                              inplace=True)
        self._pid_tgid = df

    def _scanTasks(self, df, name_key='comm', pid_key='pid'):
        """
        Extract tasks names and PIDs from the input data frame. The data frame
        should contain a task name column and PID column.

        :param df: data frame containing trace events from which tasks names
            and PIDs will be extracted
        :type df: :mod:`pandas.DataFrame`

        :param name_key: The name of the dataframe columns containing task
            names
        :type name_key: str

        :param pid_key: The name of the dataframe columns containing task PIDs
        :type pid_key: str
        """
        df = df[[name_key, pid_key]]
        self._tasks_by_name = df.set_index(name_key)
        self._tasks_by_pid = (df.drop_duplicates(subset=pid_key, keep='last')
                .rename(columns={
                    pid_key : 'PID',
                    name_key : 'TaskName'})
                .set_index('PID').sort_index())

    def getTaskByName(self, name):
        """
        Get the PIDs of all tasks with the specified name.

        The same PID can have different task names, mainly because once a task
        is generated it inherits the parent name and then its name is updated
        to represent what the task really is.

        This API works under the assumption that a task name is updated at
        most one time and it always considers the name a task had the last time
        it has been scheduled for execution in the current trace.

        :param name: task name
        :type name: str

        :return: a list of PID for tasks which name matches the required one,
                 the last time they ran in the current trace
        """
        return (self._tasks_by_pid[self._tasks_by_pid.TaskName == name]
                    .index.tolist())

    def getTaskByPid(self, pid):
        """
        Get the name of the task with the specified PID.

        The same PID can have different task names, mainly because once a task
        is generated it inherits the parent name and then its name is
        updated to represent what the task really is.

        This API works under the assumption that a task name is updated at
        most one time and it always report the name the task had the last time
        it has been scheduled for execution in the current trace.

        :param name: task PID
        :type name: int

        :return: the list of names of the tasks whose PID matches the required one,
                 the last time they ran in the current trace
        """
        try:
            return self._tasks_by_pid.ix[pid].values[0]
        except KeyError:
            return None

    def getTgidFromPid(self, pid):
        return _pid_tgid.ix[pid].values[0]

    def getTasks(self, dataframe=None,
                 task_names=None, name_key='comm', pid_key='pid'):
        """
        :return: the name of the task which PID matches the required one,
                 the last time they ran in the current trace
        """
        try:
            return self._tasks_by_pid.ix[pid].values[0]
        except KeyError:
            return None

    def getTasks(self):
        """
        Get a dictionary of all the tasks in the Trace.

        :return: a dictionary which maps each PID to the corresponding task
                 name
        """
        return self._tasks_by_pid.TaskName.to_dict()


###############################################################################
# DataFrame Getter Methods
###############################################################################

    def df(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace event
        in the parsed trace.

        :param event: Trace event name
        :type event: str
        """
        warnings.simplefilter('always', DeprecationWarning) #turn off filter
        warnings.warn("\n\tUse of Trace::df() is deprecated and will be soon removed."
                      "\n\tUse Trace::data_frame.trace_event(event_name) instead.",
                      category=DeprecationWarning)
        warnings.simplefilter('default', DeprecationWarning) #reset filter
        return self._dfg_trace_event(event)

    def _dfg_trace_event(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace event
        in the parsed trace.

        :param event: Trace event name
        :type event: str
        """
        if self.data_dir is None:
            raise ValueError("trace data not (yet) loaded")
        if self.ftrace and hasattr(self.ftrace, event):
            return getattr(self.ftrace, event).data_frame
        raise ValueError('Event [{}] not supported. '
                         'Supported events are: {}'
                         .format(event, self.available_events))

    def _dfg_functions_stats(self, functions=None):
        """
        Get a DataFrame of specified kernel functions profile data

        For each profiled function a DataFrame is returned which reports stats
        on kernel functions execution time. The reported stats are per-CPU and
        includes: number of times the function has been executed (hits),
        average execution time (avg), overall execution time (time) and samples
        variance (s_2).
        By default returns a DataFrame of all the functions profiled.

        :param functions: the name of the function or a list of function names
                          to report
        :type functions: str or list(str)
        """
        if not hasattr(self, '_functions_stats_df'):
            return None
        df = self._functions_stats_df
        if not functions:
            return df
        return df.loc[df.index.get_level_values(1).isin(listify(functions))]

    # cgroup_attach_task with just merged fake and real events
    def _cgroup_attach_task(self):
        cgroup_events = ['cgroup_attach_task', 'cgroup_attach_task_devlib']
        df = None

        if set(cgroup_events).isdisjoint(set(self.available_events)):
            self._log.error('atleast one of {} is needed for cgroup_attach_task event generation'.format(cgroup_events))
            return None

        for cev in cgroup_events:
            if not cev in self.available_events:
                continue
            cdf = self._dfg_trace_event(cev)
            cdf = cdf[['__line', 'pid', 'controller', 'cgroup']]
            if not isinstance(df, pd.DataFrame):
                df = cdf
            else:
                df = pd.concat([cdf, df])

        # Always drop na since this DF is used as secondary
        df.dropna(inplace=True, how='any')
        return df

    @memoized
    def _dfg_cgroup_attach_task(self, controllers = ['schedtune', 'cpuset']):
        # Since fork doesn't result in attach events, generate fake attach events
        # The below mechanism doesn't work to propogate nested fork levels:
        # For ex:
        # cgroup_attach_task: pid=1166
        # fork: pid=1166 child_pid=2222  <-- fake attach generated
        # fork: pid=2222 child_pid=3333  <-- fake attach not generated
        def fork_add_cgroup(fdf, cdf, controller):
            cdf = cdf[cdf['controller'] == controller]
            ret_df = trappy.utils.merge_dfs(fdf, cdf, pivot='pid')
            return ret_df

        if not 'sched_process_fork' in self.available_events:
            self._log.error('sched_process_fork is mandatory to get proper cgroup_attach events')
            return None
        fdf = self._dfg_trace_event('sched_process_fork')

        forks_len = len(fdf)
        forkdf = fdf
        cdf = self._cgroup_attach_task()
        for idx, c in enumerate(controllers):
            fdf = fork_add_cgroup(fdf, cdf, c)
            if (idx != (len(controllers) - 1)):
                fdf = pd.concat([fdf, forkdf]).sort_values(by='__line')

        fdf = fdf[['__line', 'child_pid', 'controller', 'cgroup']]
        fdf.rename(columns = { 'child_pid': 'pid' }, inplace=True)

        # Always drop na since this DF is used as secondary
        fdf.dropna(inplace=True, how='any')

        new_forks_len = len(fdf) / len(controllers)

        fdf = pd.concat([fdf, cdf]).sort_values(by='__line')

        if new_forks_len < forks_len:
            dropped = forks_len - new_forks_len
            self._log.info("Couldn't attach all forks cgroup with-attach events ({} dropped)".format(dropped))
        return fdf

    @memoized
    def _dfg_sched_switch_cgroup(self, controllers = ['schedtune', 'cpuset']):
        def sched_switch_add_cgroup(sdf, cdf, controller, direction):
            cdf = cdf[cdf['controller'] == controller]

            ret_df = sdf.rename(columns = { direction + '_pid': 'pid' })
            ret_df = trappy.utils.merge_dfs(ret_df, cdf, pivot='pid')
            ret_df.rename(columns = { 'pid': direction + '_pid' }, inplace=True)

            ret_df.drop('controller', axis=1, inplace=True)
            ret_df.rename(columns = { 'cgroup': direction + '_' + controller }, inplace=True)
            return ret_df

        if not 'sched_switch' in self.available_events:
            self._log.error('sched_switch is mandatory to generate sched_switch_cgroup event')
            return None
        sdf = self._dfg_trace_event('sched_switch')
        cdf = self._dfg_cgroup_attach_task()

        for c in controllers:
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'next')
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'prev')

        # Augment with TGID information
        sdf = sdf.join(self._pid_tgid, on='next_pid').rename(columns = {'tgid': 'next_tgid'})
        sdf = sdf.join(self._pid_tgid, on='prev_pid').rename(columns = {'tgid': 'prev_tgid'})

        df = self._tasks_by_pid.rename(columns = { 'next_comm': 'comm' })

        sdf = sdf.join(df, on='next_tgid').rename(columns = {'TaskName': 'next_tgid_comm'})
        sdf = sdf.join(df, on='prev_tgid').rename(columns = {'TaskName': 'prev_tgid_comm'})
        return sdf

###############################################################################
# Trace Events Sanitize Methods
###############################################################################
    @property
    def has_big_little(self):
        return ('clusters' in self.platform
                and 'big' in self.platform['clusters']
                and 'little' in self.platform['clusters']
                and 'nrg_model' in self.platform)

    def _sanitize_SchedCpuCapacity(self):
        """
        Add more columns to cpu_capacity data frame if the energy model is
        available and the platform is big.LITTLE.
        """
        if not self.hasEvents('cpu_capacity') \
           or 'nrg_model' not in self.platform \
           or not self.has_big_little:
            return

        df = self._dfg_trace_event('cpu_capacity')

        # Add column with LITTLE and big CPUs max capacities
        nrg_model = self.platform['nrg_model']
        max_lcap = nrg_model['little']['cpu']['cap_max']
        max_bcap = nrg_model['big']['cpu']['cap_max']
        df['max_capacity'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                [max_lcap], max_bcap)
        # Add LITTLE and big CPUs "tipping point" threshold
        tip_lcap = 0.8 * max_lcap
        tip_bcap = 0.8 * max_bcap
        df['tip_capacity'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                [tip_lcap], tip_bcap)

    def _sanitize_SchedLoadAvgCpu(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
        """
        if not self.hasEvents('sched_load_avg_cpu'):
            return
        df = self._dfg_trace_event('sched_load_avg_cpu')
        if 'utilization' in df:
            df.rename(columns={'utilization': 'util_avg'}, inplace=True)
            df.rename(columns={'load': 'load_avg'}, inplace=True)

    def _sanitize_SchedLoadAvgTask(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
        """
        if not self.hasEvents('sched_load_avg_task'):
            return
        df = self._dfg_trace_event('sched_load_avg_task')
        if 'utilization' in df:
            df.rename(columns={'utilization': 'util_avg'}, inplace=True)
            df.rename(columns={'load': 'load_avg'}, inplace=True)
            df.rename(columns={'avg_period': 'period_contrib'}, inplace=True)
            df.rename(columns={'runnable_avg_sum': 'load_sum'}, inplace=True)
            df.rename(columns={'running_avg_sum': 'util_sum'}, inplace=True)

        if not self.has_big_little:
            return

        df['cluster'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                ['LITTLE'], 'big')

        if 'nrg_model' not in self.platform:
            return

        # Add a column which represents the max capacity of the smallest
        # clustre which can accomodate the task utilization
        little_cap = self.platform['nrg_model']['little']['cpu']['cap_max']
        big_cap = self.platform['nrg_model']['big']['cpu']['cap_max']
        df['min_cluster_cap'] = df.util_avg.map(
            lambda util_avg: big_cap if util_avg > little_cap else little_cap
        )

    def _sanitize_SchedBoostCpu(self):
        """
        Add a boosted utilization signal as the sum of utilization and margin.

        Also, if necessary, rename certain signal names from v5.0 to v5.1
        format.
        """
        if not self.hasEvents('sched_boost_cpu'):
            return
        df = self._dfg_trace_event('sched_boost_cpu')
        if 'usage' in df:
            df.rename(columns={'usage': 'util'}, inplace=True)
        df['boosted_util'] = df['util'] + df['margin']

    def _sanitize_SchedBoostTask(self):
        """
        Add a boosted utilization signal as the sum of utilization and margin.

        Also, if necessary, rename certain signal names from v5.0 to v5.1
        format.
        """
        if not self.hasEvents('sched_boost_task'):
            return
        df = self._dfg_trace_event('sched_boost_task')
        if 'utilization' in df:
            # Convert signals name from to v5.1 format
            df.rename(columns={'utilization': 'util'}, inplace=True)
        df['boosted_util'] = df['util'] + df['margin']

    def _sanitize_SchedEnergyDiff(self):
        """
        If a energy model is provided, some signals are added to the
        sched_energy_diff trace event data frame.

        Also convert between existing field name formats for sched_energy_diff
        """
        if not self.hasEvents('sched_energy_diff') \
           or 'nrg_model' not in self.platform \
           or not self.has_big_little:
            return
        nrg_model = self.platform['nrg_model']
        em_lcluster = nrg_model['little']['cluster']
        em_bcluster = nrg_model['big']['cluster']
        em_lcpu = nrg_model['little']['cpu']
        em_bcpu = nrg_model['big']['cpu']
        lcpus = len(self.platform['clusters']['little'])
        bcpus = len(self.platform['clusters']['big'])
        SCHED_LOAD_SCALE = 1024

        power_max = em_lcpu['nrg_max'] * lcpus + em_bcpu['nrg_max'] * bcpus + \
            em_lcluster['nrg_max'] + em_bcluster['nrg_max']
        self._log.debug(
            "Maximum estimated system energy: {0:d}".format(power_max))

        df = self._dfg_trace_event('sched_energy_diff')

        translations = {'nrg_d' : 'nrg_diff',
                        'utl_d' : 'usage_delta',
                        'payoff' : 'nrg_payoff'
        }
        df.rename(columns=translations, inplace=True)

        df['nrg_diff_pct'] = SCHED_LOAD_SCALE * df.nrg_diff / power_max

        # Tag columns by usage_delta
        ccol = df.usage_delta
        df['usage_delta_group'] = np.select(
            [ccol < 150, ccol < 400, ccol < 600],
            ['< 150', '< 400', '< 600'], '>= 600')

        # Tag columns by nrg_payoff
        ccol = df.nrg_payoff
        df['nrg_payoff_group'] = np.select(
            [ccol > 2e9, ccol > 0, ccol > -2e9],
            ['Optimal Accept', 'SchedTune Accept', 'SchedTune Reject'],
            'Suboptimal Reject')

    def _sanitize_SchedOverutilized(self):
        """ Add a column with overutilized status duration. """
        if not self.hasEvents('sched_overutilized'):
            return
        df = self._dfg_trace_event('sched_overutilized')
        df['start'] = df.index
        df['len'] = (df.start - df.start.shift()).fillna(0).shift(-1)
        df.drop('start', axis=1, inplace=True)

        # Fix the last event, which will have a NaN duration
        # Set duration to trace_end - last_event
        df.loc[df.index[-1], 'len'] = self.start_time + self.time_range - df.index[-1]

        # Build a stat on trace overutilization
        df = self._dfg_trace_event('sched_overutilized')
        self.overutilized_time = df[df.overutilized == 1].len.sum()
        self.overutilized_prc = 100. * self.overutilized_time / self.time_range

        self._log.debug('Overutilized time: %.6f [s] (%.3f%% of trace time)',
                        self.overutilized_time, self.overutilized_prc)

    # Sanitize cgroup information helper
    def _helper_sanitize_CgroupAttachTask(self, df, allowed_cgroups, controller_id_name):
        # Drop rows that aren't in the root-id -> name map
        df = df[df['dst_root'].isin(controller_id_name.keys())]

        def get_cgroup_name(path, valid_names):
            name = os.path.basename(path)
            name = 'root' if not name in valid_names else name
            return name

        def get_cgroup_names(rows):
            ret = []
            for r in rows.iterrows():
                 ret.append(get_cgroup_name(r[1]['dst_path'], allowed_cgroups))
            return ret

        def get_controller_names(rows):
            ret = []
            for r in rows.iterrows():
                 ret.append(controller_id_name[r[1]['dst_root']])
            return ret

        # Sanitize cgroup names
        # cgroup column isn't in mainline, add it in
        # its already added for some out of tree kernels so check first
        if not 'cgroup' in df.columns:
            if not 'dst_path' in df.columns:
                raise RuntimeError('Cant santize cgroup DF, need dst_path')
            df = df.assign(cgroup = get_cgroup_names)

        # Sanitize controller names
        if not 'controller' in df.columns:
            if not 'dst_root' in df.columns:
                raise RuntimeError('Cant santize cgroup DF, need dst_path')
            df = df.assign(controller = get_controller_names)

        return df

    def _sanitize_CgroupAttachTask(self):
        def sanitize_cgroup_event(name):
            if not name in self.available_events:
                return

            df = self._dfg_trace_event(name)

            if len(df.groupby(level=0).filter(lambda x: len(x) > 1)) > 0:
                self._log.warning('Timstamp Collisions seen in {} event!'.format(name))

            df = self._helper_sanitize_CgroupAttachTask(df, self.cgroup_info['cgroups'],
                                              self.cgroup_info['controller_ids'])
            getattr(self.ftrace, name).data_frame = df
        sanitize_cgroup_event('cgroup_attach_task')
        sanitize_cgroup_event('cgroup_attach_task_devlib')

    def _chunker(self, seq, size):
        """
        Given a data frame or a series, generate a sequence of chunks of the
        given size.

        :param seq: data to be split into chunks
        :type seq: :mod:`pandas.Series` or :mod:`pandas.DataFrame`

        :param size: size of each chunk
        :type size: int
        """
        return (seq.iloc[pos:pos + size] for pos in range(0, len(seq), size))

    def _sanitize_CpuFrequency(self):
        """
        Verify that all platform reported clusters are frequency coherent (i.e.
        frequency scaling is performed at a cluster level).
        """
        if not self.hasEvents('cpu_frequency_devlib') \
           or 'clusters' not in self.platform:
            return

        devlib_freq = self._dfg_trace_event('cpu_frequency_devlib')
        devlib_freq.rename(columns={'cpu_id':'cpu'}, inplace=True)
        devlib_freq.rename(columns={'state':'frequency'}, inplace=True)

        df = self._dfg_trace_event('cpu_frequency')
        clusters = self.platform['clusters']

        # devlib always introduces fake cpu_frequency events, in case the
        # OS has not generated cpu_frequency envets there are the only
        # frequency events to report
        if len(df) == 0:
            # Register devlib injected events as 'cpu_frequency' events
            setattr(self.ftrace.cpu_frequency, 'data_frame', devlib_freq)
            df = devlib_freq
            self.available_events.append('cpu_frequency')

        # make sure fake cpu_frequency events are never interleaved with
        # OS generated events
        else:
            if len(devlib_freq) > 0:

                # Frequencies injection is done in a per-cluster based.
                # This is based on the assumption that clusters are
                # frequency choerent.
                # For each cluster we inject devlib events only if
                # these events does not overlaps with os-generated ones.

                # Inject "initial" devlib frequencies
                os_df = df
                dl_df = devlib_freq.iloc[:self.platform['cpus_count']]
                for _,c in self.platform['clusters'].iteritems():
                    dl_freqs = dl_df[dl_df.cpu.isin(c)]
                    os_freqs = os_df[os_df.cpu.isin(c)]
                    self._log.debug("First freqs for %s:\n%s", c, dl_freqs)
                    # All devlib events "before" os-generated events
                    self._log.debug("Min os freq @: %s", os_freqs.index.min())
                    if os_freqs.empty or \
                       os_freqs.index.min() > dl_freqs.index.max():
                        self._log.debug("Insert devlib freqs for %s", c)
                        df = pd.concat([dl_freqs, df])

                # Inject "final" devlib frequencies
                os_df = df
                dl_df = devlib_freq.iloc[self.platform['cpus_count']:]
                for _,c in self.platform['clusters'].iteritems():
                    dl_freqs = dl_df[dl_df.cpu.isin(c)]
                    os_freqs = os_df[os_df.cpu.isin(c)]
                    self._log.debug("Last freqs for %s:\n%s", c, dl_freqs)
                    # All devlib events "after" os-generated events
                    self._log.debug("Max os freq @: %s", os_freqs.index.max())
                    if os_freqs.empty or \
                       os_freqs.index.max() < dl_freqs.index.min():
                        self._log.debug("Append devlib freqs for %s", c)
                        df = pd.concat([df, dl_freqs])

                df.sort_index(inplace=True)

            setattr(self.ftrace.cpu_frequency, 'data_frame', df)

        # Frequency Coherency Check
        for _, cpus in clusters.iteritems():
            cluster_df = df[df.cpu.isin(cpus)]
            for chunk in self._chunker(cluster_df, len(cpus)):
                f = chunk.iloc[0].frequency
                if any(chunk.frequency != f):
                    self._log.warning('Cluster Frequency is not coherent! '
                                      'Failure in [cpu_frequency] events at:')
                    self._log.warning(chunk)
                    self.freq_coherency = False
                    return
        self._log.info('Platform clusters verified to be Frequency coherent')

###############################################################################
# Utility Methods
###############################################################################

    def integrate_square_wave(self, sq_wave):
        """
        Compute the integral of a square wave time series.

        :param sq_wave: square wave assuming only 1.0 and 0.0 values
        :type sq_wave: :mod:`pandas.Series`
        """
        sq_wave.iloc[-1] = 0.0
        # Compact signal to obtain only 1-0-1-0 sequences
        comp_sig = sq_wave.loc[sq_wave.shift() != sq_wave]
        # First value for computing the difference must be a 1
        if comp_sig.iloc[0] == 0.0:
            return sum(comp_sig.iloc[2::2].index - comp_sig.iloc[1:-1:2].index)
        else:
            return sum(comp_sig.iloc[1::2].index - comp_sig.iloc[:-1:2].index)

    def _loadFunctionsStats(self, path='trace.stats'):
        """
        Read functions profiling file and build a data frame containing all
        relevant data.

        :param path: path to the functions profiling trace file
        :type path: str
        """
        if os.path.isdir(path):
            path = os.path.join(path, 'trace.stats')
        if (path.endswith('dat') or
            path.endswith('txt') or
            path.endswith('html')):
            pre, ext = os.path.splitext(path)
            path = pre + '.stats'
        if not os.path.isfile(path):
            return False

        # Opening functions profiling JSON data file
        self._log.debug('Loading functions profiling data from [%s]...', path)
        with open(os.path.join(path), 'r') as fh:
            trace_stats = json.load(fh)

        # Build DataFrame of function stats
        frames = {}
        for cpu, data in trace_stats.iteritems():
            frames[int(cpu)] = pd.DataFrame.from_dict(data, orient='index')

        # Build and keep track of the DataFrame
        self._functions_stats_df = pd.concat(frames.values(),
                                             keys=frames.keys())

        return len(self._functions_stats_df) > 0

    @memoized
    def getCPUActiveSignal(self, cpu):
        """
        Build a square wave representing the active (i.e. non-idle) CPU time,
        i.e.:

          cpu_active[t] == 1 if the CPU is reported to be non-idle by cpuidle at
          time t
          cpu_active[t] == 0 otherwise

        :param cpu: CPU ID
        :type cpu: int

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
                  "cpu_idle" events
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute CPU active signal!')
            return None

        idle_df = self._dfg_trace_event('cpu_idle')
        cpu_df = idle_df[idle_df.cpu_id == cpu]

        cpu_active = cpu_df.state.apply(
            lambda s: 1 if s == NON_IDLE_STATE else 0
        )

        start_time = 0.0
        if not self.ftrace.normalized_time:
            start_time = self.ftrace.basetime

        if cpu_active.empty:
            cpu_active = pd.Series([0], index=[start_time])
        elif cpu_active.index[0] != start_time:
            entry_0 = pd.Series(cpu_active.iloc[0] ^ 1, index=[start_time])
            cpu_active = pd.concat([entry_0, cpu_active])

        # Fix sequences of wakeup/sleep events reported with the same index
        return handle_duplicate_index(cpu_active)


    @memoized
    def getClusterActiveSignal(self, cluster):
        """
        Build a square wave representing the active (i.e. non-idle) cluster
        time, i.e.:

          cluster_active[t] == 1 if at least one CPU is reported to be non-idle
          by CPUFreq at time t
          cluster_active[t] == 0 otherwise

        :param cluster: list of CPU IDs belonging to a cluster
        :type cluster: list(int)

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
                  "cpu_idle" events
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute cluster active signal!')
            return None

        active = self.getCPUActiveSignal(cluster[0]).to_frame(name=cluster[0])
        for cpu in cluster[1:]:
            active = active.join(
                self.getCPUActiveSignal(cpu).to_frame(name=cpu),
                how='outer'
            )

        active.fillna(method='ffill', inplace=True)
        # There might be NaNs in the signal where we got data from some CPUs
        # before others. That will break the .astype(int) below, so drop rows
        # with NaN in them.
        active.dropna(inplace=True)

        # Cluster active is the OR between the actives on each CPU
        # belonging to that specific cluster
        cluster_active = reduce(
            operator.or_,
            [cpu_active.astype(int) for _, cpu_active in
             active.iteritems()]
        )

        return cluster_active


class TraceData:
    """ A DataFrame collector exposed to Trace's clients """
    pass

# vim :set tabstop=4 shiftwidth=4 expandtab