#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import logging
import random
import sys

from google.protobuf import text_format

from vts.proto import AndroidSystemControlMessage_pb2 as ASysCtrlMsg
from vts.proto import ComponentSpecificationMessage_pb2 as CompSpecMsg
from vts.utils.python.fuzzer import FuzzerUtils
from vts.utils.python.mirror import mirror_object
from vts.utils.python.mirror import py2pb
from vts.utils.python.mirror import resource_mirror

_DEFAULT_TARGET_BASE_PATHS = ["/system/lib64/hw"]
_DEFAULT_HWBINDER_SERVICE = "default"

INTERFACE = "interface"
API = "api"


class MirrorObjectError(Exception):
    """Raised when there is a general error in manipulating a mirror object."""


class VtsEnum(object):
    """Enum's host-side mirror instance.

    Every enumerator of the mirrored enum becomes an instance attribute whose
    value is the matching scalar value.
    """

    def __init__(self, attribute):
        """Populates one instance attribute per enumerator.

        Args:
            attribute: the enum's specification message; its enum_value field
                       supplies the parallel enumerator and scalar_value lists
                       and the scalar_type name used to read each value.
        """
        logging.debug(attribute)
        enum_value = attribute.enum_value
        for enumerator, scalar_value in zip(enum_value.enumerator,
                                            enum_value.scalar_value):
            setattr(self, enumerator,
                    getattr(scalar_value, enum_value.scalar_type))


