#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import const
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.controllers import adb
from vts.utils.python.common import list_utils
from vts.utils.python.os import path_utils
from vts.testcases.template.llvmfuzzer_test import llvmfuzzer_test_config as config
class LLVMFuzzerTest(base_test.BaseTestClass):
"""Runs fuzzer tests on target.
Attributes:
_dut: AndroidDevice, the device under test as config
_testcases: string list, list of testcases to run
start_vts_agents: whether to start vts agents when registering new
android devices.
"""
start_vts_agents = False
def setUpClass(self):
"""Creates a remote shell instance, and copies data files."""
required_params = [
keys.ConfigKeys.IKEY_DATA_FILE_PATH,
config.ConfigKeys.FUZZER_CONFIGS
]
self.getUserParams(required_params)
self._testcases = map(lambda x: str(x), self.fuzzer_configs.keys())
logging.debug("Testcases: %s", self._testcases)
logging.debug("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
self.data_file_path)
logging.debug("%s: %s", config.ConfigKeys.FUZZER_CONFIGS,
self.fuzzer_configs)
self._dut = self.android_devices[0]
self._dut.adb.shell("mkdir %s -p" % config.FUZZER_TEST_DIR)
def tearDownClass(self):
"""Deletes all copied data."""
self._dut.adb.shell("rm -rf %s" % config.FUZZER_TEST_DIR)
def PushFiles(self, testcase):
"""adb pushes testcase file to target.
Args:
testcase: string, path to executable fuzzer.
"""
push_src = os.path.join(self.data_file_path, config.FUZZER_SRC_DIR,
testcase)
self._dut.adb.push("%s %s" % (push_src, config.FUZZER_TEST_DIR))
logging.debug("Adb pushed: %s", testcase)
def CreateFuzzerFlags(self, fuzzer_config):
"""Creates flags for the fuzzer executable.
Args:
fuzzer_config: dict, contains configuration for the fuzzer.
Returns:
string, command line flags for fuzzer executable.
"""
def _SerializeVTSFuzzerParams(params):
"""Creates VTS command line flags for fuzzer executable.
Args:
params: dict, contains flags and their values.
Returns:
string, of form "--<flag0>=<val0> --<flag1>=<val1> ... "
"""
VTS_SPEC_FILES = "vts_spec_files"
VTS_EXEC_SIZE = "vts_exec_size"
DELIMITER = ":"
# vts_spec_files is a string list, will be serialized like this:
# [a, b, c] -> "a:b:c"
vts_spec_files = params.get(VTS_SPEC_FILES, {})
target_vts_spec_files = DELIMITER.join(map(
lambda x: path_utils.JoinTargetPath(config.FUZZER_SPEC_DIR, x),
vts_spec_files))
flags = "--%s=\"%s\" " % (VTS_SPEC_FILES, target_vts_spec_files)
vts_exec_size = params.get(VTS_EXEC_SIZE, {})
flags += "--%s=%s" % (VTS_EXEC_SIZE, vts_exec_size)
return flags
def _SerializeLLVMFuzzerParams(params):
"""Creates LLVM libfuzzer command line flags for fuzzer executable.
Args:
params: dict, contains flags and their values.
Returns:
string, of form "--<flag0>=<val0> --<flag1>=<val1> ... "
"""
return " ".join(["-%s=%s" % (k, v) for k, v in params.items()])
vts_fuzzer_params = fuzzer_config.get("vts_fuzzer_params", {})
llvmfuzzer_params = config.FUZZER_PARAMS.copy()
llvmfuzzer_params.update(fuzzer_config.get("llvmfuzzer_params", {}))
vts_fuzzer_flags = _SerializeVTSFuzzerParams(vts_fuzzer_params)
llvmfuzzer_flags = _SerializeLLVMFuzzerParams(llvmfuzzer_params)
return vts_fuzzer_flags + " -- " + llvmfuzzer_flags
def CreateCorpus(self, fuzzer, fuzzer_config):
"""Creates a corpus directory on target.
Args:
fuzzer: string, name of the fuzzer executable.
fuzzer_config: dict, contains configuration for the fuzzer.
Returns:
string, path to corpus directory on the target.
"""
corpus = fuzzer_config.get("corpus", [])
corpus_dir = path_utils.JoinTargetPath(config.FUZZER_TEST_DIR,
"%s_corpus" % fuzzer)
self._dut.adb.shell("mkdir %s -p" % corpus_dir)
for idx, corpus_entry in enumerate(corpus):
corpus_entry = corpus_entry.replace("x", "\\x")
corpus_entry_file = path_utils.JoinTargetPath(
corpus_dir, "input%s" % idx)
cmd = "echo -ne '%s' > %s" % (str(corpus_entry), corpus_entry_file)
# Vts shell drive doesn't play nicely with escape characters,
# so we use adb shell.
self._dut.adb.shell("\"%s\"" % cmd)
return corpus_dir
def RunTestcase(self, fuzzer):
"""Runs the given testcase and asserts the result.
Args:
fuzzer: string, name of fuzzer executable.
"""
self.PushFiles(fuzzer)
fuzzer_config = self.fuzzer_configs.get(fuzzer, {})
test_flags = self.CreateFuzzerFlags(fuzzer_config)
corpus_dir = self.CreateCorpus(fuzzer, fuzzer_config)
chmod_cmd = "chmod -R 755 %s" % path_utils.JoinTargetPath(
config.FUZZER_TEST_DIR, fuzzer)
self._dut.adb.shell(chmod_cmd)
cd_cmd = "cd %s" % config.FUZZER_TEST_DIR
ld_path = "LD_LIBRARY_PATH=/data/local/tmp/64:/data/local/tmp/32:$LD_LIBRARY_PATH"
test_cmd = "./%s" % fuzzer
fuzz_cmd = "%s && %s %s %s %s > /dev/null" % (cd_cmd, ld_path,
test_cmd, corpus_dir,
test_flags)
logging.debug("Executing: %s", fuzz_cmd)
# TODO(trong): vts shell doesn't handle timeouts properly, change this after it does.
try:
stdout = self._dut.adb.shell("'%s'" % fuzz_cmd)
result = {
const.STDOUT: stdout,
const.STDERR: "",
const.EXIT_CODE: 0
}
except adb.AdbError as e:
result = {
const.STDOUT: e.stdout,
const.STDERR: e.stderr,
const.EXIT_CODE: e.ret_code
}
self.AssertTestResult(fuzzer, result)
def LogCrashReport(self, fuzzer):
"""Logs crash-causing fuzzer input.
Reads the crash report file and logs the contents in format:
"\x01\x23\x45\x67\x89\xab\xcd\xef"
Args:
fuzzer: string, name of fuzzer executable.
"""
cmd = "xxd -p %s" % config.FUZZER_TEST_CRASH_REPORT
# output is string of a hexdump from crash report file.
# From the example above, output would be "0123456789abcdef".
output = self._dut.adb.shell(cmd)
remove_chars = ["\r", "\t", "\n", " "]
for char in remove_chars:
output = output.replace(char, "")
crash_report = ""
# output is guaranteed to be even in length since its a hexdump.
for offset in xrange(0, len(output), 2):
crash_report += "\\x%s" % output[offset:offset + 2]
logging.debug('FUZZER_TEST_CRASH_REPORT for %s: "%s"', fuzzer,
crash_report)
# TODO(trong): differentiate between crashes and sanitizer rule violations.
def AssertTestResult(self, fuzzer, result):
"""Asserts that testcase finished as expected.
Checks that device is in responsive state. If not, waits for boot
then reports test as failure. If it is, asserts that all test commands
returned exit code 0.
Args:
fuzzer: string, name of fuzzer executable.
result: dict(str, str, int), command results from shell.
"""
logging.debug("Test result: %s" % result)
if not self._dut.hasBooted():
self._dut.waitForBootCompletion()
asserts.fail("%s left the device in unresponsive state." % fuzzer)
exit_code = result[const.EXIT_CODE]
if exit_code == config.ExitCode.FUZZER_TEST_FAIL:
self.LogCrashReport(fuzzer)
asserts.fail("%s failed normally." % fuzzer)
elif exit_code != config.ExitCode.FUZZER_TEST_PASS:
asserts.fail("%s failed abnormally." % fuzzer)
def generateFuzzerTests(self):
"""Runs fuzzer tests."""
self.runGeneratedTests(
test_func=self.RunTestcase,
settings=self._testcases,
name_func=lambda x: x.split("/")[-1])
if __name__ == "__main__":
test_runner.main()