普通文本  |  594行  |  24.27 KB

# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import logging
import multiprocessing
import sys
import time

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
from autotest_lib.server.cros.network import connection_worker

"""DUT Control module is used to control all the DUT's in a Clique set.
We need to execute a sequence of steps on each DUT in the pool parallely and
collect the results from all the executions.

Class Hierarchy:
----------------
                                CliqueDUTControl
                                        |
            -------------------------------------------------------
            |                                                      |
        CliqueDUTRole                                          CliqueDUTBatch
            |                                                      |
   -------------------------------------               ---------------------
   |                                   |               |                   |
 DUTRoleConnectDisconnect DUTRoleFileTransfer     CliqueDUTSet     CliqueDUTPool

CliqueDUTControl - Base control class. Stores and retrieves test params used
for all control operations. Should never be directly instantiated.

CliqueDUTRole - Used to control one single DUT in the test. This is a base class
which should be derived to define a role to be performed by the DUT. Should
never be directly instantiated.

CliqueDUTBatch - Used to control a batch of DUT in the test. It could
either be controlling a DUT set or an entire DUT pool. Implements the setup,
cleanup and execute functions which spawn off multiple threads to
control the execution of each step in the objects controlled. Should
never be directly instantiated.

CliqueDUTSet - Used to control a set within the DUT pool. It has a number of
CliqueDUTRole objects to control.

CliqueDUTPool - Used to control the entire DUT pool. It has a number of
CliqueDUTSet objects to control.
"""


# Dummy result error reason to be used when exception is encountered in a role.
ROLE_SETUP_EXCEPTION = "Role Setup Exception! "
ROLE_EXECUTE_EXCEPTION = "Role Execute Exception! "
ROLE_CLEANUP_EXCEPTION = "Role Teardown Exception! "

# Dummy result error reason to be used when exception is encountered in a role.
POOL_SETUP_EXCEPTION = "Pool Setup Exception! "
POOL_CLEANUP_EXCEPTION = "Pool Teardown Exception! "

# Result to returned after execution a sequence of steps.
ControlResult = collections.namedtuple(
        'ControlResult', [ 'uid', 'run_num', 'success',
                           'error_reason', 'start_time', 'end_time' ])

class CliqueDUTUnknownParamError(error.TestError):
    """Indicates an error in finding a required param from the |test_params|."""
    pass


class CliqueControl(object):
    """CliqueControl is a base class which is used to control the DUT's in the
    test. Not to be directly instantiated.
    """

    def __init__(self, dut_objs, assoc_params=None, conn_worker=None,
                 test_params=None, uid=""):
        """Initialize.

        @param dut_objs: A list of objects that is being controlled by this
                         control object.
        @param assoc_params: Association paramters to be used for this control
                             object.
        @param conn_worker: ConnectionWorkerAbstract object, to run extra
                            work after successful connection.
        @param test_params: A dictionary of params to be used for executing the
                            test.
        @param uid: UID of this instance of the object. Host name for DUTRole
                    objects, Instance name for DUTBatch objects.
        """
        self._dut_objs = dut_objs
        self._test_params = test_params
        self._assoc_params = assoc_params
        self._conn_worker = conn_worker
        self._uid = uid

    def find_param(self, param_key):
        """Find the relevant param value for a role from internal dictionary.

        @param param_key: Look for the value of param_key in the dict.

        @raises CliqueDUTUnknownParamError if there is an error in lookup.
        """
        if not self._test_params.has_key(param_key):
            raise CliqueDUTUnknownParamError("Param %s not found in %s" %
                                             (param_key, self._test_params))
        return self._test_params.get(param_key)

    @property
    def dut_objs(self):
        """Returns the dut_objs controlled by the object."""
        return self._dut_objs

    @property
    def dut_obj(self):
        """Returns the first dut_obj controlled by the object."""
        return self._dut_objs[0]

    @property
    def uid(self):
        """Returns a unique identifier associated with this object. It could
        be just the hostname of the DUT in DUTRole objects or
        set-number/pool-number in DUTSet DUTPool objects.
        """
        return self._uid

    @property
    def assoc_params(self):
        """Returns the association params corresponding to the object."""
        return self._assoc_params

    @property
    def conn_worker(self):
        """Returns the connection worker corresponding to the object."""
        return self._conn_worker


    def setup(self, run_num):
        """Setup the DUT/DUT-set in the correct state before the sequence of
        actions to be taken for the role is executed.

        @param run_num: Run number of this execution.

        @returns: An instance of ControlResult corresponding to all the errors
                  that were returned by the DUT/DUT's in the DUT-set which
                  is being controlled.
        """
        pass

    def cleanup(self, run_num):
        """Cleanup the DUT/DUT-set state after the sequence of actions to be
        taken for the role is executed.

        @param run_num: Run number of this execution.

        @returns: An instance of ControlResult corresponding to all the errors
                  that were returned by the DUT/DUT's in the DUT-set which
                  is being controlled.
        """
        pass

    def execute(self, run_num):
        """Execute the sequence of actions to be taken for the role on the DUT
        /DUT-set.

        @param run_num: Run number of this execution.

        @returns: An instance of ControlResult corresponding to all the errors
                  that were returned by the DUT/DUT's in the DUT-set which
                  is being controlled.

        """
        pass


