# Plain text  |  480 lines  |  20.11 KB

#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import logging
import os
import shutil
import time
import zipfile

from google.protobuf.internal.containers import RepeatedCompositeFieldContainer

from vts.runners.host import keys
from vts.utils.python.archive import archive_parser
from vts.utils.python.build.api import artifact_fetcher
from vts.utils.python.coverage import coverage_report
from vts.utils.python.coverage import gcda_parser
from vts.utils.python.coverage import gcno_parser
from vts.utils.python.coverage.parser import FileFormatError
from vts.utils.python.web import feature_utils

# Device directory that is searched for (and purged of) *.gcda files.
TARGET_COVERAGE_PATH = "/data/misc/gcov/"
# Host directory under which timestamped gcda scratch directories are created.
LOCAL_COVERAGE_PATH = "/tmp/vts-test-coverage"

# File-name suffixes of the coverage artifacts handled by this module.
GCNO_SUFFIX = ".gcno"
GCDA_SUFFIX = ".gcda"
# Suffix of the per-module archives (holding gcno files) inside the coverage zip.
COVERAGE_SUFFIX = ".gcnodir"
# Keys looked up in each user-specified module entry (GIT_PROJECT, MODULE_NAME)
# and in that entry's git project description (NAME, PATH).
GIT_PROJECT = "git_project"
MODULE_NAME = "module_name"
NAME = "name"
PATH = "path"

# Build branch passed to the build client when fetching artifacts.
_BRANCH = "master"  # TODO: make this a runtime parameter
# Names of the instance attributes in which fetched artifacts are stored.
_CHECKSUM_GCNO_DICT = "checksum_gcno_dict"
_COVERAGE_ZIP = "coverage_zip"
_REVISION_DICT = "revision_dict"


