# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dbus, gobject, logging, os, stat
from dbus.mainloop.glib import DBusGMainLoop
import common
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import autotemp, error
from autotest_lib.client.cros import dbus_util
from mainloop import ExceptionForward
from mainloop import GenericTesterMainLoop
"""This module contains several helper classes for writing tests to verify the
CrosDisks DBus interface. In particular, the CrosDisksTester class can be used
to derive functional tests that interact with the CrosDisks server over DBus.
"""
class ExceptionSuppressor(object):
    """Context manager that swallows (and logs) selected exception types.

    The exception classes to suppress are given as positional arguments at
    construction time. Any exception raised inside the with block that is an
    instance of one of those classes (or of a subclass) is logged and
    suppressed; every other exception propagates unchanged.

    Example:

        with ExceptionSuppressor(OSError, IOError):
            # An OSError or IOError (or a subclass of either) raised here
            # is logged and does not leave the with block.
    """

    def __init__(self, *args):
        # |args| is already a tuple, which is what issubclass() expects.
        self.__suppressed_exc_types = args

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # No exception was raised in the with block: nothing to suppress.
        if not exc_type:
            return False
        # The exception is not one of the types we were asked to suppress.
        if not issubclass(exc_type, self.__suppressed_exc_types):
            return False
        # Logging is best effort; a logging failure must never turn a
        # suppressed exception into a new one.
        try:
            logging.exception('Suppressed exception: %s(%s)',
                              exc_type, exc_value)
        except Exception:
            pass
        return True
class DBusClient(object):
    """A base class of a DBus proxy client to test a DBus server.

    This class is expected to be used along with a GLib main loop and provides
    some convenient functions for testing the DBus API exposed by a DBus server.
    """

    def __init__(self, main_loop, bus, bus_name, object_path, timeout=None):
        """Initializes the instance.

        Args:
            main_loop: The GLib main loop.
            bus: The bus where the DBus server is connected to.
            bus_name: The bus name owned by the DBus server.
            object_path: The object path of the DBus server.
            timeout: Maximum time in seconds to wait for the DBus connection.
                     Passed through to dbus_util.get_dbus_object().
        """
        # Maps a handled signal name to the content of its most recent
        # reception, or to None when nothing has been received since the
        # content was last cleared or consumed.
        self.__signal_content = {}
        self.main_loop = main_loop
        self.signal_timeout_in_seconds = 10
        logging.debug('Getting D-Bus proxy object on bus "%s" and path "%s"',
                      bus_name, object_path)
        self.proxy_object = dbus_util.get_dbus_object(bus, bus_name,
                                                      object_path, timeout)

    def clear_signal_content(self, signal_name):
        """Clears the content of the signal.

        Signals that are not being handled are left untouched.

        Args:
            signal_name: The name of the signal.
        """
        if signal_name in self.__signal_content:
            self.__signal_content[signal_name] = None

    def get_signal_content(self, signal_name):
        """Gets the content of a signal.

        Args:
            signal_name: The name of the signal.

        Returns:
            The content of a signal or None if the signal is not being handled.
        """
        return self.__signal_content.get(signal_name)

    def handle_signal(self, interface, signal_name, argument_names=()):
        """Registers a signal handler to handle a given signal.

        Registering the same signal name more than once is a no-op.

        Args:
            interface: The DBus interface of the signal.
            signal_name: The name of the signal.
            argument_names: A list of argument names that the signal contains.
        """
        if signal_name in self.__signal_content:
            return

        self.__signal_content[signal_name] = None

        def signal_handler(*args):
            # Store the positional signal arguments keyed by their names.
            self.__signal_content[signal_name] = dict(zip(argument_names, args))

        logging.debug('Handling D-Bus signal "%s(%s)" on interface "%s"',
                      signal_name, ', '.join(argument_names), interface)
        self.proxy_object.connect_to_signal(signal_name, signal_handler,
                                            interface)

    def wait_for_signal(self, signal_name):
        """Waits for the reception of a signal.

        The stored content of the signal is reset once it has been returned.

        Args:
            signal_name: The name of the signal to wait for.

        Returns:
            The content of the signal, or None if the signal is not being
            handled.
        """
        if signal_name not in self.__signal_content:
            return None

        def check_signal_content():
            # Drain the pending main loop events so that the signal handler
            # gets a chance to run before the content is inspected.
            context = self.main_loop.get_context()
            while context.iteration(False):
                pass
            return self.__signal_content[signal_name] is not None

        logging.debug('Waiting for D-Bus signal "%s"', signal_name)
        utils.poll_for_condition(condition=check_signal_content,
                                 desc='%s signal' % signal_name,
                                 timeout=self.signal_timeout_in_seconds)
        content = self.__signal_content[signal_name]
        logging.debug('Received D-Bus signal "%s(%s)"', signal_name, content)
        self.__signal_content[signal_name] = None
        return content

    def expect_signal(self, signal_name, expected_content):
        """Waits for the reception of a signal and verifies its content.

        Args:
            signal_name: The name of the signal to wait for.
            expected_content: The expected content of the signal, which can be
                              partially specified. Only specified fields are
                              compared between the actual and expected content.

        Returns:
            The actual content of the signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the signal, or when
                            a field is expected but the signal is not being
                            handled.
        """
        actual_content = self.wait_for_signal(signal_name)
        logging.debug("%s signal: expected=%s actual=%s",
                      signal_name, expected_content, actual_content)
        for argument, expected_value in expected_content.items():
            # wait_for_signal() returns None for a signal that is not being
            # handled; report that as a missing field rather than letting the
            # membership test below fail with a TypeError.
            if actual_content is None or argument not in actual_content:
                raise error.TestFail(
                    ('%s signal missing "%s": expected=%s, actual=%s') %
                    (signal_name, argument, expected_content, actual_content))

            if actual_content[argument] != expected_value:
                raise error.TestFail(
                    ('%s signal not matched on "%s": expected=%s, actual=%s') %
                    (signal_name, argument, expected_content, actual_content))
        return actual_content
