普通文本  |  316行  |  11.9 KB

#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import sys

from vts.proto import ComponentSpecificationMessage_pb2 as CompSpecMsg


def PyValue2PbEnum(message, pb_spec, py_value):
    """Converts Python value to VTS VariableSecificationMessage (Enum).

    Args:
        message: VariableSpecificationMessage is the current and result
                 value message.
        pb_spec: VariableSpecificationMessage which captures the
                 specification of a target attribute.
        py_value: Python value provided by a test case.

    Returns:
        Converted VariableSpecificationMessage if found, None otherwise
    """
    if pb_spec.name:
        message.name = pb_spec.name
    message.type = CompSpecMsg.TYPE_ENUM
    # Look for the enum definition and retrieve the scalar type.
    scalar_type = pb_spec.enum_value.scalar_type
    if scalar_type != "":
        # If the scalar type definition is found, set it and return.
        setattr(message.scalar_value, scalar_type, py_value)
        return
    # Use default scalar_type int32_t for enum definition if the definition
    # is not found.
    setattr(message.scalar_value, "int32_t", py_value)


def PyValue2PbScalar(message, pb_spec, py_value):
    """Converts Python value to VTS VariableSecificationMessage (Scalar).

    Args:
        message: VariableSpecificationMessage is the current and result
                 value message.
        pb_spec: VariableSpecificationMessage which captures the
                 specification of a target attribute.
        py_value: Python value provided by a test case.

    Returns:
        Converted VariableSpecificationMessage if found, None otherwise
    """
    if pb_spec.name:
        message.name = pb_spec.name
    message.type = CompSpecMsg.TYPE_SCALAR
    message.scalar_type = pb_spec.scalar_type
    setattr(message.scalar_value, pb_spec.scalar_type, py_value)


def PyString2PbString(message, pb_spec, py_value):
    """Converts Python string to VTS VariableSecificationMessage (String).

    Args:
        message: VariableSpecificationMessage is the current and result
                 value message.
        pb_spec: VariableSpecificationMessage which captures the
                 specification of a target attribute.
        py_value: Python value provided by a test case.

    Returns:
        Converted VariableSpecificationMessage if found, None otherwise
    """
    if pb_spec.name:
        message.name = pb_spec.name
    message.type = CompSpecMsg.TYPE_STRING
    message.string_value.message = py_value
    message.string_value.length = len(py_value)


def PyList2PbVector(message, pb_spec, py_value):
    """Converts Python list value to VTS VariableSecificationMessage (Vector).

    Args:
        message: VariableSpecificationMessage is the current and result
                 value message.
        pb_spec: VariableSpecificationMessage which captures the
                 specification of a target attribute.
        py_value: Python value provided by a test case.

    Returns:
        Converted VariableSpecificationMessage if found, None otherwise
    """
    if pb_spec.name:
        message.name = pb_spec.name
    message.type = CompSpecMsg.TYPE_VECTOR
    if len(py_value) == 0:
        return message

    vector_spec = pb_spec.vector_value[0]
    for curr_value in py_value:
        new_vector_message = message.vector_value.add()
        new_vector_message.CopyFrom(Convert(vector_spec, curr_value))
    message.vector_size = len(py_value)
    return message


def FindSubStructType(pb_spec, sub_struct_name):
    """Finds a specific sub_struct type.

    Args:
        pb_spec: VariableSpecificationMessage which captures the
                 specification of a target attribute.
        sub_struct_name: string, the name of a sub struct to look up.

    Returns:
        VariableSpecificationMessage if found or None otherwise.
    """
    for sub_struct in pb_spec.sub_struct:
        if sub_struct.name == sub_struct_name:
            return sub_struct
    return None


def FindSubUnionType(pb_spec, sub_union_name):
    """Finds a specific sub_union type.

    Args:
        pb_spec: VariableSpecificationMessage which captures the
                 specification of a target attribute.
        sub_union_name: string, the name of a sub union to look up.

    Returns:
        VariableSpecificationMessage if found or None otherwise.
    """
    for sub_union in pb_spec.sub_union:
        if sub_union.name == sub_union_name:
            return sub_union
    return None


def PyDict2PbStruct(message, pb_spec, py_value):
    """Converts Python dict to VTS VariableSecificationMessage (struct).

    Args:
        pb_spec: VariableSpecificationMessage which captures the
                 specification of a target attribute.
        py_value: A dictionary that represents a struct.

    Returns:
        Converted VariableSpecificationMessage if found, None otherwise
    """
    if pb_spec.name:
        message.name = pb_spec.name
    message.type = CompSpecMsg.TYPE_STRUCT
    provided_attrs = set(py_value.keys())
    for attr in pb_spec.struct_value:
        if attr.name in py_value:
            provided_attrs.remove(attr.name)
            curr_value = py_value[attr.name]
            attr_msg = message.struct_value.add()
            if attr.type == CompSpecMsg.TYPE_ENUM:
                PyValue2PbEnum(attr_msg, attr, curr_value)
            elif attr.type == CompSpecMsg.TYPE_SCALAR:
                PyValue2PbScalar(attr_msg, attr, curr_value)
            elif attr.type == CompSpecMsg.TYPE_STRING:
                PyString2PbString(attr_msg, attr, curr_value)
            elif attr.type == CompSpecMsg.TYPE_VECTOR:
                PyList2PbVector(attr_msg, attr, curr_value)
            elif attr.type == CompSpecMsg.TYPE_STRUCT:
                sub_attr = FindSubStructType(pb_spec, attr.predefined_type)
                if sub_attr:
                    PyDict2PbStruct(attr_msg, sub_attr, curr_value)
                else:
                    logging.error("PyDict2PbStruct: substruct not found.")
                    return None
            elif attr.type == CompSpecMsg.TYPE_UNION:
                sub_attr = FindSubStructType(pb_spec, attr.predefined_type)
                if sub_attr:
                    PyDict2PbUnion(attr_msg, sub_attr, curr_value)
                else:
                    logging.error("PyDict2PbStruct: subunion not found.")
                    return None
            else:
                logging.error("PyDict2PbStruct: unsupported type %s",
                              attr.type)
                return None
        else:
            # TODO: instead crash the test, consider to generate default value
            # in case not provided in the py_value.
            logging.error("PyDict2PbStruct: attr %s not provided", attr.name)
            return None
    if len(provided_attrs) > 0:
        logging.error("PyDict2PbStruct: provided dictionary included elements" +
                      " not part of the type being converted to: %s",
                      provided_attrs)
        return None
    return message


