普通文本  |  103行  |  4.16 KB

# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import numpy

import common
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors


class MBIMDataChannel(object):
    """
    Provides access to the data channel of a MBIM modem.

    The object is used to send and receive MBIM frames to/from the modem.
    The object uses the BULK-IN endpoint exposed in the data interface for any
    reads from the modem to the host.
    The object uses the BULK-OUT endpoint exposed in the data interface for any
    writes from the host to the modem.
    The channel does not deaggregate/aggregate packets into MBIM frames. The
    caller is expected to validate/provide MBIM frames to the channel. The
    channel is just used to send raw bytes to the device and read raw bytes from
    the device.

    """
    _READ_TIMEOUT_MS = 10000
    _WRITE_TIMEOUT_MS = 10000

    def __init__(self,
                 device,
                 data_interface_number,
                 bulk_in_endpoint_address,
                 bulk_out_endpoint_address,
                 max_in_buffer_size):
        """
        @param device: Device handle returned by PyUSB for the modem to test.
        @param bulk_in_endpoint_address: |bEndpointAddress| for the usb
                BULK IN endpoint from the data interface.
        @param bulk_out_endpoint_address: |bEndpointAddress| for the usb
                BULK OUT endpoint from the data interface.
        @param max_in_buffer_size: The (fixed) buffer size to used for in
                data transfers.

        """
        self._device = device
        self._data_interface_number = data_interface_number
        self._bulk_in_endpoint_address = bulk_in_endpoint_address
        self._bulk_out_endpoint_address = bulk_out_endpoint_address
        self._max_in_buffer_size = max_in_buffer_size


    def send_ntb(self, ntb):
        """
        Send the specified payload down to the device using the bulk-out USB
        pipe.

        @param ntb: Byte array of complete MBIM NTB to be sent to the device.
        @raises MBIMComplianceDataTransferError if the complete |ntb| could not
                be sent.

        """
        ntb_length = len(ntb)
        written = self._device.write(endpoint=self._bulk_out_endpoint_address,
                                     data=ntb,
                                     timeout=self._WRITE_TIMEOUT_MS,
                                     interface=self._data_interface_number)
        numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
                               linewidth=1000)
        logging.debug('Data Channel: Sent %d bytes out of %d bytes requested. '
                      'Payload: %s',
                       written, ntb_length, numpy.array(ntb))
        if written < ntb_length:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceDataTransferError,
                    'Could not send the complete NTB (%d/%d bytes sent)' %
                    written, ntb_length)


    def receive_ntb(self):
        """
        Receive a payload from the device using the bulk-in USB pipe.

        This API will return any data it receives from the device within
        |_READ_TIMEOUT_S| seconds. If nothing is received within this duration,
        it returns an empty byte array. The API returns only one MBIM NTB
        received per invocation.

        @returns Byte array of complete MBIM NTB received from the device. This
                could be empty if nothing is received from the device.

        """
        ntb = self._device.read(endpoint=self._bulk_in_endpoint_address,
                                size=self._max_in_buffer_size,
                                timeout=self._READ_TIMEOUT_MS,
                                interface=self._data_interface_number)
        ntb_length = len(ntb)
        numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
                               linewidth=1000)
        logging.debug('Data Channel: Received %d bytes response. Payload: %s',
                       ntb_length, numpy.array(ntb))
        return ntb