# -*- coding: utf-8 -*-
# Copyright 2011 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of gsutil test command."""
from __future__ import absolute_import
from collections import namedtuple
import logging
import os
import subprocess
import sys
import tempfile
import textwrap
import time
import gslib
from gslib.command import Command
from gslib.command import ResetFailureCount
from gslib.exception import CommandException
from gslib.project_id import PopulateProjectId
import gslib.tests as tests
from gslib.util import IS_WINDOWS
from gslib.util import NO_MAX
# For Python 2.6, unittest2 is required to run the tests. If it's not available,
# display an error if the test command is run instead of breaking the whole
# program.
# pylint: disable=g-import-not-at-top
try:
from gslib.tests.util import GetTestNames
from gslib.tests.util import unittest
except ImportError as e:
if 'unittest2' in str(e):
unittest = None
GetTestNames = None # pylint: disable=invalid-name
else:
raise
try:
import coverage
except ImportError:
coverage = None
# Default cap on the number of test processes run at once (overridable by -p);
# also interpolated into the help text below.
_DEFAULT_TEST_PARALLEL_PROCESSES = 5
# Upper bound on parallel test processes when the tests run against S3.
_DEFAULT_S3_TEST_PARALLEL_PROCESSES = 50
# Name of the private long option passed to child test processes so that an
# already-isolated test is run in-process instead of being isolated again.
_SEQUENTIAL_ISOLATION_FLAG = 'sequential_only'

# The synopsis lists every option accepted by the command (see OPTIONS below).
_SYNOPSIS = """
gsutil test [-l] [-u] [-f] [-c] [-s] [-p N] [command command...]
"""
_DETAILED_HELP_TEXT = ("""
<B>SYNOPSIS</B>
""" + _SYNOPSIS + """
<B>DESCRIPTION</B>
The gsutil test command runs the gsutil unit tests and integration tests.
The unit tests use an in-memory mock storage service implementation, while
the integration tests send requests to the production service using the
preferred API set in the boto configuration file (see "gsutil help apis" for
details).
To run both the unit tests and integration tests, run the command with no
arguments:
gsutil test
To run the unit tests only (which run quickly):
gsutil test -u
Tests run in parallel regardless of whether the top-level -m flag is
present. To limit the number of tests run in parallel to 10 at a time:
gsutil test -p 10
To force tests to run sequentially:
gsutil test -p 1
To have sequentially-run tests stop running immediately when an error occurs:
gsutil test -f
To run tests for one or more individual commands add those commands as
arguments. For example, the following command will run the cp and mv command
tests:
gsutil test cp mv
To list available tests, run the test command with the -l argument:
gsutil test -l
The tests are defined in the code under the gslib/tests module. Each test
file is of the format test_[name].py where [name] is the test name you can
pass to this command. For example, running "gsutil test ls" would run the
tests in "gslib/tests/test_ls.py".
You can also run an individual test class or function name by passing the
test module followed by the class name and optionally a test name. For
example, to run an entire test class by name:
gsutil test naming.GsutilNamingTests
or an individual test function:
gsutil test cp.TestCp.test_streaming
You can list the available tests under a module or class by passing arguments
with the -l option. For example, to list all available test functions in the
cp module:
gsutil test -l cp
To output test coverage:
gsutil test -c -p 500
coverage html
This will output an HTML report to a directory named 'htmlcov'.
<B>OPTIONS</B>
-c Output coverage information.
-f Exit on first sequential test failure.
-l List available tests.
-p N Run at most N tests in parallel. The default value is %d.
-s Run tests against S3 instead of GS.
-u Only run unit tests.
""" % _DEFAULT_TEST_PARALLEL_PROCESSES)
# Record describing one finished test process: the test's name, the process
# return code, and the stdout/stderr collected from it.
TestProcessData = namedtuple(
    'TestProcessData', ['name', 'return_code', 'stdout', 'stderr'])
