# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import re
import logging
import time
from autotest_lib.client.common_lib import error
class PDConsoleUtils(object):
""" Provides a set of methods common to USB PD FAFT tests
Each instance of this class is associated with a particular
servo UART console. USB PD tests will typically use the console
command 'pd' and its subcommands to control/monitor Type C PD
connections. The servo object used for UART operations is
passed in and stored when this object is created.
"""
SRC_CONNECT = 'SRC_READY'
SNK_CONNECT = 'SNK_READY'
SRC_DISC = 'SRC_DISCONNECTED'
SNK_DISC = 'SNK_DISCONNECTED'
PD_MAX_PORTS = 2
CONNECT_TIME = 4
# dualrole input/ouput values
DUALROLE_QUERY_DELAY = 0.25
dual_index = {'on': 0, 'off': 1, 'snk': 2, 'src': 3}
dualrole_cmd = ['on', 'off', 'sink', 'source']
dualrole_resp = ['on', 'off', 'force sink', 'force source']
# Dictionary for 'pd 0/1 state' parsing
PD_STATE_DICT = {
'port': 'Port\s+([\w]+)',
'role': 'Role:\s+([\w]+-[\w]+)',
'pd_state': 'State:\s+([\w]+_[\w]+)',
'flags': 'Flags:\s+([\w]+)',
'polarity': '(CC\d)'
}
# Dictionary for PD control message types
PD_CONTROL_MSG_MASK = 0x1f
PD_CONTROL_MSG_DICT = {
'GoodCRC': 1,
'GotoMin': 2,
'Accept': 3,
'Reject': 4,
'Ping': 5,
'PS_RDY': 6,
'Get_Source_Cap': 7,
'Get_Sink_Cap': 8,
'DR_Swap': 9,
'PR_Swap': 10,
'VCONN_Swap': 11,
'Wait': 12,
'Soft_Reset': 13
}
# Dictionary for PD firmware state flags
PD_STATE_FLAGS_DICT = {
'power_swap': 1 << 1,
'data_swap': 1 << 2,
'data_swap_active': 1 << 3,
'vconn_on': 1 << 12
}
def __init__(self, console):
""" Console can be either usbpd, ec, or plankton_ec UART
This object with then be used by the class which creates
the PDConsoleUtils class to send/receive commands to UART
"""
# save console for UART access functions
self.console = console
def send_pd_command(self, cmd):
"""Send command to PD console UART
@param cmd: pd command string
"""
self.console.send_command(cmd)
def send_pd_command_get_output(self, cmd, regexp):
"""Send command to PD console, wait for response
@param cmd: pd command string
@param regexp: regular expression for desired output
"""
return self.console.send_command_get_output(cmd, regexp)
def send_pd_command_get_reply_msg(self, cmd):
"""Send PD protocol msg, get PD control msg reply
The PD console debug mode is enabled prior to sending
a pd protocol message. This allows the
control message reply to be extracted. The debug mode
is disabled prior to exiting.
@param cmd: pd command to issue to the UART console
@returns: PD control header message
"""
# Enable PD console debug mode to show control messages
self.enable_pd_console_debug()
m = self.send_pd_command_get_output(cmd, ['RECV\s([\w]+)'])
ctrl_msg = int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK
self.disable_pd_console_debug()
return ctrl_msg
def verify_pd_console(self):
"""Verify that PD commands exist on UART console
Send 'help' command to UART console
@returns: True if 'pd' is found, False if not
"""
l = self.console.send_command_get_output('help', ['(pd)\s+([\w]+)'])
if l[0][1] == 'pd':
return True
else:
return False
def execute_pd_state_cmd(self, port):
"""Get PD state for specified channel
pd 0/1 state command gives produces 5 fields. The full response
line is captured and then parsed to extract each field to fill
the dict containing port, polarity, role, pd_state, and flags.
@param port: Type C PD port 0 or 1
@returns: A dict with the 5 fields listed above
"""
cmd = 'pd'
subcmd = 'state'
pd_cmd = cmd +" " + str(port) + " " + subcmd
# Two FW versions for this command, get full line.
m = self.send_pd_command_get_output(pd_cmd,
['(Port.*) - (Role:.*)\n'])
# Extract desired values from result string
state_result = {}
for key, regexp in self.PD_STATE_DICT.iteritems():
value = re.search(regexp, m[0][0])
if value:
state_result[key] = value.group(1)
else:
raise error.TestFail('pd 0/1 state: %r value not found' % (key))
return state_result
def get_pd_state(self, port):
"""Get the current PD state
@param port: Type C PD port 0/1
@returns: current pd state
"""
pd_dict = self.execute_pd_state_cmd(port)
return pd_dict['pd_state']
def get_pd_port(self, port):
"""Get the current PD port
@param port: Type C PD port 0/1
@returns: current pd state
"""
pd_dict = self.execute_pd_state_cmd(port)
return pd_dict['port']
def get_pd_role(self, port):
"""Get the current PD power role (source or sink)
@param port: Type C PD port 0/1
@returns: current pd state
"""
pd_dict = self.execute_pd_state_cmd(port)
return pd_dict['role']
def get_pd_flags(self, port):
"""Get the current PD flags
@param port: Type C PD port 0/1
@returns: current pd state
"""
pd_dict = self.execute_pd_state_cmd(port)
return pd_dict['flags']
def get_pd_dualrole(self):
"""Get the current PD dualrole setting
@returns: current PD dualrole setting
"""
cmd = 'pd dualrole'
dual_list = self.send_pd_command_get_output(cmd,
['dual-role toggling:\s+([\w ]+)'])
return dual_list[0][1]
def set_pd_dualrole(self, value):
"""Set pd dualrole
It can be set to either:
1. on
2. off
3. snk (force sink mode)
4. src (force source mode)
After setting, the current value is read to confirm that it
was set properly.
@param value: One of the 4 options listed
"""
# Get string required for console command
dual_index = self.dual_index[value]
# Create console command
cmd = 'pd dualrole ' + self.dualrole_cmd[dual_index]
self.console.send_command(cmd)
time.sleep(self.DUALROLE_QUERY_DELAY)
# Get current setting to verify that command was successful
dual = self.get_pd_dualrole()
# If it doesn't match, then raise error
if dual != self.dualrole_resp[dual_index]:
raise error.TestFail("dualrole error: " +
self.dualrole_resp[dual_index] + " != "+dual)
def query_pd_connection(self):
"""Determine if PD connection is present
Try the 'pd 0/1 state' command and see if it's in either
expected state of a conneciton. Record the port number
that has an active connection
@returns: dict with params port, connect, and state
"""
status = {}
port = 0;
status['connect'] = False
status['port'] = port
state = self.get_pd_state(port)
# Check port 0 first
if state == self.SRC_CONNECT or state == self.SNK_CONNECT:
status['connect'] = True
status['role'] = state
else:
port = 1
status['port'] = port
state = self.get_pd_state(port)
# Check port 1
if state == self.SRC_CONNECT or state == self.SNK_CONNECT:
status['connect'] = True
status['role'] = state
return status
def swap_power_role(self, port):
"""Attempt a power role swap
This method attempts to execute a power role swap. A check
is made to ensure that dualrole mode is enabled and that
a PD contract is currently established. If both checks pass,
then the power role swap command is issued. After a delay,
if a PD contract is established and the current state does
not equal the starting state, then it was successful.
@param port: pd port number
@returns: True if power swap is successful, False otherwise.
"""
# Get starting state
if self.is_pd_dual_role_enabled() == False:
logging.info('Dualrole Mode not enabled!')
return False
if self.is_pd_connected(port) == False:
logging.info('PD contract not established!')
return False
current_pr = self.get_pd_state(port)
swap_cmd = 'pd %d swap power' % port
self.send_pd_command(swap_cmd)
time.sleep(self.CONNECT_TIME)
new_pr = self.get_pd_state(port)
logging.info('Power swap: %s -> %s', current_pr, new_pr)
if self.is_pd_connected(port) == False:
return False
return bool(current_pr != new_pr)
def disable_pd_console_debug(self):
"""Turn off PD console debug
"""
cmd = 'pd dump 0'
self.send_pd_command(cmd)
def enable_pd_console_debug(self):
"""Enable PD console debug level 1
"""
cmd = 'pd dump 1'
self.send_pd_command(cmd)
def is_pd_flag_set(self, port, key):
"""Test a bit in PD protocol state flags
The flag word contains various PD protocol state information.
This method allows for a specific flag to be tested.
@param port: Port which has the active PD connection
@param key: dict key to retrieve the flag bit mapping
@returns True if the bit to be tested is set
"""
pd_flags = self.get_pd_flags(port)
return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16))
def is_pd_connected(self, port):
"""Check if a PD connection is active
@param port: port to be used for pd console commands
@returns True if port is in connected state
"""
state = self.get_pd_state(port)
return bool(state == self.SRC_CONNECT or state == self.SNK_CONNECT)
def is_pd_dual_role_enabled(self):
"""Check if a PD device is in dualrole mode
@returns True is dualrole mode is active, false otherwise
"""
drp = self.get_pd_dualrole()
return bool(drp == self.dualrole_resp[self.dual_index['on']])
class PDConnectionUtils(PDConsoleUtils):
"""Provides a set of methods common to USB PD FAFT tests
This Class is used for PD utility methods that require access
to both Plankton and DUT PD consoles.
"""
def __init__(self, dut_console, plankton_console):
"""
@param dut_console: PD console object for DUT
@param plankton_console: PD console object for Plankton
"""
# save console for DUT PD UART access functions
self.dut_console = dut_console
# save console for Plankton UART access functions
self.plankton_console = plankton_console
super(PDConnectionUtils, self).__init__(dut_console)
def _verify_plankton_connection(self, port):
"""Verify DUT to Plankton PD connection
This method checks for a Plankton PD connection for the
given port by first verifying if a PD connection is present.
If found, then it uses a Plankton feature to force a PD disconnect.
If the port is no longer in the connected state, and following
a delay, is found to be back in the connected state, then
a DUT pd to Plankton connection is verified.
@param port: DUT pd port to test
@returns True if DUT to Plankton pd connection is verified
"""
DISCONNECT_CHECK_TIME = 0.5
DISCONNECT_TIME_SEC = 2
# plankton console command to force PD disconnect
disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
# Only check for Plankton if DUT has active PD connection
if self.dut_console.is_pd_connected(port):
# Attempt to force PD disconnection
self.plankton_console.send_pd_command(disc_cmd)
time.sleep(DISCONNECT_CHECK_TIME)
# Verify that DUT PD port is no longer connected
if self.dut_console.is_pd_connected(port) == False:
# Wait for disconnect timer and give time to reconnect
time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
if self.dut_console.is_pd_connected(port):
logging.info('Plankton connection verified on port %d',
port)
return True
else:
# Could have disconnected other port, allow it to reconnect
# before exiting.
time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
return False
def find_dut_to_plankton_connection(self):
"""Find the PD port which is connected to Plankton
@returns DUT pd port number if found, None otherwise
"""
for port in xrange(self.dut_console.PD_MAX_PORTS):
# Check for DUT to Plankton connection on port
if self._verify_plankton_connection(port):
# Plankton PD connection found so exit
return port
return None