普通文本  |  587行  |  22.17 KB

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import logging
import math
import numbers
import time
import os.path

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import path_utils


class NetperfResult(object):
    """Encapsulates logic to parse and represent netperf results."""

    @staticmethod
    def from_netperf_results(test_type, results, duration_seconds):
        """Parse the text output of netperf and return a NetperfResult.

        @param test_type string one of NetperfConfig.TEST_TYPE_* below.
        @param results string raw results from netperf.
        @param duration_seconds float number of seconds the test ran for.
        @return NetperfResult result.

        """
        lines = results.splitlines()
        if test_type in NetperfConfig.TCP_STREAM_TESTS:
            """Parses the following (works for both TCP_STREAM, TCP_MAERTS and
            TCP_SENDFILE) and returns a singleton containing throughput.

            TCP STREAM TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET to \
            foo.bar.com (10.10.10.3) port 0 AF_INET
            Recv   Send    Send
            Socket Socket  Message  Elapsed
            Size   Size    Size     Time     Throughput
            bytes  bytes   bytes    secs.    10^6bits/sec

            87380  16384  16384    2.00      941.28
            """
            if len(lines) < 7:
                return None

            result = NetperfResult(test_type, duration_seconds,
                                   throughput=float(lines[6].split()[4]))
        elif test_type in NetperfConfig.UDP_STREAM_TESTS:
            """Parses the following and returns a tuple containing throughput
            and the number of errors.

            UDP UNIDIRECTIONAL SEND TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET \
            to foo.bar.com (10.10.10.3) port 0 AF_INET
            Socket  Message  Elapsed      Messages
            Size    Size     Time         Okay Errors   Throughput
            bytes   bytes    secs            #      #   10^6bits/sec

            129024   65507   2.00         3673      0     961.87
            131072           2.00         3673            961.87
            """
            if len(lines) < 6:
                return None

            udp_tokens = lines[5].split()
            result = NetperfResult(test_type, duration_seconds,
                                   throughput=float(udp_tokens[5]),
                                   errors=float(udp_tokens[4]))
        elif test_type in NetperfConfig.REQUEST_RESPONSE_TESTS:
            """Parses the following which works for both rr (TCP and UDP)
            and crr tests and returns a singleton containing transfer rate.

            TCP REQUEST/RESPONSE TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET \
            to foo.bar.com (10.10.10.3) port 0 AF_INET
            Local /Remote
            Socket Size   Request  Resp.   Elapsed  Trans.
            Send   Recv   Size     Size    Time     Rate
            bytes  Bytes  bytes    bytes   secs.    per sec

            16384  87380  1        1       2.00     14118.53
            16384  87380
            """
            if len(lines) < 7:
                return None

            result = NetperfResult(test_type, duration_seconds,
                                   transaction_rate=float(lines[6].split()[5]))
        else:
            raise error.TestFail('Invalid netperf test type: %r.' % test_type)

        logging.info('%r', result)
        return result


    @staticmethod
    def _get_stats(samples, field_name):
        if any(map(lambda x: getattr(x, field_name) is None, samples)):
            return (None, None)

        values = map(lambda x: getattr(x, field_name), samples)
        N = len(samples)
        mean = math.fsum(values) / N
        deviation = None
        if N > 1:
            differences = map(lambda x: math.pow(mean - x, 2), values)
            deviation = math.sqrt(math.fsum(differences) / (N - 1))
        return mean, deviation


    @staticmethod
    def from_samples(samples):
        """Build an averaged NetperfResult from |samples|.

        Calculate an representative sample with averaged values
        and standard deviation from samples.

        @param samples list of NetperfResult objects.
        @return NetperfResult object.

        """
        if len(set([x.test_type for x in samples])) != 1:
            # We have either no samples or multiple test types.
            return None

        duration_seconds, duration_seconds_dev = NetperfResult._get_stats(
                samples, 'duration_seconds')
        throughput, throughput_dev = NetperfResult._get_stats(
                samples, 'throughput')
        errors, errors_dev = NetperfResult._get_stats(samples, 'errors')
        transaction_rate, transaction_rate_dev = NetperfResult._get_stats(
                samples, 'transaction_rate')
        return NetperfResult(
                samples[0].test_type,
                duration_seconds, duration_seconds_dev=duration_seconds_dev,
                throughput=throughput, throughput_dev=throughput_dev,
                errors=errors, errors_dev=errors_dev,
                transaction_rate=transaction_rate,
                transaction_rate_dev=transaction_rate_dev)


    @property
    def human_readable_tag(self):
        """@return string human readable test description."""
        return NetperfConfig.test_type_to_human_readable_tag(self.test_type)


    @property
    def tag(self):
        """@return string very short test description."""
        return NetperfConfig.test_type_to_tag(self.test_type)


    def __init__(self, test_type, duration_seconds, duration_seconds_dev=None,
                 throughput=None, throughput_dev=None,
                 errors=None, errors_dev=None,
                 transaction_rate=None, transaction_rate_dev=None):
        """Construct a NetperfResult.

        @param duration_seconds float how long the test took.
        @param throughput float test throughput in Mbps.
        @param errors int number of UDP errors in test.
        @param transaction_rate float transactions per second.

        """
        self.test_type = test_type
        self.duration_seconds = duration_seconds
        self.duration_seconds_dev = duration_seconds_dev
        self.throughput = throughput
        self.throughput_dev = throughput_dev
        self.errors = errors
        self.errors_dev = errors_dev
        self.transaction_rate = transaction_rate
        self.transaction_rate_dev = transaction_rate_dev
        if throughput is None and transaction_rate is None and errors is None:
            logging.error('Created a NetperfResult with no data.')


    def __repr__(self):
        fields = ['test_type=%s' % self.test_type]
        fields += ['%s=%0.2f' % item
                   for item in vars(self).iteritems()
                   if item[1] is not None
                   and isinstance(item[1], numbers.Number)]
        return '%s(%s)' % (self.__class__.__name__, ', '.join(fields))


    def all_deviations_less_than_fraction(self, fraction):
        """Check that this result is "acurate" enough.

        We say that a NetperfResult is "acurate" enough when for each
        measurement X with standard deviation d(X), d(X)/X <= |fraction|.

        @param fraction float used in constraint above.
        @return True on above condition.

        """
        for measurement in ['throughput', 'errors', 'transaction_rate']:
            value = getattr(self, measurement)
            dev = getattr(self, measurement + '_dev')
            if value is None or dev is None:
                continue

            if not dev and not value:
                # 0/0 is undefined, but take this to be good for our purposes.
                continue

            if dev and not value:
                # Deviation is non-zero, but the average is 0.  Deviation
                # as a fraction of the value is undefined but in theory
                # a "very large number."
                return False

            if dev / value > fraction:
                return False

        return True


    def get_keyval(self, prefix='', suffix=''):
        ret = {}
        if prefix:
            prefix = prefix + '_'
        if suffix:
            suffix = '_' + suffix

        for measurement in ['throughput', 'errors', 'transaction_rate']:
            value = getattr(self, measurement)
            dev = getattr(self, measurement + '_dev')
            if dev is None:
                margin = ''
            else:
                margin = '+-%0.2f' % dev
            if value is not None:
                ret[prefix + measurement + suffix] = '%0.2f%s' % (value, margin)
        return ret


