普通文本  |  375行  |  10.65 KB

# -*- coding: utf-8 -*-
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

from contextlib import contextmanager
import functools
import os
import pkgutil
import posixpath
import re
import tempfile
import unittest
import urlparse

import boto
import crcmod
import gslib.tests as gslib_tests
from gslib.util import UsingCrcmodExtension

# unittest.TestCase only gained assertIsNone in newer Python versions; when it
# is missing, rebind the name `unittest` to the unittest2 backport so the rest
# of this module can use the same API either way.
if not hasattr(unittest.TestCase, 'assertIsNone'):
  # external dependency unittest2 required for Python <= 2.6
  import unittest2 as unittest  # pylint: disable=g-import-not-at-top

# Flags for running different types of tests.
# RUN_S3_TESTS is read at call time by SequentialAndParallelTransfer, which
# skips its second (parallel/sliced) run when the flag is true.
RUN_INTEGRATION_TESTS = True
RUN_UNIT_TESTS = True
RUN_S3_TESTS = False

# Path of a boto config file; not referenced elsewhere in this module.
# NOTE(review): presumably consumed by the parallel composite upload tests;
# the hard-coded /tmp location is POSIX-specific -- confirm against callers.
PARALLEL_COMPOSITE_UPLOAD_TEST_CONFIG = '/tmp/.boto.parallel_upload_test_config'


def _HasS3Credentials():
  """Reports whether both AWS credential options are set in the boto config.

  The result is coerced to a real boolean. Returning the raw `and` expression
  would hand back the secret access key string itself (or None), which would
  then be stored in the module-level HAS_S3_CREDS constant.

  Returns:
    True if both 'aws_access_key_id' and 'aws_secret_access_key' in the
    'Credentials' section have non-empty values, False otherwise.
  """
  access_key = boto.config.get('Credentials', 'aws_access_key_id', None)
  secret_key = boto.config.get('Credentials', 'aws_secret_access_key', None)
  return bool(access_key and secret_key)

HAS_S3_CREDS = _HasS3Credentials()


def _HasGSHost():
  """Reports whether the boto config defines 'gs_host' under 'Credentials'.

  Returns:
    True if the option has any value (including an empty one), else False.
  """
  configured_host = boto.config.get('Credentials', 'gs_host', None)
  return configured_host is not None

HAS_GS_HOST = _HasGSHost()


def _UsingJSONApi():
  """Reports whether the boto config selects an API other than XML.

  The 'prefer_api' option of the 'GSUtil' section defaults to 'json' and is
  compared case-insensitively.

  Returns:
    False only when the preferred API is 'XML' (in any letter case).
  """
  preferred_api = boto.config.get('GSUtil', 'prefer_api', 'json')
  return preferred_api.upper() != 'XML'

USING_JSON_API = _UsingJSONApi()


def _ArgcompleteAvailable():
  """Reports whether the optional argcomplete package can be imported.

  Returns:
    True if `import argcomplete` succeeds, False if it raises ImportError.
  """
  try:
    # pylint: disable=g-import-not-at-top,unused-variable
    import argcomplete
  except ImportError:
    return False
  else:
    return True

ARGCOMPLETE_AVAILABLE = _ArgcompleteAvailable()


