普通文本  |  632行  |  21.14 KB

# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This class defines the CrosHost Label class."""

import collections
import logging
import os
import re

import common

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.cros.audio import cras_utils
from autotest_lib.client.cros.video import constants as video_test_constants
from autotest_lib.server.cros.dynamic_suite import constants as ds_constants
from autotest_lib.server.hosts import base_label
from autotest_lib.server.hosts import common_label
from autotest_lib.server.hosts import servo_host
from autotest_lib.site_utils import hwid_lib

# pylint: disable=missing-docstring
LsbOutput = collections.namedtuple('LsbOutput', ['unibuild', 'board'])

def _parse_lsb_output(host):
    """Parses the LSB output and returns key data points for labeling.

    @param host: Host that the command will be executed against
    @returns: LsbOutput with the result of parsing the /etc/lsb-release output
    """
    release_info = utils.parse_cmd_output('cat /etc/lsb-release',
                                          run_method=host.run)

    unibuild = release_info.get('CHROMEOS_RELEASE_UNIBUILD') == '1'
    return LsbOutput(unibuild, release_info['CHROMEOS_RELEASE_BOARD'])


class BoardLabel(base_label.StringPrefixLabel):
    """Determine the correct board label for the device."""

    _NAME = ds_constants.BOARD_PREFIX.rstrip(':')

    def generate_labels(self, host):
        # We only want to apply the board labels once, which is when they get
        # added to the AFE.  That way we don't have to worry about the board
        # label switching on us if the wrong builds get put on the devices.
        # crbug.com/624207 records one event of the board label switching
        # unexpectedly on us.
        board = host.host_info_store.get().board
        if board:
            return [board]
        for label in host._afe_host.labels:
            if label.startswith(self._NAME + ':'):
                return [label.split(':')[-1]]

        return [_parse_lsb_output(host).board]


class ModelLabel(base_label.StringPrefixLabel):
    """Determine the correct model label for the device."""

    _NAME = ds_constants.MODEL_LABEL

    def generate_labels(self, host):
        # Based on the issue explained in BoardLabel, return the existing
        # label if it has already been set once.
        model = host.host_info_store.get().model
        if model:
            return [model]
        for label in host._afe_host.labels:
            if label.startswith(self._NAME + ':'):
                return [label.split(':')[-1]]

        lsb_output = _parse_lsb_output(host)
        model = None

        if lsb_output.unibuild:
            test_label_cmd = 'cros_config / test-label'
            result = host.run(command=test_label_cmd, ignore_status=True)
            if result.exit_status == 0:
                model = result.stdout.strip()
            if not model:
                mosys_cmd = 'mosys platform model'
                result = host.run(command=mosys_cmd, ignore_status=True)
                if result.exit_status == 0:
                    model = result.stdout.strip()

        # We need some sort of backwards compatibility for boards that
        # are not yet supported with mosys and unified builds.
        # This is necessary so that we can begin changing cbuildbot to take
        # advantage of the model/board label differentiations for
        # scheduling, while still retaining backwards compatibility.
        return [model or lsb_output.board]


class LightSensorLabel(base_label.BaseLabel):
    """Label indicating if a light sensor is detected."""

    _NAME = 'lightsensor'
    _LIGHTSENSOR_SEARCH_DIR = '/sys/bus/iio/devices'
    _LIGHTSENSOR_FILES = [
        "in_illuminance0_input",
        "in_illuminance_input",
        "in_illuminance0_raw",
        "in_illuminance_raw",
        "illuminance0_input",
    ]

    def exists(self, host):
        search_cmd = "find -L %s -maxdepth 4 | egrep '%s'" % (
            self._LIGHTSENSOR_SEARCH_DIR, '|'.join(self._LIGHTSENSOR_FILES))
        # Run the search cmd following the symlinks. Stderr_tee is set to
        # None as there can be a symlink loop, but the command will still
        # execute correctly with a few messages printed to stderr.
        result = host.run(search_cmd, stdout_tee=None, stderr_tee=None,
                          ignore_status=True)

        return result.exit_status == 0