class NetperfAssertion(object):
    """Defines a set of expectations for netperf results."""

    def _passes(self, result, field):
        value = getattr(result, field)
        deviation = getattr(result, field + '_dev')
        bounds = getattr(self, field + '_bounds')
        if bounds[0] is None and bounds[1] is None:
            return True

        if value is None:
            # We have bounds requirements, but no value to check?
            return False

        if bounds[0] is not None and bounds[0] > value + deviation:
            return False

        if bounds[1] is not None and bounds[1] < value - deviation:
            return False

        return True


    def __init__(self, duration_seconds_min=None, duration_seconds_max=None,
                 throughput_min=None, throughput_max=None,
                 error_min=None, error_max=None,
                 transaction_rate_min=None, transaction_rate_max=None):
        """Construct a NetperfAssertion.

        Leaving bounds undefined sets them to values which are permissive.

        @param duration_seconds_min float minimal test duration in seconds.
        @param duration_seconds_max float maximal test duration in seconds.
        @param throughput_min float minimal throughput in Mbps.
        @param throughput_max float maximal throughput in Mbps.
        @param error_min int minimal number of UDP frame errors.
        @param error_max int max number of UDP frame errors.
        @param transaction_rate_min float minimal number of transactions
                per second.
        @param transaction_rate_max float max number of transactions per second.

        """
        Bound = collections.namedtuple('Bound', ['lower', 'upper'])
        self.duration_seconds_bounds = Bound(duration_seconds_min,
                                             duration_seconds_max)
        self.throughput_bounds = Bound(throughput_min, throughput_max)
        self.errors_bounds = Bound(error_min, error_max)
        self.transaction_rate_bounds = Bound(transaction_rate_min,
                                             transaction_rate_max)


    def passes(self, result):
        """Check that a result matches the given assertion.

        @param result NetperfResult object produced by a test.
        @return True iff all this assertion passes for the give result.

        """
        passed = [self._passes(result, field)
                  for field in ['duration_seconds', 'throughput',
                                'errors', 'transaction_rate']]
        if all(passed):
            return True

        return False


    def __repr__(self):
        fields = {'duration_seconds_min': self.duration_seconds_bounds.lower,
                  'duration_seconds_max': self.duration_seconds_bounds.upper,
                  'throughput_min': self.throughput_bounds.lower,
                  'throughput_max': self.throughput_bounds.upper,
                  'error_min': self.errors_bounds.lower,
                  'error_max': self.errors_bounds.upper,
                  'transaction_rate_min': self.transaction_rate_bounds.lower,
                  'transaction_rate_max': self.transaction_rate_bounds.upper}
        return '%s(%s)' % (self.__class__.__name__,
                           ', '.join(['%s=%r' % item
                                      for item in fields.iteritems()
                                      if item[1] is not None]))