def _NormalizeURI(uri):
  """Normalizes the path component of a URI.

  Only a leading 'gs://' scheme is temporarily rewritten for parsing, and it
  is restored only when it was rewritten. URIs with other schemes (for
  example file:// or s3://) therefore keep their scheme, and scheme-like text
  inside an object name is left alone. An empty path is left empty instead of
  being normalized to '.'.

  Args:
    uri: URI to normalize.

  Returns:
    Normalized URI.

  Examples:
    gs://foo//bar -> gs://foo/bar
    gs://foo/./bar -> gs://foo/bar
    gs://foo -> gs://foo
  """
  gs_prefix = 'gs://'
  file_prefix = 'file://'
  # Note: we have to do this dance of changing gs:// to file:// because on
  # Windows, the urlparse function won't work with URL schemes that are not
  # known. urlparse('gs://foo/bar') on Windows turns into:
  #     scheme='gs', netloc='', path='//foo/bar'
  # while on non-Windows platforms, it turns into:
  #     scheme='gs', netloc='foo', path='/bar'
  swapped_scheme = uri.startswith(gs_prefix)
  if swapped_scheme:
    uri = file_prefix + uri[len(gs_prefix):]
  parsed = list(urlparse.urlparse(uri))
  if parsed[2]:
    # posixpath.normpath('') returns '.', which would append a bogus '/.' to
    # a bucket-only URI, so only non-empty paths are normalized.
    parsed[2] = posixpath.normpath(parsed[2])
    if parsed[2].startswith('//'):
      # The normpath function doesn't change '//foo' -> '/foo' by design.
      parsed[2] = parsed[2][1:]
  unparsed = urlparse.urlunparse(parsed)
  if swapped_scheme and unparsed.startswith(file_prefix):
    unparsed = gs_prefix + unparsed[len(file_prefix):]
  return unparsed


def GenerationFromURI(uri):
  """Returns the generation for a StorageUri.

  Args:
    uri: boto.storage_uri.StorageURI object to get the URI from.

  Returns:
    The URI's generation if set, otherwise its version_id. When neither is
    set, the string 'null' for the 's3' scheme, or the (falsy) version_id
    for any other scheme.
  """
  generation = uri.generation or uri.version_id
  if not generation and uri.scheme == 's3':
    return 'null'
  return generation


def ObjectToURI(obj, *suffixes):
  """Builds the storage URI string for a file, a local path or a StorageUri.

  Args:
    obj: The object to get the URI from. Can be a file object, a subclass of
         boto.storage_uri.StorageURI, or a string. If a string, it is assumed to
         be a local on-disk path.
    *suffixes: Suffixes to append. For example, ObjectToURI(bucketuri, 'foo')
               would return the URI for a key name 'foo' inside the given
               bucket.

  Returns:
    Storage URI string.
  """
  # Local cases: file objects are resolved to an absolute path, plain strings
  # are joined as given.
  if isinstance(obj, file):
    local_path = os.path.abspath(os.path.join(obj.name, *suffixes))
    return 'file://%s' % local_path
  if isinstance(obj, basestring):
    return 'file://%s' % os.path.join(obj, *suffixes)

  # Anything else is expected to expose its URI string as `.uri`.
  if suffixes:
    storage_uri = _NormalizeURI('/'.join([obj.uri] + list(suffixes)))
  else:
    storage_uri = obj.uri

  # Storage URIs shouldn't contain a trailing slash.
  return storage_uri[:-1] if storage_uri.endswith('/') else storage_uri

# The mock storage service comes from the Boto library, but it is not
# distributed with Boto when installed as a package. To get around this, we
# copy the file to gslib/tests/mock_storage_service.py when building the gsutil
# package. Try importing it from each known location in turn.
# pylint: disable=g-import-not-at-top
try:
  from gslib.tests import mock_storage_service
except ImportError:
  try:
    from boto.tests.integration.s3 import mock_storage_service
  except ImportError:
    try:
      from tests.integration.s3 import mock_storage_service
    except ImportError:
      import mock_storage_service


class GSMockConnection(mock_storage_service.MockConnection):
  """MockConnection whose provider is always 'gs'."""

  def __init__(self, *args, **kwargs):
    """Initializes the base connection with provider forced to 'gs'.

    Args:
      *args: Positional arguments forwarded to MockConnection.
      **kwargs: Keyword arguments forwarded to MockConnection; any 'provider'
                entry is overwritten with 'gs'.
    """
    # The debug attribute is set before the base initializer runs.
    self.debug = 0
    kwargs.update(provider='gs')
    super(GSMockConnection, self).__init__(*args, **kwargs)

