普通文本  |  736行  |  26.43 KB

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dbus, gobject, logging, os, stat
from dbus.mainloop.glib import DBusGMainLoop

import common
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import autotemp, error
from mainloop import ExceptionForward
from mainloop import GenericTesterMainLoop


"""This module contains several helper classes for writing tests to verify the
CrosDisks DBus interface. In particular, the CrosDisksTester class can be used
to derive functional tests that interact with the CrosDisks server over DBus.
"""


class ExceptionSuppressor(object):
    """A context manager class for suppressing certain types of exception.

    An instance of this class is expected to be used with the with statement
    and takes a set of exception classes at instantiation, which are types of
    exception to be suppressed (and logged) in the code block under the with
    statement.

    Example:

        with ExceptionSuppressor(OSError, IOError):
            # An exception, which is a sub-class of OSError or IOError, is
            # suppressed in the block code under the with statement.
    """
    def __init__(self, *args):
        self.__suppressed_exc_types = (args)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type and issubclass(exc_type, self.__suppressed_exc_types):
            try:
                logging.exception('Suppressed exception: %s(%s)',
                                  exc_type, exc_value)
            except Exception:
                pass
            return True
        return False


class DBusClient(object):
    """ A base class of a DBus proxy client to test a DBus server.

    This class is expected to be used along with a GLib main loop and provides
    some convenient functions for testing the DBus API exposed by a DBus server.
    """
    def __init__(self, main_loop, bus, bus_name, object_path):
        """Initializes the instance.

        Args:
            main_loop: The GLib main loop.
            bus: The bus where the DBus server is connected to.
            bus_name: The bus name owned by the DBus server.
            object_path: The object path of the DBus server.
        """
        self.__signal_content = {}
        self.main_loop = main_loop
        self.signal_timeout_in_seconds = 10
        logging.debug('Getting D-Bus proxy object on bus "%s" and path "%s"',
                      bus_name, object_path)
        self.proxy_object = bus.get_object(bus_name, object_path)

    def clear_signal_content(self, signal_name):
        """Clears the content of the signal.

        Args:
            signal_name: The name of the signal.
        """
        if signal_name in self.__signal_content:
            self.__signal_content[signal_name] = None

    def get_signal_content(self, signal_name):
        """Gets the content of a signal.

        Args:
            signal_name: The name of the signal.

        Returns:
            The content of a signal or None if the signal is not being handled.
        """
        return self.__signal_content.get(signal_name)

    def handle_signal(self, interface, signal_name, argument_names=()):
        """Registers a signal handler to handle a given signal.

        Args:
            interface: The DBus interface of the signal.
            signal_name: The name of the signal.
            argument_names: A list of argument names that the signal contains.
        """
        if signal_name in self.__signal_content:
            return

        self.__signal_content[signal_name] = None

        def signal_handler(*args):
            self.__signal_content[signal_name] = dict(zip(argument_names, args))

        logging.debug('Handling D-Bus signal "%s(%s)" on interface "%s"',
                      signal_name, ', '.join(argument_names), interface)
        self.proxy_object.connect_to_signal(signal_name, signal_handler,
                                            interface)

    def wait_for_signal(self, signal_name):
        """Waits for the reception of a signal.

        Args:
            signal_name: The name of the signal to wait for.

        Returns:
            The content of the signal.
        """
        if signal_name not in self.__signal_content:
            return None

        def check_signal_content():
            context = self.main_loop.get_context()
            while context.iteration(False):
                pass
            return self.__signal_content[signal_name] is not None

        logging.debug('Waiting for D-Bus signal "%s"', signal_name)
        utils.poll_for_condition(condition=check_signal_content,
                                 desc='%s signal' % signal_name,
                                 timeout=self.signal_timeout_in_seconds)
        content = self.__signal_content[signal_name]
        logging.debug('Received D-Bus signal "%s(%s)"', signal_name, content)
        self.__signal_content[signal_name] = None
        return content

    def expect_signal(self, signal_name, expected_content):
        """Waits the the reception of a signal and verifies its content.

        Args:
            signal_name: The name of the signal to wait for.
            expected_content: The expected content of the signal, which can be
                              partially specified. Only specified fields are
                              compared between the actual and expected content.

        Returns:
            The actual content of the signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the signal.
        """
        actual_content = self.wait_for_signal(signal_name)
        logging.debug("%s signal: expected=%s actual=%s",
                      signal_name, expected_content, actual_content)
        for argument, expected_value in expected_content.iteritems():
            if argument not in actual_content:
                raise error.TestFail(
                    ('%s signal missing "%s": expected=%s, actual=%s') %
                    (signal_name, argument, expected_content, actual_content))

            if actual_content[argument] != expected_value:
                raise error.TestFail(
                    ('%s signal not matched on "%s": expected=%s, actual=%s') %
                    (signal_name, argument, expected_content, actual_content))
        return actual_content