class CliqueDUTRole(CliqueControl):
    """CliqueDUTRole is a base class which defines the role entrusted to each
    DUT in the Clique Test. Not to be directly instantiated.
    """

    def __init__(self, dut, assoc_params=None, conn_worker=None,
                 test_params=None):
        """Initialize.

        @param dut: A DUTObject representing a DUT in the set.
        @param assoc_params: Association paramters to be used for this role.
        @param conn_worker: ConnectionWorkerAbstract object, to run extra
                            work after successful connection.
        @param test_params: A dictionary of params to be used for executing the
                            test.
        """
        super(CliqueDUTRole, self).__init__(
                dut_objs=[dut], assoc_params=assoc_params,
                conn_worker=conn_worker, test_params=test_params,
                uid=dut.host.hostname)

    def setup(self, run_num):
        try:
            assoc_params = self.assoc_params
            self.dut_obj.wifi_client.shill.disconnect(assoc_params.ssid)
            if not self.dut_obj.wifi_client.shill.init_test_network_state():
                result = ControlResult(uid=self.uid,
                                       run_num=run_num,
                                       success=False,
                                       error_reason="Failed to set up isolated "
                                                    "test context profile.",
                                       start_time="",
                                       end_time="")
                return result
            else:
                return None
        except Exception as e:
            result = ControlResult(uid=self.uid,
                                   run_num=run_num,
                                   success=False,
                                   error_reason=ROLE_SETUP_EXCEPTION + str(e),
                                   start_time="",
                                   end_time="")
            return result

    def cleanup(self, run_num):
        try:
            self.dut_obj.wifi_client.shill.clean_profiles()
            return None
        except Exception as e:
            result = ControlResult(uid=self.uid,
                                   run_num=run_num,
                                   success=False,
                                   error_reason=ROLE_CLEANUP_EXCEPTION + str(e),
                                   start_time="",
                                   end_time="")
            return result

    def _connect_wifi(self, run_num):
        """Helper function to make a connection to the associated AP."""
        assoc_params = self.assoc_params
        logging.info('Connection attempt %d', run_num)
        self.dut_obj.host.syslog('Connection attempt %d' % run_num)
        start_time = self.dut_obj.host.run("date '+%FT%T.%N%:z'").stdout
        start_time = start_time.strip()
        assoc_result = xmlrpc_datatypes.deserialize(
            self.dut_obj.wifi_client.shill.connect_wifi(assoc_params))
        end_time = self.dut_obj.host.run("date '+%FT%T.%N%:z'").stdout
        end_time = end_time.strip()
        success = assoc_result.success
        if not success:
            logging.error('Connection attempt %d failed; reason: %s',
                          run_num, assoc_result.failure_reason)
            result = ControlResult(uid=self.uid,
                                   run_num=run_num,
                                   success=success,
                                   error_reason=assoc_result.failure_reason,
                                   start_time=start_time,
                                   end_time=end_time)
            return result
        else:
            logging.info('Connection attempt %d passed', run_num)
            return None

    def _disconnect_wifi(self):
        """Helper function to disconnect from the associated AP."""
        assoc_params = self.assoc_params
        self.dut_obj.wifi_client.shill.disconnect(assoc_params.ssid)