# Single shared connection instance returned by GSMockBucketStorageUri.connect.
mock_connection = GSMockConnection()


class GSMockBucketStorageUri(mock_storage_service.MockBucketStorageUri):
  """MockBucketStorageUri that connects to the module-level mock_connection."""

  def connect(self, access_key_id=None, secret_access_key=None):
    """Returns the shared mock_connection; both arguments are ignored."""
    return mock_connection

  def compose(self, components, headers=None):
    """Dummy implementation to allow parallel uploads with tests.

    Neither argument is used; the result is simply self.new_key().
    """
    return self.new_key()


# Sentinel option name. _SetBotoConfig records (section, this name, None) in a
# revert list when it had to create the section, and _RevertBotoConfig removes
# the whole section when it finds that entry.
TEST_BOTO_REMOVE_SECTION = 'TestRemoveSection'


def _SetBotoConfig(section, name, value, revert_list):
  """Temporarily overrides one boto config option for testing.

  Tests should call SetBotoConfigForTest or SetBotoConfigFileForTest rather
  than this function; those wrappers make sure the change is undone again via
  _RevertBotoConfig.

  Args:
    section: Boto config section to set
    name: Boto config name to set
    value: Value to set; None removes the option instead.
    revert_list: List for tracking configs to revert. Entries describing how
                 to undo this change are appended to it.
  """
  config = boto.config
  original_value = config.get(section, name, None)

  # A section created here must be dropped again on revert, so record a
  # sentinel entry for it before creating it.
  section_missing = not config.has_section(section)
  if section_missing:
    revert_list.append((section, TEST_BOTO_REMOVE_SECTION, None))
    config.add_section(section)

  revert_list.append((section, name, original_value))
  if value is not None:
    config.set(section, name, value)
    return
  config.remove_option(section, name)


def _RevertBotoConfig(revert_list):
  """Reverts boto config modifications made by _SetBotoConfig.

  Entries are undone newest first. If the same option was set more than once,
  the later entry holds the earlier test value and the first entry holds the
  true original, so the first entry has to be applied last for the option to
  end up at its original value.

  Args:
    revert_list: List of boto config modifications created by calls to
                 _SetBotoConfig.
  """
  sections_to_remove = []
  for section, name, value in reversed(revert_list):
    if value is not None:
      boto.config.set(section, name, value)
    elif name == TEST_BOTO_REMOVE_SECTION:
      # The section was created by _SetBotoConfig; drop it once every option
      # has been reverted.
      sections_to_remove.append(section)
    else:
      boto.config.remove_option(section, name)
  for section in sections_to_remove:
    boto.config.remove_section(section)


def SequentialAndParallelTransfer(func):
  """Decorator for tests that perform file to object transfers, or vice versa.

  The decorated test always runs once as-is. Unless S3 tests are enabled or
  the compiled crcmod extension is unavailable, it then runs a second time
  under boto config overrides that force the parallel composite upload and
  sliced object download code paths.

  Args:
    func: Function to wrap.

  Returns:
    Wrapped function.
  """
  @functools.wraps(func)
  def Wrapper(*args, **kwargs):
    # First pass: the test exactly as written.
    func(*args, **kwargs)

    if RUN_S3_TESTS or not UsingCrcmodExtension(crcmod):
      return

    # Second pass: thresholds of 1 force parallel upload and sliced download.
    forced_parallel_config = [
        ('GSUtil', 'parallel_composite_upload_threshold', '1'),
        ('GSUtil', 'sliced_object_download_threshold', '1'),
        ('GSUtil', 'sliced_object_download_max_components', '3'),
        ('GSUtil', 'check_hashes', 'always'),
    ]
    with SetBotoConfigForTest(forced_parallel_config):
      func(*args, **kwargs)

  return Wrapper