class CrosDisksClient(DBusClient):
    """DBus proxy client used by tests to talk to the CrosDisks server."""

    CROS_DISKS_BUS_NAME = 'org.chromium.CrosDisks'
    CROS_DISKS_INTERFACE = 'org.chromium.CrosDisks'
    CROS_DISKS_OBJECT_PATH = '/org/chromium/CrosDisks'
    DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'

    # Signal names, each followed by the names given to its positional
    # arguments when the signal content is stored as a dictionary.
    FORMAT_COMPLETED_SIGNAL = 'FormatCompleted'
    FORMAT_COMPLETED_SIGNAL_ARGUMENTS = ('status', 'path')
    MOUNT_COMPLETED_SIGNAL = 'MountCompleted'
    MOUNT_COMPLETED_SIGNAL_ARGUMENTS = (
        'status', 'source_path', 'source_type', 'mount_path',
    )
    RENAME_COMPLETED_SIGNAL = 'RenameCompleted'
    RENAME_COMPLETED_SIGNAL_ARGUMENTS = ('status', 'path')

    def __init__(self, main_loop, bus, timeout_seconds=None):
        """Connects to the CrosDisks server and registers signal handlers.

        Args:
            main_loop: The GLib main loop.
            bus: The bus where the DBus server is connected to.
            timeout_seconds: Maximum time in seconds to wait for the DBus
                             connection.
        """
        super(CrosDisksClient, self).__init__(main_loop, bus,
                                              self.CROS_DISKS_BUS_NAME,
                                              self.CROS_DISKS_OBJECT_PATH,
                                              timeout_seconds)
        proxy = self.proxy_object
        self.interface = dbus.Interface(proxy, self.CROS_DISKS_INTERFACE)
        self.properties = dbus.Interface(proxy, self.DBUS_PROPERTIES_INTERFACE)

        # Registration order: FormatCompleted, MountCompleted, RenameCompleted.
        handled_signals = (
            (self.FORMAT_COMPLETED_SIGNAL,
             self.FORMAT_COMPLETED_SIGNAL_ARGUMENTS),
            (self.MOUNT_COMPLETED_SIGNAL,
             self.MOUNT_COMPLETED_SIGNAL_ARGUMENTS),
            (self.RENAME_COMPLETED_SIGNAL,
             self.RENAME_COMPLETED_SIGNAL_ARGUMENTS),
        )
        for name, argument_names in handled_signals:
            self.handle_signal(self.CROS_DISKS_INTERFACE, name, argument_names)

    def enumerate_devices(self):
        """Invokes the CrosDisks EnumerateDevices method.

        Returns:
            Whatever the EnumerateDevices DBus call returns (presumably a
            list of sysfs paths of devices recognized by CrosDisks).
        """
        return self.interface.EnumerateDevices()

    def get_device_properties(self, path):
        """Invokes the CrosDisks GetDeviceProperties method.

        Args:
            path: The device path.

        Returns:
            The properties of the device, as returned by the DBus call.
        """
        return self.interface.GetDeviceProperties(path)

    def format(self, path, filesystem_type=None, options=None):
        """Invokes the CrosDisks Format method.

        Any previously received FormatCompleted content is cleared first.

        Args:
            path: The device path to format.
            filesystem_type: The filesystem type used for formatting the
                             device. None is sent as an empty string.
            options: A list of options used for formatting the device.
                     None is sent as an empty list.
        """
        fs_type = '' if filesystem_type is None else filesystem_type
        option_list = [] if options is None else options
        self.clear_signal_content(self.FORMAT_COMPLETED_SIGNAL)
        self.interface.Format(path, fs_type,
                              dbus.Array(option_list, signature='s'))

    def wait_for_format_completion(self):
        """Waits for the CrosDisks FormatCompleted signal.

        Returns:
            The content of the FormatCompleted signal.
        """
        return self.wait_for_signal(self.FORMAT_COMPLETED_SIGNAL)

    def expect_format_completion(self, expected_content):
        """Waits for the FormatCompleted signal and verifies its content.

        Args:
            expected_content: The expected content of the FormatCompleted
                              signal, which can be partially specified.
                              Only specified fields are compared between the
                              actual and expected content.

        Returns:
            The actual content of the FormatCompleted signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the FormatCompleted
                            signal.
        """
        return self.expect_signal(self.FORMAT_COMPLETED_SIGNAL,
                                  expected_content)

    def rename(self, path, volume_name=None):
        """Invokes the CrosDisks Rename method.

        Any previously received RenameCompleted content is cleared first.

        Args:
            path: The device path to rename.
            volume_name: The new name used for renaming. None is sent as an
                         empty string.
        """
        new_name = '' if volume_name is None else volume_name
        self.clear_signal_content(self.RENAME_COMPLETED_SIGNAL)
        self.interface.Rename(path, new_name)

    def wait_for_rename_completion(self):
        """Waits for the CrosDisks RenameCompleted signal.

        Returns:
            The content of the RenameCompleted signal.
        """
        return self.wait_for_signal(self.RENAME_COMPLETED_SIGNAL)

    def expect_rename_completion(self, expected_content):
        """Waits for the RenameCompleted signal and verifies its content.

        Args:
            expected_content: The expected content of the RenameCompleted
                              signal, which can be partially specified.
                              Only specified fields are compared between the
                              actual and expected content.

        Returns:
            The actual content of the RenameCompleted signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the RenameCompleted
                            signal.
        """
        return self.expect_signal(self.RENAME_COMPLETED_SIGNAL,
                                  expected_content)

    def mount(self, path, filesystem_type=None, options=None):
        """Invokes the CrosDisks Mount method.

        Any previously received MountCompleted content is cleared first.

        Args:
            path: The device path to mount.
            filesystem_type: The filesystem type used for mounting the device.
                             None is sent as an empty string.
            options: A list of options used for mounting the device.
                     None is sent as an empty list.
        """
        fs_type = '' if filesystem_type is None else filesystem_type
        option_list = [] if options is None else options
        self.clear_signal_content(self.MOUNT_COMPLETED_SIGNAL)
        self.interface.Mount(path, fs_type,
                             dbus.Array(option_list, signature='s'))

    def unmount(self, path, options=None):
        """Invokes the CrosDisks Unmount method.

        Args:
            path: The device or mount path to unmount.
            options: A list of options used for unmounting the path.
                     None is sent as an empty list.

        Returns:
            The mount error code.
        """
        option_list = [] if options is None else options
        return self.interface.Unmount(path,
                                      dbus.Array(option_list, signature='s'))

    def wait_for_mount_completion(self):
        """Waits for the CrosDisks MountCompleted signal.

        Returns:
            The content of the MountCompleted signal.
        """
        return self.wait_for_signal(self.MOUNT_COMPLETED_SIGNAL)

    def expect_mount_completion(self, expected_content):
        """Waits for the MountCompleted signal and verifies its content.

        Args:
            expected_content: The expected content of the MountCompleted
                              signal, which can be partially specified.
                              Only specified fields are compared between the
                              actual and expected content.

        Returns:
            The actual content of the MountCompleted signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the MountCompleted
                            signal.
        """
        return self.expect_signal(self.MOUNT_COMPLETED_SIGNAL,
                                  expected_content)
