# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging, os, re, time, random
from autotest_lib.client.bin import utils
from autotest_lib.server import test
from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.usb_mux_controller import USBMuxController
_WAIT_DELAY = 5
_USB_DIR = '/sys/bus/usb/devices'
MAX_PORTS = 8
TMP_FAILED_TEST_LIST = list()
PORT_BEING_TESTED = list()
class kernel_ExternalUsbPeripheralsDetectionStress(test.test):
"""Uses USB multiplexer to repeatedly connect and disconnect USB devices."""
version = 1
def set_hub_power(self, on=True):
"""Setting USB hub power status
@param on: To power on the servo-usb hub or not.
"""
reset = 'off'
if not on:
reset = 'on'
self.host.servo.set('dut_hub1_rst1', reset)
time.sleep(_WAIT_DELAY)
def check_manufacturer_and_product_info(
self, vId, pId, manufacturer, pName):
"""Check manufacturer and product info from lsusb against dict values.
@param vId: Vendor id of the connected USB device.
@param pId: Product id of the connected USB device.
@param manufacturer: Manufacturer name of the connected USB device.
@param pName: Product name of the connected USB device
@param result: To track test result.
@return result value
"""
result = True
manu_cmd = ('lsusb -v -d ' + vId + ':' + pId + ' | grep iManufacturer')
prod_cmd = ('lsusb -v -d ' + vId + ':' + pId + ' | grep iProduct')
manu_cmd_output = (self.host.run(manu_cmd, ignore_status=True).
stdout.strip())
prod_cmd_output = (self.host.run(prod_cmd, ignore_status=True).
stdout.strip())
manu_verify = 'iManufacturer.*' + manufacturer
prod_verify = 'iProduct.*' + pName
match_result_manu = re.search(manu_verify, manu_cmd_output) != None
match_result_prod = re.search(prod_verify, prod_cmd_output) != None
if not match_result_manu or not match_result_prod:
logging.debug('Manufacturer or productName do not match.')
result = False
return result
def check_driver_symlink_and_dir(self, devicePath, productName):
"""Check driver symlink and dir against devicePath value from dict.
@param devicePath: Device driver path.
@param productName: Product name of the connected USB device.
@param result: To track test result.
@return result value
"""
result = True
tmp_list = [device_dir for device_dir in
self.host.run('ls -1 %s' % devicePath,
ignore_status=True).stdout.split('\n')
if re.match(r'\d-\d.*:\d\.\d', device_dir)]
if not tmp_list:
logging.debug('No driver created/loaded for %s', productName)
result = False
flag = False
for device_dir in tmp_list:
driver_path = os.path.join(devicePath,
'%s/driver' % device_dir)
if self._exists_on(driver_path):
flag = True
link = (self.host.run('ls -l %s | grep ^l'
'| grep driver'
% driver_path, ignore_status=True)
.stdout.strip())
logging.info('%s', link)
if not flag:
logging.debug('Device driver not found')
result = False
return result
def check_usb_peripherals_details(self, connected_device_dict):
"""Checks USB peripheral details against the values in dictionary.
@param connected_device_dict: Dictionary of device attributes.
"""
result = True
usbPort = connected_device_dict['usb_port']
PORT_BEING_TESTED.append(usbPort)
self.usb_mux.enable_port(usbPort)
vendorId = connected_device_dict['deviceInfo']['vendorId']
productId = connected_device_dict['deviceInfo']['productId']
manufacturer = connected_device_dict['deviceInfo']['manufacturer']
productName = connected_device_dict['deviceInfo']['productName']
lsusbOutput = connected_device_dict['deviceInfo']['lsusb']
devicePath = connected_device_dict['deviceInfo']['devicePath']
try:
utils.poll_for_condition(
lambda: self.host.path_exists(devicePath),
exception=utils.TimeoutError('Trouble finding USB device '
'on port %d' % usbPort), timeout=15, sleep_interval=1)
except utils.TimeoutError:
logging.debug('Trouble finding USB device on port %d', usbPort)
result = False
pass
if not ((vendorId + ':' + productId in lsusbOutput) and
self.check_manufacturer_and_product_info(
vendorId, productId, manufacturer, productName) and
self.check_driver_symlink_and_dir(devicePath, productName)):
result = False
self.usb_mux.disable_all_ports()
try:
utils.poll_for_condition(
lambda: not self.host.path_exists(devicePath),
exception=utils.TimeoutError('Device driver path does not '
'disappear after device is disconnected.'), timeout=15,
sleep_interval=1)
except utils.TimeoutError:
logging.debug('Device driver path is still present after device is '
'disconnected from the DUT.')
result = False
pass
if result is False:
logging.debug('Test failed on port %s.', usbPort)
TMP_FAILED_TEST_LIST.append(usbPort)
def get_usb_device_dirs(self):
"""Gets the usb device dirs from _USB_DIR path.
@returns list with number of device dirs else None
"""
usb_dir_list = list()
cmd = 'ls -1 %s' % _USB_DIR
cmd_output = self.host.run(cmd).stdout.strip().split('\n')
for d in cmd_output:
usb_dir_list.append(os.path.join(_USB_DIR, d))
return usb_dir_list
def parse_device_dir_for_info(self, dir_list, usb_device_dict):
"""Uses vendorId/device path and to get other device attributes.
@param dir_list: Complete path of directories.
@param usb_device_dict: Dictionary to store device attributes.
@returns usb_device_dict with device attributes
"""
for d_path in dir_list:
file_name = os.path.join(d_path, 'idVendor')
if self._exists_on(file_name):
vendor_id = self.host.run('cat %s' % file_name).stdout.strip()
if vendor_id:
usb_device_dict['deviceInfo']['vendorId'] = vendor_id
usb_device_dict['deviceInfo']['devicePath'] = d_path
usb_device_dict['deviceInfo']['productId'] = (
self.get_product_info(d_path, 'idProduct'))
usb_device_dict['deviceInfo']['productName'] = (
self.get_product_info(d_path, 'product'))
usb_device_dict['deviceInfo']['manufacturer'] = (
self.get_product_info(d_path, 'manufacturer'))
return usb_device_dict
def get_product_info(self, directory, prod_string):
"""Gets the product id, name and manufacturer info from device path.
@param directory: Driver path for the USB device.
@param prod_string: Device attribute string.
returns the output of the cat command
"""
product_file_name = os.path.join(directory, prod_string)
if self._exists_on(product_file_name):
return self.host.run('cat %s' % product_file_name).stdout.strip()
return None
def _exists_on(self, path):
"""Checks if file exists on host or not.
@returns True or False
"""
return self.host.run('ls %s' % path,
ignore_status=True).exit_status == 0
def check_lsusb_diff(self, original_lsusb_output):
"""Compare LSUSB output for before and after connecting each device.
@param original_lsusb_output: lsusb output prior to connecting device.
@returns the difference between new and old lsusb outputs
"""
lsusb_output = (self.host.run('lsusb', ignore_status=True).
stdout.strip().split('\n'))
lsusb_diff = (list(set(lsusb_output).
difference(set(original_lsusb_output))))
return lsusb_diff
def run_once(self, host, loop_count):
"""Main function to run the autotest.
@param host: Host object representing the DUT.
@param loop_count: Number of iteration cycles.
@raise error.TestFail if one or more USB devices are not detected
"""
self.host = host
self.usb_mux = USBMuxController(self.host)
# Make sure all USB ports are disabled prior to starting the test.
self.usb_mux.disable_all_ports()
self.host.servo.switch_usbkey('dut')
self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
self.set_hub_power(False)
# Collect the USB devices directories before switching on hub
usb_list_dir_off = self.get_usb_device_dirs()
self.set_hub_power(True)
# Collect the USB devices directories after switching on hub
usb_list_dir_on = self.get_usb_device_dirs()
lsusb_original_out = (self.host.run('lsusb', ignore_status=True).
stdout.strip().split('\n'))
list_of_usb_device_dictionaries = list()
usb_port = 0
# Extract connected USB device information and store it in a dict.
while usb_port < MAX_PORTS:
usb_device_dict = {'usb_port':None,'deviceInfo':
{'devicePath':None,'vendorId':None,'productId':None,
'productName':None,'manufacturer':None,'lsusb':None}}
usb_device_dir_list = list()
self.usb_mux.enable_port(usb_port)
try:
utils.poll_for_condition(
lambda: self.check_lsusb_diff(lsusb_original_out),
exception=utils.TimeoutError('No USB device on port '
'%d' % usb_port), timeout=_WAIT_DELAY, sleep_interval=1)
except utils.TimeoutError:
logging.debug('No USB device found on port %d', usb_port)
pass
# Maintain list of associated dirs for each connected USB device
for device in self.get_usb_device_dirs():
if device not in usb_list_dir_on:
usb_device_dir_list.append(device)
usb_device_dict = self.parse_device_dir_for_info(
usb_device_dir_list, usb_device_dict)
lsusb_diff = self.check_lsusb_diff(lsusb_original_out)
if lsusb_diff:
usb_device_dict['usb_port'] = usb_port
usb_device_dict['deviceInfo']['lsusb'] = lsusb_diff[0]
list_of_usb_device_dictionaries.append(usb_device_dict)
self.usb_mux.disable_all_ports()
try:
utils.poll_for_condition(
lambda: not self.check_lsusb_diff(lsusb_original_out),
exception=utils.TimeoutError('Timed out waiting for '
'USB device to disappear.'), timeout=_WAIT_DELAY,
sleep_interval=1)
except utils.TimeoutError:
logging.debug('Timed out waiting for USB device to disappear.')
pass
logging.info('%s', usb_device_dict)
usb_port += 1
if len(list_of_usb_device_dictionaries) == 0:
# Fails if no devices detected
raise error.TestError('No connected devices were detected. Make '
'sure the devices are connected to USB_KEY '
'and DUT_HUB1_USB on the servo board.')
logging.info('Connected devices list: %s',
list_of_usb_device_dictionaries)
# loop_count defines the number of times the USB peripheral details
# should be checked and random.choice is used to randomly select one of
# the elements from the usb device list specifying the device whose
# details should be checked.
for i in xrange(loop_count):
self.check_usb_peripherals_details(
random.choice(list_of_usb_device_dictionaries))
logging.info('Sequence of ports tested with random picker: %s',
', '.join(map(str, PORT_BEING_TESTED)))
if TMP_FAILED_TEST_LIST:
logging.info('Failed to verify devices on following ports: %s',
', '.join(map(str, TMP_FAILED_TEST_LIST)))
raise error.TestFail('Failed to do full device verification on '
'some ports.')