普通文本  |  570行  |  20.3 KB

#!/usr/bin/env python
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# This module is the entry point for pseudomodem. Though honestly, I can't think
# of any case when you want to use this module directly. Instead, use the
# |pseudomodem_context| module that provides a way to launch pseudomodem in a
# child process.

import argparse
import dbus
import dbus.mainloop.glib
import gobject
import imp
import json
import logging
import os
import os.path
import signal
import sys
import testing
import traceback

import logging_setup
import modem_cdma
import modem_3gpp
import modemmanager
import sim
import state_machine_factory as smf

import common
from autotest_lib.client.cros.cellular import mm1_constants

# Flags used by pseudomodem modules only that are defined below in
# ParserArguments.
CLI_FLAG = '--cli'
EXIT_ERROR_FILE_FLAG = '--exit-error-file'

class PseudoModemManager(object):
    """
    The main class to be used to launch the pseudomodem.

    There should be only one instance of this class that orchestrates
    pseudomodem.

    """

    def Setup(self, opts):
        """
        Call |Setup| to prepare pseudomodem to be launched.

        @param opts: The options accepted by pseudomodem. See top level function
                |ParseArguments| for details.

        """
        self._opts = opts

        self._in_exit_sequence = False
        self._manager = None
        self._modem = None
        self._state_machine_factory = None
        self._sim = None
        self._mainloop = None

        self._dbus_loop = dbus.mainloop.glib.DBusGMainLoop()
        self._bus = dbus.SystemBus(private=True, mainloop=self._dbus_loop)
        self._bus_name = dbus.service.BusName(mm1_constants.I_MODEM_MANAGER,
                                              self._bus)
        logging.info('Exported dbus service with well known name: |%s|',
                     self._bus_name.get_name())

        self._SetupPseudomodemParts()
        logging.info('Pseudomodem setup completed!')


    def StartBlocking(self):
        """
        Start pseudomodem operation.

        This call blocks untill |GracefulExit| is called from some other
        context.

        """
        self._mainloop = gobject.MainLoop()
        self._mainloop.run()


    def GracefulExit(self):
        """ Stop pseudomodem operation and clean up. """
        if self._in_exit_sequence:
            logging.debug('Already exiting.')
            return

        self._in_exit_sequence = True
        logging.info('pseudomodem shutdown sequence initiated...')
        # Guard each step by its own try...catch, because we want to attempt
        # each step irrespective of whether the earlier ones succeeded.
        try:
            if self._manager:
                self._manager.Remove(self._modem)
        except Exception as e:
            logging.warning('Error while exiting: %s', repr(e))
        try:
            if self._mainloop:
                self._mainloop.quit()
        except Exception as e:
            logging.warning('Error while exiting: %s', repr(e))

        logging.info('pseudomodem: Bye! Bye!')


    def _SetupPseudomodemParts(self):
        """
        Contructs all pseudomodem objects, but does not start operation.

        Three main objects are created: the |Modem|, the |Sim|, and the
        |StateMachineFactory|. This objects may be instantiations of the default
        classes, or of user provided classes, depending on options provided.

        """
        self._ReadCustomParts()

        use_3gpp = (self._opts.family == '3GPP')

        if not self._modem and not self._state_machine_factory:
            self._state_machine_factory = smf.StateMachineFactory()
            logging.info('Created default state machine factory.')

        if use_3gpp and not self._sim:
            self._sim = sim.SIM(sim.SIM.Carrier('test'),
                                mm1_constants.MM_MODEM_ACCESS_TECHNOLOGY_GSM,
                                locked=self._opts.locked)
            logging.info('Created default 3GPP SIM.')

        # Store this constant here because the variable name is too long.
        network_available = dbus.types.UInt32(
                mm1_constants.MM_MODEM_3GPP_NETWORK_AVAILABILITY_AVAILABLE)
        if not self._modem:
            if use_3gpp:
                technology_gsm = dbus.types.UInt32(
                        mm1_constants.MM_MODEM_ACCESS_TECHNOLOGY_GSM)
                networks = [modem_3gpp.Modem3gpp.GsmNetwork(
                        'Roaming Network Long ' + str(i),
                        'Roaming Network Short ' + str(i),
                        '00100' + str(i + 1),
                        network_available,
                        technology_gsm)
                        for i in xrange(self._opts.roaming_networks)]
                # TODO(armansito): Support "not activated" initialization option
                # for 3GPP carriers.
                self._modem = modem_3gpp.Modem3gpp(
                        self._state_machine_factory,
                        roaming_networks=networks)
                logging.info('Created default 3GPP modem.')
            else:
                self._modem = modem_cdma.ModemCdma(
                        self._state_machine_factory,
                        modem_cdma.ModemCdma.CdmaNetwork(
                                activated=self._opts.activated))
                logging.info('Created default CDMA modem.')

        # Everyone gets the |_bus|, woohoo!
        self._manager = modemmanager.ModemManager(self._bus)
        self._modem.SetBus(self._bus)  # Also sets it on StateMachineFactory.
        self._manager.Add(self._modem)

        # Unfortunately, setting the SIM has to be deferred until everyone has
        # their BUS set. |self._sim| exists if the user provided one, or if the
        # modem family is |3GPP|.
        if self._sim:
            self._modem.SetSIM(self._sim)

        # The testing interface can be brought up now that we have the bus.
        self._testing_object = testing.Testing(self._modem, self._bus)


    def _ReadCustomParts(self):
        """
        Loads user provided implementations of pseudomodem objects.

        The user can provide their own implementations of the |Modem|, |Sim| or
        |StateMachineFactory| classes.

        """
        if not self._opts.test_module:
            return

        test_module = self._LoadCustomPartsModule(self._opts.test_module)

        if self._opts.test_modem_class:
            self._modem = self._CreateCustomObject(test_module,
                                                   self._opts.test_modem_class,
                                                   self._opts.test_modem_arg)

        if self._opts.test_sim_class:
            self._sim = self._CreateCustomObject(test_module,
                                                 self._opts.test_sim_class,
                                                 self._opts.test_sim_arg)

        if self._opts.test_state_machine_factory_class:
            if self._opts.test_modem_class:
                logging.warning(
                        'User provided a |Modem| implementation as well as a '
                        '|StateMachineFactory|. Ignoring the latter.')
            else:
                self._state_machine_factory = self._CreateCustomObject(
                        test_module,
                        self._opts.test_state_machine_factory_class,
                        self._opts.test_state_machine_factory_arg)


    def _CreateCustomObject(self, test_module, class_name, arg_file_name):
        """
        Create the custom object specified by test.

        @param test_module: The loaded module that implemets the custom object.
        @param class_name: Name of the class implementing the custom object.
        @param arg_file_name: Absolute path to file containing list of arguments
                taken by |test_module|.|class_name| constructor in json.
        @returns: A brand new object of the custom type.
        @raises: AttributeError if the class definition is not found;
                ValueError if |arg_file| does not contain valid json
                representaiton of a python list.
                Other errors may be raised during object creation.

        """
        arg = None
        if arg_file_name:
            arg_file = open(arg_file_name, 'rb')
            try:
                arg = json.load(arg_file)
            finally:
                arg_file.close()
            if not isinstance(arg, list):
                raise ValueError('Argument must be a python list.')

        class_def = getattr(test_module, class_name)
        try:
            if arg:
                logging.debug('Loading test class %s%s',
                              class_name, str(arg))
                return class_def(*arg)
            else:
                logging.debug('Loading test class %s', class_def)
                return class_def()
        except Exception as e:
            logging.error('Exception raised when instantiating class %s: %s',
                          class_name, str(e))
            raise


    def _LoadCustomPartsModule(self, module_abs_path):
        """
        Loads the given file as a python module.

        The loaded module *is* added to |sys.modules|.

        @param module_abs_path: Absolute path to the file to be loaded.
        @returns: The loaded module.
        @raises: ImportError if the module can not be loaded, or if another
                 module with the same name is already loaded.

        """
        path, name = os.path.split(module_abs_path)
        name, _ = os.path.splitext(name)

        if name in sys.modules:
            raise ImportError('A module named |%s| is already loaded.' %
                              name)

        logging.debug('Loading module %s from %s', name, path)
        module_file, filepath, data = imp.find_module(name, [path])
        try:
            module = imp.load_module(name, module_file, filepath, data)
        except Exception as e:
            logging.error(
                    'Exception raised when loading test module from %s: %s',
                    module_abs_path, str(e))
            raise
        finally:
            module_file.close()
        return module