class BluetoothLabel(base_label.BaseLabel):
    """Label indicating if bluetooth is detected."""

    _NAME = 'bluetooth'

    def exists(self, host):
        result = host.run('test -d /sys/class/bluetooth/hci0',
                          ignore_status=True)

        return result.exit_status == 0


class ECLabel(base_label.BaseLabel):
    """Label to determine the type of EC on this host."""

    _NAME = 'ec:cros'

    def exists(self, host):
        cmd = 'mosys ec info'
        # The output should look like these, so that the last field should
        # match our EC version scheme:
        #
        #   stm | stm32f100 | snow_v1.3.139-375eb9f
        #   ti | Unknown-10de | peppy_v1.5.114-5d52788
        #
        # Non-Chrome OS ECs will look like these:
        #
        #   ENE | KB932 | 00BE107A00
        #   ite | it8518 | 3.08
        #
        # And some systems don't have ECs at all (Lumpy, for example).
        regexp = r'^.*\|\s*(\S+_v\d+\.\d+\.\d+-[0-9a-f]+)\s*$'

        ecinfo = host.run(command=cmd, ignore_status=True)
        if ecinfo.exit_status == 0:
            res = re.search(regexp, ecinfo.stdout)
            if res:
                logging.info("EC version is %s", res.groups()[0])
                return True
            logging.info("%s got: %s", cmd, ecinfo.stdout)
            # Has an EC, but it's not a Chrome OS EC
        logging.info("%s exited with status %d", cmd, ecinfo.exit_status)
        return False


class Cr50Label(base_label.StringPrefixLabel):
    """Label indicating the cr50 version."""

    _NAME = 'cr50'

    def __init__(self):
        self.ver = None


    def exists(self, host):
        # Make sure the gsctool version command runs ok
        self.ver = host.run('gsctool -a -f', ignore_status=True)
        return self.ver.exit_status == 0


    def generate_labels(self, host):
        # Check the major version to determine prePVT vs PVT
        major_ver = int(re.search('RW \d+\.(\d+)\.\d+[\r\n]',
                self.ver.stdout).group(1))
        # PVT images have a odd major version prePVT have even
        return ['pvt' if (major_ver % 2) else 'prepvt']


class AccelsLabel(base_label.BaseLabel):
    """Determine the type of accelerometers on this host."""

    _NAME = 'accel:cros-ec'

    def exists(self, host):
        # Check to make sure we have ectool
        rv = host.run('which ectool', ignore_status=True)
        if rv.exit_status:
            logging.info("No ectool cmd found; assuming no EC accelerometers")
            return False

        # Check that the EC supports the motionsense command
        rv = host.run('ectool motionsense', ignore_status=True)
        if rv.exit_status:
            logging.info("EC does not support motionsense command; "
                         "assuming no EC accelerometers")
            return False

        # Check that EC motion sensors are active
        active = host.run('ectool motionsense active').stdout.split('\n')
        if active[0] == "0":
            logging.info("Motion sense inactive; assuming no EC accelerometers")
            return False

        logging.info("EC accelerometers found")
        return True


class ChameleonLabel(base_label.BaseLabel):
    """Determine if a Chameleon is connected to this host."""

    _NAME = 'chameleon'

    def exists(self, host):
        return host._chameleon_host is not None


class ChameleonConnectionLabel(base_label.StringPrefixLabel):
    """Return the Chameleon connection label."""

    _NAME = 'chameleon'

    def exists(self, host):
        return host._chameleon_host is not None


    def generate_labels(self, host):
        return [host.chameleon.get_label()]


class ChameleonPeripheralsLabel(base_label.StringPrefixLabel):
    """Return the Chameleon peripherals labels.

    The 'chameleon:bt_hid' label is applied if the bluetooth
    classic hid device, i.e, RN-42 emulation kit, is detected.

    Any peripherals plugged into the chameleon board would be
    detected and applied proper labels in this class.
    """

    _NAME = 'chameleon'

    def exists(self, host):
        return host._chameleon_host is not None


    def generate_labels(self, host):
        bt_hid_device = host.chameleon.get_bluetooth_hid_mouse()
        return ['bt_hid'] if bt_hid_device.CheckSerialConnection() else []


