# Plain text  |  221 lines  |  7.77 KB

# Copyright (c) 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import re
from time import sleep

from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error

class kernel_Ktime(test.test):
    """
    Test to ensure that ktime and the RTC clock are consistent.

    The test samples the RTC and ktime at the moment the RTC ticks over,
    once at the start and then once per iteration, and fails if the elapsed
    time measured by the two clocks differs by more than ALLOWABLE_DRIFT
    (plus the uncertainty of the samples themselves).

    """
    version = 1

    MIN_KERNEL_VER = '3.8'
    MODULE_NAME = 'udelay_test'
    UDELAY_PATH = '/sys/kernel/debug/udelay_test'
    RTC_PATH = '/sys/class/rtc/rtc0/since_epoch'

    # Extracts the ktime field ("kt=<int>.<fraction>") from a line read
    # back from UDELAY_PATH.  The dot is escaped so only a literal decimal
    # point is accepted.
    KTIME_RE = re.compile(r'kt=(\d+\.\d+)')

    # How many iterations to run the test for, each iteration is usually
    # a second, but might be more if the skew is too large when retrieving
    # the RTC and ktime.
    TEST_DURATION = 250

    # Allowable drift (as a function of elapsed RTC time): 0.01%
    ALLOWABLE_DRIFT = 0.0001

    # Maximum skew between ktime readings when aligning RTC and ktime
    MAX_SKEW = 0.050

    # How many excessive-skew samples _get_times tolerates before failing
    MAX_SKEW_RETRIES = 10

    # Diffs to average for the rolling display
    DIFFS_TO_AVERAGE = 30

    def _set_file(self, contents, filename):
        """
        Write a string to a file.

        @param contents: the contents to write to the file
        @param filename: the filename to use

        """
        logging.debug('setting %s to %s', filename, contents)
        with open(filename, 'w') as f:
            f.write(contents)


    def _get_file(self, filename):
        """
        Read a string from a file.

        @param filename: the filename to read

        @returns: the contents of the file (string)

        """
        with open(filename, 'r') as f:
            return f.read()


    def _get_rtc(self):
        """
        Get the current RTC time.

        @returns: the current RTC time since epoch (int)

        """
        return int(self._get_file(self.RTC_PATH))


    def _get_ktime(self):
        """
        Get the current ktime.

        @returns: the current ktime (float)

        @raises error.TestFail: if no ktime value could be found in the
                output read back from UDELAY_PATH

        """
        # Writing a delay of 0 will return info including the current ktime.
        self._set_file('0', self.UDELAY_PATH)
        with open(self.UDELAY_PATH, 'r') as f:
            for line in f:
                line = line.rstrip()
                logging.debug('result: %s', line)
                m = self.KTIME_RE.search(line)
                if m:
                    return float(m.group(1))
        # A silent default (such as 0.0) would corrupt every skew and drift
        # computation that follows, so report the problem instead.
        raise error.TestFail('no ktime found in %s' % self.UDELAY_PATH)


    def _get_times(self):
        """
        Get the rtc and estimated ktime and max potential error.

        Returns the RTC and a best guess of the ktime when the RTC actually
        ticked over to the current value.  Also returns the maximum potential
        error of how far they are off by.

        RTC ticked in the range of [ktime - max_error, ktime + max_error]

        @returns: tuple of the current rtc, estimated ktime, max error

        @raises error.TestFail: if the RTC advanced in an unexpected way, or
                if MAX_SKEW_RETRIES samples exceeded MAX_SKEW

        """
        # Times are read k1, r1, k2, r2, k3.  RTC ticks over somewhere between
        # r1 and r2, but since we don't know exactly when that is, the best
        # guess we have is between k1 and k3.
        rtc_older = self._get_rtc()
        ktime_older = self._get_ktime()
        rtc_old = self._get_rtc()
        ktime_old = self._get_ktime()

        # Ensure that this function returns in a reasonable number of
        # iterations.  If excessive skew occurs repeatedly (eg RTC is too
        # slow), abort.
        bad_skew = 0
        while bad_skew < self.MAX_SKEW_RETRIES:
            rtc = self._get_rtc()
            ktime = self._get_ktime()
            # Width of the window [k1, k3] that brackets both RTC reads.
            skew = ktime - ktime_older
            if skew > self.MAX_SKEW:
                # Time between successive calls to ktime was too slow to
                # bound the error to a reasonable value.  A few occurrences
                # isn't anything to be concerned about, but if it's happening
                # every second, it's worth investigating and could indicate
                # that the RTC is very slow and MAX_SKEW needs to be increased.
                logging.info((
                    'retrying excessive skew: '
                    'rtc [%d %d %d] ktime [%f %f %f] skew %f'),
                    rtc_older, rtc_old, rtc, ktime_older, ktime_old, ktime,
                    skew)
                bad_skew += 1
            elif rtc != rtc_old:
                if rtc_older != rtc_old or rtc != rtc_old + 1:
                    # This could happen if we took more than one second per
                    # loop and could be changed to a warning if legitimate.
                    raise error.TestFail('rtc progressed from %u to %u to %u' %
                            (rtc_older, rtc_old, rtc))
                # The tick happened somewhere inside the window; report its
                # midpoint and half its width as the uncertainty.
                return rtc, ktime_older + skew / 2, skew / 2
            # Slide the three-sample window forward by one sample.
            rtc_older = rtc_old
            ktime_older = ktime_old
            rtc_old = rtc
            ktime_old = ktime
        raise error.TestFail('could not reach skew %f after %d attempts' % (
                self.MAX_SKEW, bad_skew))


    def _regression_slope(self, count, sum_x, sum_y, sum_xx, sum_xy):
        """
        Compute the least-squares slope of y over x from running sums.

        @param count: number of (x, y) samples accumulated
        @param sum_x: sum of x
        @param sum_y: sum of y
        @param sum_xx: sum of x * x
        @param sum_xy: sum of x * y

        @returns: the slope (float), or None if it is undefined (no samples,
                or all x values identical)

        """
        if count <= 0:
            return None
        # Force float division: the x sums are ints, and int / int truncates
        # on Python 2, which would distort the means.
        count = float(count)
        mean_x = sum_x / count
        mean_y = sum_y / count
        denominator = sum_xx - sum_x * mean_x
        if denominator == 0:
            return None
        return (sum_xy - sum_x * mean_y) / denominator


    def _check_drift(self):
        """
        Sample both clocks TEST_DURATION times and verify their drift.

        Logs every sample, the maximum observed difference and the fitted
        drift rate (slope of the difference over elapsed RTC time).

        @raises error.TestFail: if the difference between elapsed RTC time
                and elapsed ktime exceeds the allowed threshold

        """
        start_rtc, start_ktime, start_error = self._get_times()
        logging.info(
                'start rtc %d ktime %f error %f',
                start_rtc, start_ktime, start_error)

        recent_diffs = []
        max_diff = 0
        sum_rtc = 0
        sum_diff = 0
        sum_rtc_rtc = 0
        sum_rtc_diff = 0
        for i in range(self.TEST_DURATION):
            # Sleep some amount of time to avoid busy waiting the entire time
            sleep((i % 10) * 0.1)

            current_rtc, current_ktime, current_error = self._get_times()
            elapsed_rtc = current_rtc - start_rtc
            elapsed_ktime = current_ktime - start_ktime
            elapsed_diff = float(elapsed_rtc) - elapsed_ktime

            # Allow for inaccurate ktime off ALLOWABLE_DRIFT from elapsed RTC,
            # and take into account start and current error in times gathering
            max_error = start_error + current_error
            drift_threshold = elapsed_rtc * self.ALLOWABLE_DRIFT + max_error

            # Track rolling average and maximum diff
            recent_diffs.append(elapsed_diff)
            if len(recent_diffs) > self.DIFFS_TO_AVERAGE:
                recent_diffs.pop(0)
            rolling_diff = sum(recent_diffs) / len(recent_diffs)
            if abs(elapsed_diff) > abs(max_diff):
                max_diff = elapsed_diff

            # Track linear regression
            sum_rtc += elapsed_rtc
            sum_diff += elapsed_diff
            sum_rtc_rtc += elapsed_rtc * elapsed_rtc
            sum_rtc_diff += elapsed_rtc * elapsed_diff

            logging.info((
                    'current rtc %d ktime %f error %f; elapsed rtc %d '
                    'ktime %f: threshold %f diff %+f rolling %+f'),
                    current_rtc, current_ktime, current_error, elapsed_rtc,
                    elapsed_ktime, drift_threshold, elapsed_diff, rolling_diff)

            if abs(elapsed_diff) > drift_threshold:
                raise error.TestFail((
                        'elapsed rtc %d and ktime %f diff %f '
                        'is greater than threshold %f') %
                        (elapsed_rtc, elapsed_ktime, elapsed_diff,
                        drift_threshold))

        # Dump final statistics
        logging.info('max_diff %f', max_diff)
        slope = self._regression_slope(
                self.TEST_DURATION, sum_rtc, sum_diff, sum_rtc_rtc,
                sum_rtc_diff)
        if slope is None:
            logging.info('drift could not be computed (too few samples)')
        else:
            logging.info('drift %.9f', slope)


    def run_once(self):
        """
        Run the ktime versus RTC consistency check.

        Returns without testing on kernels older than MIN_KERNEL_VER.
        Otherwise loads MODULE_NAME, runs the drift check and unloads the
        module again, even when the check fails.

        @raises error.TestFail: if the clocks are found to be inconsistent

        """
        kernel_ver = os.uname()[2]
        if utils.compare_versions(kernel_ver, self.MIN_KERNEL_VER) < 0:
            logging.info(
                    'skipping test: old kernel %s (min %s) missing module %s',
                    kernel_ver, self.MIN_KERNEL_VER, self.MODULE_NAME)
            return

        utils.load_module(self.MODULE_NAME)
        try:
            self._check_drift()
        finally:
            # Never leave the test module loaded behind a failed run.
            utils.unload_module(self.MODULE_NAME)