class CrosDisksTester(GenericTesterMainLoop):
    """A base tester class for testing the CrosDisks server.

    A derived class should override the get_tests method to return a list of
    test methods. The perform_one_test method invokes each test method in the
    list to verify some functionalities of CrosDisks server.
    """

    def __init__(self, test):
        """Sets up the system bus, the GLib main loop and the DBus client.

        Args:
            test: Passed through to GenericTesterMainLoop.__init__().
        """
        bus_loop = DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SystemBus(mainloop=bus_loop)
        self.main_loop = gobject.MainLoop()
        super(CrosDisksTester, self).__init__(test, self.main_loop)
        self.cros_disks = CrosDisksClient(self.main_loop, self.bus)

    def get_tests(self):
        """Returns a list of test methods to be invoked by perform_one_test.

        A derived class should override this method.

        Returns:
            A list of test methods.
        """
        return []

    @ExceptionForward
    def perform_one_test(self):
        """Exercises each test method in the list returned by get_tests.

        The name of every test method is first recorded as a remaining
        requirement, and marked as completed once the method has returned.
        """
        tests = self.get_tests()
        # __name__ is used instead of func_name because it is available on
        # functions and bound methods in both Python 2 and Python 3.
        self.remaining_requirements = set(test.__name__ for test in tests)
        for test in tests:
            test()
            self.requirement_completed(test.__name__)

    def reconnect_client(self, timeout_seconds=None):
        """Reconnects the CrosDisks DBus client.

        Replaces self.cros_disks with a newly created CrosDisksClient.

        Args:
            timeout_seconds: Maximum time in seconds to wait for the DBus
                             connection.
        """
        self.cros_disks = CrosDisksClient(self.main_loop, self.bus,
                                          timeout_seconds)