class AudioLoopbackDongleLabel(base_label.BaseLabel):
    """Return the label if an audio loopback dongle is plugged in."""

    _NAME = 'audio_loopback_dongle'

    def exists(self, host):
        nodes_info = host.run(command=cras_utils.get_cras_nodes_cmd(),
                              ignore_status=True).stdout
        if (cras_utils.node_type_is_plugged('HEADPHONE', nodes_info) and
            cras_utils.node_type_is_plugged('MIC', nodes_info)):
                return True
        return False


class PowerSupplyLabel(base_label.StringPrefixLabel):
    """
    Return the label describing the power supply type.

    Labels representing this host's power supply.
         * `power:battery` when the device has a battery intended for
                extended use
         * `power:AC_primary` when the device has a battery not intended
                for extended use (for moving the machine, etc)
         * `power:AC_only` when the device has no battery at all.
    """

    _NAME = 'power'

    def __init__(self):
        self.psu_cmd_result = None


    def exists(self, host):
        self.psu_cmd_result = host.run(command='mosys psu type',
                                       ignore_status=True)
        return self.psu_cmd_result.stdout.strip() != 'unknown'


    def generate_labels(self, host):
        if self.psu_cmd_result.exit_status:
            # The psu command for mosys is not included for all platforms. The
            # assumption is that the device will have a battery if the command
            # is not found.
            return ['battery']
        return [self.psu_cmd_result.stdout.strip()]


class StorageLabel(base_label.StringPrefixLabel):
    """
    Return the label describing the storage type.

    Determine if the internal device is SCSI or dw_mmc device.
    Then check that it is SSD or HDD or eMMC or something else.

    Labels representing this host's internal device type:
             * `storage:ssd` when internal device is solid state drive
             * `storage:hdd` when internal device is hard disk drive
             * `storage:mmc` when internal device is mmc drive
             * `storage:nvme` when internal device is NVMe drive
             * `storage:ufs` when internal device is ufs drive
             * None          When internal device is something else or
                             when we are unable to determine the type
    """

    _NAME = 'storage'

    def __init__(self):
        self.type_str = ''


    def exists(self, host):
        # The output should be /dev/mmcblk* for SD/eMMC or /dev/sd* for scsi
        rootdev_cmd = ' '.join(['. /usr/sbin/write_gpt.sh;',
                                '. /usr/share/misc/chromeos-common.sh;',
                                'load_base_vars;',
                                'get_fixed_dst_drive'])
        rootdev = host.run(command=rootdev_cmd, ignore_status=True)
        if rootdev.exit_status:
            logging.info("Fail to run %s", rootdev_cmd)
            return False
        rootdev_str = rootdev.stdout.strip()

        if not rootdev_str:
            return False

        rootdev_base = os.path.basename(rootdev_str)

        mmc_pattern = '/dev/mmcblk[0-9]'
        if re.match(mmc_pattern, rootdev_str):
            # Use type to determine if the internal device is eMMC or somthing
            # else. We can assume that MMC is always an internal device.
            type_cmd = 'cat /sys/block/%s/device/type' % rootdev_base
            type = host.run(command=type_cmd, ignore_status=True)
            if type.exit_status:
                logging.info("Fail to run %s", type_cmd)
                return False
            type_str = type.stdout.strip()

            if type_str == 'MMC':
                self.type_str = 'mmc'
                return True

        scsi_pattern = '/dev/sd[a-z]+'
        if re.match(scsi_pattern, rootdev.stdout):
            # Read symlink for /sys/block/sd* to determine if the internal
            # device is connected via ata or usb.
            link_cmd = 'readlink /sys/block/%s' % rootdev_base
            link = host.run(command=link_cmd, ignore_status=True)
            if link.exit_status:
                logging.info("Fail to run %s", link_cmd)
                return False
            link_str = link.stdout.strip()
            if 'usb' in link_str:
                return False
            elif 'ufs' in link_str:
              self.type_str = 'ufs'
              return True

            # Read rotation to determine if the internal device is ssd or hdd.
            rotate_cmd = str('cat /sys/block/%s/queue/rotational'
                              % rootdev_base)
            rotate = host.run(command=rotate_cmd, ignore_status=True)
            if rotate.exit_status:
                logging.info("Fail to run %s", rotate_cmd)
                return False
            rotate_str = rotate.stdout.strip()

            rotate_dict = {'0':'ssd', '1':'hdd'}
            self.type_str = rotate_dict.get(rotate_str)
            return True

        nvme_pattern = '/dev/nvme[0-9]+n[0-9]+'
        if re.match(nvme_pattern, rootdev_str):
            self.type_str = 'nvme'
            return True

        # All other internal device / error case will always fall here
        return False


    def generate_labels(self, host):
        return [self.type_str]


