# Plain text  |  166 lines  |  6.05 KB

#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import shutil
import tempfile

import environment_variables as env
import syzkaller_test_case

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import config_parser
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.common import cmd_utils
from vts.utils.python.controllers import adb
from vts.utils.python.controllers import android_device
from vts.utils.python.gcs import gcs_api_utils


class SyzkallerTest(base_test.BaseTestClass):
    """Runs Syzkaller tests on target.

    Attributes:
        start_vts_agents: bool, class-level flag set to False for this test.
        _env: dict, a mapping from key to environment path of this test.
        _gcs_api_utils: a GcsApiUtils object used for uploading logs.
        _dut: an android device controller object used for getting device info.
    """

    start_vts_agents = False

    def setUpClass(self):
        """Reads the required parameters and prepares the test environment.

        Creates a fresh temporary directory, records every path derived from
        it in self._env, builds the GCS helper from the service key and bucket
        name parameters, and selects the first entry of self.android_devices
        as the device under test.
        """
        required_params = [
            keys.ConfigKeys.IKEY_SERVICE_JSON_PATH,
            keys.ConfigKeys.IKEY_FUZZING_GCS_BUCKET_NAME,
            keys.ConfigKeys.IKEY_SYZKALLER_PACKAGES_PATH,
            keys.ConfigKeys.IKEY_SYZKALLER_TEMPLATE_PATH
        ]
        self.getUserParams(required_params)

        temp_dir = tempfile.mkdtemp()
        self._env = {
            'temp_dir': temp_dir,
            'syzkaller_dir': os.path.join(temp_dir, env.syzkaller_dir),
            'syzkaller_bin_dir': os.path.join(temp_dir,
                                              env.syzkaller_bin_dir),
            'syzkaller_work_dir': os.path.join(temp_dir,
                                               env.syzkaller_work_dir),
            'template_cfg': os.path.join(temp_dir, env.template_cfg),
        }

        self._gcs_api_utils = gcs_api_utils.GcsApiUtils(
            self.service_key_json_path, self.fuzzing_gcs_bucket_name)
        self._dut = self.android_devices[0]

    def FetchSyzkaller(self):
        """Fetches the Syzkaller program and makes its binaries executable.

        Copies the Syzkaller package tree and the template configuration into
        the temporary directory. If either copy fails, the error is logged and
        all tests are skipped. Afterwards every file found under the Syzkaller
        binary directory is given mode 0755.
        """
        try:
            logging.info('Fetching Syzkaller program.')
            shutil.copytree(self.syzkaller_packages_path,
                            self._env['syzkaller_dir'])
            logging.info('Fetching Syzkaller template configuration.')
            shutil.copy(self.syzkaller_template_path, self._env['temp_dir'])
        except EnvironmentError as e:
            # EnvironmentError is the common base of IOError, OSError and
            # shutil.Error, so a missing or unreadable source directory is
            # handled here too instead of escaping as an uncaught OSError.
            logging.exception(e)
            self.skipAllTests(
                'Syzkaller program is not available. Skipping all tests.')

        # os.walk yields nothing when the directory does not exist, so this
        # loop is a no-op after a failed fetch.
        for root, _, files in os.walk(self._env['syzkaller_bin_dir']):
            for filename in files:
                os.chmod(os.path.join(root, filename), 0o755)

    def CreateTestCases(self):
        """Create syzkaller test cases.

        Returns:
            test_cases, list, the list of test_cases for this test.
        """
        test_cases = [
            syzkaller_test_case.SyzkallerTestCase(self._env, 'linux/arm64',
                                                  self._dut.serial, 'adb',
                                                  'false', 12)
        ]
        return test_cases

    def RunTestCase(self, test_case):
        """Run a syzkaller test case.

        Executes the command produced by the test case, logs stderr when the
        command returns a non-zero exit code (stdout otherwise), and then
        reports the result.

        Args:
            test_case: SyzkallerTestCase object, the test case to run.
        """
        test_command = test_case.GetRunCommand()
        stdout, stderr, err_code = cmd_utils.ExecuteOneShellCommand(
            test_command, timeout=18000)
        if err_code:
            logging.error(stderr)
        else:
            logging.info(stdout)
        self.ReportTestCase(test_case)

    def ReportTestCase(self, test_case):
        """Asserts the result of the test case and uploads report to GCS.

        The upload runs even when the assertion fails, so that the reports of
        a crashing test case are not lost.

        Args:
            test_case: SyzkallerTestCase object, the test case to report.
        """
        try:
            self.AssertTestCaseResult(test_case)
        finally:
            # A failed assertion raises; uploading in `finally` keeps the
            # work directory (including any crash reports) from being
            # dropped exactly when it matters most.
            self._gcs_api_utils.UploadDir(test_case._work_dir_path,
                                          'kernelfuzz_reports')

    def AssertTestCaseResult(self, test_case):
        """Asserts that test case finished as expected.

        If crash reports were generated during the test, reports test as failure.
        If crash reports were not generated during the test, reports test as success.

        Args:
            test_case: SyzkallerTestCase object, the test case to assert
                       as failure or success.
        """
        logging.info('Test case results.')
        crash_dir = os.path.join(test_case._work_dir_path, 'crashes')
        if not os.listdir(crash_dir):
            logging.info('%s did not cause crash in our device.',
                         test_case._test_name)
        else:
            # Format the message up front: unlike logging calls, the second
            # positional argument of asserts.fail is presumably `extras`
            # (Mobly-style API), not a %-format argument -- TODO confirm.
            asserts.fail('%s caused crash in our device.' %
                         test_case._test_name)

    def tearDownClass(self):
        """Removes the temporary directory used for Syzkaller."""
        # setUpClass may have failed before self._env was assigned; in that
        # case there is nothing to clean up.
        temp_dir = getattr(self, '_env', {}).get('temp_dir')
        if not temp_dir:
            return

        logging.debug('Temporary directory %s is being deleted', temp_dir)
        try:
            shutil.rmtree(temp_dir)
        except OSError as e:
            logging.exception(e)

    def generateKernelFuzzerTests(self):
        """Runs kernel fuzzer tests.

        Fetches Syzkaller, then runs RunTestCase once for every test case
        returned by CreateTestCases, naming each generated test after the
        test case's _test_name.
        """
        self.FetchSyzkaller()
        self.runGeneratedTests(
            test_func=self.RunTestCase,
            settings=self.CreateTestCases(),
            name_func=lambda x: x._test_name)


# When executed as a script (not imported), hand control to the VTS test
# runner's entry point.
if __name__ == '__main__':
    test_runner.main()