class CoverageFeature(feature_utils.Feature):
    """Feature object for coverage functionality.

    Attributes:
        enabled: boolean, True if coverage is enabled, False otherwise
        web: (optional) WebFeature, object storing web feature util for test run
    """

    # Parameter names handed to ParseParameters() by __init__: the toggle key,
    # the required keys and the optional keys, in that order.
    _TOGGLE_PARAM = keys.ConfigKeys.IKEY_ENABLE_COVERAGE
    # IKEY_ANDROID_DEVICE used to be listed twice; one entry is sufficient.
    _REQUIRED_PARAMS = [
        keys.ConfigKeys.IKEY_ANDROID_DEVICE,
        keys.ConfigKeys.IKEY_SERVICE_JSON_PATH,
    ]
    _OPTIONAL_PARAMS = [keys.ConfigKeys.IKEY_MODULES]

    def __init__(self, user_params, web=None):
        """Sets up the coverage feature from the user configuration.

        Args:
            user_params: dict mapping parameter name (string) to parameter value.
            web: (optional) WebFeature, object storing web feature util for test run
        """
        self.ParseParameters(
            self._TOGGLE_PARAM,
            self._REQUIRED_PARAMS,
            self._OPTIONAL_PARAMS,
            user_params,
        )
        self.web = web
        logging.info("Coverage enabled: %s", self.enabled)

    def _ExtractSourceName(self, gcno_summary, file_name):
        """Gets the source name from the GCNO summary object.

        Gets the original source file name from the FileSummary object describing
        a gcno file using the base filename of the gcno/gcda file.

        Args:
            gcno_summary: a FileSummary object describing a gcno file
            file_name: the base filename (without extensions) of the gcno or gcda file

        Returns:
            The relative path to the original source file corresponding to the
            provided gcno summary. The path is relative to the root of the build.
        """
        src_file_path = None
        for key in gcno_summary.functions:
            src_file_path = gcno_summary.functions[key].src_file_name
            src_parts = src_file_path.rsplit(".", 1)
            src_file_name = src_parts[0]
            src_extension = src_parts[1] if len(src_parts) > 1 else None
            if src_extension not in ["c", "cpp", "cc"]:
                logging.warn("Found unsupported file type: %s", src_file_path)
                continue
            if src_file_name.endswith(file_name):
                logging.info("Coverage source file: %s", src_file_path)
                break
        return src_file_path

    def _GetChecksumGcnoDict(self, cov_zip):
        """Generates a dictionary from gcno checksum to GCNOParser object.

        Processes the gcnodir files in the zip file to produce a mapping from gcno
        checksum to the GCNOParser object wrapping the gcno content.

        Args:
            cov_zip: the zip file containing gcnodir files from the device build

        Returns:
            the dictionary of gcno checksums to GCNOParser objects
        """
        checksum_gcno_dict = dict()
        fnames = cov_zip.namelist()
        instrumented_modules = [f for f in fnames if f.endswith(COVERAGE_SUFFIX)]
        for instrumented_module in instrumented_modules:
            # Read the gcnodir file
            archive = archive_parser.Archive(cov_zip.open(instrumented_module).read())
            try:
                archive.Parse()
            except ValueError:
                logging.error("Archive could not be parsed: %s", name)
                continue

            for gcno_file_path in archive.files:
                file_name_path = gcno_file_path.rsplit(".", 1)[0]
                file_name = os.path.basename(file_name_path)
                gcno_stream = io.BytesIO(archive.files[gcno_file_path])
                gcno_file_parser = gcno_parser.GCNOParser(gcno_stream)
                checksum_gcno_dict[gcno_file_parser.checksum] = gcno_file_parser
        return checksum_gcno_dict

    def InitializeDeviceCoverage(self, dut):
        """Clears stale coverage data from the device ahead of a test run.

        Issues a single `find ... -delete` shell command on the device that
        removes every regular file named *.gcda under TARGET_COVERAGE_PATH.

        Args:
            dut: the device under test.
        """
        logging.info("Removing existing gcda files.")
        delete_command = "find %s -name \"*.gcda\" -type f -delete" % TARGET_COVERAGE_PATH
        dut.adb.shell(delete_command)

    def GetGcdaDict(self, dut, local_coverage_path=None):
        """Retrieves GCDA files from device and creates a dictionary of files.

        Finds all GCDA files under TARGET_COVERAGE_PATH on the target device,
        copies each of them to the host using adb, and reads its content.

        Args:
            dut: the device under test.
            local_coverage_path: the host path (string) in which to copy gcda files.
                When not provided, a timestamped directory under
                LOCAL_COVERAGE_PATH is used. An existing directory at this
                path is removed and recreated.

        Returns:
            A dictionary with gcda basenames as keys and contents as the values.
            Files that share a basename overwrite each other; the last one
            pulled wins.
        """
        logging.info("Creating gcda dictionary")
        gcda_dict = {}
        if not local_coverage_path:
            timestamp = str(int(time.time() * 1000000))
            local_coverage_path = os.path.join(LOCAL_COVERAGE_PATH, timestamp)
        if os.path.exists(local_coverage_path):
            shutil.rmtree(local_coverage_path)
        os.makedirs(local_coverage_path)
        logging.info("Storing gcda tmp files to: %s", local_coverage_path)
        gcda_files = dut.adb.shell("find %s -name \"*.gcda\"" %
                                   TARGET_COVERAGE_PATH).split("\n")
        for gcda in gcda_files:
            # Strip before testing and before pulling: a line may carry a
            # trailing "\r" or be whitespace-only, which previously produced a
            # bad device path or an empty basename.
            gcda = gcda.strip()
            if not gcda:
                continue
            basename = os.path.basename(gcda)
            file_name = os.path.join(local_coverage_path, basename)
            dut.adb.pull("%s %s" % (gcda, file_name))
            with open(file_name, "rb") as gcda_file:
                gcda_dict[basename] = gcda_file.read()
        return gcda_dict

    def _AutoProcess(self, gcda_dict, isGlobal):
        """Processes coverage data and reports it through the web feature.

        Matches gcda files with gcno files by checksum and processes them into a
        coverage report with references to the original source code used to build
        the system image. Each report is passed to self.web.AddCoverageReport when
        the web feature is present and enabled.

        Git project information is automatically extracted from the revision
        dictionary and the source file name enclosed in each gcno file. Git project
        names must resemble paths and may differ from the paths to their project
        root by at most one leading path component. If no match is found, then
        coverage information will not be processed for that file.

        e.g. if the project path is test/vts, then its project name may be
             test/vts or some_folder/test/vts in order to be recognized.

        Args:
            gcda_dict: the dictionary of gcda basenames to gcda content (binary string)
            isGlobal: boolean, True if the coverage data is for the entire test, False if only for
                      the current test case.
        """
        revision_dict = getattr(self, _REVISION_DICT, None)
        checksum_gcno_dict = getattr(self, _CHECKSUM_GCNO_DICT, None)
        if revision_dict is None or checksum_gcno_dict is None:
            # Both attributes default to None when they were never stored on
            # this object; iterating them would raise TypeError.
            logging.error("Coverage artifacts are not loaded; skipping coverage processing.")
            return

        for gcda_name in gcda_dict:
            gcda_stream = io.BytesIO(gcda_dict[gcda_name])
            gcda_file_parser = gcda_parser.GCDAParser(gcda_stream)

            if gcda_file_parser.checksum not in checksum_gcno_dict:
                logging.info("No matching gcno file for gcda: %s", gcda_name)
                continue
            gcno_file_parser = checksum_gcno_dict[gcda_file_parser.checksum]

            try:
                gcno_summary = gcno_file_parser.Parse()
            except FileFormatError:
                logging.error("Error parsing gcno for gcda %s", gcda_name)
                continue

            file_name = gcda_name.rsplit(".", 1)[0]
            src_file_path = self._ExtractSourceName(gcno_summary, file_name)

            if not src_file_path:
                logging.error("No source file found for gcda %s.", gcda_name)
                continue

            # Process and merge gcno/gcda data
            try:
                gcda_file_parser.Parse(gcno_summary)
            except FileFormatError:
                logging.error("Error parsing gcda file %s", gcda_name)
                continue

            # Get the git project information
            # Assumes that the project name and path to the project root are similar
            # NOTE(review): both tests are plain string-prefix checks, not
            # directory-boundary checks - confirm sibling projects sharing a
            # prefix cannot be confused.
            revision = None
            for project_name in revision_dict:
                # Matches cases when source file root and project name are the same
                if src_file_path.startswith(str(project_name)):
                    git_project_name = str(project_name)
                    git_project_path = str(project_name)
                    revision = str(revision_dict[project_name])
                    logging.info("Source file '%s' matched with project '%s'",
                                 src_file_path, git_project_name)
                    break

                parts = os.path.normpath(str(project_name)).split(os.sep, 1)
                # Matches when project name has an additional prefix before the
                # project path root. The loop keeps going after this kind of
                # match, so a later project can still override it.
                if len(parts) > 1 and src_file_path.startswith(parts[-1]):
                    git_project_name = str(project_name)
                    git_project_path = parts[-1]
                    revision = str(revision_dict[project_name])
                    logging.info("Source file '%s' matched with project '%s'",
                                 src_file_path, git_project_name)

            if not revision:
                logging.info("Could not find git info for %s", src_file_path)
                continue

            if self.web and self.web.enabled:
                coverage_vec = coverage_report.GenerateLineCoverageVector(
                    src_file_path, gcno_summary)
                total_count, covered_count = coverage_report.GetCoverageStats(coverage_vec)
                self.web.AddCoverageReport(
                    coverage_vec, src_file_path, git_project_name, git_project_path,
                    revision, covered_count, total_count, isGlobal)

    def _ManualProcess(self, gcda_dict, isGlobal):
        """Processes coverage data for the explicitly configured modules.

        Opens the gcnodir archive in the coverage zip for each specified module
        and matches its gcno files with gcda files by base name. A coverage vector
        is generated for each matching gcno/gcda pair and passed to
        self.web.AddCoverageReport when the web feature is present and enabled.
        Unlike _AutoProcess, coverage information is only processed for the
        modules explicitly defined in the modules parameter.

        Args:
            gcda_dict: the dictionary of gcda basenames to gcda content (binary string)
            isGlobal: boolean, True if the coverage data is for the entire test, False if only for
                      the current test case.
        """
        cov_zip = getattr(self, _COVERAGE_ZIP, None)
        revision_dict = getattr(self, _REVISION_DICT, None)
        modules = getattr(self, keys.ConfigKeys.IKEY_MODULES, None)
        if cov_zip is None or revision_dict is None:
            # Both attributes default to None when they were never stored on
            # this object; using them would raise AttributeError/TypeError.
            logging.error("Coverage artifacts are not loaded; skipping coverage processing.")
            return
        if not modules:
            logging.info("No coverage modules specified.")
            return

        covered_modules = set(cov_zip.namelist())
        for module in modules:
            if MODULE_NAME not in module or GIT_PROJECT not in module:
                logging.error("Coverage module must specify name and git project: %s",
                              module)
                continue
            project = module[GIT_PROJECT]
            if PATH not in project or NAME not in project:
                logging.error("Project name and path not specified: %s", project)
                continue

            name = str(module[MODULE_NAME]) + COVERAGE_SUFFIX
            git_project = str(project[NAME])
            git_project_path = str(project[PATH])

            if name not in covered_modules:
                logging.error("No coverage information for module %s", name)
                continue
            if git_project not in revision_dict:
                logging.error("Git project not present in device revision dict: %s",
                              git_project)
                continue

            revision = str(revision_dict[git_project])
            # ZipFile.read() does not leave a member handle open the way
            # open(...).read() does.
            archive = archive_parser.Archive(cov_zip.read(name))
            try:
                archive.Parse()
            except ValueError:
                logging.error("Archive could not be parsed: %s", name)
                continue

            for gcno_file_path in archive.files:
                file_name = os.path.basename(gcno_file_path.rsplit(".", 1)[0])
                gcno_stream = io.BytesIO(archive.files[gcno_file_path])
                try:
                    gcno_summary = gcno_parser.GCNOParser(gcno_stream).Parse()
                except FileFormatError:
                    logging.error("Error parsing gcno file %s", gcno_file_path)
                    continue

                # Match gcno file with gcda file
                gcda_name = file_name + GCDA_SUFFIX
                if gcda_name not in gcda_dict:
                    logging.error("No gcda file found %s.", gcda_name)
                    continue

                src_file_path = self._ExtractSourceName(gcno_summary, file_name)

                if not src_file_path:
                    logging.error("No source file found for %s.", gcno_file_path)
                    continue

                # Process and merge gcno/gcda data
                gcda_stream = io.BytesIO(gcda_dict[gcda_name])
                try:
                    gcda_parser.GCDAParser(gcda_stream).Parse(gcno_summary)
                except FileFormatError:
                    # Log the file name, not the raw binary content.
                    logging.error("Error parsing gcda file %s", gcda_name)
                    continue

                if self.web and self.web.enabled:
                    coverage_vec = coverage_report.GenerateLineCoverageVector(
                        src_file_path, gcno_summary)
                    total_count, covered_count = coverage_report.GetCoverageStats(coverage_vec)
                    self.web.AddCoverageReport(
                        coverage_vec, src_file_path, git_project, git_project_path,
                        revision, covered_count, total_count, isGlobal)

    def LoadArtifacts(self):
        """Initializes the test for coverage instrumentation.

        Downloads build artifacts from the build server
        (gcno zip file and git revision dictionary) and prepares for coverage
        measurement.

        Requires coverage feature enabled; no-op otherwise.
        """
        if not self.enabled:
            return

        self.enabled = False

        # Use first device info to get product, flavor, and ID
        # TODO: support multi-device builds
        android_devices = getattr(self, keys.ConfigKeys.IKEY_ANDROID_DEVICE)
        if not isinstance(android_devices, list):
            logging.warn("android device information not available")
            return

        device_spec = android_devices[0]
        build_flavor = device_spec.get(keys.ConfigKeys.IKEY_BUILD_FLAVOR)
        device_build_id = device_spec.get(keys.ConfigKeys.IKEY_BUILD_ID)

        if not build_flavor or not device_build_id:
            logging.error("Could not read device information.")
            return

        build_flavor = str(build_flavor)
        if not "coverage" in build_flavor:
            build_flavor = "{0}_coverage".format(build_flavor)
        product = build_flavor.split("-", 1)[0]
        build_id = str(device_build_id)

        # Get service json path
        service_json_path = getattr(self, keys.ConfigKeys.IKEY_SERVICE_JSON_PATH)

        # Instantiate build client
        try:
            build_client = artifact_fetcher.AndroidBuildClient(service_json_path)
        except Exception as e:
            logging.exception('Failed to instantiate build client: %s', e)
            return

        # Fetch repo dictionary
        try:
            revision_dict = build_client.GetRepoDictionary(_BRANCH, build_flavor, device_build_id)
            setattr(self, _REVISION_DICT, revision_dict)
        except Exception as e:
            logging.exception('Failed to fetch repo dictionary: %s', e)
            logging.info('Coverage disabled')
            return

        # Fetch coverage zip
        try:
            cov_zip = io.BytesIO(
                build_client.GetCoverage(_BRANCH, build_flavor, device_build_id, product))
            cov_zip = zipfile.ZipFile(cov_zip)
            setattr(self, _COVERAGE_ZIP, cov_zip)
        except Exception as e:
            logging.exception('Failed to fetch coverage zip: %s', e)
            logging.info('Coverage disabled')
            return

        if not hasattr(self, keys.ConfigKeys.IKEY_MODULES):
            checksum_gcno_dict = self._GetChecksumGcnoDict(cov_zip)
            setattr(self, _CHECKSUM_GCNO_DICT, checksum_gcno_dict)

        self.enabled = True

    def SetCoverageData(self, coverage_data=None, isGlobal=False, dut=None):
        """Sets and processes coverage data.

        Organizes coverage data and processes it into a coverage report in the
        current test case

        Requires feature to be enabled; no-op otherwise.

        Args:
            coverage_data may be either:
                          (1) a dict where gcda name is the key and binary
                              content is the value, or
                          (2) a list of NativeCodeCoverageRawDataMessage objects
                          (3) None if the data will be pulled from dut
            isGlobal: True if the coverage data is for the entire test, False if
                      if the coverage data is just for the current test case.
            dut: (optional) the device object for which to pull coverage data
        """
        if not self.enabled:
            return

        if not coverage_data and dut:
            coverage_data = self.GetGcdaDict(dut)

        if not coverage_data:
            logging.info("SetCoverageData: empty coverage data")
            return

        if isinstance(coverage_data, RepeatedCompositeFieldContainer):
            gcda_dict = {}
            for coverage_msg in coverage_data:
                gcda_dict[coverage_msg.file_path] = coverage_msg.gcda
        elif isinstance(coverage_data, dict):
            gcda_dict = coverage_data
        else:
            logging.error("SetCoverageData: unexpected coverage_data type: %s",
                          str(type(coverage_data)))
            return
        logging.info("coverage file paths %s", str([fp for fp in gcda_dict]))

        if not hasattr(self, keys.ConfigKeys.IKEY_MODULES):
            # auto-process coverage data
            self._AutoProcess(gcda_dict, isGlobal)
        else:
            # explicitly process coverage data for the specified modules
            self._ManualProcess(gcda_dict, isGlobal)