#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import cellular_logging
import dbus, os, subprocess, time
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import flimflam_test_path
from autotest_lib.client.cros.cellular import modem
log = cellular_logging.SetupCellularLogging('mm_test')
class ModemManagerTest(object):
"""Wrapper for starting up ModemManager in an artificial testing
environment, connected to a fake modem program and talking to a
fake (tun) network device.
The test using this must ensure the setup of the fakegudev and
fakemodem deps.
"""
def __init__(self, autodir, modem_pattern_files):
self.autodir=autodir # not great. Examine deps directly?
self.modem_pattern_files = modem_pattern_files
self.modemmanager = None
self.fakemodem_process = None
self.fakenet_process = None
def _start_fake_network(self):
"""Start the fakenetwork program and return the fake interface name
Start up the fakenet program, which uses the tun driver to create
a network device.
Returns the name of the fake network interface.
Sets self.fakenet_process as a handle to the process.
"""
self.fakenet_process = subprocess.Popen(
os.path.join(self.autodir,'deps/fakemodem/bin','fakenet'),
stdout=subprocess.PIPE)
return self.fakenet_process.stdout.readline().rstrip()
def _start_fake_modem(self, patternfiles):
"""Start the fakemodem program and return the pty path to access it
Start up the fakemodem program
Argument:
patternfiles -- List of files to read for command/response patterns
Returns the device path of the pty that serves the fake modem, e.g.
/dev/pts/4.
Sets self.fakemodem_process as a handle to the process, and
self.fakemodem as a DBus interface to it.
"""
scriptargs = ["--patternfile=" + x for x in patternfiles]
name = os.path.join(self.autodir, 'deps/fakemodem/bin', 'fakemodem')
self.fakemodem_process = subprocess.Popen(
[os.path.join(self.autodir, 'deps/fakemodem/bin', 'fakemodem')]
+ scriptargs,
stdout=subprocess.PIPE)
ptyname = self.fakemodem_process.stdout.readline().rstrip()
time.sleep(2) # XXX
self.fakemodem = dbus.Interface(
dbus.SystemBus().get_object('org.chromium.FakeModem', '/'),
'org.chromium.FakeModem')
return ptyname
def _start_modemmanager(self, netname, modemname):
"""Start modemmanager under the control of fake devices.
Arguments:
netname -- fake network interface name (e.g. tun0)
modemname -- path to pty slave device of fake modem (e.g. /dev/pts/4)
Returns...
"""
id_props = ['property_ID_MM_CANDIDATE=1',
'property_ID_VENDOR_ID=04e8', # Samsung USB VID
'property_ID_MODEL_ID=6872' # Y3300 modem PID
]
tty_device = (['device_file=%s' % (modemname),
'name=%s' % (modemname[5:]), # remove leading /dev/
'subsystem=tty',
'driver=fake',
'sysfs_path=/sys/devices/fake/tty',
'parent=/dev/fake-parent'] +
id_props)
net_device = (['device_file=/dev/fakenet',
'name=%s' % (netname),
'subsystem=net',
'driver=fake',
'sysfs_path=/sys/devices/fake/net',
'parent=/dev/fake-parent'] +
id_props)
parent_device=['device_file=/dev/fake-parent',
'sysfs_path=/sys/devices/fake/parent',
'devtype=usb_device',
'subsystem=usb']
environment = { 'FAKEGUDEV_DEVICES' : ':'.join(tty_device +
net_device +
parent_device),
'FAKEGUDEV_BLOCK_REAL' : 'true',
'G_DEBUG' : 'fatal_criticals',
'LD_PRELOAD' : os.path.join(self.autodir,
"deps/fakegudev/lib",
"libfakegudev.so") }
self.modemmanager = subprocess.Popen(['/usr/sbin/modem-manager',
'--debug',
'--log-level=DEBUG',
'--log-file=/tmp/mm-log'],
env=environment)
time.sleep(3) # wait for DeviceAdded signal?
self.modemmanager.poll()
if self.modemmanager.returncode is not None:
self.modemmanager = None
raise error.TestFail("ModemManager quit early")
# wait for MM to stabilize?
return modem.ModemManager(provider='org.freedesktop')
def _stop_fake_network(self):
if self.fakenet_process:
self.fakenet_process.poll()
if self.fakenet_process.returncode is None:
self.fakenet_process.terminate()
self.fakenet_process.wait()
def _stop_fake_modem(self):
if self.fakemodem_process:
self.fakemodem_process.poll()
if self.fakemodem_process.returncode is None:
self.fakemodem_process.terminate()
self.fakemodem_process.wait()
def _stop_modemmanager(self):
if self.modemmanager:
self.modemmanager.poll()
if self.modemmanager.returncode is None:
self.modemmanager.terminate()
self.modemmanager.wait()
def __enter__(self):
fakenetname = self._start_fake_network()
fakemodemname = self._start_fake_modem(self.modem_pattern_files)
self.mm = self._start_modemmanager(fakenetname, fakemodemname)
# This would be better handled by listening for DeviceAdded, but
# since we've blocked everything else and only supplied data for
# one modem, it's going to be right
self.modem_object_path = self.mm.path + '/Modems/0'
return self
def __exit__(self, exception, value, traceback):
self._stop_modemmanager()
self._stop_fake_modem()
self._stop_fake_network()
return False