普通文本  |  300行  |  8.26 KB

#!/usr/bin/python

import logging, os, select, StringIO, subprocess, sys, unittest
import common
from autotest_lib.client.common_lib import logging_manager, logging_config


class PipedStringIO(object):
    """
    Like StringIO, but all I/O passes through a pipe.  This means a
    PipedStringIO is backed by a file descriptor and thus can do things like
    be passed down to a subprocess.  However, this means the creating process
    must call read_pipe() (or the classmethod read_all_pipes()) periodically
    to read the pipe, and must call close() (or the classmethod
    cleanup_all_instances()) to close the pipe.
    """
    # every instance that has been created and not yet closed
    _instances = set()

    def __init__(self):
        self._string_io = StringIO.StringIO()
        self._read_end, self._write_end = os.pipe()
        PipedStringIO._instances.add(self)


    def close(self):
        """
        Close the buffer and both pipe ends and unregister this instance.
        Calling close() again on a closed instance does nothing.
        """
        if self not in PipedStringIO._instances:
            return
        # unregister first so a failure below cannot leave a half-closed
        # instance behind for cleanup_all_instances() to trip over
        PipedStringIO._instances.discard(self)
        self._string_io.close()
        try:
            os.close(self._read_end)
        finally:
            # always release the write end, even if the read end failed
            os.close(self._write_end)


    def write(self, data):
        """Send data into the pipe; it reaches the buffer on read_pipe()."""
        os.write(self._write_end, data)


    def flush(self):
        """No-op: write() hands data straight to the pipe."""
        pass


    def fileno(self):
        """Return the file descriptor of the write end of the pipe."""
        return self._write_end


    def getvalue(self):
        """Drain the pipe, then return everything collected so far."""
        self.read_pipe()
        return self._string_io.getvalue()


    def read_pipe(self):
        """Move all data currently readable from the pipe into the buffer."""
        while True:
            # zero timeout: only poll, never block waiting for a writer
            read_list, _, _ = select.select([self._read_end], [], [], 0)
            if not read_list:
                return
            data = os.read(self._read_end, 1024)
            if not data:
                # EOF: select() would keep reporting the pipe as readable, so
                # stop here instead of spinning
                return
            self._string_io.write(data)


    @classmethod
    def read_all_pipes(cls):
        """Call read_pipe() on every open instance."""
        for instance in cls._instances:
            instance.read_pipe()


    @classmethod
    def cleanup_all_instances(cls):
        """Call close() on every open instance."""
        # iterate over a copy since close() modifies the set
        for instance in list(cls._instances):
            instance.close()


# Format used for both the root logger and the console formatter: the level
# name, a colon, then the message.
LOGGING_FORMAT = '%(levelname)s: %(message)s'

# Lines expected on the managed stdout stream after _run_test(), grouped by
# the number passed to _say().
_EXPECTED_STDOUT = (
    # 1: before start_logging()
    'print 1\n'
    'system 1\n'
    'INFO: logging 1\n'
    # 2: after start_logging()
    'INFO: print 2\n'
    'INFO: system 2\n'
    'INFO: logging 2\n'
    # 6: after both redirects have been undone
    'INFO: print 6\n'
    'INFO: system 6\n'
    'INFO: logging 6\n'
    # 7: after stop_logging()
    'print 7\n'
    'system 7\n'
    'INFO: logging 7\n'
)

# Lines expected in the stream given to redirect_to_stream().
_EXPECTED_LOG1 = (
    # 3: after redirect_to_stream()
    'INFO: print 3\n'
    'INFO: system 3\n'
    'INFO: logging 3\n'
    # 4: after tee_redirect_to_stream()
    'INFO: print 4\n'
    'INFO: system 4\n'
    'INFO: logging 4\n'
    # 5: after the first undo_redirect()
    'INFO: print 5\n'
    'INFO: system 5\n'
    'INFO: logging 5\n'
)

# Lines expected in the stream given to tee_redirect_to_stream().
_EXPECTED_LOG2 = (
    'INFO: print 4\n'
    'INFO: system 4\n'
    'INFO: logging 4\n'
)


