普通文本  |  201行  |  7.02 KB

#!/usr/bin/env python3.4
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import logging
import os

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import const
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.controllers import adb
from vts.utils.python.controllers import android_device
from vts.utils.python.os import path_utils

from vts.testcases.security.poc.host import poc_test_config as config

class SecurityPoCKernelTest(base_test.BaseTestClass):
    """Runs security PoC kernel test cases.

    Attributes:
        _dut: AndroidDevice, the device under test as config
        _testcases: string list, list of testcases to run
        _model: string, device model e.g. "Nexus 5X"
    """
    def setUpClass(self):
        """Creates device under test instance, and copies data files."""
        required_params = [
            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
            keys.ConfigKeys.IKEY_ABI_BITNESS,
            config.ConfigKeys.RUN_STAGING
        ]
        self.getUserParams(required_params)

        logging.info("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
                self.data_file_path)

        self._dut = self.registerController(android_device, False)[0]
        self._testcases = config.POC_TEST_CASES_STABLE
        if self.run_staging:
            self._testcases += config.POC_TEST_CASES_STAGING

    def tearDownClass(self):
        """Deletes all copied data."""
        self._dut.adb.shell("rm -rf %s" % config.POC_TEST_DIR)

    def PushFiles(self):
        """adb pushes related file to target."""
        self._dut.adb.shell("mkdir %s -p" % config.POC_TEST_DIR)

        bitness = getattr(self, keys.ConfigKeys.IKEY_ABI_BITNESS)
        bitness_suffix = "64" if bitness == "64" else ""
        native_test_dir = "nativetest{0}".format(bitness_suffix)
        push_src = os.path.join(self.data_file_path, "DATA", native_test_dir,
                                "security", "poc", ".")
        self._dut.adb.push("%s %s" % (push_src, config.POC_TEST_DIR))

    def CreateHostInput(self, testcase):
        """Gathers information that will be passed to target-side code.

        Args:
            testcase: string, format testsuite/testname, specifies which
                test case to examine.

        Returns:
            dict, information passed to native PoC test, contains info collected
                from device and config. If None, poc should be skipped.
        """
        out = self._dut.adb.shell("getprop ro.product.model")
        device_model = out.strip()
        testcase_path = os.path.join(*testcase.split("/"))
        test_config_path = os.path.join(
            self.data_file_path, "vts", "testcases", "security", "poc",
            "target", testcase_path, "poc.config")

        with open(test_config_path) as test_config_file:
            poc_config = json.load(test_config_file)["target_models"]

            # If dut model is not in the test config, test should be skipped.
            if not device_model in poc_config.keys():
                return None

            params = poc_config.get("default", {})
            params.update(poc_config[device_model])

        host_input = {
            "device_model": device_model,
            "params": params
        }

        return host_input

    def CreateTestFlags(self, host_input):
        """Packs host input info into command line flags.

        Args:
            host_input: dict, information passed to native PoC test.

        Returns:
            string, host_input packed into command-line flags.
        """
        device_model_flag = "--device_model=\"%s\"" % host_input["device_model"]

        params = ["%s=%s" % (k, v) for k, v in host_input["params"].items()]
        params = ",".join(params)
        params_flag = "--params=\"%s\"" % params

        test_flags = [device_model_flag, params_flag]
        return " ".join(test_flags)

    def RunTestcase(self, testcase):
        """Runs the given testcase and asserts the result.

        Args:
            testcase: string, format testsuite/testname, specifies which
                test case to run.
        """
        host_input = self.CreateHostInput(testcase)
        asserts.skipIf(not host_input,
                "%s not configured to run against this target model." % testcase)

        items = testcase.split("/", 1)
        testsuite = items[0]

        chmod_cmd = "chmod -R 755 %s" % path_utils.JoinTargetPath(
            config.POC_TEST_DIR, testsuite)
        logging.info("Executing: %s", chmod_cmd)
        self._dut.adb.shell(chmod_cmd)

        test_flags = self.CreateTestFlags(host_input)
        test_cmd = "%s %s" % (
            path_utils.JoinTargetPath(config.POC_TEST_DIR, testcase),
            test_flags)
        logging.info("Executing: %s", test_cmd)

        try:
            stdout = self._dut.adb.shell(test_cmd)
            result = {
                const.STDOUT: stdout,
                const.STDERR: "",
                const.EXIT_CODE: 0
            }
        except adb.AdbError as e:
            result = {
                const.STDOUT: e.stdout,
                const.STDERR: e.stderr,
                const.EXIT_CODE: e.ret_code
            }
        logging.info("Test results:\n%s", result)

        self.AssertTestResult(result)

    def AssertTestResult(self, result):
        """Asserts that testcase finished as expected.

        Checks that device is in responsive state. If not, waits for boot
        then reports test as failure. If it is, asserts that all test commands
        returned exit code 0.

        Args:
            result: dict(str, str, int), stdout, stderr and return code
                from test run.
        """
        if self._dut.hasBooted():
            exit_code = result[const.EXIT_CODE]
            asserts.skipIf(exit_code == config.ExitCode.POC_TEST_SKIP,
                    "Test case was skipped.")
            asserts.assertFalse(exit_code == config.ExitCode.POC_TEST_FAIL,
                    "Test case failed.")
        else:
            self._dut.waitForBootCompletion()
            self._dut.rootAdb()
            self.PushFiles()
            asserts.fail("Test case left the device in unresponsive state.")

    def generateSecurityPoCTests(self):
        """Runs security PoC tests."""
        self.PushFiles()
        self.runGeneratedTests(
            test_func=self.RunTestcase,
            settings=self._testcases,
            name_func=lambda x: x.replace('/','_'))

if __name__ == "__main__":
    test_runner.main()