普通文本  |  787行  |  29.53 KB

#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging

from vts.proto import AndroidSystemControlMessage_pb2 as ASysCtrlMsg
from vts.proto import VtsResourceControllerMessage_pb2 as ResControlMsg
from vts.proto import ComponentSpecificationMessage_pb2 as CompSpecMsg
from vts.utils.python.mirror import mirror_object


class ResourceFmqMirror(mirror_object.MirrorObject):
    """This is a class that mirrors FMQ resource allocated on the target side.

    Attributes:
        SUPPORTED_SCALAR_TYPES: set, contains all scalar types supported by FMQ.
                                If the type of FMQ is one of those, this class
                                prepares the write data from caller provided
                                Python data.
        _client: VtsTcpClient, the TCP client instance.
        _queue_id: int, used to identify the queue object on the target side.
        _data_type: type of data in the queue.
        _sync: bool, whether the queue is synchronized.
    """

    SUPPORTED_SCALAR_TYPES = {
        "uint8_t", "int8_t", "uint16_t", "int16_t", "uint32_t", "int32_t",
        "uint64_t", "int64_t", "bool_t", "double_t"
    }

    def __init__(self, data_type, sync, client, queue_id=-1):
        """Initialize a FMQ mirror.

        Args:
            data_type: string, type of data in the queue
                       (e.g. "uint32_t", "int16_t").
            sync: bool, whether queue is synchronized (only has one reader).
            client: VtsTcpClient, specifies the session that this mirror use.
            queue_id: int, identifies the queue on the target side.
                      Optional if caller initializes a new FMQ mirror.
        """
        super(ResourceFmqMirror, self).__init__(client)
        self._data_type = data_type
        self._sync = sync
        self._queue_id = queue_id

    def _create(self, queue_id, queue_size, blocking, reset_pointers):
        """Initiate a fast message queue object on the target side.

        This method registers a FMQ object on the target side, and stores
        the queue_id in the class attribute.
        Users should not directly call this method because it will overwrite
        the original queue_id stored in the mirror object, leaving that
        queue object out of reference.
        Users should always call InitFmq() in mirror_tracker.py to obtain a
        new queue object.

        Args:
            queue_id: int, identifies the message queue object on the target side.
            queue_size: int, size of the queue.
            blocking: bool, whether blocking is enabled in the queue.
            reset_pointers: bool, whether to reset read/write pointers when
              creating a message queue object based on an existing message queue.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_CREATE, queue_id)
        request_msg.queue_size = queue_size
        request_msg.blocking = blocking
        request_msg.reset_pointers = reset_pointers

        # Send and receive data.
        fmq_response = self._client.SendFmqRequest(request_msg)
        if fmq_response is not None and fmq_response.queue_id != -1:
            self._queue_id = fmq_response.queue_id
        else:
            self._queue_id = -1
            logging.error("Failed to create a new queue object.")

    def read(self, data, data_size):
        """Initiate a non-blocking read request to FMQ driver.

        Args:
            data: list, data to be filled by this function. The list will
                  be emptied before the function starts to put read data into
                  it, which is consistent with the function behavior on the
                  target side.
            data_size: int, length of data to read.

        Returns:
            bool, true if the operation succeeds,
                  false otherwise.
        """
        # Prepare arguments.
        del data[:]
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_READ, self._queue_id)
        request_msg.read_data_size = data_size

        # Send and receive data.
        fmq_response = self._client.SendFmqRequest(request_msg)
        if fmq_response is not None and fmq_response.success:
            self._extractReadData(fmq_response, data)
            return True
        return False

    # TODO: support long-form blocking read in the future when there is use case.
    def readBlocking(self, data, data_size, time_out_nanos=0):
        """Initiate a blocking read request (short-form) to FMQ driver.

        Args:
            data: list, data to be filled by this function. The list will
                  be emptied before the function starts to put read data into
                  it, which is consistent with the function behavior on the
                  target side.
            data_size: int, length of data to read.
            time_out_nanos: int, wait time (in nanoseconds) when blocking.
                            The default value is 0 (no blocking).

        Returns:
            bool, true if the operation succeeds,
                  false otherwise.
        """
        # Prepare arguments.
        del data[:]
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_READ_BLOCKING, self._queue_id)
        request_msg.read_data_size = data_size
        request_msg.time_out_nanos = time_out_nanos

        # Send and receive data.
        fmq_response = self._client.SendFmqRequest(request_msg)
        if fmq_response is not None and fmq_response.success:
            self._extractReadData(fmq_response, data)
            return True
        return False

    def write(self, data, data_size):
        """Initiate a non-blocking write request to FMQ driver.

        Args:
            data: list, data to be written.
            data_size: int, length of data to write.
                       The function will only write data up until data_size,
                       i.e. extraneous data will be discarded.

        Returns:
            bool, true if the operation succeeds,
                  false otherwise.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_WRITE, self._queue_id)
        prepare_result = self._prepareWriteData(request_msg, data[:data_size])
        if not prepare_result:
            # Prepare write data failure, error logged in _prepareWriteData().
            return False

        # Send and receive data.
        fmq_response = self._client.SendFmqRequest(request_msg)
        if fmq_response is not None:
            return fmq_response.success
        return False

    # TODO: support long-form blocking write in the future when there is use case.
    def writeBlocking(self, data, data_size, time_out_nanos=0):
        """Initiate a blocking write request (short-form) to FMQ driver.

        Args:
            data: list, data to be written.
            data_size: int, length of data to write.
                       The function will only write data up until data_size,
                       i.e. extraneous data will be discarded.
            time_out_nanos: int, wait time (in nanoseconds) when blocking.
                            The default value is 0 (no blocking).

        Returns:
            bool, true if the operation succeeds,
                  false otherwise.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_WRITE_BLOCKING, self._queue_id)
        prepare_result = self._prepareWriteData(request_msg, data[:data_size])
        if not prepare_result:
            # Prepare write data failure, error logged in _prepareWriteData().
            return False
        request_msg.time_out_nanos = time_out_nanos

        # Send and receive data.
        fmq_response = self._client.SendFmqRequest(request_msg)
        if fmq_response is not None:
            return fmq_response.success
        return False

    def availableToWrite(self):
        """Get space available to write in the queue.

        Returns:
            int, number of slots available.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_AVAILABLE_WRITE, self._queue_id)

        # Send and receive data.
        return self._processUtilMethod(request_msg)

    def availableToRead(self):
        """Get number of items available to read.

        Returns:
            int, number of items.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_AVAILABLE_READ, self._queue_id)

        # Send and receive data.
        return self._processUtilMethod(request_msg)

    def getQuantumSize(self):
        """Get size of item in the queue.

        Returns:
            int, size of item.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_GET_QUANTUM_SIZE, self._queue_id)

        # send and receive data
        return self._processUtilMethod(request_msg)

    def getQuantumCount(self):
        """Get number of items that fit in the queue.

        Returns:
            int, number of items.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_GET_QUANTUM_COUNT, self._queue_id)

        # Send and receive data.
        return self._processUtilMethod(request_msg)

    def isValid(self):
        """Check if the queue is valid.

        Returns:
            bool, true if the queue is valid.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.FMQ_IS_VALID, self._queue_id)

        # Send and receive data.
        fmq_response = self._client.SendFmqRequest(request_msg)
        if fmq_response is not None:
            return fmq_response.success
        return False

    @property
    def queueId(self):
        """Gets the id assigned from the target side.

        Returns:
            int, id of the queue.
        """
        return self._queue_id

    @property
    def dataType(self):
        """Get the type of data of this FMQ mirror.

        Returns:
            string, type of data in the queue
        """
        return self._data_type

    @property
    def sync(self):
        """Get the synchronization option of this FMQ mirror.

        Returns:
            bool, true if the queue is synchronized (only has one reader).
        """
        return self._sync

    def _createTemplateRequestMessage(self, operation, queue_id):
        """Creates a template FmqRequestMessage with common arguments among
           all FMQ operations.

        Args:
            operation: FmqOp, fmq operations.
                       (see test/vts/proto/VtsResourceControllerMessage.proto).
            queue_id: int, identifies the message queue object on target side.

        Returns:
            FmqRequestMessage, fmq request message.
                (See test/vts/proto/VtsResourceControllerMessage.proto).
        """
        request_msg = ResControlMsg.FmqRequestMessage()
        request_msg.operation = operation
        request_msg.data_type = self._data_type
        request_msg.sync = self._sync
        request_msg.queue_id = queue_id
        return request_msg

    def _prepareWriteData(self, request_msg, data):
        """Converts python list to repeated protobuf field.

        If the type of data in the queue is a supported scalar, caller can
        directly supply the python native value. Otherwise, caller needs to
        supply a list of VariableSpecificationMessage.

        Args:
            request_msg: FmqRequestMessage, arguments for a FMQ operation
                         request.
            data: VariableSpecificationMessage list or a list of scalar values.
                  If the type of FMQ is scalar type, caller can directly
                  specify the Python scalar data. Otherwise, caller has to
                  provide each item as VariableSpecificationMessage.

        Returns:
            bool, true if preparation succeeds, false otherwise.
            This function can fail if caller doesn't provide a list of
            VariableSpecificationMessage when type of data in the queue
            is not a supported scalar type.
        """
        for curr_value in data:
            new_message = request_msg.write_data.add()
            if isinstance(curr_value,
                          CompSpecMsg.VariableSpecificationMessage):
                new_message.CopyFrom(curr_value)
            elif self._data_type in self.SUPPORTED_SCALAR_TYPES:
                new_message.type = CompSpecMsg.TYPE_SCALAR
                new_message.scalar_type = self._data_type
                setattr(new_message.scalar_value, self._data_type, curr_value)
            else:
                logging.error("Need to provide VariableSpecificationMessage " +
                              "if type of data in the queue is not a " +
                              "supported scalar type.")
                return False
        return True

    def _extractReadData(self, response_msg, data):
        """Extracts read data from the response message returned by client.

        Args:
            response_msg: FmqResponseMessage, contains response from FMQ driver.
            data: list, to be filled by this function. data buffer is provided
                  by caller, so this function will append every element to the
                  buffer.
        """
        for item in response_msg.read_data:
            data.append(self._client.GetPythonDataOfVariableSpecMsg(item))

    def _processUtilMethod(self, request_msg):
        """Sends request message and process response message for util methods
           that return an unsigned integer,
           e.g. availableToWrite, availableToRead.

        Args:
            request_msg: FmqRequestMessage, arguments for a FMQ operation request.

        Returns: int, information about the queue,
                 None if the operation is unsuccessful.
        """
        fmq_response = self._client.SendFmqRequest(request_msg)
        if fmq_response is not None and fmq_response.success:
            return fmq_response.sizet_return_val
        return None


