# Plain text  |  191 lines  |  6.18 KB

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

'''
USB to I2C controller.
'''

import glob
import logging
import os
import re
import serial
import time

from autotest_lib.client.cros import tty

# Least significant bit of I2C address.
WRITE_BIT = 0
READ_BIT = 1


def create_i2c_controller(chipset_config):
    '''Factory method for I2CController.

    Builds the I2CController implementation selected by chipset_config, a
    colon-separated string whose first field names the chipset and whose
    second field names the USB UART driver used to locate the tty.  Only the
    "SC18IM700" chipset is handled here.

    @param chipset_config: Chipset configuration, "<chipset>:<driver>".
    @return An I2CController if succeeded.
    @throws AssertionError if a valid instance cannot be created: the chipset
            is not supported, the driver field is missing, or no tty was
            found for the driver.
    '''
    fields = chipset_config.split(':')
    # A bare "SC18IM700" without a driver field used to escape as IndexError;
    # treat it like any other unusable configuration instead.
    if fields[0] == 'SC18IM700' and len(fields) > 1:
        usb_uart_driver = fields[1]
        # Try to find a tty terminal that driver name matches usb_uart_driver.
        tty_path = tty.find_tty_by_driver(usb_uart_driver)
        if tty_path:
            return _I2CControllerSC18IM700(tty_path)

    # Raised explicitly (rather than "assert False") so the documented failure
    # still happens when Python runs with -O and assert statements are removed.
    raise AssertionError("Unsupported configuration: %s" % chipset_config)


class I2CController(object):
    '''
    Abstract interface shared by the USB to I2C controller implementations.

    Subclasses supply send(), read() and read_bus_status(); this class only
    contributes the bus status constants and send_and_check_status().
    '''

    # Bus status codes returned by read_bus_status().
    (I2C_OK,
     I2C_NACK_ON_ADDRESS,
     I2C_NACK_ON_DATA,
     I2C_TIME_OUT) = range(1, 5)

    def send(self, slave_addr, int_array):
        '''Sends data to I2C slave device.

        The outcome is not reported here; call read_bus_status() afterwards
        to find out whether the transfer succeeded.

        @param slave_addr: The address of slave in 7bits format.
        @param int_array: The data to send in integer array.
        '''
        raise NotImplementedError()

    def read(self, slave_addr, bytes_to_read):
        '''Reads data from I2C slave device.

        @param slave_addr: The address of slave in 7bits format.
        @param bytes_to_read: The number of bytes to read from device.
        @return An array of data.
        '''
        raise NotImplementedError()

    def read_bus_status(self):
        '''Returns the I2C bus status.'''
        raise NotImplementedError()

    def send_and_check_status(self, slave_addr, int_array):
        '''Sends data to I2C slave device, then reports the bus status.

        Convenience wrapper that calls send() followed by read_bus_status().

        @param slave_addr: The address of slave in 7bits format.
        @param int_array: The data to send in integer array.

        @return An integer indicates I2C bus status.
        '''
        self.send(slave_addr, int_array)
        bus_status = self.read_bus_status()
        return bus_status


