普通文本  |  327行  |  11.75 KB

# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2016, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import time
import re
import pandas
import StringIO

from unittest import SkipTest

from env import TestEnv
from test import LisaTest

"""
Goal
====

Check that the configuration of a given device is suitable
for running EAS.

Detailed Description
====================

This test reads the kernel configuration and digs around in sysfs to
check the following attributes are true:
    * the minimum set of required config options are enabled
    * all CPUs have access to the 'sched' CPUFreq governor
    * energy aware scheduling is present and enabled

Expected Behaviour
==================

All required config options are set, sched governor is present.

"""

TEST_CONF = {
    'modules': ['cpufreq'],
    'results_dir': 'PreliminaryTests',
    'tools': [
        'sysbench',
    ]
}

class BasicCheckTest(LisaTest):
    @classmethod
    def setUpClass(cls):
        cls.env = TestEnv(test_conf=TEST_CONF)
        cls.target = cls.env.target

class TestSchedGovernor(BasicCheckTest):
    def test_sched_governor_available(self):
        """
        Check that the 'sched' or 'schedutil' cpufreq governor is available
        """
        fail_list = []
        for cpu in self.target.list_online_cpus():
            governors = self.target.cpufreq.list_governors(cpu)
            if 'sched' not in governors and 'schedutil' not in governors:
                fail_list.append(cpu)
        msg = 'CPUs {} do not support sched[util] cpufreq governor'.format(
            fail_list)
        self.assertTrue(len(fail_list) == 0, msg=msg)

class TestKernelConfig(BasicCheckTest):
    def test_kernel_config(self):
        """
        Check that the kernel config has the basic requirements for EAS
        """
        kernel_config = self.target.config
        if not kernel_config.text:
            raise SkipTest('Kernel config not available on target')

        # NB: We don't test for schedtune/schedutil, that's tested by
        # TestSchedGovernor.
        necessary_configs = [
            # 'CONFIG_CPU_FREQ_STAT',
            'CONFIG_CGROUPS',
            'CONFIG_SMP',
            'CONFIG_SCHED_MC',
            'CONFIG_CPU_FREQ',
            'CONFIG_CPU_IDLE',
            'CONFIG_SCHED_DEBUG',
        ]

        fail_list = [c for c in necessary_configs
                     if not kernel_config.is_enabled(c)]

        if len(fail_list):
            message = 'Missing kernel configs: ' + ', '.join(fail_list)
            self.assertTrue(len(fail_list) == 0, msg=message)

class TestWorkThroughput(BasicCheckTest):
    """
    Check that compute throughput increases with CPU frequency

    That is, check that cpufreq really works in that setting a higher
    frequency provides greater CPU performance
    """
    def _run_sysbench_work(self, cpu, duration):
        """
        Run benchmark using 1 thread on a given CPU.

        :param cpu: cpu to run the benchmark on
        :type cpu: str
        :param duration: length of time, in seconds to run the benchmark

        :returns: float - performance score
        """
        args = '--test=cpu --num-threads=1 --max-time={} run'.format(duration)

        sysbench = self.target.path.join(self.target.executables_directory,
                                         'sysbench')
        bench_out = self.target.invoke(sysbench, args=args, on_cpus=[cpu])

        match = re.search(r'(total number of events:\s*)([\d.]*)', bench_out)
        return float(match.group(2))

    def _check_work_throughput(self, cpu, duration, margin):
        frequencies = self.target.cpufreq.list_frequencies(cpu)
        if len(frequencies) == 1:
            return True

        original_governor = self.target.cpufreq.get_governor(cpu)
        original_freq = None
        if original_governor == 'userspace':
            original_freq = self.target.cpufreq.get_frequency(cpu)

        # Set userspace governor
        self.target.cpufreq.set_governor(cpu, 'userspace')

        # Run at lowest & highest freq
        result = {}
        for freq in [frequencies[0], frequencies[-1]]:
            self.target.cpufreq.set_frequency(cpu, freq)
            result[freq] = self._run_sysbench_work(cpu, duration)

        # Restore governor
        self.target.cpufreq.set_governor(cpu, original_governor)
        if original_freq:
            self.target.cpufreq.set_frequency(cpu, original_freq)

        # Make sure work done at highest OPP is at least some % higher
        # than work done at lowest OPP - this filters the
        # +/- 1 sysbench result noise
        work_diff = result[frequencies[-1]] - result[frequencies[0]]
        ok = work_diff > result[frequencies[0]] * margin
        return ok

    def test_work_throughput(self):
        duration = 1.0
        margin = 0.1
        failed_cpus = []

        # Run test on each known cpu
        for cpu in range(self.target.number_of_cpus):
            if not self._check_work_throughput(cpu, duration, margin):
                failed_cpus.append(cpu)

        # Format error message
        msg='Problems detected on CPUs: {}\n'\
        'Work at highest OPP wasn\'t {}% bigger than work at lowest OPP on these CPUs'\
            .format(failed_cpus, margin * 100)

        self.assertFalse(len(failed_cpus), msg=msg)

class TestEnergyModelPresent(BasicCheckTest):
    def test_energy_model_present(self):
        """Test that we can see the energy model in sysctl"""
        if not self.target.file_exists(
                '/proc/sys/kernel/sched_domain/cpu0/domain0/group0/energy/'):
            raise AssertionError(
                'No energy model visible in procfs. Possible causes: \n'
                '- Kernel built without (CONFIG_SCHED_DEBUG && CONFIG_SYSCTL)\n'
                '- No energy model in kernel')