# todo(rpius): Move these role implementations to a separate file since we'll
# end up having a lot of roles defined.
class DUTRoleConnectDisconnect(CliqueDUTRole):
    """DUTRoleConnectDisconnect is used to make a DUT connect and disconnect
    to a given AP repeatedly.
    """

    def execute(self, run_num):
        try:
            result = self._connect_wifi(run_num)
            if result:
                return result

            # Now disconnect from the AP.
            self._disconnect_wifi()

            return None
        except Exception as e:
            result = ControlResult(uid=self.uid,
                                   run_num=run_num,
                                   success=False,
                                   error_reason=ROLE_EXECUTE_EXCEPTION + str(e),
                                   start_time="",
                                   end_time="")
            return result


class DUTRoleConnectDuration(CliqueDUTRole):
    """DUTRoleConnectDuration is used to make a DUT connect to a given AP and
    then check the liveness of the connection from another worker device.
    """

    def setup(self, run_num):
        result = super(DUTRoleConnectDuration, self).setup(run_num)
        if result:
            return result
        # Let's check for the worker client now.
        if not self.conn_worker:
            return ControlResult(uid=self.uid,
                                 run_num=run_num,
                                 success=False,
                                 error_reason="No connection worker found",
                                 start_time="",
                                 end_time="")

    def execute(self, run_num):
        try:
            result = self._connect_wifi(run_num)
            if result:
                return result

            # Let's start the ping from the worker client.
            worker = connection_worker.ConnectionDuration.create_from_parent(
                    self.conn_worker)
            worker.run(self.dut_obj.wifi_client)

            return None
        except Exception as e:
            result = ControlResult(uid=self.uid,
                                   run_num=run_num,
                                   success=False,
                                   error_reason=ROLE_EXECUTE_EXCEPTION + str(e),
                                   start_time="",
                                   end_time="")
            return result


def dut_batch_worker(dut_control_obj, method, error_results_queue, run_num):
    """The method called by multiprocessing worker pool for running the DUT
    control object's setup/execute/cleanup methods. This function is the
    function which is repeatedly scheduled for each DUT/DUT-set through the
    multiprocessing worker. This has to be defined outside the class because it
    needs to be pickleable.

    @param dut_control_obj: An object corresponding to DUT/DUT-set to control.
    @param method: Method name to be invoked on the dut_control_obj.
                   it has to be one of setup/execute/teardown.
    @param error_results_queue: Queue to put the error results after test.
    @param run_num: Run number of this execution.
    """
    logging.info("%s: Running %s", dut_control_obj.uid, method)
    run_method = getattr(dut_control_obj, method, None)
    if callable(run_method):
        result = run_method(run_num)
        if result:
            error_results_queue.put(result)