@contextmanager
def SetBotoConfigForTest(boto_config_list):
  """Applies a list of boto config overrides for the duration of a 'with'.

  The overrides are applied to the in-memory boto config, the resulting config
  is written to a temporary file, and BOTO_CONFIG points at that file while
  the body runs. On exit the overrides are reverted and the temporary file is
  deleted.

  Args:
    boto_config_list: list of tuples of:
      (boto config section to set, boto config name to set, value to set)

  Yields:
    Once after config is set.
  """
  pending_reverts = []
  config_path = None
  try:
    handle, config_path = tempfile.mkstemp(prefix='gsutil-temp-cfg')
    os.close(handle)
    for entry in boto_config_list:
      section, option, value = entry[0], entry[1], entry[2]
      _SetBotoConfig(section, option, value, pending_reverts)
    with open(config_path, 'w') as config_file:
      boto.config.write(config_file)

    with SetBotoConfigFileForTest(config_path):
      yield
  finally:
    _RevertBotoConfig(pending_reverts)
    if config_path:
      # Best-effort cleanup: a temp file that cannot be removed is ignored.
      try:
        os.remove(config_path)
      except OSError:
        pass


@contextmanager
def SetEnvironmentForTest(env_variable_dict):
  """Sets OS environment variables for a single test.

  The previous value of every named variable is captured once, before any
  change is made, and restored when the 'with' block exits, even if the body
  raises.

  Args:
    env_variable_dict: Dict mapping environment variable names to the values
                       to use inside the block. A value of None removes the
                       variable for the duration of the block.

  Yields:
    Once after the environment has been updated.
  """

  def _ApplyDictToEnvironment(dict_to_apply):
    """Sets, or for None values deletes, each variable in dict_to_apply."""
    for k, v in dict_to_apply.items():
      if v is not None:
        os.environ[k] = v
      elif k in os.environ:
        del os.environ[k]

  # Snapshot up front so that the helper never has to write into the dict it
  # is iterating over while restoring.
  old_values = dict((k, os.environ.get(k)) for k in env_variable_dict)

  try:
    _ApplyDictToEnvironment(env_variable_dict)
    yield
  finally:
    _ApplyDictToEnvironment(old_values)


@contextmanager
def SetBotoConfigFileForTest(boto_config_path):
  """Points BOTO_CONFIG at the given file for the duration of a 'with' block.

  Args:
    boto_config_path: Path to store in the BOTO_CONFIG environment variable.

  Yields:
    Once after BOTO_CONFIG has been set. On exit the variable is restored to
    its previous value, or removed if it was not set before.
  """
  env_key = 'BOTO_CONFIG'
  had_previous = env_key in os.environ
  previous_path = os.environ.get(env_key)
  os.environ[env_key] = boto_config_path

  try:
    yield
  finally:
    if had_previous:
      os.environ[env_key] = previous_path
    else:
      os.environ.pop(env_key, None)


def GetTestNames():
  """Lists the test modules found in the gslib.tests package.

  Returns:
    A list with the part after the 'test_' prefix of every module in
    gslib.tests whose name starts with that prefix.
  """
  pattern = re.compile(r'^test_(?P<name>.*)$')
  matches = (pattern.match(module_name)
             for _, module_name, _ in pkgutil.iter_modules(gslib_tests.__path__))
  return [found.group('name') for found in matches if found]


@contextmanager
def WorkingDirectory(new_working_directory):
  """Runs the body of a 'with' block from a different working directory.

  Args:
    new_working_directory: The directory to switch to before executing wrapped
      code. A None value indicates that no switching is necessary.

  Yields:
    Once after working directory has been changed.
  """
  try:
    original_directory = os.getcwd()
  except OSError:
    # This can happen if the current working directory no longer exists; in
    # that case there is nothing to return to afterwards.
    original_directory = None

  should_switch = bool(new_working_directory)
  if should_switch:
    os.chdir(new_working_directory)

  try:
    yield
  finally:
    if should_switch and original_directory:
      os.chdir(original_directory)