# ##############################################################################
# Public static functions.
def ParseArguments(arg_string=None):
    """
    The main argument parser.

    Pseudomodem is a command line tool.
    Since pseudomodem is a highly customizable tool, the command line arguments
    are expected to be quite complex.
    We use argparse to keep the command line options easy to use.

    @param arg_string: If not None, the string to parse. If none, |sys.argv| is
            used to obtain the argument string.
    @returns: The parsed options object.

    """
    parser = argparse.ArgumentParser(
            description="Run pseudomodem to simulate a modem using the "
                        "modemmanager-next DBus interface.")

    parser.add_argument(
            CLI_FLAG,
            action='store_true',
            default=False,
            help='Launch the command line interface in foreground to interact '
                 'with the launched pseudomodem process. This argument is used '
                 'by |pseudomodem_context|. pseudomodem itself ignores it.')
    parser.add_argument(
            EXIT_ERROR_FILE_FLAG,
            default=None,
            help='If provided, full path to file to which pseudomodem should '
                 'dump the error condition before exiting, in case of a crash. '
                 'The file is not created if it does not already exist.')

    modem_arguments = parser.add_argument_group(
            title='Modem options',
            description='Options to customize the modem exported.')
    modem_arguments.add_argument(
            '--family', '-f',
            choices=['3GPP', 'CDMA'],
            default='3GPP')

    gsm_arguments = parser.add_argument_group(
            title='3GPP options',
            description='Options specific to 3GPP modems. [Only make sense '
                        'when modem family is 3GPP]')

    gsm_arguments.add_argument(
            '--roaming-networks', '-r',
            type=_NonNegInt,
            default=0,
            metavar='<# networks>',
            help='Number of roaming networks available')

    cdma_arguments = parser.add_argument_group(
            title='CDMA options',
            description='Options specific to CDMA modems. [Only make sense '
                        'when modem family is CDMA]')

    sim_arguments = parser.add_argument_group(
            title='SIM options',
            description='Options to customize the SIM in the modem. [Only make '
                        'sense when modem family is 3GPP]')
    sim_arguments.add_argument(
            '--activated',
            type=bool,
            default=True,
            help='Determine whether the SIM is activated')
    sim_arguments.add_argument(
            '--locked', '-l',
            type=bool,
            default=False,
            help='Determine whether the SIM is in locked state')

    testing_arguments = parser.add_argument_group(
            title='Testing interface options',
            description='Options to modify how the tests or user interacts '
                        'with pseudomodem')
    testing_arguments = parser.add_argument(
            '--interactive-state-machines-all',
            type=bool,
            default=False,
            help='Launch all state machines in interactive mode.')
    testing_arguments = parser.add_argument(
            '--interactive-state-machine',
            type=str,
            default=None,
            help='Launch the specified state machine in interactive mode. May '
                 'be repeated to specify multiple machines.')

    customize_arguments = parser.add_argument_group(
            title='Customizable modem options',
            description='Options to customize the emulated modem.')
    customize_arguments.add_argument(
            '--test-module',
            type=str,
            default=None,
            metavar='CUSTOM_MODULE',
            help='Absolute path to the module with custom definitions.')
    customize_arguments.add_argument(
            '--test-modem-class',
            type=str,
            default=None,
            metavar='MODEM_CLASS',
            help='Name of the class in CUSTOM_MODULE that implements the modem '
                 'to load.')
    customize_arguments.add_argument(
            '--test-modem-arg',
            type=str,
            default=None,
            help='Absolute path to the json description of argument list '
                 'taken by MODEM_CLASS.')
    customize_arguments.add_argument(
            '--test-sim-class',
            type=str,
            default=None,
            metavar='SIM_CLASS',
            help='Name of the class in CUSTOM_MODULE that implements the SIM '
                 'to load.')
    customize_arguments.add_argument(
            '--test-sim-arg',
            type=str,
            default=None,
            help='Aboslute path to the json description of argument list '
                 'taken by SIM_CLASS')
    customize_arguments.add_argument(
            '--test-state-machine-factory-class',
            type=str,
            default=None,
            metavar='SMF_CLASS',
            help='Name of the class in CUSTOM_MODULE that impelements the '
                 'state machine factory to load. Only used if MODEM_CLASS is '
                 'not provided.')
    customize_arguments.add_argument(
            '--test-state-machine-factory-arg',
            type=str,
            default=None,
            help='Absolute path to the json description of argument list '
                 'taken by SMF_CLASS')

    opts = parser.parse_args(arg_string)

    # Extra sanity checks.
    if opts.family == 'CDMA' and opts.roaming_networks > 0:
        raise argparse.ArgumentTypeError('CDMA networks do not support '
                                         'roaming networks.')

    test_objects = (opts.test_modem_class or
                    opts.test_sim_class or
                    opts.test_state_machine_factory_class)
    if not opts.test_module and test_objects:
        raise argparse.ArgumentTypeError('test_module is required with any '
                                         'other customization arguments.')

    if opts.test_modem_class and opts.test_state_machine_factory_class:
        logging.warning('test-state-machine-factory-class will be ignored '
                        'because test-modem-class was provided.')

    return opts


