#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import logging
import random
import sys
from vts.utils.python.fuzzer import FuzzerUtils
from vts.utils.python.mirror import mirror_object_for_types
from vts.proto import ComponentSpecificationMessage_pb2 as CompSpecMsg
from google.protobuf import text_format
# a dict containing the IDs of the registered function pointers.
_function_pointer_id_dict = {}
INTERFACE = "interface"
API = "api"
class MirrorObjectError(Exception):
"""Raised when there is a general error in manipulating a mirror object."""
pass
class MirrorObject(object):
"""The class that mirrors objects on the native side.
This class exists on the host and can be used to communicate to a
particular HAL in the HAL agent on the target side.
Attributes:
_client: the TCP client instance.
_if_spec_msg: the interface specification message of a host object to
mirror.
_callback_server: the instance of a callback server.
_parent_path: the name of a sub struct this object mirrors.
_last_raw_code_coverage_data: NativeCodeCoverageRawDataMessage,
last seen raw code coverage data.
__caller_uid: string, the caller's UID if not None.
"""
def __init__(self, client, msg, callback_server, parent_path=None):
self._client = client
self._if_spec_msg = msg
self._callback_server = callback_server
self._parent_path = parent_path
self._last_raw_code_coverage_data = None
self.__caller_uid = None
def GetFunctionPointerID(self, function_pointer):
"""Returns the function pointer ID for the given one."""
max_num = 0
for key in _function_pointer_id_dict:
if _function_pointer_id_dict[key] == function_pointer:
return str(key)
if not max_num or key > max_num:
max_num = key
_function_pointer_id_dict[max_num + 1] = function_pointer
id = str(max_num + 1)
if self._callback_server:
self._callback_server.RegisterCallback(id, function_pointer)
return id
def OpenConventionalHal(self, module_name=None):
"""Opens the target conventional HAL component.
This is only needed for conventional HAL.
Args:
module_name: string, the name of a module to load.
"""
func_msg = CompSpecMsg.FunctionSpecificationMessage()
func_msg.name = "#Open"
logging.debug("remote call %s", func_msg.name)
if module_name:
arg = func_msg.arg.add()
arg.type = CompSpecMsg.TYPE_STRING
arg.string_value.message = module_name
func_msg.return_type.type == CompSpecMsg.TYPE_SCALAR
func_msg.return_type.scalar_type = "int32_t"
logging.debug("final msg %s", func_msg)
result = self._client.CallApi(text_format.MessageToString(func_msg),
self.__caller_uid)
logging.debug(result)
return result
def SetCallerUid(self, uid):
"""Sets target-side caller's UID.
Args:
uid: string, the caller's UID.
"""
self.__caller_uid = uid
def GetAttributeValue(self, attribute_name):
"""Retrieves the value of an attribute from a target.
Args:
attribute_name: string, the name of an attribute.
Returns:
FunctionSpecificationMessage which contains the value.
"""
def RemoteCallToGetAttribute(*args, **kwargs):
"""Makes a remote call and retrieves an attribute."""
func_msg = self.GetAttribute(attribute_name)
if not func_msg:
raise MirrorObjectError("attribute %s unknown", func_msg)
logging.debug("remote call %s.%s", self._parent_path, attribute_name)
logging.info("remote call %s%s", attribute_name, args)
if self._parent_path:
func_msg.parent_path = self._parent_path
try:
if isinstance(self._if_spec_msg,
CompSpecMsg.ComponentSpecificationMessage):
logging.info("component_class %s",
self._if_spec_msg.component_class)
if (self._if_spec_msg.component_class
== CompSpecMsg.HAL_CONVENTIONAL_SUBMODULE):
submodule_name = self._if_spec_msg.original_data_structure_name
if submodule_name.endswith("*"):
submodule_name = submodule_name[:-1]
func_msg.submodule_name = submodule_name
elif isinstance(self._if_spec_msg,
CompSpecMsg.StructSpecificationMessage):
pass
else:
logging.error("unknown type %s", type(self._if_spec_msg))
sys.exit(1)
except AttributeError as e:
logging.exception("%s" % e)
pass
result = self._client.GetAttribute(
text_format.MessageToString(func_msg))
logging.debug(result)
return result
var_msg = self.GetAttribute(attribute_name)
if var_msg:
logging.debug("attribute: %s", var_msg)
return RemoteCallToGetAttribute()
raise MirrorObjectError("unknown attribute name %s" % attribute_name)
def GetHidlCallbackInterface(self, interface_name, **kwargs):
var_msg = CompSpecMsg.VariableSpecificationMessage()
var_msg.name = interface_name
var_msg.type = CompSpecMsg.TYPE_FUNCTION_POINTER
var_msg.is_callback = True
msg = self._if_spec_msg
specification = self._client.ReadSpecification(
interface_name,
msg.component_class,
msg.component_type,
msg.component_type_version,
msg.package)
logging.info("specification: %s", specification)
interface = getattr(specification, INTERFACE, None)
apis = getattr(interface, API, [])
for api in apis:
function_pointer = None
if api.name in kwargs:
function_pointer = kwargs[api.name]
else:
def dummy(*args):
"""Dummy implementation for any callback function."""
logging.info("Entering dummy implementation"
" for callback function: %s", api.name)
for arg_index in range(len(args)):
logging.info("arg%s: %s", arg_index, args[arg_index])
function_pointer = dummy
func_pt_msg = var_msg.function_pointer.add()
func_pt_msg.function_name = api.name
func_pt_msg.id = self.GetFunctionPointerID(function_pointer)
return var_msg
def GetHidlTypeInterface(self, interface_name,
target_class=None, target_type=None,
version=None, package=None):
"""Gets HIDL type interface's host-side mirror.
Args:
interface_name: string, the name of a target interface to read.
target_class: integer, optional used to override the loaded HAL's
component_class.
target_type: integer, optional used to override the loaded HAL's
component_type.
version: integer, optional used to override the loaded HAL's
component_type_version.
package: integer, optional used to override the loaded HAL's
package.
Returns:
a host-side mirror of a HIDL interface
"""
msg = self._if_spec_msg
result = self._client.ReadSpecification(
interface_name,
msg.component_class if target_class is None else target_class,
msg.component_type if target_type is None else target_type,
msg.component_type_version if version is None else version,
msg.package if package is None else package,
recursive=True)
logging.info("result %s", result)
return mirror_object_for_types.MirrorObjectForTypes(result)
def CleanUp(self):
self._client.Disconnect()
def GetApi(self, api_name):
"""Returns the Function Specification Message.
Args:
api_name: string, the name of the target function API.
Returns:
FunctionSpecificationMessage or StructSpecificationMessage if found,
None otherwise
"""
logging.debug("GetAPI %s for %s", api_name, self._if_spec_msg)
# handle reserved methods first.
if api_name == "notifySyspropsChanged":
func_msg = CompSpecMsg.FunctionSpecificationMessage()
func_msg.name = api_name
return func_msg
if isinstance(self._if_spec_msg, CompSpecMsg.ComponentSpecificationMessage):
if len(self._if_spec_msg.interface.api) > 0:
for api in self._if_spec_msg.interface.api:
if api.name == api_name:
return copy.copy(api)
elif isinstance(self._if_spec_msg, CompSpecMsg.StructSpecificationMessage):
if len(self._if_spec_msg.api) > 0:
for api in self._if_spec_msg.api:
logging.info("api %s", api)
if api.name == api_name:
return copy.copy(api)
if len(self._if_spec_msg.sub_struct) > 0:
for sub_struct in self._if_spec_msg.sub_struct:
if len(sub_struct.api) > 0:
for api in sub_struct.api:
logging.info("api %s", api)
if api.name == api_name:
return copy.copy(api)
else:
logging.error("unknown spec type %s", type(self._if_spec_msg))
sys.exit(1)
return None
def GetAttribute(self, attribute_name):
"""Returns the Message.
"""
logging.debug("GetAttribute %s for %s",
attribute_name, self._if_spec_msg)
if self._if_spec_msg.attribute:
for attribute in self._if_spec_msg.attribute:
if attribute.name == attribute_name:
func_msg = CompSpecMsg.FunctionSpecificationMessage()
func_msg.name = attribute.name
func_msg.return_type.type = attribute.type
if func_msg.return_type.type == CompSpecMsg.TYPE_SCALAR:
func_msg.return_type.scalar_type = attribute.scalar_type
else:
func_msg.return_type.predefined_type = attribute.predefined_type
logging.info("GetAttribute attribute: %s", attribute)
logging.info("GetAttribute request: %s", func_msg)
return copy.copy(func_msg)
if (self._if_spec_msg.interface and
self._if_spec_msg.interface.attribute):
for attribute in self._if_spec_msg.interface.attribute:
if attribute.name == attribute_name:
func_msg = CompSpecMsg.FunctionSpecificationMessage()
func_msg.name = attribute.name
func_msg.return_type.type = attribute.type
if func_msg.return_type.type == CompSpecMsg.TYPE_SCALAR:
func_msg.return_type.scalar_type = attribute.scalar_type
else:
func_msg.return_type.predefined_type = attribute.predefined_type
logging.info("GetAttribute attribute: %s", attribute)
logging.info("GetAttribute request: %s", func_msg)
return copy.copy(func_msg)
return None
def GetSubStruct(self, sub_struct_name):
"""Returns the Struct Specification Message.
Args:
sub_struct_name: string, the name of the target sub struct attribute.
Returns:
StructSpecificationMessage if found, None otherwise
"""
if isinstance(self._if_spec_msg, CompSpecMsg.ComponentSpecificationMessage):
if (self._if_spec_msg.interface and
self._if_spec_msg.interface.sub_struct):
for sub_struct in self._if_spec_msg.interface.sub_struct:
if sub_struct.name == sub_struct_name:
return copy.copy(sub_struct)
elif isinstance(self._if_spec_msg, CompSpecMsg.StructSpecificationMessage):
if len(self._if_spec_msg.sub_struct) > 0:
for sub_struct in self._if_spec_msg.sub_struct:
if sub_struct.name == sub_struct_name:
return copy.copy(sub_struct)
return None
def GetCustomAggregateType(self, type_name):
"""Returns the Argument Specification Message.
Args:
type_name: string, the name of the target data type.
Returns:
VariableSpecificationMessage if found, None otherwise
"""
try:
if self._if_spec_msg.attribute:
for attribute in self._if_spec_msg.attribute:
if not attribute.is_const and attribute.name == type_name:
return copy.copy(attribute)
if (self._if_spec_msg.interface and
self._if_spec_msg.interface.attribute):
for attribute in self._if_spec_msg.interface.attribute:
if not attribute.is_const and attribute.name == type_name:
return copy.copy(attribute)
return None
except AttributeError as e:
# TODO: check in advance whether self._if_spec_msg Interface
# SpecificationMessage.
return None
def GetConstType(self, type_name):
"""Returns the Argument Specification Message.
Args:
type_name: string, the name of the target const data variable.
Returns:
VariableSpecificationMessage if found, None otherwise
"""
try:
if self._if_spec_msg.attribute:
for attribute in self._if_spec_msg.attribute:
if attribute.is_const and attribute.name == type_name:
return copy.copy(attribute)
elif attribute.type == CompSpecMsg.TYPE_ENUM:
for enumerator in attribute.enum_value.enumerator:
if enumerator == type_name:
return copy.copy(attribute)
if self._if_spec_msg.interface and self._if_spec_msg.interface.attribute:
for attribute in self._if_spec_msg.interface.attribute:
if attribute.is_const and attribute.name == type_name:
return copy.copy(attribute)
elif attribute.type == CompSpecMsg.TYPE_ENUM:
for enumerator in attribute.enum_value.enumerator:
if enumerator == type_name:
return copy.copy(attribute)
return None
except AttributeError as e:
# TODO: check in advance whether self._if_spec_msg Interface
# SpecificationMessage.
return None
def ArgToPb(self, arg_msg, value_msg):
"""Converts args to a ProtoBuf message.
Args:
arg_msg: the ProtoBuf message where the result will be stored in.
value_msg: value given as an argument. it can be either Python
data structure, a ProtoBuf message, or both.
Raises:
AttributeError: when unexpected type is seen.
"""
if isinstance(value_msg, CompSpecMsg.VariableSpecificationMessage):
arg_msg.CopyFrom(value_msg)
elif isinstance(value_msg, int):
arg_msg.type = CompSpecMsg.TYPE_SCALAR
if not arg_msg.scalar_type:
arg_msg.scalar_type = "int32_t"
setattr(arg_msg.scalar_value, arg_msg.scalar_type, value_msg)
elif isinstance(value_msg, float):
arg_msg.type = CompSpecMsg.TYPE_SCALAR
arg_msg.scalar_value.float_t = value_msg
arg_msg.scalar_type = "float_t"
elif isinstance(value_msg, str):
if ((arg_msg.type == CompSpecMsg.TYPE_SCALAR and
(arg_msg.scalar_type == "char_pointer" or
arg_msg.scalar_type == "uchar_pointer")) or
arg_msg.type == CompSpecMsg.TYPE_STRING):
arg_msg.string_value.message = value_msg
arg_msg.string_value.length = len(value_msg)
else:
raise MirrorObjectError(
"unsupported type %s for str" % arg_msg)
elif isinstance(value_msg, list):
if (arg_msg.type == CompSpecMsg.TYPE_VECTOR or
arg_msg.type == CompSpecMsg.TYPE_ARRAY):
first = True
for list_element in value_msg:
if first:
self.ArgToPb(arg_msg.vector_value[0], list_element)
first = False
else:
self.ArgToPb(arg_msg.vector_value.add(), list_element)
arg_msg.vector_size = len(value_msg)
else:
raise MirrorObjectError(
"unsupported arg_msg type %s for list" % arg_msg.type)
else:
raise MirrorObjectError(
"unsupported value type %s" % type(value_msg))
# TODO: Guard against calls to this function after self.CleanUp is called.
def __getattr__(self, api_name, *args, **kwargs):
"""Calls a target component's API.
Args:
api_name: string, the name of an API function to call.
*args: a list of arguments
**kwargs: a dict for the arg name and value pairs
"""
def RemoteCall(*args, **kwargs):
"""Dynamically calls a remote API."""
func_msg = self.GetApi(api_name)
if not func_msg:
raise MirrorObjectError("api %s unknown", func_msg)
logging.debug("remote call %s.%s", self._parent_path, api_name)
logging.info("remote call %s%s", api_name, args)
if args:
for arg_msg, value_msg in zip(func_msg.arg, args):
logging.debug("arg msg %s", arg_msg)
logging.debug("value %s", value_msg)
if value_msg is not None:
self.ArgToPb(arg_msg, value_msg)
logging.debug("final msg %s", func_msg)
else:
# TODO: use kwargs
for arg in func_msg.arg:
# TODO: handle other
if (arg.type == CompSpecMsg.TYPE_SCALAR
and arg.scalar_type == "pointer"):
arg.scalar_value.pointer = 0
logging.debug(func_msg)
if self._parent_path:
func_msg.parent_path = self._parent_path
if isinstance(self._if_spec_msg, CompSpecMsg.ComponentSpecificationMessage):
if self._if_spec_msg.component_class:
logging.info("component_class %s",
self._if_spec_msg.component_class)
if self._if_spec_msg.component_class == CompSpecMsg.HAL_CONVENTIONAL_SUBMODULE:
submodule_name = self._if_spec_msg.original_data_structure_name
if submodule_name.endswith("*"):
submodule_name = submodule_name[:-1]
func_msg.submodule_name = submodule_name
result = self._client.CallApi(text_format.MessageToString(func_msg),
self.__caller_uid)
logging.debug(result)
if (isinstance(result, tuple) and len(result) == 2 and
isinstance(result[1], dict) and "coverage" in result[1]):
self._last_raw_code_coverage_data = result[1]["coverage"]
return result[0]
return result
def MessageGenerator(*args, **kwargs):
"""Dynamically generates a custom message instance."""
arg_msg = self.GetCustomAggregateType(api_name)
if not arg_msg:
raise MirrorObjectError("arg %s unknown" % arg_msg)
logging.info("MessageGenerator %s %s", api_name, arg_msg)
logging.debug("MESSAGE %s", api_name)
if arg_msg.type == CompSpecMsg.TYPE_STRUCT:
for struct_value in arg_msg.struct_value:
logging.debug("for %s %s",
struct_value.name, struct_value.scalar_type)
for given_name, given_value in kwargs.iteritems():
logging.debug("check %s %s", struct_value.name, given_name)
if given_name == struct_value.name:
logging.debug("match type=%s", struct_value.scalar_type)
if struct_value.type == CompSpecMsg.TYPE_SCALAR:
if struct_value.scalar_type == "uint32_t":
struct_value.scalar_value.uint32_t = given_value
elif struct_value.scalar_type == "int32_t":
struct_value.scalar_value.int32_t = given_value
else:
raise MirrorObjectError(
"support %s" % struct_value.scalar_type)
continue
elif arg_msg.type == CompSpecMsg.TYPE_FUNCTION_POINTER:
for fp_value in arg_msg.function_pointer:
logging.debug("for %s", fp_value.function_name)
for given_name, given_value in kwargs.iteritems():
logging.debug("check %s %s", fp_value.function_name, given_name)
if given_name == fp_value.function_name:
fp_value.id = self.GetFunctionPointerID(given_value)
break
if arg_msg.type == CompSpecMsg.TYPE_STRUCT:
for struct_value, given_value in zip(arg_msg.struct_value, args):
logging.debug("arg match type=%s", struct_value.scalar_type)
if struct_value.type == CompSpecMsg.TYPE_SCALAR:
if struct_value.scalar_type == "uint32_t":
struct_value.scalar_value.uint32_t = given_value
elif struct_value.scalar_type == "int32_t":
struct_value.scalar_value.int32_t = given_value
else:
raise MirrorObjectError("support %s" % p_type)
elif arg_msg.type == CompSpecMsg.TYPE_FUNCTION_POINTER:
for fp_value, given_value in zip(arg_msg.function_pointer, args):
logging.debug("for %s", fp_value.function_name)
fp_value.id = self.GetFunctionPointerID(given_value)
logging.debug("fp %s", fp_value)
logging.debug("generated %s", arg_msg)
return arg_msg
def MessageFuzzer(arg_msg):
"""Fuzz a custom message instance."""
if not self.GetCustomAggregateType(api_name):
raise MirrorObjectError("fuzz arg %s unknown" % arg_msg)
if arg_msg.type == CompSpecMsg.TYPE_STRUCT:
index = random.randint(0, len(arg_msg.struct_value))
count = 0
for struct_value in arg_msg.struct_value:
if count == index:
if struct_value.scalar_type == "uint32_t":
struct_value.scalar_value.uint32_t ^= FuzzerUtils.mask_uint32_t()
elif struct_value.scalar_type == "int32_t":
mask = FuzzerUtils.mask_int32_t()
if mask == (1 << 31):
struct_value.scalar_value.int32_t *= -1
struct_value.scalar_value.int32_t += 1
else:
struct_value.scalar_value.int32_t ^= mask
else:
raise MirrorObjectError(
"support %s" % struct_value.scalar_type)
break
count += 1
logging.debug("fuzzed %s", arg_msg)
else:
raise MirrorObjectError(
"unsupported fuzz message type %s." % arg_msg.type)
return arg_msg
def ConstGenerator():
"""Dynamically generates a const variable's value."""
arg_msg = self.GetConstType(api_name)
if not arg_msg:
raise MirrorObjectError("const %s unknown" % arg_msg)
logging.debug("check %s", api_name)
if arg_msg.type == CompSpecMsg.TYPE_SCALAR:
ret_v = getattr(arg_msg.scalar_value, arg_msg.scalar_type, None)
if ret_v is None:
raise MirrorObjectError(
"No value found for type %s in %s." %
(arg_msg.scalar_type, api_name))
return ret_v
elif arg_msg.type == CompSpecMsg.TYPE_STRING:
return arg_msg.string_value.message
elif arg_msg.type == CompSpecMsg.TYPE_ENUM:
for enumerator, scalar_value in zip(
arg_msg.enum_value.enumerator,
arg_msg.enum_value.scalar_value):
if enumerator == api_name:
return getattr(scalar_value,
arg_msg.enum_value.scalar_type)
raise MirrorObjectError("const %s not found" % api_name)
# handle APIs.
func_msg = self.GetApi(api_name)
if func_msg:
logging.debug("api %s", func_msg)
return RemoteCall
struct_msg = self.GetSubStruct(api_name)
if struct_msg:
logging.debug("sub_struct %s", struct_msg)
if self._parent_path:
parent_name = "%s.%s" % (self._parent_path, api_name)
else:
parent_name = api_name
return MirrorObject(self._client, struct_msg,
self._callback_server,
parent_path=parent_name)
# handle attributes.
fuzz = False
if api_name.endswith("_fuzz"):
fuzz = True
api_name = api_name[:-5]
arg_msg = self.GetCustomAggregateType(api_name)
if arg_msg:
logging.debug("arg %s", arg_msg)
if not fuzz:
return MessageGenerator
else:
return MessageFuzzer
arg_msg = self.GetConstType(api_name)
if arg_msg:
logging.debug("const %s *\n%s", api_name, arg_msg)
return ConstGenerator()
raise MirrorObjectError("unknown api name %s" % api_name)
def GetRawCodeCoverage(self):
"""Returns any measured raw code coverage data."""
return self._last_raw_code_coverage_data
def __str__(self):
"""Prints all the attributes and methods."""
result = ""
if self._if_spec_msg:
if self._if_spec_msg.attribute:
for attribute in self._if_spec_msg.attribute:
result += "attribute %s\n" % attribute.name
if self._if_spec_msg.api:
for api in self._if_spec_msg.api:
result += "api %s\n" % api.name
return result