class CrosDisksClient(DBusClient):
    """A DBus proxy client for testing the CrosDisks DBus server.
    """

    CROS_DISKS_BUS_NAME = 'org.chromium.CrosDisks'
    CROS_DISKS_INTERFACE = 'org.chromium.CrosDisks'
    CROS_DISKS_OBJECT_PATH = '/org/chromium/CrosDisks'
    DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'
    FORMAT_COMPLETED_SIGNAL = 'FormatCompleted'
    FORMAT_COMPLETED_SIGNAL_ARGUMENTS = (
        'status', 'path'
    )
    MOUNT_COMPLETED_SIGNAL = 'MountCompleted'
    MOUNT_COMPLETED_SIGNAL_ARGUMENTS = (
        'status', 'source_path', 'source_type', 'mount_path'
    )

    def __init__(self, main_loop, bus):
        """Initializes the instance.

        Args:
            main_loop: The GLib main loop.
            bus: The bus where the DBus server is connected to.
        """
        super(CrosDisksClient, self).__init__(main_loop, bus,
                                              self.CROS_DISKS_BUS_NAME,
                                              self.CROS_DISKS_OBJECT_PATH)
        self.interface = dbus.Interface(self.proxy_object,
                                        self.CROS_DISKS_INTERFACE)
        self.properties = dbus.Interface(self.proxy_object,
                                         self.DBUS_PROPERTIES_INTERFACE)
        self.handle_signal(self.CROS_DISKS_INTERFACE,
                           self.FORMAT_COMPLETED_SIGNAL,
                           self.FORMAT_COMPLETED_SIGNAL_ARGUMENTS)
        self.handle_signal(self.CROS_DISKS_INTERFACE,
                           self.MOUNT_COMPLETED_SIGNAL,
                           self.MOUNT_COMPLETED_SIGNAL_ARGUMENTS)

    def is_alive(self):
        """Invokes the CrosDisks IsAlive method.

        Returns:
            True if the CrosDisks server is alive or False otherwise.
        """
        return self.interface.IsAlive()

    def enumerate_auto_mountable_devices(self):
        """Invokes the CrosDisks EnumerateAutoMountableDevices method.

        Returns:
            A list of sysfs paths of devices that are auto-mountable by
            CrosDisks.
        """
        return self.interface.EnumerateAutoMountableDevices()

    def enumerate_devices(self):
        """Invokes the CrosDisks EnumerateMountableDevices method.

        Returns:
            A list of sysfs paths of devices that are recognized by
            CrosDisks.
        """
        return self.interface.EnumerateDevices()

    def get_device_properties(self, path):
        """Invokes the CrosDisks GetDeviceProperties method.

        Args:
            path: The device path.

        Returns:
            The properties of the device in a dictionary.
        """
        return self.interface.GetDeviceProperties(path)

    def format(self, path, filesystem_type=None, options=None):
        """Invokes the CrosDisks Format method.

        Args:
            path: The device path to format.
            filesystem_type: The filesystem type used for formatting the device.
            options: A list of options used for formatting the device.
        """
        if filesystem_type is None:
            filesystem_type = ''
        if options is None:
            options = []
        self.clear_signal_content(self.FORMAT_COMPLETED_SIGNAL)
        self.interface.Format(path, filesystem_type, options)

    def wait_for_format_completion(self):
        """Waits for the CrosDisks FormatCompleted signal.

        Returns:
            The content of the FormatCompleted signal.
        """
        return self.wait_for_signal(self.FORMAT_COMPLETED_SIGNAL)

    def expect_format_completion(self, expected_content):
        """Waits and verifies for the CrosDisks FormatCompleted signal.

        Args:
            expected_content: The expected content of the FormatCompleted
                              signal, which can be partially specified.
                              Only specified fields are compared between the
                              actual and expected content.

        Returns:
            The actual content of the FormatCompleted signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the FormatCompleted
                            signal.
        """
        return self.expect_signal(self.FORMAT_COMPLETED_SIGNAL,
                                  expected_content)

    def mount(self, path, filesystem_type=None, options=None):
        """Invokes the CrosDisks Mount method.

        Args:
            path: The device path to mount.
            filesystem_type: The filesystem type used for mounting the device.
            options: A list of options used for mounting the device.
        """
        if filesystem_type is None:
            filesystem_type = ''
        if options is None:
            options = []
        self.clear_signal_content(self.MOUNT_COMPLETED_SIGNAL)
        self.interface.Mount(path, filesystem_type, options)

    def unmount(self, path, options=None):
        """Invokes the CrosDisks Unmount method.

        Args:
            path: The device or mount path to unmount.
            options: A list of options used for unmounting the path.
        """
        if options is None:
            options = []
        self.interface.Unmount(path, options)

    def wait_for_mount_completion(self):
        """Waits for the CrosDisks MountCompleted signal.

        Returns:
            The content of the MountCompleted signal.
        """
        return self.wait_for_signal(self.MOUNT_COMPLETED_SIGNAL)

    def expect_mount_completion(self, expected_content):
        """Waits and verifies for the CrosDisks MountCompleted signal.

        Args:
            expected_content: The expected content of the MountCompleted
                              signal, which can be partially specified.
                              Only specified fields are compared between the
                              actual and expected content.

        Returns:
            The actual content of the MountCompleted signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the MountCompleted
                            signal.
        """
        return self.expect_signal(self.MOUNT_COMPLETED_SIGNAL,
                                  expected_content)