class CliqueDUTBatch(CliqueControl):
    """CliqueDUTBatch is a base class which is used to control a batch of DUTs.
    This could either be a DUT set or the entire DUT pool. Not to be directly
    instantiated.
    """
    # Used to store the instance number of derived classes.
    BATCH_UID_NUM = {}

    def __init__(self, dut_objs, test_params=None):
        """Initialize.

        @param dut_objs: A list of DUTRole objects representing the DUTs in set.
        @param test_params: A dictionary of params to be used for executing the
                            test.
        """
        uid_num = self.BATCH_UID_NUM.get(self.__class__.__name__, 1)
        uid = self.__class__.__name__ + str(uid_num)
        self.BATCH_UID_NUM[self.__class__.__name__] = uid_num + 1
        super(CliqueDUTBatch, self).__init__(
                dut_objs=dut_objs, test_params=test_params, uid=uid)

    def _spawn_worker_threads(self, method, run_num):
        """Spawns multiple threads to run the the |method(run_num)| on all the
        control objects in parallel.

        @param method: Method to be invoked on the dut_objs.
        @param run_num: Run number of this execution.

        @returns: An instance of ControlResult corresponding to all the errors
                  that were returned by the DUT/DUT's in the DUT-set which
                  is being controlled.
        """
        tasks = []
        error_results_queue = multiprocessing.Queue()
        for dut_obj in self.dut_objs:
            task = multiprocessing.Process(
                    target=dut_batch_worker,
                    args=(dut_obj, method, error_results_queue, run_num))
            tasks.append(task)
        # Run the tasks in parallel.
        for task in tasks:
            task.start()
        for task in tasks:
            task.join()
        error_results = []
        while not error_results_queue.empty():
            result = error_results_queue.get()
            # error_results returned at the DUT set level will be a list of
            # ControlResult objects from each of the DUTs in the set.
            # error_results returned at the DUT pool level will be a list of
            # lists from each DUT set. Let's flatten out the list in that case
            # since there could be ControlResult objects that are generated at
            # the pool or set level which will make the final error result list
            # assymetric where some elements are lists of ControlResult objects
            # and some are just ControlResult objects.
            if isinstance(result, list):
                error_results.extend(result)
            else:
                error_results.append(result)
        return error_results

    def setup(self, run_num):
        """Setup the DUT-set/pool in the correct state before the sequence of
        actions to be taken for the role is executed.

        @param run_num: Run number of this execution.

        @returns: An instance of ControlResult corresponding to all the errors
                  that were returned by the DUT/DUT's in the DUT-set which
                  is being controlled.
        """
        return self._spawn_worker_threads("setup", run_num)

    def cleanup(self, run_num):
        """Cleanup the DUT-set/pool state after the sequence of actions to be
        taken for the role is executed.

        @param run_num: Run number of this execution.

        @returns: An instance of ControlResult corresponding to all the errors
                  that were returned by the DUT/DUT's in the DUT-set which
                  is being controlled.
        """
        return self._spawn_worker_threads("cleanup", run_num)

    def execute(self, run_num):
        """Execute the sequence of actions to be taken for the role on the
        DUT-set/pool.

        @param run_num: Run number of this execution.

        @returns: An instance of ControlResult corresponding to all the errors
                  that were returned by the DUT/DUT's in the DUT-set which
                  is being controlled.

        """
        return self._spawn_worker_threads("execute", run_num)


class CliqueDUTSet(CliqueDUTBatch):
    """CliqueDUTSet is an object which is used to control all the DUT's in a DUT
    set.
    """
    def setup(self, run_num):
        # Placeholder to add any set specific actions.
        return super(CliqueDUTSet, self).setup(run_num)

    def cleanup(self, run_num):
        # Placeholder to add any set specific actions.
        return super(CliqueDUTSet, self).cleanup(run_num)

    def execute(self, run_num):
        # Placeholder to add any set specific actions.
        return super(CliqueDUTSet, self).execute(run_num)


class CliqueDUTPool(CliqueDUTBatch):
    """CliqueDUTSet is an object which is used to control all the DUT-sets in a
    DUT pool.
    """

    def setup(self, run_num):
        # Let's start the packet capture before we kick off the entire pool
        # execution.
        try:
            capturer = self.find_param('capturer')
            capturer_frequency = self.find_param('capturer_frequency')
            capturer_ht_type = self.find_param('capturer_ht_type')
            capturer.start_capture(capturer_frequency, ht_type=capturer_ht_type)
        except Exception as e:
            result = ControlResult(uid=self.uid,
                                   run_num=run_num,
                                   success=False,
                                   error_reason=POOL_SETUP_EXCEPTION + str(e),
                                   start_time="",
                                   end_time="")
            # We cannot proceed with the test if this failed.
            return result
        # Now execute the setup on all the DUT-sets.
        return super(CliqueDUTPool, self).setup(run_num)

    def cleanup(self, run_num):
        # First execute the cleanup on all the DUT-sets.
        results = super(CliqueDUTPool, self).cleanup(run_num)
        # Now stop the packet capture.
        try:
            capturer = self.find_param('capturer')
            filename = str('connect_try_%d.trc' % (run_num)),
            capturer.stop_capture(save_dir=self.outputdir,
                                  save_filename=filename)
        except Exception as e:
            result = ControlResult(uid=self.uid,
                                   run_num=run_num,
                                   success=False,
                                   error_reason=POOL_CLEANUP_EXCEPTION + str(e),
                                   start_time="",
                                   end_time="")
            if results:
                results.append(result)
            else:
                results = result
        return results

    def execute(self, run_num):
        # Placeholder to add any pool specific actions.
        return super(CliqueDUTPool, self).execute(run_num)