class NativeEntityMirror(mirror_object.MirrorObject):
    """The class that acts as the mirror to an Android device's HAL layer.

    This is the base class used to communicate to a particular HAL or Lib in
    the VTS agent on the target side.

    Attributes:
        _client: the TCP client instance.
        _caller_uid: string, the caller's UID if not None.
        _driver_id: the driver ID sent as hal_driver_id with every remote call.
        _if_spec_msg: the interface specification message of a host object to
                      mirror.
        _last_raw_code_coverage_data: NativeCodeCoverageRawDataMessage, last
                                      seen raw code coverage data.
    """

    def __init__(self,
                 client,
                 driver_id=None,
                 if_spec_message=None,
                 caller_uid=None):
        """Initializes the mirror.

        Args:
            client: the TCP client instance, forwarded to the base class.
            driver_id: the driver ID attached to every remote call.
            if_spec_message: the interface specification message to mirror.
            caller_uid: string, the caller's UID, forwarded to the base class.
        """
        super(NativeEntityMirror, self).__init__(client, caller_uid)
        self._driver_id = driver_id
        self._if_spec_msg = if_spec_message
        self._last_raw_code_coverage_data = None

    def GetApi(self, api_name):
        """Gets the ProtoBuf message for given api.

        Note that the process exits (sys.exit(1)) when the mirrored spec is
        not a ComponentSpecificationMessage and the API is not a reserved one.

        Args:
            api_name: string, the name of the target function API.

        Returns:
            FunctionSpecificationMessage if found, None otherwise
        """
        logging.debug("GetAPI %s for %s", api_name, self._if_spec_msg)
        # handle reserved methods first.
        if api_name == "notifySyspropsChanged":
            func_msg = CompSpecMsg.FunctionSpecificationMessage()
            func_msg.name = api_name
            return func_msg

        if not isinstance(self._if_spec_msg,
                          CompSpecMsg.ComponentSpecificationMessage):
            logging.error("unknown spec type %s", type(self._if_spec_msg))
            sys.exit(1)

        for api in self._if_spec_msg.interface.api:
            if api.name == api_name:
                # Hand out a copy so that callers can fill in argument values
                # without touching the stored specification.
                return copy.copy(api)
        return None

    def _FindAttribute(self, predicate):
        """Returns the first attribute spec accepted by the given predicate.

        Component-level attributes are scanned first, then the attributes
        nested under the interface.

        Args:
            predicate: callable taking an attribute message and returning a
                       truthy value when that attribute is the wanted one.

        Returns:
            The first matching attribute message, None otherwise.
        """
        spec = self._if_spec_msg
        if spec.attribute:
            for attribute in spec.attribute:
                if predicate(attribute):
                    return attribute
        if spec.interface and spec.interface.attribute:
            for attribute in spec.interface.attribute:
                if predicate(attribute):
                    return attribute
        return None

    def GetAttribute(self, attribute_name):
        """Gets the ProtoBuf message for given attribute.

        Args:
            attribute_name: string, the name of a target attribute
                            (optionally excluding the namespace).

        Returns:
            VariableSpecificationMessage if found, None otherwise
        """
        qualified_suffix = "::" + attribute_name

        def IsTarget(attribute):
            """Accepts non-const attributes matching the (unqualified) name."""
            return (not attribute.is_const and
                    (attribute.name == attribute_name or
                     attribute.name.endswith(qualified_suffix)))

        return self._FindAttribute(IsTarget)

    def GetConstType(self, type_name):
        """Returns the ProtoBuf message for given const type.

        Besides const attributes with exactly the given name, an enum
        attribute whose name ends with the given name is also accepted.

        Args:
            type_name: string, the name of the target const data variable.

        Returns:
            VariableSpecificationMessage if found, None otherwise
        """

        def IsTarget(attribute):
            """Accepts an exactly-named const or a suffix-matching enum."""
            if attribute.is_const and attribute.name == type_name:
                return True
            return (attribute.type == CompSpecMsg.TYPE_ENUM and
                    attribute.name.endswith(type_name))

        try:
            return self._FindAttribute(IsTarget)
        except AttributeError:
            # TODO: check in advance whether self._if_spec_msg Interface
            # SpecificationMessage.
            return None

    def Py2Pb(self, attribute_name, py_values):
        """Returns the ProtoBuf of a give Python values.

        Args:
            attribute_name: string, the name of a target attribute.
            py_values: Python values.

        Returns:
            Converted VariableSpecificationMessage if found, None otherwise

        Raises:
            MirrorObjectError: when the attribute exists but the values can
                               not be converted.
        """
        attribute_spec = self.GetAttribute(attribute_name)
        if not attribute_spec:
            logging.error("Can not find attribute: %s", attribute_name)
            return None

        converted_attr = py2pb.Convert(attribute_spec, py_values)
        if converted_attr is None:
            raise MirrorObjectError(
                "Failed to convert attribute %s" % attribute_spec)
        return converted_attr

    def _MirrorFmq(self, result):
        """Builds the host-side mirror for a returned FMQ description.

        Args:
            result: a returned value of type TYPE_FMQ_SYNC or TYPE_FMQ_UNSYNC.

        Returns:
            ResourceFmqMirror, or None when the queue ID is invalid (-1) or
            the element type of the queue is not recognized.
        """
        fmq = result.fmq_value[0]
        if fmq.fmq_id == -1:
            logging.error("Invalid new queue_id.")
            return None

        # Retrieve type of data in this FMQ.
        data_type = None
        if fmq.type == CompSpecMsg.TYPE_SCALAR:
            # For scalar, read scalar_type field.
            data_type = fmq.scalar_type
        elif fmq.type in (CompSpecMsg.TYPE_ENUM, CompSpecMsg.TYPE_STRUCT,
                          CompSpecMsg.TYPE_UNION):
            # For enum, struct, and union, read predefined_type field.
            data_type = fmq.predefined_type

        if data_type is None:
            # Encounter an unknown type in FMQ.
            logging.error("Unknown type %d in the new FMQ.", fmq.type)
            return None

        sync = result.type == CompSpecMsg.TYPE_FMQ_SYNC
        return resource_mirror.ResourceFmqMirror(data_type, sync,
                                                 self._client, fmq.fmq_id)

    def _MirrorResult(self, result):
        """Translates one returned value into its host-side mirror, if any.

        Args:
            result: one element of the list returned by a remote call.

        Returns:
            A mirror object for interface, FMQ, hidl_memory and handle
            values; None when such a value carries an invalid ID; the given
            value itself in every other case.
        """
        if (not result or not isinstance(
                result, CompSpecMsg.VariableSpecificationMessage)):
            # no need to process the return values.
            return result

        if result.type == CompSpecMsg.TYPE_HIDL_INTERFACE:
            if result.hidl_interface_id <= -1:
                # Invalid interface; do not build a mirror for it.
                return None
            nested_interface_name = result.predefined_type.split("::")[-1]
            logging.debug("Nested interface name: %s", nested_interface_name)
            # NOTE(review): GetHalMirrorForInterface is not defined in this
            # class; presumably a subclass provides it -- confirm.
            return self.GetHalMirrorForInterface(nested_interface_name,
                                                 result.hidl_interface_id)

        if result.type in (CompSpecMsg.TYPE_FMQ_SYNC,
                           CompSpecMsg.TYPE_FMQ_UNSYNC):
            return self._MirrorFmq(result)

        if result.type == CompSpecMsg.TYPE_HIDL_MEMORY:
            if result.hidl_memory_value.mem_id == -1:
                logging.error("Invalid new mem_id.")
                return None
            return resource_mirror.ResourceHidlMemoryMirror(
                self._client, result.hidl_memory_value.mem_id)

        if result.type == CompSpecMsg.TYPE_HANDLE:
            if result.handle_value.handle_id == -1:
                logging.error("Invalid new handle_id.")
                return None
            return resource_mirror.ResourceHidlHandleMirror(
                self._client, result.handle_value.handle_id)

        return result

    # TODO: Guard against calls to this function after self.CleanUp is called.
    def __getattr__(self, api_name, *args, **kwargs):
        """Calls a target component's API.

        The name is resolved in this order: an API of the mirrored interface
        (a callable performing the remote call is returned), a const or enum
        (its value is returned), an attribute (a message generator is
        returned, or a message fuzzer when the name ends with "_fuzz").

        Args:
            api_name: string, the name of an API function to call.
            *args: a list of arguments
            **kwargs: a dict for the arg name and value pairs

        Raises:
            MirrorObjectError: when the name matches none of the above.
        """

        def RemoteCall(*args, **kwargs):
            """Dynamically calls a remote API and returns the result value."""
            func_msg = self.GetApi(api_name)
            if not func_msg:
                raise MirrorObjectError("api %s unknown" % api_name)

            logging.debug("remote call %s%s", api_name, args)
            if args:
                for arg_msg, value_msg in zip(func_msg.arg, args):
                    logging.debug("arg msg %s", arg_msg)
                    logging.debug("value %s", value_msg)
                    if value_msg is not None:
                        converted_msg = py2pb.Convert(arg_msg, value_msg)
                        if converted_msg is None:
                            raise MirrorObjectError(
                                "Failed to convert arg %s" % value_msg)
                        logging.debug("converted_message: %s", converted_msg)
                        arg_msg.CopyFrom(converted_msg)
            else:
                # TODO: use kwargs
                for arg in func_msg.arg:
                    # TODO: handle other
                    if (arg.type == CompSpecMsg.TYPE_SCALAR and
                            arg.scalar_type == "pointer"):
                        arg.scalar_value.pointer = 0
                logging.debug(func_msg)

            call_msg = CompSpecMsg.FunctionCallMessage()
            if self._if_spec_msg.component_class:
                call_msg.component_class = self._if_spec_msg.component_class
            call_msg.hal_driver_id = self._driver_id
            call_msg.api.CopyFrom(func_msg)
            logging.debug("final msg %s", call_msg)
            results = self._client.CallApi(
                text_format.MessageToString(call_msg), self._caller_uid)

            # A (values, {"coverage": ...}) pair carries coverage data next
            # to the actual return values; split the two apart.
            if (isinstance(results, tuple) and len(results) == 2 and
                    isinstance(results[1], dict) and
                    "coverage" in results[1]):
                self._last_raw_code_coverage_data = results[1]["coverage"]
                results = results[0]

            if isinstance(results, list):
                # Non-HIDL HAL does not return list.
                # Translate interface/resource values to mirror objects.
                for i, result in enumerate(results):
                    results[i] = self._MirrorResult(result)
                if len(results) == 1:
                    # single return result, return the value directly.
                    return results[0]
            return results

        def MessageGenerator(*args, **kwargs):
            """Dynamically generates a custom message instance."""
            if args:
                return self.Py2Pb(api_name, args)
            #TODO: handle kwargs
            return None

        #TODO: deprecate this or move it to a separate class.
        def MessageFuzzer(arg_msg):
            """Fuzz a custom message instance."""
            if not self.GetAttribute(api_name):
                raise MirrorObjectError("fuzz arg %s unknown" % arg_msg)

            if arg_msg.type != CompSpecMsg.TYPE_STRUCT:
                raise MirrorObjectError(
                    "unsupported fuzz message type %s." % arg_msg.type)

            fields = arg_msg.struct_value
            if fields:
                # randrange excludes its upper bound, so exactly one existing
                # field is always selected.
                field = fields[random.randrange(len(fields))]
                if field.scalar_type == "uint32_t":
                    field.scalar_value.uint32_t ^= (
                        FuzzerUtils.mask_uint32_t())
                elif field.scalar_type == "int32_t":
                    mask = FuzzerUtils.mask_int32_t()
                    if mask == (1 << 31):
                        field.scalar_value.int32_t *= -1
                        field.scalar_value.int32_t += 1
                    else:
                        field.scalar_value.int32_t ^= mask
                else:
                    raise MirrorObjectError(
                        "unsupported scalar type %s" % field.scalar_type)
            logging.debug("fuzzed %s", arg_msg)
            return arg_msg

        def ConstGenerator():
            """Dynamically generates a const variable's value."""
            arg_msg = self.GetConstType(api_name)
            if not arg_msg:
                raise MirrorObjectError("const %s unknown" % api_name)
            logging.debug("check %s", api_name)
            if arg_msg.type == CompSpecMsg.TYPE_SCALAR:
                ret_v = getattr(arg_msg.scalar_value, arg_msg.scalar_type,
                                None)
                if ret_v is None:
                    raise MirrorObjectError(
                        "No value found for type %s in %s." %
                        (arg_msg.scalar_type, api_name))
                return ret_v
            elif arg_msg.type == CompSpecMsg.TYPE_STRING:
                return arg_msg.string_value.message
            elif arg_msg.type == CompSpecMsg.TYPE_ENUM:
                return VtsEnum(arg_msg)
            raise MirrorObjectError("const %s not found" % api_name)

        # handle APIs.
        func_msg = self.GetApi(api_name)
        if func_msg:
            logging.debug("api %s", func_msg)
            return RemoteCall

        # handle attributes.
        arg_msg = self.GetConstType(api_name)
        if arg_msg:
            logging.debug("const %s *\n%s", api_name, arg_msg)
            return ConstGenerator()

        fuzz = False
        if api_name.endswith("_fuzz"):
            fuzz = True
            # Rebinding api_name here is also seen by the closures above.
            api_name = api_name[:-5]
        arg_msg = self.GetAttribute(api_name)
        if arg_msg:
            logging.debug("arg %s", arg_msg)
            if not fuzz:
                return MessageGenerator
            else:
                return MessageFuzzer

        raise MirrorObjectError("unknown api name %s" % api_name)

    def GetRawCodeCoverage(self):
        """Returns any measured raw code coverage data."""
        return self._last_raw_code_coverage_data

    def __str__(self):
        """Prints all the attributes and methods."""
        lines = []
        if self._if_spec_msg:
            if self._if_spec_msg.attribute:
                for attribute in self._if_spec_msg.attribute:
                    lines.append("attribute %s\n" % attribute.name)
            if self._if_spec_msg.interface:
                for attribute in self._if_spec_msg.interface.attribute:
                    lines.append("interface attribute %s\n" % attribute.name)
                for api in self._if_spec_msg.interface.api:
                    lines.append("api %s\n" % api.name)
        return "".join(lines)