普通文本  |  170行  |  5.66 KB

# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import re

from autotest_lib.client.common_lib import error


RO = 'ro'
RW = 'rw'
CR50_FILE = '/opt/google/cr50/firmware/cr50.bin.prod'
CR50_STATE = '/var/cache/cr50*'
GET_CR50_VERSION = 'cat /var/cache/cr50-version'
GET_CR50_MESSAGES ='grep "cr50-.*\[" /var/log/messages'
UPDATE_FAILURE = 'unexpected cr50-update exit code'
# This dictionary is used to search the usb_updater output for the version
# strings. There are two usb_updater commands that will return versions:
# 'fwver' and 'binver'.
#
# 'fwver'   is used to get the running RO and RW versions from cr50
# 'binver'  gets the version strings for each RO and RW region in the given
#           file
#
# The value in the dictionary is the regular expression that can be used to
# find the version strings for each region.
VERSION_RE = {
    "--fwver" : '\nRO (?P<ro>\S+).*\nRW (?P<rw>\S+)',
    "--binver" : 'RO_A:(?P<ro_a>\S+).*RW_A:(?P<rw_a>\S+).*' \
           'RO_B:(?P<ro_b>\S+).*RW_B:(?P<rw_b>\S+)',
}


def GetVersion(versions, name):
    """Return the version string from the dictionary.

    Get the version for each key in the versions dictionary that contains the
    substring name. Make sure all of the versions match and return the version
    string. Raise an error if the versions don't match.

    @param version: dictionary with the partition names as keys and the
                    partition version strings as values.
    @param name: the string used to find the relevant items in versions.
    """
    ver = None
    for k, v in versions.iteritems():
        if name in k:
            if ver and ver != v:
                raise error.TestFail("Versions don't match %s %s" % (ver, v))
            else:
                ver = v
    return ver


def FindVersion(output, arg):
    """Find the ro and rw versions.

    @param output: The string to search
    @param arg: string representing the usb_updater option, either
                '--binver' or '--fwver'
    @param compare: raise an error if the ro or rw versions don't match
    """
    versions = re.search(VERSION_RE[arg], output)
    versions = versions.groupdict()
    ro = GetVersion(versions, RO)
    rw = GetVersion(versions, RW)
    return ro, rw


def GetSavedVersion(client):
    """Return the saved version from /var/cache/cr50-version"""
    result = client.run(GET_CR50_VERSION).stdout.strip()
    return FindVersion(result, "--fwver")


def GetVersionFromUpdater(client, args):
    """Return the version from usb_updater"""
    result = client.run("usb_updater %s" % ' '.join(args)).stdout.strip()
    return FindVersion(result, args[0])


def GetFwVersion(client):
    """Get the running version using 'usb_updater --fwver'"""
    return GetVersionFromUpdater(client, ["--fwver"])


def GetBinVersion(client, image=CR50_FILE):
    """Get the image version using 'usb_updater --binver image'"""
    return GetVersionFromUpdater(client, ["--binver", image])


def CompareVersions(name_a, ver_a, rw_a, ver_b):
    """Compare ver_a to ver_b. Raise an error if they aren't the same"""
    if ver_a != ver_b:
        raise error.TestFail("Versions do not match: %s RO %s RW %s %s RO %s " \
                             "RW %s" % (name_a, ver_a[0], ver_a[1], name_b,
                                        ver_b[0], ver_b[1]))


def GetRunningVersion(client):
    """Get the running Cr50 version.

    The version from usb_updater and /var/cache/cr50-version should be the
    same. Get both versions and make sure they match.

    Returns:
        running_ver: a tuple with the ro and rw version strings
    Raises:
        TestFail
        - If the version in /var/cache/cr50-version is not the same as the
          version from 'usb_updater --fwver'
    """
    running_ver = GetFwVersion(client)
    saved_ver = GetSavedVersion(client)

    CompareVersions("Running", running_ver, "Saved", saved_ver)
    return running_ver


def CheckForFailures(client, last_message):
    """Check for any unexpected cr50-update exit codes.

    This only checks the cr50 update messages that have happened since
    last_message. If a unexpected exit code is detected it will raise an error>

    Args:
        last_message: the last cr50 message from the last update run

    Returns:
        the last cr50 message in /var/log/messages

    Raises:
        TestFail
            - If there is a unexpected cr50-update exit code after last_message
              in /var/log/messages
    """
    messages = client.run(GET_CR50_MESSAGES).stdout.strip()
    if last_message:
        messages = messages.rsplit(last_message, 1)[-1]
        if UPDATE_FAILURE in messages:
            logging.debug(messages)
            raise error.TestFail("Detected unexpected exit code during update")
    return messages.rsplit('\n', 1)[-1]


def VerifyUpdate(client, ver=None, last_message=''):
    """Verify that the saved update state is correct and there were no
    unexpected cr50-update exit codes since the last update.

    Returns:
        new_ver: a tuple containing the running ro and rw versions
        last_message: The last cr50 update message in /var/log/messages
    """
    # Check that there were no unexpected reboots from cr50-result
    last_message = CheckForFailures(client, last_message)
    logging.debug("last cr50 message %s", last_message)

    new_ver = GetRunningVersion(client)
    if ver:
        CompareVersions("Old", ver, "Updated", new_ver)
    return new_ver, last_message


def ClearUpdateStateAndReboot(client):
    """Removes the cr50 status files in /var/cache and reboots the AP"""
    client.run("rm %s" % CR50_STATE)
    client.reboot()