# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This class defines the CrosHost Label class."""
import collections
import logging
import os
import re
import common
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.cros.audio import cras_utils
from autotest_lib.client.cros.video import constants as video_test_constants
from autotest_lib.server.cros.dynamic_suite import constants as ds_constants
from autotest_lib.server.hosts import base_label
from autotest_lib.server.hosts import common_label
from autotest_lib.server.hosts import servo_host
from autotest_lib.site_utils import hwid_lib
# pylint: disable=missing-docstring
LsbOutput = collections.namedtuple('LsbOutput', ['unibuild', 'board'])
def _parse_lsb_output(host):
"""Parses the LSB output and returns key data points for labeling.
@param host: Host that the command will be executed against
@returns: LsbOutput with the result of parsing the /etc/lsb-release output
"""
release_info = utils.parse_cmd_output('cat /etc/lsb-release',
run_method=host.run)
unibuild = release_info.get('CHROMEOS_RELEASE_UNIBUILD') == '1'
return LsbOutput(unibuild, release_info['CHROMEOS_RELEASE_BOARD'])
class BoardLabel(base_label.StringPrefixLabel):
"""Determine the correct board label for the device."""
_NAME = ds_constants.BOARD_PREFIX.rstrip(':')
def generate_labels(self, host):
# We only want to apply the board labels once, which is when they get
# added to the AFE. That way we don't have to worry about the board
# label switching on us if the wrong builds get put on the devices.
# crbug.com/624207 records one event of the board label switching
# unexpectedly on us.
board = host.host_info_store.get().board
if board:
return [board]
for label in host._afe_host.labels:
if label.startswith(self._NAME + ':'):
return [label.split(':')[-1]]
return [_parse_lsb_output(host).board]
class ModelLabel(base_label.StringPrefixLabel):
"""Determine the correct model label for the device."""
_NAME = ds_constants.MODEL_LABEL
def generate_labels(self, host):
# Based on the issue explained in BoardLabel, return the existing
# label if it has already been set once.
model = host.host_info_store.get().model
if model:
return [model]
for label in host._afe_host.labels:
if label.startswith(self._NAME + ':'):
return [label.split(':')[-1]]
lsb_output = _parse_lsb_output(host)
model = None
if lsb_output.unibuild:
test_label_cmd = 'cros_config / test-label'
result = host.run(command=test_label_cmd, ignore_status=True)
if result.exit_status == 0:
model = result.stdout.strip()
if not model:
mosys_cmd = 'mosys platform model'
result = host.run(command=mosys_cmd, ignore_status=True)
if result.exit_status == 0:
model = result.stdout.strip()
# We need some sort of backwards compatibility for boards that
# are not yet supported with mosys and unified builds.
# This is necessary so that we can begin changing cbuildbot to take
# advantage of the model/board label differentiations for
# scheduling, while still retaining backwards compatibility.
return [model or lsb_output.board]
class LightSensorLabel(base_label.BaseLabel):
"""Label indicating if a light sensor is detected."""
_NAME = 'lightsensor'
_LIGHTSENSOR_SEARCH_DIR = '/sys/bus/iio/devices'
_LIGHTSENSOR_FILES = [
"in_illuminance0_input",
"in_illuminance_input",
"in_illuminance0_raw",
"in_illuminance_raw",
"illuminance0_input",
]
def exists(self, host):
search_cmd = "find -L %s -maxdepth 4 | egrep '%s'" % (
self._LIGHTSENSOR_SEARCH_DIR, '|'.join(self._LIGHTSENSOR_FILES))
# Run the search cmd following the symlinks. Stderr_tee is set to
# None as there can be a symlink loop, but the command will still
# execute correctly with a few messages printed to stderr.
result = host.run(search_cmd, stdout_tee=None, stderr_tee=None,
ignore_status=True)
return result.exit_status == 0
class BluetoothLabel(base_label.BaseLabel):
"""Label indicating if bluetooth is detected."""
_NAME = 'bluetooth'
def exists(self, host):
result = host.run('test -d /sys/class/bluetooth/hci0',
ignore_status=True)
return result.exit_status == 0
class ECLabel(base_label.BaseLabel):
"""Label to determine the type of EC on this host."""
_NAME = 'ec:cros'
def exists(self, host):
cmd = 'mosys ec info'
# The output should look like these, so that the last field should
# match our EC version scheme:
#
# stm | stm32f100 | snow_v1.3.139-375eb9f
# ti | Unknown-10de | peppy_v1.5.114-5d52788
#
# Non-Chrome OS ECs will look like these:
#
# ENE | KB932 | 00BE107A00
# ite | it8518 | 3.08
#
# And some systems don't have ECs at all (Lumpy, for example).
regexp = r'^.*\|\s*(\S+_v\d+\.\d+\.\d+-[0-9a-f]+)\s*$'
ecinfo = host.run(command=cmd, ignore_status=True)
if ecinfo.exit_status == 0:
res = re.search(regexp, ecinfo.stdout)
if res:
logging.info("EC version is %s", res.groups()[0])
return True
logging.info("%s got: %s", cmd, ecinfo.stdout)
# Has an EC, but it's not a Chrome OS EC
logging.info("%s exited with status %d", cmd, ecinfo.exit_status)
return False
class Cr50Label(base_label.StringPrefixLabel):
"""Label indicating the cr50 version."""
_NAME = 'cr50'
def __init__(self):
self.ver = None
def exists(self, host):
# Make sure the gsctool version command runs ok
self.ver = host.run('gsctool -a -f', ignore_status=True)
return self.ver.exit_status == 0
def generate_labels(self, host):
# Check the major version to determine prePVT vs PVT
major_ver = int(re.search('RW \d+\.(\d+)\.\d+[\r\n]',
self.ver.stdout).group(1))
# PVT images have a odd major version prePVT have even
return ['pvt' if (major_ver % 2) else 'prepvt']
class AccelsLabel(base_label.BaseLabel):
"""Determine the type of accelerometers on this host."""
_NAME = 'accel:cros-ec'
def exists(self, host):
# Check to make sure we have ectool
rv = host.run('which ectool', ignore_status=True)
if rv.exit_status:
logging.info("No ectool cmd found; assuming no EC accelerometers")
return False
# Check that the EC supports the motionsense command
rv = host.run('ectool motionsense', ignore_status=True)
if rv.exit_status:
logging.info("EC does not support motionsense command; "
"assuming no EC accelerometers")
return False
# Check that EC motion sensors are active
active = host.run('ectool motionsense active').stdout.split('\n')
if active[0] == "0":
logging.info("Motion sense inactive; assuming no EC accelerometers")
return False
logging.info("EC accelerometers found")
return True
class ChameleonLabel(base_label.BaseLabel):
"""Determine if a Chameleon is connected to this host."""
_NAME = 'chameleon'
def exists(self, host):
return host._chameleon_host is not None
class ChameleonConnectionLabel(base_label.StringPrefixLabel):
"""Return the Chameleon connection label."""
_NAME = 'chameleon'
def exists(self, host):
return host._chameleon_host is not None
def generate_labels(self, host):
return [host.chameleon.get_label()]
class ChameleonPeripheralsLabel(base_label.StringPrefixLabel):
"""Return the Chameleon peripherals labels.
The 'chameleon:bt_hid' label is applied if the bluetooth
classic hid device, i.e, RN-42 emulation kit, is detected.
Any peripherals plugged into the chameleon board would be
detected and applied proper labels in this class.
"""
_NAME = 'chameleon'
def exists(self, host):
return host._chameleon_host is not None
def generate_labels(self, host):
bt_hid_device = host.chameleon.get_bluetooth_hid_mouse()
return ['bt_hid'] if bt_hid_device.CheckSerialConnection() else []
class AudioLoopbackDongleLabel(base_label.BaseLabel):
"""Return the label if an audio loopback dongle is plugged in."""
_NAME = 'audio_loopback_dongle'
def exists(self, host):
nodes_info = host.run(command=cras_utils.get_cras_nodes_cmd(),
ignore_status=True).stdout
if (cras_utils.node_type_is_plugged('HEADPHONE', nodes_info) and
cras_utils.node_type_is_plugged('MIC', nodes_info)):
return True
return False
class PowerSupplyLabel(base_label.StringPrefixLabel):
"""
Return the label describing the power supply type.
Labels representing this host's power supply.
* `power:battery` when the device has a battery intended for
extended use
* `power:AC_primary` when the device has a battery not intended
for extended use (for moving the machine, etc)
* `power:AC_only` when the device has no battery at all.
"""
_NAME = 'power'
def __init__(self):
self.psu_cmd_result = None
def exists(self, host):
self.psu_cmd_result = host.run(command='mosys psu type',
ignore_status=True)
return self.psu_cmd_result.stdout.strip() != 'unknown'
def generate_labels(self, host):
if self.psu_cmd_result.exit_status:
# The psu command for mosys is not included for all platforms. The
# assumption is that the device will have a battery if the command
# is not found.
return ['battery']
return [self.psu_cmd_result.stdout.strip()]
class StorageLabel(base_label.StringPrefixLabel):
"""
Return the label describing the storage type.
Determine if the internal device is SCSI or dw_mmc device.
Then check that it is SSD or HDD or eMMC or something else.
Labels representing this host's internal device type:
* `storage:ssd` when internal device is solid state drive
* `storage:hdd` when internal device is hard disk drive
* `storage:mmc` when internal device is mmc drive
* `storage:nvme` when internal device is NVMe drive
* `storage:ufs` when internal device is ufs drive
* None When internal device is something else or
when we are unable to determine the type
"""
_NAME = 'storage'
def __init__(self):
self.type_str = ''
def exists(self, host):
# The output should be /dev/mmcblk* for SD/eMMC or /dev/sd* for scsi
rootdev_cmd = ' '.join(['. /usr/sbin/write_gpt.sh;',
'. /usr/share/misc/chromeos-common.sh;',
'load_base_vars;',
'get_fixed_dst_drive'])
rootdev = host.run(command=rootdev_cmd, ignore_status=True)
if rootdev.exit_status:
logging.info("Fail to run %s", rootdev_cmd)
return False
rootdev_str = rootdev.stdout.strip()
if not rootdev_str:
return False
rootdev_base = os.path.basename(rootdev_str)
mmc_pattern = '/dev/mmcblk[0-9]'
if re.match(mmc_pattern, rootdev_str):
# Use type to determine if the internal device is eMMC or somthing
# else. We can assume that MMC is always an internal device.
type_cmd = 'cat /sys/block/%s/device/type' % rootdev_base
type = host.run(command=type_cmd, ignore_status=True)
if type.exit_status:
logging.info("Fail to run %s", type_cmd)
return False
type_str = type.stdout.strip()
if type_str == 'MMC':
self.type_str = 'mmc'
return True
scsi_pattern = '/dev/sd[a-z]+'
if re.match(scsi_pattern, rootdev.stdout):
# Read symlink for /sys/block/sd* to determine if the internal
# device is connected via ata or usb.
link_cmd = 'readlink /sys/block/%s' % rootdev_base
link = host.run(command=link_cmd, ignore_status=True)
if link.exit_status:
logging.info("Fail to run %s", link_cmd)
return False
link_str = link.stdout.strip()
if 'usb' in link_str:
return False
elif 'ufs' in link_str:
self.type_str = 'ufs'
return True
# Read rotation to determine if the internal device is ssd or hdd.
rotate_cmd = str('cat /sys/block/%s/queue/rotational'
% rootdev_base)
rotate = host.run(command=rotate_cmd, ignore_status=True)
if rotate.exit_status:
logging.info("Fail to run %s", rotate_cmd)
return False
rotate_str = rotate.stdout.strip()
rotate_dict = {'0':'ssd', '1':'hdd'}
self.type_str = rotate_dict.get(rotate_str)
return True
nvme_pattern = '/dev/nvme[0-9]+n[0-9]+'
if re.match(nvme_pattern, rootdev_str):
self.type_str = 'nvme'
return True
# All other internal device / error case will always fall here
return False
def generate_labels(self, host):
return [self.type_str]
class ServoLabel(base_label.BaseLabel):
"""Label to apply if a servo is present."""
_NAME = 'servo'
def exists(self, host):
"""
Check if the servo label should apply to the host or not.
@returns True if a servo host is detected, False otherwise.
"""
servo_host_hostname = None
servo_args = servo_host.get_servo_args_for_host(host)
if servo_args:
servo_host_hostname = servo_args.get(servo_host.SERVO_HOST_ATTR)
return (servo_host_hostname is not None
and servo_host.servo_host_is_up(servo_host_hostname))
class ArcLabel(base_label.BaseLabel):
"""Label indicates if host has ARC support."""
_NAME = 'arc'
@base_label.forever_exists_decorate
def exists(self, host):
return 0 == host.run(
'grep CHROMEOS_ARC_VERSION /etc/lsb-release',
ignore_status=True).exit_status
class CtsArchLabel(base_label.StringLabel):
"""Labels to determine the abi of the CTS bundle (arm or x86 only)."""
_NAME = ['cts_abi_arm', 'cts_abi_x86', 'cts_cpu_arm', 'cts_cpu_x86']
def _get_cts_abis(self, arch):
"""Return supported CTS ABIs.
@return List of supported CTS bundle ABIs.
"""
cts_abis = {'x86_64': ['arm', 'x86'], 'arm': ['arm']}
return cts_abis.get(arch, [])
def _get_cts_cpus(self, arch):
"""Return supported CTS native CPUs.
This is needed for CTS_Instant scheduling.
@return List of supported CTS native CPUs.
"""
cts_cpus = {'x86_64': ['x86'], 'arm': ['arm']}
return cts_cpus.get(arch, [])
def generate_labels(self, host):
cpu_arch = host.get_cpu_arch()
abi_labels = ['cts_abi_' + abi for abi in self._get_cts_abis(cpu_arch)]
cpu_labels = ['cts_cpu_' + cpu for cpu in self._get_cts_cpus(cpu_arch)]
return abi_labels + cpu_labels
class VideoGlitchLabel(base_label.BaseLabel):
"""Label indicates if host supports video glitch detection tests."""
_NAME = 'video_glitch_detection'
def exists(self, host):
board = host.get_board().replace(ds_constants.BOARD_PREFIX, '')
return board in video_test_constants.SUPPORTED_BOARDS
class InternalDisplayLabel(base_label.StringLabel):
"""Label that determines if the device has an internal display."""
_NAME = 'internal_display'
def generate_labels(self, host):
from autotest_lib.client.cros.graphics import graphics_utils
from autotest_lib.client.common_lib import utils as common_utils
def __system_output(cmd):
return host.run(cmd).stdout
def __read_file(remote_path):
return host.run('cat %s' % remote_path).stdout
# Hijack the necessary client functions so that we can take advantage
# of the client lib here.
# FIXME: find a less hacky way than this
original_system_output = utils.system_output
original_read_file = common_utils.read_file
utils.system_output = __system_output
common_utils.read_file = __read_file
try:
return ([self._NAME]
if graphics_utils.has_internal_display()
else [])
finally:
utils.system_output = original_system_output
common_utils.read_file = original_read_file
class LucidSleepLabel(base_label.BaseLabel):
"""Label that determines if device has support for lucid sleep."""
# TODO(kevcheng): See if we can determine if this label is applicable a
# better way (crbug.com/592146).
_NAME = 'lucidsleep'
LUCID_SLEEP_BOARDS = ['nocturne', 'poppy']
def exists(self, host):
board = host.get_board().replace(ds_constants.BOARD_PREFIX, '')
return board in self.LUCID_SLEEP_BOARDS
class HWIDLabel(base_label.StringLabel):
"""Return all the labels generated from the hwid."""
# We leave out _NAME because hwid_lib will generate everything for us.
def __init__(self):
# Grab the key file needed to access the hwid service.
self.key_file = global_config.global_config.get_config_value(
'CROS', 'HWID_KEY', type=str)
def generate_labels(self, host):
hwid_labels = []
hwid = host.run_output('crossystem hwid').strip()
hwid_info_list = hwid_lib.get_hwid_info(hwid, hwid_lib.HWID_INFO_LABEL,
self.key_file).get('labels', [])
for hwid_info in hwid_info_list:
# If it's a prefix, we'll have:
# {'name': prefix_label, 'value': postfix_label} and create
# 'prefix_label:postfix_label'; otherwise it'll just be
# {'name': label} which should just be 'label'.
value = hwid_info.get('value', '')
name = hwid_info.get('name', '')
# There should always be a name but just in case there is not.
if name:
hwid_labels.append(name if not value else
'%s:%s' % (name, value))
return hwid_labels
def get_all_labels(self):
"""We need to try all labels as a prefix and as standalone.
We don't know for sure which labels are prefix labels and which are
standalone so we try all of them as both.
"""
all_hwid_labels = []
try:
all_hwid_labels = hwid_lib.get_all_possible_dut_labels(
self.key_file)
except IOError:
logging.error('Can not open key file: %s', self.key_file)
except hwid_lib.HwIdException as e:
logging.error('hwid service: %s', e)
return all_hwid_labels, all_hwid_labels
class DetachableBaseLabel(base_label.BaseLabel):
"""Label indicating if device has detachable keyboard."""
_NAME = 'detachablebase'
def exists(self, host):
return host.run('which hammerd', ignore_status=True).exit_status == 0
class FingerprintLabel(base_label.BaseLabel):
"""Label indicating whether device has fingerprint sensor."""
_NAME = 'fingerprint'
def exists(self, host):
return host.run('test -c /dev/cros_fp',
ignore_status=True).exit_status == 0
class ReferenceDesignLabel(base_label.StringPrefixLabel):
"""Determine the correct reference design label for the device. """
_NAME = 'reference_design'
def __init__(self):
self.response = None
def exists(self, host):
self.response = host.run('mosys platform family', ignore_status=True)
return self.response.exit_status == 0
def generate_labels(self, host):
if self.exists(host):
return [self.response.stdout.strip()]
CROS_LABELS = [
AccelsLabel(),
ArcLabel(),
AudioLoopbackDongleLabel(),
BluetoothLabel(),
BoardLabel(),
ModelLabel(),
ChameleonConnectionLabel(),
ChameleonLabel(),
ChameleonPeripheralsLabel(),
common_label.OSLabel(),
Cr50Label(),
CtsArchLabel(),
DetachableBaseLabel(),
ECLabel(),
FingerprintLabel(),
HWIDLabel(),
InternalDisplayLabel(),
LightSensorLabel(),
LucidSleepLabel(),
PowerSupplyLabel(),
ReferenceDesignLabel(),
ServoLabel(),
StorageLabel(),
VideoGlitchLabel(),
]