# Plain text  |  197 lines  |  7.44 KB

#    Copyright 2015-2017 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import pandas as pd
import unittest

from trappy.plotter import AttrConf
from trappy.plotter.Constraint import Constraint, ConstraintManager

class TestConstraintManager(unittest.TestCase):
    """Test trappy.plotter.ConstraintManager

    Every test builds a ConstraintManager from the small DataFrames
    created in __init__ and inspects the constraints it yields.
    """

    def __init__(self, *args, **kwargs):
        """Init some common data for the tests"""

        # Two traces with the same columns; "cpu" is used as the pivot
        # in the test_pivoted_* tests.
        self.dfrs = [pd.DataFrame({"load": [1, 2, 2, 3],
                                   "freq": [2, 3, 3, 4],
                                   "cpu": [0, 1, 0, 1]}),
                     pd.DataFrame({"load": [2, 3, 2, 1],
                                   "freq": [1, 2, 2, 1],
                                   "cpu": [1, 0, 1, 0]})]
        self.cols = ["load", "freq"]
        super(TestConstraintManager, self).__init__(*args, **kwargs)

    @staticmethod
    def _values(series):
        """Return the values of series.to_dict() as a plain list

        dict.values() is a list on Python 2 but a view object on
        Python 3.  A view never compares equal to a list, nor to
        another view, so it has to be converted before it is handed to
        assertEqual.
        """
        return list(series.to_dict().values())

    def test_one_constraint(self):
        """Test that the constraint manager works with one constraint"""

        dfr = self.dfrs[0]

        c_mgr = ConstraintManager(dfr, "load", None, AttrConf.PIVOT, {})

        self.assertEqual(len(c_mgr), 1)

        # next(iter(...)) works on both Python 2 and Python 3, unlike
        # the Python 2 only iterator.next() method.
        constraint = next(iter(c_mgr))
        series = constraint.result[AttrConf.PIVOT_VAL]
        self.assertEqual(self._values(series), self._values(dfr["load"]))

    def test_no_pivot_multiple_traces(self):
        """Test that the constraint manager works with multiple traces and no pivots"""

        c_mgr = ConstraintManager(self.dfrs, "load", None, AttrConf.PIVOT, {})

        self.assertEqual(len(c_mgr), 2)

        for constraint, orig_dfr in zip(c_mgr, self.dfrs):
            series = constraint.result[AttrConf.PIVOT_VAL]
            self.assertEqual(self._values(series),
                             self._values(orig_dfr["load"]))

    def test_no_pivot_zipped_columns_and_traces(self):
        """Test the constraint manager with multiple columns and traces zipped"""

        c_mgr = ConstraintManager(self.dfrs, self.cols, None, AttrConf.PIVOT, {})

        self.assertEqual(len(c_mgr), 2)

        # The n-th constraint is expected to hold the n-th column of the
        # n-th trace.
        for constraint, orig_dfr, col in zip(c_mgr, self.dfrs, self.cols):
            series = constraint.result[AttrConf.PIVOT_VAL]
            self.assertEqual(self._values(series),
                             self._values(orig_dfr[col]))

    def test_no_pivot_multicolumns_multitraces(self):
        """Test the constraint manager with multiple traces that can have each multiple columns"""

        c_mgr = ConstraintManager(self.dfrs, self.cols, None, AttrConf.PIVOT,
                                  {}, zip_constraints=False)

        # Without zipping, one constraint per (trace, column) pair is
        # expected: 2 traces * 2 columns.
        self.assertEqual(len(c_mgr), 4)

        expected_series = [dfr[col] for dfr in self.dfrs for col in self.cols]
        for constraint, orig_series in zip(c_mgr, expected_series):
            series = constraint.result[AttrConf.PIVOT_VAL]
            self.assertEqual(series.to_dict(), orig_series.to_dict())

    def test_no_pivot_filters(self):
        """Test the constraint manager with filters"""

        simple_filter = {"freq": [2]}

        c_mgr = ConstraintManager(self.dfrs, "load", None, AttrConf.PIVOT,
                                  simple_filter)

        num_constraints = len(c_mgr)
        self.assertEqual(num_constraints, 2)

        constraint_iter = iter(c_mgr)
        constraint = next(constraint_iter)
        self.assertEqual(len(constraint.result), 1)

        # In the second trace, "freq" is 2 on the rows whose "load" is
        # 3 and 2.
        constraint = next(constraint_iter)
        series_second_frame = constraint.result[AttrConf.PIVOT_VAL]
        self.assertEqual(self._values(series_second_frame), [3, 2])

    def test_pivoted_data(self):
        """Test the constraint manager with a pivot and one trace"""

        c_mgr = ConstraintManager(self.dfrs[0], "load", None, "cpu", {})

        self.assertEqual(len(c_mgr), 1)

        constraint = next(iter(c_mgr))
        results = {pivot: self._values(series)
                   for pivot, series in constraint.result.items()}
        # "load" values of the first trace, split by "cpu".
        expected_results = {0: [1, 2], 1: [2, 3]}

        self.assertEqual(results, expected_results)

    def test_pivoted_multitrace(self):
        """Test the constraint manager with a pivot and multiple traces"""

        c_mgr = ConstraintManager(self.dfrs, "load", None, "cpu", {})

        self.assertEqual(len(c_mgr), 2)

        constraint_iter = iter(c_mgr)
        constraint = next(constraint_iter)
        self.assertEqual(self._values(constraint.result[0]), [1, 2])

        constraint = next(constraint_iter)
        self.assertEqual(self._values(constraint.result[1]), [2, 2])

    def test_pivoted_multitraces_multicolumns(self):
        """Test the constraint manager with multiple traces and columns"""

        c_mgr = ConstraintManager(self.dfrs, ["load", "freq"], None, "cpu", {})
        self.assertEqual(len(c_mgr), 2)

        # First constraint: "load" of the first trace for cpu 1.
        constraint_iter = iter(c_mgr)
        constraint = next(constraint_iter)
        self.assertEqual(self._values(constraint.result[1]), [2, 3])

        # Second constraint: "freq" of the second trace for cpu 0.
        constraint = next(constraint_iter)
        self.assertEqual(self._values(constraint.result[0]), [2, 1])

    def test_pivoted_with_filters(self):
        """Test the constraint manager with pivoted data and filters"""

        simple_filter = {"load": [2]}
        c_mgr = ConstraintManager(self.dfrs[0], "freq", None, "cpu",
                                  simple_filter)

        self.assertEqual(len(c_mgr), 1)

        constraint = next(iter(c_mgr))
        result = constraint.result

        self.assertEqual(result[0].iloc[0], 3)
        self.assertEqual(result[1].iloc[0], 3)

    def test_constraint_with_window(self):
        """Test that the constraint manager can constraint to a window of time"""
        c_mgr = ConstraintManager(self.dfrs[0], "freq", None, AttrConf.PIVOT, {},
                                  window=(1, 3))

        constraint = next(iter(c_mgr))
        series = constraint.result[AttrConf.PIVOT_VAL]
        self.assertEqual(len(series), 3)

        # For the graph to plot a value at 0.75, the resulting series
        # must contain the value before 0.75.  Same for the upper limit.
        c_mgr = ConstraintManager(self.dfrs[0], "freq", None, AttrConf.PIVOT, {},
                                  window=(0.75, 1.5))

        constraint = next(iter(c_mgr))
        series = constraint.result[AttrConf.PIVOT_VAL]
        self.assertEqual(series.index.tolist(), [0, 1, 2])

        c_mgr = ConstraintManager(self.dfrs[0], "freq", None, AttrConf.PIVOT, {},
                                  window=(0, 2))

        constraint = next(iter(c_mgr))
        series = constraint.result[AttrConf.PIVOT_VAL]
        self.assertEqual(len(series), 3)

class TestConstraint(unittest.TestCase):
    """Test trappy.plotter.Constraint"""

    def test_str_constraint(self):
        """Check that str(constraint) works when the column is not a string"""
        # The column labels are deliberately integers, not strings.
        int_labelled_data = {12: [1, 2, 3], 13: [3, 4, 5]}
        frame = pd.DataFrame(int_labelled_data)

        subject = Constraint(
            frame,
            AttrConf.PIVOT,
            12,
            template=None,
            trace_index=0,
            filters={},
            window=None,
        )
        description = str(subject)

        self.assertEqual(description, "DataFrame 0:12")