class ResourceHidlMemoryMirror(mirror_object.MirrorObject):
    """This class mirrors hidl_memory resource allocated on the target side.

    Attributes:
        _client: the TCP client instance.
        _mem_id: int, used to identify the memory region on the target side.
    """

    def __init__(self, client, mem_id=-1):
        super(ResourceHidlMemoryMirror, self).__init__(client)
        self._mem_id = mem_id

    def _allocate(self, mem_size):
        """Initiate a hidl_memory region on the target side.

        This method stores the mem_id in the class attribute.
        Users should not directly call this method to get a new memory region,
        because it will overwrite the original memory object with mem_id,
        making that memory object out of reference.
        Users should always call InitHidlMemory() in mirror_tracker.py to get
        a new memory region.

        Args:
            mem_size: int, size of the requested memory region.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_ALLOCATE)
        request_msg.mem_size = mem_size

        # Send and receive data.
        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is not None and response_msg.new_mem_id != -1:
            self._mem_id = response_msg.new_mem_id
        else:
            logging.error("Failed to allocate memory region.")

    def read(self):
        """Notify that caller will read the entire memory region.

        Before every actual read operation, caller must call this method
        or readRange() first.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_START_READ)

        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is not None:
            if not response_msg.success:
                logging.error("Failed to find memory region with id %d",
                              self._mem_id)
            return response_msg.success
        return False

    def readRange(self, start, length):
        """Notify that caller will read only part of memory region.

        Notify that caller will read starting at start and
        ending at start + length.
        Before every actual read operation, caller must call this method
        or read() first.

        Args:
            start: int, offset from the start of memory region to be modified.
            length: int, number of bytes to be modified.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_START_READ_RANGE)
        request_msg.start = start
        request_msg.length = length

        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is not None:
            if not response_msg.success:
                logging.error("Failed to find memory region with id %d",
                              self._mem_id)
            return response_msg.success
        return False

    def update(self):
        """Notify that caller will possibly write to all memory region.

        Before every actual write operation, caller must call this method
        or updateRange() first.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_START_UPDATE)

        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is not None:
            if not response_msg.success:
                logging.error("Failed to find memory region with id %d",
                              self._mem_id)
            return response_msg.success
        return False

    def updateRange(self, start, length):
        """Notify that caller will only write to part of memory region.

        Notify that caller will only write starting at start and
        ending at start + length.
        Before every actual write operation, caller must call this method
        or update() first.

        Args:
            start: int, offset from the start of memory region to be modified.
            length: int, number of bytes to be modified.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_START_UPDATE_RANGE)
        request_msg.start = start
        request_msg.length = length

        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is not None:
            if not response_msg.success:
                logging.error("Failed to find memory region with id %d",
                              self._mem_id)
            return response_msg.success
        return False

    def readBytes(self, length, start=0):
        """This method performs actual read operation.

        This method helps caller perform actual read operation on the
        memory region, because host side won't be able to cast c++ pointers.

        Args:
            length: int, number of bytes to read.
            start: int, offset from the start of memory region to read.

        Returns:
            string, data read from memory.
                    Caller can perform conversion on the result to obtain the
                    corresponding data structure in python.
            None, indicate if the read fails.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_READ_BYTES)
        request_msg.start = start
        request_msg.length = length

        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is not None:
            if response_msg.success:
                return response_msg.read_data
            logging.error("Failed to find memory region with id %d",
                          self._mem_id)
        return None

    def updateBytes(self, data, length, start=0):
        """This method performs actual write operation.

        This method helps caller perform actual write operation on the
        memory region, because host side won't be able to cast c++ pointers.

        Args:
            data: string, bytes to be written into memory.
                  Caller can use bytearray() function to convert python
                  data structures into python, and call str() on the resulting
                  bytearray object.
            length: int, number of bytes to write.
            start: int, offset from the start of memory region to be modified.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_UPDATE_BYTES)
        request_msg.write_data = data
        request_msg.start = start
        request_msg.length = length

        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is not None:
            if not response_msg.success:
                logging.error("Failed to find memory region with id %d",
                              self._mem_id)
            return response_msg.success
        return False

    def commit(self):
        """Caller signals done with operating on the memory region.

        Caller needs to call this method after reading/writing.

        Returns:
            bool, true if the operation succeeds, false otherwise.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_COMMIT)

        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is not None:
            if not response_msg.success:
                logging.error("Failed to find memory region with id %d",
                              self._mem_id)
            return response_msg.success
        return False

    def getSize(self):
        """Gets the size of the memory region.

        Returns:
            int, size of memory region, -1 to signal operation failure.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.MEM_PROTO_GET_SIZE)

        response_msg = self._client.SendHidlMemoryRequest(request_msg)
        if response_msg is not None:
            if response_msg.success:
                return response_msg.mem_size
            logging.error("Failed to find memory region with id %d",
                          self._mem_id)
        return -1

    @property    
    def memId(self):
        """Gets the id assigned from the target side.

        Returns:
            int, id of the memory object.
        """
        return self._mem_id

    def _createTemplateRequestMessage(self, operation):
        """Creates a template HidlMemoryRequestMessage.

        This method creates a message that contains common arguments among
        all hidl_memory operations.

        Args:
            operation: HidlMemoryOp, hidl_memory operations.
                       (see test/vts/proto/VtsResourceControllerMessage.proto).

        Returns:
            HidlMemoryRequestMessage, hidl_memory request message.
                (See test/vts/proto/VtsResourceControllerMessage.proto).
        """
        request_msg = ResControlMsg.HidlMemoryRequestMessage()
        request_msg.operation = operation
        request_msg.mem_id = self._mem_id
        return request_msg


class ResourceHidlHandleMirror(mirror_object.MirrorObject):
    """This class mirrors hidl_handle resource allocated on the target side.

    TODO: support more than file types in the future, e.g. socket, pipe.

    Attributes:
        _client: the TCP client instance.
        _handle_id: int, used to identify the handle object on the target side.
    """

    def __init__(self, client, handle_id=-1):
        super(ResourceHidlHandleMirror, self).__init__(client)
        self._handle_id = handle_id

    def CleanUp(self):
        """Close open file descriptors on target-side drivers.

        Developers can call this method to close open file descriptors
        in all handle objects.
        Note: This method needs to be called before self._client
        is disconnected. self._client is most likely initialized in
        one of the hal_mirror.
        """
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.HANDLE_PROTO_DELETE)
        self._client.SendHidlHandleRequest(request_msg)

    def _createHandleForSingleFile(self, filepath, mode, int_data):
        """Initiate a hidl_handle object containing a single file descriptor.

        This method stores the handle_id in the class attribute.
        Users should not directly call this method to create a new
        handle object, because it will overwrite the original handle object,
        making that handle object out of reference.
        Users should always call InitHidlHandle() in mirror_tracker.py to get
        a new handle object.

        Args:
            filepath: string, path to the file to be opened.
            mode: string, specifying the mode to open the file.
            int_data: int list, useful integers to store in the handle object.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.HANDLE_PROTO_CREATE_FILE)
        request_msg.handle_info.num_fds = 1
        request_msg.handle_info.num_ints = len(int_data)

        # TODO: support more than one file descriptors at once.
        # Add the file information into proto message.
        fd_message = request_msg.handle_info.fd_val.add()
        fd_message.type = CompSpecMsg.FILE_TYPE
        fd_message.file_mode_str = mode
        fd_message.file_name = filepath

        # Add the integers into proto message.
        request_msg.handle_info.int_val.extend(int_data)

        # Send and receive data.
        response_msg = self._client.SendHidlHandleRequest(request_msg)
        if response_msg is not None and response_msg.new_handle_id != -1:
            self._handle_id = response_msg.new_handle_id
        else:
            logging.error("Failed to create handle object.")

    def readFile(self, read_data_size, index=0):
        """Reads from a given file in the handle object.

        Args:
            read_data_size: int, number of bytes to read.
            index: int, index of file among all files in the handle object.
                        Optional if host only wants to read from one file.

        Returns:
            string, data read from the file.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.HANDLE_PROTO_READ_FILE)
        request_msg.read_data_size = read_data_size

        # Send and receive data.
        response_msg = self._client.SendHidlHandleRequest(request_msg)
        if response_msg is not None and response_msg.success:
            return response_msg.read_data
        # TODO: more detailed error message.
        logging.error("Failed to read from the file.")
        return None

    def writeFile(self, write_data, index=0):
        """Writes to a given file to the handle object.

        Args:
            write_data: string, data to be written into file.
            index: int, index of file among all files in the handle object.
                        Optional if host only wants to write into one file.

        Returns:
            int, number of bytes written.
        """
        # Prepare arguments.
        request_msg = self._createTemplateRequestMessage(
            ResControlMsg.HANDLE_PROTO_WRITE_FILE)
        request_msg.write_data = write_data

        # Send and receive data.
        response_msg = self._client.SendHidlHandleRequest(request_msg)
        if response_msg is not None and response_msg.success:
            return response_msg.write_data_size
        # TODO: more detailed error message.
        logging.error("Failed to write into the file.")
        return None

    @property
    def handleId(self):
        """Gets the id assigned from the target side.

        Returns:
            int, id of the handle object.
        """
        return self._handle_id

    def _createTemplateRequestMessage(self, operation):
        """Creates a template HidlHandleRequestMessage.

        This method creates a HidlHandleRequestMessage with common arguments
        among all hidl_handle operations.

        Args:
            operation: HidlHandleOp, hidl_handle operations.
                       (see test/vts/proto/VtsResourceControllerMessage.proto).

        Returns:
            HidlHandleRequestMessage, hidl_handle request message.
                (See test/vts/proto/VtsResourceControllerMessage.proto).
        """
        request_msg = ResControlMsg.HidlHandleRequestMessage()
        request_msg.operation = operation
        request_msg.handle_id = self._handle_id
        return request_msg