普通文本  |  336行  |  10.83 KB

# Copyright (C) 2009 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#    * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#    * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import errno
import os
import re

from webkitpy.common.system import path
from webkitpy.common.system import ospath


class MockFileSystem(object):
    def __init__(self, files=None, cwd='/'):
        """Initializes a "mock" filesystem that can be used to completely
        stub out a filesystem.

        Args:
            files: a dict of filenames -> file contents. A file contents
                value of None is used to indicate that the file should
                not exist.
        """
        self.files = files or {}
        self.written_files = {}
        self._sep = '/'
        self.current_tmpno = 0
        self.cwd = cwd
        self.dirs = {}

    def _get_sep(self):
        return self._sep

    sep = property(_get_sep, doc="pathname separator")

    def _raise_not_found(self, path):
        raise IOError(errno.ENOENT, path, os.strerror(errno.ENOENT))

    def _split(self, path):
        return path.rsplit(self.sep, 1)

    def abspath(self, path):
        if os.path.isabs(path):
            return self.normpath(path)
        return self.abspath(self.join(self.cwd, path))

    def basename(self, path):
        return self._split(path)[1]

    def chdir(self, path):
        path = self.normpath(path)
        if not self.isdir(path):
            raise OSError(errno.ENOENT, path, os.strerror(errno.ENOENT))
        self.cwd = path

    def copyfile(self, source, destination):
        if not self.exists(source):
            self._raise_not_found(source)
        if self.isdir(source):
            raise IOError(errno.EISDIR, source, os.strerror(errno.ISDIR))
        if self.isdir(destination):
            raise IOError(errno.EISDIR, destination, os.strerror(errno.ISDIR))

        self.files[destination] = self.files[source]
        self.written_files[destination] = self.files[source]

    def dirname(self, path):
        return self._split(path)[0]

    def exists(self, path):
        return self.isfile(path) or self.isdir(path)

    def files_under(self, path, dirs_to_skip=[], file_filter=None):
        def filter_all(fs, dirpath, basename):
            return True

        file_filter = file_filter or filter_all
        files = []
        if self.isfile(path):
            if file_filter(self, self.dirname(path), self.basename(path)):
                files.append(path)
            return files

        if self.basename(path) in dirs_to_skip:
            return []

        if not path.endswith(self.sep):
            path += self.sep

        dir_substrings = [self.sep + d + self.sep for d in dirs_to_skip]
        for filename in self.files:
            if not filename.startswith(path):
                continue

            suffix = filename[len(path) - 1:]
            if any(dir_substring in suffix for dir_substring in dir_substrings):
                continue

            dirpath, basename = self._split(filename)
            if file_filter(self, dirpath, basename):
                files.append(filename)

        return files

    def getcwd(self, path):
        return self.cwd

    def glob(self, path):
        # FIXME: This only handles a wildcard '*' at the end of the path.
        # Maybe it should handle more?
        if path[-1] == '*':
            return [f for f in self.files if f.startswith(path[:-1])]
        else:
            return [f for f in self.files if f == path]

    def isabs(self, path):
        return path.startswith(self.sep)

    def isfile(self, path):
        return path in self.files and self.files[path] is not None

    def isdir(self, path):
        if path in self.files:
            return False
        path = self.normpath(path)
        if path in self.dirs:
            return True

        # We need to use a copy of the keys here in order to avoid switching
        # to a different thread and potentially modifying the dict in
        # mid-iteration.
        files = self.files.keys()[:]
        result = any(f.startswith(path) for f in files)
        if result:
            self.dirs[path] = True
        return result

    def join(self, *comps):
        # FIXME: might want tests for this and/or a better comment about how
        # it works.
        return re.sub(re.escape(os.path.sep), self.sep, os.path.join(*comps))

    def listdir(self, path):
        if not self.isdir(path):
            raise OSError("%s is not a directory" % path)

        if not path.endswith(self.sep):
            path += self.sep

        dirs = []
        files = []
        for f in self.files:
            if self.exists(f) and f.startswith(path):
                remaining = f[len(path):]
                if self.sep in remaining:
                    dir = remaining[:remaining.index(self.sep)]
                    if not dir in dirs:
                        dirs.append(dir)
                else:
                    files.append(remaining)
        return dirs + files

    def mtime(self, path):
        if self.exists(path):
            return 0
        self._raise_not_found(path)

    def _mktemp(self, suffix='', prefix='tmp', dir=None, **kwargs):
        if dir is None:
            dir = self.sep + '__im_tmp'
        curno = self.current_tmpno
        self.current_tmpno += 1
        return self.join(dir, "%s_%u_%s" % (prefix, curno, suffix))

    def mkdtemp(self, **kwargs):
        class TemporaryDirectory(object):
            def __init__(self, fs, **kwargs):
                self._kwargs = kwargs
                self._filesystem = fs
                self._directory_path = fs._mktemp(**kwargs)
                fs.maybe_make_directory(self._directory_path)

            def __str__(self):
                return self._directory_path

            def __enter__(self):
                return self._directory_path

            def __exit__(self, type, value, traceback):
                # Only self-delete if necessary.

                # FIXME: Should we delete non-empty directories?
                if self._filesystem.exists(self._directory_path):
                    self._filesystem.rmtree(self._directory_path)

        return TemporaryDirectory(fs=self, **kwargs)

    def maybe_make_directory(self, *path):
        norm_path = self.normpath(self.join(*path))
        if not self.isdir(norm_path):
            self.dirs[norm_path] = True

    def move(self, source, destination):
        if self.files[source] is None:
            self._raise_not_found(source)
        self.files[destination] = self.files[source]
        self.written_files[destination] = self.files[destination]
        self.files[source] = None
        self.written_files[source] = None

    def normpath(self, path):
        # Like join(), relies on os.path functionality but normalizes the
        # path separator to the mock one.
        return re.sub(re.escape(os.path.sep), self.sep, os.path.normpath(path))

    def open_binary_tempfile(self, suffix=''):
        path = self._mktemp(suffix)
        return (WritableFileObject(self, path), path)

    def open_text_file_for_writing(self, path, append=False):
        return WritableFileObject(self, path, append)

    def read_text_file(self, path):
        return self.read_binary_file(path).decode('utf-8')

    def open_binary_file_for_reading(self, path):
        if self.files[path] is None:
            self._raise_not_found(path)
        return ReadableFileObject(self, path, self.files[path])

    def read_binary_file(self, path):
        # Intentionally raises KeyError if we don't recognize the path.
        if self.files[path] is None:
            self._raise_not_found(path)
        return self.files[path]

    def relpath(self, path, start='.'):
        return ospath.relpath(path, start, self.abspath, self.sep)

    def remove(self, path):
        if self.files[path] is None:
            self._raise_not_found(path)
        self.files[path] = None
        self.written_files[path] = None

    def rmtree(self, path):
        if not path.endswith(self.sep):
            path += self.sep

        for f in self.files:
            if f.startswith(path):
                self.files[f] = None

    def splitext(self, path):
        idx = path.rfind('.')
        if idx == -1:
            idx = 0
        return (path[0:idx], path[idx:])

    def write_text_file(self, path, contents):
        return self.write_binary_file(path, contents.encode('utf-8'))

    def write_binary_file(self, path, contents):
        self.files[path] = contents
        self.written_files[path] = contents


class WritableFileObject(object):
    def __init__(self, fs, path, append=False, encoding=None):
        self.fs = fs
        self.path = path
        self.closed = False
        if path not in self.fs.files or not append:
            self.fs.files[path] = ""

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def close(self):
        self.closed = True

    def write(self, str):
        self.fs.files[self.path] += str
        self.fs.written_files[self.path] = self.fs.files[self.path]


class ReadableFileObject(object):
    def __init__(self, fs, path, data=""):
        self.fs = fs
        self.path = path
        self.closed = False
        self.data = data
        self.offset = 0

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def close(self):
        self.closed = True

    def read(self, bytes=None):
        if not bytes:
            return self.data[self.offset:]
        start = self.offset
        self.offset += bytes
        return self.data[start:self.offset]