# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import re
import time
import xmlrpclib

from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.faft.firmware_test import FirmwareTest


class firmware_ECThermal(FirmwareTest):
    """
    Servo based EC thermal engine test.

    The test compares the CPU temperature reported by servo and by ectool,
    verifies that the CPU heats up under load with the fan turned off, and
    verifies that the automatic fan control picks a fan step that is
    consistent with the temperature sensor readings.
    """
    version = 1

    # Delay for waiting fan to start or stop
    FAN_DELAY = 5

    # Delay for waiting device stressing to stabilize
    STRESS_DELAY = 30

    # Delay for stressing device with fan off to check temperature increase
    STRESS_DELAY_NO_FAN = 12

    # Margin for comparing servo based and ectool based CPU temperature
    TEMP_MISMATCH_MARGIN = 3

    # Minimum increase of CPU temperature when stressing DUT
    TEMP_STRESS_INCREASE = 3

    # Pseudo INT_MAX. Used as infinity when comparing temperature readings
    INT_MAX = 10000

    # Sensor type ID of ignored sensors
    SENSOR_TYPE_IGNORED = 255

    # PID of DUT stressing processes. This class-level list is never mutated
    # in place; _stress_dut() and _stop_stressing() rebind it per instance.
    _stress_pid = []

    # Whether the temp_metrics job was stopped by initialize() and has to be
    # restarted by cleanup(). Defaulted here so that cleanup() still works
    # when initialize() bails out before reaching the temp_metrics step.
    _has_temp_metrics = False


    def enable_auto_fan_control(self):
        """Enable EC automatic fan speed control"""
        # We use set_nocheck because servo reports current target
        # RPM instead 'auto', and therefore servo.set always fails.
        self.servo.set_nocheck('fan_target_rpm', 'auto')


    def max_fan(self):
        """Maximize fan speed"""
        # We use set_nocheck because servo reports current target
        # RPM instead 'max', and therefore servo.set always fails.
        self.servo.set_nocheck('fan_target_rpm', 'max')


    def turn_off_fan(self):
        """Turn off fan"""
        self.servo.set('fan_target_rpm', 'off')


    def _get_setting_for_type(self, type_id):
        """
        Retrieve thermal setting for a given type of sensor

        Thresholds are queried one by one with 'ectool thermalget' until the
        command fails, which is taken as the end of the threshold list.

        Args:
          type_id: The ID of sensor type.

        Returns:
          A list containing thresholds (in degree C) in the following order:
            Warning
            CPU off
            All power off
            Fan speed thresholds
          None is returned if no threshold could be read for this type.
        """
        pattern = re.compile(r'Threshold \d* [a-z ]* \d* is (\d*) K.')
        setting = []
        current_id = 0
        while True:
            try:
                lines = self.faft_client.system.run_shell_command_get_output(
                        'ectool thermalget %d %d' % (type_id, current_id))
            except xmlrpclib.Fault:
                # ectool failed: there is no threshold with this ID.
                break
            for line in lines:
                matched = pattern.match(line)
                if matched is not None:
                    # Convert degree K to degree C
                    setting.append(int(matched.group(1)) - 273)
            current_id += 1

        if not setting:
            return None
        return setting


    def get_fan_steps(self):
        """
        Retrieve fan step config from EC

        The fan steps are parsed from the EC console command 'thermalfan 0'
        and stored in self._fan_steps. Each non-zero step is then written to
        servo and read back, so that the stored value is what servo actually
        reports for that step.

        Raises:
          error.TestError: Raised if no thermal setting has been retrieved.
        """
        if not self._thermal_setting:
            raise error.TestError("No thermal setting retrieved from EC")

        # The first three thresholds of a sensor type are not fan thresholds
        # (see _get_setting_for_type), the rest are one per fan step.
        num_steps = len(self._thermal_setting[0]) - 3
        expected_pat = ([r"Lowest speed: ([0-9-]+) RPM"] +
                        [r"\d+ K:\s+([0-9-]+) RPM"] * num_steps)
        match = self.ec.send_command_get_output("thermalfan 0", expected_pat)
        self._fan_steps = [int(m[1]) for m in match]

        # Get the actual value of each fan step
        for i, step in enumerate(self._fan_steps):
            if step == 0:
                continue
            self.servo.set_nocheck('fan_target_rpm', "%d" % step)
            self._fan_steps[i] = int(self.servo.get('fan_target_rpm'))

        logging.info("Actual fan steps: %s", self._fan_steps)


    def get_thermal_setting(self):
        """
        Retrieve thermal engine setting from EC

        Fills self._thermal_setting with one threshold list per sensor type
        and self._num_temp_sensor with the number of temperature sensors that
        'ectool temps' can read.
        """
        self._thermal_setting = []
        type_id = 0
        while True:
            setting = self._get_setting_for_type(type_id)
            if setting is None:
                break
            self._thermal_setting.append(setting)
            type_id += 1
        logging.info("Number of tempearture sensor types: %d", type_id)

        # Get the number of temperature sensors
        self._num_temp_sensor = 0
        while True:
            try:
                self.faft_client.system.run_shell_command(
                        'ectool temps %d' % self._num_temp_sensor)
            except xmlrpclib.Fault:
                # Reading failed: we ran past the last sensor.
                break
            self._num_temp_sensor += 1
        logging.info("Number of temperature sensor: %d",
                     self._num_temp_sensor)


    def initialize(self, host, cmdline_args):
        """
        Prepare the DUT for the thermal test.

        Args:
          host: Passed through to FirmwareTest.initialize.
          cmdline_args: Passed through to FirmwareTest.initialize.

        Raises:
          error.TestNAError: Raised if the device has no Chrome EC.
        """
        super(firmware_ECThermal, self).initialize(host, cmdline_args)
        # Don't bother if there is no Chrome EC.
        if not self.check_ec_capability():
            raise error.TestNAError("Nothing needs to be tested on this device")
        self.ec.send_command("chan 0")
        try:
            self.faft_client.system.run_shell_command('stop temp_metrics')
        except xmlrpclib.Fault:
            self._has_temp_metrics = False
        else:
            logging.info('Stopped temp_metrics')
            self._has_temp_metrics = True
        if self.check_ec_capability(['thermal']):
            self.get_thermal_setting()
            self.get_fan_steps()
            self.enable_auto_fan_control()


    def cleanup(self):
        """
        Restore automatic fan control, temp_metrics and the EC console.

        Failures are logged rather than raised so that the parent class
        cleanup always runs.
        """
        try:
            if self.check_ec_capability(['thermal']):
                self.enable_auto_fan_control()
            if self._has_temp_metrics:
                logging.info('Starting temp_metrics')
                self.faft_client.system.run_shell_command('start temp_metrics')
            self.ec.send_command("chan 0xffffffff")
        except Exception as e:
            logging.error("Caught exception: %s", str(e))
        super(firmware_ECThermal, self).cleanup()


    def _find_cpu_sensor_id(self):
        """
        This function find CPU temperature sensor using ectool.

        Returns:
          Integer ID of CPU temperature sensor.

        Raises:
          error.TestFail: Raised if we fail to find PECI temperature through
            ectool.
        """
        for temp_id in range(self._num_temp_sensor):
            lines = self.faft_client.system.run_shell_command_get_output(
                    'ectool tempsinfo %d' % temp_id)
            for line in lines:
                matched = re.match(r'Sensor name: (.*)', line)
                if matched is not None and matched.group(1) == 'PECI':
                    return temp_id
        raise error.TestFail('Cannot find CPU temperature sensor ID.')


    def _get_temp_reading(self, sensor_id):
        """
        Get temperature reading on a sensor through ectool

        Args:
          sensor_id: Temperature sensor ID.

        Returns:
          Temperature reading in degree C.

        Raises:
          xmlrpclib.Fault: Raised when we fail to read temperature.
          error.TestError: Raised if ectool doesn't behave as we expected.
        """
        assert sensor_id < self._num_temp_sensor
        pattern = re.compile(r'Reading temperature...(\d*)')
        lines = self.faft_client.system.run_shell_command_get_output(
                'ectool temps %d' % sensor_id)
        for line in lines:
            matched = pattern.match(line)
            if matched is not None:
                # Convert degree K to degree C
                return int(matched.group(1)) - 273
        # Should never reach here
        raise error.TestError("Unexpected error occurred")


    def check_temp_report(self):
        """
        Checker of temperature reporting.

        This function reads CPU temperature from servo and ectool. If
        the two readings mismatches by more than TEMP_MISMATCH_MARGIN,
        test fails.

        Raises:
          error.TestFail: Raised when temperature reading mismatches by
            more than TEMP_MISMATCH_MARGIN.
        """
        cpu_temp_id = self._find_cpu_sensor_id()
        logging.info("CPU temperature sensor ID is %d", cpu_temp_id)
        ectool_cpu_temp = self._get_temp_reading(cpu_temp_id)
        servo_cpu_temp = int(self.servo.get('cpu_temp'))
        logging.info("CPU temperature from servo: %d C", servo_cpu_temp)
        logging.info("CPU temperature from ectool: %d C", ectool_cpu_temp)
        if abs(ectool_cpu_temp - servo_cpu_temp) > self.TEMP_MISMATCH_MARGIN:
            raise error.TestFail(
                    'CPU temperature readings from servo and ectool differ')


    def _stress_dut(self, threads=4):
        """
        Stress DUT system.

        By reading from /dev/urandom and writing to /dev/null, we can stress
        DUT and cause CPU temperature to go up. We stress the system forever,
        until _stop_stressing is called to kill the stress threads. This
        function is non-blocking.

        Args:
          threads: Number of threads (processes) when stressing forever.

        Returns:
          A list of stress process IDs is returned.
        """
        logging.info("Stressing DUT with %d threads...", threads)
        self.faft_client.system.run_shell_command('pkill dd')
        stress_cmd = 'dd if=/dev/urandom of=/dev/null bs=1M &'
        # Grep for [d]d instead of dd to prevent getting the PID of grep
        # itself.
        pid_cmd = "ps -ef | grep '[d]d if=/dev/urandom' | awk '{print $2}'"
        self._stress_pid = []
        for _ in range(threads):
            self.faft_client.system.run_shell_command(stress_cmd)
        lines = self.faft_client.system.run_shell_command_get_output(
                pid_cmd)
        for line in lines:
            pid = line.strip()
            if not pid:
                # Skip blank output lines instead of crashing in int('').
                continue
            logging.info("PID is %s", line)
            self._stress_pid.append(int(pid))
        return self._stress_pid


    def _stop_stressing(self):
        """
        Stop stressing DUT system

        Every recorded stress process is killed. A failure to kill one
        process is logged and does not prevent killing the others. The PID
        list is cleared afterwards so that a repeated call does not send
        signals to stale PIDs.
        """
        stop_cmd = 'kill -9 %d'
        for pid in self._stress_pid:
            try:
                self.faft_client.system.run_shell_command(stop_cmd % pid)
            except xmlrpclib.Fault as e:
                logging.warning("Failed to kill stress process %d: %s",
                                pid, str(e))
        self._stress_pid = []


    def check_fan_off(self):
        """
        Checker of fan turned off.

        The function first delay FAN_DELAY seconds to ensure fan stops.
        Then it reads fan speed and fails if fan speed is non-zero.
        Then it stresses the system a bit and check if the temperature
        goes up by more than TEMP_STRESS_INCREASE.

        Raises:
          error.TestFail: Raised when the fan is still spinning, or when
            temperature doesn't increase by more than TEMP_STRESS_INCREASE.
        """
        time.sleep(self.FAN_DELAY)
        fan_speed = self.servo.get('fan_actual_rpm')
        if int(fan_speed) != 0:
            raise error.TestFail("Fan is not turned off.")
        logging.info("EC reports fan turned off.")
        cpu_temp_before = int(self.servo.get('cpu_temp'))
        logging.info("CPU temperature before stressing is %d C",
                     cpu_temp_before)
        self._stress_dut()
        try:
            time.sleep(self.STRESS_DELAY_NO_FAN)
            cpu_temp_after = int(self.servo.get('cpu_temp'))
        finally:
            # Never leave the stress processes running, even if reading the
            # temperature from servo fails.
            self._stop_stressing()
        logging.info("CPU temperature after stressing is %d C",
                     cpu_temp_after)
        if cpu_temp_after - cpu_temp_before < self.TEMP_STRESS_INCREASE:
            raise error.TestFail(
                    "CPU temperature did not go up by more than %d degrees" %
                    self.TEMP_STRESS_INCREASE)


    def _get_temp_sensor_type(self, sensor_id):
        """
        Get type of a given temperature sensor

        Args:
          sensor_id: Temperature sensor ID.

        Returns:
          Type ID of the temperature sensor.

        Raises:
          error.TestError: Raised when ectool doesn't behave as we expected.
        """
        assert sensor_id < self._num_temp_sensor
        pattern = re.compile(r'Sensor type: (\d*)')
        lines = self.faft_client.system.run_shell_command_get_output(
                'ectool tempsinfo %d' % sensor_id)
        for line in lines:
            matched = pattern.match(line)
            if matched is not None:
                return int(matched.group(1))
        # Should never reach here
        raise error.TestError("Unexpected error occurred")


    def _check_fan_speed_per_sensor(self, fan_speed, sensor_id):
        """
        Check if the given fan_speed is reasonable from the view of certain
        temperature sensor. There could be three types of outcome:
          1. Fan speed is higher than expected. This may be due to other
             sensor sensing higher temperature and setting fan to higher
             speed.
          2. Fan speed is as expected.
          3. Fan speed is lower than expected. In this case, EC is not
             working as expected and an error should be raised.

        Args:
          fan_speed: The current fan speed in RPM.
          sensor_id: The ID of temperature sensor.

        Returns:
          0x00: Fan speed is higher than expected.
          0x01: Fan speed is as expected.
          0x10: Fan speed is lower than expected.

        Raises:
          error.TestError: Raised when getting unexpected fan speed.
        """
        sensor_type = self._get_temp_sensor_type(sensor_id)
        if sensor_type == self.SENSOR_TYPE_IGNORED:
            # This sensor should be ignored
            return 0x00

        thresholds = self._thermal_setting[sensor_type]
        # A threshold of 0 K shows up as -273 after the K to C conversion.
        if thresholds[-1] == -273:
            # The fan stepping for this type of sensor is disabled
            return 0x00

        try:
            idx = self._fan_steps.index(fan_speed)
        except ValueError:
            # list.index raises ValueError when fan_speed is not a known step.
            raise error.TestError("Unexpected fan speed: %d" % fan_speed)

        # Fan thresholds start at index 3 of the threshold list; the lower
        # bound is relaxed by 3 degrees.
        if idx == 0:
            lower_bound = -self.INT_MAX
            upper_bound = thresholds[3]
        elif idx == len(self._fan_steps) - 1:
            lower_bound = thresholds[idx + 2] - 3
            upper_bound = self.INT_MAX
        else:
            lower_bound = thresholds[idx + 2] - 3
            upper_bound = thresholds[idx + 3]

        temp_reading = self._get_temp_reading(sensor_id)
        logging.info("Sensor %d = %d C", sensor_id, temp_reading)
        logging.info("  Expecting %d - %d C", lower_bound, upper_bound)
        # NOTE(review): a reading above upper_bound would seem to mean the
        # fan is slower than this sensor requires, yet it maps to 0x00
        # ("higher than expected"). Behavior is kept as is; confirm the
        # mapping against the EC thermal engine before changing it.
        if temp_reading > upper_bound:
            return 0x00
        elif temp_reading < lower_bound:
            return 0x10
        else:
            return 0x01


    def check_auto_fan(self):
        """
        Checker of thermal engine automatic fan speed control.

        Stress DUT system for a longer period to make temperature more stable
        and check if fan speed is controlled as expected. The per-sensor
        results are OR-ed together, so the check passes as soon as one sensor
        reports the fan speed as expected (0x01 bit set).

        Raises:
          error.TestFail: Raised when fan speed is not as expected.
        """
        self._stress_dut()
        try:
            time.sleep(self.STRESS_DELAY)
            fan_rpm = int(self.servo.get('fan_target_rpm'))
            logging.info('Fan speed is %d RPM', fan_rpm)
            result = 0x00
            for sensor_id in range(self._num_temp_sensor):
                result |= self._check_fan_speed_per_sensor(fan_rpm, sensor_id)
        finally:
            # Never leave the stress processes running, whatever happens.
            self._stop_stressing()

        if result == 0x00:
            raise error.TestFail("Fan speed higher than expected")
        if result == 0x10:
            raise error.TestFail("Fan speed lower than expected")


    def run_once(self):
        """
        Execute the main body of the test.

        Raises:
          error.TestNAError: Raised if the EC has no thermal capability.
        """
        if not self.check_ec_capability(['thermal']):
            raise error.TestNAError("Nothing needs to be tested on this device")
        logging.info("Checking host temperature report.")
        self.check_temp_report()

        self.turn_off_fan()
        logging.info("Verifying fan is turned off.")
        self.check_fan_off()

        self.enable_auto_fan_control()
        logging.info("Verifying automatic fan control functionality.")
        self.check_auto_fan()