普通文本  |  893行  |  36.42 KB

# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import pprint
import StringIO
import subprocess
import time

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils
from autotest_lib.server.cros import debugd_dev_tools, gsutil_wrapper
from autotest_lib.server.cros.faft.firmware_test import FirmwareTest


class Cr50Test(FirmwareTest):
    """Base class that sets up helper objects/functions for cr50 tests."""
    version = 1

    CR50_GS_URL = 'gs://chromeos-localmirror-private/distfiles/chromeos-cr50-%s/'
    CR50_DEBUG_FILE = '*/cr50_dbg_%s.bin'
    CR50_PROD_FILE = 'cr50.%s.bin.prod'
    NONE = 0
    # Saved the original device state during init.
    INITIAL_STATE = 1 << 0
    # Saved the original image, the device image, and the debug image. These
    # images are needed to be able to restore the original image and board id.
    IMAGES = 1 << 1
    PP_SHORT_INTERVAL = 3
    CLEARED_FWMP_EXIT_STATUS = 1
    CLEARED_FWMP_ERROR_MSG = ('CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS'
                              '_INVALID')
    # Cr50 may have flash operation errors during the test. Here's an example
    # of one error message.
    # do_flash_op:245 errors 20 fsh_pe_control 40720004
    # The stuff after the ':' may change, but all flash operation errors
    # contain do_flash_op. do_flash_op is only ever printed if there is an
    # error during the flash operation. Just search for do_flash_op to simplify
    # the search string and make it applicable to all flash op errors.
    CR50_FLASH_OP_ERROR_MSG = 'do_flash_op'

    def initialize(self, host, cmdline_args, full_args,
            restore_cr50_state=False, cr50_dev_path='', provision_update=False):
        self._saved_state = self.NONE
        self._raise_error_on_mismatch = not restore_cr50_state
        self._provision_update = provision_update
        super(Cr50Test, self).initialize(host, cmdline_args)

        if not hasattr(self, 'cr50'):
            raise error.TestNAError('Test can only be run on devices with '
                                    'access to the Cr50 console')

        logging.info('Test Args: %r', full_args)

        self.can_set_ccd_level = (not self.cr50.using_ccd() or
            self.cr50.testlab_is_on())
        self.original_ccd_level = self.cr50.get_ccd_level()
        self.original_ccd_settings = self.cr50.get_cap_dict(
                info=self.cr50.CAP_SETTING)

        self.host = host
        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        # Clear the FWMP, so it can't disable CCD.
        self.clear_fwmp()

        if self.can_set_ccd_level:
            # Lock cr50 so the console will be restricted
            self.cr50.set_ccd_level('lock')
        elif self.original_ccd_level != 'lock':
            raise error.TestNAError('Lock the console before running cr50 test')

        self._save_original_state()

        # Verify cr50 is still running the correct version
        cr50_qual_version = full_args.get('cr50_qual_version', '').strip()
        if cr50_qual_version:
            _, running_rw, running_bid = self.get_saved_cr50_original_version()
            expected_rw, expected_bid_sym = cr50_qual_version.split('/')
            expected_bid = cr50_utils.GetBoardIdInfoString(expected_bid_sym,
                                                           symbolic=False)
            logging.debug('Running %s %s Expect %s %s', running_rw, running_bid,
                          expected_rw, expected_bid)
            if running_rw != expected_rw or expected_bid != running_bid:
                raise error.TestError('Not running %s' % cr50_qual_version)

        # We successfully saved the device state
        self._saved_state |= self.INITIAL_STATE
        try:
            self._save_node_locked_dev_image(cr50_dev_path)
            self._save_original_images(full_args.get('release_path', ''))
            # We successfully saved the device images
            self._saved_state |= self.IMAGES
        except:
            if restore_cr50_state:
                raise


    def after_run_once(self):
        """Log which iteration just ran"""
        logging.info('successfully ran iteration %d', self.iteration)


    def _save_node_locked_dev_image(self, cr50_dev_path):
        """Save or download the node locked dev image.

        Args:
            cr50_dev_path: The path to the node locked cr50 image.
        """
        if os.path.isfile(cr50_dev_path):
            self._node_locked_cr50_image = cr50_dev_path
        else:
            devid = self.servo.get('cr50_devid')
            self._node_locked_cr50_image = self.download_cr50_debug_image(
                devid)[0]


    def _save_original_images(self, release_path):
        """Use the saved state to find all of the device images.

        This will download running cr50 image and the device image.

        Args:
            release_path: The release path given by test args
        """
        # Copy the prod and prepvt images from the DUT
        _, prod_rw, prod_bid = self._original_state['device_prod_ver']
        filename = 'prod_device_image_' + prod_rw
        self._device_prod_image = os.path.join(self.resultsdir,
                filename)
        self.host.get_file(cr50_utils.CR50_PROD,
                self._device_prod_image)

        if cr50_utils.HasPrepvtImage(self.host):
            _, prepvt_rw, prepvt_bid = self._original_state['device_prepvt_ver']
            filename = 'prepvt_device_image_' + prepvt_rw
            self._device_prepvt_image = os.path.join(self.resultsdir,
                    filename)
            self.host.get_file(cr50_utils.CR50_PREPVT,
                    self._device_prepvt_image)
            prepvt_bid = cr50_utils.GetBoardIdInfoString(prepvt_bid)
        else:
            self._device_prepvt_image = None
            prepvt_rw = None
            prepvt_bid = None

        if os.path.isfile(release_path):
            self._original_cr50_image = release_path
            logging.info('using supplied image')
            return
        # If the running cr50 image version matches the image on the DUT use
        # the DUT image as the original image. If the versions don't match
        # download the image from google storage
        _, running_rw, running_bid = self.get_saved_cr50_original_version()

        # Make sure prod_bid and running_bid are in the same format
        prod_bid = cr50_utils.GetBoardIdInfoString(prod_bid)
        running_bid = cr50_utils.GetBoardIdInfoString(running_bid)
        if running_rw == prod_rw and running_bid == prod_bid:
            logging.info('Using device cr50 prod image %s %s', prod_rw,
                    prod_bid)
            self._original_cr50_image = self._device_prod_image
        elif running_rw == prepvt_rw and running_bid == prepvt_bid:
            logging.info('Using device cr50 prepvt image %s %s', prepvt_rw,
                    prepvt_bid)
            self._original_cr50_image = self._device_prepvt_image
        else:
            logging.info('Downloading cr50 image %s %s', running_rw,
                    running_bid)
            self._original_cr50_image = self.download_cr50_release_image(
                running_rw, running_bid)[0]


    def _save_original_state(self):
        """Save the cr50 related state.

        Save the device's current cr50 version, cr50 board id, rlz, and image
        at /opt/google/cr50/firmware/cr50.bin.prod. These will be used to
        restore the state during cleanup.
        """
        self._original_state = self.get_cr50_device_state()


    def get_saved_cr50_original_version(self):
        """Return (ro ver, rw ver, bid)."""
        if ('running_ver' not in self._original_state or 'cr50_image_bid' not in
            self._original_state):
            raise error.TestError('No record of original cr50 image version')
        return (self._original_state['running_ver'][0],
                self._original_state['running_ver'][1],
                self._original_state['cr50_image_bid'])


    def get_saved_cr50_original_path(self):
        """Return the local path for the original cr50 image."""
        if not hasattr(self, '_original_cr50_image'):
            raise error.TestError('No record of original image')
        return self._original_cr50_image


    def has_saved_cr50_dev_path(self):
        """Returns true if we saved the node locked debug image."""
        return hasattr(self, '_node_locked_cr50_image')


    def get_saved_cr50_dev_path(self):
        """Return the local path for the cr50 dev image."""
        if not self.has_saved_cr50_dev_path():
            raise error.TestError('No record of debug image')
        return self._node_locked_cr50_image


    def _restore_original_image(self, chip_bid, chip_flags):
        """Restore the cr50 image and erase the state.

        Make 3 attempts to update to the original image. Use a rollback from
        the DBG image to erase the state that can only be erased by a DBG image.
        Set the chip board id during rollback

        Args:
            chip_bid: the integer representation of chip board id or None if the
                      board id should be erased
            chip_flags: the integer representation of chip board id flags or
                        None if the board id should be erased
        """
        for i in range(3):
            try:
                # Update to the node-locked DBG image so we can erase all of
                # the state we are trying to reset
                self.cr50_update(self._node_locked_cr50_image)

                # Rollback to the original cr50 image.
                self.cr50_update(self._original_cr50_image, rollback=True,
                                 chip_bid=chip_bid, chip_flags=chip_flags)
                break
            except Exception, e:
                logging.warning('Failed to restore original image attempt %d: '
                                '%r', i, e)


    def rootfs_verification_disable(self):
        """Remove rootfs verification."""
        if not self._rootfs_verification_is_disabled():
            logging.debug('Removing rootfs verification.')
            self.rootfs_tool.enable()


    def _rootfs_verification_is_disabled(self):
        """Returns true if rootfs verification is enabled."""
        # Clear the TPM owner before trying to check rootfs verification
        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        self.rootfs_tool = debugd_dev_tools.RootfsVerificationTool()
        self.rootfs_tool.initialize(self.host)
        # rootfs_tool.is_enabled is True, that means rootfs verification is
        # disabled.
        return self.rootfs_tool.is_enabled()


    def _restore_original_state(self):
        """Restore the original cr50 related device state."""
        if not (self._saved_state & self.IMAGES):
            logging.warning('Did not save the original images. Cannot restore '
                            'state')
            return
        # Remove the prepvt image if the test installed one.
        if (not self._original_state['has_prepvt'] and
            cr50_utils.HasPrepvtImage(self.host)):
            self.host.run('rm %s' % cr50_utils.CR50_PREPVT)
        # If rootfs verification has been disabled, copy the cr50 device image
        # back onto the DUT.
        if self._rootfs_verification_is_disabled():
            cr50_utils.InstallImage(self.host, self._device_prod_image,
                    cr50_utils.CR50_PROD)
            # Install the prepvt image if there was one.
            if self._device_prepvt_image:
                cr50_utils.InstallImage(self.host, self._device_prepvt_image,
                        cr50_utils.CR50_PREPVT)

        chip_bid_info = self._original_state['chip_bid']
        bid_is_erased = chip_bid_info == cr50_utils.ERASED_CHIP_BID
        chip_bid = None if bid_is_erased else chip_bid_info[0]
        chip_flags = None if bid_is_erased else chip_bid_info[2]
        # Update to the original image and erase the board id
        self._restore_original_image(chip_bid, chip_flags)

        # Set the RLZ code
        cr50_utils.SetRLZ(self.host, self._original_state['rlz'])

        # Verify everything is still the same
        mismatch = self._check_original_state()
        if mismatch:
            raise error.TestError('Could not restore state: %s' % mismatch)

        logging.info('Successfully restored the original cr50 state')


    def get_cr50_device_state(self):
        """Get a dict with the current device cr50 information.

        The state dict will include the platform brand, rlz code, chip board id,
        the running cr50 image version, the running cr50 image board id, and the
        device cr50 image version.
        """
        state = {}
        state['mosys platform brand'] = self.host.run('mosys platform brand',
            ignore_status=True).stdout.strip()
        state['device_prod_ver'] = cr50_utils.GetBinVersion(self.host,
                cr50_utils.CR50_PROD)
        state['has_prepvt'] = cr50_utils.HasPrepvtImage(self.host)
        if state['has_prepvt']:
            state['device_prepvt_ver'] = cr50_utils.GetBinVersion(self.host,
                    cr50_utils.CR50_PREPVT)
        else:
            state['device_prepvt_ver'] = None
        state['rlz'] = cr50_utils.GetRLZ(self.host)
        state['chip_bid'] = cr50_utils.GetChipBoardId(self.host)
        state['chip_bid_str'] = '%08x:%08x:%08x' % state['chip_bid']
        state['running_ver'] = cr50_utils.GetRunningVersion(self.host)
        state['cr50_image_bid'] = self.cr50.get_active_board_id_str()

        logging.debug('Current Cr50 state:\n%s', pprint.pformat(state))
        return state


    def _check_original_state(self):
        """Compare the current cr50 state to the original state.

        Returns:
            A dictionary with the state that is wrong as the key and
            the new and old state as the value
        """
        if not (self._saved_state & self.INITIAL_STATE):
            logging.warning('Did not save the original state. Cannot verify it '
                            'matches')
            return
        # Make sure the /var/cache/cr50* state is up to date.
        cr50_utils.ClearUpdateStateAndReboot(self.host)

        mismatch = {}
        new_state = self.get_cr50_device_state()

        for k, new_val in new_state.iteritems():
            original_val = self._original_state[k]
            if new_val != original_val:
                mismatch[k] = 'old: %s, new: %s' % (original_val, new_val)

        if mismatch:
            logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch))
        else:
            logging.info('The device is in the original state')
        return mismatch


    def _reset_ccd_settings(self):
        """Reset the ccd lock and capability states."""
        # Clear the password if one was set.
        if self.cr50.get_ccd_info()['Password'] != 'none':
            self.servo.set_nocheck('cr50_testlab', 'open')
            self.cr50.send_command('ccd reset')
            if self.cr50.get_ccd_info()['Password'] != 'none':
                raise error.TestFail('Could not clear password')

        current_settings = self.cr50.get_cap_dict(info=self.cr50.CAP_SETTING)
        if self.original_ccd_settings != current_settings:
            if not self.can_set_ccd_level:
                raise error.TestError("CCD state has changed, but we can't "
                        "restore it")
            self.fast_open(True)
            self.cr50.set_caps(self.original_ccd_settings)

        # First try using testlab open to open the device
        if self.cr50.testlab_is_on() and self.original_ccd_level == 'open':
            self.servo.set_nocheck('cr50_testlab', 'open')
        if self.original_ccd_level != self.cr50.get_ccd_level():
            self.cr50.set_ccd_level(self.original_ccd_level)



    def cleanup(self):
        """Attempt to cleanup the cr50 state. Then run firmware cleanup"""
        try:
            self._restore_cr50_state()
        finally:
            super(Cr50Test, self).cleanup()

        # Check the logs captured during firmware_test cleanup for cr50 errors.
        self._get_cr50_stats_from_uart_capture()


    def _get_cr50_stats_from_uart_capture(self):
        """Check cr50 uart output for errors.

        Use the logs captured during firmware_test cleanup to check for cr50
        errors. Flash operation issues aren't obvious unless you check the logs.
        All flash op errors print do_flash_op and it isn't printed during normal
        operation. Open the cr50 uart file and count the number of times this is
        printed. Log the number of errors.
        """
        if not hasattr(self, 'cr50_uart_file'):
            logging.info('There is not a cr50 uart file')
            return

        flash_error_count = 0
        with open(self.cr50_uart_file, 'r') as f:
            for line in f:
                if self.CR50_FLASH_OP_ERROR_MSG in line:
                    flash_error_count += 1

        # Log any flash operation errors.
        logging.info('do_flash_op count: %d', flash_error_count)


    def _restore_cr50_state(self):
        """Restore cr50 state, so the device can be used for further testing"""
        state_mismatch = self._check_original_state()
        if state_mismatch and not self._provision_update:
            self._restore_original_state()
            if self._raise_error_on_mismatch:
                raise error.TestError('Unexpected state mismatch during '
                                      'cleanup %s' % state_mismatch)

        # Try to open cr50 and enable testlab mode if it isn't enabled.
        try:
            self.fast_open(True)
        except:
            # Even if we can't open cr50, do our best to reset the rest of the
            # system state. Log a warning here.
            logging.warning('Unable to Open cr50', exc_info=True)
        # Reset the password as the first thing in cleanup. It is important that
        # if some other part of cleanup fails, the password has at least been
        # reset.
        self.cr50.send_command('rddkeepalive disable')
        self.cr50.send_command('ccd reset')
        self.cr50.send_command('wp follow_batt_pres atboot')

        # Reboot cr50 if the console is accessible. This will reset most state.
        if self.cr50.get_cap('GscFullConsole')[self.cr50.CAP_IS_ACCESSIBLE]:
            self.cr50.reboot()

        # Reenable servo v4 CCD
        self.cr50.ccd_enable()

        # reboot to normal mode if the device is in dev mode.
        self.enter_mode_after_checking_tpm_state('normal')

        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        self.clear_fwmp()

        # Restore the ccd privilege level
        if hasattr(self, 'original_ccd_level'):
            self._reset_ccd_settings()


    def find_cr50_gs_image(self, filename, image_type=None):
        """Find the cr50 gs image name.

        Args:
            filename: the cr50 filename to match to
            image_type: release or debug. If it is not specified we will search
                        both the release and debug directories
        Returns:
            a tuple of the gsutil bucket, filename
        """
        gs_url = self.CR50_GS_URL % (image_type if image_type else '*')
        gs_filename = os.path.join(gs_url, filename)
        bucket, gs_filename = utils.gs_ls(gs_filename)[0].rsplit('/', 1)
        return bucket, gs_filename


    def download_cr50_gs_image(self, filename, image_bid='', bucket=None,
                               image_type=None):
        """Get the image from gs and save it in the autotest dir.

        Args:
            filename: The cr50 image basename
            image_bid: the board id info list or string. It will be added to the
                       filename.
            bucket: The gs bucket name
            image_type: 'debug' or 'release'. This will be used to determine
                        the bucket if the bucket is not given.
        Returns:
            A tuple with the local path and version
        """
        # Add the image bid string to the filename
        if image_bid:
            bid_str = cr50_utils.GetBoardIdInfoString(image_bid,
                                                       symbolic=True)
            filename += '.' + bid_str.replace(':', '_')

        if not bucket:
            bucket, filename = self.find_cr50_gs_image(filename, image_type)

        remote_temp_dir = '/tmp/'
        src = os.path.join(remote_temp_dir, filename)
        dest = os.path.join(self.resultsdir, filename)

        # Copy the image to the dut
        gsutil_wrapper.copy_private_bucket(host=self.host,
                                           bucket=bucket,
                                           filename=filename,
                                           destination=remote_temp_dir)

        self.host.get_file(src, dest)
        ver = cr50_utils.GetBinVersion(self.host, src)

        # Compare the image board id to the downloaded image to make sure we got
        # the right file
        downloaded_bid = cr50_utils.GetBoardIdInfoString(ver[2], symbolic=True)
        if image_bid and bid_str != downloaded_bid:
            raise error.TestError('Could not download image with matching '
                                  'board id wanted %s got %s' % (bid_str,
                                  downloaded_bid))
        return dest, ver


    def download_cr50_debug_image(self, devid, image_bid=''):
        """download the cr50 debug file.

        Get the file with the matching devid and image board id info

        Args:
            devid: the cr50_devid string '${DEVID0} ${DEVID1}'
            image_bid: the image board id info string or list
        Returns:
            A tuple with the debug image local path and version
        """
        # Debug images are node locked with the devid. Add the devid to the
        # filename
        filename = self.CR50_DEBUG_FILE % (devid.replace(' ', '_'))

        # Download the image
        dest, ver = self.download_cr50_gs_image(filename, image_bid=image_bid,
                                                image_type='debug')

        return dest, ver


    def download_cr50_release_image(self, image_rw, image_bid=''):
        """download the cr50 release file.

        Get the file with the matching version and image board id info

        Args:
            image_rw: the rw version string
            image_bid: the image board id info string or list
        Returns:
            A tuple with the release image local path and version
        """
        # Release images can be found using the rw version
        filename = self.CR50_PROD_FILE % image_rw

        # Download the image
        dest, ver = self.download_cr50_gs_image(filename, image_bid=image_bid,
                                                image_type='release')

        # Compare the rw version and board id info to make sure the right image
        # was found
        if image_rw != ver[1]:
            raise error.TestError('Could not download image with matching '
                                  'rw version')
        return dest, ver


    def _cr50_verify_update(self, expected_rw, expect_rollback):
        """Verify the expected version is running on cr50.

        Args:
            expected_rw: The RW version string we expect to be running
            expect_rollback: True if cr50 should have rolled back during the
                             update

        Raises:
            TestFail if there is any unexpected update state
        """
        errors = []
        running_rw = self.cr50.get_version()
        if expected_rw != running_rw:
            errors.append('running %s not %s' % (running_rw, expected_rw))

        if expect_rollback != self.cr50.rolledback():
            errors.append('%srollback detected' %
                          'no ' if expect_rollback else '')
        if len(errors):
            raise error.TestFail('cr50_update failed: %s' % ', '.join(errors))
        logging.info('RUNNING %s after %s', expected_rw,
                     'rollback' if expect_rollback else 'update')


    def _cr50_run_update(self, path):
        """Install the image at path onto cr50.

        Args:
            path: the location of the image to update to

        Returns:
            the rw version of the image
        """
        tmp_dest = '/tmp/' + os.path.basename(path)

        dest, image_ver = cr50_utils.InstallImage(self.host, path, tmp_dest)
        cr50_utils.GSCTool(self.host, ['-a', dest])
        return image_ver[1]


    def cr50_update(self, path, rollback=False, erase_nvmem=False,
                    expect_rollback=False, chip_bid=None, chip_flags=None):
        """Attempt to update to the given image.

        If rollback is True, we assume that cr50 is already running an image
        that can rollback.

        Args:
            path: the location of the update image
            rollback: True if we need to force cr50 to rollback to update to
                      the given image
            erase_nvmem: True if we need to erase nvmem during rollback
            expect_rollback: True if cr50 should rollback on its own
            chip_bid: the integer representation of chip board id or None if the
                      board id should be erased during rollback
            chip_flags: the integer representation of chip board id flags or
                        None if the board id should be erased during rollback

        Raises:
            TestFail if the update failed
        """
        original_rw = self.cr50.get_version()

        # Cr50 is going to reject an update if it hasn't been up for more than
        # 60 seconds. Wait until that passes before trying to run the update.
        self.cr50.wait_until_update_is_allowed()

        image_rw = self._cr50_run_update(path)

        # Running the update may cause cr50 to reboot. Wait for that before
        # sending more commands. The reboot should happen quickly. Wait a
        # maximum of 10 seconds.
        self.cr50.wait_for_reboot(timeout=10)

        if erase_nvmem and rollback:
            self.cr50.erase_nvmem()

        if rollback:
            self.cr50.rollback(chip_bid=chip_bid, chip_flags=chip_flags)

        expected_rw = original_rw if expect_rollback else image_rw
        # If we expect a rollback, the version should remain unchanged
        self._cr50_verify_update(expected_rw, rollback or expect_rollback)


    def ccd_open_from_ap(self):
        """Start the open process and press the power button."""
        self._ccd_open_last_len = 0

        self._ccd_open_stdout = StringIO.StringIO()

        ccd_open_cmd = utils.sh_escape('gsctool -a -o')
        full_ssh_cmd = '%s "%s"' % (self.host.ssh_command(options='-tt'),
            ccd_open_cmd)
        # Start running the Cr50 Open process in the background.
        self._ccd_open_job = utils.BgJob(full_ssh_cmd,
                                         nickname='ccd_open',
                                         stdout_tee=self._ccd_open_stdout,
                                         stderr_tee=utils.TEE_TO_LOGS)

        if self._ccd_open_job == None:
            raise error.TestFail('could not start ccd open')

        try:
            # Wait for the first gsctool power button prompt before starting the
            # open process.
            logging.info(self._get_ccd_open_output())
            # Cr50 starts out by requesting 5 quick presses then 4 longer
            # power button presses. Run the quick presses without looking at the
            # command output, because getting the output can take some time. For
            # the presses that require a 1 minute wait check the output between
            # presses, so we can catch errors
            #
            # run quick presses for 30 seconds. It may take a couple of seconds
            # for open to start. 10 seconds should be enough. 30 is just used
            # because it will definitely be enough, and this process takes 300
            # seconds, so doing quick presses for 30 seconds won't matter.
            end_time = time.time() + 30
            while time.time() < end_time:
                self.servo.power_short_press()
                logging.info('short int power button press')
                time.sleep(self.PP_SHORT_INTERVAL)
            # Poll the output and press the power button for the longer presses.
            utils.wait_for_value(self._check_open_and_press_power_button,
                expected_value=True, timeout_sec=self.cr50.PP_LONG)
        except Exception, e:
            logging.info(e)
            raise
        finally:
            self._close_ccd_open_job()
        logging.info(self.cr50.get_ccd_info())


    def _check_open_and_press_power_button(self):
        """Check stdout and press the power button if prompted.

        Returns:
            True if the process is still running.
        """
        logging.info(self._get_ccd_open_output())
        self.servo.power_short_press()
        logging.info('long int power button press')
        # Give cr50 some time to complete the open process. After the last
        # power button press cr50 erases nvmem and resets the dut before setting
        # the state to open. Wait a bit so we don't check the ccd state in the
        # middle of this reset process. Power button requests happen once a
        # minute, so waiting 10 seconds isn't a big deal.
        time.sleep(10)
        return (self._ccd_open_job.sp.poll() is not None or 'Open' in
                self.cr50.get_ccd_info()['State'])


    def _get_ccd_open_output(self):
        """Read the new output."""
        self._ccd_open_job.process_output()
        self._ccd_open_stdout.seek(self._ccd_open_last_len)
        output = self._ccd_open_stdout.read()
        self._ccd_open_last_len = self._ccd_open_stdout.len
        return output


    def _close_ccd_open_job(self):
        """Terminate the process and check the results."""
        exit_status = utils.nuke_subprocess(self._ccd_open_job.sp)
        stdout = self._ccd_open_stdout.getvalue().strip()
        delattr(self, '_ccd_open_job')
        if stdout:
            logging.info('stdout of ccd open:\n%s', stdout)
        if exit_status:
            logging.info('exit status: %d', exit_status)
        if 'Error' in stdout:
            raise error.TestFail('ccd open Error %s' %
                                 stdout.split('Error')[-1])
        if self.cr50.OPEN != self.cr50.get_ccd_level():
            raise error.TestFail('unable to open cr50: %s' % stdout)
        else:
            logging.info('Opened Cr50')


    def fast_open(self, enable_testlab=False):
        """Try to use testlab open. If that fails, do regular ap open.

        Args:
            enable_testlab: If True, enable testlab mode after cr50 is open.
        """
        # Try to use testlab open first, so we don't have to wait for the
        # physical presence check.
        self.cr50.send_command('ccd testlab open')
        if self.cr50.get_ccd_level() == 'open':
            return

        # Use the console to open cr50 without entering dev mode if possible. It
        # takes longer and relies on more systems to enter dev mode and ssh into
        # the AP. Skip the steps that aren't required.
        if not self.cr50.get_cap('OpenNoDevMode')[self.cr50.CAP_IS_ACCESSIBLE]:
            self.enter_mode_after_checking_tpm_state('dev')

        if self.cr50.get_cap('OpenFromUSB')[self.cr50.CAP_IS_ACCESSIBLE]:
            self.cr50.set_ccd_level(self.cr50.OPEN)
        else:
            self.ccd_open_from_ap()

        if enable_testlab:
            self.cr50.set_ccd_testlab('on')

        # Make sure the device is in normal mode. After opening cr50, the TPM
        # should be cleared and the device should automatically reset to normal
        # mode. Just check to be consistent. It's possible capabilitiy settings
        # are set to skip wiping the TPM.
        self.enter_mode_after_checking_tpm_state('normal')


    def run_gsctool_cmd_with_password(self, password, cmd, name, expect_error):
        """Run a gsctool command and input the password

        Args:
            password: The cr50 password string
            cmd: The gsctool command
            name: The name to give the job
            expect_error: True if the command should fail
        """
        set_pwd_cmd = utils.sh_escape(cmd)
        full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'),
            set_pwd_cmd)
        stdout = StringIO.StringIO()
        # Start running the gsctool Command in the background.
        gsctool_job = utils.BgJob(full_ssh_command,
                                  nickname='%s_with_password' % name,
                                  stdout_tee=stdout,
                                  stderr_tee=utils.TEE_TO_LOGS,
                                  stdin=subprocess.PIPE)
        if gsctool_job == None:
            raise error.TestFail('could not start gsctool command %r', cmd)

        try:
            # Wait for enter prompt
            gsctool_job.process_output()
            logging.info(stdout.getvalue().strip())
            # Enter the password
            gsctool_job.sp.stdin.write(password + '\n')

            # Wait for re-enter prompt
            gsctool_job.process_output()
            logging.info(stdout.getvalue().strip())
            # Re-enter the password
            gsctool_job.sp.stdin.write(password + '\n')
            time.sleep(self.cr50.CONSERVATIVE_CCD_WAIT)
            gsctool_job.process_output()
        finally:
            exit_status = utils.nuke_subprocess(gsctool_job.sp)
            output = stdout.getvalue().strip()
            logging.info('%s stdout: %s', name, output)
            logging.info('%s exit status: %s', name, exit_status)
            if exit_status:
                message = ('gsctool %s failed using %r: %s %s' %
                           (name, password, exit_status, output))
                if expect_error:
                    logging.info(message)
                else:
                    raise error.TestFail(message)
            elif expect_error:
                raise error.TestFail('%s with %r did not fail when expected' %
                                     (name, password))


    def set_ccd_password(self, password, expect_error=False):
        """Set the ccd password"""
        # If for some reason the test sets a password and is interrupted before
        # we can clear it, we want testlab mode to be enabled, so it's possible
        # to clear the password without knowing it.
        if not self.cr50.testlab_is_on():
            raise error.TestError('Will not set password unless testlab mode '
                                  'is enabled.')
        self.run_gsctool_cmd_with_password(
                password, 'gsctool -a -P', 'set_password', expect_error)


    def ccd_unlock_from_ap(self, password=None, expect_error=False):
        """Unlock cr50"""
        if not password:
            self.host.run('gsctool -a -U')
            return
        self.run_gsctool_cmd_with_password(
                password, 'gsctool -a -U', 'unlock', expect_error)


    def enter_mode_after_checking_tpm_state(self, mode):
        """Reboot to mode if cr50 doesn't already match the state"""
        # If the device is already in the correct mode, don't do anything
        if (mode == 'dev') == self.cr50.in_dev_mode():
            logging.info('already in %r mode', mode)
            return

        self.switcher.reboot_to_mode(to_mode=mode)

        if (mode == 'dev') != self.cr50.in_dev_mode():
            raise error.TestError('Unable to enter %r mode' % mode)


    def _fwmp_is_cleared(self):
        """Return True if the FWMP has been created"""
        res = self.host.run('cryptohome '
                            '--action=get_firmware_management_parameters',
                            ignore_status=True)
        if res.exit_status and res.exit_status != self.CLEARED_FWMP_EXIT_STATUS:
            raise error.TestError('Could not run cryptohome command %r' % res)
        return self.CLEARED_FWMP_ERROR_MSG in res.stdout


    def clear_fwmp(self):
        """Clear the FWMP"""
        if self._fwmp_is_cleared():
            return
        status = self.host.run('cryptohome --action=tpm_status').stdout
        logging.debug(status)
        if 'TPM Owned: true' not in status:
            self.host.run('cryptohome --action=tpm_take_ownership')
            self.host.run('cryptohome --action=tpm_wait_ownership')
        self.host.run('cryptohome '
                      '--action=remove_firmware_management_parameters')

    def tpm_is_responsive(self):
        """Check TPM responsiveness by running tpm_version."""
        result = self.host.run('tpm_version', ignore_status=True)
        logging.debug(result.stdout.strip())
        return not result.exit_status