# Plain text  |  407 lines  |  17.42 KB

#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import logging
import random
import sys

from google.protobuf import text_format

from vts.proto import AndroidSystemControlMessage_pb2 as ASysCtrlMsg
from vts.proto import ComponentSpecificationMessage_pb2 as CompSpecMsg
from vts.utils.python.fuzzer import FuzzerUtils
from vts.utils.python.mirror import mirror_object
from vts.utils.python.mirror import py2pb
from vts.utils.python.mirror import resource_mirror

# Module-level defaults. None of these names is referenced in the visible part
# of this file; the notes below are assumptions to be confirmed against the
# modules that import them.
# Presumably the target-side directories searched for HAL shared libraries
# (TODO confirm).
_DEFAULT_TARGET_BASE_PATHS = ["/system/lib64/hw"]
# Presumably the hwbinder service name used when none is given (TODO confirm).
_DEFAULT_HWBINDER_SERVICE = "default"

# Presumably string keys naming the "interface" and "api" parts of a
# specification (TODO confirm against callers).
INTERFACE = "interface"
API = "api"


class MirrorObjectError(Exception):
    """Signals a general failure while manipulating a mirror object.

    The class adds nothing to Exception; it only provides a distinct type for
    mirror-related errors.
    """


class VtsEnum(object):
    """Host-side mirror instance of an enum.

    Each enumerator of the mirrored enum is exposed as an instance attribute
    holding that enumerator's scalar value.
    """

    def __init__(self, attribute):
        """Populates the instance from an enum attribute message.

        Args:
            attribute: object whose enum_value field carries the parallel
                       enumerator and scalar_value sequences, plus scalar_type,
                       the name of the field to read on each scalar value.
        """
        logging.debug(attribute)
        enum_spec = attribute.enum_value
        # zip() stops at the shorter sequence, so unpaired entries are ignored.
        name_value_pairs = zip(enum_spec.enumerator, enum_spec.scalar_value)
        for name, value_holder in name_value_pairs:
            value = getattr(value_holder, enum_spec.scalar_type)
            setattr(self, name, value)


