# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import Queue
import signal
import struct
import time

import numpy

from collections import namedtuple
from usb import core

import common
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors


# Field layout of a notification read from the interrupt endpoint; the field
# order matches |MBIMChannelEndpoint.USB_PACKET_HEADER_FORMAT|.
USBNotificationPacket = namedtuple(
        'USBNotificationPacket',
        ['bmRequestType', 'bNotificationCode', 'wValue', 'wIndex', 'wLength'])


class MBIMChannelEndpoint(object):
    """
    An object dedicated to interacting with the MBIM capable USB device.

    This object interacts with the USB device in a forever loop, servicing
    command requests from |MBIMChannel| as well as surfacing any notifications
    from the modem.

    """
    # struct format of the notification header: two unsigned bytes followed by
    # three unsigned 16-bit little-endian words.
    USB_PACKET_HEADER_FORMAT = '<BBHHH'
    # Sleeping for 0 seconds *may* hint for the scheduler to relinquish CPU.
    QUIET_TIME_MS = 0
    INTERRUPT_READ_TIMEOUT_MS = 1  # We don't really want to wait.
    GET_ENCAPSULATED_RESPONSE_TIMEOUT_MS = 50
    SEND_ENCAPSULATED_REQUEST_TIMEOUT_MS = 50
    GET_ENCAPSULATED_RESPONSE_ARGS = {
            'bmRequestType' : 0b10100001,
            'bRequest' : 0b00000001,
            'wValue' : 0x0000}
    SEND_ENCAPSULATED_COMMAND_ARGS = {
            'bmRequestType' : 0b00100001,
            'bRequest' : 0b00000000,
            'wValue' : 0x0000}

    def __init__(self,
                 device,
                 interface_number,
                 interrupt_endpoint_address,
                 in_buffer_size,
                 request_queue,
                 response_queue,
                 stop_request_event,
                 strict=True):
        """
        Set up the endpoint and immediately enter the servicing loop.

        Note that the constructor calls |start| and therefore does not return
        until |stop_request_event| is set (or an error is raised).

        @param device: Device handle returned by PyUSB for the modem to test.
        @param interface_number: |bInterfaceNumber| of the MBIM interface.
        @param interrupt_endpoint_address: |bEndpointAddress| for the usb
                INTERRUPT IN endpoint for notifications.
        @param in_buffer_size: The (fixed) buffer size to use for in control
                transfers.
        @param request_queue: A process safe queue where we expect commands
                to send to be enqueued.
        @param response_queue: A process safe queue where we enqueue
                non-notification responses from the device.
        @param stop_request_event: An event object; the servicing loop exits
                once it is set. It is also set by this object on SIGINT.
        @param strict: In strict mode (default), any unexpected error causes an
                abort. Otherwise, we merely warn.

        """
        self._device = device
        self._interface_number = interface_number
        self._interrupt_endpoint_address = interrupt_endpoint_address
        self._in_buffer_size = in_buffer_size
        self._request_queue = request_queue
        self._response_queue = response_queue
        self._stop_requested = stop_request_event
        self._strict = strict

        # Number of "response available" notifications seen that have not yet
        # been followed by a GET_ENCAPSULATED_RESPONSE attempt.
        self._num_outstanding_responses = 0

        # The only notification we accept on the interrupt endpoint. Anything
        # else read from it is reported as a channel error.
        self._response_available_packet = USBNotificationPacket(
                bmRequestType=0b10100001,
                bNotificationCode=0b00000001,
                wValue=0x0000,
                wIndex=self._interface_number,
                wLength=0x0000)

        # SIGINT received by the parent process is forwarded to this process.
        # Exit graciously when that happens.
        signal.signal(signal.SIGINT,
                      lambda signum, frame: self._stop_requested.set())
        self.start()


    def start(self):
        """
        Start the busy-loop that periodically interacts with the modem.

        Runs |_tick| repeatedly until the stop event is set.

        @raises MBIMComplianceChannelError in strict mode, if |_tick| raises
                it. In non-strict mode the error is logged as a warning and
                the loop continues.

        """
        while not self._stop_requested.is_set():
            try:
                self._tick()
            except mbim_errors.MBIMComplianceChannelError as e:
                if self._strict:
                    raise
                logging.warning(
                        'Ignoring MBIM channel error in non-strict mode: %s',
                        e)

            # Float division: under Python 2, int / int would truncate any
            # sub-second quiet time down to zero.
            time.sleep(self.QUIET_TIME_MS / 1000.0)


    def _tick(self):
        """
        Work done in one time slice.

        Polls the interrupt endpoint between every other piece of work, fetches
        at most one outstanding response and sends at most one queued request.

        @raises MBIMComplianceChannelError if the response queue is full or an
                unexpected notification is received.

        """
        self._check_response()
        response = self._get_response()
        self._check_response()
        if response is not None:
            try:
                self._response_queue.put_nowait(response)
            except Queue.Full:
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Response queue full.')
            self._check_response()

        # Only the queue read can raise Queue.Empty, so keep the try minimal.
        try:
            request = self._request_queue.get_nowait()
        except Queue.Empty:
            request = None
        if request:
            self._send_request(request)

        self._check_response()


    def _check_response(self):
        """
        Check if there is a response available.

        If a response is available, increment |outstanding_responses|.

        This method is kept separate from |_get_response| because interrupts are
        time critical. A separate method underscores this point. It also opens
        up the possibility of giving this method higher priority wherever
        possible.

        @raises MBIMComplianceChannelError if the data read is not exactly one
                "response available" notification.

        """
        header_size = struct.calcsize(self.USB_PACKET_HEADER_FORMAT)
        try:
            in_data = self._device.read(
                    self._interrupt_endpoint_address,
                    header_size,
                    self._interface_number,
                    self.INTERRUPT_READ_TIMEOUT_MS)
        except core.USBError:
            # If there is no response available, the modem will respond with
            # STALL messages, and pyusb will raise an exception.
            return

        if len(in_data) != header_size:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Received unexpected notification (%s) of length %d.' %
                    (in_data, len(in_data)))

        in_packet = USBNotificationPacket(
                *struct.unpack(self.USB_PACKET_HEADER_FORMAT, in_data))
        if in_packet != self._response_available_packet:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Received unexpected notification (%s).' % in_data)

        self._num_outstanding_responses += 1


    def _get_response(self):
        """
        Get the outstanding response from the device.

        @returns: The MBIM payload, if any. None otherwise.

        """
        if self._num_outstanding_responses == 0:
            return None

        # We count all failed cases also as an attempt.
        self._num_outstanding_responses -= 1
        response = self._device.ctrl_transfer(
                wIndex=self._interface_number,
                data_or_wLength=self._in_buffer_size,
                timeout=self.GET_ENCAPSULATED_RESPONSE_TIMEOUT_MS,
                **self.GET_ENCAPSULATED_RESPONSE_ARGS)
        # Print the payload as hex on a single (wide) line in the debug log.
        numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
                               linewidth=1000)
        logging.debug('Control Channel: Received %d bytes response. Payload:%s',
                      len(response), numpy.array(response))
        return response


    def _send_request(self, payload):
        """
        Send payload (one fragment) down to the device.

        @param payload: The data to send in a single control transfer.
        @raises MBIMComplianceGenericError if the complete |payload| could not
                be sent.

        """
        actual_written = self._device.ctrl_transfer(
                wIndex=self._interface_number,
                data_or_wLength=payload,
                timeout=self.SEND_ENCAPSULATED_REQUEST_TIMEOUT_MS,
                **self.SEND_ENCAPSULATED_COMMAND_ARGS)
        # Print the payload as hex on a single (wide) line in the debug log.
        numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
                               linewidth=1000)
        logging.debug('Control Channel: Sent %d bytes out of %d bytes '
                      'requested. Payload:%s',
                      actual_written, len(payload), numpy.array(payload))
        if actual_written < len(payload):
            # Both values must be passed to '%' as one tuple; otherwise the
            # formatting itself fails before the error can be raised.
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceGenericError,
                    'Could not send the complete packet (%d/%d bytes sent)' %
                    (actual_written, len(payload)))