# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
MBIM Data transfer module is responsible for generating valid MBIM NTB frames
from IP packets and for extracting IP packets from received MBIM NTB frames.
"""
import array
import struct
from collections import namedtuple
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_constants
from autotest_lib.client.cros.cellular.mbim_compliance \
import mbim_data_channel
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors
NTH_SIGNATURE_32 = 0x686D636E # "ncmh"
NDP_SIGNATURE_IPS_32 = 0x00737069 # "ips0"
NDP_SIGNATURE_DSS_32 = 0x00737364 # "dss0"
NTH_SIGNATURE_16 = 0x484D434E # "NCMH"
NDP_SIGNATURE_IPS_16 = 0x00535049 # "IPS0"
NDP_SIGNATURE_DSS_16 = 0x00535344 # "DSS0"
class MBIMDataTransfer(object):
"""
MBIMDataTransfer class is the public interface for any data transfer
from/to the device via the MBIM data endpoints (BULK-IN/BULK-OUT).
The class encapsulates the MBIM NTB frame generation/parsing as well as
sending the the NTB frames to the device and vice versa.
Users are expected to:
1. Initialize the channel data transfer module by providing a valid
device context which holds all the required info regarding the devie under
test.
2. Use send_data_packets to send IP packets to the device.
3. Use receive_data_packets to receive IP packets from the device.
"""
def __init__(self, device_context):
"""
Initialize the Data Transfer object. The data transfer object
instantiates the data channel to prepare for any data transfer from/to
the device using the bulk pipes.
@params device_context: The device context which contains all the USB
descriptors, NTB params and USB handle to the device.
"""
self._device_context = device_context
mbim_data_interface = (
device_context.descriptor_cache.mbim_data_interface)
bulk_in_endpoint = (
device_context.descriptor_cache.bulk_in_endpoint)
bulk_out_endpoint = (
device_context.descriptor_cache.bulk_out_endpoint)
self._data_channel = mbim_data_channel.MBIMDataChannel(
device=device_context.device,
data_interface_number=mbim_data_interface.bInterfaceNumber,
bulk_in_endpoint_address=bulk_in_endpoint.bEndpointAddress,
bulk_out_endpoint_address=bulk_out_endpoint.bEndpointAddress,
max_in_buffer_size=device_context.max_in_data_transfer_size)
def send_data_packets(self, ntb_format, data_packets):
"""
Creates an MBIM frame for the payload provided and sends it out to the
device using bulk out pipe.
@param ntb_format: Whether to send an NTB16 or NTB32 frame.
@param data_packets: Array of data packets. Each packet is a byte array
corresponding to the IP packet or any other payload to be sent.
"""
ntb_object = MBIMNtb(ntb_format)
ntb_frame = ntb_object.generate_ntb(
data_packets,
self._device_context.max_out_data_transfer_size,
self._device_context.out_data_transfer_divisor,
self._device_context.out_data_transfer_payload_remainder,
self._device_context.out_data_transfer_ndp_alignment)
self._data_channel.send_ntb(ntb_frame)
def receive_data_packets(self, ntb_format):
"""
Receives an MBIM frame from the device using the bulk in pipe,
deaggregates the payload from the frame and returns it to the caller.
Will return an empty tuple, if no frame is received from the device.
@param ntb_format: Whether to receive an NTB16 or NTB32 frame.
@returns tuple of (nth, ndp, ndp_entries, payload) where,
nth - NTH header object received.
ndp - NDP header object received.
ndp_entries - Array of NDP entry header objects.
payload - Array of packets where each packet is a byte array.
"""
ntb_frame = self._data_channel.receive_ntb()
if not ntb_frame:
return ()
ntb_object = MBIMNtb(ntb_format)
return ntb_object.parse_ntb(ntb_frame)
class MBIMNtb(object):
"""
MBIM NTB class used for MBIM data transfer.
This class is used to generate/parse NTB frames.
Limitations:
1. We currently only support a single NDP frame within an NTB.
2. We only support IP data payload. This can be overcome by using the DSS
(instead of IPS) prefix in NDP signature if required.
"""
_NEXT_SEQUENCE_NUMBER = 0
def __init__(self, ntb_format):
"""
Initialization of the NTB object.
We assign the appropriate header classes required based on whether
we are going to work with NTB16 or NTB32 data frames.
@param ntb_format: Type of NTB: 16 vs 32
"""
self._ntb_format = ntb_format
# Defining the tuples to be used for the headers.
if ntb_format == mbim_constants.NTB_FORMAT_16:
self._nth_class = Nth16
self._ndp_class = Ndp16
self._ndp_entry_class = NdpEntry16
self._nth_signature = NTH_SIGNATURE_16
self._ndp_signature = NDP_SIGNATURE_IPS_16
else:
self._nth_class = Nth32
self._ndp_class = Ndp32
self._ndp_entry_class = NdpEntry32
self._nth_signature = NTH_SIGNATURE_32
self._ndp_signature = NDP_SIGNATURE_IPS_32
@classmethod
def get_next_sequence_number(cls):
"""
Returns incrementing sequence numbers on successive calls. We start
the sequence numbering at 0.
@returns The sequence number for data transfers.
"""
# Make sure to rollover the 16 bit sequence number.
if MBIMNtb._NEXT_SEQUENCE_NUMBER > (0xFFFF - 2):
MBIMNtb._NEXT_SEQUENCE_NUMBER = 0x0000
sequence_number = MBIMNtb._NEXT_SEQUENCE_NUMBER
MBIMNtb._NEXT_SEQUENCE_NUMBER += 1
return sequence_number
@classmethod
def reset_sequence_number(cls):
"""
Resets the sequence number to be used for NTB's sent from host. This
has to be done every time the device is reset.
"""
cls._NEXT_SEQUENCE_NUMBER = 0x00000000
def get_next_payload_offset(self,
current_offset,
ntb_divisor,
ntb_payload_remainder):
"""
Helper function to find the offset to place the next payload
Alignment of payloads follow this formula:
Offset % ntb_divisor == ntb_payload_remainder.
@params current_offset: Current index offset in the frame.
@param ntb_divisor: Used for payload alignment within the frame.
@param ntb_payload_remainder: Used for payload alignment within the
frame.
@returns offset to place the next payload at.
"""
next_payload_offset = (
(((current_offset + (ntb_divisor - 1)) / ntb_divisor) *
ntb_divisor) + ntb_payload_remainder)
return next_payload_offset
def generate_ntb(self,
payload,
max_ntb_size,
ntb_divisor,
ntb_payload_remainder,
ntb_ndp_alignment):
"""
This function generates an NTB frame out of the payload provided.
@param payload: Array of packets to sent to the device. Each packet
contains the raw byte array of IP packet to be sent.
@param max_ntb_size: Max size of NTB frame supported by the device.
@param ntb_divisor: Used for payload alignment within the frame.
@param ntb_payload_remainder: Used for payload alignment within the
frame.
@param ntb_ndp_alignment : Used for NDP header alignment within the
frame.
@raises MBIMComplianceNtbError if the complete |ntb| can not fit into
|max_ntb_size|.
@returns the raw MBIM NTB byte array.
"""
cls = self.__class__
# We start with the NTH header, then the payload and then finally
# the NDP header and the associated NDP entries.
ntb_curr_offset = self._nth_class.get_struct_len()
num_packets = len(payload)
nth_length = self._nth_class.get_struct_len()
ndp_length = self._ndp_class.get_struct_len()
# We need one extra ZLP NDP entry at the end, so account for it.
ndp_entries_length = (
self._ndp_entry_class.get_struct_len() * (num_packets + 1))
# Create the NDP header and an NDP_ENTRY header for each packet.
# We can create the NTH header only after we calculate the total length.
self.ndp = self._ndp_class(
signature=self._ndp_signature,
length=ndp_length+ndp_entries_length,
next_ndp_index=0)
self.ndp_entries = []
# We'll also construct the payload raw data as we loop thru the packets.
# The padding in between the payload is added in place.
raw_ntb_frame_payload = array.array('B', [])
for packet in payload:
offset = self.get_next_payload_offset(
ntb_curr_offset, ntb_divisor, ntb_payload_remainder)
align_length = offset - ntb_curr_offset
length = len(packet)
# Add align zeroes, then payload, then pad zeroes
raw_ntb_frame_payload += array.array('B', [0] * align_length)
raw_ntb_frame_payload += packet
self.ndp_entries.append(self._ndp_entry_class(
datagram_index=offset, datagram_length=length))
ntb_curr_offset = offset + length
# Add the ZLP entry
self.ndp_entries.append(self._ndp_entry_class(
datagram_index=0, datagram_length=0))
# Store the NDP offset to be used in creating NTH header.
# NDP alignment is specified by the device with a minimum of 4 and it
# always a multiple of 2.
ndp_align_mask = ntb_ndp_alignment - 1
if ntb_curr_offset & ndp_align_mask:
pad_length = ntb_ndp_alignment - (ntb_curr_offset & ndp_align_mask)
raw_ntb_frame_payload += array.array('B', [0] * pad_length)
ntb_curr_offset += pad_length
ndp_offset = ntb_curr_offset
ntb_curr_offset += ndp_length
ntb_curr_offset += ndp_entries_length
if ntb_curr_offset > max_ntb_size:
mbim_errors.log_and_raise(
mbim_errors.MBIMComplianceNtbError,
'Could not fit the complete NTB of size %d into %d bytes' %
ntb_curr_offset, max_ntb_size)
# Now create the NTH header
self.nth = self._nth_class(
signature=self._nth_signature,
header_length=nth_length,
sequence_number=cls.get_next_sequence_number(),
block_length=ntb_curr_offset,
fp_index=ndp_offset)
# Create the raw bytes now, we create the raw bytes of the header and
# attach it to the payload raw bytes with padding already created above.
raw_ntb_frame = array.array('B', [])
raw_ntb_frame += array.array('B', self.nth.pack())
raw_ntb_frame += raw_ntb_frame_payload
raw_ntb_frame += array.array('B', self.ndp.pack())
for entry in self.ndp_entries:
raw_ntb_frame += array.array('B', entry.pack())
self.payload = payload
self.raw_ntb_frame = raw_ntb_frame
return raw_ntb_frame
def parse_ntb(self, raw_ntb_frame):
"""
This function parses an NTB frame and returns the NTH header, NDP header
and the payload parsed which can be used to inspect the response
from the device.
@param raw_ntb_frame: Array of bytes of an MBIM NTB frame.
@raises MBIMComplianceNtbError if there is an error in parsing.
@returns tuple of (nth, ndp, ndp_entries, payload) where,
nth - NTH header object received.
ndp - NDP header object received.
ndp_entries - Array of NDP entry header objects.
payload - Array of packets where each packet is a byte array.
"""
# Read the nth header to find the ndp header index
self.nth = self._nth_class(raw_data=raw_ntb_frame)
ndp_offset = self.nth.fp_index
# Verify the total length field
if len(raw_ntb_frame) != self.nth.block_length:
mbim_errors.log_and_raise(
mbim_errors.MBIMComplianceNtbError,
'NTB size mismatch Total length: %x Reported: %x bytes' % (
len(raw_ntb_frame), self.nth.block_length))
# Read the NDP header to find the number of packets in the entry
self.ndp = self._ndp_class(raw_data=raw_ntb_frame[ndp_offset:])
num_ndp_entries = (
(self.ndp.length - self._ndp_class.get_struct_len()) /
self._ndp_entry_class.get_struct_len())
ndp_entries_offset = ndp_offset + self._ndp_class.get_struct_len()
self.payload = []
self.ndp_entries = []
for _ in range(0, num_ndp_entries):
ndp_entry = self._ndp_entry_class(
raw_data=raw_ntb_frame[ndp_entries_offset:])
ndp_entries_offset += self._ndp_entry_class.get_struct_len()
packet_start_offset = ndp_entry.datagram_index
packet_end_offset = (
ndp_entry.datagram_index + ndp_entry.datagram_length)
# There is one extra ZLP NDP entry at the end, so account for it.
if ndp_entry.datagram_index and ndp_entry.datagram_length:
packet = array.array('B', raw_ntb_frame[packet_start_offset:
packet_end_offset])
self.payload.append(packet)
self.ndp_entries.append(ndp_entry)
self.raw_ntb_frame = raw_ntb_frame
return (self.nth, self.ndp, self.ndp_entries, self.payload)
def header_class_new(cls, **kwargs):
"""
Creates a header instance with either the given field name/value
pairs or raw data buffer.
@param kwargs: Dictionary of (field_name, field_value) pairs or
raw_data=Packed binary array.
@returns New header object created.
"""
field_values = []
if 'raw_data' in kwargs and kwargs['raw_data']:
raw_data = kwargs['raw_data']
data_format = cls.get_field_format_string()
unpack_length = cls.get_struct_len()
data_length = len(raw_data)
if data_length < unpack_length:
mbim_errors.log_and_raise(
mbim_errors.MBIMComplianceDataTransferError,
'Length of Data (%d) to be parsed less than header'
' structure length (%d)' %
(data_length, unpack_length))
field_values = struct.unpack_from(data_format, raw_data)
else:
field_names = cls.get_field_names()
for field_name in field_names:
if field_name not in kwargs:
field_value = 0
field_values.append(field_value)
else:
field_values.append(kwargs.pop(field_name))
if kwargs:
mbim_errors.log_and_raise(
mbim_errors.MBIMComplianceDataTransferError,
'Unexpected fields (%s) in %s' % (
kwargs.keys(), cls.__name__))
obj = super(cls, cls).__new__(cls, *field_values)
return obj
class MBIMNtbHeadersMeta(type):
"""
Metaclass for all the NTB headers. This is relatively toned down metaclass
to create namedtuples out of the header fields.
Header definition attributes:
_FIELDS: Used to define structure elements. Each element contains a format
specifier and the field name.
"""
def __new__(mcs, name, bases, attrs):
if object in bases:
return super(MBIMNtbHeadersMeta, mcs).__new__(
mcs, name, bases, attrs)
fields = attrs['_FIELDS']
if not fields:
mbim_errors.log_and_raise(
mbim_errors.MBIMComplianceDataTransfer,
'%s header must have some fields defined' % name)
_, field_names = zip(*fields)
attrs['__new__'] = header_class_new
header_class = namedtuple(name, field_names)
# Prepend the class created via namedtuple to |bases| in order to
# correctly resolve the __new__ method while preserving the class
# hierarchy.
cls = super(MBIMNtbHeadersMeta, mcs).__new__(
mcs, name, (header_class,) + bases, attrs)
return cls
class MBIMNtbHeaders(object):
"""
Base class for all NTB headers.
This class should not be instantiated on it's own.
The base class overrides namedtuple's __new__ to:
1. Create a tuple out of raw object.
2. Put value of zero for fields which are not specified by the caller,
For ex: reserved fields
"""
__metaclass__ = MBIMNtbHeadersMeta
@classmethod
def get_fields(cls):
"""
Helper function to find all the fields of this class.
@returns Fields of the structure.
"""
return cls._FIELDS
@classmethod
def get_field_names(cls):
"""
Helper function to return the field names of the header.
@returns The field names of the header structure.
"""
_, field_names = zip(*cls.get_fields())
return field_names
@classmethod
def get_field_formats(cls):
"""
Helper function to return the field formats of the header.
@returns The format of fields of the header structure.
"""
field_formats, _ = zip(*cls.get_fields())
return field_formats
@classmethod
def get_field_format_string(cls):
"""
Helper function to return the field format string of the header.
@returns The format string of the header structure.
"""
format_string = '<' + ''.join(cls.get_field_formats())
return format_string
@classmethod
def get_struct_len(cls):
"""
Returns the length of the structure representing the header.
@returns Length of the structure.
"""
return struct.calcsize(cls.get_field_format_string())
def pack(self):
"""
Packs a header based on the field format specified.
@returns The packet in binary array form.
"""
cls = self.__class__
field_names = cls.get_field_names()
format_string = cls.get_field_format_string()
field_values = [getattr(self, name) for name in field_names]
return array.array('B', struct.pack(format_string, *field_values))
class Nth16(MBIMNtbHeaders):
""" The class for MBIM NTH16 objects. """
_FIELDS = (('I', 'signature'),
('H', 'header_length'),
('H', 'sequence_number'),
('H', 'block_length'),
('H', 'fp_index'))
class Ndp16(MBIMNtbHeaders):
""" The class for MBIM NDP16 objects. """
_FIELDS = (('I', 'signature'),
('H', 'length'),
('H', 'next_ndp_index'))
class NdpEntry16(MBIMNtbHeaders):
""" The class for MBIM NDP16 objects. """
_FIELDS = (('H', 'datagram_index'),
('H', 'datagram_length'))
class Nth32(MBIMNtbHeaders):
""" The class for MBIM NTH32 objects. """
_FIELDS = (('I', 'signature'),
('H', 'header_length'),
('H', 'sequence_number'),
('I', 'block_length'),
('I', 'fp_index'))
class Ndp32(MBIMNtbHeaders):
""" The class for MBIM NTH32 objects. """
_FIELDS = (('I', 'signature'),
('H', 'length'),
('H', 'reserved_6'),
('I', 'next_ndp_index'),
('I', 'reserved_12'))
class NdpEntry32(MBIMNtbHeaders):
""" The class for MBIM NTH32 objects. """
_FIELDS = (('I', 'datagram_index'),
('I', 'datagram_length'))