class _I2CControllerSC18IM700(I2CController):
    '''
    Implementation of I2C Controller for NXP SC18IM700.

    The chip is driven through a serial port.  Every command written by this
    class is a byte sequence that starts with an ASCII command character and
    ends with 'P'.  Replies are collected with a non-blocking read after
    waiting SEC_WAIT_I2C seconds.
    '''
    # Seconds to wait after writing a command before collecting the reply.
    SEC_WAIT_I2C = 0.1

    # Constants from official datasheet.
    # http://www.nxp.com/documents/data_sheet/SC18IM700.pdf
    # Maps the raw byte read back by read_bus_status() to the bus status
    # constants declared on I2CController.
    I2C_STATUS = {0b11110000: I2CController.I2C_OK,
                  0b11110001: I2CController.I2C_NACK_ON_ADDRESS,
                  0b11110011: I2CController.I2C_NACK_ON_DATA,
                  0b11111000: I2CController.I2C_TIME_OUT}

    # Register queried by read_bus_status(); its value is looked up in
    # I2C_STATUS.
    _I2C_STAT_REGISTER = 0x0A

    def __init__(self, device_path):
        '''Connects to NXP via serial port.

        @param device_path: The device path of serial port.
        '''
        self.logger = logging.getLogger('SC18IM700')
        self.logger.info('Setup serial device... [%s]', device_path)
        self.device_path = device_path
        self.serial = serial.Serial(port=self.device_path,
                                    baudrate=9600,
                                    bytesize=serial.EIGHTBITS,
                                    parity=serial.PARITY_NONE,
                                    stopbits=serial.STOPBITS_ONE,
                                    xonxoff=False,
                                    rtscts=True,
                                    interCharTimeout=1)
        self.logger.info('pySerial [%s] configuration : %s',
                         serial.VERSION, repr(self.serial))
        # Clean the buffers.  flush() only waits for pending output to be
        # written; stale input must be discarded explicitly, otherwise it
        # would be returned as part of the reply to the first command.
        self.serial.flush()
        # pySerial 3.x renamed flushInput() to reset_input_buffer(); support
        # whichever one the installed version provides.
        discard_input = getattr(self.serial, 'reset_input_buffer', None)
        if discard_input is None:
            discard_input = self.serial.flushInput
        discard_input()

    def _write(self, data):
        '''Converts data to bytearray and writes to the serial port.

        @param data: Sequence of integers, one per byte to send.
        '''
        self.serial.write(bytearray(data))
        self.serial.flush()

    def _read(self):
        '''Reads data from serial port(Non-Blocking).

        Only the bytes already waiting in the input buffer are read, and each
        one is logged in hexadecimal and binary form.

        @return The data exactly as returned by the serial port.
        '''
        ret = self.serial.read(self.serial.inWaiting())
        self.logger.info('Hex and binary dump of datas - ')
        # bytearray() yields integers for both a Python 2 str and a Python 3
        # bytes object, whereas ord() on an item only works on Python 2.
        for value in bytearray(ret):
            self.logger.info('  %x - %s', value, bin(value))
        return ret

    @staticmethod
    def _convert_to_8bits_addr(slave_addr_7bits, lsb):
        '''Converts slave_addr from 7 bits to 8 bits with given LSB.

        @param slave_addr_7bits: The address of slave in 7bits format.
        @param lsb: WRITE_BIT or READ_BIT, placed in bit 0 of the result.
        @return The address shifted left by one bit with lsb in bit 0.
        @throws AssertionError if either argument is out of range.
        '''
        # Raised explicitly so the checks survive "python -O", which removes
        # assert statements.
        if (slave_addr_7bits >> 7) != 0:
            raise AssertionError("Address must not exceed 7 bits.")
        if (lsb & ~0x01) != 0:
            raise AssertionError("lsb must not exceed one bit.")
        return (slave_addr_7bits << 1) | lsb

    def read_bus_status(self):
        '''Returns the I2C bus status.

        @return One of the I2CController.I2C_* status constants.
        @throws IOError if the reply is not exactly one known status byte.
        '''
        reply = bytearray(self.read_register([self._I2C_STAT_REGISTER]))
        if len(reply) == 1 and reply[0] in self.I2C_STATUS:
            return self.I2C_STATUS[reply[0]]
        raise IOError("I2C_STATUS_READ_FAILED")

    def send(self, slave_addr, int_array):
        '''Sends data to I2C slave device.

        Writes 'S', the write address, the byte count, the data and 'P'.
        Call read_bus_status() afterwards to confirm the transfer.

        @param slave_addr: The address of slave in 7bits format.
        @param int_array: The data to send as a sequence of integers.
        '''
        # Copy into a list so tuples and other iterables are accepted too.
        payload = list(int_array)
        header = [ord('S'),
                  self._convert_to_8bits_addr(slave_addr, WRITE_BIT),
                  len(payload)]
        self._write(header + payload + [ord('P')])

    def read(self, slave_addr, bytes_to_read):
        '''Reads data from I2C slave device.

        Writes 'S', the read address, the byte count and 'P', waits
        SEC_WAIT_I2C seconds and returns whatever has arrived by then.

        @param slave_addr: The address of slave in 7bits format.
        @param bytes_to_read: The number of bytes to read from device.
        @return The data received; it may be shorter than bytes_to_read
                because the read does not block.
        '''
        cmd = [ord('S'),
               self._convert_to_8bits_addr(slave_addr, READ_BIT),
               bytes_to_read,
               ord('P')]
        self._write(cmd)
        time.sleep(self.SEC_WAIT_I2C)
        return self._read()

    def write_gpio(self, data):
        '''Sends the GPIO write command ('O', data, 'P').

        @param data: The value to write, as one integer byte.
        '''
        self._write([ord('O'), data, ord('P')])

    def read_gpio(self):
        '''Sends the GPIO read command ('I', 'P') and returns the reply.

        @return The data received after waiting SEC_WAIT_I2C seconds.
        '''
        self._write([ord('I'), ord('P')])
        time.sleep(self.SEC_WAIT_I2C)
        return self._read()

    def write_register(self, regs, datas):
        '''Writes values to registers with a single 'W' command.

        The command carries one (register, value) pair per entry.

        @param regs: Sequence of register addresses.
        @param datas: Sequence of values, one for each entry of regs.
        @throws AssertionError if regs and datas differ in length.
        '''
        if len(regs) != len(datas):
            raise AssertionError("regs and datas must have the same length.")
        cmd = [ord('W')]
        for reg, value in zip(regs, datas):
            cmd.extend((reg, value))
        cmd.append(ord('P'))
        self._write(cmd)

    def read_register(self, regs):
        '''Reads registers with a single 'R' command.

        @param regs: Sequence of register addresses to read.
        @return The data received after waiting SEC_WAIT_I2C seconds.
        '''
        # Copy into a list so tuples and other iterables are accepted too.
        cmd = [ord('R')] + list(regs) + [ord('P')]
        self._write(cmd)
        time.sleep(self.SEC_WAIT_I2C)
        return self._read()