def ExtractExitError(dump_file_path):
    """
    Gets the exit error left behind by a crashed pseudomodem.

    If there is a file at |dump_file_path|, extracts the error and the traceback
    left behind by the child process. This function is intended to be used by
    the launching process to parse the error file left behind by pseudomodem.

    @param dump_file_path: Full path to the file to read.
    @returns: (error_reason, error_traceback)
            error_reason: str. The one line reason for error that should be
                    used to raise exceptions.
            error_traceback: A list of str. This is the traceback left
                    behind by the child process, if any. May be [].

    """
    error_reason = 'No detailed reason found :('
    error_traceback = []
    if dump_file_path:
        try:
            dump_file = open(dump_file_path, 'rb')
            error_reason = dump_file.readline().strip()
            error_traceback = dump_file.readlines()
            dump_file.close()
        except OSError as e:
            logging.error('Could not open dump file %s: %s',
                          dump_file_path, str(e))
    return error_reason, error_traceback


# The single global instance of PseudoModemManager.
_pseudo_modem_manager = None


# ##############################################################################
# Private static functions.
def _NonNegInt(value):
    value = int(value)
    if value < 0:
        raise argparse.ArgumentTypeError('%s is not a non-negative int' % value)
    return value


def _DumpExitError(dump_file_path, exc):
    """
    Dump information about the raised exception in the exit error file.

    Format of file dumped:
    - First line is the reason for the crash.
    - Subsequent lines are the traceback from the exception raised.

    We expect the file to exist, because we want the launching context (that
    will eventually read the error dump) to create and own the file.

    @param dump_file_path: Full path to file to which we should dump.
    @param exc: The exception raised.

    """
    if not dump_file_path:
        return

    if not os.path.isfile(dump_file_path):
        logging.error('File |%s| does not exist. Can not dump exit error.',
                      dump_file_path)
        return

    try:
        dump_file = open(dump_file_path, 'wb')
    except IOError as e:
        logging.error('Could not open file |%s| to dump exit error. '
                      'Exception raised when opening file: %s',
                      dump_file_path, str(e))
        return

    dump_file.write(str(exc) + '\n')
    dump_file.writelines(traceback.format_exc())
    dump_file.close()


def sig_handler(signum, frame):
    """
    Top level signal handler to handle user interrupt.

    @param signum: The signal received.
    @param frame: Ignored.
    """
    global _pseudo_modem_manager
    logging.debug('Signal handler called with signal %d', signum)
    if _pseudo_modem_manager:
        _pseudo_modem_manager.GracefulExit()


def main():
    """
    This is the entry point for raw pseudomodem.

    You should not be running this module as a script. If you're trying to run
    pseudomodem from the command line, see |pseudomodem_context| module.

    """
    global _pseudo_modem_manager

    logging_setup.SetupLogging()

    logging.info('Pseudomodem commandline: [%s]', str(sys.argv))
    opts = ParseArguments()

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    try:
        _pseudo_modem_manager = PseudoModemManager()
        _pseudo_modem_manager.Setup(opts)
        _pseudo_modem_manager.StartBlocking()
    except Exception as e:
        logging.error('Caught exception at top level: %s', str(e))
        _DumpExitError(opts.exit_error_file, e)
        _pseudo_modem_manager.GracefulExit()
        raise


if __name__ == '__main__':
    main()