def MakeCustomTestResultClass(total_tests):
  """Builds a TextTestResult subclass that reports progress against a total.

  Args:
    total_tests: The total number of tests being run.

  Returns:
    The CustomTestResult class itself (not an instance); it closes over
    total_tests.
  """
  line_width = 73

  class CustomTestResult(unittest.TextTestResult):
    """A subclass of unittest.TextTestResult that prints a progress report."""

    def startTest(self, test):
      super(CustomTestResult, self).startTest(test)
      if not self.dots:
        return
      # Keep only the last two dotted components of the test id.
      short_id = '.'.join(test.id().split('.')[-2:])
      counters = (self.testsRun, total_tests, len(self.errors),
                  len(self.failures), len(self.skipped), short_id)
      progress = '\r%d/%d finished - E[%d] F[%d] s[%d] - %s' % counters
      # Truncate, then pad, so the progress text always has the same width.
      self.stream.write('%s - ' % progress[:line_width].ljust(line_width))

  return CustomTestResult
def GetTestNamesFromSuites(test_suite):
  """Returns the names of all tests in a (possibly nested) test suite.

  Args:
    test_suite: An iterable test suite; members that are unittest.TestSuite
        instances are expanded, every other member is treated as a test.

  Returns:
    List of test ids, each with its first len('gslib.tests.test_') characters
    removed.
  """
  prefix_length = len('gslib.tests.test_')
  collected_names = []
  pending_suites = [test_suite]
  while pending_suites:
    # Nested suites are queued and expanded after the current suite is done.
    for member in pending_suites.pop():
      if isinstance(member, unittest.TestSuite):
        pending_suites.append(member)
      else:
        collected_names.append(member.id()[prefix_length:])
  return collected_names
# pylint: disable=protected-access
# Need to get into the guts of unittest to evaluate test cases for parallelism.
def TestCaseToName(test_case):
  """Converts a python.unittest to its gsutil test-callable name.

  Args:
    test_case: A test case object with a _testMethodName attribute.

  Returns:
    The dotted path of the test case's class followed by '.' and the test
    method name.
  """
  # str() of a class looks like "<class 'module.ClassName'>"; the dotted path
  # is the text between the first pair of single quotes.
  class_path = str(test_case.__class__).split("'")[1]
  return '%s.%s' % (class_path, test_case._testMethodName)
# pylint: disable=protected-access
# Need to get into the guts of unittest to evaluate test cases for parallelism.
def SplitParallelizableTestSuite(test_suite):
  """Partitions the tests of a suite by how they are allowed to run.

  A test method carrying a truthy 'requires_isolation' attribute is isolated;
  otherwise one whose 'is_parallelizable' attribute is falsy is sequential;
  the remaining tests are split into unit tests (GsUtilUnitTestCase
  instances) and integration tests.

  Args:
    test_suite: A python unittest test suite.

  Returns:
    4-part tuple of sorted lists of test names:
    (tests that must be run sequentially,
     tests that must be isolated in a separate process but can be run either
         sequentially or in parallel,
     unit tests that can be run in parallel,
     integration tests that can run in parallel)
  """
  # pylint: disable=import-not-at-top
  # Need to import this after test globals are set so that skip functions work.
  from gslib.tests.testcase.unit_testcase import GsUtilUnitTestCase

  # Flatten the (possibly nested) suite into individual test cases.
  pending = [test_suite]
  test_cases = []
  while pending:
    current = pending.pop()
    if isinstance(current, unittest.suite.TestSuite):
      pending.extend(current._tests)
    elif isinstance(current, unittest.TestCase):
      test_cases.append(current)

  sequential_names = []
  isolated_names = []
  unit_names = []
  integration_names = []
  for test_case in test_cases:
    method = getattr(test_case, test_case._testMethodName, None)
    if getattr(method, 'requires_isolation', False):
      # Test must be isolated to a separate process, even if it is being
      # run sequentially.
      bucket = isolated_names
    elif not getattr(method, 'is_parallelizable', True):
      bucket = sequential_names
    elif isinstance(test_case, GsUtilUnitTestCase):
      bucket = unit_names
    else:
      bucket = integration_names
    bucket.append(TestCaseToName(test_case))

  return (sorted(sequential_names),
          sorted(isolated_names),
          sorted(unit_names),
          sorted(integration_names))
