# Copyright (C) 2009 Google Inc. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following disclaimer # in the documentation and/or other materials provided with the # distribution. # * Neither the name of Google Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import errno import os import re from webkitpy.common.system import path from webkitpy.common.system import ospath class MockFileSystem(object): def __init__(self, files=None, cwd='/'): """Initializes a "mock" filesystem that can be used to completely stub out a filesystem. Args: files: a dict of filenames -> file contents. A file contents value of None is used to indicate that the file should not exist. """ self.files = files or {} self.written_files = {} self._sep = '/' self.current_tmpno = 0 self.cwd = cwd self.dirs = {} def _get_sep(self): return self._sep sep = property(_get_sep, doc="pathname separator") def _raise_not_found(self, path): raise IOError(errno.ENOENT, path, os.strerror(errno.ENOENT)) def _split(self, path): return path.rsplit(self.sep, 1) def abspath(self, path): if os.path.isabs(path): return self.normpath(path) return self.abspath(self.join(self.cwd, path)) def basename(self, path): return self._split(path)[1] def chdir(self, path): path = self.normpath(path) if not self.isdir(path): raise OSError(errno.ENOENT, path, os.strerror(errno.ENOENT)) self.cwd = path def copyfile(self, source, destination): if not self.exists(source): self._raise_not_found(source) if self.isdir(source): raise IOError(errno.EISDIR, source, os.strerror(errno.ISDIR)) if self.isdir(destination): raise IOError(errno.EISDIR, destination, os.strerror(errno.ISDIR)) self.files[destination] = self.files[source] self.written_files[destination] = self.files[source] def dirname(self, path): return self._split(path)[0] def exists(self, path): return self.isfile(path) or self.isdir(path) def files_under(self, path, dirs_to_skip=[], file_filter=None): def filter_all(fs, dirpath, basename): return True file_filter = file_filter or filter_all files = [] if self.isfile(path): if file_filter(self, self.dirname(path), self.basename(path)): files.append(path) return files if self.basename(path) in dirs_to_skip: return [] if not path.endswith(self.sep): path += self.sep dir_substrings = [self.sep + d + self.sep for d in dirs_to_skip] for filename in self.files: if not filename.startswith(path): continue suffix = filename[len(path) - 1:] if any(dir_substring in suffix for dir_substring in dir_substrings): continue dirpath, basename = self._split(filename) if file_filter(self, dirpath, basename): files.append(filename) return files def getcwd(self, path): return self.cwd def glob(self, path): # FIXME: This only handles a wildcard '*' at the end of the path. # Maybe it should handle more? if path[-1] == '*': return [f for f in self.files if f.startswith(path[:-1])] else: return [f for f in self.files if f == path] def isabs(self, path): return path.startswith(self.sep) def isfile(self, path): return path in self.files and self.files[path] is not None def isdir(self, path): if path in self.files: return False path = self.normpath(path) if path in self.dirs: return True # We need to use a copy of the keys here in order to avoid switching # to a different thread and potentially modifying the dict in # mid-iteration. files = self.files.keys()[:] result = any(f.startswith(path) for f in files) if result: self.dirs[path] = True return result def join(self, *comps): # FIXME: might want tests for this and/or a better comment about how # it works. return re.sub(re.escape(os.path.sep), self.sep, os.path.join(*comps)) def listdir(self, path): if not self.isdir(path): raise OSError("%s is not a directory" % path) if not path.endswith(self.sep): path += self.sep dirs = [] files = [] for f in self.files: if self.exists(f) and f.startswith(path): remaining = f[len(path):] if self.sep in remaining: dir = remaining[:remaining.index(self.sep)] if not dir in dirs: dirs.append(dir) else: files.append(remaining) return dirs + files def mtime(self, path): if self.exists(path): return 0 self._raise_not_found(path) def _mktemp(self, suffix='', prefix='tmp', dir=None, **kwargs): if dir is None: dir = self.sep + '__im_tmp' curno = self.current_tmpno self.current_tmpno += 1 return self.join(dir, "%s_%u_%s" % (prefix, curno, suffix)) def mkdtemp(self, **kwargs): class TemporaryDirectory(object): def __init__(self, fs, **kwargs): self._kwargs = kwargs self._filesystem = fs self._directory_path = fs._mktemp(**kwargs) fs.maybe_make_directory(self._directory_path) def __str__(self): return self._directory_path def __enter__(self): return self._directory_path def __exit__(self, type, value, traceback): # Only self-delete if necessary. # FIXME: Should we delete non-empty directories? if self._filesystem.exists(self._directory_path): self._filesystem.rmtree(self._directory_path) return TemporaryDirectory(fs=self, **kwargs) def maybe_make_directory(self, *path): norm_path = self.normpath(self.join(*path)) if not self.isdir(norm_path): self.dirs[norm_path] = True def move(self, source, destination): if self.files[source] is None: self._raise_not_found(source) self.files[destination] = self.files[source] self.written_files[destination] = self.files[destination] self.files[source] = None self.written_files[source] = None def normpath(self, path): # Like join(), relies on os.path functionality but normalizes the # path separator to the mock one. return re.sub(re.escape(os.path.sep), self.sep, os.path.normpath(path)) def open_binary_tempfile(self, suffix=''): path = self._mktemp(suffix) return (WritableFileObject(self, path), path) def open_text_file_for_writing(self, path, append=False): return WritableFileObject(self, path, append) def read_text_file(self, path): return self.read_binary_file(path).decode('utf-8') def open_binary_file_for_reading(self, path): if self.files[path] is None: self._raise_not_found(path) return ReadableFileObject(self, path, self.files[path]) def read_binary_file(self, path): # Intentionally raises KeyError if we don't recognize the path. if self.files[path] is None: self._raise_not_found(path) return self.files[path] def relpath(self, path, start='.'): return ospath.relpath(path, start, self.abspath, self.sep) def remove(self, path): if self.files[path] is None: self._raise_not_found(path) self.files[path] = None self.written_files[path] = None def rmtree(self, path): if not path.endswith(self.sep): path += self.sep for f in self.files: if f.startswith(path): self.files[f] = None def splitext(self, path): idx = path.rfind('.') if idx == -1: idx = 0 return (path[0:idx], path[idx:]) def write_text_file(self, path, contents): return self.write_binary_file(path, contents.encode('utf-8')) def write_binary_file(self, path, contents): self.files[path] = contents self.written_files[path] = contents class WritableFileObject(object): def __init__(self, fs, path, append=False, encoding=None): self.fs = fs self.path = path self.closed = False if path not in self.fs.files or not append: self.fs.files[path] = "" def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() def close(self): self.closed = True def write(self, str): self.fs.files[self.path] += str self.fs.written_files[self.path] = self.fs.files[self.path] class ReadableFileObject(object): def __init__(self, fs, path, data=""): self.fs = fs self.path = path self.closed = False self.data = data self.offset = 0 def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() def close(self): self.closed = True def read(self, bytes=None): if not bytes: return self.data[self.offset:] start = self.offset self.offset += bytes return self.data[start:self.offset]