# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""" Tasks Analysis Module """
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import numpy as np
import pylab as pl
import re
from analysis_module import AnalysisModule
from devlib.utils.misc import memoized
from trappy.utils import listify
class TasksAnalysis(AnalysisModule):
"""
Support for Tasks signals analysis.
:param trace: input Trace object
:type trace: :mod:`libs.utils.Trace`
"""
def __init__(self, trace):
super(TasksAnalysis, self).__init__(trace)
###############################################################################
# DataFrame Getter Methods
###############################################################################
def _dfg_top_big_tasks(self, min_samples=100, min_utilization=None):
"""
Tasks which had 'utilization' samples bigger than the specified
threshold
:param min_samples: minumum number of samples over the min_utilization
:type min_samples: int
:param min_utilization: minimum utilization used to filter samples
default: capacity of a little cluster
:type min_utilization: int
"""
if not self._trace.hasEvents('sched_load_avg_task'):
self._log.warning('Events [sched_load_avg_task] not found')
return None
if min_utilization is None:
min_utilization = self._little_cap
# Get utilization samples >= min_utilization
df = self._dfg_trace_event('sched_load_avg_task')
big_tasks_events = df[df.util_avg > min_utilization]
if not len(big_tasks_events):
self._log.warning('No tasks with with utilization samples > %d',
min_utilization)
return None
# Report the number of tasks which match the min_utilization condition
big_tasks = big_tasks_events.pid.unique()
self._log.info('%5d tasks with samples of utilization > %d',
len(big_tasks), min_utilization)
# Compute number of samples above threshold
big_tasks_stats = big_tasks_events.groupby('pid')\
.describe(include=['object'])
big_tasks_stats = big_tasks_stats.unstack()['comm']\
.sort_values(by=['count'], ascending=False)
# Filter for number of occurrences
big_tasks_stats = big_tasks_stats[big_tasks_stats['count'] > min_samples]
if not len(big_tasks_stats):
self._log.warning(' but none with more than %d samples',
min_samples)
return None
self._log.info(' %d with more than %d samples',
len(big_tasks_stats), min_samples)
# Add task name column
big_tasks_stats['comm'] = big_tasks_stats.index.map(
lambda pid: self._trace.getTaskByPid(pid))
# Filter columns of interest
big_tasks_stats = big_tasks_stats[['count', 'comm']]
big_tasks_stats.rename(columns={'count': 'samples'}, inplace=True)
return big_tasks_stats
def _dfg_top_wakeup_tasks(self, min_wakeups=100):
"""
Tasks which wakeup more frequently than a specified threshold.
:param min_wakeups: minimum number of wakeups
:type min_wakeups: int
"""
if not self._trace.hasEvents('sched_wakeup'):
self._log.warning('Events [sched_wakeup] not found')
return None
df = self._dfg_trace_event('sched_wakeup')
# Compute number of wakeups above threshold
wkp_tasks_stats = df.groupby('pid').describe(include=['object'])
wkp_tasks_stats = wkp_tasks_stats.unstack()['comm']\
.sort_values(by=['count'], ascending=False)
# Filter for number of occurrences
wkp_tasks_stats = wkp_tasks_stats[
wkp_tasks_stats['count'] > min_wakeups]
if not len(df):
self._log.warning('No tasks with more than %d wakeups',
len(wkp_tasks_stats))
return None
self._log.info('%5d tasks with more than %d wakeups',
len(df), len(wkp_tasks_stats))
# Add task name column
wkp_tasks_stats['comm'] = wkp_tasks_stats.index.map(
lambda pid: self._trace.getTaskByPid(pid))
# Filter columns of interest
wkp_tasks_stats = wkp_tasks_stats[['count', 'comm']]
wkp_tasks_stats.rename(columns={'count': 'samples'}, inplace=True)
return wkp_tasks_stats
def _dfg_rt_tasks(self, min_prio=100):
"""
Tasks with RT priority
NOTE: priorities uses scheduler values, thus: the lower the value the
higher is the task priority.
RT Priorities: [ 0..100]
FAIR Priorities: [101..120]
:param min_prio: minumum priority
:type min_prio: int
"""
if not self._trace.hasEvents('sched_switch'):
self._log.warning('Events [sched_switch] not found')
return None
df = self._dfg_trace_event('sched_switch')
# Filters tasks which have a priority bigger than threshold
df = df[df.next_prio <= min_prio]
# Filter columns of interest
rt_tasks = df[['next_pid', 'next_prio']]
# Remove all duplicateds
rt_tasks = rt_tasks.drop_duplicates()
# Order by priority
rt_tasks.sort_values(by=['next_prio', 'next_pid'], ascending=True,
inplace=True)
rt_tasks.rename(columns={'next_pid': 'pid', 'next_prio': 'prio'},
inplace=True)
# Set PID as index
rt_tasks.set_index('pid', inplace=True)
# Add task name column
rt_tasks['comm'] = rt_tasks.index.map(
lambda pid: self._trace.getTaskByPid(pid))
return rt_tasks
###############################################################################
# Plotting Methods
###############################################################################
def plotTasks(self, tasks, signals=None):
"""
Generate a common set of useful plots for each of the specified tasks
This method allows to filter which signals should be plot, if data are
available in the input trace. The list of signals supported are:
Tasks signals plot:
load_avg, util_avg, boosted_util, sched_overutilized
Tasks residencies on CPUs:
residencies, sched_overutilized
Tasks PELT signals:
load_sum, util_sum, period_contrib, sched_overutilized
At least one of the previous signals must be specified to get a valid
plot.
Addidional custom signals can be specified and they will be represented
in the "Task signals plots" if they represent valid keys of the task
load/utilization trace event (e.g. sched_load_avg_task).
Note:
sched_overutilized: enable the plotting of overutilization bands on
top of each subplot
residencies: enable the generation of the CPUs residencies plot
:param tasks: the list of task names and/or PIDs to plot.
Numerical PIDs and string task names can be mixed
in the same list.
:type tasks: list(str) or list(int)
:param signals: list of signals (and thus plots) to generate
default: all the plots and signals available in the
current trace
:type signals: list(str)
"""
if not signals:
signals = ['load_avg', 'util_avg', 'boosted_util',
'sched_overutilized',
'load_sum', 'util_sum', 'period_contrib',
'residencies']
# Check for the minimum required signals to be available
if not self._trace.hasEvents('sched_load_avg_task'):
self._log.warning('Events [sched_load_avg_task] not found, '
'plot DISABLED!')
return
# Defined list of tasks to plot
if tasks and \
not isinstance(tasks, str) and \
not isinstance(tasks, list):
raise ValueError('Wrong format for tasks parameter')
if tasks:
tasks_to_plot = listify(tasks)
else:
raise ValueError('No tasks to plot specified')
# Compute number of plots to produce
plots_count = 0
plots_signals = [
# Fist plot: task's utilization
{'load_avg', 'util_avg', 'boosted_util'},
# Second plot: task residency
{'residencies'},
# Third plot: tasks's load
{'load_sum', 'util_sum', 'period_contrib'}
]
hr = []
ysize = 0
for plot_id, signals_to_plot in enumerate(plots_signals):
signals_to_plot = signals_to_plot.intersection(signals)
if len(signals_to_plot):
plots_count = plots_count + 1
# Use bigger size only for the first plot
hr.append(3 if plot_id == 0 else 1)
ysize = ysize + (8 if plot_id else 4)
# Grid
gs = gridspec.GridSpec(plots_count, 1, height_ratios=hr)
gs.update(wspace=0.1, hspace=0.1)
# Build list of all PIDs for each task_name to plot
pids_to_plot = []
for task in tasks_to_plot:
# Add specified PIDs to the list
if isinstance(task, int):
pids_to_plot.append(task)
continue
# Otherwise: add all the PIDs for task with the specified name
pids_to_plot.extend(self._trace.getTaskByName(task))
for tid in pids_to_plot:
savefig = False
task_name = self._trace.getTaskByPid(tid)
self._log.info('Plotting [%d:%s]...', tid, task_name)
plot_id = 0
# For each task create a figure with plots_count plots
plt.figure(figsize=(16, ysize))
plt.suptitle('Task Signals',
y=.94, fontsize=16, horizontalalignment='center')
# Plot load and utilization
signals_to_plot = {'load_avg', 'util_avg', 'boosted_util'}
signals_to_plot = list(signals_to_plot.intersection(signals))
if len(signals_to_plot) > 0:
axes = plt.subplot(gs[plot_id, 0])
axes.set_title('Task [{0:d}:{1:s}] Signals'
.format(tid, task_name))
plot_id = plot_id + 1
is_last = (plot_id == plots_count)
self._plotTaskSignals(axes, tid, signals, is_last)
savefig = True
# Plot CPUs residency
signals_to_plot = {'residencies'}
signals_to_plot = list(signals_to_plot.intersection(signals))
if len(signals_to_plot) > 0:
axes = plt.subplot(gs[plot_id, 0])
axes.set_title(
'Task [{0:d}:{1:s}] Residency (green: LITTLE, red: big)'
.format(tid, task_name)
)
plot_id = plot_id + 1
is_last = (plot_id == plots_count)
if 'sched_overutilized' in signals:
signals_to_plot.append('sched_overutilized')
self._plotTaskResidencies(axes, tid, signals_to_plot, is_last)
savefig = True
# Plot PELT signals
signals_to_plot = {'load_sum', 'util_sum', 'period_contrib'}
signals_to_plot = list(signals_to_plot.intersection(signals))
if len(signals_to_plot) > 0:
axes = plt.subplot(gs[plot_id, 0])
axes.set_title('Task [{0:d}:{1:s}] PELT Signals'
.format(tid, task_name))
plot_id = plot_id + 1
if 'sched_overutilized' in signals:
signals_to_plot.append('sched_overutilized')
self._plotTaskPelt(axes, tid, signals_to_plot)
savefig = True
if not savefig:
self._log.warning('Nothing to plot for %s', task_name)
continue
# Save generated plots into datadir
if isinstance(task_name, list):
task_name = re.sub('[:/]', '_', task_name[0])
else:
task_name = re.sub('[:/]', '_', task_name)
figname = '{}/{}task_util_{}_{}.png'\
.format(self._trace.plots_dir, self._trace.plots_prefix,
tid, task_name)
pl.savefig(figname, bbox_inches='tight')
def plotBigTasks(self, max_tasks=10, min_samples=100,
min_utilization=None):
"""
For each big task plot utilization and show the smallest cluster
capacity suitable for accommodating task utilization.
:param max_tasks: maximum number of tasks to consider
:type max_tasks: int
:param min_samples: minumum number of samples over the min_utilization
:type min_samples: int
:param min_utilization: minimum utilization used to filter samples
default: capacity of a little cluster
:type min_utilization: int
"""
# Get PID of big tasks
big_frequent_task_df = self._dfg_top_big_tasks(
min_samples, min_utilization)
if max_tasks > 0:
big_frequent_task_df = big_frequent_task_df.head(max_tasks)
big_frequent_task_pids = big_frequent_task_df.index.values
big_frequent_tasks_count = len(big_frequent_task_pids)
if big_frequent_tasks_count == 0:
self._log.warning('No big/frequent tasks to plot')
return
# Get the list of events for all big frequent tasks
df = self._dfg_trace_event('sched_load_avg_task')
big_frequent_tasks_events = df[df.pid.isin(big_frequent_task_pids)]
# Define axes for side-by-side plottings
fig, axes = plt.subplots(big_frequent_tasks_count, 1,
figsize=(16, big_frequent_tasks_count*4))
plt.subplots_adjust(wspace=0.1, hspace=0.2)
plot_idx = 0
for pid, group in big_frequent_tasks_events.groupby('pid'):
# # Build task names (there could be multiple, during the task lifetime)
task_name = 'Task [%d:%s]'.format(pid, self._trace.getTaskByPid(pid))
# Plot title
if big_frequent_tasks_count == 1:
ax = axes
else:
ax = axes[plot_idx]
ax.set_title(task_name)
# Left axis: utilization
ax = group.plot(y=['util_avg', 'min_cluster_cap'],
style=['r.', '-b'],
drawstyle='steps-post',
linewidth=1,
ax=ax)
ax.set_xlim(self._trace.x_min, self._trace.x_max)
ax.set_ylim(0, 1100)
ax.set_ylabel('util_avg')
ax.set_xlabel('')
ax.grid(True)
self._trace.analysis.status.plotOverutilized(ax)
plot_idx += 1
ax.set_xlabel('Time [s]')
self._log.info('Tasks which have been a "utilization" of %d for at least %d samples',
self._little_cap, min_samples)
def plotWakeupTasks(self, max_tasks=10, min_wakeups=0, per_cluster=False):
"""
Show waking up tasks over time and newly forked tasks in two separate
plots.
:param max_tasks: maximum number of tasks to consider
:param max_tasks: int
:param min_wakeups: minimum number of wakeups of each task
:type min_wakeups: int
:param per_cluster: if True get per-cluster wakeup events
:type per_cluster: bool
"""
if per_cluster is True and \
not self._trace.hasEvents('sched_wakeup_new'):
self._log.warning('Events [sched_wakeup_new] not found, '
'plots DISABLED!')
return
elif not self._trace.hasEvents('sched_wakeup') and \
not self._trace.hasEvents('sched_wakeup_new'):
self._log.warning('Events [sched_wakeup, sched_wakeup_new] not found, '
'plots DISABLED!')
return
# Define axes for side-by-side plottings
fig, axes = plt.subplots(2, 1, figsize=(14, 5))
plt.subplots_adjust(wspace=0.2, hspace=0.3)
if per_cluster:
# Get per cluster wakeup events
df = self._dfg_trace_event('sched_wakeup_new')
big_frequent = df.target_cpu.isin(self._big_cpus)
ntbc = df[big_frequent]
ntbc_count = len(ntbc)
little_frequent = df.target_cpu.isin(self._little_cpus)
ntlc = df[little_frequent];
ntlc_count = len(ntlc)
self._log.info('%5d tasks forked on big cluster (%3.1f %%)',
ntbc_count,
100. * ntbc_count / (ntbc_count + ntlc_count))
self._log.info('%5d tasks forked on LITTLE cluster (%3.1f %%)',
ntlc_count,
100. * ntlc_count / (ntbc_count + ntlc_count))
ax = axes[0]
ax.set_title('Tasks Forks on big CPUs');
ntbc.pid.plot(style=['g.'], ax=ax);
ax.set_xlim(self._trace.x_min, self._trace.x_max);
ax.set_xticklabels([])
ax.set_xlabel('')
ax.grid(True)
self._trace.analysis.status.plotOverutilized(ax)
ax = axes[1]
ax.set_title('Tasks Forks on LITTLE CPUs');
ntlc.pid.plot(style=['g.'], ax=ax);
ax.set_xlim(self._trace.x_min, self._trace.x_max);
ax.grid(True)
self._trace.analysis.status.plotOverutilized(ax)
return
# Keep events of defined big tasks
wkp_task_pids = self._dfg_top_wakeup_tasks(min_wakeups)
if len(wkp_task_pids):
wkp_task_pids = wkp_task_pids.index.values[:max_tasks]
self._log.info('Plotting %d frequent wakeup tasks',
len(wkp_task_pids))
ax = axes[0]
ax.set_title('Tasks WakeUps Events')
df = self._dfg_trace_event('sched_wakeup')
if len(df):
df = df[df.pid.isin(wkp_task_pids)]
df.pid.astype(int).plot(style=['b.'], ax=ax)
ax.set_xlim(self._trace.x_min, self._trace.x_max)
ax.set_xticklabels([])
ax.set_xlabel('')
ax.grid(True)
self._trace.analysis.status.plotOverutilized(ax)
ax = axes[1]
ax.set_title('Tasks Forks Events')
df = self._dfg_trace_event('sched_wakeup_new')
if len(df):
df = df[df.pid.isin(wkp_task_pids)]
df.pid.astype(int).plot(style=['r.'], ax=ax)
ax.set_xlim(self._trace.x_min, self._trace.x_max)
ax.grid(True)
self._trace.analysis.status.plotOverutilized(ax)
def plotBigTasksVsCapacity(self, min_samples=1,
min_utilization=None, big_cluster=True):
"""
Draw a plot that shows whether tasks are placed on the correct cluster
based on their utilization and cluster capacity. Green dots mean the
task was placed on the correct cluster, Red means placement was wrong
:param min_samples: minumum number of samples over the min_utilization
:type min_samples: int
:param min_utilization: minimum utilization used to filter samples
default: capacity of a little cluster
:type min_utilization: int
:param big_cluster:
:type big_cluster: bool
"""
if not self._trace.hasEvents('sched_load_avg_task'):
self._log.warning('Events [sched_load_avg_task] not found')
return
if not self._trace.hasEvents('cpu_frequency'):
self._log.warning('Events [cpu_frequency] not found')
return
if big_cluster:
cluster_correct = 'big'
cpus = self._big_cpus
else:
cluster_correct = 'LITTLE'
cpus = self._little_cpus
# Get all utilization update events
df = self._dfg_trace_event('sched_load_avg_task')
# Keep events of defined big tasks
big_task_pids = self._dfg_top_big_tasks(
min_samples, min_utilization)
if big_task_pids is not None:
big_task_pids = big_task_pids.index.values
df = df[df.pid.isin(big_task_pids)]
if not df.size:
self._log.warning('No events for tasks with more then %d utilization '
'samples bigger than %d, plots DISABLED!')
return
fig, axes = plt.subplots(2, 1, figsize=(14, 5))
plt.subplots_adjust(wspace=0.2, hspace=0.3)
# Add column of expected cluster depending on:
# a) task utilization value
# b) capacity of the selected cluster
bu_bc = ((df['util_avg'] > self._little_cap) &
(df['cpu'].isin(self._big_cpus)))
su_lc = ((df['util_avg'] <= self._little_cap) &
(df['cpu'].isin(self._little_cpus)))
# The Cluster CAPacity Matches the UTILization (ccap_mutil) iff:
# - tasks with util_avg > little_cap are running on a BIG cpu
# - tasks with util_avg <= little_cap are running on a LITTLe cpu
df.loc[:,'ccap_mutil'] = np.select([(bu_bc | su_lc)], [True], False)
df_freq = self._dfg_trace_event('cpu_frequency')
df_freq = df_freq[df_freq.cpu == cpus[0]]
ax = axes[0]
ax.set_title('Tasks Utilization vs Allocation')
for ucolor, umatch in zip('gr', [True, False]):
cdata = df[df['ccap_mutil'] == umatch]
if len(cdata) > 0:
cdata['util_avg'].plot(ax=ax,
style=[ucolor+'.'], legend=False)
ax.set_xlim(self._trace.x_min, self._trace.x_max)
ax.set_xticklabels([])
ax.set_xlabel('')
ax.grid(True)
self._trace.analysis.status.plotOverutilized(ax)
ax = axes[1]
ax.set_title('Frequencies on "{}" cluster'.format(cluster_correct))
df_freq['frequency'].plot(style=['-b'], ax=ax, drawstyle='steps-post')
ax.set_xlim(self._trace.x_min, self._trace.x_max);
ax.grid(True)
self._trace.analysis.status.plotOverutilized(ax)
legend_y = axes[0].get_ylim()[1]
axes[0].annotate('Utilization-Capacity Matches',
xy=(0, legend_y),
xytext=(-50, 45), textcoords='offset points',
fontsize=18)
axes[0].annotate('Task schduled (green) or not (red) on min cluster',
xy=(0, legend_y),
xytext=(-50, 25), textcoords='offset points',
fontsize=14)
###############################################################################
# Utility Methods
###############################################################################
def _plotTaskSignals(self, axes, tid, signals, is_last=False):
"""
For task with ID `tid` plot the specified signals.
:param axes: axes over which to generate the plot
:type axes: :mod:`matplotlib.axes.Axes`
:param tid: task ID
:type tid: int
:param signals: signals to be plot
:param signals: list(str)
:param is_last: if True this is the last plot
:type is_last: bool
"""
# Get dataframe for the required task
util_df = self._dfg_trace_event('sched_load_avg_task')
# Plot load and util
signals_to_plot = set(signals).difference({'boosted_util'})
for signal in signals_to_plot:
if signal not in util_df.columns:
continue
data = util_df[util_df.pid == tid][signal]
data.plot(ax=axes, drawstyle='steps-post', legend=True)
# Plot boost utilization if available
if 'boosted_util' in signals and \
self._trace.hasEvents('sched_boost_task'):
boost_df = self._dfg_trace_event('sched_boost_task')
data = boost_df[boost_df.pid == tid][['boosted_util']]
if len(data):
data.plot(ax=axes, style=['y-'], drawstyle='steps-post')
else:
task_name = self._trace.getTaskByPid(tid)
self._log.warning('No "boosted_util" data for task [%d:%s]',
tid, task_name)
# Add Capacities data if avilable
if 'nrg_model' in self._platform:
nrg_model = self._platform['nrg_model']
max_lcap = nrg_model['little']['cpu']['cap_max']
max_bcap = nrg_model['big']['cpu']['cap_max']
tip_lcap = 0.8 * max_lcap
tip_bcap = 0.8 * max_bcap
self._log.debug(
'LITTLE capacity tip/max: %d/%d, big capacity tip/max: %d/%d',
tip_lcap, max_lcap, tip_bcap, max_bcap
)
axes.axhline(tip_lcap, color='y', linestyle=':', linewidth=2)
axes.axhline(max_lcap, color='y', linestyle='--', linewidth=2)
axes.axhline(tip_bcap, color='r', linestyle=':', linewidth=2)
axes.axhline(max_bcap, color='r', linestyle='--', linewidth=2)
axes.set_ylim(0, 1100)
axes.set_xlim(self._trace.x_min, self._trace.x_max)
axes.grid(True)
if not is_last:
axes.set_xticklabels([])
axes.set_xlabel('')
if 'sched_overutilized' in signals:
self._trace.analysis.status.plotOverutilized(axes)
def _plotTaskResidencies(self, axes, tid, signals, is_last=False):
"""
For task with ID `tid` plot residency information.
:param axes: axes over which to generate the plot
:type axes: :mod:`matplotlib.axes.Axes`
:param tid: task ID
:type tid: int
:param signals: signals to be plot
:param signals: list(str)
:param is_last: if True this is the last plot
:type is_last: bool
"""
util_df = self._dfg_trace_event('sched_load_avg_task')
if 'cluster' in util_df:
data = util_df[util_df.pid == tid][['cluster', 'cpu']]
for ccolor, clabel in zip('gr', ['LITTLE', 'big']):
cdata = data[data.cluster == clabel]
if len(cdata) > 0:
cdata.plot(ax=axes, style=[ccolor+'+'], legend=False)
# Y Axis - placeholders for legend, acutal CPUs. topmost empty lane
cpus = [str(n) for n in range(self._platform['cpus_count'])]
ylabels = [''] + cpus
axes.set_yticklabels(ylabels)
axes.set_ylim(-1, len(cpus))
axes.set_ylabel('CPUs')
# X Axis
axes.set_xlim(self._trace.x_min, self._trace.x_max)
axes.grid(True)
if not is_last:
axes.set_xticklabels([])
axes.set_xlabel('')
if 'sched_overutilized' in signals:
self._trace.analysis.status.plotOverutilized(axes)
def _plotTaskPelt(self, axes, tid, signals):
"""
For task with ID `tid` plot PELT-related signals.
:param axes: axes over which to generate the plot
:type axes: :mod:`matplotlib.axes.Axes`
:param tid: task ID
:type tid: int
:param signals: signals to be plot
:param signals: list(str)
"""
util_df = self._dfg_trace_event('sched_load_avg_task')
data = util_df[util_df.pid == tid][['load_sum',
'util_sum',
'period_contrib']]
data.plot(ax=axes, drawstyle='steps-post')
axes.set_xlim(self._trace.x_min, self._trace.x_max)
axes.ticklabel_format(style='scientific', scilimits=(0, 0),
axis='y', useOffset=False)
axes.grid(True)
if 'sched_overutilized' in signals:
self._trace.analysis.status.plotOverutilized(axes)
# vim :set tabstop=4 shiftwidth=4 expandtab