普通文本  |  438行  |  15.8 KB

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utilities for cellular tests."""
import copy, dbus, os, tempfile

# TODO(thieule): Consider renaming mm.py, mm1.py, modem.py, etc to be more
# descriptive (crosbug.com/37060).
import common
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.cellular import cellular
from autotest_lib.client.cros.cellular import cellular_system_error
from autotest_lib.client.cros.cellular import mm
from autotest_lib.client.cros.cellular import modem

from autotest_lib.client.cros import flimflam_test_path
import flimflam


TIMEOUT = 30
SERVICE_TIMEOUT = 60

import cellular_logging

logger = cellular_logging.SetupCellularLogging('cell_tools')


def ConnectToCellular(flim, timeout=TIMEOUT):
    """Attempts to connect to a cell network using FlimFlam.

    Args:
        flim: A flimflam object
        timeout: Timeout (in seconds) before giving up on connect

    Returns:
        a tuple of the service and the service state

    Raises:
        Error if connection fails or times out
    """

    service = flim.FindCellularService(timeout=timeout)
    if not service:
        raise cellular_system_error.ConnectionFailure(
            'Could not find cell service')
    properties = service.GetProperties(utf8_strings=True)
    logger.error('Properties are: %s', properties)

    logger.info('Connecting to cell service: %s', service)

    states = ['portal', 'online', 'idle']
    state = flim.WaitForServiceState(service=service,
                                     expected_states=states,
                                     timeout=timeout,
                                     ignore_failure=True)[0]
    logger.debug('Cell connection state : %s ' % state)
    connected_states = ['portal', 'online']
    if state in connected_states:
        logger.debug('Looks good, skip ConnectService')
        return service, state
    else:
        logger.debug('Trying to ConnectService')

    success, status = flim.ConnectService(
        service=service,
        assoc_timeout=timeout,
        config_timeout=timeout)

    if not success:
        logger.error('Connect failed: %s' % status)
        # TODO(rochberg):  Turn off autoconnect
        if 'Error.AlreadyConnected' not in status['reason']:
            raise cellular_system_error.ConnectionFailure(
                'Could not connect: %s.' % status)

    state = flim.WaitForServiceState(service=service,
                                     expected_states=connected_states,
                                     timeout=timeout,
                                     ignore_failure=True)[0]
    if not state in connected_states:
        raise cellular_system_error.BadState(
            'Still in state %s, expecting one of: %s ' %
            (state, str(connected_states)))

    return service, state


def FindLastGoodAPN(service, default=None):
    if not service:
        return default
    props = service.GetProperties()
    if 'Cellular.LastGoodAPN' not in props:
        return default
    last_good_apn = props['Cellular.LastGoodAPN']
    return last_good_apn.get('apn', default)


def DisconnectFromCellularService(bs, flim, service):
    """Attempts to disconnect from the supplied cellular service.

    Args:
        bs:  A basestation object.  Pass None to skip basestation-side checks
        flim:  A flimflam object
        service:  A cellular service object
    """

    flim.DisconnectService(service)  # Waits for flimflam state to go to idle

    if bs:
        verifier = bs.GetAirStateVerifier()
        # This is racy: The modem is free to report itself as
        # disconnected before it actually finishes tearing down its RF
        # connection.
        verifier.AssertDataStatusIn([
            cellular.UeGenericDataStatus.DISCONNECTING,
            cellular.UeGenericDataStatus.REGISTERED,
            cellular.UeGenericDataStatus.NONE,])

        def _ModemIsFullyDisconnected():
            return verifier.IsDataStatusIn([
                cellular.UeGenericDataStatus.REGISTERED,
                cellular.UeGenericDataStatus.NONE,])

        utils.poll_for_condition(
            _ModemIsFullyDisconnected,
            timeout=20,
            exception=cellular_system_error.BadState(
                'modem not disconnected from base station'))


def _EnumerateModems(manager):
    """Get a set of modem paths."""
    return set([x[1] for x in mm.EnumerateDevices(manager)])


def _SawNewModem(manager, preexisting_modems, old_modem):
    current_modems = _EnumerateModems(manager)
    if old_modem in current_modems:
        return False
    # NB: This fails if an unrelated modem disappears.  Not fixing
    # until we support > 1 modem
    return preexisting_modems != current_modems


def _WaitForModemToReturn(manager, preexisting_modems_original, modem_path):
    preexisting_modems = copy.copy(preexisting_modems_original)
    preexisting_modems.remove(modem_path)

    utils.poll_for_condition(
        lambda: _SawNewModem(manager, preexisting_modems, modem_path),
        timeout=50,
        exception=cellular_system_error.BadState(
            'Modem did not come back after settings change'))

    current_modems = _EnumerateModems(manager)

    new_modems = [x for x in current_modems - preexisting_modems]
    if len(new_modems) != 1:
        raise cellular_system_error.BadState(
            'Unexpected modem list change: %s vs %s' %
            (current_modems, new_modems))

    logger.info('New modem: %s' % new_modems[0])
    return new_modems[0]


def SetFirmwareForTechnologyFamily(manager, modem_path, family):
    """Set the modem to firmware.  Return potentially-new modem path."""
    # todo(byronk): put this in a modem object?
    if family == cellular.TechnologyFamily.LTE:
        return  # nothing to set up on a Pixel. todo(byronk) how about others?
    logger.debug('SetFirmwareForTechnologyFamily : manager : %s ' % manager)
    logger.debug('SetFirmwareForTechnologyFamily : modem_path : %s ' %
                 modem_path)
    logger.debug('SetFirmwareForTechnologyFamily : family : %s ' % family)
    preexisting_modems = _EnumerateModems(manager)
    # We do not currently support any multi-family modems besides Gobi
    gobi = manager.GetModem(modem_path).GobiModem()
    if not gobi:
        raise cellular_system_error.BadScpiCommand(
            'Modem %s does not support %s, cannot change technologies' %
            modem_path, family)

    logger.info('Changing firmware to technology family %s' % family)

    FamilyToCarrierString = {
            cellular.TechnologyFamily.UMTS: 'Generic UMTS',
            cellular.TechnologyFamily.CDMA: 'Verizon Wireless',}

    gobi.SetCarrier(FamilyToCarrierString[family])
    return _WaitForModemToReturn(manager, preexisting_modems, modem_path)


# A test PRL that has an ID of 3333 and sets the device to aquire the
# default config of an 8960 with system_id 331.  Base64 encoding
# Generated with "base64 < prl"

TEST_PRL_3333 = (
    'ADENBQMAAMAAAYADAgmABgIKDQsEAYAKDUBAAQKWAAICQGAJApYAAgIw8BAAAQDhWA=='.
    decode('base64_codec'))


# A modem with this MDN will always report itself as activated
TESTING_MDN = dbus.String('1115551212', variant_level=1)


def _IsCdmaModemConfiguredCorrectly(manager, modem_path):
    """Returns true iff the CDMA modem at modem_path is configured correctly."""
    # We don't test for systemID because the PRL should take care of
    # that.

    status = manager.GetModem(modem_path).SimpleModem().GetStatus()

    required_settings = {'mdn': TESTING_MDN,
                         'min': TESTING_MDN,
                         'prl_version': 3333}
    configured_correctly = True

    for rk, rv in required_settings.iteritems():
        if rk not in status or rv != status[rk]:
            logger.error('_CheckCdmaModemStatus:  %s: expected %s, got %s' % (
                rk, rv, status.get(rk)))
            configured_correctly = False
    return configured_correctly


def PrepareCdmaModem(manager, modem_path):
    """Configure a CDMA device (including PRL, MIN, and MDN)."""

    if _IsCdmaModemConfiguredCorrectly(manager, modem_path):
        return modem_path

    logger.info('Updating modem settings')
    preexisting_modems = _EnumerateModems(manager)
    cdma = manager.GetModem(modem_path).CdmaModem()

    with tempfile.NamedTemporaryFile() as f:
        os.chmod(f.name, 0744)
        f.write(TEST_PRL_3333)
        f.flush()
        logger.info('Calling ActivateManual to change PRL')

        cdma.ActivateManual({
            'mdn': TESTING_MDN,
            'min': TESTING_MDN,
            'prlfile': dbus.String(f.name, variant_level=1),
            'system_id': dbus.UInt16(331, variant_level=1),  # Default 8960 SID
            'spc': dbus.String('000000'),})
        new_path = _WaitForModemToReturn(
            manager, preexisting_modems, modem_path)

    if not _IsCdmaModemConfiguredCorrectly(manager, new_path):
        raise cellular_system_error.BadState('Modem configuration failed')
    return new_path


def PrepareModemForTechnology(modem_path, target_technology):
    """Prepare modem for the technology: Sets things like firmware, PRL."""

    manager, modem_path = mm.PickOneModem(modem_path)

    logger.info('Found modem %s' % modem_path)


    # todo(byronk) : This returns TechnologyFamily:UMTS on a Pixel. ????
    current_family = manager.GetModem(modem_path).GetCurrentTechnologyFamily()
    target_family = cellular.TechnologyToFamily[target_technology]

    if current_family != target_family:
        logger.debug('Modem Current Family: %s ' % current_family)
        logger.debug('Modem Target Family : %s ' %target_family )
        modem_path = SetFirmwareForTechnologyFamily(
            manager, modem_path, target_family)

    if target_family == cellular.TechnologyFamily.CDMA:
        modem_path = PrepareCdmaModem(manager, modem_path)
        # Force the modem to report that is has been activated since we
        # use a custom PRL and have already manually activated it.
        manager.GetModem(modem_path).GobiModem().ForceModemActivatedStatus()

    # When testing EVDO, we need to force the modem to register with EVDO
    # directly (bypassing CDMA 1x RTT) else the modem will not register
    # properly because it looks for CDMA 1x RTT first but can't find it
    # because the call box can only emulate one technology at a time (EVDO).
    try:
        if target_technology == cellular.Technology.EVDO_1X:
            network_preference = modem.Modem.NETWORK_PREFERENCE_EVDO_1X
        else:
            network_preference = modem.Modem.NETWORK_PREFERENCE_AUTOMATIC
        gobi = manager.GetModem(modem_path).GobiModem()
        gobi.SetNetworkPreference(network_preference)
    except AttributeError:
        # Not a Gobi modem
        pass

    return modem_path


def FactoryResetModem(modem_pattern, spc='000000'):
    """Factory resets modem, returns DBus pathname of modem after reset."""
    manager, modem_path = mm.PickOneModem(modem_pattern)
    preexisting_modems = _EnumerateModems(manager)
    modem = manager.GetModem(modem_path).Modem()
    modem.FactoryReset(spc)
    return _WaitForModemToReturn(manager, preexisting_modems, modem_path)


class OtherDeviceShutdownContext(object):
    """Context manager that shuts down other devices.

    Usage:
        with cell_tools.OtherDeviceShutdownContext('cellular'):
            block

    TODO(rochberg):  Replace flimflam.DeviceManager with this
    """

    def __init__(self, device_type):
        self.device_type = device_type
        self.device_manager = None

    def __enter__(self):
        self.device_manager = flimflam.DeviceManager(flimflam.FlimFlam())
        self.device_manager.ShutdownAllExcept(self.device_type)
        return self

    def __exit__(self, exception, value, traceback):
        if self.device_manager:
            self.device_manager.RestoreDevices()
        return False


class AutoConnectContext(object):
    """Context manager which sets autoconnect to either true or false.

       Enable or Disable autoconnect for the cellular service.
       Restore it when done.

       Usage:
           with cell_tools.DisableAutoConnectContext(device, flim, autoconnect):
               block
    """

    def __init__(self, device, flim, autoconnect):
        self.device = device
        self.flim = flim
        self.autoconnect = autoconnect
        self.autoconnect_changed = False

    def PowerOnDevice(self, device):
        """Power on a flimflam device, ignoring in progress errors."""
        logger.info('powered = %s' % device.GetProperties()['Powered'])
        if device.GetProperties()['Powered']:
            return
        try:
            device.Enable()
        except dbus.exceptions.DBusException, e:
            if e._dbus_error_name != 'org.chromium.flimflam.Error.InProgress':
                raise e

    def __enter__(self):
        """Power up device, get the service and disable autoconnect."""
        changed = False
        self.PowerOnDevice(self.device)

        # Use SERVICE_TIMEOUT*2 here because it may take SERVICE_TIMEOUT
        # seconds for the modem to disconnect when the base emulator is taken
        # offline for reconfiguration and then another SERVICE_TIMEOUT
        # seconds for the modem to reconnect after the base emulator is
        # brought back online.
        #
        # TODO(jglasgow): generalize to use services associated with device
        service = self.flim.FindCellularService(timeout=SERVICE_TIMEOUT*2)
        if not service:
            raise error.TestFail('No cellular service available.')

        # Always set the AutoConnect property even if the requested value
        # is the same so that shill will retain the AutoConnect property, else
        # shill may override it.
        props = service.GetProperties()
        autoconnect = props['AutoConnect']
        logger.info('AutoConnect = %s' % autoconnect)
        logger.info('Setting AutoConnect = %s.', self.autoconnect)
        service.SetProperty('AutoConnect', dbus.Boolean(self.autoconnect))

        if autoconnect != self.autoconnect:
            props = service.GetProperties()
            autoconnect = props['AutoConnect']
            changed = True

        # Make sure the cellular service gets persisted by taking it out of
        # the ephemeral profile.
        if not props['Profile']:
            manager_props = self.flim.manager.GetProperties()
            active_profile = manager_props['ActiveProfile']
            logger.info("Setting cellular service profile to %s",
                        active_profile)
            service.SetProperty('Profile', active_profile)

        if autoconnect != self.autoconnect:
            raise error.TestFail('AutoConnect is %s, but we want it to be %s' %
                                 (autoconnect, self.autoconnect))

        self.autoconnect_changed = changed

        return self

    def __exit__(self, exception, value, traceback):
        """Restore autoconnect state if we changed it."""
        if not self.autoconnect_changed:
            return False

        try:
            self.PowerOnDevice(self.device)
        except Exception as e:
            if exception:
                logger.error(
                    'Exiting AutoConnectContext with one exception, but ' +
                    'PowerOnDevice raised another')
                logger.error(
                    'Swallowing PowerOnDevice exception %s' % e)
                return False
            else:
                raise e

        # TODO(jglasgow): generalize to use services associated with
        # device, and restore state only on changed services
        service = self.flim.FindCellularService()
        if not service:
            logger.error('Cannot find cellular service.  '
                          'Autoconnect state not restored.')
            return False
        service.SetProperty('AutoConnect', dbus.Boolean(not self.autoconnect))

        return False