普通文本  |  320行  |  12.87 KB

#!/usr/bin/python

"""Tests for site_sysinfo."""

__author__ = 'dshi@google.com (Dan Shi)'

import cPickle as pickle
import filecmp
import os
import random
import shutil
import tempfile
import unittest

import common
from autotest_lib.client.bin import site_sysinfo
from autotest_lib.client.common_lib import autotemp


class diffable_logdir_test(unittest.TestCase):
    """Tests for methods in class diffable_logdir."""


    def setUp(self):
        """Initialize a temp directory with test files.

        Creates three regular files with random content and one FIFO under
        <tempdir>/src:
            src/existing_file_0
            src/sub/existing_file_1
            src/sub/sub2/existing_file_2
            src/sub/sub2/existing_fifo

        The paths of the "new" files are only computed here; the files
        themselves are created by the test after the initial status has been
        collected.

        """
        self.tempdir = autotemp.tempdir(unique_id='diffable_logdir')
        self.src_dir = os.path.join(self.tempdir.name, 'src')
        self.dest_dir = os.path.join(self.tempdir.name, 'dest')

        self.existing_files = ['existing_file_'+str(i) for i in range(3)]
        self.existing_files_folder = ['', 'sub', 'sub/sub2']
        self.existing_files_path = [
                os.path.join(self.src_dir, folder, f)
                for f, folder in zip(self.existing_files,
                                     self.existing_files_folder)]
        self.new_files = ['new_file_'+str(i) for i in range(2)]
        self.new_files_folder = ['sub', 'sub/sub3']
        self.new_files_path = [
                os.path.join(self.src_dir, folder, f)
                for f, folder in zip(self.new_files, self.new_files_folder)]

        # Create some file with random data in source directory.
        for p in self.existing_files_path:
            self.append_text_to_file(str(random.random()), p)

        # sub/sub2 already exists at this point: it was created above for
        # existing_file_2.
        self.existing_fifo_path = os.path.join(
            self.src_dir, 'sub/sub2/existing_fifo')
        os.mkfifo(self.existing_fifo_path)


    def tearDown(self):
        """Clean up the temp directory."""
        self.tempdir.clean()


    def append_text_to_file(self, text, file_path):
        """Append text to the end of a file, create the file if not existed.

        Missing parent directories of the file are created as well.

        @param text: text to be appended to a file.
        @param file_path: path to the file.

        """
        dir_name = os.path.dirname(file_path)
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
        with open(file_path, 'a') as f:
            f.write(text)


    def assert_trees_equal(self, dir1, dir2, ignore=None):
        """Assert two directory trees contain the same files.

        Walks both trees with filecmp.dircmp; at every level the directory
        listings must match and no common file may differ.

        @param dir1: the left comparison directory.
        @param dir2: the right comparison directory.
        @param ignore: filenames to ignore (in any directory).

        """
        dircmps = []
        dircmps.append(filecmp.dircmp(dir1, dir2, ignore))
        while dircmps:
            dcmp = dircmps.pop()
            self.assertEqual(dcmp.left_list, dcmp.right_list)
            self.assertEqual([], dcmp.diff_files)
            # Queue the common subdirectories for the same checks.
            dircmps.extend(dcmp.subdirs.values())


    def _dest_path(self, src_path):
        """Map a path under src_dir to the same relative path under dest_dir.

        Only the src_dir prefix is rewritten. A plain
        str.replace('src', 'dest') would also rewrite any other 'src'
        substring in the absolute path (e.g. in a parent directory name or in
        the random part of the temp directory name).

        @param src_path: path to a file located inside self.src_dir.

        @return: the corresponding path inside self.dest_dir.

        """
        return os.path.join(self.dest_dir,
                            os.path.relpath(src_path, self.src_dir))


    def test_diffable_logdir_success(self):
        """Test the diff function to save new data from a directory."""
        info = site_sysinfo.diffable_logdir(self.src_dir,
                                            keep_file_hierarchy=False,
                                            append_diff_in_name=False)
        # Run the first time to collect file status.
        info.run(log_dir=None, collect_init_status=True)

        # Add new files to the test directory. Each new file contains its own
        # file name.
        for file_name, file_path in zip(self.new_files,
                                         self.new_files_path):
            self.append_text_to_file(file_name, file_path)

        # Temp file for existing_file_2, used to hold on the inode. If the
        # file is deleted and recreated, its inode might not change.
        existing_file_2 = self.existing_files_path[2]
        existing_file_2_tmp = existing_file_2 + '_tmp'
        os.rename(existing_file_2, existing_file_2_tmp)

        # Append data to existing file. existing_file_2 was renamed away
        # above, so it is recreated here and holds only its file name.
        for file_name, file_path in zip(self.existing_files,
                                         self.existing_files_path):
            self.append_text_to_file(file_name, file_path)

        # Remove the tmp file.
        os.remove(existing_file_2_tmp)

        # Add a new FIFO
        new_fifo_path = os.path.join(self.src_dir, 'sub/sub2/new_fifo')
        os.mkfifo(new_fifo_path)

        # Run the second time to do diff.
        info.run(self.dest_dir, collect_init_status=False, collect_all=True)

        # Validate files in dest_dir: every diff file must hold exactly the
        # text written after the initial status was collected, i.e. the file
        # name.
        all_names = self.existing_files + self.new_files
        all_paths = self.existing_files_path + self.new_files_path
        for file_name, file_path in zip(all_names, all_paths):
            with open(self._dest_path(file_path), 'r') as f:
                self.assertEqual(file_name, f.read())

        # Assert that FIFOs are not in the diff.
        self.assertFalse(
                os.path.exists(self._dest_path(self.existing_fifo_path)),
                msg='Existing FIFO present in diff sysinfo')
        self.assertFalse(
                os.path.exists(self._dest_path(new_fifo_path)),
                msg='New FIFO present in diff sysinfo')

        # With collect_all=True, full sysinfo should also be present. The
        # expected location is the absolute source path recreated underneath
        # dest_dir, hence the plain string concatenation.
        full_sysinfo_path = self.dest_dir + self.src_dir
        self.assertTrue(os.path.exists(full_sysinfo_path),
                        msg='Full sysinfo not present')

        # Assert that the full sysinfo is present.
        self.assertNotEqual(self.src_dir, full_sysinfo_path)
        self.assert_trees_equal(self.src_dir, full_sysinfo_path)


