# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. # Author: Cosimo Alfarano <cosimo.alfarano@collabora.co.uk> import datetime import logging import os from autotest_lib.client.cros import storage as storage_mod from autotest_lib.client.common_lib import autotemp, error from autotest_lib.client.bin import base_utils USECS_IN_SEC = 1000000.0 class hardware_Usb30Throughput(storage_mod.StorageTester): version = 1 preserve_srcdir = True _autosrc = None _autodst = None results = {} def cleanup(self): if self._autosrc: self._autosrc.clean() if self._autodst: self._autodst.clean() self.scanner.unmount_all() super(hardware_Usb30Throughput, self).cleanup() def run_once(self, measurements=5, size=1, min_speed=300.0): """ @param measurements: (int) the number of measurements to do. For the test to fail at least one measurement needs to be below |min_speed| @param size: (int) size of the file to be copied for testing the transfer rate, it represent the size in megabytes. Generally speaking, the bigger is the file used for |measurements| the slower the test will run and the more accurate it will be. e.g.: 10 is 10MB, 101 is 101MB @param min_speed: (float) in Mbit/sec. It's the min throughput a USB 3.0 device should perform to be accepted. Conceptually it's the max USB 3.0 throughput minus a tollerance. Defaults to 300Mbit/sec (ie 350Mbits/sec minus ~15% tollerance) """ volume_filter = {'bus': 'usb'} storage = self.wait_for_device(volume_filter, cycles=1, mount_volume=True)[0] # in Megabytes (power of 10, to be consistent with the throughput unit) size *= 1000*1000 self._autosrc = autotemp.tempfile(unique_id='autotest.src', dir=storage['mountpoint']) self._autodst = autotemp.tempfile(unique_id='autotest.dst', dir=self.tmpdir) # Create random file storage_mod.create_file(self._autosrc.name, size) num_failures = 0 for measurement in range(measurements): xfer_rate = get_xfer_rate(self._autosrc.name, self._autodst.name) key = 'Mbit_per_sec_measurement_%d' % measurement self.results[key] = xfer_rate logging.debug('xfer rate (measurement %d) %.2f (min=%.2f)', measurement, xfer_rate, min_speed) if xfer_rate < min_speed: num_failures += 1 # Apparently self.postprocess_iteration is not called on TestFail # so we need to process data here in order to have some performance log # even on TestFail self.results['Mbit_per_sec_average'] = (sum(self.results.values()) / len(self.results)) self.write_perf_keyval(self.results) if num_failures > 0: msg = ('%d/%d measured transfer rates under performed ' '(min_speed=%.2fMbit/sec)' % (num_failures, measurements, min_speed)) raise error.TestFail(msg) def get_xfer_rate(src, dst): """Compute transfer rate from src to dst as Mbit/sec Execute a copy from |src| to |dst| and returns the file copy transfer rate in Mbit/sec @param src, dst: paths for source and destination @return trasfer rate (float) in Mbit/sec """ assert os.path.isfile(src) assert os.path.isfile(dst) base_utils.drop_caches() start = datetime.datetime.now() base_utils.force_copy(src, dst) end = datetime.datetime.now() delta = end - start # compute seconds (as float) from microsecs delta_secs = delta.seconds + (delta.microseconds/USECS_IN_SEC) # compute Mbit from bytes size_Mbit = (os.path.getsize(src)*8.0)/(1000*1000) logging.info('file trasferred: size (Mbits): %f, start: %f, end: %d,' ' delta (secs): %f', size_Mbit, start.second+start.microsecond/USECS_IN_SEC, end.second+end.microsecond/USECS_IN_SEC, delta_secs) # return the xfer rate in Mbits/secs having bytes/microsec return size_Mbit / delta_secs