#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os.path
import posixpath as targetpath
import time
from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import const
from vts.runners.host import errors
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.common import list_utils
from vts.utils.python.coverage import coverage_utils
from vts.utils.python.os import path_utils
from vts.utils.python.precondition import precondition_utils
from vts.utils.python.web import feature_utils
from vts.testcases.template.binary_test import binary_test_case
DATA_NATIVETEST = 'data/nativetest'
DATA_NATIVETEST64 = '%s64' % DATA_NATIVETEST
class BinaryTest(base_test.BaseTestClass):
'''Base class to run binary tests on target.
Attributes:
_dut: AndroidDevice, the device under test as config
shell: ShellMirrorObject, shell mirror
testcases: list of BinaryTestCase objects, list of test cases to run
tags: all the tags that appeared in binary list
DEVICE_TMP_DIR: string, temp location for storing binary
TAG_DELIMITER: string, separator used to separate tag and path
'''
DEVICE_TMP_DIR = '/data/local/tmp'
TAG_DELIMITER = '::'
PUSH_DELIMITER = '->'
DEFAULT_TAG_32 = '_%s' % const.SUFFIX_32BIT
DEFAULT_TAG_64 = '_%s' % const.SUFFIX_64BIT
DEFAULT_LD_LIBRARY_PATH_32 = '/data/local/tmp/32/'
DEFAULT_LD_LIBRARY_PATH_64 = '/data/local/tmp/64/'
DEFAULT_PROFILING_LIBRARY_PATH_32 = '/data/local/tmp/32/'
DEFAULT_PROFILING_LIBRARY_PATH_64 = '/data/local/tmp/64/'
def setUpClass(self):
'''Prepare class, push binaries, set permission, create test cases.'''
required_params = [
keys.ConfigKeys.IKEY_DATA_FILE_PATH,
]
opt_params = [
keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
keys.ConfigKeys.IKEY_BINARY_TEST_WORKING_DIRECTORY,
keys.ConfigKeys.IKEY_BINARY_TEST_ENVP,
keys.ConfigKeys.IKEY_BINARY_TEST_ARGS,
keys.ConfigKeys.IKEY_BINARY_TEST_LD_LIBRARY_PATH,
keys.ConfigKeys.IKEY_BINARY_TEST_PROFILING_LIBRARY_PATH,
keys.ConfigKeys.IKEY_NATIVE_SERVER_PROCESS_NAME,
keys.ConfigKeys.IKEY_PRECONDITION_FILE_PATH_PREFIX,
keys.ConfigKeys.IKEY_PRECONDITION_SYSPROP,
]
self.getUserParams(
req_param_names=required_params, opt_param_names=opt_params)
# test-module-name is required in binary tests.
self.getUserParam(
keys.ConfigKeys.KEY_TESTBED_NAME, error_if_not_found=True)
logging.debug("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
self.data_file_path)
self.binary_test_source = self.getUserParam(
keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE, default_value=[])
self.working_directory = {}
if hasattr(self, keys.ConfigKeys.IKEY_BINARY_TEST_WORKING_DIRECTORY):
self.binary_test_working_directory = map(
str, self.binary_test_working_directory)
for token in self.binary_test_working_directory:
tag = ''
path = token
if self.TAG_DELIMITER in token:
tag, path = token.split(self.TAG_DELIMITER)
self.working_directory[tag] = path
self.envp = {}
if hasattr(self, keys.ConfigKeys.IKEY_BINARY_TEST_ENVP):
self.binary_test_envp = map(str, self.binary_test_envp)
for token in self.binary_test_envp:
tag = ''
path = token
split = token.find(self.TAG_DELIMITER)
if split >= 0:
tag, path = token[:split], token[
split + len(self.TAG_DELIMITER):]
if tag in self.envp:
self.envp[tag] += ' %s' % path
else:
self.envp[tag] = path
self.args = {}
if hasattr(self, keys.ConfigKeys.IKEY_BINARY_TEST_ARGS):
self.binary_test_args = map(str, self.binary_test_args)
for token in self.binary_test_args:
tag = ''
arg = token
split = token.find(self.TAG_DELIMITER)
if split >= 0:
tag, arg = token[:split], token[
split + len(self.TAG_DELIMITER):]
if tag in self.args:
self.args[tag] += ' %s' % arg
else:
self.args[tag] = arg
if hasattr(self, keys.ConfigKeys.IKEY_PRECONDITION_FILE_PATH_PREFIX):
self.file_path_prefix = {
self.DEFAULT_TAG_32: [],
self.DEFAULT_TAG_64: [],
}
self.precondition_file_path_prefix = map(
str, self.precondition_file_path_prefix)
for token in self.precondition_file_path_prefix:
tag = ''
path = token
if self.TAG_DELIMITER in token:
tag, path = token.split(self.TAG_DELIMITER)
if tag == '':
self.file_path_prefix[self.DEFAULT_TAG_32].append(path)
self.file_path_prefix[self.DEFAULT_TAG_64].append(path)
elif tag in self.file_path_prefix:
self.file_path_prefix[tag].append(path)
else:
logging.warn(
"Incorrect tag %s in precondition-file-path-prefix",
tag)
self.ld_library_path = {
self.DEFAULT_TAG_32: self.DEFAULT_LD_LIBRARY_PATH_32,
self.DEFAULT_TAG_64: self.DEFAULT_LD_LIBRARY_PATH_64,
}
if hasattr(self, keys.ConfigKeys.IKEY_BINARY_TEST_LD_LIBRARY_PATH):
self.binary_test_ld_library_path = map(
str, self.binary_test_ld_library_path)
for token in self.binary_test_ld_library_path:
tag = ''
path = token
if self.TAG_DELIMITER in token:
tag, path = token.split(self.TAG_DELIMITER)
if tag in self.ld_library_path:
self.ld_library_path[tag] = '{}:{}'.format(
path, self.ld_library_path[tag])
else:
self.ld_library_path[tag] = path
self.profiling_library_path = {
self.DEFAULT_TAG_32: self.DEFAULT_PROFILING_LIBRARY_PATH_32,
self.DEFAULT_TAG_64: self.DEFAULT_PROFILING_LIBRARY_PATH_64,
}
if hasattr(self,
keys.ConfigKeys.IKEY_BINARY_TEST_PROFILING_LIBRARY_PATH):
self.binary_test_profiling_library_path = map(
str, self.binary_test_profiling_library_path)
for token in self.binary_test_profiling_library_path:
tag = ''
path = token
if self.TAG_DELIMITER in token:
tag, path = token.split(self.TAG_DELIMITER)
self.profiling_library_path[tag] = path
self._dut = self.android_devices[0]
self.shell = self._dut.shell
if self.coverage.enabled and self.coverage.global_coverage:
self.coverage.InitializeDeviceCoverage(self._dut)
for tag in [self.DEFAULT_TAG_32, self.DEFAULT_TAG_64]:
if tag in self.envp:
self.envp[tag] = '%s %s'.format(
self.envp[tag], coverage_utils.COVERAGE_TEST_ENV)
else:
self.envp[tag] = coverage_utils.COVERAGE_TEST_ENV
self.testcases = []
if not precondition_utils.CheckSysPropPrecondition(
self, self._dut, self.shell):
logging.warn('Precondition sysprop not met; '
'all tests skipped.')
self.skipAllTests('precondition sysprop not met')
self.tags = set()
self.CreateTestCases()
cmd = list(
set('chmod 755 %s' % test_case.path
for test_case in self.testcases))
cmd_results = self.shell.Execute(cmd)
if any(cmd_results[const.EXIT_CODE]):
logging.error('Failed to set permission to some of the binaries:\n'
'%s\n%s', cmd, cmd_results)
def CreateTestCases(self):
'''Push files to device and create test case objects.'''
source_list = list(map(self.ParseTestSource, self.binary_test_source))
def isValidSource(source):
'''Checks that the truth value and bitness of source is valid.
Args:
source: a tuple of (string, string, string or None),
representing (host side absolute path, device side absolute
path, tag), is the return value of self.ParseTestSource
Returns:
False if source has a false truth value or its bitness does
not match the abi_bitness of the test run.
'''
if not source:
return False
tag = source[2]
if tag is None:
return True
tag = str(tag)
if (tag.endswith(const.SUFFIX_32BIT) and self.abi_bitness == '64'
) or (tag.endswith(const.SUFFIX_64BIT) and
self.abi_bitness == '32'):
logging.debug('Bitness of test source, %s, does not match the '
'abi_bitness, %s, of test run. Skipping',
str(source[0]),
self.abi_bitness)
return False
return True
source_list = filter(isValidSource, source_list)
logging.debug('Parsed test sources: %s', source_list)
# Push source files first
for src, dst, tag in source_list:
if src:
if os.path.isdir(src):
src = os.path.join(src, '.')
logging.debug('Pushing from %s to %s.', src, dst)
self._dut.adb.push('{src} {dst}'.format(src=src, dst=dst))
self.shell.Execute('ls %s' % dst)
if not hasattr(self, 'testcases'):
self.testcases = []
# Then create test cases
for src, dst, tag in source_list:
if tag is not None:
# tag not being None means to create a test case
self.tags.add(tag)
logging.debug('Creating test case from %s with tag %s', dst,
tag)
testcase = self.CreateTestCase(dst, tag)
if not testcase:
continue
if type(testcase) is list:
self.testcases.extend(testcase)
else:
self.testcases.append(testcase)
if not self.testcases:
logging.warn("No test case is found or generated.")
def PutTag(self, name, tag):
'''Put tag on name and return the resulting string.
Args:
name: string, a test name
tag: string
Returns:
String, the result string after putting tag on the name
'''
return '{}{}'.format(name, tag)
def ExpandListItemTags(self, input_list):
'''Expand list items with tags.
Since binary test allows a tag to be added in front of the binary
path, test names are generated with tags attached. This function is
used to expand the filters correspondingly. If a filter contains
a tag, only test name with that tag will be included in output.
Otherwise, all known tags will be paired to the test name in output
list.
Args:
input_list: list of string, the list to expand
Returns:
A list of string
'''
result = []
for item in input_list:
if self.TAG_DELIMITER in item:
tag, name = item.split(self.TAG_DELIMITER)
result.append(self.PutTag(name, tag))
for tag in self.tags:
result.append(self.PutTag(item, tag))
return result
def tearDownClass(self):
'''Perform clean-up tasks'''
# Retrieve coverage if applicable
if self.coverage.enabled and self.coverage.global_coverage:
if not self.isSkipAllTests():
self.coverage.SetCoverageData(dut=self._dut, isGlobal=True)
if self.profiling.enabled:
self.profiling.DisableVTSProfiling(self.shell)
# Clean up the pushed binaries
logging.debug('Start class cleaning up jobs.')
# Delete pushed files
sources = [
self.ParseTestSource(src) for src in self.binary_test_source
]
sources = set(filter(bool, sources))
paths = [dst for src, dst, tag in sources if src and dst]
cmd = ['rm -rf %s' % dst for dst in paths]
cmd_results = self.shell.Execute(cmd, no_except=True)
if not cmd_results or any(cmd_results[const.EXIT_CODE]):
logging.warning('Failed to clean up test class: %s', cmd_results)
# Delete empty directories in working directories
dir_set = set(path_utils.TargetDirName(dst) for dst in paths)
dir_set.add(self.ParseTestSource('')[1])
dirs = list(dir_set)
dirs.sort(lambda x, y: cmp(len(y), len(x)))
cmd = ['rmdir %s' % d for d in dirs]
cmd_results = self.shell.Execute(cmd, no_except=True)
if not cmd_results or any(cmd_results[const.EXIT_CODE]):
logging.warning('Failed to remove: %s', cmd_results)
if not self.isSkipAllTests() and self.profiling.enabled:
self.profiling.ProcessAndUploadTraceData()
logging.debug('Finished class cleaning up jobs.')
def ParseTestSource(self, source):
'''Convert host side binary path to device side path.
Args:
source: string, binary test source string
Returns:
A tuple of (string, string, string), representing (host side
absolute path, device side absolute path, tag). Returned tag
will be None if the test source is for pushing file to working
directory only. If source file is specified for adb push but does not
exist on host, None will be returned.
'''
tag = ''
path = source
if self.TAG_DELIMITER in source:
tag, path = source.split(self.TAG_DELIMITER)
src = path
dst = None
if self.PUSH_DELIMITER in path:
src, dst = path.split(self.PUSH_DELIMITER)
if src:
src = os.path.join(self.data_file_path, src)
if not os.path.exists(src):
logging.warning('binary source file is specified '
'but does not exist on host: %s', src)
return None
push_only = dst is not None and dst == ''
if not dst:
parent = self.working_directory[
tag] if tag in self.working_directory else self._GetDefaultBinaryPushDstPath(
src, tag)
dst = path_utils.JoinTargetPath(parent, os.path.basename(src))
if push_only:
tag = None
return str(src), str(dst), tag
def _GetDefaultBinaryPushDstPath(self, src, tag):
'''Get default binary push destination path.
This method is called to get default push destination path when
it is not specified.
If binary source path contains 'data/nativetest[64]', then the binary
will be pushed to /data/nativetest[64] instead of /data/local/tmp
Args:
src: string, source path of binary
tag: string, tag of binary source
Returns:
string, default push path
'''
src_lower = src.lower()
if DATA_NATIVETEST64 in src_lower:
parent_path = targetpath.sep + DATA_NATIVETEST64
elif DATA_NATIVETEST in src_lower:
parent_path = targetpath.sep + DATA_NATIVETEST
else:
parent_path = self.DEVICE_TMP_DIR
return targetpath.join(
parent_path, 'vts_binary_test_%s' % self.__class__.__name__, tag)
def CreateTestCase(self, path, tag=''):
'''Create a list of TestCase objects from a binary path.
Args:
path: string, absolute path of a binary on device
tag: string, a tag that will be appended to the end of test name
Returns:
A list of BinaryTestCase objects
'''
working_directory = self.working_directory[
tag] if tag in self.working_directory else None
envp = self.envp[tag] if tag in self.envp else ''
args = self.args[tag] if tag in self.args else ''
ld_library_path = self.ld_library_path[
tag] if tag in self.ld_library_path else None
profiling_library_path = self.profiling_library_path[
tag] if tag in self.profiling_library_path else None
return binary_test_case.BinaryTestCase(
'',
path_utils.TargetBaseName(path),
path,
tag,
self.PutTag,
working_directory,
ld_library_path,
profiling_library_path,
envp=envp,
args=args)
def VerifyTestResult(self, test_case, command_results):
'''Parse test case command result.
Args:
test_case: BinaryTestCase object, the test case whose command
command_results: dict of lists, shell command result
'''
asserts.assertTrue(command_results, 'Empty command response.')
asserts.assertFalse(
any(command_results[const.EXIT_CODE]),
'Test {} failed with the following results: {}'.format(
test_case, command_results))
def RunTestCase(self, test_case):
'''Runs a test_case.
Args:
test_case: BinaryTestCase object
'''
if self.profiling.enabled:
self.profiling.EnableVTSProfiling(self.shell,
test_case.profiling_library_path)
cmd = test_case.GetRunCommand()
logging.debug("Executing binary test command: %s", cmd)
command_results = self.shell.Execute(cmd)
self.VerifyTestResult(test_case, command_results)
if self.profiling.enabled:
self.profiling.ProcessTraceDataForTestCase(self._dut)
self.profiling.DisableVTSProfiling(self.shell)
def generateAllTests(self):
'''Runs all binary tests.'''
self.runGeneratedTests(
test_func=self.RunTestCase, settings=self.testcases, name_func=str)
if __name__ == "__main__":
test_runner.main()