# # Copyright (C) 2016 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import io import logging import os import shutil import time import zipfile from google.protobuf.internal.containers import RepeatedCompositeFieldContainer from vts.runners.host import keys from vts.utils.python.archive import archive_parser from vts.utils.python.build.api import artifact_fetcher from vts.utils.python.coverage import coverage_report from vts.utils.python.coverage import gcda_parser from vts.utils.python.coverage import gcno_parser from vts.utils.python.coverage.parser import FileFormatError from vts.utils.python.web import feature_utils TARGET_COVERAGE_PATH = "/data/misc/gcov/" LOCAL_COVERAGE_PATH = "/tmp/vts-test-coverage" GCNO_SUFFIX = ".gcno" GCDA_SUFFIX = ".gcda" COVERAGE_SUFFIX = ".gcnodir" GIT_PROJECT = "git_project" MODULE_NAME = "module_name" NAME = "name" PATH = "path" _BRANCH = "master" # TODO: make this a runtime parameter _CHECKSUM_GCNO_DICT = "checksum_gcno_dict" _COVERAGE_ZIP = "coverage_zip" _REVISION_DICT = "revision_dict" class CoverageFeature(feature_utils.Feature): """Feature object for coverage functionality. Attributes: enabled: boolean, True if coverage is enabled, False otherwise web: (optional) WebFeature, object storing web feature util for test run """ _TOGGLE_PARAM = keys.ConfigKeys.IKEY_ENABLE_COVERAGE _REQUIRED_PARAMS = [ keys.ConfigKeys.IKEY_ANDROID_DEVICE, keys.ConfigKeys.IKEY_SERVICE_JSON_PATH, keys.ConfigKeys.IKEY_ANDROID_DEVICE ] _OPTIONAL_PARAMS = [keys.ConfigKeys.IKEY_MODULES] def __init__(self, user_params, web=None): """Initializes the coverage feature. Args: user_params: A dictionary from parameter name (String) to parameter value. web: (optional) WebFeature, object storing web feature util for test run """ self.ParseParameters(self._TOGGLE_PARAM, self._REQUIRED_PARAMS, self._OPTIONAL_PARAMS, user_params) self.web = web logging.info("Coverage enabled: %s", self.enabled) def _ExtractSourceName(self, gcno_summary, file_name): """Gets the source name from the GCNO summary object. Gets the original source file name from the FileSummary object describing a gcno file using the base filename of the gcno/gcda file. Args: gcno_summary: a FileSummary object describing a gcno file file_name: the base filename (without extensions) of the gcno or gcda file Returns: The relative path to the original source file corresponding to the provided gcno summary. The path is relative to the root of the build. """ src_file_path = None for key in gcno_summary.functions: src_file_path = gcno_summary.functions[key].src_file_name src_parts = src_file_path.rsplit(".", 1) src_file_name = src_parts[0] src_extension = src_parts[1] if len(src_parts) > 1 else None if src_extension not in ["c", "cpp", "cc"]: logging.warn("Found unsupported file type: %s", src_file_path) continue if src_file_name.endswith(file_name): logging.info("Coverage source file: %s", src_file_path) break return src_file_path def _GetChecksumGcnoDict(self, cov_zip): """Generates a dictionary from gcno checksum to GCNOParser object. Processes the gcnodir files in the zip file to produce a mapping from gcno checksum to the GCNOParser object wrapping the gcno content. Args: cov_zip: the zip file containing gcnodir files from the device build Returns: the dictionary of gcno checksums to GCNOParser objects """ checksum_gcno_dict = dict() fnames = cov_zip.namelist() instrumented_modules = [f for f in fnames if f.endswith(COVERAGE_SUFFIX)] for instrumented_module in instrumented_modules: # Read the gcnodir file archive = archive_parser.Archive(cov_zip.open(instrumented_module).read()) try: archive.Parse() except ValueError: logging.error("Archive could not be parsed: %s", name) continue for gcno_file_path in archive.files: file_name_path = gcno_file_path.rsplit(".", 1)[0] file_name = os.path.basename(file_name_path) gcno_stream = io.BytesIO(archive.files[gcno_file_path]) gcno_file_parser = gcno_parser.GCNOParser(gcno_stream) checksum_gcno_dict[gcno_file_parser.checksum] = gcno_file_parser return checksum_gcno_dict def InitializeDeviceCoverage(self, dut): """Initializes the device for coverage before tests run. Finds and removes all gcda files under TARGET_COVERAGE_PATH before tests run. Args: dut: the device under test. """ logging.info("Removing existing gcda files.") gcda_files = dut.adb.shell("find %s -name \"*.gcda\" -type f -delete" % TARGET_COVERAGE_PATH) def GetGcdaDict(self, dut, local_coverage_path=None): """Retrieves GCDA files from device and creates a dictionary of files. Find all GCDA files on the target device, copy them to the host using adb, then return a dictionary mapping from the gcda basename to the temp location on the host. Args: dut: the device under test. local_coverage_path: the host path (string) in which to copy gcda files Returns: A dictionary with gcda basenames as keys and contents as the values. """ logging.info("Creating gcda dictionary") gcda_dict = {} if not local_coverage_path: timestamp = str(int(time.time() * 1000000)) local_coverage_path = os.path.join(LOCAL_COVERAGE_PATH, timestamp) if os.path.exists(local_coverage_path): shutil.rmtree(local_coverage_path) os.makedirs(local_coverage_path) logging.info("Storing gcda tmp files to: %s", local_coverage_path) gcda_files = dut.adb.shell("find %s -name \"*.gcda\"" % TARGET_COVERAGE_PATH).split("\n") for gcda in gcda_files: if gcda: basename = os.path.basename(gcda.strip()) file_name = os.path.join(local_coverage_path, basename) dut.adb.pull("%s %s" % (gcda, file_name)) gcda_content = open(file_name, "rb").read() gcda_dict[basename] = gcda_content return gcda_dict def _AutoProcess(self, gcda_dict, isGlobal): """Process coverage data and appends coverage reports to the report message. Matches gcno files with gcda files and processes them into a coverage report with references to the original source code used to build the system image. Coverage information is appended as a CoverageReportMessage to the provided report message. Git project information is automatically extracted from the build info and the source file name enclosed in each gcno file. Git project names must resemble paths and may differ from the paths to their project root by at most one. If no match is found, then coverage information will not be be processed. e.g. if the project path is test/vts, then its project name may be test/vts or <some folder>/test/vts in order to be recognized. Args: gcda_dict: the dictionary of gcda basenames to gcda content (binary string) isGlobal: boolean, True if the coverage data is for the entire test, False if only for the current test case. """ revision_dict = getattr(self, _REVISION_DICT, None) checksum_gcno_dict = getattr(self, _CHECKSUM_GCNO_DICT, None) for gcda_name in gcda_dict: gcda_stream = io.BytesIO(gcda_dict[gcda_name]) gcda_file_parser = gcda_parser.GCDAParser(gcda_stream) if not gcda_file_parser.checksum in checksum_gcno_dict: logging.info("No matching gcno file for gcda: %s", gcda_name) continue gcno_file_parser = checksum_gcno_dict[gcda_file_parser.checksum] try: gcno_summary = gcno_file_parser.Parse() except FileFormatError: logging.error("Error parsing gcno for gcda %s", gcda_name) continue file_name = gcda_name.rsplit(".", 1)[0] src_file_path = self._ExtractSourceName(gcno_summary, file_name) if not src_file_path: logging.error("No source file found for gcda %s.", gcda_name) continue # Process and merge gcno/gcda data try: gcda_file_parser.Parse(gcno_summary) except FileFormatError: logging.error("Error parsing gcda file %s", gcda_name) continue # Get the git project information # Assumes that the project name and path to the project root are similar revision = None for project_name in revision_dict: # Matches cases when source file root and project name are the same if src_file_path.startswith(str(project_name)): git_project_name = str(project_name) git_project_path = str(project_name) revision = str(revision_dict[project_name]) logging.info("Source file '%s' matched with project '%s'", src_file_path, git_project_name) break parts = os.path.normpath(str(project_name)).split(os.sep, 1) # Matches when project name has an additional prefix before the # project path root. if len(parts) > 1 and src_file_path.startswith(parts[-1]): git_project_name = str(project_name) git_project_path = parts[-1] revision = str(revision_dict[project_name]) logging.info("Source file '%s' matched with project '%s'", src_file_path, git_project_name) if not revision: logging.info("Could not find git info for %s", src_file_path) continue if self.web and self.web.enabled: coverage_vec = coverage_report.GenerateLineCoverageVector( src_file_path, gcno_summary) total_count, covered_count = coverage_report.GetCoverageStats(coverage_vec) self.web.AddCoverageReport( coverage_vec, src_file_path, git_project_name, git_project_path, revision, covered_count, total_count, isGlobal) def _ManualProcess(self, gcda_dict, isGlobal): """Process coverage data and appends coverage reports to the report message. Opens the gcno files in the cov_zip for the specified modules and matches gcno/gcda files. Then, coverage vectors are generated for each set of matching gcno/gcda files and appended as a CoverageReportMessage to the provided report message. Unlike AutoProcess, coverage information is only processed for the modules explicitly defined in 'modules'. Args: gcda_dict: the dictionary of gcda basenames to gcda content (binary string) isGlobal: boolean, True if the coverage data is for the entire test, False if only for the current test case. """ cov_zip = getattr(self, _COVERAGE_ZIP, None) revision_dict = getattr(self, _REVISION_DICT, None) modules = getattr(self, keys.ConfigKeys.IKEY_MODULES, None) covered_modules = set(cov_zip.namelist()) for module in modules: if MODULE_NAME not in module or GIT_PROJECT not in module: logging.error("Coverage module must specify name and git project: %s", module) continue project = module[GIT_PROJECT] if PATH not in project or NAME not in project: logging.error("Project name and path not specified: %s", project) continue name = str(module[MODULE_NAME]) + COVERAGE_SUFFIX git_project = str(project[NAME]) git_project_path = str(project[PATH]) if name not in covered_modules: logging.error("No coverage information for module %s", name) continue if git_project not in revision_dict: logging.error("Git project not present in device revision dict: %s", git_project) continue revision = str(revision_dict[git_project]) archive = archive_parser.Archive(cov_zip.open(name).read()) try: archive.Parse() except ValueError: logging.error("Archive could not be parsed: %s", name) continue for gcno_file_path in archive.files: file_name_path = gcno_file_path.rsplit(".", 1)[0] file_name = os.path.basename(file_name_path) gcno_content = archive.files[gcno_file_path] gcno_stream = io.BytesIO(gcno_content) try: gcno_summary = gcno_parser.GCNOParser(gcno_stream).Parse() except FileFormatError: logging.error("Error parsing gcno file %s", gcno_file_path) continue src_file_path = None # Match gcno file with gcda file gcda_name = file_name + GCDA_SUFFIX if gcda_name not in gcda_dict: logging.error("No gcda file found %s.", gcda_name) continue src_file_path = self._ExtractSourceName(gcno_summary, file_name) if not src_file_path: logging.error("No source file found for %s.", gcno_file_path) continue # Process and merge gcno/gcda data gcda_content = gcda_dict[gcda_name] gcda_stream = io.BytesIO(gcda_content) try: gcda_parser.GCDAParser(gcda_stream).Parse(gcno_summary) except FileFormatError: logging.error("Error parsing gcda file %s", gcda_content) continue if self.web and self.web.enabled: coverage_vec = coverage_report.GenerateLineCoverageVector( src_file_path, gcno_summary) total_count, covered_count = coverage_report.GetCoverageStats(coverage_vec) self.web.AddCoverageReport( coverage_vec, src_file_path, git_project, git_project_path, revision, covered_count, total_count, isGlobal) def LoadArtifacts(self): """Initializes the test for coverage instrumentation. Downloads build artifacts from the build server (gcno zip file and git revision dictionary) and prepares for coverage measurement. Requires coverage feature enabled; no-op otherwise. """ if not self.enabled: return self.enabled = False # Use first device info to get product, flavor, and ID # TODO: support multi-device builds android_devices = getattr(self, keys.ConfigKeys.IKEY_ANDROID_DEVICE) if not isinstance(android_devices, list): logging.warn("android device information not available") return device_spec = android_devices[0] build_flavor = device_spec.get(keys.ConfigKeys.IKEY_BUILD_FLAVOR) device_build_id = device_spec.get(keys.ConfigKeys.IKEY_BUILD_ID) if not build_flavor or not device_build_id: logging.error("Could not read device information.") return build_flavor = str(build_flavor) if not "coverage" in build_flavor: build_flavor = "{0}_coverage".format(build_flavor) product = build_flavor.split("-", 1)[0] build_id = str(device_build_id) # Get service json path service_json_path = getattr(self, keys.ConfigKeys.IKEY_SERVICE_JSON_PATH) # Instantiate build client try: build_client = artifact_fetcher.AndroidBuildClient(service_json_path) except Exception as e: logging.exception('Failed to instantiate build client: %s', e) return # Fetch repo dictionary try: revision_dict = build_client.GetRepoDictionary(_BRANCH, build_flavor, device_build_id) setattr(self, _REVISION_DICT, revision_dict) except Exception as e: logging.exception('Failed to fetch repo dictionary: %s', e) logging.info('Coverage disabled') return # Fetch coverage zip try: cov_zip = io.BytesIO( build_client.GetCoverage(_BRANCH, build_flavor, device_build_id, product)) cov_zip = zipfile.ZipFile(cov_zip) setattr(self, _COVERAGE_ZIP, cov_zip) except Exception as e: logging.exception('Failed to fetch coverage zip: %s', e) logging.info('Coverage disabled') return if not hasattr(self, keys.ConfigKeys.IKEY_MODULES): checksum_gcno_dict = self._GetChecksumGcnoDict(cov_zip) setattr(self, _CHECKSUM_GCNO_DICT, checksum_gcno_dict) self.enabled = True def SetCoverageData(self, coverage_data=None, isGlobal=False, dut=None): """Sets and processes coverage data. Organizes coverage data and processes it into a coverage report in the current test case Requires feature to be enabled; no-op otherwise. Args: coverage_data may be either: (1) a dict where gcda name is the key and binary content is the value, or (2) a list of NativeCodeCoverageRawDataMessage objects (3) None if the data will be pulled from dut isGlobal: True if the coverage data is for the entire test, False if if the coverage data is just for the current test case. dut: (optional) the device object for which to pull coverage data """ if not self.enabled: return if not coverage_data and dut: coverage_data = self.GetGcdaDict(dut) if not coverage_data: logging.info("SetCoverageData: empty coverage data") return if isinstance(coverage_data, RepeatedCompositeFieldContainer): gcda_dict = {} for coverage_msg in coverage_data: gcda_dict[coverage_msg.file_path] = coverage_msg.gcda elif isinstance(coverage_data, dict): gcda_dict = coverage_data else: logging.error("SetCoverageData: unexpected coverage_data type: %s", str(type(coverage_data))) return logging.info("coverage file paths %s", str([fp for fp in gcda_dict])) if not hasattr(self, keys.ConfigKeys.IKEY_MODULES): # auto-process coverage data self._AutoProcess(gcda_dict, isGlobal) else: # explicitly process coverage data for the specified modules self._ManualProcess(gcda_dict, isGlobal)