# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import errno import logging import os import shutil import subprocess import urllib2 from autotest_lib.client.common_lib import global_config def rm_dir_if_exists(dir_to_remove): """ Removes a directory. Does not fail if the directory does NOT exist. @param dir_to_remove: path, directory to be removed. """ try: shutil.rmtree(dir_to_remove) except OSError as e: if e.errno != errno.ENOENT: raise def rm_dirs_if_exist(dirs_to_remove): """ Removes multiple directories. Does not fail if directories do NOT exist. @param dirs_to_remove: list of directory paths to be removed. """ for dr in dirs_to_remove: rm_dir_if_exists(dr) def ensure_file_exists(filepath): """ Verifies path given points to an existing file. @param filepath: path, path to check. @raises IOError if the path given does not point to a valid file. """ error_msg = 'File %s does not exist.' % filepath if not os.path.isfile(filepath): raise IOError(error_msg) def ensure_all_files_exist(filepaths): """ Verifies all paths given point to existing files. @param filepaths: List of paths to check. @raises IOError if given paths do not point to existing files. """ for filepath in filepaths: ensure_file_exists(filepath) def ensure_dir_exists(dirpath): """ Verifies path given points to an existing directory. @param dirpath: path, dir to check. @raises IOError if path does not point to an existing directory. """ error_msg = 'Directory %s does not exist.' % dirpath if not os.path.isdir(dirpath): raise IOError(error_msg) def ensure_all_dirs_exist(dirpaths): """ Verifies all paths given point to existing directories. @param dirpaths: list of directory paths to check. @raises IOError if given paths do not point to existing directories. """ for dirpath in dirpaths: ensure_dir_exists(dirpath) def make_leaf_dir(dirpath): """ Creates a directory, also creating parent directories if they do not exist. @param dirpath: path, directory to create. @raises whatever exception raised other than "path already exist". """ try: os.makedirs(dirpath) except OSError as e: if e.errno != errno.EEXIST: raise def make_leaf_dirs(dirpaths): """ Creates multiple directories building all respective parent directories if they do not exist. @param dirpaths: list of directory paths to create. @raises whatever exception raised other than "path already exists". """ for dirpath in dirpaths: make_leaf_dir(dirpath) def download_file(remote_path, local_path): """ Download file from a remote resource. @param remote_path: path, complete path to the remote file. @param local_path: path, complete path to save downloaded file. @raises: urllib2.HTTPError or urlib2.URLError exception. Both with added debug information """ client_config = global_config.global_config.get_section_values('CLIENT') proxies = {} for name, value in client_config.items('CLIENT'): if value and name.endswith('_proxy'): proxies[name[:-6]] = value if proxies: proxy_handler = urllib2.ProxyHandler(proxies) opener = urllib2.build_opener(proxy_handler) urllib2.install_opener(opener) # Unlike urllib.urlopen urllib2.urlopen will immediately throw on error # If we could not find the file pointed by remote_path we will get an # exception, catch the exception to log useful information then re-raise try: remote_file = urllib2.urlopen(remote_path) # Catch exceptions, extract exception properties and then re-raise # This helps us with debugging what went wrong quickly as we get to see # test_that output immediately except urllib2.HTTPError as e: e.msg = (("""HTTPError raised while retrieving file %s\n. Http Code = %s.\n. Reason = %s\n. Headers = %s.\n Original Message = %s.\n""") % (remote_path, e.code, e.reason, e.headers, e.msg)) raise except urllib2.URLError as e: e.msg = (("""URLError raised while retrieving file %s\n. Reason = %s\n. Original Message = %s\n.""") % (remote_path, e.reason, e.message)) raise with open(local_path, 'wb') as local_file: while True: block = remote_file.read(128 * 1024) if not block: break local_file.write(block) def get_directory_size_kibibytes(directory): """Calculate the total size of a directory with all its contents. @param directory: Path to the directory @return Size of the directory in kibibytes. """ cmd = ['du', '-sk', directory] process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout_data, stderr_data = process.communicate() if process.returncode != 0: # This function is used for statistics only, if it fails, # nothing else should crash. logging.warning('Getting size of %s failed. Stderr:', directory) logging.warning(stderr_data) return 0 return int(stdout_data.split('\t', 1)[0]) def recursive_path_permission(path): """ Recursively lists a path and its parent's permission. For a path, it emits a path_perm string "<path> <permission>" where <permission> is an octal representation of the path's st_mode. After that, it emits its parent's path_perm string(s) recursively. For example, recursive_path_permission('/foo/bar/buz') returns ['/foo/bar/buz 0100644', '/foo/bar 040755', '/foo 040755', '/ 040755'] If |path| is invalid, it returns empty list. If |path| does not have parent directory, it returns a list with only its own path_perm string. @param path: file path. @return list of "<path> <permission>" string. """ def path_st_mode(p): """ Gets path's permission. @param p: file path. @return "<path> <permission>" where <permission> is an octal representation of the path's st_mode. """ return '%s %s' % (p, oct(os.stat(p).st_mode)) if not path or not os.path.exists(path): return [] if path == '/': return [path_st_mode(path)] dirname, basename = os.path.split(path) if not basename: return recursive_path_permission(dirname) result = [path_st_mode(path)] if dirname: result.extend(recursive_path_permission(dirname)) return result