class LogdirTestCase(unittest.TestCase):
    """Tests logdir.run"""

    def setUp(self):
        """Create a scratch directory holding empty 'from' and 'to' dirs."""
        # Canonicalize the scratch path. These tests expect logdir.run to
        # resolve symlinks leading to the source root (see
        # test_run_resolves_symlinks_to_source_root), so if the temp directory
        # itself sits behind a symlink (e.g. a symlinked TMPDIR) the expected
        # destination paths built from self.from_dir would not match.
        self.tempdir = os.path.realpath(tempfile.mkdtemp())
        self.addCleanup(shutil.rmtree, self.tempdir)

        self.from_dir = os.path.join(self.tempdir, 'from')
        self.to_dir = os.path.join(self.tempdir, 'to')
        os.mkdir(self.from_dir)
        os.mkdir(self.to_dir)

    def _destination_path(self, relative_path, from_dir=None):
        """The expected destination path for a subdir of the source directory

        The absolute source path is appended verbatim to self.to_dir, which is
        why string formatting is used instead of os.path.join (joining with an
        absolute path would discard self.to_dir).

        @param relative_path: path relative to the source directory.
        @param from_dir: source directory; defaults to self.from_dir.

        @return: the expected absolute path underneath self.to_dir.
        """
        if from_dir is None:
            from_dir = self.from_dir
        return '%s%s' % (self.to_dir, os.path.join(from_dir, relative_path))

    def test_run_recreates_absolute_source_path(self):
        """When copying files, the absolute path from the source is recreated
        in the destination folder.
        """
        os.mkdir(os.path.join(self.from_dir, 'fubar'))
        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)
        destination_path = self._destination_path('fubar')
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_run_skips_symlinks(self):
        """Symlinks inside the source directory are not copied."""
        os.mkdir(os.path.join(self.from_dir, 'real'))
        os.symlink(os.path.join(self.from_dir, 'real'),
                   os.path.join(self.from_dir, 'symlink'))

        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)

        destination_path_real = self._destination_path('real')
        self.assertTrue(os.path.exists(destination_path_real),
                        msg='real directory was not copied to %s' %
                        destination_path_real)
        destination_path_link = self._destination_path('symlink')
        self.assertFalse(
                os.path.exists(destination_path_link),
                msg='symlink was copied to %s' % destination_path_link)

    def test_run_resolves_symlinks_to_source_root(self):
        """run tries hard to get to the source directory before copying.

        Within the source folder, we skip any symlinks, but we first try to
        resolve symlinks to the source root itself.
        """
        os.mkdir(os.path.join(self.from_dir, 'fubar'))
        from_symlink = os.path.join(self.tempdir, 'from_symlink')
        os.symlink(self.from_dir, from_symlink)

        logdir = site_sysinfo.logdir(from_symlink)
        logdir.run(self.to_dir)

        # The expectation is built from the real source directory, not from
        # the symlink that was handed to logdir.
        destination_path = self._destination_path('fubar')
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_run_excludes_common_patterns(self):
        """Entries matching the default exclude patterns are not copied."""
        os.mkdir(os.path.join(self.from_dir, 'autoserv2344'))
        # Create empty file.
        open(os.path.join(self.from_dir, 'system.journal'), 'w').close()
        deeper_subdir = os.path.join('prefix', 'autoserv', 'suffix')
        os.makedirs(os.path.join(self.from_dir, deeper_subdir))

        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)

        destination_path = self._destination_path('autoserv2344')
        self.assertFalse(os.path.exists(destination_path),
                         msg='Copied banned file to %s' % destination_path)
        destination_path = self._destination_path(deeper_subdir)
        self.assertFalse(os.path.exists(destination_path),
                         msg='Copied banned file to %s' % destination_path)
        destination_path = self._destination_path('system.journal')
        self.assertFalse(os.path.exists(destination_path),
                         msg='Copied banned file to %s' % destination_path)

    def test_run_ignores_exclude_patterns_in_leading_dirs(self):
        """Exclude patterns should only be applied to path suffixes within
        from_dir, not to the root directory itself.
        """
        exclude_pattern_dir = os.path.join(self.from_dir, 'autoserv2344')
        os.makedirs(os.path.join(exclude_pattern_dir, 'fubar'))
        logdir = site_sysinfo.logdir(exclude_pattern_dir)
        logdir.run(self.to_dir)
        destination_path = self._destination_path('fubar',
                                                  from_dir=exclude_pattern_dir)
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_pickle_unpickle_equal(self):
        """Sanity check pickle-unpickle round-trip."""
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        unpickled_logdir = pickle.loads(logdir_pickle)

        self.assertEqual(unpickled_logdir, logdir)

    def test_pickle_enforce_required_attributes(self):
        """Some attributes from this object should never be dropped.

        When running a client test against an older build, we pickle the objects
        of this class from newer version of the class and unpickle to older
        versions of the class. The older versions require some attributes to be
        present.

        This case covers a logdir created with one extra exclude pattern.
        """
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(logdir.additional_exclude, 'a')

    def test_pickle_enforce_required_attributes_default(self):
        """Some attributes from this object should never be dropped.

        When running a client test against an older build, we pickle the objects
        of this class from newer version of the class and unpickle to older
        versions of the class. The older versions require some attributes to be
        present.

        This case covers a logdir created with the default excludes only.
        """
        logdir = site_sysinfo.logdir(self.from_dir)
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertIsNone(logdir.additional_exclude)

    def test_unpickle_handle_missing__excludes(self):
        """Sanely handle missing _excludes attribute from pickles

        This can happen when running brand new version of this class that
        introduced this attribute from older server side code in prod.

        This case covers a logdir created with the default excludes: after the
        round-trip _excludes must equal DEFAULT_EXCLUDES.
        """
        logdir = site_sysinfo.logdir(self.from_dir)
        # Simulate a pickle produced by code that predates _excludes.
        delattr(logdir, '_excludes')
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(logdir._excludes,
                         site_sysinfo.logdir.DEFAULT_EXCLUDES)

    def test_unpickle_handle_missing__excludes_default(self):
        """Sanely handle missing _excludes attribute from pickles

        This can happen when running brand new version of this class that
        introduced this attribute from older server side code in prod.

        This case covers a logdir created with one extra exclude pattern:
        after the round-trip _excludes must again contain the defaults plus
        that extra pattern.
        """
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # Simulate a pickle produced by code that predates _excludes.
        delattr(logdir, '_excludes')
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(
                logdir._excludes,
                (site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))


# Run every test case defined in this module when executed as a script.
if __name__ == '__main__':
    unittest.main()