普通文本  |  156行  |  6.53 KB

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging

import pm_errors
import state_machine

from autotest_lib.client.cros.cellular import mm1_constants

class RegisterMachine(state_machine.StateMachine):
    """
    RegisterMachine handles the state transitions involved in bringing the
    modem to the REGISTERED state.

    Two modem states are handled: in ENABLED the machine moves the modem to
    SEARCHING and runs a network scan; in SEARCHING it selects a network from
    the scan results and registers to it. The outcome is reported through the
    optional callbacks passed to the constructor.

    """
    def __init__(self, modem, operator_code="", return_cb=None, raise_cb=None):
        """
        @param modem: The modem object driven by this state machine.
        @param operator_code: Code of the operator to register to. If empty,
                the last network returned by the scan is used.
        @param return_cb: Optional callable, invoked without arguments once
                registration has succeeded.
        @param raise_cb: Optional callable, invoked with the error object
                when registration fails or is cancelled.

        """
        super(RegisterMachine, self).__init__(modem)
        self._networks = None
        self._operator_code = operator_code
        self._return_cb = return_cb
        self._raise_cb = raise_cb


    def Cancel(self):
        """
        Overridden from superclass.

        Cancels the register operation. If the modem is currently SEARCHING,
        it is put back into the ENABLED state with an IDLE registration
        state. The modem's |register_step| is cleared and, if |raise_cb| was
        given, it is invoked with a CANCELLED error.

        """
        logging.info('RegisterMachine: Canceling register.')
        super(RegisterMachine, self).Cancel()
        state = self._modem.Get(mm1_constants.I_MODEM, 'State')
        reason = mm1_constants.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED
        if state == mm1_constants.MM_MODEM_STATE_SEARCHING:
            logging.info('RegisterMachine: Setting state to ENABLED.')
            self._modem.ChangeState(mm1_constants.MM_MODEM_STATE_ENABLED,
                                    reason)
            self._modem.SetRegistrationState(
                mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE)
        self._modem.register_step = None
        if self._raise_cb:
            self._raise_cb(
                    pm_errors.MMCoreError(pm_errors.MMCoreError.CANCELLED,
                                          'Cancelled'))


    def _HandleEnabledState(self):
        """
        Handler for the ENABLED modem state.

        Moves the modem to SEARCHING and performs a network scan, storing the
        result for the SEARCHING handler. If the scan raises an MMError, the
        modem is put back into ENABLED/IDLE, |register_step| is cleared and
        the error is passed to |raise_cb|.

        @returns: True if the scan succeeded, False if it failed.
                NOTE(review): the base StateMachine presumably uses this
                value to decide whether to take another step -- confirm in
                state_machine.py.

        """
        logging.info('RegisterMachine: Modem is ENABLED.')
        logging.info('RegisterMachine: Setting registration state '
                     'to SEARCHING.')
        self._modem.SetRegistrationState(
            mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_SEARCHING)
        logging.info('RegisterMachine: Setting state to SEARCHING.')
        reason = mm1_constants.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED
        self._modem.ChangeState(mm1_constants.MM_MODEM_STATE_SEARCHING, reason)
        logging.info('RegisterMachine: Starting network scan.')
        try:
            self._networks = self._modem.SyncScan()
        except pm_errors.MMError as e:
            self._modem.register_step = None
            logging.error('An error occurred during network scan: %s', e)
            # Use the MM_-prefixed constant, matching every other use in this
            # class; the unprefixed name would fail with AttributeError and
            # mask the scan error being reported here.
            self._modem.ChangeState(
                    mm1_constants.MM_MODEM_STATE_ENABLED,
                    mm1_constants.MM_MODEM_STATE_CHANGE_REASON_UNKNOWN)
            self._modem.SetRegistrationState(
                    mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE)
            if self._raise_cb:
                self._raise_cb(e)
            return False
        logging.info('RegisterMachine: Found networks: %s', self._networks)
        return True


    def _HandleSearchingState(self):
        """
        Handler for the SEARCHING modem state.

        Selects a network from the scan results and registers to it. On
        success |return_cb| is invoked. If the scan found no networks, or
        the requested operator code is unknown, the modem is put back into
        ENABLED/IDLE and the error is passed to |raise_cb|. In every case
        the modem's |register_step| is cleared.

        @returns: False always; this is the last step of the operation.

        """
        logging.info('RegisterMachine: Modem is SEARCHING.')
        if not self._networks:
            logging.info('RegisterMachine: Scan returned no networks.')
            logging.info('RegisterMachine: Setting state to ENABLED.')
            self._modem.ChangeState(
                    mm1_constants.MM_MODEM_STATE_ENABLED,
                    mm1_constants.MM_MODEM_STATE_CHANGE_REASON_UNKNOWN)
            # TODO(armansito): Figure out the correct registration
            # state to transition to when no network is present.
            logging.info('RegisterMachine: Setting registration state '
                         'to IDLE.')
            self._modem.SetRegistrationState(
                    mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE)
            self._modem.register_step = None
            if self._raise_cb:
                self._raise_cb(pm_errors.MMMobileEquipmentError(
                        pm_errors.MMMobileEquipmentError.NO_NETWORK,
                        'No networks were found to register.'))
            return False

        if self._operator_code:
            if self._operator_code not in self._modem.scanned_networks:
                # Undo the SEARCHING transition and release the register
                # slot, like the other failure paths do. Otherwise the modem
                # stays in SEARCHING with |register_step| still set and
                # every later register request is rejected as in progress.
                logging.info('RegisterMachine: Unknown network requested.')
                logging.info('RegisterMachine: Setting state to ENABLED.')
                self._modem.ChangeState(
                        mm1_constants.MM_MODEM_STATE_ENABLED,
                        mm1_constants.MM_MODEM_STATE_CHANGE_REASON_UNKNOWN)
                self._modem.SetRegistrationState(
                        mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE)
                self._modem.register_step = None
                if self._raise_cb:
                    self._raise_cb(pm_errors.MMCoreError(
                            pm_errors.MMCoreError.FAILED,
                            "Unknown network: " + self._operator_code))
                return False
            network = self._modem.scanned_networks[self._operator_code]
        else:
            # Pick the last network in the list. Roaming networks will come
            # before the home network which makes the last item in the list
            # the home network.
            network = self._networks[-1]
        logging.info('RegisterMachine: Registering to network: %s', network)
        self._modem.SetRegistered(
                network['operator-code'],
                network['operator-long'])

        # The previous call should have set the state to REGISTERED.
        self._modem.register_step = None

        if self._return_cb:
            self._return_cb()
        return False


    def _GetModemStateFunctionMap(self):
        """
        @returns: A dictionary mapping the modem states handled by this
                machine (ENABLED and SEARCHING) to their handler functions.

        """
        return {
            mm1_constants.MM_MODEM_STATE_ENABLED:
                    RegisterMachine._HandleEnabledState,
            mm1_constants.MM_MODEM_STATE_SEARCHING:
                    RegisterMachine._HandleSearchingState
        }


    def _ShouldStartStateMachine(self):
        """
        Decides whether this register operation is allowed to run.

        The operation is rejected if another register operation owns the
        modem's |register_step| (IN_PROGRESS), or if no operation is ongoing
        and the modem is not in the ENABLED state (WRONG_STATE). A rejection
        is reported through |raise_cb| when one was provided and raised
        otherwise. When accepted as a new operation, this machine is
        recorded as the modem's |register_step|.

        @returns: True if the operation may proceed, False if it was
                rejected and the error was delivered through |raise_cb|.
        @raises: MMCoreError if the operation is rejected and no |raise_cb|
                was provided.

        """
        if self._modem.register_step and self._modem.register_step != self:
            # There is already an ongoing register operation.
            message = 'Register operation already in progress.'
            logging.info(message)
            error = pm_errors.MMCoreError(pm_errors.MMCoreError.IN_PROGRESS,
                                          message)
            if self._raise_cb:
                self._raise_cb(error)
                # The failure has been reported; do not also start.
                return False
            raise error
        elif self._modem.register_step is None:
            # There is no register operation going on, canceled or otherwise.
            state = self._modem.Get(mm1_constants.I_MODEM, 'State')
            if state != mm1_constants.MM_MODEM_STATE_ENABLED:
                message = 'Cannot initiate register while in state %d, ' \
                          'state needs to be ENABLED.' % state
                error = pm_errors.MMCoreError(pm_errors.MMCoreError.WRONG_STATE,
                                              message)
                if self._raise_cb:
                    self._raise_cb(error)
                    # Bail out before claiming |register_step|, so that a
                    # rejected request does not block later ones.
                    return False
                raise error

            logging.info('Starting Register.')
            self._modem.register_step = self
        return True