普通文本  |  401行  |  13.47 KB

# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import re
import logging
import time

from autotest_lib.client.common_lib import error

class PDConsoleUtils(object):
    """ Provides a set of methods common to USB PD FAFT tests

    Each instance of this class is associated with a particular
    servo UART console. USB PD tests will typically use the console
    command 'pd' and its subcommands to control/monitor Type C PD
    connections. The servo object used for UART operations is
    passed in and stored when this object is created.

    """

    SRC_CONNECT = 'SRC_READY'
    SNK_CONNECT = 'SNK_READY'
    SRC_DISC = 'SRC_DISCONNECTED'
    SNK_DISC = 'SNK_DISCONNECTED'
    PD_MAX_PORTS = 2
    CONNECT_TIME = 4

    # dualrole input/ouput values
    DUALROLE_QUERY_DELAY = 0.25
    dual_index = {'on': 0, 'off': 1, 'snk': 2, 'src': 3}
    dualrole_cmd = ['on', 'off', 'sink', 'source']
    dualrole_resp = ['on', 'off', 'force sink', 'force source']

    # Dictionary for 'pd 0/1 state' parsing
    PD_STATE_DICT = {
        'port': 'Port\s+([\w]+)',
        'role': 'Role:\s+([\w]+-[\w]+)',
        'pd_state': 'State:\s+([\w]+_[\w]+)',
        'flags': 'Flags:\s+([\w]+)',
        'polarity': '(CC\d)'
    }

    # Dictionary for PD control message types
    PD_CONTROL_MSG_MASK = 0x1f
    PD_CONTROL_MSG_DICT = {
        'GoodCRC': 1,
        'GotoMin': 2,
        'Accept': 3,
        'Reject': 4,
        'Ping': 5,
        'PS_RDY': 6,
        'Get_Source_Cap': 7,
        'Get_Sink_Cap': 8,
        'DR_Swap': 9,
        'PR_Swap': 10,
        'VCONN_Swap': 11,
        'Wait': 12,
        'Soft_Reset': 13
    }

    # Dictionary for PD firmware state flags
    PD_STATE_FLAGS_DICT = {
        'power_swap': 1 << 1,
        'data_swap': 1 << 2,
        'data_swap_active': 1 << 3,
        'vconn_on': 1 << 12
    }

    def __init__(self, console):
        """ Console can be either usbpd, ec, or plankton_ec UART
        This object with then be used by the class which creates
        the PDConsoleUtils class to send/receive commands to UART
        """
        # save console for UART access functions
        self.console = console

    def send_pd_command(self, cmd):
        """Send command to PD console UART

        @param cmd: pd command string
        """
        self.console.send_command(cmd)

    def send_pd_command_get_output(self, cmd, regexp):
        """Send command to PD console, wait for response

        @param cmd: pd command string
        @param regexp: regular expression for desired output
        """
        return self.console.send_command_get_output(cmd, regexp)

    def send_pd_command_get_reply_msg(self, cmd):
        """Send PD protocol msg, get PD control msg reply

        The PD console debug mode is enabled prior to sending
        a pd protocol message. This allows the
        control message reply to be extracted. The debug mode
        is disabled prior to exiting.

        @param cmd: pd command to issue to the UART console

        @returns: PD control header message
        """
        # Enable PD console debug mode to show control messages
        self.enable_pd_console_debug()
        m = self.send_pd_command_get_output(cmd, ['RECV\s([\w]+)'])
        ctrl_msg = int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK
        self.disable_pd_console_debug()
        return ctrl_msg

    def verify_pd_console(self):
        """Verify that PD commands exist on UART console

        Send 'help' command to UART console
        @returns: True if 'pd' is found, False if not
        """

        l = self.console.send_command_get_output('help', ['(pd)\s+([\w]+)'])
        if l[0][1] == 'pd':
            return True
        else:
            return False

    def execute_pd_state_cmd(self, port):
        """Get PD state for specified channel

        pd 0/1 state command gives produces 5 fields. The full response
        line is captured and then parsed to extract each field to fill
        the dict containing port, polarity, role, pd_state, and flags.

        @param port: Type C PD port 0 or 1

        @returns: A dict with the 5 fields listed above
        """
        cmd = 'pd'
        subcmd = 'state'
        pd_cmd = cmd +" " + str(port) + " " + subcmd
        # Two FW versions for this command, get full line.
        m = self.send_pd_command_get_output(pd_cmd,
                                            ['(Port.*) - (Role:.*)\n'])

        # Extract desired values from result string
        state_result = {}
        for key, regexp in self.PD_STATE_DICT.iteritems():
            value = re.search(regexp, m[0][0])
            if value:
                state_result[key] = value.group(1)
            else:
                raise error.TestFail('pd 0/1 state: %r value not found' % (key))

        return state_result

    def get_pd_state(self, port):
        """Get the current PD state

        @param port: Type C PD port 0/1
        @returns: current pd state
        """

        pd_dict = self.execute_pd_state_cmd(port)
        return pd_dict['pd_state']

    def get_pd_port(self, port):
        """Get the current PD port

        @param port: Type C PD port 0/1
        @returns: current pd state
        """
        pd_dict = self.execute_pd_state_cmd(port)
        return pd_dict['port']

    def get_pd_role(self, port):
        """Get the current PD power role (source or sink)

        @param port: Type C PD port 0/1
        @returns: current pd state
        """
        pd_dict = self.execute_pd_state_cmd(port)
        return pd_dict['role']

    def get_pd_flags(self, port):
        """Get the current PD flags

        @param port: Type C PD port 0/1
        @returns: current pd state
        """
        pd_dict = self.execute_pd_state_cmd(port)
        return pd_dict['flags']

    def get_pd_dualrole(self):
        """Get the current PD dualrole setting

        @returns: current PD dualrole setting
        """
        cmd = 'pd dualrole'
        dual_list = self.send_pd_command_get_output(cmd,
                                ['dual-role toggling:\s+([\w ]+)'])
        return dual_list[0][1]

    def set_pd_dualrole(self, value):
        """Set pd dualrole

        It can be set to either:
        1. on
        2. off
        3. snk (force sink mode)
        4. src (force source mode)
        After setting, the current value is read to confirm that it
        was set properly.

        @param value: One of the 4 options listed
        """
        # Get string required for console command
        dual_index = self.dual_index[value]
        # Create console command
        cmd = 'pd dualrole ' + self.dualrole_cmd[dual_index]
        self.console.send_command(cmd)
        time.sleep(self.DUALROLE_QUERY_DELAY)
        # Get current setting to verify that command was successful
        dual = self.get_pd_dualrole()
        # If it doesn't match, then raise error
        if dual != self.dualrole_resp[dual_index]:
            raise error.TestFail("dualrole error: " +
                                 self.dualrole_resp[dual_index] + " != "+dual)

    def query_pd_connection(self):
        """Determine if PD connection is present

        Try the 'pd 0/1 state' command and see if it's in either
        expected state of a conneciton. Record the port number
        that has an active connection

        @returns: dict with params port, connect, and state
        """
        status = {}
        port = 0;
        status['connect'] = False
        status['port'] = port
        state = self.get_pd_state(port)
        # Check port 0 first
        if state == self.SRC_CONNECT or state == self.SNK_CONNECT:
            status['connect'] = True
            status['role'] = state
        else:
            port = 1
            status['port'] = port
            state = self.get_pd_state(port)
            # Check port 1
            if state == self.SRC_CONNECT or state == self.SNK_CONNECT:
                status['connect'] = True
                status['role'] = state

        return status

    def swap_power_role(self, port):
        """Attempt a power role swap

        This method attempts to execute a power role swap. A check
        is made to ensure that dualrole mode is enabled and that
        a PD contract is currently established. If both checks pass,
        then the power role swap command is issued. After a delay,
        if a PD contract is established and the current state does
        not equal the starting state, then it was successful.

        @param port: pd port number

        @returns: True if power swap is successful, False otherwise.
        """
        # Get starting state
        if self.is_pd_dual_role_enabled() == False:
            logging.info('Dualrole Mode not enabled!')
            return False
        if self.is_pd_connected(port) == False:
            logging.info('PD contract not established!')
            return False
        current_pr = self.get_pd_state(port)
        swap_cmd = 'pd %d swap power' % port
        self.send_pd_command(swap_cmd)
        time.sleep(self.CONNECT_TIME)
        new_pr = self.get_pd_state(port)
        logging.info('Power swap: %s -> %s', current_pr, new_pr)
        if self.is_pd_connected(port) == False:
            return False
        return bool(current_pr != new_pr)

    def disable_pd_console_debug(self):
        """Turn off PD console debug

        """
        cmd = 'pd dump 0'
        self.send_pd_command(cmd)

    def enable_pd_console_debug(self):
        """Enable PD console debug level 1

        """
        cmd = 'pd dump 1'
        self.send_pd_command(cmd)

    def is_pd_flag_set(self, port, key):
        """Test a bit in PD protocol state flags

        The flag word contains various PD protocol state information.
        This method allows for a specific flag to be tested.

        @param port: Port which has the active PD connection
        @param key: dict key to retrieve the flag bit mapping

        @returns True if the bit to be tested is set
        """
        pd_flags = self.get_pd_flags(port)
        return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16))

    def is_pd_connected(self, port):
        """Check if a PD connection is active

        @param port: port to be used for pd console commands

        @returns True if port is in connected state
        """
        state = self.get_pd_state(port)
        return bool(state == self.SRC_CONNECT or state == self.SNK_CONNECT)

    def is_pd_dual_role_enabled(self):
        """Check if a PD device is in dualrole mode

        @returns True is dualrole mode is active, false otherwise
        """
        drp = self.get_pd_dualrole()
        return bool(drp == self.dualrole_resp[self.dual_index['on']])