class DummyLoggingConfig(logging_config.LoggingConfig):
    """
    LoggingConfig for tests: the debug file handler writes to a PipedStringIO
    kept in self.log instead of a file, so the output can be inspected.
    """
    console_formatter = logging.Formatter(LOGGING_FORMAT)

    def __init__(self):
        super(DummyLoggingConfig, self).__init__()
        # receives whatever the debug handler emits
        self.log = PipedStringIO()


    def add_debug_file_handlers(self, log_dir, log_name=None):
        """
        Attach a stream handler writing to self.log.  log_dir and log_name
        are accepted but not used.
        """
        debug_handler = logging.StreamHandler(self.log)
        self.logger.addHandler(debug_handler)


# This isn't really a unit test, since it creates subprocesses and pipes and
# all that.  It just uses the unittest framework because it's convenient.
class LoggingManagerTest(unittest.TestCase):
    """
    Drives a logging manager through a fixed sequence of start, redirect,
    undo and stop calls, then checks which output reached which stream.
    """
    def setUp(self):
        self.stdout = PipedStringIO()
        self._log1 = PipedStringIO()
        self._log2 = PipedStringIO()

        # when True, _say() writes its "system" line through a real shell
        # command instead of a print
        self._real_system_calls = False

        # the LoggingManager will change self.stdout (by design), so keep a
        # copy around
        self._original_stdout = self.stdout

        # clear out import-time logging config and reconfigure
        root_logger = logging.getLogger()
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)
        # use INFO to ignore debug output from logging_manager itself
        logging.basicConfig(level=logging.INFO, format=LOGGING_FORMAT,
                            stream=self.stdout)

        self._config_object = DummyLoggingConfig()
        logging_manager.LoggingManager.logging_config_object = (
                self._config_object)


    def tearDown(self):
        PipedStringIO.cleanup_all_instances()


    def _say(self, suffix):
        """
        Emit three lines tagged with suffix: one through print, one through
        a "system" call (real or simulated) and one through logging.info(),
        then drain all pipes.
        """
        print >>self.stdout, 'print %s' % suffix
        if self._real_system_calls:
            # write to the original descriptor from a child process
            os.system('echo system %s >&%s' % (suffix,
                                               self._original_stdout.fileno()))
        else:
            print >>self.stdout, 'system %s' % suffix
        logging.info('logging %s', suffix)
        PipedStringIO.read_all_pipes()


    def _setup_manager(self, manager_class=logging_manager.LoggingManager):
        """
        Create a manager_class instance that manages self.stdout at INFO
        level and return it.
        """
        def set_stdout(file_object):
            self.stdout = file_object
        manager = manager_class()
        manager.manage_stream(self.stdout, logging.INFO, set_stdout)
        return manager


    def _run_test(self, manager_class):
        """
        Run the sequence whose output is described by _EXPECTED_STDOUT,
        _EXPECTED_LOG1 and _EXPECTED_LOG2.
        """
        manager = self._setup_manager(manager_class)

        self._say(1)

        manager.start_logging()
        self._say(2)

        manager.redirect_to_stream(self._log1)
        self._say(3)

        manager.tee_redirect_to_stream(self._log2)
        self._say(4)

        manager.undo_redirect()
        self._say(5)

        manager.undo_redirect()
        self._say(6)

        manager.stop_logging()
        self._say(7)


    def _grab_fd_info(self):
        """
        Snapshot the open file descriptors of this process.

        Returns a sorted list of (fd, target) string pairs read from
        /proc/<pid>/fd, or None when that directory does not exist.

        The snapshot is taken in-process.  Running 'ls' through
        Popen(command.split(), shell=True) only lists the current directory,
        and any child process with a stdout pipe would add a descriptor of
        its own to the listing.
        """
        fd_dir = '/proc/%s/fd' % os.getpid()
        if not os.path.isdir(fd_dir):
            return None
        fd_info = []
        for fd in os.listdir(fd_dir):
            try:
                target = os.readlink(os.path.join(fd_dir, fd))
            except OSError:
                # the descriptor os.listdir() used for the scan is gone by now
                continue
            fd_info.append((fd, target))
        return sorted(fd_info)


    def _compare_logs(self, log_buffer, expected_text):
        """Assert that log_buffer holds the lines of expected_text."""
        actual_lines = log_buffer.getvalue().splitlines()
        expected_lines = expected_text.splitlines()
        if self._real_system_calls:
            # because of the many interacting processes, we can't ensure perfect
            # interleaving.  so compare sets of lines rather than ordered lines.
            actual_lines = set(actual_lines)
            expected_lines = set(expected_lines)
        self.assertEqual(actual_lines, expected_lines)


    def _check_results(self):
        """Verify stdout, descriptors and all three streams after _run_test()."""
        # ensure our stdout was restored
        self.assertEqual(self.stdout, self._original_stdout)

        if self._real_system_calls:
            # ensure FDs were left in their original state
            self.assertEqual(self._grab_fd_info(), self._fd_info)

        self._compare_logs(self.stdout, _EXPECTED_STDOUT)
        self._compare_logs(self._log1, _EXPECTED_LOG1)
        self._compare_logs(self._log2, _EXPECTED_LOG2)


    def test_logging_manager(self):
        self._run_test(logging_manager.LoggingManager)
        self._check_results()


    def test_fd_redirection_logging_manager(self):
        self._real_system_calls = True
        # taken after setUp() so the test's own pipes are part of the baseline
        self._fd_info = self._grab_fd_info()
        self._run_test(logging_manager.FdRedirectionLoggingManager)
        self._check_results()


    def test_tee_redirect_debug_dir(self):
        manager = self._setup_manager()
        manager.start_logging()

        manager.tee_redirect_debug_dir('/fake/dir', tag='mytag')
        print >>self.stdout, 'hello'

        manager.undo_redirect()
        print >>self.stdout, 'goodbye'

        manager.stop_logging()

        self._compare_logs(self.stdout,
                           'INFO: mytag : hello\nINFO: goodbye')
        self._compare_logs(self._config_object.log, 'hello\n')