def PyDict2PbUnion(message, pb_spec, py_value):
    """Converts Python dict to VTS VariableSecificationMessage (union).

    Args:
        pb_spec: VariableSpecificationMessage which captures the
                 specification of a target attribute.
        py_value: A dictionary that represents a struct.

    Returns:
        Converted VariableSpecificationMessage if found, None otherwise
    """
    if len(py_value) > 1:
        logging.error("PyDict2PbUnion: Union only allows specifying " +
                      "at most one field. Current Python dictionary " +
                      "has size %d", len(py_value))
        return None

    if pb_spec.name:
        message.name = pb_spec.name
    message.type = CompSpecMsg.TYPE_UNION
    provided_attrs = set(py_value.keys())
    for attr in pb_spec.union_value:
        # Since it is a union type, we stop after finding one field name
        # that matches, and shouldn't throw an error when name is not found.
        if attr.name in py_value:
            provided_attrs.remove(attr.name)
            curr_value = py_value[attr.name]
            attr_msg = message.union_value.add()
            if attr.type == CompSpecMsg.TYPE_ENUM:
                PyValue2PbEnum(attr_msg, attr, curr_value)
            elif attr.type == CompSpecMsg.TYPE_SCALAR:
                PyValue2PbScalar(attr_msg, attr, curr_value)
            elif attr.type == CompSpecMsg.TYPE_STRING:
                PyString2PbString(attr_msg, attr, curr_value)
            elif attr.type == CompSpecMsg.TYPE_VECTOR:
                PyList2PbVector(attr_msg, attr, curr_value)
            elif attr.type == CompSpecMsg.TYPE_STRUCT:
                # TODO: is a nested struct in union stored in sub_union field.
                sub_attr = FindSubUnionType(pb_spec, attr.predefined_type)
                if sub_attr:
                    PyDict2PbStruct(attr_msg, sub_attr, curr_value)
                else:
                    logging.error("PyDict2PbStruct: substruct not found.")
                    return None
            elif attr.type == CompSpecMsg.TYPE_UNION:
                sub_attr = FindSubUnionType(pb_spec, attr.predefined_type)
                if sub_attr:
                    PyDict2PbUnion(attr_msg, sub_attr, curr_value)
                else:
                    logging.error("PyDict2PbUnion: subunion not found.")
                    return None
            else:
                logging.error("PyDict2PbStruct: unsupported type %s",
                              attr.type)
                return None
        else:
            # Add a field, where name field is initialized as an empty string.
            # In generated driver implementation, driver knows this field is
            # not used, and skip reading it.
            message.union_value.add()

    if len(provided_attrs) > 0:
        logging.error("PyDict2PbUnion: specified field is not in the union " +
                      "definition for union type %s", provided_attrs)
        return None
    return message


def Convert(pb_spec, py_value):
    """Converts Python native data structure to VTS VariableSecificationMessage.

    Args:
        pb_spec: VariableSpecificationMessage which captures the
                 specification of a target attribute.
        py_value: Python value provided by a test case.

    Returns:
        Converted VariableSpecificationMessage if found, None otherwise
    """
    if not pb_spec:
        logging.error("py2pb.Convert: ProtoBuf spec is None", pb_spec)
        return None

    message = CompSpecMsg.VariableSpecificationMessage()
    message.name = pb_spec.name

    if isinstance(py_value, CompSpecMsg.VariableSpecificationMessage):
        message.CopyFrom(py_value)
    elif pb_spec.type == CompSpecMsg.TYPE_STRUCT:
        PyDict2PbStruct(message, pb_spec, py_value)
    elif pb_spec.type == CompSpecMsg.TYPE_UNION:
        PyDict2PbUnion(message, pb_spec, py_value)
    elif pb_spec.type == CompSpecMsg.TYPE_ENUM:
        PyValue2PbEnum(message, pb_spec, py_value)
    elif pb_spec.type == CompSpecMsg.TYPE_SCALAR:
        PyValue2PbScalar(message, pb_spec, py_value)
    elif pb_spec.type == CompSpecMsg.TYPE_STRING:
        PyString2PbString(message, pb_spec, py_value)
    elif pb_spec.type == CompSpecMsg.TYPE_VECTOR:
        PyList2PbVector(message, pb_spec, py_value)
    else:
        logging.error("py2pb.Convert: unsupported type %s",
                      pb_spec.type)
        return None

    return message