class CrosDisksTester(GenericTesterMainLoop):
    """A base tester class for testing the CrosDisks server.

    A derived class should override the get_tests method to return a list of
    test methods. The perform_one_test method invokes each test method in the
    list to verify some functionalities of CrosDisks server.
    """
    def __init__(self, test):
        bus_loop = DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus(mainloop=bus_loop)
        self.main_loop = gobject.MainLoop()
        super(CrosDisksTester, self).__init__(test, self.main_loop)
        self.cros_disks = CrosDisksClient(self.main_loop, bus)

    def get_tests(self):
        """Returns a list of test methods to be invoked by perform_one_test.

        A derived class should override this method.

        Returns:
            A list of test methods.
        """
        return []

    @ExceptionForward
    def perform_one_test(self):
        """Exercises each test method in the list returned by get_tests.
        """
        tests = self.get_tests()
        self.remaining_requirements = set([test.func_name for test in tests])
        for test in tests:
            test()
            self.requirement_completed(test.func_name)


class FilesystemTestObject(object):
    """A base class to represent a filesystem test object.

    A filesystem test object can be a file, directory or symbolic link.
    A derived class should override the _create and _verify method to implement
    how the test object should be created and verified, respectively, on a
    filesystem.
    """
    def __init__(self, path, content, mode):
        """Initializes the instance.

        Args:
            path: The relative path of the test object.
            content: The content of the test object.
            mode: The file permissions given to the test object.
        """
        self._path = path
        self._content = content
        self._mode = mode

    def create(self, base_dir):
        """Creates the test object in a base directory.

        Args:
            base_dir: The base directory where the test object is created.

        Returns:
            True if the test object is created successfully or False otherwise.
        """
        if not self._create(base_dir):
            logging.debug('Failed to create filesystem test object at "%s"',
                          os.path.join(base_dir, self._path))
            return False
        return True

    def verify(self, base_dir):
        """Verifies the test object in a base directory.

        Args:
            base_dir: The base directory where the test object is expected to be
                      found.

        Returns:
            True if the test object is found in the base directory and matches
            the expected content, or False otherwise.
        """
        if not self._verify(base_dir):
            logging.debug('Failed to verify filesystem test object at "%s"',
                          os.path.join(base_dir, self._path))
            return False
        return True

    def _create(self, base_dir):
        return False

    def _verify(self, base_dir):
        return False