class FilesystemTestObject(object):
    """Base class of an object (file, directory, ...) used in filesystem tests.

    Subclasses provide the actual behavior by overriding _create and _verify,
    which respectively create the object on a filesystem and check that it is
    present with the expected content. The defaults here always report
    failure.
    """

    def __init__(self, path, content, mode):
        """Initializes the instance.

        Args:
            path: The relative path of the test object.
            content: The content of the test object.
            mode: The file permissions given to the test object.
        """
        self._path, self._content, self._mode = path, content, mode

    def create(self, base_dir):
        """Creates the test object in a base directory.

        Args:
            base_dir: The base directory where the test object is created.

        Returns:
            True if the test object is created successfully or False otherwise.
        """
        if self._create(base_dir):
            return True
        logging.debug('Failed to create filesystem test object at "%s"',
                      os.path.join(base_dir, self._path))
        return False

    def verify(self, base_dir):
        """Verifies the test object in a base directory.

        Args:
            base_dir: The base directory where the test object is expected to be
                      found.

        Returns:
            True if the test object is found in the base directory and matches
            the expected content, or False otherwise.
        """
        if self._verify(base_dir):
            return True
        logging.debug('Failed to verify filesystem test object at "%s"',
                      os.path.join(base_dir, self._path))
        return False

    def _create(self, base_dir):
        # Hook for subclasses; the base object cannot be created.
        return False

    def _verify(self, base_dir):
        # Hook for subclasses; the base object cannot be verified.
        return False