class ServoLabel(base_label.BaseLabel):
    """Label to apply if a servo is present."""

    _NAME = 'servo'

    def exists(self, host):
        """
        Check if the servo label should apply to the host or not.

        @returns True if a servo host is detected, False otherwise.
        """
        servo_host_hostname = None
        servo_args = servo_host.get_servo_args_for_host(host)
        if servo_args:
            servo_host_hostname = servo_args.get(servo_host.SERVO_HOST_ATTR)
        return (servo_host_hostname is not None
                and servo_host.servo_host_is_up(servo_host_hostname))


class ArcLabel(base_label.BaseLabel):
    """Label indicates if host has ARC support."""

    _NAME = 'arc'

    @base_label.forever_exists_decorate
    def exists(self, host):
        return 0 == host.run(
            'grep CHROMEOS_ARC_VERSION /etc/lsb-release',
            ignore_status=True).exit_status


class CtsArchLabel(base_label.StringLabel):
    """Labels to determine the abi of the CTS bundle (arm or x86 only)."""

    _NAME = ['cts_abi_arm', 'cts_abi_x86', 'cts_cpu_arm', 'cts_cpu_x86']

    def _get_cts_abis(self, arch):
        """Return supported CTS ABIs.

        @return List of supported CTS bundle ABIs.
        """
        cts_abis = {'x86_64': ['arm', 'x86'], 'arm': ['arm']}
        return cts_abis.get(arch, [])

    def _get_cts_cpus(self, arch):
        """Return supported CTS native CPUs.

        This is needed for CTS_Instant scheduling.
        @return List of supported CTS native CPUs.
        """
        cts_cpus = {'x86_64': ['x86'], 'arm': ['arm']}
        return cts_cpus.get(arch, [])

    def generate_labels(self, host):
        cpu_arch = host.get_cpu_arch()
        abi_labels = ['cts_abi_' + abi for abi in self._get_cts_abis(cpu_arch)]
        cpu_labels = ['cts_cpu_' + cpu for cpu in self._get_cts_cpus(cpu_arch)]
        return abi_labels + cpu_labels


class VideoGlitchLabel(base_label.BaseLabel):
    """Label indicates if host supports video glitch detection tests."""

    _NAME = 'video_glitch_detection'

    def exists(self, host):
        board = host.get_board().replace(ds_constants.BOARD_PREFIX, '')

        return board in video_test_constants.SUPPORTED_BOARDS


class InternalDisplayLabel(base_label.StringLabel):
    """Label that determines if the device has an internal display."""

    _NAME = 'internal_display'

    def generate_labels(self, host):
        from autotest_lib.client.cros.graphics import graphics_utils
        from autotest_lib.client.common_lib import utils as common_utils

        def __system_output(cmd):
            return host.run(cmd).stdout

        def __read_file(remote_path):
            return host.run('cat %s' % remote_path).stdout

        # Hijack the necessary client functions so that we can take advantage
        # of the client lib here.
        # FIXME: find a less hacky way than this
        original_system_output = utils.system_output
        original_read_file = common_utils.read_file
        utils.system_output = __system_output
        common_utils.read_file = __read_file
        try:
            return ([self._NAME]
                    if graphics_utils.has_internal_display()
                    else [])
        finally:
            utils.system_output = original_system_output
            common_utils.read_file = original_read_file