class MonkeyPatchTestCase(unittest.TestCase):
    """
    Checks logging_manager._logging_manager_aware_logger__find_caller, both
    for a plain test method and for one wrapped in
    do_not_report_as_logging_caller.

    NOTE(review): the finder is always invoked two calls below the test
    method (through the _1_ and _0_ helpers).  Presumably these stand in for
    frames the finder skips - confirm against logging_manager before changing
    the nesting.
    """
    def setUp(self):
        # compare against the source file name even when running from a
        # compiled '.pyc'
        basename = os.path.basename(__file__)
        if basename.endswith('.pyc'):
            basename = basename[:-1]
        self.expected_filename = basename


    def check_filename(self, filename, expected=None):
        """
        Assert that the last path component of filename is one of expected,
        which defaults to this module's own file name.
        """
        allowed = expected
        if allowed is None:
            allowed = [self.expected_filename]
        reported = os.path.basename(filename)
        self.assertIn(reported, allowed)


    def _0_test_find_caller(self):
        lookup = logging_manager._logging_manager_aware_logger__find_caller
        caller_info = lookup(logging_manager.logger)
        filename, _lineno, caller_name = caller_info
        self.check_filename(filename)
        self.assertEquals('test_find_caller', caller_name)


    def _1_test_find_caller(self):
        self._0_test_find_caller()


    def test_find_caller(self):
        self._1_test_find_caller()


    def _0_test_non_reported_find_caller(self):
        lookup = logging_manager._logging_manager_aware_logger__find_caller
        caller_info = lookup(logging_manager.logger)
        filename, _lineno, _caller_name = caller_info
        # Python 2.4 unittest implementation will call the unittest method in
        # file 'unittest.py', and Python >= 2.6 does the same in 'case.py'
        unittest_files = ['unittest.py', 'case.py']
        self.check_filename(filename, expected=unittest_files)


    def _1_test_non_reported_find_caller(self):
        self._0_test_non_reported_find_caller()


    @logging_manager.do_not_report_as_logging_caller
    def test_non_reported_find_caller(self):
        self._1_test_non_reported_find_caller()



# run the test cases of this module when it is executed as a script
if __name__ == '__main__':
    unittest.main()