def CountFalseInList(input_list):
  """Returns how many items of input_list evaluate as false."""
  return sum(1 for entry in input_list if not entry)
def CreateTestProcesses(parallel_tests, test_index, process_list, process_done,
                        max_parallel_tests, root_coverage_file=None):
  """Starts child gsutil test processes until the parallel limit is reached.

  Each child runs a single test, with the private sequential-isolation flag,
  and has its stdout and stderr attached to pipes.

  Args:
    parallel_tests: List of all parallel tests.
    test_index: Index into parallel_tests of the next test to start.
    process_list: List of running subprocesses. Created processes are appended
        to this list.
    process_done: List of booleans indicating process completion. One 'False'
        will be added per process created.
    max_parallel_tests: Maximum number of tests to run in parallel.
    root_coverage_file: The root .coverage filename if coverage is requested.

  Returns:
    Index into parallel_tests of the next test that has not been started
    (len(parallel_tests) once every test has been created).
  """
  first_new_index = test_index
  total_tests = len(parallel_tests)
  module_prefix_length = len('gslib.tests.test_')
  if sys.executable and IS_WINDOWS:
    interpreter = [sys.executable]
  else:
    interpreter = []
  if tests.util.RUN_S3_TESTS:
    s3_flag = ['-s']
  else:
    s3_flag = []
  last_log_time = time.time()

  while (test_index < total_tests and
         CountFalseInList(process_done) < max_parallel_tests):
    child_env = os.environ.copy()
    if root_coverage_file:
      child_env['GSUTIL_COVERAGE_OUTPUT_FILE'] = root_coverage_file
    command = list(interpreter)
    command.append(gslib.GSUTIL_PATH)
    command.extend(['-o', 'GSUtil:default_project_id=' + PopulateProjectId()])
    command.append('test')
    command.extend(s3_flag)
    command.append('--' + _SEQUENTIAL_ISOLATION_FLAG)
    # The child expects the test name without the module prefix.
    command.append(parallel_tests[test_index][module_prefix_length:])
    process_list.append(subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        env=child_env))
    test_index += 1
    process_done.append(False)
    # Report at most once every 5 seconds while creating processes.
    if time.time() - last_log_time > 5:
      print ('Created %d new processes (total %d/%d created)' %
             (test_index - first_new_index, len(process_list), total_tests))
      last_log_time = time.time()

  if test_index == total_tests:
    print ('Test process creation finished (%d/%d created)' %
           (len(process_list), total_tests))
  return test_index