class NetperfConfig(object):
    """Defines a single netperf run."""

    DEFAULT_TEST_TIME = 10
    # Measures how many times we can connect, request a byte, and receive a
    # byte per second.
    TEST_TYPE_TCP_CRR = 'TCP_CRR'
    # MAERTS is stream backwards.  Measure bitrate of a stream from the netperf
    # server to the client.
    TEST_TYPE_TCP_MAERTS = 'TCP_MAERTS'
    # Measures how many times we can request a byte and receive a byte per
    # second.
    TEST_TYPE_TCP_RR = 'TCP_RR'
    # This is like a TCP_STREAM test except that the netperf client will use
    # a platform dependent call like sendfile() rather than the simple send()
    # call.  This can result in better performance.
    TEST_TYPE_TCP_SENDFILE = 'TCP_SENDFILE'
    # Measures throughput sending bytes from the client to the server in a
    # TCP stream.
    TEST_TYPE_TCP_STREAM = 'TCP_STREAM'
    # Measures how many times we can request a byte from the client and receive
    # a byte from the server.  If any datagram is dropped, the client or server
    # will block indefinitely.  This failure is not evident except as a low
    # transaction rate.
    TEST_TYPE_UDP_RR = 'UDP_RR'
    # Test UDP throughput sending from the client to the server.  There is no
    # flow control here, and generally sending is easier that receiving, so
    # there will be two types of throughput, both receiving and sending.
    TEST_TYPE_UDP_STREAM = 'UDP_STREAM'
    # This isn't a real test type, but we can emulate a UDP stream from the
    # server to the DUT by running the netperf server on the DUT and the
    # client on the server and then doing a UDP_STREAM test.
    TEST_TYPE_UDP_MAERTS = 'UDP_MAERTS'
    # Different kinds of tests have different output formats.
    REQUEST_RESPONSE_TESTS = [ TEST_TYPE_TCP_CRR,
                               TEST_TYPE_TCP_RR,
                               TEST_TYPE_UDP_RR ]
    TCP_STREAM_TESTS = [ TEST_TYPE_TCP_MAERTS,
                         TEST_TYPE_TCP_SENDFILE,
                         TEST_TYPE_TCP_STREAM ]
    UDP_STREAM_TESTS = [ TEST_TYPE_UDP_STREAM,
                         TEST_TYPE_UDP_MAERTS ]

    SHORT_TAGS = { TEST_TYPE_TCP_CRR: 'tcp_crr',
                   TEST_TYPE_TCP_MAERTS: 'tcp_rx',
                   TEST_TYPE_TCP_RR: 'tcp_rr',
                   TEST_TYPE_TCP_SENDFILE: 'tcp_stx',
                   TEST_TYPE_TCP_STREAM: 'tcp_tx',
                   TEST_TYPE_UDP_RR: 'udp_rr',
                   TEST_TYPE_UDP_STREAM: 'udp_tx',
                   TEST_TYPE_UDP_MAERTS: 'udp_rx' }

    READABLE_TAGS = { TEST_TYPE_TCP_CRR: 'tcp_connect_roundtrip_rate',
                      TEST_TYPE_TCP_MAERTS: 'tcp_downstream',
                      TEST_TYPE_TCP_RR: 'tcp_roundtrip_rate',
                      TEST_TYPE_TCP_SENDFILE: 'tcp_upstream_sendfile',
                      TEST_TYPE_TCP_STREAM: 'tcp_upstream',
                      TEST_TYPE_UDP_RR: 'udp_roundtrip',
                      TEST_TYPE_UDP_STREAM: 'udp_upstream',
                      TEST_TYPE_UDP_MAERTS: 'udp_downstream' }


    @staticmethod
    def _assert_is_valid_test_type(test_type):
        """Assert that |test_type| is one of TEST_TYPE_* above.

        @param test_type string test type.

        """
        if (test_type not in NetperfConfig.REQUEST_RESPONSE_TESTS and
            test_type not in NetperfConfig.TCP_STREAM_TESTS and
            test_type not in NetperfConfig.UDP_STREAM_TESTS):
            raise error.TestFail('Invalid netperf test type: %r.' % test_type)


    @staticmethod
    def test_type_to_tag(test_type):
        """Convert a test type to a concise unique tag.

        @param test_type string, one of TEST_TYPE_* above.
        @return string very short test description.

        """
        return NetperfConfig.SHORT_TAGS.get(test_type, 'unknown')


    @staticmethod
    def test_type_to_human_readable_tag(test_type):
        """Convert a test type to a unique human readable tag.

        @param test_type string, one of TEST_TYPE_* above.
        @return string human readable test description.

        """
        return NetperfConfig.READABLE_TAGS.get(test_type, 'unknown')

    @property
    def human_readable_tag(self):
        """@return string human readable test description."""
        return self.test_type_to_human_readable_tag(self.test_type)


    @property
    def netperf_test_type(self):
        """@return string test type suitable for passing to netperf."""
        if self.test_type == self.TEST_TYPE_UDP_MAERTS:
            return self.TEST_TYPE_UDP_STREAM

        return self.test_type


    @property
    def server_serves(self):
        """False iff the server and DUT should switch roles for running netperf.

        @return True iff netserv should be run on server host.  When false
                this indicates that the DUT should run netserv and netperf
                should be run on the server against the client.

        """
        return self.test_type != self.TEST_TYPE_UDP_MAERTS


    @property
    def tag(self):
        """@return string very short test description."""
        return self.test_type_to_tag(self.test_type)


    def __init__(self, test_type, test_time=DEFAULT_TEST_TIME):
        """Construct a NetperfConfig.

        @param test_type string one of TEST_TYPE_* above.
        @param test_time int number of seconds to run the test for.

        """
        self.test_type = test_type
        self.test_time = test_time
        self._assert_is_valid_test_type(self.netperf_test_type)


    def __repr__(self):
        return '%s(test_type=%r, test_time=%r' % (
                self.__class__.__name__,
                self.test_type,
                self.test_time)