def execute_dut_pool(dut_pool, dut_role_classes, assoc_params_list,
                     conn_workers, test_params, num_runs=1):

    """Controls the DUT's in a given test scenario. The DUT's are assigned a
    role according to the dut_role_classes provided for each DUT-set and all of
    the sequence of steps are executed parallely on all the DUT's in the pool.

    @param dut_pool: 2D list of DUT objects corresponding to the DUT's in the
                    DUT pool.
    @param dut_role_classes: List of roles to be assigned to each set in the DUT
                             pool. Each element has to be a derived class of
                             CliqueDUTRole.
    @param assoc_params_list: List of association parameters corrresponding
                              to the AP to test against for each set in the
                              DUT.
    @param conn_workers: List of ConnectionWorkerAbstract objects, to
                         run extra work after successful connection.
    @param test_params: List of params to be used for the test.
    @num_runs: Number of iterations of the test to be run.
    """
    # Every DUT set in the pool needs to have a corresponding DUT role,
    # association parameters and connection worker assigned from the test.
    # It is the responsibilty of the test scenario to make sure that there is a
    # one to one mapping of all these elements since DUT control is going to
    # be generic.
    # This might mean that the test needs to duplicate the association
    # parameters in the list if there is only 1 AP and 2 DUT sets.
    # Or if there is no connection worker required, then the test should create
    # a list of 'None' objects with length of 2.
    # DUT control does not care if the same AP is used for 2 DUT sets or if the
    # same connection worker is shared across 2 DUT sets as long as the
    # length of the lists are equal.

    if ((len(dut_pool) != len(dut_role_classes)) or
        (len(dut_pool) != len(assoc_params_list)) or
        (len(dut_pool) != len(conn_workers))):
        raise error.TestError("Incorrect test configuration. Num DUT sets: %d, "
                              "Num DUT roles: %d, Num association params: %d, "
                              "Num connection workers: %d" %
                              (len(dut_pool), len(dut_role_classes),
                               len(assoc_params_list), len(conn_workers)))

    dut_set_control_objs = []
    for dut_set, dut_role_class, assoc_params, conn_worker in \
        zip(dut_pool, dut_role_classes, assoc_params_list, conn_workers):
        dut_control_objs = []
        for dut in dut_set:
            dut_control_obj = dut_role_class(
                    dut, assoc_params, conn_worker, test_params)
            dut_control_objs.append(dut_control_obj)
        dut_set_control_obj = CliqueDUTSet(dut_control_objs, test_params)
        dut_set_control_objs.append(dut_set_control_obj)
    dut_pool_control_obj = CliqueDUTPool(dut_set_control_objs, test_params)

    for run_num in range(0, num_runs):
        # This setup, execute, cleanup calls on pool object, results in parallel
        # invocation of call on all the DUT-sets which in turn results in
        # parallel invocation of call on all the DUTs.
        error_results = dut_pool_control_obj.setup(run_num)
        if error_results:
            return error_results

        error_results = dut_pool_control_obj.execute(run_num)
        if error_results:
            # Try to cleanup before we leave.
            dut_pool_control_obj.cleanup(run_num)
            return error_results

        error_results = dut_pool_control_obj.cleanup(run_num)
        if error_results:
            return error_results
    return None