class FilesystemTestDirectory(FilesystemTestObject):
    """A filesystem test object that represents a directory.

    The content of the directory is a list of filesystem test objects, which
    are created and verified inside the directory.
    """

    def __init__(self, path, content,
                 mode=(stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
                       stat.S_IROTH | stat.S_IXOTH)):
        super(FilesystemTestDirectory, self).__init__(path, content, mode)

    def _create(self, base_dir):
        if self._path:
            target = os.path.join(base_dir, self._path)
            # A failure to create the directory or to change its mode is
            # logged and suppressed; the isdir() check below decides whether
            # creation is considered successful.
            with ExceptionSuppressor(OSError):
                os.makedirs(target)
                os.chmod(target, self._mode)
        else:
            # An empty path designates the base directory itself, which is
            # neither created nor chmod'ed here.
            target = base_dir

        if not os.path.isdir(target):
            return False

        # Stops at the first child that fails to be created.
        return all(child.create(target) for child in self._content)

    def _verify(self, base_dir):
        target = os.path.join(base_dir, self._path) if self._path else base_dir
        if not os.path.isdir(target):
            return False

        # Stops at the first child that fails verification.
        return all(child.verify(target) for child in self._content)
class FilesystemTestFile(FilesystemTestObject):
    """A filesystem test object that represents a regular file.

    The content of the file is the data written to it on creation and
    compared against on verification.
    """

    def __init__(self, path, content,
                 mode=(stat.S_IRUSR | stat.S_IWUSR |
                       stat.S_IRGRP | stat.S_IROTH)):
        super(FilesystemTestFile, self).__init__(path, content, mode)

    def _create(self, base_dir):
        target = os.path.join(base_dir, self._path)
        created = False
        # An IOError while writing the file is logged and suppressed, which
        # leaves |created| as False.
        with ExceptionSuppressor(IOError):
            with open(target, 'wb+') as stream:
                stream.write(self._content)
            # Failing to change the mode does not make the creation fail.
            with ExceptionSuppressor(OSError):
                os.chmod(target, self._mode)
            created = True
        return created

    def _verify(self, base_dir):
        target = os.path.join(base_dir, self._path)
        matches = False
        # An IOError while reading the file is logged and suppressed, which
        # leaves |matches| as False.
        with ExceptionSuppressor(IOError):
            with open(target, 'rb') as stream:
                matches = stream.read() == self._content
        return matches
class DefaultFilesystemTestContent(FilesystemTestDirectory):
    """A predefined tree of test files and directories.

    The tree is rooted at the base directory itself (empty relative path) and
    has the following layout:

        file1            '0123456789'
        dir1/
            file1        empty
            file2        'abcdefg'
            dir2/
                file3    'abcdefg'
                file4    65536 'a' characters
    """

    def __init__(self):
        dir2 = FilesystemTestDirectory('dir2', [
            FilesystemTestFile('file3', 'abcdefg'),
            FilesystemTestFile('file4', 'a' * 65536),
        ])
        dir1 = FilesystemTestDirectory('dir1', [
            FilesystemTestFile('file1', ''),
            FilesystemTestFile('file2', 'abcdefg'),
            dir2,
        ])
        top_level = [FilesystemTestFile('file1', '0123456789'), dir1]
        root_mode = (stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
                     stat.S_IROTH | stat.S_IXOTH)
        super(DefaultFilesystemTestContent, self).__init__(
            '', top_level, root_mode)