class FilesystemTestDirectory(FilesystemTestObject):
    """A filesystem test object that represents a directory."""

    def __init__(self, path, content, mode=stat.S_IRWXU|stat.S_IRGRP| \
                 stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH):
        super(FilesystemTestDirectory, self).__init__(path, content, mode)

    def _create(self, base_dir):
        path = os.path.join(base_dir, self._path) if self._path else base_dir

        if self._path:
            with ExceptionSuppressor(OSError):
                os.makedirs(path)
                os.chmod(path, self._mode)

        if not os.path.isdir(path):
            return False

        for content in self._content:
            if not content.create(path):
                return False
        return True

    def _verify(self, base_dir):
        path = os.path.join(base_dir, self._path) if self._path else base_dir
        if not os.path.isdir(path):
            return False

        for content in self._content:
            if not content.verify(path):
                return False
        return True


class FilesystemTestFile(FilesystemTestObject):
    """A filesystem test object that represents a file."""

    def __init__(self, path, content, mode=stat.S_IRUSR|stat.S_IWUSR| \
                 stat.S_IRGRP|stat.S_IROTH):
        super(FilesystemTestFile, self).__init__(path, content, mode)

    def _create(self, base_dir):
        path = os.path.join(base_dir, self._path)
        with ExceptionSuppressor(IOError):
            with open(path, 'wb+') as f:
                f.write(self._content)
            with ExceptionSuppressor(OSError):
                os.chmod(path, self._mode)
            return True
        return False

    def _verify(self, base_dir):
        path = os.path.join(base_dir, self._path)
        with ExceptionSuppressor(IOError):
            with open(path, 'rb') as f:
                return f.read() == self._content
        return False


class DefaultFilesystemTestContent(FilesystemTestDirectory):
    def __init__(self):
        super(DefaultFilesystemTestContent, self).__init__('', [
            FilesystemTestFile('file1', '0123456789'),
            FilesystemTestDirectory('dir1', [
                FilesystemTestFile('file1', ''),
                FilesystemTestFile('file2', 'abcdefg'),
                FilesystemTestDirectory('dir2', [
                    FilesystemTestFile('file3', 'abcdefg'),
                ]),
            ]),
        ], stat.S_IRWXU|stat.S_IRGRP|stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH)


