# Copyright 2015-2017 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import matplotlib
import os
import pandas as pd
import re
import shutil
import subprocess
import tempfile
import time
import unittest
from test_thermal import BaseTestThermal
import trappy
import utils_tests
class TestFTrace(BaseTestThermal):
def __init__(self, *args, **kwargs):
super(TestFTrace, self).__init__(*args, **kwargs)
self.map_label = {"00000000,00000006": "A57", "00000000,00000039": "A53"}
def test_ftrace_has_all_classes(self):
"""The FTrace() class has members for all classes"""
trace = trappy.FTrace()
for attr in trace.class_definitions.iterkeys():
self.assertTrue(hasattr(trace, attr))
def test_ftrace_has_all_classes_scope_all(self):
"""The FTrace() class has members for all classes with scope=all"""
trace = trappy.FTrace(scope="all")
for attr in trace.class_definitions.iterkeys():
self.assertTrue(hasattr(trace, attr))
def test_ftrace_has_all_classes_scope_thermal(self):
"""The FTrace() class has only members for thermal classes with scope=thermal"""
trace = trappy.FTrace(scope="thermal")
for attr in trace.thermal_classes.iterkeys():
self.assertTrue(hasattr(trace, attr))
for attr in trace.sched_classes.iterkeys():
self.assertFalse(hasattr(trace, attr))
def test_ftrace_has_all_classes_scope_sched(self):
"""The FTrace() class has only members for sched classes with scope=sched"""
trace = trappy.FTrace(scope="sched")
for attr in trace.thermal_classes.iterkeys():
self.assertFalse(hasattr(trace, attr))
for attr in trace.sched_classes.iterkeys():
self.assertTrue(hasattr(trace, attr))
def test_ftrace_has_no_classes_scope_dynamic(self):
"""The FTrace() class has only dynamically registered classes with scope=custom"""
trace = trappy.FTrace(scope="custom")
for attr in trace.thermal_classes.iterkeys():
self.assertFalse(hasattr(trace, attr))
for attr in trace.sched_classes.iterkeys():
self.assertFalse(hasattr(trace, attr))
ftrace_parser = trappy.register_dynamic_ftrace("ADynamicEvent",
"a_dynamic_event")
trace = trappy.FTrace(scope="custom")
self.assertTrue(hasattr(trace, "a_dynamic_event"))
trappy.unregister_dynamic_ftrace(ftrace_parser)
def test_ftrace_doesnt_overwrite_parsed_event(self):
"""FTrace().add_parsed_event() should not override an event that's already present"""
trace = trappy.FTrace()
dfr = pd.DataFrame({"temp": [45000, 46724, 45520]},
index=pd.Series([1.020, 1.342, 1.451], name="Time"))
with self.assertRaises(ValueError):
trace.add_parsed_event("sched_switch", dfr)
def test_fail_if_no_trace_dat(self):
"""Raise an IOError with the path if there's no trace.dat and trace.txt"""
os.remove("trace.txt")
self.assertRaises(IOError, trappy.FTrace)
cwd = os.getcwd()
try:
trappy.FTrace(cwd)
except IOError as exception:
pass
self.assertTrue(cwd in str(exception))
def test_other_directory(self):
"""FTrace() can grab the trace.dat from other directories"""
other_random_dir = tempfile.mkdtemp()
os.chdir(other_random_dir)
dfr = trappy.FTrace(self.out_dir).thermal.data_frame
self.assertTrue(len(dfr) > 0)
self.assertEquals(os.getcwd(), other_random_dir)
def test_ftrace_arbitrary_trace_txt(self):
"""FTrace() works if the trace is called something other than trace.txt"""
arbitrary_trace_name = "my_trace.txt"
shutil.move("trace.txt", arbitrary_trace_name)
dfr = trappy.FTrace(arbitrary_trace_name).thermal.data_frame
self.assertTrue(len(dfr) > 0)
self.assertFalse(os.path.exists("trace.txt"))
# As there is no raw trace requested. The mytrace.raw.txt
# Should not have been generated
self.assertFalse(os.path.exists("mytrace.raw.txt"))
def test_ftrace_autonormalize_time(self):
"""FTrace() normalizes by default"""
trace = trappy.FTrace()
self.assertEquals(round(trace.thermal.data_frame.index[0], 7), 0)
def test_ftrace_dont_normalize_time(self):
"""FTrace() doesn't normalize if asked not to"""
trace = trappy.FTrace(normalize_time=False)
self.assertNotEquals(round(trace.thermal.data_frame.index[0], 7), 0)
def test_ftrace_basetime(self):
"""Test that basetime calculation is correct"""
trace = trappy.FTrace(normalize_time=False)
basetime = trace.thermal.data_frame.index[0]
self.assertEqual(trace.basetime, basetime)
def test_ftrace_duration(self):
"""Test get_duration: normalize_time=False"""
trace = trappy.FTrace(normalize_time=True)
duration = trace.thermal_governor.data_frame.index[-1] - trace.thermal.data_frame.index[0]
self.assertEqual(trace.get_duration(), duration)
def test_ftrace_duration_window(self):
"""Test that duration is correct with time window (normalize_time=True)"""
trace = trappy.FTrace(normalize_time=True, window=[1, 5])
duration = trace.thermal_governor.data_frame.index[-1] - trace.thermal.data_frame.index[0]
self.assertEqual(trace.get_duration(), duration)
def test_ftrace_duration_window_not_normalized(self):
"""Test that duration is correct with time window (normalize_time=False)"""
trace = trappy.FTrace(normalize_time=False, window=[1, 5])
duration = trace.thermal_governor.data_frame.index[-1] - trace.thermal.data_frame.index[0]
self.assertEqual(trace.get_duration(), duration)
def test_ftrace_duration_not_normalized(self):
"""Test get_duration: normalize_time=True"""
trace = trappy.FTrace(normalize_time=False)
duration = trace.thermal_governor.data_frame.index[-1] - trace.thermal.data_frame.index[0]
self.assertEqual(trace.get_duration(), duration)
def test_ftrace_normalize_time(self):
"""FTrace()._normalize_time() works accross all classes"""
trace = trappy.FTrace(normalize_time=False)
prev_inpower_basetime = trace.cpu_in_power.data_frame.index[0]
prev_inpower_last = trace.cpu_in_power.data_frame.index[-1]
trace._normalize_time()
self.assertEquals(round(trace.thermal.data_frame.index[0], 7), 0)
exp_inpower_first = prev_inpower_basetime - trace.basetime
self.assertEquals(round(trace.cpu_in_power.data_frame.index[0] - exp_inpower_first, 7), 0)
exp_inpower_last = prev_inpower_last - trace.basetime
self.assertEquals(round(trace.cpu_in_power.data_frame.index[-1] - exp_inpower_last, 7), 0)
def test_ftrace_accepts_events(self):
"""The FTrace class accepts an events parameter with only the parameters interesting for a trace"""
trace = trappy.FTrace(scope="custom", events=["cdev_update"])
self.assertGreater(len(trace.cdev_update.data_frame), 1)
# If you specify events as a string by mistake, trappy does the right thing
trace = trappy.FTrace(scope="custom", events="foo")
self.assertTrue(hasattr(trace, "foo"))
def test_ftrace_already_registered_events_are_not_registered_again(self):
"""FTrace(events="foo") uses class for foo if it is a known class for trappy"""
events = ["sched_switch", "sched_load_avg_sg"]
trace = trappy.FTrace(scope="custom", events=events)
self.assertTrue(trace.sched_switch.parse_raw)
self.assertEquals(trace.sched_load_avg_sg.pivot, "cpus")
def test_get_all_freqs_data(self):
"""Test get_all_freqs_data()"""
allfreqs = trappy.FTrace().get_all_freqs_data(self.map_label)
self.assertEquals(allfreqs[1][1]["A53_freq_out"].iloc[3], 850)
self.assertEquals(allfreqs[1][1]["A53_freq_in"].iloc[1], 850)
self.assertEquals(allfreqs[0][1]["A57_freq_out"].iloc[2], 1100)
self.assertTrue("gpu_freq_in" in allfreqs[2][1].columns)
# Make sure there are no NaNs in the middle of the array
self.assertTrue(allfreqs[0][1]["A57_freq_in"].notnull().all())
def test_apply_callbacks(self):
"""Test apply_callbacks()"""
counts = {
"cpu_in_power": 0,
"cpu_out_power": 0
}
def cpu_in_power_fn(data):
counts["cpu_in_power"] += 1
def cpu_out_power_fn(data):
counts["cpu_out_power"] += 1
fn_map = {
"cpu_in_power": cpu_in_power_fn,
"cpu_out_power": cpu_out_power_fn
}
trace = trappy.FTrace()
trace.apply_callbacks(fn_map)
self.assertEqual(counts["cpu_in_power"], 134)
self.assertEqual(counts["cpu_out_power"], 134)
def test_plot_freq_hists(self):
"""Test that plot_freq_hists() doesn't bomb"""
trace = trappy.FTrace()
_, axis = matplotlib.pyplot.subplots(nrows=2)
trace.plot_freq_hists(self.map_label, axis)
matplotlib.pyplot.close('all')
def test_plot_load(self):
"""Test that plot_load() doesn't explode"""
trace = trappy.FTrace()
trace.plot_load(self.map_label, title="Util")
_, ax = matplotlib.pyplot.subplots()
trace.plot_load(self.map_label, ax=ax)
def test_plot_normalized_load(self):
"""Test that plot_normalized_load() doesn't explode"""
trace = trappy.FTrace()
_, ax = matplotlib.pyplot.subplots()
trace.plot_normalized_load(self.map_label, ax=ax)
def test_plot_allfreqs(self):
"""Test that plot_allfreqs() doesn't bomb"""
trace = trappy.FTrace()
trace.plot_allfreqs(self.map_label)
matplotlib.pyplot.close('all')
_, axis = matplotlib.pyplot.subplots(nrows=2)
trace.plot_allfreqs(self.map_label, ax=axis)
matplotlib.pyplot.close('all')
def test_plot_allfreqs_with_one_actor(self):
"""Check that plot_allfreqs() works with one actor"""
in_data = """ kworker/4:1-397 [004] 720.741349: thermal_power_cpu_get: cpus=00000000,00000006 freq=1400000 raw_cpu_power=189 load={23, 12} power=14
kworker/4:1-397 [004] 720.741679: thermal_power_cpu_limit: cpus=00000000,00000006 freq=1400000 cdev_state=1 power=14"""
with open("trace.txt", "w") as fout:
fout.write(in_data)
trace = trappy.FTrace()
map_label = {"00000000,00000006": "A57"}
_, axis = matplotlib.pyplot.subplots(nrows=1)
trace.plot_allfreqs(map_label, ax=[axis])
matplotlib.pyplot.close('all')
def test_trace_metadata(self):
"""Test if metadata gets populated correctly"""
expected_metadata = {}
expected_metadata["version"] = "6"
expected_metadata["cpus"] = "6"
trace = trappy.FTrace()
for key, value in expected_metadata.items():
self.assertTrue(hasattr(trace, "_" + key))
self.assertEquals(getattr(trace, "_" + key), value)
def test_missing_metadata(self):
"""Test if trappy.FTrace() works with a trace missing metadata info"""
lines = []
with open("trace.txt", "r") as fil:
lines += fil.readlines()
lines = lines[7:]
fil.close()
with open("trace.txt", "w") as fil:
fil.write("".join(lines))
fil.close()
trace = trappy.FTrace()
self.assertEquals(trace._cpus, None)
self.assertEquals(trace._version, None)
self.assertTrue(len(trace.thermal.data_frame) > 0)
def test_ftrace_accepts_window(self):
"""FTrace class accepts a window parameter"""
trace = trappy.FTrace(window=(1.234726, 5.334726))
self.assertEquals(trace.thermal.data_frame.iloc[0]["temp"], 68989)
self.assertEquals(trace.thermal.data_frame.iloc[-1]["temp"], 69530)
def test_ftrace_accepts_abs_window(self):
"""FTrace class accepts an abs_window parameter"""
trace = trappy.FTrace(abs_window=(1585, 1589.1))
self.assertEquals(trace.thermal.data_frame.iloc[0]["temp"], 68989)
self.assertEquals(trace.thermal.data_frame.iloc[-1]["temp"], 69530)
def test_parse_tracing_mark_write_events(self):
"""Check that tracing_mark_write events are parsed without errors"""
in_data = """ sh-1379 [002] 353.397813: print: tracing_mark_write: TRACE_MARKER_START
shutils-1381 [001] 353.680439: print: tracing_mark_write: cpu_frequency: state=450000 cpu_id=5"""
with open("trace.txt", "w") as fout:
fout.write(in_data)
try:
trace = trappy.FTrace()
except TypeError as e:
self.fail("tracing_mark_write parsing failed with {} exception"\
.format(e.message))
# The second event is recognised as a cpu_frequency event and therefore
# put under trace.cpu_frequency
self.assertEquals(trace.tracing_mark_write.data_frame.iloc[-1]["string"],
"TRACE_MARKER_START")
self.assertEquals(len(trace.tracing_mark_write.data_frame), 1)
@unittest.skipUnless(utils_tests.trace_cmd_installed(),
"trace-cmd not installed")
class TestFTraceRawDat(utils_tests.SetupDirectory):
def __init__(self, *args, **kwargs):
super(TestFTraceRawDat, self).__init__(
[("raw_trace.dat", "trace.dat")],
*args,
**kwargs)
def test_raw_dat(self):
"""Tests an event that relies on raw parsing"""
trace = trappy.FTrace()
self.assertTrue(hasattr(trace, "sched_switch"))
self.assertTrue(len(trace.sched_switch.data_frame) > 0)
self.assertTrue("prev_comm" in trace.sched_switch.data_frame.columns)
def test_raw_dat_arb_name(self):
"""Tests an event that relies on raw parsing with arbitrary .dat file name"""
arbitrary_name = "my_trace.dat"
shutil.move("trace.dat", arbitrary_name)
trace = trappy.FTrace(arbitrary_name)
self.assertTrue(hasattr(trace, "sched_switch"))
self.assertTrue(len(trace.sched_switch.data_frame) > 0)
class TestFTraceRawBothTxt(utils_tests.SetupDirectory):
def __init__(self, *args, **kwargs):
super(TestFTraceRawBothTxt, self).__init__(
[("raw_trace.txt", "trace.txt"),],
*args,
**kwargs)
def test_both_txt_files(self):
"""test raw parsing for txt files"""
self.assertFalse(os.path.isfile("trace.dat"))
trace = trappy.FTrace()
self.assertTrue(hasattr(trace, "sched_switch"))
self.assertTrue(len(trace.sched_switch.data_frame) > 0)
def test_both_txt_arb_name(self):
"""Test raw parsing for txt files arbitrary name"""
arbitrary_name = "my_trace.txt"
shutil.move("trace.txt", arbitrary_name)
trace = trappy.FTrace(arbitrary_name)
self.assertTrue(hasattr(trace, "sched_switch"))
self.assertTrue(len(trace.sched_switch.data_frame) > 0)
class TestFTraceSched(utils_tests.SetupDirectory):
"""Tests using a trace with only sched info and no (or partial) thermal"""
def __init__(self, *args, **kwargs):
super(TestFTraceSched, self).__init__(
[("trace_empty.txt", "trace.txt")],
*args,
**kwargs)
def test_ftrace_basetime_empty(self):
"""Test that basetime is 0 if data frame of all data objects is empty"""
trace = trappy.FTrace(normalize_time=False)
self.assertEqual(trace.basetime, 0)
def test_ftrace_unique_but_no_fields(self):
"""Test with a matching unique but no special fields"""
version_parser = trappy.register_dynamic_ftrace("Version", "version")
# Append invalid line to file
with open("trace.txt", "a") as fil:
fil.write("version = 6")
with self.assertRaises(ValueError):
trappy.FTrace(scope="custom")
trappy.unregister_dynamic_ftrace(version_parser)
def test_ftrace_normalize_some_tracepoints(self):
"""Test that normalizing time works if not all the tracepoints are in the trace"""
with open("trace.txt", "a") as fil:
fil.write(" kworker/4:1-1219 [004] 508.424826: thermal_temperature: thermal_zone=exynos-therm id=0 temp_prev=24000 temp=24000")
trace = trappy.FTrace()
self.assertEqual(trace.thermal.data_frame.index[0], 0)
@unittest.skipUnless(utils_tests.trace_cmd_installed(),
"trace-cmd not installed")
class TestTraceDat(utils_tests.SetupDirectory):
"""Test that trace.dat handling work"""
def __init__(self, *args, **kwargs):
super(TestTraceDat, self).__init__(
[("trace.dat", "trace.dat")],
*args, **kwargs)
def assert_thermal_in_trace(self, fname):
"""Assert that the thermal event is in the trace
fname is the trace file, usually "trace.txt" or "trace.raw.txt"
"""
found = False
with open(fname) as fin:
for line in fin:
if re.search("thermal", line):
found = True
break
self.assertTrue(found)
def test_do_txt_if_not_there(self):
"""Create trace.txt if it's not there"""
self.assertFalse(os.path.isfile("trace.txt"))
trappy.FTrace()
self.assert_thermal_in_trace("trace.txt")
def test_ftrace_arbitrary_trace_dat(self):
"""FTrace() works if asked to parse a binary trace with a filename other than trace.dat"""
arbitrary_trace_name = "my_trace.dat"
shutil.move("trace.dat", arbitrary_trace_name)
dfr = trappy.FTrace(arbitrary_trace_name).thermal.data_frame
self.assertTrue(os.path.exists("my_trace.txt"))
self.assertTrue(len(dfr) > 0)
self.assertFalse(os.path.exists("trace.dat"))
self.assertFalse(os.path.exists("trace.txt"))
def test_regenerate_txt_if_outdated(self):
"""Regenerate the trace.txt if it's older than the trace.dat"""
trappy.FTrace()
# Empty the trace.txt
with open("trace.txt", "w") as fout:
fout.write("")
# Set access and modified time of trace.txt to 10 seconds ago
now = time.time()
os.utime("trace.txt", (now - 10, now - 10))
# touch trace.dat
os.utime("trace.dat", None)
trappy.FTrace()
self.assert_thermal_in_trace("trace.txt")