class LucidSleepLabel(base_label.BaseLabel):
    """Label that determines if device has support for lucid sleep."""

    # TODO(kevcheng): See if we can determine if this label is applicable a
    # better way (crbug.com/592146).
    _NAME = 'lucidsleep'
    LUCID_SLEEP_BOARDS = ['nocturne', 'poppy']

    def exists(self, host):
        board = host.get_board().replace(ds_constants.BOARD_PREFIX, '')
        return board in self.LUCID_SLEEP_BOARDS


class HWIDLabel(base_label.StringLabel):
    """Return all the labels generated from the hwid."""

    # We leave out _NAME because hwid_lib will generate everything for us.

    def __init__(self):
        # Grab the key file needed to access the hwid service.
        self.key_file = global_config.global_config.get_config_value(
                'CROS', 'HWID_KEY', type=str)


    def generate_labels(self, host):
        hwid_labels = []
        hwid = host.run_output('crossystem hwid').strip()
        hwid_info_list = hwid_lib.get_hwid_info(hwid, hwid_lib.HWID_INFO_LABEL,
                                                self.key_file).get('labels', [])

        for hwid_info in hwid_info_list:
            # If it's a prefix, we'll have:
            # {'name': prefix_label, 'value': postfix_label} and create
            # 'prefix_label:postfix_label'; otherwise it'll just be
            # {'name': label} which should just be 'label'.
            value = hwid_info.get('value', '')
            name = hwid_info.get('name', '')
            # There should always be a name but just in case there is not.
            if name:
                hwid_labels.append(name if not value else
                                   '%s:%s' % (name, value))
        return hwid_labels


    def get_all_labels(self):
        """We need to try all labels as a prefix and as standalone.

        We don't know for sure which labels are prefix labels and which are
        standalone so we try all of them as both.
        """
        all_hwid_labels = []
        try:
            all_hwid_labels = hwid_lib.get_all_possible_dut_labels(
                    self.key_file)
        except IOError:
            logging.error('Can not open key file: %s', self.key_file)
        except hwid_lib.HwIdException as e:
            logging.error('hwid service: %s', e)
        return all_hwid_labels, all_hwid_labels


class DetachableBaseLabel(base_label.BaseLabel):
    """Label indicating if device has detachable keyboard."""

    _NAME = 'detachablebase'

    def exists(self, host):
        return host.run('which hammerd', ignore_status=True).exit_status == 0


class FingerprintLabel(base_label.BaseLabel):
    """Label indicating whether device has fingerprint sensor."""

    _NAME = 'fingerprint'

    def exists(self, host):
        return host.run('test -c /dev/cros_fp',
                        ignore_status=True).exit_status == 0


class ReferenceDesignLabel(base_label.StringPrefixLabel):
    """Determine the correct reference design label for the device. """

    _NAME = 'reference_design'

    def __init__(self):
        self.response = None

    def exists(self, host):
        self.response = host.run('mosys platform family', ignore_status=True)
        return self.response.exit_status == 0

    def generate_labels(self, host):
        if self.exists(host):
            return [self.response.stdout.strip()]


CROS_LABELS = [
    AccelsLabel(),
    ArcLabel(),
    AudioLoopbackDongleLabel(),
    BluetoothLabel(),
    BoardLabel(),
    ModelLabel(),
    ChameleonConnectionLabel(),
    ChameleonLabel(),
    ChameleonPeripheralsLabel(),
    common_label.OSLabel(),
    Cr50Label(),
    CtsArchLabel(),
    DetachableBaseLabel(),
    ECLabel(),
    FingerprintLabel(),
    HWIDLabel(),
    InternalDisplayLabel(),
    LightSensorLabel(),
    LucidSleepLabel(),
    PowerSupplyLabel(),
    ReferenceDesignLabel(),
    ServoLabel(),
    StorageLabel(),
    VideoGlitchLabel(),
]