# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import logging
import multiprocessing
import sys
import time
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
from autotest_lib.server.cros.network import connection_worker
"""DUT Control module is used to control all the DUT's in a Clique set.
We need to execute a sequence of steps on each DUT in the pool parallely and
collect the results from all the executions.
Class Hierarchy:
----------------
CliqueDUTControl
|
-------------------------------------------------------
| |
CliqueDUTRole CliqueDUTBatch
| |
------------------------------------- ---------------------
| | | |
DUTRoleConnectDisconnect DUTRoleFileTransfer CliqueDUTSet CliqueDUTPool
CliqueDUTControl - Base control class. Stores and retrieves test params used
for all control operations. Should never be directly instantiated.
CliqueDUTRole - Used to control one single DUT in the test. This is a base class
which should be derived to define a role to be performed by the DUT. Should
never be directly instantiated.
CliqueDUTBatch - Used to control a batch of DUT in the test. It could
either be controlling a DUT set or an entire DUT pool. Implements the setup,
cleanup and execute functions which spawn off multiple threads to
control the execution of each step in the objects controlled. Should
never be directly instantiated.
CliqueDUTSet - Used to control a set within the DUT pool. It has a number of
CliqueDUTRole objects to control.
CliqueDUTPool - Used to control the entire DUT pool. It has a number of
CliqueDUTSet objects to control.
"""
# Dummy result error reason to be used when exception is encountered in a role.
ROLE_SETUP_EXCEPTION = "Role Setup Exception! "
ROLE_EXECUTE_EXCEPTION = "Role Execute Exception! "
ROLE_CLEANUP_EXCEPTION = "Role Teardown Exception! "
# Dummy result error reason to be used when exception is encountered in a role.
POOL_SETUP_EXCEPTION = "Pool Setup Exception! "
POOL_CLEANUP_EXCEPTION = "Pool Teardown Exception! "
# Result to returned after execution a sequence of steps.
ControlResult = collections.namedtuple(
'ControlResult', [ 'uid', 'run_num', 'success',
'error_reason', 'start_time', 'end_time' ])
class CliqueDUTUnknownParamError(error.TestError):
"""Indicates an error in finding a required param from the |test_params|."""
pass
class CliqueControl(object):
"""CliqueControl is a base class which is used to control the DUT's in the
test. Not to be directly instantiated.
"""
def __init__(self, dut_objs, assoc_params=None, conn_worker=None,
test_params=None, uid=""):
"""Initialize.
@param dut_objs: A list of objects that is being controlled by this
control object.
@param assoc_params: Association paramters to be used for this control
object.
@param conn_worker: ConnectionWorkerAbstract object, to run extra
work after successful connection.
@param test_params: A dictionary of params to be used for executing the
test.
@param uid: UID of this instance of the object. Host name for DUTRole
objects, Instance name for DUTBatch objects.
"""
self._dut_objs = dut_objs
self._test_params = test_params
self._assoc_params = assoc_params
self._conn_worker = conn_worker
self._uid = uid
def find_param(self, param_key):
"""Find the relevant param value for a role from internal dictionary.
@param param_key: Look for the value of param_key in the dict.
@raises CliqueDUTUnknownParamError if there is an error in lookup.
"""
if not self._test_params.has_key(param_key):
raise CliqueDUTUnknownParamError("Param %s not found in %s" %
(param_key, self._test_params))
return self._test_params.get(param_key)
@property
def dut_objs(self):
"""Returns the dut_objs controlled by the object."""
return self._dut_objs
@property
def dut_obj(self):
"""Returns the first dut_obj controlled by the object."""
return self._dut_objs[0]
@property
def uid(self):
"""Returns a unique identifier associated with this object. It could
be just the hostname of the DUT in DUTRole objects or
set-number/pool-number in DUTSet DUTPool objects.
"""
return self._uid
@property
def assoc_params(self):
"""Returns the association params corresponding to the object."""
return self._assoc_params
@property
def conn_worker(self):
"""Returns the connection worker corresponding to the object."""
return self._conn_worker
def setup(self, run_num):
"""Setup the DUT/DUT-set in the correct state before the sequence of
actions to be taken for the role is executed.
@param run_num: Run number of this execution.
@returns: An instance of ControlResult corresponding to all the errors
that were returned by the DUT/DUT's in the DUT-set which
is being controlled.
"""
pass
def cleanup(self, run_num):
"""Cleanup the DUT/DUT-set state after the sequence of actions to be
taken for the role is executed.
@param run_num: Run number of this execution.
@returns: An instance of ControlResult corresponding to all the errors
that were returned by the DUT/DUT's in the DUT-set which
is being controlled.
"""
pass
def execute(self, run_num):
"""Execute the sequence of actions to be taken for the role on the DUT
/DUT-set.
@param run_num: Run number of this execution.
@returns: An instance of ControlResult corresponding to all the errors
that were returned by the DUT/DUT's in the DUT-set which
is being controlled.
"""
pass
class CliqueDUTRole(CliqueControl):
"""CliqueDUTRole is a base class which defines the role entrusted to each
DUT in the Clique Test. Not to be directly instantiated.
"""
def __init__(self, dut, assoc_params=None, conn_worker=None,
test_params=None):
"""Initialize.
@param dut: A DUTObject representing a DUT in the set.
@param assoc_params: Association paramters to be used for this role.
@param conn_worker: ConnectionWorkerAbstract object, to run extra
work after successful connection.
@param test_params: A dictionary of params to be used for executing the
test.
"""
super(CliqueDUTRole, self).__init__(
dut_objs=[dut], assoc_params=assoc_params,
conn_worker=conn_worker, test_params=test_params,
uid=dut.host.hostname)
def setup(self, run_num):
try:
assoc_params = self.assoc_params
self.dut_obj.wifi_client.shill.disconnect(assoc_params.ssid)
if not self.dut_obj.wifi_client.shill.init_test_network_state():
result = ControlResult(uid=self.uid,
run_num=run_num,
success=False,
error_reason="Failed to set up isolated "
"test context profile.",
start_time="",
end_time="")
return result
else:
return None
except Exception as e:
result = ControlResult(uid=self.uid,
run_num=run_num,
success=False,
error_reason=ROLE_SETUP_EXCEPTION + str(e),
start_time="",
end_time="")
return result
def cleanup(self, run_num):
try:
self.dut_obj.wifi_client.shill.clean_profiles()
return None
except Exception as e:
result = ControlResult(uid=self.uid,
run_num=run_num,
success=False,
error_reason=ROLE_CLEANUP_EXCEPTION + str(e),
start_time="",
end_time="")
return result
def _connect_wifi(self, run_num):
"""Helper function to make a connection to the associated AP."""
assoc_params = self.assoc_params
logging.info('Connection attempt %d', run_num)
self.dut_obj.host.syslog('Connection attempt %d' % run_num)
start_time = self.dut_obj.host.run("date '+%FT%T.%N%:z'").stdout
start_time = start_time.strip()
assoc_result = xmlrpc_datatypes.deserialize(
self.dut_obj.wifi_client.shill.connect_wifi(assoc_params))
end_time = self.dut_obj.host.run("date '+%FT%T.%N%:z'").stdout
end_time = end_time.strip()
success = assoc_result.success
if not success:
logging.error('Connection attempt %d failed; reason: %s',
run_num, assoc_result.failure_reason)
result = ControlResult(uid=self.uid,
run_num=run_num,
success=success,
error_reason=assoc_result.failure_reason,
start_time=start_time,
end_time=end_time)
return result
else:
logging.info('Connection attempt %d passed', run_num)
return None
def _disconnect_wifi(self):
"""Helper function to disconnect from the associated AP."""
assoc_params = self.assoc_params
self.dut_obj.wifi_client.shill.disconnect(assoc_params.ssid)
# todo(rpius): Move these role implementations to a separate file since we'll
# end up having a lot of roles defined.
class DUTRoleConnectDisconnect(CliqueDUTRole):
"""DUTRoleConnectDisconnect is used to make a DUT connect and disconnect
to a given AP repeatedly.
"""
def execute(self, run_num):
try:
result = self._connect_wifi(run_num)
if result:
return result
# Now disconnect from the AP.
self._disconnect_wifi()
return None
except Exception as e:
result = ControlResult(uid=self.uid,
run_num=run_num,
success=False,
error_reason=ROLE_EXECUTE_EXCEPTION + str(e),
start_time="",
end_time="")
return result
class DUTRoleConnectDuration(CliqueDUTRole):
"""DUTRoleConnectDuration is used to make a DUT connect to a given AP and
then check the liveness of the connection from another worker device.
"""
def setup(self, run_num):
result = super(DUTRoleConnectDuration, self).setup(run_num)
if result:
return result
# Let's check for the worker client now.
if not self.conn_worker:
return ControlResult(uid=self.uid,
run_num=run_num,
success=False,
error_reason="No connection worker found",
start_time="",
end_time="")
def execute(self, run_num):
try:
result = self._connect_wifi(run_num)
if result:
return result
# Let's start the ping from the worker client.
worker = connection_worker.ConnectionDuration.create_from_parent(
self.conn_worker)
worker.run(self.dut_obj.wifi_client)
return None
except Exception as e:
result = ControlResult(uid=self.uid,
run_num=run_num,
success=False,
error_reason=ROLE_EXECUTE_EXCEPTION + str(e),
start_time="",
end_time="")
return result
def dut_batch_worker(dut_control_obj, method, error_results_queue, run_num):
"""The method called by multiprocessing worker pool for running the DUT
control object's setup/execute/cleanup methods. This function is the
function which is repeatedly scheduled for each DUT/DUT-set through the
multiprocessing worker. This has to be defined outside the class because it
needs to be pickleable.
@param dut_control_obj: An object corresponding to DUT/DUT-set to control.
@param method: Method name to be invoked on the dut_control_obj.
it has to be one of setup/execute/teardown.
@param error_results_queue: Queue to put the error results after test.
@param run_num: Run number of this execution.
"""
logging.info("%s: Running %s", dut_control_obj.uid, method)
run_method = getattr(dut_control_obj, method, None)
if callable(run_method):
result = run_method(run_num)
if result:
error_results_queue.put(result)
class CliqueDUTBatch(CliqueControl):
"""CliqueDUTBatch is a base class which is used to control a batch of DUTs.
This could either be a DUT set or the entire DUT pool. Not to be directly
instantiated.
"""
# Used to store the instance number of derived classes.
BATCH_UID_NUM = {}
def __init__(self, dut_objs, test_params=None):
"""Initialize.
@param dut_objs: A list of DUTRole objects representing the DUTs in set.
@param test_params: A dictionary of params to be used for executing the
test.
"""
uid_num = self.BATCH_UID_NUM.get(self.__class__.__name__, 1)
uid = self.__class__.__name__ + str(uid_num)
self.BATCH_UID_NUM[self.__class__.__name__] = uid_num + 1
super(CliqueDUTBatch, self).__init__(
dut_objs=dut_objs, test_params=test_params, uid=uid)
def _spawn_worker_threads(self, method, run_num):
"""Spawns multiple threads to run the the |method(run_num)| on all the
control objects in parallel.
@param method: Method to be invoked on the dut_objs.
@param run_num: Run number of this execution.
@returns: An instance of ControlResult corresponding to all the errors
that were returned by the DUT/DUT's in the DUT-set which
is being controlled.
"""
tasks = []
error_results_queue = multiprocessing.Queue()
for dut_obj in self.dut_objs:
task = multiprocessing.Process(
target=dut_batch_worker,
args=(dut_obj, method, error_results_queue, run_num))
tasks.append(task)
# Run the tasks in parallel.
for task in tasks:
task.start()
for task in tasks:
task.join()
error_results = []
while not error_results_queue.empty():
result = error_results_queue.get()
# error_results returned at the DUT set level will be a list of
# ControlResult objects from each of the DUTs in the set.
# error_results returned at the DUT pool level will be a list of
# lists from each DUT set. Let's flatten out the list in that case
# since there could be ControlResult objects that are generated at
# the pool or set level which will make the final error result list
# assymetric where some elements are lists of ControlResult objects
# and some are just ControlResult objects.
if isinstance(result, list):
error_results.extend(result)
else:
error_results.append(result)
return error_results
def setup(self, run_num):
"""Setup the DUT-set/pool in the correct state before the sequence of
actions to be taken for the role is executed.
@param run_num: Run number of this execution.
@returns: An instance of ControlResult corresponding to all the errors
that were returned by the DUT/DUT's in the DUT-set which
is being controlled.
"""
return self._spawn_worker_threads("setup", run_num)
def cleanup(self, run_num):
"""Cleanup the DUT-set/pool state after the sequence of actions to be
taken for the role is executed.
@param run_num: Run number of this execution.
@returns: An instance of ControlResult corresponding to all the errors
that were returned by the DUT/DUT's in the DUT-set which
is being controlled.
"""
return self._spawn_worker_threads("cleanup", run_num)
def execute(self, run_num):
"""Execute the sequence of actions to be taken for the role on the
DUT-set/pool.
@param run_num: Run number of this execution.
@returns: An instance of ControlResult corresponding to all the errors
that were returned by the DUT/DUT's in the DUT-set which
is being controlled.
"""
return self._spawn_worker_threads("execute", run_num)
class CliqueDUTSet(CliqueDUTBatch):
"""CliqueDUTSet is an object which is used to control all the DUT's in a DUT
set.
"""
def setup(self, run_num):
# Placeholder to add any set specific actions.
return super(CliqueDUTSet, self).setup(run_num)
def cleanup(self, run_num):
# Placeholder to add any set specific actions.
return super(CliqueDUTSet, self).cleanup(run_num)
def execute(self, run_num):
# Placeholder to add any set specific actions.
return super(CliqueDUTSet, self).execute(run_num)
class CliqueDUTPool(CliqueDUTBatch):
"""CliqueDUTSet is an object which is used to control all the DUT-sets in a
DUT pool.
"""
def setup(self, run_num):
# Let's start the packet capture before we kick off the entire pool
# execution.
try:
capturer = self.find_param('capturer')
capturer_frequency = self.find_param('capturer_frequency')
capturer_ht_type = self.find_param('capturer_ht_type')
capturer.start_capture(capturer_frequency, ht_type=capturer_ht_type)
except Exception as e:
result = ControlResult(uid=self.uid,
run_num=run_num,
success=False,
error_reason=POOL_SETUP_EXCEPTION + str(e),
start_time="",
end_time="")
# We cannot proceed with the test if this failed.
return result
# Now execute the setup on all the DUT-sets.
return super(CliqueDUTPool, self).setup(run_num)
def cleanup(self, run_num):
# First execute the cleanup on all the DUT-sets.
results = super(CliqueDUTPool, self).cleanup(run_num)
# Now stop the packet capture.
try:
capturer = self.find_param('capturer')
filename = str('connect_try_%d.trc' % (run_num)),
capturer.stop_capture(save_dir=self.outputdir,
save_filename=filename)
except Exception as e:
result = ControlResult(uid=self.uid,
run_num=run_num,
success=False,
error_reason=POOL_CLEANUP_EXCEPTION + str(e),
start_time="",
end_time="")
if results:
results.append(result)
else:
results = result
return results
def execute(self, run_num):
# Placeholder to add any pool specific actions.
return super(CliqueDUTPool, self).execute(run_num)
def execute_dut_pool(dut_pool, dut_role_classes, assoc_params_list,
conn_workers, test_params, num_runs=1):
"""Controls the DUT's in a given test scenario. The DUT's are assigned a
role according to the dut_role_classes provided for each DUT-set and all of
the sequence of steps are executed parallely on all the DUT's in the pool.
@param dut_pool: 2D list of DUT objects corresponding to the DUT's in the
DUT pool.
@param dut_role_classes: List of roles to be assigned to each set in the DUT
pool. Each element has to be a derived class of
CliqueDUTRole.
@param assoc_params_list: List of association parameters corrresponding
to the AP to test against for each set in the
DUT.
@param conn_workers: List of ConnectionWorkerAbstract objects, to
run extra work after successful connection.
@param test_params: List of params to be used for the test.
@num_runs: Number of iterations of the test to be run.
"""
# Every DUT set in the pool needs to have a corresponding DUT role,
# association parameters and connection worker assigned from the test.
# It is the responsibilty of the test scenario to make sure that there is a
# one to one mapping of all these elements since DUT control is going to
# be generic.
# This might mean that the test needs to duplicate the association
# parameters in the list if there is only 1 AP and 2 DUT sets.
# Or if there is no connection worker required, then the test should create
# a list of 'None' objects with length of 2.
# DUT control does not care if the same AP is used for 2 DUT sets or if the
# same connection worker is shared across 2 DUT sets as long as the
# length of the lists are equal.
if ((len(dut_pool) != len(dut_role_classes)) or
(len(dut_pool) != len(assoc_params_list)) or
(len(dut_pool) != len(conn_workers))):
raise error.TestError("Incorrect test configuration. Num DUT sets: %d, "
"Num DUT roles: %d, Num association params: %d, "
"Num connection workers: %d" %
(len(dut_pool), len(dut_role_classes),
len(assoc_params_list), len(conn_workers)))
dut_set_control_objs = []
for dut_set, dut_role_class, assoc_params, conn_worker in \
zip(dut_pool, dut_role_classes, assoc_params_list, conn_workers):
dut_control_objs = []
for dut in dut_set:
dut_control_obj = dut_role_class(
dut, assoc_params, conn_worker, test_params)
dut_control_objs.append(dut_control_obj)
dut_set_control_obj = CliqueDUTSet(dut_control_objs, test_params)
dut_set_control_objs.append(dut_set_control_obj)
dut_pool_control_obj = CliqueDUTPool(dut_set_control_objs, test_params)
for run_num in range(0, num_runs):
# This setup, execute, cleanup calls on pool object, results in parallel
# invocation of call on all the DUT-sets which in turn results in
# parallel invocation of call on all the DUTs.
error_results = dut_pool_control_obj.setup(run_num)
if error_results:
return error_results
error_results = dut_pool_control_obj.execute(run_num)
if error_results:
# Try to cleanup before we leave.
dut_pool_control_obj.cleanup(run_num)
return error_results
error_results = dut_pool_control_obj.cleanup(run_num)
if error_results:
return error_results
return None