普通文本  |  332行  |  13.05 KB

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging, os, re, time, random

from autotest_lib.client.bin import utils
from autotest_lib.server import test
from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.usb_mux_controller import USBMuxController

_WAIT_DELAY = 5
_USB_DIR = '/sys/bus/usb/devices'
MAX_PORTS = 8
TMP_FAILED_TEST_LIST = list()
PORT_BEING_TESTED = list()

class kernel_ExternalUsbPeripheralsDetectionStress(test.test):
    """Uses USB multiplexer to repeatedly connect and disconnect USB devices."""
    version = 1


    def set_hub_power(self, on=True):
        """Setting USB hub power status

        @param on: To power on the servo-usb hub or not.

        """
        reset = 'off'
        if not on:
            reset = 'on'
        self.host.servo.set('dut_hub1_rst1', reset)
        time.sleep(_WAIT_DELAY)


    def check_manufacturer_and_product_info(
            self, vId, pId, manufacturer, pName):
        """Check manufacturer and product info from lsusb against dict values.

        @param vId: Vendor id of the connected USB device.
        @param pId: Product id of the connected USB device.
        @param manufacturer: Manufacturer name of the connected USB device.
        @param pName: Product name of the connected USB device
        @param result: To track test result.
        @return result value

        """
        result = True
        manu_cmd = ('lsusb -v -d ' + vId + ':' +  pId + ' | grep iManufacturer')
        prod_cmd = ('lsusb -v -d ' + vId + ':' +  pId + ' | grep iProduct')

        manu_cmd_output = (self.host.run(manu_cmd, ignore_status=True).
                           stdout.strip())
        prod_cmd_output = (self.host.run(prod_cmd, ignore_status=True).
                           stdout.strip())

        manu_verify = 'iManufacturer.*' + manufacturer
        prod_verify = 'iProduct.*' + pName

        match_result_manu = re.search(manu_verify, manu_cmd_output) != None
        match_result_prod = re.search(prod_verify, prod_cmd_output) != None

        if not match_result_manu or not match_result_prod:
            logging.debug('Manufacturer or productName do not match.')
            result = False

        return result


    def check_driver_symlink_and_dir(self, devicePath, productName):
        """Check driver symlink and dir against devicePath value from dict.

        @param devicePath: Device driver path.
        @param productName: Product name of the connected USB device.
        @param result: To track test result.
        @return result value

        """
        result = True
        tmp_list = [device_dir for device_dir in
                    self.host.run('ls -1 %s' % devicePath,
                    ignore_status=True).stdout.split('\n')
                    if re.match(r'\d-\d.*:\d\.\d', device_dir)]

        if not tmp_list:
            logging.debug('No driver created/loaded for %s', productName)
            result = False

        flag = False
        for device_dir in tmp_list:
            driver_path = os.path.join(devicePath,
                                       '%s/driver' % device_dir)
            if self._exists_on(driver_path):
                flag = True
                link = (self.host.run('ls -l %s | grep ^l'
                                      '| grep driver'
                                      % driver_path, ignore_status=True)
                                      .stdout.strip())
                logging.info('%s', link)

        if not flag:
            logging.debug('Device driver not found')
            result = False

        return result


    def check_usb_peripherals_details(self, connected_device_dict):
        """Checks USB peripheral details against the values in dictionary.

        @param connected_device_dict: Dictionary of device attributes.

        """
        result = True
        usbPort = connected_device_dict['usb_port']
        PORT_BEING_TESTED.append(usbPort)
        self.usb_mux.enable_port(usbPort)

        vendorId = connected_device_dict['deviceInfo']['vendorId']
        productId = connected_device_dict['deviceInfo']['productId']
        manufacturer = connected_device_dict['deviceInfo']['manufacturer']
        productName = connected_device_dict['deviceInfo']['productName']
        lsusbOutput = connected_device_dict['deviceInfo']['lsusb']
        devicePath = connected_device_dict['deviceInfo']['devicePath']

        try:
            utils.poll_for_condition(
                    lambda: self.host.path_exists(devicePath),
                    exception=utils.TimeoutError('Trouble finding USB device '
                    'on port %d' % usbPort), timeout=15, sleep_interval=1)
        except utils.TimeoutError:
            logging.debug('Trouble finding USB device on port %d', usbPort)
            result = False
            pass

        if not ((vendorId + ':' + productId in lsusbOutput) and
                self.check_manufacturer_and_product_info(
                vendorId, productId, manufacturer, productName) and
                self.check_driver_symlink_and_dir(devicePath, productName)):
            result = False

        self.usb_mux.disable_all_ports()
        try:
            utils.poll_for_condition(
                    lambda: not self.host.path_exists(devicePath),
                    exception=utils.TimeoutError('Device driver path does not '
                    'disappear after device is disconnected.'), timeout=15,
                    sleep_interval=1)
        except utils.TimeoutError:
            logging.debug('Device driver path is still present after device is '
                         'disconnected from the DUT.')
            result = False
            pass

        if result is False:
            logging.debug('Test failed on port %s.', usbPort)
            TMP_FAILED_TEST_LIST.append(usbPort)


    def get_usb_device_dirs(self):
        """Gets the usb device dirs from _USB_DIR path.

        @returns list with number of device dirs else None

        """
        usb_dir_list = list()
        cmd = 'ls -1 %s' % _USB_DIR
        cmd_output = self.host.run(cmd).stdout.strip().split('\n')
        for d in cmd_output:
            usb_dir_list.append(os.path.join(_USB_DIR, d))
        return usb_dir_list


    def parse_device_dir_for_info(self, dir_list, usb_device_dict):
        """Uses vendorId/device path and to get other device attributes.

        @param dir_list: Complete path of directories.
        @param usb_device_dict: Dictionary to store device attributes.
        @returns usb_device_dict with device attributes

        """
        for d_path in dir_list:
            file_name = os.path.join(d_path, 'idVendor')
            if self._exists_on(file_name):
                vendor_id = self.host.run('cat %s' % file_name).stdout.strip()
                if vendor_id:
                    usb_device_dict['deviceInfo']['vendorId'] = vendor_id
                    usb_device_dict['deviceInfo']['devicePath'] = d_path
                    usb_device_dict['deviceInfo']['productId'] = (
                            self.get_product_info(d_path, 'idProduct'))
                    usb_device_dict['deviceInfo']['productName'] = (
                            self.get_product_info(d_path, 'product'))
                    usb_device_dict['deviceInfo']['manufacturer'] = (
                            self.get_product_info(d_path, 'manufacturer'))
        return usb_device_dict


    def get_product_info(self, directory, prod_string):
        """Gets the product id, name and manufacturer info from device path.

        @param directory: Driver path for the USB device.
        @param prod_string: Device attribute string.
        returns the output of the cat command

        """
        product_file_name = os.path.join(directory, prod_string)
        if self._exists_on(product_file_name):
            return self.host.run('cat %s' % product_file_name).stdout.strip()
        return None


    def _exists_on(self, path):
        """Checks if file exists on host or not.

        @returns True or False
        """
        return self.host.run('ls %s' % path,
                             ignore_status=True).exit_status == 0


    def check_lsusb_diff(self, original_lsusb_output):
        """Compare LSUSB output for before and after connecting each device.

        @param original_lsusb_output: lsusb output prior to connecting device.
        @returns the difference between new and old lsusb outputs

        """
        lsusb_output = (self.host.run('lsusb', ignore_status=True).
                        stdout.strip().split('\n'))
        lsusb_diff = (list(set(lsusb_output).
                      difference(set(original_lsusb_output))))
        return lsusb_diff


    def run_once(self, host, loop_count):
        """Main function to run the autotest.

        @param host: Host object representing the DUT.
        @param loop_count: Number of iteration cycles.
        @raise error.TestFail if one or more USB devices are not detected

        """
        self.host = host
        self.usb_mux = USBMuxController(self.host)

        # Make sure all USB ports are disabled prior to starting the test.
        self.usb_mux.disable_all_ports()

        self.host.servo.switch_usbkey('dut')
        self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey')

        self.set_hub_power(False)
        # Collect the USB devices directories before switching on hub
        usb_list_dir_off = self.get_usb_device_dirs()

        self.set_hub_power(True)
        # Collect the USB devices directories after switching on hub
        usb_list_dir_on = self.get_usb_device_dirs()

        lsusb_original_out = (self.host.run('lsusb', ignore_status=True).
                                 stdout.strip().split('\n'))
        list_of_usb_device_dictionaries = list()
        usb_port = 0

        # Extract connected USB device information and store it in a dict.
        while usb_port < MAX_PORTS:
            usb_device_dict = {'usb_port':None,'deviceInfo':
                    {'devicePath':None,'vendorId':None,'productId':None,
                    'productName':None,'manufacturer':None,'lsusb':None}}
            usb_device_dir_list = list()
            self.usb_mux.enable_port(usb_port)
            try:
                utils.poll_for_condition(
                        lambda: self.check_lsusb_diff(lsusb_original_out),
                        exception=utils.TimeoutError('No USB device on port '
                        '%d' % usb_port), timeout=_WAIT_DELAY, sleep_interval=1)
            except utils.TimeoutError:
                logging.debug('No USB device found on port %d', usb_port)
                pass

            # Maintain list of associated dirs for each connected USB device
            for device in self.get_usb_device_dirs():
                if device not in usb_list_dir_on:
                    usb_device_dir_list.append(device)

            usb_device_dict = self.parse_device_dir_for_info(
                    usb_device_dir_list, usb_device_dict)

            lsusb_diff = self.check_lsusb_diff(lsusb_original_out)
            if lsusb_diff:
                usb_device_dict['usb_port'] = usb_port
                usb_device_dict['deviceInfo']['lsusb'] = lsusb_diff[0]
                list_of_usb_device_dictionaries.append(usb_device_dict)

            self.usb_mux.disable_all_ports()
            try:
                utils.poll_for_condition(
                        lambda: not self.check_lsusb_diff(lsusb_original_out),
                        exception=utils.TimeoutError('Timed out waiting for '
                        'USB device to disappear.'), timeout=_WAIT_DELAY,
                        sleep_interval=1)
            except utils.TimeoutError:
                logging.debug('Timed out waiting for USB device to disappear.')
                pass
            logging.info('%s', usb_device_dict)
            usb_port += 1

        if len(list_of_usb_device_dictionaries) == 0:
            # Fails if no devices detected
            raise error.TestError('No connected devices were detected. Make '
                                  'sure the devices are connected to USB_KEY '
                                  'and DUT_HUB1_USB on the servo board.')
        logging.info('Connected devices list: %s',
                      list_of_usb_device_dictionaries)

        # loop_count defines the number of times the USB peripheral details
        # should be checked and random.choice is used to randomly select one of
        # the elements from the usb device list specifying the device whose
        # details should be checked.
        for i in xrange(loop_count):
            self.check_usb_peripherals_details(
                    random.choice(list_of_usb_device_dictionaries))

        logging.info('Sequence of ports tested with random picker: %s',
                      ', '.join(map(str, PORT_BEING_TESTED)))

        if TMP_FAILED_TEST_LIST:
            logging.info('Failed to verify devices on following ports: %s',
                         ', '.join(map(str, TMP_FAILED_TEST_LIST)))
            raise error.TestFail('Failed to do full device verification on '
                                 'some ports.')