class NetperfRunner(object):
    """Delegate to run netperf on a client/server pair."""

    NETPERF_DATA_PORT = 12866
    NETPERF_PORT = 12865
    NETSERV_STARTUP_WAIT_TIME = 3
    NETPERF_COMMAND_TIMEOUT_MARGIN = 120


    def __init__(self, client_proxy, server_proxy, config):
        """Construct a NetperfRunner.

        @param client WiFiClient object.
        @param server LinuxSystem object.

        """
        self._client_proxy = client_proxy
        self._server_proxy = server_proxy
        if config.server_serves:
            self._server_host = server_proxy.host
            self._client_host = client_proxy.host
            self._target_ip = server_proxy.wifi_ip
        else:
            self._server_host = client_proxy.host
            self._client_host = server_proxy.host
            self._target_ip = client_proxy.wifi_ip
        self._command_netserv = path_utils.must_be_installed(
                'netserver', host=self._server_host)
        self._command_netperf = path_utils.must_be_installed(
                'netperf', host=self._client_host)
        self._config = config


    def __enter__(self):
        self._restart_netserv()
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        self._client_proxy.firewall_cleanup()
        self._kill_netserv()


    def _kill_netserv(self):
        """Kills any existing netserv process on the serving host."""
        self._server_host.run('pkill %s' %
                              os.path.basename(self._command_netserv),
                              ignore_status=True)


    def _restart_netserv(self):
        logging.info('Starting netserver...')
        self._kill_netserv()
        self._server_host.run('%s -p %d >/dev/null 2>&1' %
                              (self._command_netserv, self.NETPERF_PORT))
        startup_time = time.time()
        self._client_proxy.firewall_open('tcp', self._server_proxy.wifi_ip)
        self._client_proxy.firewall_open('udp', self._server_proxy.wifi_ip)
        # Wait for the netserv to come up.
        while time.time() - startup_time < self.NETSERV_STARTUP_WAIT_TIME:
            time.sleep(0.1)


    def run(self, ignore_failures=False, retry_count=3):
        """Run netperf and take a performance measurement.

        @param ignore_failures bool True iff netperf runs that fail should be
                ignored.  If this happens, run will return a None value rather
                than a NetperfResult.
        @param retry_count int number of times to retry the netperf command if
                it fails due to an internal timeout within netperf.
        @return NetperfResult summarizing a netperf run.

        """
        netperf = '%s -H %s -p %s -t %s -l %d -- -P 0,%d' % (
                self._command_netperf,
                self._target_ip,
                self.NETPERF_PORT,
                self._config.netperf_test_type,
                self._config.test_time,
                self.NETPERF_DATA_PORT)
        logging.debug('Running netperf client.')
        logging.info('Running netperf for %d seconds.', self._config.test_time)
        timeout = self._config.test_time + self.NETPERF_COMMAND_TIMEOUT_MARGIN
        for attempt in range(retry_count):
            start_time = time.time()
            result = self._client_host.run(netperf, ignore_status=True,
                                           ignore_timeout=ignore_failures,
                                           timeout=timeout)
            if not result:
                logging.info('Retrying netperf after empty result.')
                continue

            # Exit retry loop on success.
            if not result.exit_status:
                break

            # Only retry for known retryable conditions.
            if 'Interrupted system call' in result.stderr:
                logging.info('Retrying netperf after internal timeout error.')
                continue

            if 'establish the control connection' in result.stdout:
                logging.info('Restarting netserv after client failed connect.')
                self._restart_netserv()
                continue

            # We are in an unhandled error case.
            logging.info('Retrying netperf after an unknown error.')

        if result.exit_status and not ignore_failures:
            raise error.CmdError(netperf, result,
                                 "Command returned non-zero exit status")

        duration = time.time() - start_time
        if result is None or result.exit_status:
            return None

        return NetperfResult.from_netperf_results(
                self._config.test_type, result.stdout, duration)