class VirtualFilesystemImage(object):
    def __init__(self, block_size, block_count, filesystem_type,
                 *args, **kwargs):
        """Initializes the instance.

        Args:
            block_size: The number of bytes of each block in the image.
            block_count: The number of blocks in the image.
            filesystem_type: The filesystem type to be given to the mkfs
                             program for formatting the image.

        Keyword Args:
            mount_filesystem_type: The filesystem type to be given to the
                                   mount program for mounting the image.
            mkfs_options: A list of options to be given to the mkfs program.
        """
        self._block_size = block_size
        self._block_count = block_count
        self._filesystem_type = filesystem_type
        self._mount_filesystem_type = kwargs.get('mount_filesystem_type')
        if self._mount_filesystem_type is None:
            self._mount_filesystem_type = filesystem_type
        self._mkfs_options = kwargs.get('mkfs_options')
        if self._mkfs_options is None:
            self._mkfs_options = []
        self._image_file = None
        self._loop_device = None
        self._mount_dir = None

    def __del__(self):
        with ExceptionSuppressor(Exception):
            self.clean()

    def __enter__(self):
        self.create()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.clean()
        return False

    def _remove_temp_path(self, temp_path):
        """Removes a temporary file or directory created using autotemp."""
        if temp_path:
            with ExceptionSuppressor(Exception):
                path = temp_path.name
                temp_path.clean()
                logging.debug('Removed "%s"', path)

    def _remove_image_file(self):
        """Removes the image file if one has been created."""
        self._remove_temp_path(self._image_file)
        self._image_file = None

    def _remove_mount_dir(self):
        """Removes the mount directory if one has been created."""
        self._remove_temp_path(self._mount_dir)
        self._mount_dir = None

    @property
    def image_file(self):
        """Gets the path of the image file.

        Returns:
            The path of the image file or None if no image file has been
            created.
        """
        return self._image_file.name if self._image_file else None

    @property
    def loop_device(self):
        """Gets the loop device where the image file is attached to.

        Returns:
            The path of the loop device where the image file is attached to or
            None if no loop device is attaching the image file.
        """
        return self._loop_device

    @property
    def mount_dir(self):
        """Gets the directory where the image file is mounted to.

        Returns:
            The directory where the image file is mounted to or None if no
            mount directory has been created.
        """
        return self._mount_dir.name if self._mount_dir else None

    def create(self):
        """Creates a zero-filled image file with the specified size.

        The created image file is temporary and removed when clean()
        is called.
        """
        self.clean()
        self._image_file = autotemp.tempfile(unique_id='fsImage')
        try:
            logging.debug('Creating zero-filled image file at "%s"',
                          self._image_file.name)
            utils.run('dd if=/dev/zero of=%s bs=%s count=%s' %
                      (self._image_file.name, self._block_size,
                       self._block_count))
        except error.CmdError as exc:
            self._remove_image_file()
            message = 'Failed to create filesystem image: %s' % exc
            raise RuntimeError(message)

    def clean(self):
        """Removes the image file if one has been created.

        Before removal, the image file is detached from the loop device that
        it is attached to.
        """
        self.detach_from_loop_device()
        self._remove_image_file()

    def attach_to_loop_device(self):
        """Attaches the created image file to a loop device.

        Creates the image file, if one has not been created, by calling
        create().

        Returns:
            The path of the loop device where the image file is attached to.
        """
        if self._loop_device:
            return self._loop_device

        if not self._image_file:
            self.create()

        logging.debug('Attaching image file "%s" to loop device',
                      self._image_file.name)
        utils.run('losetup -f %s' % self._image_file.name)
        output = utils.system_output('losetup -j %s' % self._image_file.name)
        # output should look like: "/dev/loop0: [000d]:6329 (/tmp/test.img)"
        self._loop_device = output.split(':')[0]
        logging.debug('Attached image file "%s" to loop device "%s"',
                      self._image_file.name, self._loop_device)
        return self._loop_device

    def detach_from_loop_device(self):
        """Detaches the image file from the loop device."""
        if not self._loop_device:
            return

        self.unmount()

        logging.debug('Cleaning up remaining mount points of loop device "%s"',
                      self._loop_device)
        utils.run('umount -f %s' % self._loop_device, ignore_status=True)

        logging.debug('Detaching image file "%s" from loop device "%s"',
                      self._image_file.name, self._loop_device)
        utils.run('losetup -d %s' % self._loop_device)
        self._loop_device = None

    def format(self):
        """Formats the image file as the specified filesystem."""
        self.attach_to_loop_device()
        try:
            logging.debug('Formatting image file at "%s" as "%s" filesystem',
                          self._image_file.name, self._filesystem_type)
            utils.run('yes | mkfs -t %s %s %s' %
                      (self._filesystem_type, ' '.join(self._mkfs_options),
                       self._loop_device))
            logging.debug('blkid: %s', utils.system_output(
                'blkid -c /dev/null %s' % self._loop_device,
                ignore_status=True))
        except error.CmdError as exc:
            message = 'Failed to format filesystem image: %s' % exc
            raise RuntimeError(message)

    def mount(self, options=None):
        """Mounts the image file to a directory.

        Args:
            options: An optional list of mount options.
        """
        if self._mount_dir:
            return self._mount_dir.name

        if options is None:
            options = []

        options_arg = ','.join(options)
        if options_arg:
            options_arg = '-o ' + options_arg

        self.attach_to_loop_device()
        self._mount_dir = autotemp.tempdir(unique_id='fsImage')
        try:
            logging.debug('Mounting image file "%s" (%s) to directory "%s"',
                          self._image_file.name, self._loop_device,
                          self._mount_dir.name)
            utils.run('mount -t %s %s %s %s' %
                      (self._mount_filesystem_type, options_arg,
                       self._loop_device, self._mount_dir.name))
        except error.CmdError as exc:
            self._remove_mount_dir()
            message = ('Failed to mount virtual filesystem image "%s": %s' %
                       (self._image_file.name, exc))
            raise RuntimeError(message)
        return self._mount_dir.name

    def unmount(self):
        """Unmounts the image file from the mounted directory."""
        if not self._mount_dir:
            return

        try:
            logging.debug('Unmounting image file "%s" (%s) from directory "%s"',
                          self._image_file.name, self._loop_device,
                          self._mount_dir.name)
            utils.run('umount %s' % self._mount_dir.name)
        except error.CmdError as exc:
            message = ('Failed to unmount virtual filesystem image "%s": %s' %
                       (self._image_file.name, exc))
            raise RuntimeError(message)
        finally:
            self._remove_mount_dir()