class VirtualFilesystemImage(object):
    """A temporary filesystem image backed by a file and a loop device.

    The image file is created with dd, attached to a loop device with
    losetup, formatted with mkfs and mounted with mount. An instance can be
    used as a context manager: the image file is created on entry and cleaned
    up (unmounted, detached and removed) on exit.
    """

    def __init__(self, block_size, block_count, filesystem_type,
                 *args, **kwargs):
        """Initializes the instance.

        Args:
            block_size: The number of bytes of each block in the image.
            block_count: The number of blocks in the image.
            filesystem_type: The filesystem type to be given to the mkfs
                             program for formatting the image.

        Keyword Args:
            mount_filesystem_type: The filesystem type to be given to the
                                   mount program for mounting the image.
                                   Defaults to |filesystem_type|.
            mkfs_options: A list of options to be given to the mkfs program.
                          Defaults to an empty list.
        """
        self._block_size = block_size
        self._block_count = block_count
        self._filesystem_type = filesystem_type
        self._mount_filesystem_type = kwargs.get('mount_filesystem_type')
        if self._mount_filesystem_type is None:
            self._mount_filesystem_type = filesystem_type
        self._mkfs_options = kwargs.get('mkfs_options')
        if self._mkfs_options is None:
            self._mkfs_options = []
        self._image_file = None
        self._loop_device = None
        # os.stat() result of the loop device taken right after attaching,
        # used to restore its ownership and permissions before detaching.
        self._loop_device_stat = None
        self._mount_dir = None

    def __del__(self):
        # Best-effort cleanup; nothing may escape from a finalizer.
        with ExceptionSuppressor(Exception):
            self.clean()

    def __enter__(self):
        self.create()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.clean()
        # Never suppress an exception raised in the with block.
        return False

    def _remove_temp_path(self, temp_path):
        """Removes a temporary file or directory created using autotemp."""
        if temp_path:
            with ExceptionSuppressor(Exception):
                path = temp_path.name
                temp_path.clean()
                logging.debug('Removed "%s"', path)

    def _remove_image_file(self):
        """Removes the image file if one has been created."""
        self._remove_temp_path(self._image_file)
        self._image_file = None

    def _remove_mount_dir(self):
        """Removes the mount directory if one has been created."""
        self._remove_temp_path(self._mount_dir)
        self._mount_dir = None

    @property
    def image_file(self):
        """Gets the path of the image file.

        Returns:
            The path of the image file or None if no image file has been
            created.
        """
        return self._image_file.name if self._image_file else None

    @property
    def loop_device(self):
        """Gets the loop device where the image file is attached to.

        Returns:
            The path of the loop device where the image file is attached to or
            None if no loop device is attaching the image file.
        """
        return self._loop_device

    @property
    def mount_dir(self):
        """Gets the directory where the image file is mounted to.

        Returns:
            The directory where the image file is mounted to or None if no
            mount directory has been created.
        """
        return self._mount_dir.name if self._mount_dir else None

    def create(self):
        """Creates a zero-filled image file with the specified size.

        Any previously created image is cleaned up first. The created image
        file is temporary and removed when clean() is called.

        Raises:
            RuntimeError: If the dd command fails with error.CmdError.
        """
        self.clean()
        self._image_file = autotemp.tempfile(unique_id='fsImage')
        try:
            logging.debug('Creating zero-filled image file at "%s"',
                          self._image_file.name)
            utils.run('dd if=/dev/zero of=%s bs=%s count=%s' %
                      (self._image_file.name, self._block_size,
                       self._block_count))
        except error.CmdError as exc:
            self._remove_image_file()
            message = 'Failed to create filesystem image: %s' % exc
            raise RuntimeError(message)

    def clean(self):
        """Removes the image file if one has been created.

        Before removal, the image file is detached from the loop device that
        it is attached to.
        """
        self.detach_from_loop_device()
        self._remove_image_file()

    def attach_to_loop_device(self):
        """Attaches the created image file to a loop device.

        Creates the image file, if one has not been created, by calling
        create(). Does nothing if the image file is already attached.

        Returns:
            The path of the loop device where the image file is attached to.
        """
        if self._loop_device:
            return self._loop_device

        if not self._image_file:
            self.create()

        logging.debug('Attaching image file "%s" to loop device',
                      self._image_file.name)
        utils.run('losetup -f %s' % self._image_file.name)
        output = utils.system_output('losetup -j %s' % self._image_file.name)
        # output should look like: "/dev/loop0: [000d]:6329 (/tmp/test.img)"
        self._loop_device = output.split(':')[0]
        logging.debug('Attached image file "%s" to loop device "%s"',
                      self._image_file.name, self._loop_device)

        self._loop_device_stat = os.stat(self._loop_device)
        logging.debug('Loop device "%s" (uid=%d, gid=%d, permissions=%04o)',
                      self._loop_device,
                      self._loop_device_stat.st_uid,
                      self._loop_device_stat.st_gid,
                      stat.S_IMODE(self._loop_device_stat.st_mode))
        return self._loop_device

    def detach_from_loop_device(self):
        """Detaches the image file from the loop device.

        The image is unmounted first, and the ownership and permissions that
        the loop device had when it was attached are restored.
        """
        if not self._loop_device:
            return

        self.unmount()

        logging.debug('Cleaning up remaining mount points of loop device "%s"',
                      self._loop_device)
        utils.run('umount -f %s' % self._loop_device, ignore_status=True)

        # The saved stat is missing if os.stat() failed while attaching; in
        # that case there is nothing to restore, but the loop device must
        # still be detached.
        if self._loop_device_stat is not None:
            logging.debug('Restore ownership/permissions of loop device "%s"',
                          self._loop_device)
            os.chmod(self._loop_device,
                     stat.S_IMODE(self._loop_device_stat.st_mode))
            os.chown(self._loop_device,
                     self._loop_device_stat.st_uid,
                     self._loop_device_stat.st_gid)

        logging.debug('Detaching image file "%s" from loop device "%s"',
                      self._image_file.name, self._loop_device)
        utils.run('losetup -d %s' % self._loop_device)
        self._loop_device = None
        self._loop_device_stat = None

    def format(self):
        """Formats the image file as the specified filesystem.

        The image file is attached to a loop device first if necessary.

        Raises:
            RuntimeError: If a command fails with error.CmdError.
        """
        self.attach_to_loop_device()
        try:
            logging.debug('Formatting image file at "%s" as "%s" filesystem',
                          self._image_file.name, self._filesystem_type)
            utils.run('yes | mkfs -t %s %s %s' %
                      (self._filesystem_type, ' '.join(self._mkfs_options),
                       self._loop_device))
            logging.debug('blkid: %s', utils.system_output(
                'blkid -c /dev/null %s' % self._loop_device,
                ignore_status=True))
        except error.CmdError as exc:
            message = 'Failed to format filesystem image: %s' % exc
            raise RuntimeError(message)

    def mount(self, options=None):
        """Mounts the image file to a directory.

        Does nothing if the image file is already mounted. The image file is
        attached to a loop device first if necessary.

        Args:
            options: An optional list of mount options.

        Returns:
            The path of the directory where the image file is mounted to.

        Raises:
            RuntimeError: If the mount command fails with error.CmdError.
        """
        if self._mount_dir:
            return self._mount_dir.name

        if options is None:
            options = []

        options_arg = ','.join(options)
        if options_arg:
            options_arg = '-o ' + options_arg

        self.attach_to_loop_device()
        self._mount_dir = autotemp.tempdir(unique_id='fsImage')
        try:
            logging.debug('Mounting image file "%s" (%s) to directory "%s"',
                          self._image_file.name, self._loop_device,
                          self._mount_dir.name)
            utils.run('mount -t %s %s %s %s' %
                      (self._mount_filesystem_type, options_arg,
                       self._loop_device, self._mount_dir.name))
        except error.CmdError as exc:
            self._remove_mount_dir()
            message = ('Failed to mount virtual filesystem image "%s": %s' %
                       (self._image_file.name, exc))
            raise RuntimeError(message)
        return self._mount_dir.name

    def unmount(self):
        """Unmounts the image file from the mounted directory.

        The mount directory is removed whether or not unmounting succeeds.

        Raises:
            RuntimeError: If the umount command fails with error.CmdError.
        """
        if not self._mount_dir:
            return

        try:
            logging.debug('Unmounting image file "%s" (%s) from directory "%s"',
                          self._image_file.name, self._loop_device,
                          self._mount_dir.name)
            utils.run('umount %s' % self._mount_dir.name)
        except error.CmdError as exc:
            message = ('Failed to unmount virtual filesystem image "%s": %s' %
                       (self._image_file.name, exc))
            raise RuntimeError(message)
        finally:
            self._remove_mount_dir()

    def get_volume_label(self):
        """Gets the volume label of the filesystem on |self._loop_device|.

        Returns:
            The volume label as a string, or None if blkid does not report
            one.
        """
        # This script is run as root in a normal autotest run,
        # so this works: It doesn't have access to the necessary info
        # when run as a non-privileged user
        cmd = "blkid -c /dev/null -o udev %s" % self._loop_device
        output = utils.system_output(cmd, ignore_status=True)
        return self._parse_volume_label(output)

    @staticmethod
    def _parse_volume_label(blkid_output):
        """Extracts the ID_FS_LABEL value from KEY=VALUE lines of blkid output.

        Args:
            blkid_output: The text printed by "blkid -o udev".

        Returns:
            The value of the first ID_FS_LABEL line, or None if there is none.
        """
        for line in blkid_output.splitlines():
            # Split on the first '=' only, so that values containing '=' are
            # kept intact and lines without '=' are skipped instead of
            # raising ValueError.
            udev_key, separator, udev_val = line.partition('=')
            if separator and udev_key == 'ID_FS_LABEL':
                return udev_val
        return None