# Copyright (C) 2009 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import errno
import os
import re
from webkitpy.common.system import path
from webkitpy.common.system import ospath
class MockFileSystem(object):
def __init__(self, files=None, cwd='/'):
"""Initializes a "mock" filesystem that can be used to completely
stub out a filesystem.
Args:
files: a dict of filenames -> file contents. A file contents
value of None is used to indicate that the file should
not exist.
"""
self.files = files or {}
self.written_files = {}
self._sep = '/'
self.current_tmpno = 0
self.cwd = cwd
self.dirs = {}
def _get_sep(self):
return self._sep
sep = property(_get_sep, doc="pathname separator")
def _raise_not_found(self, path):
raise IOError(errno.ENOENT, path, os.strerror(errno.ENOENT))
def _split(self, path):
return path.rsplit(self.sep, 1)
def abspath(self, path):
if os.path.isabs(path):
return self.normpath(path)
return self.abspath(self.join(self.cwd, path))
def basename(self, path):
return self._split(path)[1]
def chdir(self, path):
path = self.normpath(path)
if not self.isdir(path):
raise OSError(errno.ENOENT, path, os.strerror(errno.ENOENT))
self.cwd = path
def copyfile(self, source, destination):
if not self.exists(source):
self._raise_not_found(source)
if self.isdir(source):
raise IOError(errno.EISDIR, source, os.strerror(errno.ISDIR))
if self.isdir(destination):
raise IOError(errno.EISDIR, destination, os.strerror(errno.ISDIR))
self.files[destination] = self.files[source]
self.written_files[destination] = self.files[source]
def dirname(self, path):
return self._split(path)[0]
def exists(self, path):
return self.isfile(path) or self.isdir(path)
def files_under(self, path, dirs_to_skip=[], file_filter=None):
def filter_all(fs, dirpath, basename):
return True
file_filter = file_filter or filter_all
files = []
if self.isfile(path):
if file_filter(self, self.dirname(path), self.basename(path)):
files.append(path)
return files
if self.basename(path) in dirs_to_skip:
return []
if not path.endswith(self.sep):
path += self.sep
dir_substrings = [self.sep + d + self.sep for d in dirs_to_skip]
for filename in self.files:
if not filename.startswith(path):
continue
suffix = filename[len(path) - 1:]
if any(dir_substring in suffix for dir_substring in dir_substrings):
continue
dirpath, basename = self._split(filename)
if file_filter(self, dirpath, basename):
files.append(filename)
return files
def getcwd(self, path):
return self.cwd
def glob(self, path):
# FIXME: This only handles a wildcard '*' at the end of the path.
# Maybe it should handle more?
if path[-1] == '*':
return [f for f in self.files if f.startswith(path[:-1])]
else:
return [f for f in self.files if f == path]
def isabs(self, path):
return path.startswith(self.sep)
def isfile(self, path):
return path in self.files and self.files[path] is not None
def isdir(self, path):
if path in self.files:
return False
path = self.normpath(path)
if path in self.dirs:
return True
# We need to use a copy of the keys here in order to avoid switching
# to a different thread and potentially modifying the dict in
# mid-iteration.
files = self.files.keys()[:]
result = any(f.startswith(path) for f in files)
if result:
self.dirs[path] = True
return result
def join(self, *comps):
# FIXME: might want tests for this and/or a better comment about how
# it works.
return re.sub(re.escape(os.path.sep), self.sep, os.path.join(*comps))
def listdir(self, path):
if not self.isdir(path):
raise OSError("%s is not a directory" % path)
if not path.endswith(self.sep):
path += self.sep
dirs = []
files = []
for f in self.files:
if self.exists(f) and f.startswith(path):
remaining = f[len(path):]
if self.sep in remaining:
dir = remaining[:remaining.index(self.sep)]
if not dir in dirs:
dirs.append(dir)
else:
files.append(remaining)
return dirs + files
def mtime(self, path):
if self.exists(path):
return 0
self._raise_not_found(path)
def _mktemp(self, suffix='', prefix='tmp', dir=None, **kwargs):
if dir is None:
dir = self.sep + '__im_tmp'
curno = self.current_tmpno
self.current_tmpno += 1
return self.join(dir, "%s_%u_%s" % (prefix, curno, suffix))
def mkdtemp(self, **kwargs):
class TemporaryDirectory(object):
def __init__(self, fs, **kwargs):
self._kwargs = kwargs
self._filesystem = fs
self._directory_path = fs._mktemp(**kwargs)
fs.maybe_make_directory(self._directory_path)
def __str__(self):
return self._directory_path
def __enter__(self):
return self._directory_path
def __exit__(self, type, value, traceback):
# Only self-delete if necessary.
# FIXME: Should we delete non-empty directories?
if self._filesystem.exists(self._directory_path):
self._filesystem.rmtree(self._directory_path)
return TemporaryDirectory(fs=self, **kwargs)
def maybe_make_directory(self, *path):
norm_path = self.normpath(self.join(*path))
if not self.isdir(norm_path):
self.dirs[norm_path] = True
def move(self, source, destination):
if self.files[source] is None:
self._raise_not_found(source)
self.files[destination] = self.files[source]
self.written_files[destination] = self.files[destination]
self.files[source] = None
self.written_files[source] = None
def normpath(self, path):
# Like join(), relies on os.path functionality but normalizes the
# path separator to the mock one.
return re.sub(re.escape(os.path.sep), self.sep, os.path.normpath(path))
def open_binary_tempfile(self, suffix=''):
path = self._mktemp(suffix)
return (WritableFileObject(self, path), path)
def open_text_file_for_writing(self, path, append=False):
return WritableFileObject(self, path, append)
def read_text_file(self, path):
return self.read_binary_file(path).decode('utf-8')
def open_binary_file_for_reading(self, path):
if self.files[path] is None:
self._raise_not_found(path)
return ReadableFileObject(self, path, self.files[path])
def read_binary_file(self, path):
# Intentionally raises KeyError if we don't recognize the path.
if self.files[path] is None:
self._raise_not_found(path)
return self.files[path]
def relpath(self, path, start='.'):
return ospath.relpath(path, start, self.abspath, self.sep)
def remove(self, path):
if self.files[path] is None:
self._raise_not_found(path)
self.files[path] = None
self.written_files[path] = None
def rmtree(self, path):
if not path.endswith(self.sep):
path += self.sep
for f in self.files:
if f.startswith(path):
self.files[f] = None
def splitext(self, path):
idx = path.rfind('.')
if idx == -1:
idx = 0
return (path[0:idx], path[idx:])
def write_text_file(self, path, contents):
return self.write_binary_file(path, contents.encode('utf-8'))
def write_binary_file(self, path, contents):
self.files[path] = contents
self.written_files[path] = contents
class WritableFileObject(object):
def __init__(self, fs, path, append=False, encoding=None):
self.fs = fs
self.path = path
self.closed = False
if path not in self.fs.files or not append:
self.fs.files[path] = ""
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def close(self):
self.closed = True
def write(self, str):
self.fs.files[self.path] += str
self.fs.written_files[self.path] = self.fs.files[self.path]
class ReadableFileObject(object):
def __init__(self, fs, path, data=""):
self.fs = fs
self.path = path
self.closed = False
self.data = data
self.offset = 0
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def close(self):
self.closed = True
def read(self, bytes=None):
if not bytes:
return self.data[self.offset:]
start = self.offset
self.offset += bytes
return self.data[start:self.offset]