class NativeEntityMirror(mirror_object.MirrorObject):
    """The class that acts as the mirror to an Android device's HAL layer.

    This is the base class used to communicate to a particular HAL or Lib in
    the VTS agent on the target side.

    Attributes:
        _client: the TCP client instance.
        _caller_uid: string, the caller's UID if not None.
        _driver_id: the driver ID given to the constructor; sent as
                    hal_driver_id in every function call message.
        _if_spec_msg: the interface specification message of a host object to
                      mirror.
        _last_raw_code_coverage_data: NativeCodeCoverageRawDataMessage,
                                      last seen raw code coverage data.
    """

    def __init__(self,
                 client,
                 driver_id=None,
                 if_spec_message=None,
                 caller_uid=None):
        """Initializes the mirror.

        Args:
            client: the client instance, forwarded to the base class.
            driver_id: the ID of the driver this mirror talks to, if any.
            if_spec_message: the interface specification message to mirror.
            caller_uid: the caller's UID, forwarded to the base class.
        """
        super(NativeEntityMirror, self).__init__(client, caller_uid)
        # No remote call has been made yet, so there is no coverage data.
        self._last_raw_code_coverage_data = None
        self._if_spec_msg = if_spec_message
        self._driver_id = driver_id

    def GetApi(self, api_name):
        """Looks up the ProtoBuf message describing the given API.

        The process exits with status 1 when the mirrored specification is
        not a ComponentSpecificationMessage.

        Args:
            api_name: string, the name of the target function API.

        Returns:
            FunctionSpecificationMessage (a copy of the spec entry) if found,
            None otherwise.
        """
        spec = self._if_spec_msg
        logging.debug("GetAPI %s for %s", api_name, spec)

        # Reserved methods are answered without consulting the specification.
        if api_name == "notifySyspropsChanged":
            reserved_msg = CompSpecMsg.FunctionSpecificationMessage()
            reserved_msg.name = api_name
            return reserved_msg

        if not isinstance(spec, CompSpecMsg.ComponentSpecificationMessage):
            logging.error("unknown spec type %s", type(spec))
            sys.exit(1)
            return None

        for candidate in spec.interface.api:
            if candidate.name == api_name:
                # Hand out a copy so callers can fill in argument values
                # without touching the specification itself.
                return copy.copy(candidate)
        return None

    def GetAttribute(self, attribute_name):
        """Gets the ProtoBuf message for given attribute.

        Args:
            attribute_name: string, the name of a target attribute
                            (optionally excluding the namespace).

        Returns:
            VariableSpecificationMessage if found, None otherwise
        """
        if self._if_spec_msg.attribute:
            for attribute in self._if_spec_msg.attribute:
                if (not attribute.is_const and
                    (attribute.name == attribute_name
                     or attribute.name.endswith("::" + attribute_name))):
                    return attribute
        if (self._if_spec_msg.interface
                and self._if_spec_msg.interface.attribute):
            for attribute in self._if_spec_msg.interface.attribute:
                if (not attribute.is_const and
                    (attribute.name == attribute_name
                     or attribute.name.endswith("::" + attribute_name))):
                    return attribute
        return None

    def GetConstType(self, type_name):
        """Returns the ProtoBuf message for given const type.

        Args:
            type_name: string, the name of the target const data variable.

        Returns:
            VariableSpecificationMessage if found, None otherwise
        """
        try:
            if self._if_spec_msg.attribute:
                for attribute in self._if_spec_msg.attribute:
                    if attribute.is_const and attribute.name == type_name:
                        return attribute
                    elif (attribute.type == CompSpecMsg.TYPE_ENUM
                          and attribute.name.endswith(type_name)):
                        return attribute
            if self._if_spec_msg.interface and self._if_spec_msg.interface.attribute:
                for attribute in self._if_spec_msg.interface.attribute:
                    if attribute.is_const and attribute.name == type_name:
                        return attribute
                    elif (attribute.type == CompSpecMsg.TYPE_ENUM
                          and attribute.name.endswith(type_name)):
                        return attribute
            return None
        except AttributeError as e:
            # TODO: check in advance whether self._if_spec_msg Interface
            # SpecificationMessage.
            return None

    def Py2Pb(self, attribute_name, py_values):
        """Returns the ProtoBuf of a give Python values.

        Args:
            attribute_name: string, the name of a target attribute.
            py_values: Python values.

        Returns:
            Converted VariableSpecificationMessage if found, None otherwise
        """
        attribute_spec = self.GetAttribute(attribute_name)
        if attribute_spec:
            converted_attr = py2pb.Convert(attribute_spec, py_values)
            if converted_attr is None:
              raise MirrorObjectError(
                  "Failed to convert attribute %s", attribute_spec)
            return coverted_attr
        logging.error("Can not find attribute: %s", attribute_name)
        return None

    # TODO: Guard against calls to this function after self.CleanUp is called.
    def __getattr__(self, api_name, *args, **kwargs):
        """Calls a target component's API.

        Args:
            api_name: string, the name of an API function to call.
            *args: a list of arguments
            **kwargs: a dict for the arg name and value pairs
        """

        def RemoteCall(*args, **kwargs):
            """Dynamically calls a remote API and returns the result value."""
            func_msg = self.GetApi(api_name)
            if not func_msg:
                raise MirrorObjectError("api %s unknown", func_msg)

            logging.debug("remote call %s%s", api_name, args)
            if args:
                for arg_msg, value_msg in zip(func_msg.arg, args):
                    logging.debug("arg msg %s", arg_msg)
                    logging.debug("value %s", value_msg)
                    if value_msg is not None:
                        converted_msg = py2pb.Convert(arg_msg, value_msg)
                        if converted_msg is None:
                          raise MirrorObjectError("Failed to convert arg %s", value_msg)
                        logging.debug("converted_message: %s", converted_msg)
                        arg_msg.CopyFrom(converted_msg)
            else:
                # TODO: use kwargs
                for arg in func_msg.arg:
                    # TODO: handle other
                    if (arg.type == CompSpecMsg.TYPE_SCALAR
                            and arg.scalar_type == "pointer"):
                        arg.scalar_value.pointer = 0
                logging.debug(func_msg)

            call_msg = CompSpecMsg.FunctionCallMessage()
            if self._if_spec_msg.component_class:
                call_msg.component_class = self._if_spec_msg.component_class
            call_msg.hal_driver_id = self._driver_id
            call_msg.api.CopyFrom(func_msg)
            logging.debug("final msg %s", call_msg)
            results = self._client.CallApi(
                text_format.MessageToString(call_msg), self._caller_uid)
            if (isinstance(results, tuple) and len(results) == 2
                    and isinstance(results[1], dict)
                    and "coverage" in results[1]):
                self._last_raw_code_coverage_data = results[1]["coverage"]
                results = results[0]

            if isinstance(results, list):  # Non-HIDL HAL does not return list.
                # Translate TYPE_HIDL_INTERFACE to halMirror.
                for i, _ in enumerate(results):
                    result = results[i]
                    if (not result or not isinstance(
                            result, CompSpecMsg.VariableSpecificationMessage)):
                        # no need to process the return values.
                        continue

                    if result.type == CompSpecMsg.TYPE_HIDL_INTERFACE:
                        if result.hidl_interface_id <= -1:
                            results[i] = None
                        driver_id = result.hidl_interface_id
                        nested_interface_name = \
                            result.predefined_type.split("::")[-1]
                        logging.debug("Nested interface name: %s",
                                      nested_interface_name)
                        nested_interface = self.GetHalMirrorForInterface(
                            nested_interface_name, driver_id)
                        results[i] = nested_interface
                    elif (result.type == CompSpecMsg.TYPE_FMQ_SYNC
                          or result.type == CompSpecMsg.TYPE_FMQ_UNSYNC):
                        if (result.fmq_value[0].fmq_id == -1):
                            logging.error("Invalid new queue_id.")
                            results[i] = None
                        else:
                            # Retrieve type of data in this FMQ.
                            data_type = None
                            # For scalar, read scalar_type field.
                            if result.fmq_value[0].type == \
                                    CompSpecMsg.TYPE_SCALAR:
                                data_type = result.fmq_value[0].scalar_type
                            # For enum, struct, and union, read predefined_type
                            # field.
                            elif (result.fmq_value[0].type ==
                                     CompSpecMsg.TYPE_ENUM or
                                  result.fmq_value[0].type ==
                                     CompSpecMsg.TYPE_STRUCT or
                                  result.fmq_value[0].type ==
                                     CompSpecMsg.TYPE_UNION):
                                data_type = result.fmq_value[0].predefined_type

                            # Encounter an unknown type in FMQ.
                            if data_type == None:
                                logging.error(
                                    "Unknown type %d in the new FMQ.",
                                    result.fmq_value[0].type)
                                results[i] = None
                                continue
                            sync = result.type == CompSpecMsg.TYPE_FMQ_SYNC
                            fmq_mirror = resource_mirror.ResourceFmqMirror(
                                data_type, sync, self._client,
                                result.fmq_value[0].fmq_id)
                            results[i] = fmq_mirror
                    elif result.type == CompSpecMsg.TYPE_HIDL_MEMORY:
                        if result.hidl_memory_value.mem_id == -1:
                            logging.error("Invalid new mem_id.")
                            results[i] = None
                        else:
                            mem_mirror = resource_mirror.ResourceHidlMemoryMirror(
                                self._client, result.hidl_memory_value.mem_id)
                            results[i] = mem_mirror
                    elif result.type == CompSpecMsg.TYPE_HANDLE:
                        if result.handle_value.handle_id == -1:
                            logging.error("Invalid new handle_id.")
                            results[i] = None
                        else:
                            handle_mirror = resource_mirror.ResourceHidlHandleMirror(
                                self._client, result.handle_value.handle_id)
                            results[i] = handle_mirror
                if len(results) == 1:
                    # single return result, return the value directly.
                    return results[0]
            return results

        def MessageGenerator(*args, **kwargs):
            """Dynamically generates a custom message instance."""
            if args:
                return self.Py2Pb(api_name, args)
            else:
                #TODO: handle kwargs
                return None

        #TODO: deprecate this or move it to a separate class.
        def MessageFuzzer(arg_msg):
            """Fuzz a custom message instance."""
            if not self.GetAttribute(api_name):
                raise MirrorObjectError("fuzz arg %s unknown" % arg_msg)

            if arg_msg.type == CompSpecMsg.TYPE_STRUCT:
                index = random.randint(0, len(arg_msg.struct_value))
                count = 0
                for struct_value in arg_msg.struct_value:
                    if count == index:
                        if struct_value.scalar_type == "uint32_t":
                            struct_value.scalar_value.uint32_t ^= FuzzerUtils.mask_uint32_t(
                            )
                        elif struct_value.scalar_type == "int32_t":
                            mask = FuzzerUtils.mask_int32_t()
                            if mask == (1 << 31):
                                struct_value.scalar_value.int32_t *= -1
                                struct_value.scalar_value.int32_t += 1
                            else:
                                struct_value.scalar_value.int32_t ^= mask
                        else:
                            raise MirrorObjectError(
                                "support %s" % struct_value.scalar_type)
                        break
                    count += 1
                logging.debug("fuzzed %s", arg_msg)
            else:
                raise MirrorObjectError(
                    "unsupported fuzz message type %s." % arg_msg.type)
            return arg_msg

        def ConstGenerator():
            """Dynamically generates a const variable's value."""
            arg_msg = self.GetConstType(api_name)
            if not arg_msg:
                raise MirrorObjectError("const %s unknown" % arg_msg)
            logging.debug("check %s", api_name)
            if arg_msg.type == CompSpecMsg.TYPE_SCALAR:
                ret_v = getattr(arg_msg.scalar_value, arg_msg.scalar_type,
                                None)
                if ret_v is None:
                    raise MirrorObjectError("No value found for type %s in %s."
                                            % (arg_msg.scalar_type, api_name))
                return ret_v
            elif arg_msg.type == CompSpecMsg.TYPE_STRING:
                return arg_msg.string_value.message
            elif arg_msg.type == CompSpecMsg.TYPE_ENUM:
                return VtsEnum(arg_msg)
            raise MirrorObjectError("const %s not found" % api_name)

        # handle APIs.
        func_msg = self.GetApi(api_name)
        if func_msg:
            logging.debug("api %s", func_msg)
            return RemoteCall

        # handle attributes.
        arg_msg = self.GetConstType(api_name)
        if arg_msg:
            logging.debug("const %s *\n%s", api_name, arg_msg)
            return ConstGenerator()

        fuzz = False
        if api_name.endswith("_fuzz"):
            fuzz = True
            api_name = api_name[:-5]
        arg_msg = self.GetAttribute(api_name)
        if arg_msg:
            logging.debug("arg %s", arg_msg)
            if not fuzz:
                return MessageGenerator
            else:
                return MessageFuzzer

        raise MirrorObjectError("unknown api name %s" % api_name)

    def GetRawCodeCoverage(self):
        """Returns any measured raw code coverage data."""
        return self._last_raw_code_coverage_data

    def __str__(self):
        """Prints all the attributes and methods."""
        result = ""
        if self._if_spec_msg:
            if self._if_spec_msg.attribute:
                for attribute in self._if_spec_msg.attribute:
                    result += "attribute %s\n" % attribute.name
            if self._if_spec_msg.interface:
                for attribute in self._if_spec_msg.interface.attribute:
                    result += "interface attribute %s\n" % attribute.name
                for api in self._if_spec_msg.interface.api:
                    result += "api %s\n" % api.name
        return result