class PDConnectionUtils(PDConsoleUtils):
    """Provides a set of methods common to USB PD FAFT tests

    This Class is used for PD utility methods that require access
    to both Plankton and DUT PD consoles.

    """

    def __init__(self, dut_console, plankton_console):
        """
        @param dut_console: PD console object for DUT
        @param plankton_console: PD console object for Plankton
        """
        # save console for DUT PD UART access functions
        self.dut_console = dut_console
        # save console for Plankton UART access functions
        self.plankton_console = plankton_console
        super(PDConnectionUtils, self).__init__(dut_console)

    def _verify_plankton_connection(self, port):
        """Verify DUT to Plankton PD connection

        This method checks for a Plankton PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a Plankton feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to Plankton connection is verified.

        @param port: DUT pd port to test

        @returns True if DUT to Plankton pd connection is verified
        """
        DISCONNECT_CHECK_TIME = 0.5
        DISCONNECT_TIME_SEC = 2
        # plankton console command to force PD disconnect
        disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
        # Only check for Plankton if DUT has active PD connection
        if self.dut_console.is_pd_connected(port):
            # Attempt to force PD disconnection
            self.plankton_console.send_pd_command(disc_cmd)
            time.sleep(DISCONNECT_CHECK_TIME)
            # Verify that DUT PD port is no longer connected
            if self.dut_console.is_pd_connected(port) == False:
                # Wait for disconnect timer and give time to reconnect
                time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
                if self.dut_console.is_pd_connected(port):
                    logging.info('Plankton connection verified on port %d',
                                 port)
                    return True
            else:
                # Could have disconnected other port, allow it to reconnect
                # before exiting.
                time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
        return False

    def find_dut_to_plankton_connection(self):
        """Find the PD port which is connected to Plankton

        @returns DUT pd port number if found, None otherwise
        """
        for port in xrange(self.dut_console.PD_MAX_PORTS):
            # Check for DUT to Plankton connection on port
            if self._verify_plankton_connection(port):
                # Plankton PD connection found so exit
                return port
        return None