# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Trace Parser Module """

import numpy as np
import os
import pandas as pd
import sys
import trappy
import json
import warnings
import operator
import logging

from analysis_register import AnalysisRegister
from collections import namedtuple
from devlib.utils.misc import memoized
from trappy.utils import listify, handle_duplicate_index


NON_IDLE_STATE = -1
ResidencyTime = namedtuple('ResidencyTime', ['total', 'active'])
ResidencyData = namedtuple('ResidencyData', ['label', 'residency'])


class Trace(object):
    """
    The Trace object is the LISA trace events parser.

    :param platform: a dictionary containing information about the target
        platform
    :type platform: dict or None

    :param data_dir: folder containing all trace data
    :type data_dir: str

    :param events: events to be parsed (everything in the trace by default)
    :type events: list(str)

    :param window: time window to consider when parsing the trace
    :type window: tuple(int, int)

    :param normalize_time: normalize trace time stamps
    :type normalize_time: bool

    :param trace_format: format of the trace. Possible values are:
        - FTrace
        - SysTrace
    :type trace_format: str

    :param plots_dir: directory where to save plots
    :type plots_dir: str

    :param plots_prefix: prefix for plots file names
    :type plots_prefix: str

    :param cgroup_info: add cgroup information for sanitization, e.g.::

            {
                'controller_ids': { 2: 'schedtune', 4: 'cpuset' },
                # list of allowed cgroup names
                'cgroups': [ 'root', 'background', 'foreground' ],
            }

    :type cgroup_info: dict
    """

    def __init__(self, platform, data_dir, events,
                 tasks=None, window=(0, None),
                 normalize_time=True,
                 trace_format='FTrace',
                 plots_dir=None,
                 plots_prefix='',
                 cgroup_info={}):

        # The platform used to run the experiments
        self.platform = platform or {}

        # TRAPpy Trace object
        self.ftrace = None

        # Trace format
        self.trace_format = trace_format

        # The time window used to limit trace parsing to
        self.window = window

        # Whether trace timestamps are normalized or not
        self.normalize_time = normalize_time

        # Dynamically registered TRAPpy events
        self.trappy_cls = {}

        # Maximum timespan for all collected events
        self.time_range = 0

        # Time the system was overutilized
        self.overutilized_time = 0
        self.overutilized_prc = 0

        # List of events required by user
        self.events = []

        # List of events available in the parsed trace
        self.available_events = []

        # Cluster frequency coherency flag
        self.freq_coherency = True

        # Folder containing all trace data
        self.data_dir = None

        # Setup logging
        self._log = logging.getLogger('Trace')

        # Folder containing trace
        if not os.path.isdir(data_dir):
            self.data_dir = os.path.dirname(data_dir) or '.'
        else:
            self.data_dir = data_dir

        # By default, use the trace dir to save plots
        self.plots_dir = plots_dir
        if self.plots_dir is None:
            self.plots_dir = self.data_dir
        self.plots_prefix = plots_prefix

        # Cgroup info for sanitization
        self.cgroup_info = cgroup_info

        if events:
            self.__registerTraceEvents(events)
        self.__parseTrace(data_dir, tasks, window, trace_format)

        # Minimum and Maximum x_time to use for all plots
        self.x_min = 0
        self.x_max = self.time_range

        # Reset x axis time range to full scale
        t_min = self.window[0]
        t_max = self.window[1]
        self.setXTimeRange(t_min, t_max)

        self.data_frame = TraceData()
        self._registerDataFrameGetters(self)

        # If we don't know the number of CPUs, check the trace for the
        # highest-numbered CPU that traced an event.
        if 'cpus_count' not in self.platform:
            max_cpu = max(int(self.data_frame.trace_event(e)['__cpu'].max())
                          for e in self.available_events)
            self.platform['cpus_count'] = max_cpu + 1

        self.analysis = AnalysisRegister(self)

    def _registerDataFrameGetters(self, module):
        """
        Internal utility function that looks up getter functions with a
        "_dfg_" prefix in their name and binds them to the specified module.

        :param module: module to which the function is added
        :type module: class
        """
        self._log.debug('Registering [%s] local data frames', module)
        for func in dir(module):
            if not func.startswith('_dfg_'):
                continue
            dfg_name = func.replace('_dfg_', '')
            dfg_func = getattr(module, func)
            self._log.debug('   %s', dfg_name)
            setattr(self.data_frame, dfg_name, dfg_func)

    def setXTimeRange(self, t_min=None, t_max=None):
        """
        Set x axis time range to the specified values.

        :param t_min: lower bound
        :type t_min: int or float

        :param t_max: upper bound
        :type t_max: int or float
        """
        self.x_min = t_min if t_min is not None else self.start_time
        self.x_max = t_max if t_max is not None \
                     else self.start_time + self.time_range
        self._log.debug('Set plots time range to (%.6f, %.6f)[s]',
                        self.x_min, self.x_max)

    def __registerTraceEvents(self, events):
        """
        Save a copy of the parsed events.

        :param events: single event name or list of event names
        :type events: str or list(str)
        """
        if isinstance(events, basestring):
            self.events = events.split(' ')
        elif isinstance(events, list):
            self.events = events
        else:
            raise ValueError('Events must be a string or a list of strings')

        # Register devlib fake cpu_frequency events
        if 'cpu_frequency' in events:
            self.events.append('cpu_frequency_devlib')
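
    # Usage sketch (illustrative only, the trace path is a placeholder):
    # events can be passed to the constructor either as a space-separated
    # string or as a list of event names, e.g.
    #
    #   trace = Trace(platform, '/path/to/trace_dir',
    #                 events='sched_switch cpu_frequency')
    #   trace = Trace(platform, '/path/to/trace_dir',
    #                 events=['sched_switch', 'cpu_frequency'])
    #
    # Note that requesting 'cpu_frequency' also registers the devlib-injected
    # 'cpu_frequency_devlib' event (see __registerTraceEvents above).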
    def __parseTrace(self, path, tasks, window, trace_format):
        """
        Internal method in charge of performing the actual parsing of the
        trace.

        :param path: path to the trace folder (or trace file)
        :type path: str

        :param tasks: filter data for the specified tasks only
        :type tasks: list(str)

        :param window: time window to consider when parsing the trace
        :type window: tuple(int, int)

        :param trace_format: format of the trace. Possible values are:
            - FTrace
            - SysTrace
        :type trace_format: str
        """
        self._log.debug('Loading [sched] events from trace in [%s]...', path)
        self._log.debug('Parsing events: %s',
                        self.events if self.events else 'ALL')

        if trace_format.upper() == 'SYSTRACE' or path.endswith('html'):
            self._log.debug('Parsing SysTrace format...')
            trace_class = trappy.SysTrace
            self.trace_format = 'SysTrace'
        elif trace_format.upper() == 'FTRACE':
            self._log.debug('Parsing FTrace format...')
            trace_class = trappy.FTrace
            self.trace_format = 'FTrace'
        else:
            raise ValueError("Unknown trace format {}".format(trace_format))

        # When using normalized time we pass TRAPpy's `window`, otherwise
        # `abs_window` has to be used
        window_kw = {}
        if self.normalize_time:
            window_kw['window'] = window
        else:
            window_kw['abs_window'] = window

        scope = 'custom' if self.events else 'all'
        self.ftrace = trace_class(path, scope=scope, events=self.events,
                                  normalize_time=self.normalize_time,
                                  **window_kw)

        # Load Functions profiling data
        has_function_stats = self._loadFunctionsStats(path)

        # Check for events available on the parsed trace
        self.__checkAvailableEvents()
        if len(self.available_events) == 0:
            if has_function_stats:
                self._log.info('Trace contains only functions stats')
                return
            raise ValueError('The trace does not contain useful events '
                             'nor function stats')

        # Index PIDs and Task names
        self.__loadTasksNames(tasks)

        self.__computeTimeSpan()

        # Sanitize cgroup info if any
        self._sanitize_CgroupAttachTask()

        self._sanitize_SchedOverutilized()

        # Sanitization of the following events is not possible if the
        # platform description is missing
        if not self.platform:
            return

        # Setup internal data reference to interesting events/dataframes
        self._sanitize_SchedLoadAvgCpu()
        self._sanitize_SchedLoadAvgTask()
        self._sanitize_SchedCpuCapacity()
        self._sanitize_SchedBoostCpu()
        self._sanitize_SchedBoostTask()
        self._sanitize_SchedEnergyDiff()
        self._sanitize_CpuFrequency()

    def __checkAvailableEvents(self, key=""):
        """
        Internal method used to build a list of available events.

        :param key: key to be used for TRAPpy filtering
        :type key: str
        """
        for val in self.ftrace.get_filters(key):
            obj = getattr(self.ftrace, val)
            if len(obj.data_frame):
                self.available_events.append(val)
        self._log.debug('Events found on trace:')
        for evt in self.available_events:
            self._log.debug(' - %s', evt)

    def __loadTasksNames(self, tasks):
        """
        Try to load tasks names using one of the supported events.

        :param tasks: list of task names. If None, load all tasks found.
        :type tasks: list(str) or NoneType
        """
        def load(tasks, event, name_key, pid_key):
            df = self._dfg_trace_event(event)
            if tasks is None:
                tasks = df[name_key].unique()
            self._scanTasks(df, name_key=name_key, pid_key=pid_key)
            self._scanTgids(df)

        if 'sched_switch' in self.available_events:
            load(tasks, 'sched_switch', 'prev_comm', 'prev_pid')
            return

        if 'sched_load_avg_task' in self.available_events:
            load(tasks, 'sched_load_avg_task', 'comm', 'pid')
            return

        self._log.warning('Failed to load tasks names from trace events')

    def hasEvents(self, dataset):
        """
        Returns True if the specified event is present in the parsed trace,
        False otherwise.

        :param dataset: trace event name or list of trace events
        :type dataset: str or list(str)
        """
        if dataset in self.available_events:
            return True
        return False

    def __computeTimeSpan(self):
        """
        Compute time axis range, considering all the parsed events.
""" self.start_time = 0 if self.normalize_time else self.ftrace.basetime self.time_range = self.ftrace.get_duration() self._log.debug('Collected events spans a %.3f [s] time interval', self.time_range) self.setXTimeRange(self.window[0], self.window[1]) def _scanTgids(self, df): if not '__tgid' in df.columns: return df = df[['__pid', '__tgid']] df = df.drop_duplicates(keep='first').set_index('__pid') df.rename(columns = { '__pid': 'pid', '__tgid': 'tgid' }, inplace=True) self._pid_tgid = df def _scanTasks(self, df, name_key='comm', pid_key='pid'): """ Extract tasks names and PIDs from the input data frame. The data frame should contain a task name column and PID column. :param df: data frame containing trace events from which tasks names and PIDs will be extracted :type df: :mod:`pandas.DataFrame` :param name_key: The name of the dataframe columns containing task names :type name_key: str :param pid_key: The name of the dataframe columns containing task PIDs :type pid_key: str """ df = df[[name_key, pid_key]] self._tasks_by_name = df.set_index(name_key) self._tasks_by_pid = (df.drop_duplicates(subset=pid_key, keep='last') .rename(columns={ pid_key : 'PID', name_key : 'TaskName'}) .set_index('PID').sort_index()) def getTaskByName(self, name): """ Get the PIDs of all tasks with the specified name. The same PID can have different task names, mainly because once a task is generated it inherits the parent name and then its name is updated to represent what the task really is. This API works under the assumption that a task name is updated at most one time and it always considers the name a task had the last time it has been scheduled for execution in the current trace. :param name: task name :type name: str :return: a list of PID for tasks which name matches the required one, the last time they ran in the current trace """ return (self._tasks_by_pid[self._tasks_by_pid.TaskName == name] .index.tolist()) def getTaskByPid(self, pid): """ Get the name of the task with the specified PID. The same PID can have different task names, mainly because once a task is generated it inherits the parent name and then its name is updated to represent what the task really is. This API works under the assumption that a task name is updated at most one time and it always report the name the task had the last time it has been scheduled for execution in the current trace. :param name: task PID :type name: int :return: the list of names of the tasks whose PID matches the required one, the last time they ran in the current trace """ try: return self._tasks_by_pid.ix[pid].values[0] except KeyError: return None def getTgidFromPid(self, pid): return _pid_tgid.ix[pid].values[0] def getTasks(self, dataframe=None, task_names=None, name_key='comm', pid_key='pid'): """ :return: the name of the task which PID matches the required one, the last time they ran in the current trace """ try: return self._tasks_by_pid.ix[pid].values[0] except KeyError: return None def getTasks(self): """ Get a dictionary of all the tasks in the Trace. :return: a dictionary which maps each PID to the corresponding task name """ return self._tasks_by_pid.TaskName.to_dict() ############################################################################### # DataFrame Getter Methods ############################################################################### def df(self, event): """ Get a dataframe containing all occurrences of the specified trace event in the parsed trace. 

###############################################################################
# DataFrame Getter Methods
###############################################################################

    def df(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace
        event in the parsed trace.

        :param event: Trace event name
        :type event: str
        """
        warnings.simplefilter('always', DeprecationWarning) #turn off filter
        warnings.warn("\n\tUse of Trace::df() is deprecated and will be soon removed."
                      "\n\tUse Trace::data_frame.trace_event(event_name) instead.",
                      category=DeprecationWarning)
        warnings.simplefilter('default', DeprecationWarning) #reset filter
        return self._dfg_trace_event(event)

    def _dfg_trace_event(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace
        event in the parsed trace.

        :param event: Trace event name
        :type event: str
        """
        if self.data_dir is None:
            raise ValueError("trace data not (yet) loaded")
        if self.ftrace and hasattr(self.ftrace, event):
            return getattr(self.ftrace, event).data_frame
        raise ValueError('Event [{}] not supported. '
                         'Supported events are: {}'
                         .format(event, self.available_events))

    def _dfg_functions_stats(self, functions=None):
        """
        Get a DataFrame of specified kernel functions profile data

        For each profiled function a DataFrame is returned which reports stats
        on kernel functions execution time. The reported stats are per-CPU and
        include: number of times the function has been executed (hits),
        average execution time (avg), overall execution time (time) and
        samples variance (s_2).
        By default returns a DataFrame of all the functions profiled.

        :param functions: the name of the function or a list of function names
            to report
        :type functions: str or list(str)
        """
        if not hasattr(self, '_functions_stats_df'):
            return None
        df = self._functions_stats_df
        if not functions:
            return df
        return df.loc[df.index.get_level_values(1).isin(listify(functions))]

    # cgroup_attach_task with just merged fake and real events
    def _cgroup_attach_task(self):
        cgroup_events = ['cgroup_attach_task', 'cgroup_attach_task_devlib']
        df = None

        if set(cgroup_events).isdisjoint(set(self.available_events)):
            self._log.error('At least one of {} events is needed for '
                            'cgroup_attach_task event generation'
                            .format(cgroup_events))
            return None

        for cev in cgroup_events:
            if not cev in self.available_events:
                continue
            cdf = self._dfg_trace_event(cev)
            cdf = cdf[['__line', 'pid', 'controller', 'cgroup']]
            if not isinstance(df, pd.DataFrame):
                df = cdf
            else:
                df = pd.concat([cdf, df])

        # Always drop na since this DF is used as secondary
        df.dropna(inplace=True, how='any')
        return df

    @memoized
    def _dfg_cgroup_attach_task(self, controllers=['schedtune', 'cpuset']):
        # Since fork doesn't result in attach events, generate fake attach
        # events. The below mechanism doesn't work to propagate nested fork
        # levels, for example:
        #   cgroup_attach_task: pid=1166
        #   fork: pid=1166 child_pid=2222  <-- fake attach generated
        #   fork: pid=2222 child_pid=3333  <-- fake attach not generated
        def fork_add_cgroup(fdf, cdf, controller):
            cdf = cdf[cdf['controller'] == controller]
            ret_df = trappy.utils.merge_dfs(fdf, cdf, pivot='pid')
            return ret_df

        if not 'sched_process_fork' in self.available_events:
            self._log.error('sched_process_fork is mandatory to get proper '
                            'cgroup_attach events')
            return None
        fdf = self._dfg_trace_event('sched_process_fork')

        forks_len = len(fdf)
        forkdf = fdf
        cdf = self._cgroup_attach_task()
        for idx, c in enumerate(controllers):
            fdf = fork_add_cgroup(fdf, cdf, c)
            if (idx != (len(controllers) - 1)):
                fdf = pd.concat([fdf, forkdf]).sort_values(by='__line')

        fdf = fdf[['__line', 'child_pid', 'controller', 'cgroup']]
        fdf.rename(columns={'child_pid': 'pid'}, inplace=True)

        # Always drop na since this DF is used as secondary
        fdf.dropna(inplace=True, how='any')

        new_forks_len = len(fdf) / len(controllers)
        fdf = pd.concat([fdf, cdf]).sort_values(by='__line')

        if new_forks_len < forks_len:
            dropped = forks_len - new_forks_len
            self._log.info("Couldn't attach all forks cgroup with-attach "
                           "events ({} dropped)".format(dropped))

        return fdf

    @memoized
    def _dfg_sched_switch_cgroup(self, controllers=['schedtune', 'cpuset']):
        def sched_switch_add_cgroup(sdf, cdf, controller, direction):
            cdf = cdf[cdf['controller'] == controller]

            ret_df = sdf.rename(columns={direction + '_pid': 'pid'})
            ret_df = trappy.utils.merge_dfs(ret_df, cdf, pivot='pid')
            ret_df.rename(columns={'pid': direction + '_pid'}, inplace=True)

            ret_df.drop('controller', axis=1, inplace=True)
            ret_df.rename(columns={'cgroup': direction + '_' + controller},
                          inplace=True)
            return ret_df

        if not 'sched_switch' in self.available_events:
            self._log.error('sched_switch is mandatory to generate '
                            'sched_switch_cgroup event')
            return None
        sdf = self._dfg_trace_event('sched_switch')
        cdf = self._dfg_cgroup_attach_task()

        for c in controllers:
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'next')
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'prev')

        # Augment with TGID information
        sdf = sdf.join(self._pid_tgid, on='next_pid').rename(columns={'tgid': 'next_tgid'})
        sdf = sdf.join(self._pid_tgid, on='prev_pid').rename(columns={'tgid': 'prev_tgid'})

        df = self._tasks_by_pid.rename(columns={'next_comm': 'comm'})
        sdf = sdf.join(df, on='next_tgid').rename(columns={'TaskName': 'next_tgid_comm'})
        sdf = sdf.join(df, on='prev_tgid').rename(columns={'TaskName': 'prev_tgid_comm'})

        return sdf

###############################################################################
# Trace Events Sanitize Methods
###############################################################################

    @property
    def has_big_little(self):
        return ('clusters' in self.platform
                and 'big' in self.platform['clusters']
                and 'little' in self.platform['clusters']
                and 'nrg_model' in self.platform)

    def _sanitize_SchedCpuCapacity(self):
        """
        Add more columns to cpu_capacity data frame if the energy model is
        available and the platform is big.LITTLE.
        """
        if not self.hasEvents('cpu_capacity') \
           or 'nrg_model' not in self.platform \
           or not self.has_big_little:
            return

        df = self._dfg_trace_event('cpu_capacity')

        # Add column with LITTLE and big CPUs max capacities
        nrg_model = self.platform['nrg_model']
        max_lcap = nrg_model['little']['cpu']['cap_max']
        max_bcap = nrg_model['big']['cpu']['cap_max']
        df['max_capacity'] = np.select(
            [df.cpu.isin(self.platform['clusters']['little'])],
            [max_lcap], max_bcap)

        # Add LITTLE and big CPUs "tipping point" threshold
        tip_lcap = 0.8 * max_lcap
        tip_bcap = 0.8 * max_bcap
        df['tip_capacity'] = np.select(
            [df.cpu.isin(self.platform['clusters']['little'])],
            [tip_lcap], tip_bcap)

    def _sanitize_SchedLoadAvgCpu(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
        """
        if not self.hasEvents('sched_load_avg_cpu'):
            return
        df = self._dfg_trace_event('sched_load_avg_cpu')
        if 'utilization' in df:
            df.rename(columns={'utilization': 'util_avg'}, inplace=True)
            df.rename(columns={'load': 'load_avg'}, inplace=True)

    def _sanitize_SchedLoadAvgTask(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
""" if not self.hasEvents('sched_load_avg_task'): return df = self._dfg_trace_event('sched_load_avg_task') if 'utilization' in df: df.rename(columns={'utilization': 'util_avg'}, inplace=True) df.rename(columns={'load': 'load_avg'}, inplace=True) df.rename(columns={'avg_period': 'period_contrib'}, inplace=True) df.rename(columns={'runnable_avg_sum': 'load_sum'}, inplace=True) df.rename(columns={'running_avg_sum': 'util_sum'}, inplace=True) if not self.has_big_little: return df['cluster'] = np.select( [df.cpu.isin(self.platform['clusters']['little'])], ['LITTLE'], 'big') if 'nrg_model' not in self.platform: return # Add a column which represents the max capacity of the smallest # clustre which can accomodate the task utilization little_cap = self.platform['nrg_model']['little']['cpu']['cap_max'] big_cap = self.platform['nrg_model']['big']['cpu']['cap_max'] df['min_cluster_cap'] = df.util_avg.map( lambda util_avg: big_cap if util_avg > little_cap else little_cap ) def _sanitize_SchedBoostCpu(self): """ Add a boosted utilization signal as the sum of utilization and margin. Also, if necessary, rename certain signal names from v5.0 to v5.1 format. """ if not self.hasEvents('sched_boost_cpu'): return df = self._dfg_trace_event('sched_boost_cpu') if 'usage' in df: df.rename(columns={'usage': 'util'}, inplace=True) df['boosted_util'] = df['util'] + df['margin'] def _sanitize_SchedBoostTask(self): """ Add a boosted utilization signal as the sum of utilization and margin. Also, if necessary, rename certain signal names from v5.0 to v5.1 format. """ if not self.hasEvents('sched_boost_task'): return df = self._dfg_trace_event('sched_boost_task') if 'utilization' in df: # Convert signals name from to v5.1 format df.rename(columns={'utilization': 'util'}, inplace=True) df['boosted_util'] = df['util'] + df['margin'] def _sanitize_SchedEnergyDiff(self): """ If a energy model is provided, some signals are added to the sched_energy_diff trace event data frame. Also convert between existing field name formats for sched_energy_diff """ if not self.hasEvents('sched_energy_diff') \ or 'nrg_model' not in self.platform \ or not self.has_big_little: return nrg_model = self.platform['nrg_model'] em_lcluster = nrg_model['little']['cluster'] em_bcluster = nrg_model['big']['cluster'] em_lcpu = nrg_model['little']['cpu'] em_bcpu = nrg_model['big']['cpu'] lcpus = len(self.platform['clusters']['little']) bcpus = len(self.platform['clusters']['big']) SCHED_LOAD_SCALE = 1024 power_max = em_lcpu['nrg_max'] * lcpus + em_bcpu['nrg_max'] * bcpus + \ em_lcluster['nrg_max'] + em_bcluster['nrg_max'] self._log.debug( "Maximum estimated system energy: {0:d}".format(power_max)) df = self._dfg_trace_event('sched_energy_diff') translations = {'nrg_d' : 'nrg_diff', 'utl_d' : 'usage_delta', 'payoff' : 'nrg_payoff' } df.rename(columns=translations, inplace=True) df['nrg_diff_pct'] = SCHED_LOAD_SCALE * df.nrg_diff / power_max # Tag columns by usage_delta ccol = df.usage_delta df['usage_delta_group'] = np.select( [ccol < 150, ccol < 400, ccol < 600], ['< 150', '< 400', '< 600'], '>= 600') # Tag columns by nrg_payoff ccol = df.nrg_payoff df['nrg_payoff_group'] = np.select( [ccol > 2e9, ccol > 0, ccol > -2e9], ['Optimal Accept', 'SchedTune Accept', 'SchedTune Reject'], 'Suboptimal Reject') def _sanitize_SchedOverutilized(self): """ Add a column with overutilized status duration. 
""" if not self.hasEvents('sched_overutilized'): return df = self._dfg_trace_event('sched_overutilized') df['start'] = df.index df['len'] = (df.start - df.start.shift()).fillna(0).shift(-1) df.drop('start', axis=1, inplace=True) # Fix the last event, which will have a NaN duration # Set duration to trace_end - last_event df.loc[df.index[-1], 'len'] = self.start_time + self.time_range - df.index[-1] # Build a stat on trace overutilization df = self._dfg_trace_event('sched_overutilized') self.overutilized_time = df[df.overutilized == 1].len.sum() self.overutilized_prc = 100. * self.overutilized_time / self.time_range self._log.debug('Overutilized time: %.6f [s] (%.3f%% of trace time)', self.overutilized_time, self.overutilized_prc) # Sanitize cgroup information helper def _helper_sanitize_CgroupAttachTask(self, df, allowed_cgroups, controller_id_name): # Drop rows that aren't in the root-id -> name map df = df[df['dst_root'].isin(controller_id_name.keys())] def get_cgroup_name(path, valid_names): name = os.path.basename(path) name = 'root' if not name in valid_names else name return name def get_cgroup_names(rows): ret = [] for r in rows.iterrows(): ret.append(get_cgroup_name(r[1]['dst_path'], allowed_cgroups)) return ret def get_controller_names(rows): ret = [] for r in rows.iterrows(): ret.append(controller_id_name[r[1]['dst_root']]) return ret # Sanitize cgroup names # cgroup column isn't in mainline, add it in # its already added for some out of tree kernels so check first if not 'cgroup' in df.columns: if not 'dst_path' in df.columns: raise RuntimeError('Cant santize cgroup DF, need dst_path') df = df.assign(cgroup = get_cgroup_names) # Sanitize controller names if not 'controller' in df.columns: if not 'dst_root' in df.columns: raise RuntimeError('Cant santize cgroup DF, need dst_path') df = df.assign(controller = get_controller_names) return df def _sanitize_CgroupAttachTask(self): def sanitize_cgroup_event(name): if not name in self.available_events: return df = self._dfg_trace_event(name) if len(df.groupby(level=0).filter(lambda x: len(x) > 1)) > 0: self._log.warning('Timstamp Collisions seen in {} event!'.format(name)) df = self._helper_sanitize_CgroupAttachTask(df, self.cgroup_info['cgroups'], self.cgroup_info['controller_ids']) getattr(self.ftrace, name).data_frame = df sanitize_cgroup_event('cgroup_attach_task') sanitize_cgroup_event('cgroup_attach_task_devlib') def _chunker(self, seq, size): """ Given a data frame or a series, generate a sequence of chunks of the given size. :param seq: data to be split into chunks :type seq: :mod:`pandas.Series` or :mod:`pandas.DataFrame` :param size: size of each chunk :type size: int """ return (seq.iloc[pos:pos + size] for pos in range(0, len(seq), size)) def _sanitize_CpuFrequency(self): """ Verify that all platform reported clusters are frequency coherent (i.e. frequency scaling is performed at a cluster level). 
""" if not self.hasEvents('cpu_frequency_devlib') \ or 'clusters' not in self.platform: return devlib_freq = self._dfg_trace_event('cpu_frequency_devlib') devlib_freq.rename(columns={'cpu_id':'cpu'}, inplace=True) devlib_freq.rename(columns={'state':'frequency'}, inplace=True) df = self._dfg_trace_event('cpu_frequency') clusters = self.platform['clusters'] # devlib always introduces fake cpu_frequency events, in case the # OS has not generated cpu_frequency envets there are the only # frequency events to report if len(df) == 0: # Register devlib injected events as 'cpu_frequency' events setattr(self.ftrace.cpu_frequency, 'data_frame', devlib_freq) df = devlib_freq self.available_events.append('cpu_frequency') # make sure fake cpu_frequency events are never interleaved with # OS generated events else: if len(devlib_freq) > 0: # Frequencies injection is done in a per-cluster based. # This is based on the assumption that clusters are # frequency choerent. # For each cluster we inject devlib events only if # these events does not overlaps with os-generated ones. # Inject "initial" devlib frequencies os_df = df dl_df = devlib_freq.iloc[:self.platform['cpus_count']] for _,c in self.platform['clusters'].iteritems(): dl_freqs = dl_df[dl_df.cpu.isin(c)] os_freqs = os_df[os_df.cpu.isin(c)] self._log.debug("First freqs for %s:\n%s", c, dl_freqs) # All devlib events "before" os-generated events self._log.debug("Min os freq @: %s", os_freqs.index.min()) if os_freqs.empty or \ os_freqs.index.min() > dl_freqs.index.max(): self._log.debug("Insert devlib freqs for %s", c) df = pd.concat([dl_freqs, df]) # Inject "final" devlib frequencies os_df = df dl_df = devlib_freq.iloc[self.platform['cpus_count']:] for _,c in self.platform['clusters'].iteritems(): dl_freqs = dl_df[dl_df.cpu.isin(c)] os_freqs = os_df[os_df.cpu.isin(c)] self._log.debug("Last freqs for %s:\n%s", c, dl_freqs) # All devlib events "after" os-generated events self._log.debug("Max os freq @: %s", os_freqs.index.max()) if os_freqs.empty or \ os_freqs.index.max() < dl_freqs.index.min(): self._log.debug("Append devlib freqs for %s", c) df = pd.concat([df, dl_freqs]) df.sort_index(inplace=True) setattr(self.ftrace.cpu_frequency, 'data_frame', df) # Frequency Coherency Check for _, cpus in clusters.iteritems(): cluster_df = df[df.cpu.isin(cpus)] for chunk in self._chunker(cluster_df, len(cpus)): f = chunk.iloc[0].frequency if any(chunk.frequency != f): self._log.warning('Cluster Frequency is not coherent! ' 'Failure in [cpu_frequency] events at:') self._log.warning(chunk) self.freq_coherency = False return self._log.info('Platform clusters verified to be Frequency coherent') ############################################################################### # Utility Methods ############################################################################### def integrate_square_wave(self, sq_wave): """ Compute the integral of a square wave time series. :param sq_wave: square wave assuming only 1.0 and 0.0 values :type sq_wave: :mod:`pandas.Series` """ sq_wave.iloc[-1] = 0.0 # Compact signal to obtain only 1-0-1-0 sequences comp_sig = sq_wave.loc[sq_wave.shift() != sq_wave] # First value for computing the difference must be a 1 if comp_sig.iloc[0] == 0.0: return sum(comp_sig.iloc[2::2].index - comp_sig.iloc[1:-1:2].index) else: return sum(comp_sig.iloc[1::2].index - comp_sig.iloc[:-1:2].index) def _loadFunctionsStats(self, path='trace.stats'): """ Read functions profiling file and build a data frame containing all relevant data. 
        :param path: path to the functions profiling trace file
        :type path: str
        """
        if os.path.isdir(path):
            path = os.path.join(path, 'trace.stats')
        if (path.endswith('dat') or
            path.endswith('txt') or
            path.endswith('html')):
            pre, ext = os.path.splitext(path)
            path = pre + '.stats'
        if not os.path.isfile(path):
            return False

        # Opening functions profiling JSON data file
        self._log.debug('Loading functions profiling data from [%s]...', path)
        with open(os.path.join(path), 'r') as fh:
            trace_stats = json.load(fh)

        # Build DataFrame of function stats
        frames = {}
        for cpu, data in trace_stats.iteritems():
            frames[int(cpu)] = pd.DataFrame.from_dict(data, orient='index')

        # Build and keep track of the DataFrame
        self._functions_stats_df = pd.concat(frames.values(),
                                             keys=frames.keys())

        return len(self._functions_stats_df) > 0

    @memoized
    def getCPUActiveSignal(self, cpu):
        """
        Build a square wave representing the active (i.e. non-idle) CPU time,
        i.e.:

            cpu_active[t] == 1 if the CPU is reported to be non-idle by
            cpuidle at time t
            cpu_active[t] == 0 otherwise

        :param cpu: CPU ID
        :type cpu: int

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
                  "cpu_idle" events
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute CPU active signal!')
            return None

        idle_df = self._dfg_trace_event('cpu_idle')
        cpu_df = idle_df[idle_df.cpu_id == cpu]

        cpu_active = cpu_df.state.apply(
            lambda s: 1 if s == NON_IDLE_STATE else 0
        )

        start_time = 0.0
        if not self.ftrace.normalized_time:
            start_time = self.ftrace.basetime

        if cpu_active.empty:
            cpu_active = pd.Series([0], index=[start_time])
        elif cpu_active.index[0] != start_time:
            entry_0 = pd.Series(cpu_active.iloc[0] ^ 1, index=[start_time])
            cpu_active = pd.concat([entry_0, cpu_active])

        # Fix sequences of wakeup/sleep events reported with the same index
        return handle_duplicate_index(cpu_active)

    @memoized
    def getClusterActiveSignal(self, cluster):
        """
        Build a square wave representing the active (i.e. non-idle) cluster
        time, i.e.:

            cluster_active[t] == 1 if at least one CPU is reported to be
            non-idle by cpuidle at time t
            cluster_active[t] == 0 otherwise

        :param cluster: list of CPU IDs belonging to a cluster
        :type cluster: list(int)

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
                  "cpu_idle" events
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute cluster active signal!')
            return None

        active = self.getCPUActiveSignal(cluster[0]).to_frame(name=cluster[0])
        for cpu in cluster[1:]:
            active = active.join(
                self.getCPUActiveSignal(cpu).to_frame(name=cpu),
                how='outer'
            )

        active.fillna(method='ffill', inplace=True)

        # There might be NaNs in the signal where we got data from some CPUs
        # before others. That will break the .astype(int) below, so drop rows
        # with NaN in them.
        active.dropna(inplace=True)

        # Cluster active is the OR between the actives on each CPU
        # belonging to that specific cluster
        cluster_active = reduce(
            operator.or_,
            [cpu_active.astype(int) for _, cpu_active in active.iteritems()]
        )

        return cluster_active


class TraceData:
    """ A DataFrame collector exposed to Trace's clients """
    pass

# vim :set tabstop=4 shiftwidth=4 expandtab
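
# Usage sketch (paths, events and platform data below are illustrative only,
# and the import path may differ depending on how the module is installed):
#
#   from trace import Trace
#
#   platform = {'cpus_count': 4}
#   trace = Trace(platform, '/path/to/trace.dat',
#                 events=['sched_switch', 'cpu_idle', 'cpu_frequency'])
#
#   # Raw events are exposed as pandas DataFrames:
#   switches = trace.data_frame.trace_event('sched_switch')
#
#   # Derived signals, e.g. the active (non-idle) signal of CPU 0:
#   cpu0_active = trace.getCPUActiveSignal(0)
#
#   # Higher level analysis modules are exposed via trace.analysis
#   # (see AnalysisRegister).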