class TestSchedutilTunables(BasicCheckTest):
    MAX_RATE_LIMIT_US = 20 * 1e3

    def test_rate_limit_not_too_high(self):
        """Test that the schedutil ratelimiting is not too harsh"""
        governors = self.target.cpufreq.list_governors(0)
        if 'schedutil' not in governors:
            raise SkipTest('schedutil not present on target')
        self.target.cpufreq.set_all_governors('schedutil')

        cpus = set(range(self.target.number_of_cpus))
        fail_cpus = []

        while cpus:
            cpu = iter(cpus).next()
            domain = tuple(self.target.cpufreq.get_related_cpus(cpu))

            tunables = self.target.cpufreq.get_governor_tunables(cpu)
            for name, value in tunables.iteritems():
                if name.endswith('rate_limit_us'):
                    if int(value) > self.MAX_RATE_LIMIT_US:
                        fail_cpus += domain

            cpus = cpus.difference(domain)

        self.assertTrue(
            fail_cpus == [],
            'schedutil rate limit greater than {}us on CPUs {}. '
            'Responsiveness will be affected.'.format(
                self.MAX_RATE_LIMIT_US, fail_cpus))

class TestSchedDomainFlags(BasicCheckTest):
    """Test requirements of sched_domain flags"""

    # See include/linux/sched.h in an EAS kernel
    SD_ASYM_CPUCAPACITY = 0x0040
    SD_SHARE_CAP_STATES = 0x8000

    def setUp(self):
        if not self.target.file_exists('/proc/sys/kernel/sched_domain/'):
            raise SkipTest('sched_domain info not exposed in procfs. '
                           'Enable CONFIG_SCHED_DEBUG in target kernel')

    def iter_cpu_sd_flags(self, cpu):
        """
        Get the flags for a given CPU's sched_domains

        :param cpu: Logical CPU number whose sched_domains' flags we want
        :returns: Iterator over the flags, as an int, of each of that CPU's
                  domains, highest-level (i.e. typically "DIE") first.
        """
        base_path = '/proc/sys/kernel/sched_domain/cpu{}/'.format(cpu)
        for domain in sorted(self.target.list_directory(base_path), reverse=True):
            flags_path = self.target.path.join(base_path, domain, 'flags')
            yield self.target.read_int(flags_path)

    def test_share_cap_states(self):
        """
        Check that some domain exists with SD_SHARE_CAP_STATES set

        EAS silently does nothing if this flag is not set at any level (see
        use of sd_scs percpu variable in scheduler code).
        """
        cpu0_flags = []
        for flags in self.iter_cpu_sd_flags(0):
            if flags & self.SD_SHARE_CAP_STATES:
                return
            cpu0_flags.append(flags)
        flags_str = ', '.join([hex(f) for f in cpu0_flags])
        raise AssertionError('No sched_domain with SD_SHARE_CAP_STATES flag. '
                             'flags: {}'.format(flags_str))

    def _get_cpu_cap_path(self, cpu):
        return '/sys/devices/system/cpu/cpu{}/cpu_capacity'.format(cpu)

    def read_cpu_caps(self):
        """Get all the CPUs' capacities from sysfs as a list of ints"""
        return [self.target.read_int(self._get_cpu_cap_path(cpu))
                for cpu in range(self.target.number_of_cpus)]

    def write_cpu_caps(self, caps):
        """Write all the CPUs' capacites to sysfs from a list of ints"""
        for cpu, cap in enumerate(caps):
            self.target.write_value(self._get_cpu_cap_path(cpu), cap)

    def _test_asym_cpucapacity(self, caps, expect_asym):
        top_sd_flags = self.iter_cpu_sd_flags(0).next()
        if expect_asym:
            self.assertTrue(
                top_sd_flags & self.SD_ASYM_CPUCAPACITY,
                'SD_ASYM_CPUCAPACITY not set on highest sched_domain. '
                'cpu_capacity values: {}'.format(caps))
        else:
            self.assertFalse(
                top_sd_flags & self.SD_ASYM_CPUCAPACITY,
                'SD_ASYM_CPUCAPACITY set unexpectedly on highest sched_domain. '
                'cpu_capacity values: {}'.format(caps))

    def test_asym_cpucapacity(self):
        """
        Check that the SD_ASYM_CPUCAPACITY flag gets set when it should

        SD_ASYM_CPUCAPACITY should be set at least on the highest domain when a
        system is asymmetric.

        - Test that it is set appropriately for the current
          cpu_capacity values
        - Invert the apparent symmetry of the system by modifying the
          cpu_capacity sysfs files, and check the flag is inverted.
        - Finally, revert to the old cpu_capacity values and check the flag
          returns to its old value.
        """
        if not self.target.file_exists(self._get_cpu_cap_path(0)):
            raise SkipTest('cpu_capacity info not exposed in sysfs.')

        old_caps = self.read_cpu_caps()
        old_caps_asym = any(c != old_caps[0] for c in old_caps[1:])

        self._test_asym_cpucapacity(old_caps, old_caps_asym)

        if old_caps_asym:
            # Make the (currently asymmetrical) system look symmetrical
            test_caps = [1024 for _ in range(self.target.number_of_cpus)]
        else:
            # Make the (currently symmetrical) system look asymmetrical
            test_caps = range(self.target.number_of_cpus)

        # Use a try..finally so that we leave the cpu_capacity files as we found
        # them, even if the test fails (i.e. we raise an AssertionError).
        try:
            self.write_cpu_caps(test_caps)
            self._test_asym_cpucapacity(old_caps, not old_caps_asym)
        finally:
            self.write_cpu_caps(old_caps)
            self._test_asym_cpucapacity(old_caps, old_caps_asym)