# Plain text  |  219 lines  |  7.9 KB

# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import Queue
import signal
import struct
import time
import numpy

from collections import namedtuple
from usb import core

import common
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors


# Header of a USB notification read from the interrupt IN endpoint. The field
# order matches the values unpacked with
# |MBIMChannelEndpoint.USB_PACKET_HEADER_FORMAT|.
USBNotificationPacket = namedtuple(
        'USBNotificationPacket',
        ('bmRequestType',
         'bNotificationCode',
         'wValue',
         'wIndex',
         'wLength'))


class MBIMChannelEndpoint(object):
    """
    An object dedicated to interacting with the MBIM capable USB device.

    This object interacts with the USB device in a forever loop, servicing
    command requests from |MBIMChannel| as well as surfacing any notifications
    from the modem.

    Note that the constructor itself enters that loop (see |start|) and only
    returns once the stop event has been set or an error has been raised.

    """
    # Layout of the notification header read from the interrupt endpoint; the
    # unpacked values populate |USBNotificationPacket| in declaration order.
    USB_PACKET_HEADER_FORMAT = '<BBHHH'
    # Sleeping for 0 seconds *may* hint for the scheduler to relinquish CPU.
    QUIET_TIME_MS = 0
    INTERRUPT_READ_TIMEOUT_MS = 1  # We don't really want to wait.
    GET_ENCAPSULATED_RESPONSE_TIMEOUT_MS = 50
    SEND_ENCAPSULATED_REQUEST_TIMEOUT_MS = 50
    # Fixed setup-packet fields for the two control transfers issued below.
    # |wIndex| and the data stage are supplied at the call sites.
    GET_ENCAPSULATED_RESPONSE_ARGS = {
            'bmRequestType' : 0b10100001,
            'bRequest' : 0b00000001,
            'wValue' : 0x0000}
    SEND_ENCAPSULATED_COMMAND_ARGS = {
            'bmRequestType' : 0b00100001,
            'bRequest' : 0b00000000,
            'wValue' : 0x0000}

    def __init__(self,
                 device,
                 interface_number,
                 interrupt_endpoint_address,
                 in_buffer_size,
                 request_queue,
                 response_queue,
                 stop_request_event,
                 strict=True):
        """
        Store the channel configuration and immediately enter the busy-loop.

        This call blocks inside |start| until |stop_request_event| is set (or
        an error propagates out of the loop).

        @param device: Device handle returned by PyUSB for the modem to test.
        @param interface_number: |bInterfaceNumber| of the MBIM interface.
        @param interrupt_endpoint_address: |bEndpointAddress| for the usb
                INTERRUPT IN endpoint for notifications.
        @param in_buffer_size: The (fixed) buffer size to use for in control
                transfers.
        @param request_queue: A process safe queue where we expect commands
                to send to be enqueued.
        @param response_queue: A process safe queue where we enqueue
                non-notification responses from the device.
        @param stop_request_event: An event object. The busy-loop exits once
                it is set; it is also set by this object on SIGINT.
        @param strict: In strict mode (default), any unexpected error causes an
                abort. Otherwise, we merely warn.

        """
        self._device = device
        self._interface_number = interface_number
        self._interrupt_endpoint_address = interrupt_endpoint_address
        self._in_buffer_size = in_buffer_size
        self._request_queue = request_queue
        self._response_queue = response_queue
        self._stop_requested = stop_request_event
        self._strict = strict

        self._num_outstanding_responses = 0
        # The only notification |_check_response| accepts; anything else read
        # from the interrupt endpoint is reported as an error.
        self._response_available_packet = USBNotificationPacket(
                bmRequestType=0b10100001,
                bNotificationCode=0b00000001,
                wValue=0x0000,
                wIndex=self._interface_number,
                wLength=0x0000)

        # SIGINT received by the parent process is forwarded to this process.
        # Exit graciously when that happens.
        signal.signal(signal.SIGINT,
                      lambda signum, frame: self._stop_requested.set())
        self.start()


    def start(self):
        """
        Start the busy-loop that periodically interacts with the modem.

        The loop runs until the stop event is set. In strict mode a
        |MBIMComplianceChannelError| raised by a tick is re-raised; otherwise
        it is logged as a warning and the loop continues.

        """
        while not self._stop_requested.is_set():
            try:
                self._tick()
            except mbim_errors.MBIMComplianceChannelError as e:
                if self._strict:
                    raise
                logging.warning(
                        'Ignoring MBIM channel error in non-strict mode: %s',
                        e)

            # Divide by a float: with integer operands Python 2 would truncate
            # any quiet time below one second down to zero.
            time.sleep(self.QUIET_TIME_MS / 1000.0)


    def _tick(self):
        """
        Work done in one time slice.

        Fetches at most one pending response and sends at most one queued
        request. The interrupt endpoint is polled between every step so that
        notifications are picked up promptly.

        """
        self._check_response()
        response = self._get_response()
        self._check_response()
        if response is not None:
            try:
                self._response_queue.put_nowait(response)
            except Queue.Full:
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Response queue full.')

        self._check_response()
        # Only the queue read can legitimately raise |Queue.Empty|, so keep the
        # send outside of the try block.
        try:
            request = self._request_queue.get_nowait()
        except Queue.Empty:
            request = None
        if request:
            self._send_request(request)

        self._check_response()


    def _check_response(self):
        """
        Check if there is a response available.

        If a response is available, increment |_num_outstanding_responses|.

        This method is kept separate from |_get_response| because interrupts are
        time critical. A separate method underscores this point. It also opens
        up the possibility of giving this method higher priority wherever
        possible.

        @raises MBIMComplianceChannelError if the data read from the interrupt
                endpoint is not exactly the expected notification.

        """
        header_size = struct.calcsize(self.USB_PACKET_HEADER_FORMAT)
        try:
            in_data = self._device.read(
                    self._interrupt_endpoint_address,
                    header_size,
                    self._interface_number,
                    self.INTERRUPT_READ_TIMEOUT_MS)
        except core.USBError:
            # If there is no response available, the modem will respond with
            # STALL messages, and pyusb will raise an exception.
            return

        if len(in_data) != header_size:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Received unexpected notification (%s) of length %d.' %
                    (in_data, len(in_data)))

        in_packet = USBNotificationPacket(
                *struct.unpack(self.USB_PACKET_HEADER_FORMAT, in_data))
        if in_packet != self._response_available_packet:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Received unexpected notification (%s).' % in_data)

        self._num_outstanding_responses += 1


    def _get_response(self):
        """
        Get the outstanding response from the device.

        @returns: The MBIM payload, if any. None otherwise.

        """
        if self._num_outstanding_responses == 0:
            return None

        # We count all failed cases also as an attempt.
        self._num_outstanding_responses -= 1
        response = self._device.ctrl_transfer(
                wIndex=self._interface_number,
                data_or_wLength=self._in_buffer_size,
                timeout=self.GET_ENCAPSULATED_RESPONSE_TIMEOUT_MS,
                **self.GET_ENCAPSULATED_RESPONSE_ARGS)
        # Print the payload as one long line of hex values in the debug log.
        numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
                               linewidth=1000)
        logging.debug('Control Channel: Received %d bytes response. Payload:%s',
                      len(response), numpy.array(response))
        return response


    def _send_request(self, payload):
        """
        Send payload (one fragment) down to the device.

        @param payload: The data to write to the device in a single control
                transfer.
        @raises MBIMComplianceGenericError if the complete |payload| could not
                be sent.

        """
        actual_written = self._device.ctrl_transfer(
                wIndex=self._interface_number,
                data_or_wLength=payload,
                timeout=self.SEND_ENCAPSULATED_REQUEST_TIMEOUT_MS,
                **self.SEND_ENCAPSULATED_COMMAND_ARGS)
        # Print the payload as one long line of hex values in the debug log.
        numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
                               linewidth=1000)
        logging.debug('Control Channel: Sent %d bytes out of %d bytes '
                      'requested. Payload:%s',
                      actual_written, len(payload), numpy.array(payload))
        if actual_written < len(payload):
            # Both counts must be passed as one tuple to the % operator;
            # otherwise the formatting itself fails with a TypeError.
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceGenericError,
                    'Could not send the complete packet (%d/%d bytes sent)' %
                    (actual_written, len(payload)))