class TestCommand(Command):
  """Implementation of gsutil test command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'test',
      command_name_aliases=[],
      usage_synopsis=_SYNOPSIS,
      min_args=0,
      max_args=NO_MAX,
      supported_sub_args='uflp:sc',
      file_url_ok=True,
      provider_url_ok=False,
      urls_start_arg=0,
      supported_private_args=[_SEQUENTIAL_ISOLATION_FLAG]
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='test',
      help_name_aliases=[],
      help_type='command_help',
      help_one_line_summary='Run gsutil tests',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )

  def RunParallelTests(self, parallel_integration_tests,
                       max_parallel_tests, coverage_filename):
    """Executes the parallel/isolated portion of the test suite.

    Every test runs in its own child process created by CreateTestProcesses.
    The children are polled roughly once per second; results of finished
    children are collected, new children are started while tests remain, and
    progress is printed at most every 5 seconds. The stderr of each failed
    test is printed once all tests have finished.

    Args:
      parallel_integration_tests: List of tests to execute.
      max_parallel_tests: Maximum number of parallel tests to run at once.
      coverage_filename: If not None, filename for coverage output.

    Returns:
      (int number of test failures, float elapsed time)
    """
    process_list = []
    process_done = []
    # TestProcessData tuples, appended as finished processes are detected.
    process_results = []
    num_parallel_failures = 0
    # Number of logging cycles we ran with no progress.
    progress_less_logging_cycles = 0
    completed_as_of_last_log = 0
    num_parallel_tests = len(parallel_integration_tests)
    parallel_start_time = last_log_time = time.time()
    test_index = CreateTestProcesses(
        parallel_integration_tests, 0, process_list, process_done,
        max_parallel_tests, root_coverage_file=coverage_filename)
    # NOTE(review): the children's stdout/stderr pipes are only read by
    # communicate() after poll() reports that the child exited. A child that
    # writes more than the OS pipe buffer holds may block and never exit;
    # confirm, and consider draining output while polling.
    while len(process_results) < num_parallel_tests:
      # process_list is only extended after this scan, so iterating it
      # directly is safe. Index i of process_list runs test i.
      for proc_num, process in enumerate(process_list):
        if process_done[proc_num] or process.poll() is None:
          continue
        process_done[proc_num] = True
        stdout, stderr = process.communicate()
        process.stdout.close()
        process.stderr.close()
        # TODO: Differentiate test failures from errors.
        if process.returncode != 0:
          num_parallel_failures += 1
        process_results.append(TestProcessData(
            name=parallel_integration_tests[proc_num],
            return_code=process.returncode,
            stdout=stdout, stderr=stderr))
      if len(process_list) < num_parallel_tests:
        test_index = CreateTestProcesses(
            parallel_integration_tests, test_index, process_list,
            process_done, max_parallel_tests,
            root_coverage_file=coverage_filename)
      if len(process_results) < num_parallel_tests:
        if time.time() - last_log_time > 5:
          print('%d/%d finished - %d failures' % (
              len(process_results), num_parallel_tests, num_parallel_failures))
          if len(process_results) == completed_as_of_last_log:
            progress_less_logging_cycles += 1
          else:
            completed_as_of_last_log = len(process_results)
            # A process completed, so we made progress.
            progress_less_logging_cycles = 0
          if progress_less_logging_cycles > 4:
            # Ran 5 or more logging cycles with no progress, let the user
            # know which tests are running slowly or hanging.
            still_running = [
                parallel_integration_tests[running_num]
                for running_num, done in enumerate(process_done) if not done]
            print('Still running: %s' % still_running)
            # TODO: Terminate still-running processes if they
            # hang for a long time.
          last_log_time = time.time()
        time.sleep(1)
    process_run_finish_time = time.time()
    if num_parallel_failures:
      for result in process_results:
        if result.return_code != 0:
          print('Results for failed test %s:' % result.name)
          for line in result.stderr.split('\n'):
            print(line)
    return (num_parallel_failures,
            (process_run_finish_time - parallel_start_time))

  def PrintTestResults(self, num_sequential_tests, sequential_success,
                       sequential_time_elapsed,
                       num_parallel_tests, num_parallel_failures,
                       parallel_time_elapsed):
    """Prints test results for parallel and sequential tests.

    Args:
      num_sequential_tests: Number of tests that were run sequentially.
      sequential_success: True if the sequential tests were successful.
      sequential_time_elapsed: Seconds spent on the sequential tests.
      num_parallel_tests: Number of tests that were run in child processes.
      num_parallel_failures: Number of child processes that failed.
      parallel_time_elapsed: Seconds spent on the parallel tests.
    """
    # TODO: Properly track test skips.
    print('Parallel tests complete. Success: %s Fail: %s' % (
        num_parallel_tests - num_parallel_failures, num_parallel_failures))
    print(
        'Ran %d tests in %.3fs (%d sequential in %.3fs, %d parallel in %.3fs)'
        % (num_parallel_tests + num_sequential_tests,
           float(sequential_time_elapsed + parallel_time_elapsed),
           num_sequential_tests,
           float(sequential_time_elapsed),
           num_parallel_tests,
           float(parallel_time_elapsed)))
    # Blank separator line before the overall verdict.
    print('')
    if not num_parallel_failures and sequential_success:
      print('OK')
    else:
      if num_parallel_failures:
        print('FAILED (parallel tests)')
      if not sequential_success:
        print('FAILED (sequential tests)')

  def RunCommand(self):
    """Command entry point for the test command.

    Returns:
      0 if tests were only listed, or if all sequential and parallel tests
      succeeded; 1 otherwise.

    Raises:
      CommandException: if unittest is unavailable, the -p value is not an
          integer >= 1, S3 tests are requested without S3 credentials,
          coverage is requested but the coverage module is missing, or a
          test name cannot be loaded.
    """
    if not unittest:
      raise CommandException('On Python 2.6, the unittest2 module is required '
                             'to run the gsutil tests.')

    failfast = False
    list_tests = False
    max_parallel_tests = _DEFAULT_TEST_PARALLEL_PROCESSES
    perform_coverage = False
    sequential_only = False
    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-c':
          perform_coverage = True
        elif o == '-f':
          failfast = True
        elif o == '-l':
          list_tests = True
        elif o == ('--' + _SEQUENTIAL_ISOLATION_FLAG):
          # Called to isolate a single test in a separate process.
          # Don't try to isolate it again (would lead to an infinite loop).
          sequential_only = True
        elif o == '-p':
          try:
            max_parallel_tests = int(a)
          except ValueError:
            raise CommandException(
                'Invalid value for -p: %r; it must be an integer >= 1.' % a)
          # With a limit below 1 no test process would ever be created and
          # the parallel run loop would wait forever.
          if max_parallel_tests < 1:
            raise CommandException(
                'Invalid value for -p: %r; it must be an integer >= 1.' % a)
        elif o == '-s':
          if not tests.util.HAS_S3_CREDS:
            raise CommandException('S3 tests require S3 credentials. Please '
                                   'add appropriate credentials to your .boto '
                                   'file and re-run.')
          tests.util.RUN_S3_TESTS = True
        elif o == '-u':
          tests.util.RUN_INTEGRATION_TESTS = False

    if perform_coverage and not coverage:
      raise CommandException(
          'Coverage has been requested but the coverage module was not found. '
          'You can install it with "pip install coverage".')

    if (tests.util.RUN_S3_TESTS and
        max_parallel_tests > _DEFAULT_S3_TEST_PARALLEL_PROCESSES):
      self.logger.warn(
          'Reducing parallel tests to %d due to S3 maximum bucket '
          'limitations.', _DEFAULT_S3_TEST_PARALLEL_PROCESSES)
      max_parallel_tests = _DEFAULT_S3_TEST_PARALLEL_PROCESSES

    test_names = sorted(GetTestNames())
    if list_tests and not self.args:
      print('Found %d test names:' % len(test_names))
      print('  ' + '\n '.join(sorted(test_names)))
      return 0

    # Set list of commands to test if supplied.
    if self.args:
      commands_to_test = []
      for name in self.args:
        if name in test_names or name.split('.')[0] in test_names:
          commands_to_test.append('gslib.tests.test_%s' % name)
        else:
          commands_to_test.append(name)
    else:
      commands_to_test = ['gslib.tests.test_%s' % name for name in test_names]

    # Installs a ctrl-c handler that tries to cleanly tear down tests.
    unittest.installHandler()

    loader = unittest.TestLoader()

    # Always load, even for an empty name list, so that 'suite' is bound for
    # the code below.
    try:
      suite = loader.loadTestsFromNames(commands_to_test)
    except (ImportError, AttributeError) as e:
      raise CommandException('Invalid test argument name: %s' % e)

    if list_tests:
      test_names = GetTestNamesFromSuites(suite)
      print('Found %d test names:' % len(test_names))
      print('  ' + '\n '.join(sorted(test_names)))
      return 0

    if logging.getLogger().getEffectiveLevel() <= logging.INFO:
      verbosity = 1
    else:
      verbosity = 2
      logging.disable(logging.ERROR)

    if perform_coverage:
      # We want to run coverage over the gslib module, but filter out the test
      # modules and any third-party code. We also filter out anything under the
      # temporary directory. Otherwise, the gsutil update test (which copies
      # code to the temporary directory) gets included in the output.
      coverage_controller = coverage.coverage(
          source=['gslib'], omit=['gslib/third_party/*', 'gslib/tests/*',
                                  tempfile.gettempdir() + '*'])
      coverage_controller.erase()
      coverage_controller.start()

    num_parallel_failures = 0
    sequential_success = False

    (sequential_tests, isolated_tests,
     parallel_unit_tests, parallel_integration_tests) = (
         SplitParallelizableTestSuite(suite))

    # Since parallel integration tests are run in a separate process, they
    # won't get the override to tests.util, so skip them here.
    if not tests.util.RUN_INTEGRATION_TESTS:
      parallel_integration_tests = []

    logging.debug('Sequential tests to run: %s', sequential_tests)
    logging.debug('Isolated tests to run: %s', isolated_tests)
    logging.debug('Parallel unit tests to run: %s', parallel_unit_tests)
    logging.debug('Parallel integration tests to run: %s',
                  parallel_integration_tests)

    # If we're running an already-isolated test (spawned in isolation by a
    # previous test process), or we have no parallel tests to run,
    # just run sequentially. For now, unit tests are always run sequentially.
    run_tests_sequentially = (sequential_only or
                              (len(parallel_integration_tests) <= 1
                               and not isolated_tests))

    if run_tests_sequentially:
      total_tests = suite.countTestCases()
      resultclass = MakeCustomTestResultClass(total_tests)

      runner = unittest.TextTestRunner(verbosity=verbosity,
                                       resultclass=resultclass,
                                       failfast=failfast)
      ret = runner.run(suite)
      sequential_success = ret.wasSuccessful()
    else:
      if max_parallel_tests == 1:
        # We can't take advantage of parallelism, though we may have tests that
        # need isolation.
        sequential_tests += parallel_integration_tests
        parallel_integration_tests = []

      sequential_start_time = time.time()
      # TODO: For now, run unit tests sequentially because they are fast.
      # We could potentially shave off several seconds of execution time
      # by executing them in parallel with the integration tests.
      if len(sequential_tests) + len(parallel_unit_tests):
        print('Running %d tests sequentially.' % (len(sequential_tests) +
                                                  len(parallel_unit_tests)))
        sequential_tests_to_run = sequential_tests + parallel_unit_tests
        suite = loader.loadTestsFromNames(sorted(sequential_tests_to_run))
        num_sequential_tests = suite.countTestCases()
        resultclass = MakeCustomTestResultClass(num_sequential_tests)
        runner = unittest.TextTestRunner(verbosity=verbosity,
                                         resultclass=resultclass,
                                         failfast=failfast)

        ret = runner.run(suite)
        sequential_success = ret.wasSuccessful()
      else:
        num_sequential_tests = 0
        sequential_success = True
      sequential_time_elapsed = time.time() - sequential_start_time

      # At this point, all tests get their own process so just treat the
      # isolated tests as parallel tests.
      parallel_integration_tests += isolated_tests
      num_parallel_tests = len(parallel_integration_tests)

      if num_parallel_tests:
        num_processes = min(max_parallel_tests, num_parallel_tests)
        if num_parallel_tests > 1 and max_parallel_tests > 1:
          message = 'Running %d tests in parallel mode (%d processes).'
          if num_processes > _DEFAULT_TEST_PARALLEL_PROCESSES:
            message += (
                ' Please be patient while your CPU is incinerated. '
                'If your machine becomes unresponsive, consider reducing '
                'the amount of parallel test processes by running '
                '\'gsutil test -p <num_processes>\'.')
          print('\n'.join(textwrap.wrap(
              message % (num_parallel_tests, num_processes))))
        else:
          print('Running %d tests sequentially in isolated processes.' %
                num_parallel_tests)
        (num_parallel_failures, parallel_time_elapsed) = self.RunParallelTests(
            parallel_integration_tests, max_parallel_tests,
            coverage_controller.data.filename if perform_coverage else None)
        self.PrintTestResults(
            num_sequential_tests, sequential_success,
            sequential_time_elapsed,
            num_parallel_tests, num_parallel_failures,
            parallel_time_elapsed)

    if perform_coverage:
      coverage_controller.stop()
      coverage_controller.combine()
      coverage_controller.save()
      print('Coverage information was saved to: %s' %
            coverage_controller.data.filename)

    if sequential_success and not num_parallel_failures:
      ResetFailureCount()
      return 0
    return 1