普通文本  |  2004行  |  77.76 KB

# -*- coding: utf-8 -*-
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains the perfdiag gsutil command."""

from __future__ import absolute_import

import calendar
from collections import defaultdict
from collections import namedtuple
import contextlib
import cStringIO
import datetime
import httplib
import json
import logging
import math
import multiprocessing
import os
import random
import re
import socket
import string
import subprocess
import tempfile
import time

import boto
import boto.gs.connection

import gslib
from gslib.cloud_api import NotFoundException
from gslib.cloud_api import ServiceException
from gslib.cloud_api_helper import GetDownloadSerializationData
from gslib.command import Command
from gslib.command import DummyArgChecker
from gslib.command_argument import CommandArgument
from gslib.commands import config
from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
from gslib.file_part import FilePart
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.storage_url import StorageUrlFromString
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.util import CheckFreeSpace
from gslib.util import DivideAndCeil
from gslib.util import GetCloudApiInstance
from gslib.util import GetFileSize
from gslib.util import GetMaxRetryDelay
from gslib.util import HumanReadableToBytes
from gslib.util import IS_LINUX
from gslib.util import MakeBitsHumanReadable
from gslib.util import MakeHumanReadable
from gslib.util import Percentile
from gslib.util import ResumableThreshold

_SYNOPSIS = """
  gsutil perfdiag [-i in.json]
  gsutil perfdiag [-o out.json] [-n objects] [-c processes]
      [-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory]
      [-t tests] url...
"""

_DETAILED_HELP_TEXT = ("""
<B>SYNOPSIS</B>
""" + _SYNOPSIS + """


<B>DESCRIPTION</B>
  The perfdiag command runs a suite of diagnostic tests for a given Google
  Storage bucket.

  The 'url' parameter must name an existing bucket (e.g. gs://foo) to which
  the user has write permission. Several test files will be uploaded to and
  downloaded from this bucket. All test files will be deleted at the completion
  of the diagnostic if it finishes successfully.

  gsutil performance can be impacted by many factors at the client, server,
  and in-between, such as: CPU speed; available memory; the access path to the
  local disk; network bandwidth; contention and error rates along the path
  between gsutil and Google; operating system buffering configuration; and
  firewalls and other network elements. The perfdiag command is provided so
  that customers can run a known measurement suite when troubleshooting
  performance problems.


<B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B>
  If the Google Cloud Storage Team asks you to run a performance diagnostic
  please use the following command, and email the output file (output.json)
  to gs-team@google.com:

    gsutil perfdiag -o output.json gs://your-bucket


<B>OPTIONS</B>
  -n          Sets the number of objects to use when downloading and uploading
              files during tests. Defaults to 5.

  -c          Sets the number of processes to use while running throughput
              experiments. The default value is 1.

  -k          Sets the number of threads per process to use while running
              throughput experiments. Each process will receive an equal number
              of threads. The default value is 1.

              Note: All specified threads and processes will be created, but may
              not be saturated with work if too few objects (specified with -n)
              and too few components (specified with -y) are specified.

  -p          Sets the type of parallelism to be used (only applicable when
              threads or processes are specified and threads * processes > 1).
              The default is to use fan. Must be one of the following:

              fan
                 Use one thread per object. This is akin to using gsutil -m cp,
                 with sliced object download / parallel composite upload
                 disabled.

              slice
                 Use Y (specified with -y) threads for each object, transferring
                 one object at a time. This is akin to using sliced object
                 download / parallel composite upload, without -m. Sliced
                 uploads not supported for s3.

              both
                 Use Y (specified with -y) threads for each object, transferring
                 multiple objects at a time. This is akin to simultaneously
                 using sliced object download / parallel composite upload and
                 gsutil -m cp. Sliced uploads not supported for s3.

  -y          Sets the number of slices to divide each file/object into while
              transferring data. Only applicable with the slice (or both)
              parallelism type. The default is 4 slices.

  -s          Sets the size (in bytes) for each of the N (set with -n) objects
              used in the read and write throughput tests. The default is 1 MiB.
              This can also be specified using byte suffixes such as 500K or 1M.
              Note: these values are interpreted as multiples of 1024 (K=1024,
              M=1024*1024, etc.)
              Note: If rthru_file or wthru_file are performed, N (set with -n)
              times as much disk space as specified will be required for the
              operation.

  -d          Sets the directory to store temporary local files in. If not
              specified, a default temporary directory will be used.

  -t          Sets the list of diagnostic tests to perform. The default is to
              run the lat, rthru, and wthru diagnostic tests. Must be a
              comma-separated list containing one or more of the following:

              lat
                 For N (set with -n) objects, write the object, retrieve its
                 metadata, read the object, and finally delete the object.
                 Record the latency of each operation.

              list
                 Write N (set with -n) objects to the bucket, record how long
                 it takes for the eventually consistent listing call to return
                 the N objects in its result, delete the N objects, then record
                 how long it takes listing to stop returning the N objects.
                 This test is off by default.

              rthru
                 Runs N (set with -n) read operations, with at most C
                 (set with -c) reads outstanding at any given time.

              rthru_file
                 The same as rthru, but simultaneously writes data to the disk,
                 to gauge the performance impact of the local disk on downloads.

              wthru
                 Runs N (set with -n) write operations, with at most C
                 (set with -c) writes outstanding at any given time.

              wthru_file
                 The same as wthru, but simultaneously reads data from the disk,
                 to gauge the performance impact of the local disk on uploads.

  -m          Adds metadata to the result JSON file. Multiple -m values can be
              specified. Example:

                  gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname

              Each metadata key will be added to the top-level "metadata"
              dictionary in the output JSON file.

  -o          Writes the results of the diagnostic to an output file. The output
              is a JSON file containing system information and performance
              diagnostic results. The file can be read and reported later using
              the -i option.

  -i          Reads the JSON output file created using the -o command and prints
              a formatted description of the results.


<B>MEASURING AVAILABILITY</B>
  The perfdiag command ignores the boto num_retries configuration parameter.
  Instead, it always retries on HTTP errors in the 500 range and keeps track of
  how many 500 errors were encountered during the test. The availability
  measurement is reported at the end of the test.

  Note that HTTP responses are only recorded when the request was made in a
  single process. When using multiple processes or threads, read and write
  throughput measurements are performed in an external process, so the
  availability numbers reported won't include the throughput measurements.


<B>NOTE</B>
  The perfdiag command collects system information. It collects your IP address,
  executes DNS queries to Google servers and collects the results, and collects
  network statistics information from the output of netstat -s. It will also
  attempt to connect to your proxy server if you have one configured. None of
  this information will be sent to Google unless you choose to send it.
""")

# In-memory record for a temporary file: its size, its MD5 digest and
# (optionally) its contents.
FileDataTuple = namedtuple('FileDataTuple', ['size', 'md5', 'data'])

# One object of a fanned download. When need_to_slice is True the object is
# additionally downloaded using the slice strategy. The remaining fields are
# named as documented in PerfDiagCommand.Download.
FanDownloadTuple = namedtuple(
    'FanDownloadTuple',
    ['need_to_slice', 'object_name', 'file_name', 'serialization_data'])

# One slice of a sliced download.
# Fields are named as documented in PerfDiagCommand.Download.
SliceDownloadTuple = namedtuple(
    'SliceDownloadTuple',
    ['object_name', 'file_name', 'serialization_data', 'start_byte',
     'end_byte'])

# One file of a fanned upload. When need_to_slice is True the file is
# additionally uploaded using the slice strategy. The remaining fields are
# named as documented in PerfDiagCommand.Upload.
FanUploadTuple = namedtuple(
    'FanUploadTuple',
    ['need_to_slice', 'file_name', 'object_name', 'use_file'])

# One slice of a sliced upload.
# Fields are named as documented in PerfDiagCommand.Upload.
SliceUploadTuple = namedtuple(
    'SliceUploadTuple',
    ['file_name', 'object_name', 'use_file', 'file_start', 'file_size'])

# Maps file_path -> FileDataTuple for every temporary file used by perfdiag.
# Kept at module level rather than on PerfDiagCommand because calls to Apply
# make copies of all member data.
temp_file_dict = dict()


class Error(Exception):
  """Root of the exception hierarchy defined by this module."""


class InvalidArgument(Error):
  """Signals that a function was called with an invalid argument."""


def _DownloadObject(cls, args, thread_state=None):
  """Apply worker that downloads one object of a fanned parallel download.

  Dispatches to the sliced download path when the tuple requests it, and to a
  plain whole-object download otherwise.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A FanDownloadTuple object describing this download.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  if not args.need_to_slice:
    cls.Download(args.object_name, args.file_name, args.serialization_data)
    return
  cls.PerformSlicedDownload(args.object_name, args.file_name,
                            args.serialization_data)


def _DownloadSlice(cls, args, thread_state=None):
  """Apply worker that downloads a single byte range of an object.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A SliceDownloadTuple object describing this download.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  first_byte = args.start_byte
  last_byte = args.end_byte
  cls.Download(args.object_name, args.file_name, args.serialization_data,
               first_byte, last_byte)


def _UploadObject(cls, args, thread_state=None):
  """Apply worker that uploads one file of a fanned parallel upload.

  Dispatches to the sliced upload path when the tuple requests it, and to a
  plain whole-file upload otherwise.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A FanUploadTuple object describing this upload.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  if not args.need_to_slice:
    cls.Upload(args.file_name, args.object_name, args.use_file)
    return
  cls.PerformSlicedUpload(args.file_name, args.object_name, args.use_file)


def _UploadSlice(cls, args, thread_state=None):
  """Apply worker that uploads a single slice of a file.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A SliceUploadTuple object describing this upload.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  slice_offset = args.file_start
  slice_length = args.file_size
  cls.Upload(args.file_name, args.object_name, args.use_file,
             slice_offset, slice_length)


def _DeleteWrapper(cls, object_name, thread_state=None):
  """Apply worker that deletes one object from the test bucket.

  Args:
    cls: The calling PerfDiagCommand class instance.
    object_name: The object name to delete from the test bucket.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  api_instance = GetCloudApiInstance(cls, thread_state)
  cls.gsutil_api = api_instance
  cls.Delete(object_name)


def _PerfdiagExceptionHandler(cls, e):
  """Simple exception handler to allow post-completion status."""
  cls.logger.error(str(e))


def _DummyTrackerCallback(_):
  pass


class DummyFile(object):
  """File-like sink that accepts writes and discards the data."""

  def write(self, *args, **kwargs):  # pylint: disable=invalid-name
    """Accepts any arguments and stores nothing."""
    return None

  def close(self):  # pylint: disable=invalid-name
    """Does nothing; there is no underlying resource to release."""
    return None


# Many functions in perfdiag re-define a temporary function based on a
# variable from a loop, resulting in a false positive from the linter.
# pylint: disable=cell-var-from-loop
class PerfDiagCommand(Command):
  """Implementation of gsutil perfdiag command."""

  # Command specification. See base class for documentation.
  # As declared here, perfdiag takes zero or one positional argument and is
  # also reachable through the aliases listed in command_name_aliases.
  command_spec = Command.CreateCommandSpec(
      'perfdiag',
      command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'],
      usage_synopsis=_SYNOPSIS,
      min_args=0,
      max_args=1,
      # Every sub-option letter is followed by ':' -- presumably getopt syntax
      # meaning each option takes a value; confirm against the Command base
      # class.
      supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:',
      file_url_ok=False,
      provider_url_ok=False,
      urls_start_arg=0,
      # Both the XML and JSON APIs are declared as supported; JSON is the
      # default.
      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
      gs_default_api=ApiSelector.JSON,
      argparse_arguments=[
          CommandArgument.MakeNCloudBucketURLsArgument(1)
      ]
  )
  # Help specification. See help_provider.py for documentation.
  # The help body is the module-level _DETAILED_HELP_TEXT; no subcommand help
  # is registered.
  help_spec = Command.HelpSpec(
      help_name='perfdiag',
      help_name_aliases=[],
      help_type='command_help',
      help_one_line_summary='Run performance diagnostic',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )

  # Sizes, in bytes, of the files exercised by the latency test.
  # TODO: Consider letting the user specify these sizes with a configuration
  # parameter.
  test_lat_file_sizes = (
      0,  # empty file
      1024,  # 1 KiB
      100 * 1024,  # 100 KiB
      1024 * 1024,  # 1 MiB
  )

  # Names of the individual diagnostic tests.
  RTHRU = 'rthru'
  RTHRU_FILE = 'rthru_file'
  WTHRU = 'wthru'
  WTHRU_FILE = 'wthru_file'
  LAT = 'lat'
  LIST = 'list'

  # Names of the parallelism strategies.
  FAN = 'fan'
  SLICE = 'slice'
  BOTH = 'both'

  # Every diagnostic test that can be requested.
  ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST)

  # Diagnostic tests that run when none are requested explicitly.
  DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)

  # Every parallelism strategy that can be requested.
  PARALLEL_STRATEGIES = (FAN, SLICE, BOTH)

  # Host of the Google Cloud Storage XML API endpoint; the boto configuration
  # value wins over boto's built-in default host.
  XML_API_HOST = boto.config.get(
      'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
  # Port of the Google Cloud Storage XML API endpoint (80 unless configured).
  XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80)

  # Upper bound on retries of requests that fail with a 5xx error.
  MAX_SERVER_ERROR_RETRIES = 5
  # Upper bound on retries overall, including more serious failures such as
  # the socket breaking.
  MAX_TOTAL_RETRIES = 10

  # boto's Key object defaults to an 8 KiB buffer, which becomes a bottleneck
  # at high throughput rates, so a larger (16 KiB) buffer is used instead.
  KEY_BUFFER_SIZE = 16 * 1024

  # Number of pseudo-random bytes generated before the byte sequence starts
  # repeating. Chosen as the next prime larger than 5 MiB (5242883).
  MAX_UNIQUE_RANDOM_BYTES = 5 * 1024 * 1024 + 3

  # Longest time, in seconds, that the listing tests wait for object listings
  # to reflect the expected state.
  MAX_LISTING_WAIT_TIME = 60.0

  def _Exec(self, cmd, raise_on_error=True, return_output=False,
            mute_stderr=False):
    """Executes a command in a subprocess.

    Args:
      cmd: List containing the command to execute.
      raise_on_error: Whether or not to raise an exception when a process exits
          with a non-zero return code.
      return_output: If set to True, the return value of the function is the
          stdout of the process.
      mute_stderr: If set to True, the stderr of the process is not printed to
          the console.

    Returns:
      The return code of the process or the stdout if return_output is set.

    Raises:
      Exception: If raise_on_error is set to True and any process exits with a
      non-zero return code.
    """
    self.logger.debug('Running command: %s', cmd)
    stderr = subprocess.PIPE if mute_stderr else None
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr)
    (stdoutdata, _) = p.communicate()
    if raise_on_error and p.returncode:
      raise CommandException("Received non-zero return code (%d) from "
                             "subprocess '%s'." % (p.returncode, ' '.join(cmd)))
    return stdoutdata if return_output else p.returncode

  def _WarnIfLargeData(self):
    """Logs a notice when the throughput tests will move more than 2 GiB."""
    total_bytes = self.num_objects * self.thru_filesize
    threshold_bytes = HumanReadableToBytes('2GiB')
    if total_bytes > threshold_bytes:
      self.logger.info('This is a large operation, and could take a while.')

  def _MakeTempFile(self, file_size=0, mem_metadata=False,
                    mem_data=False, prefix='gsutil_test_file'):
    """Creates a temporary file of the given size and returns its path.

    Args:
      file_size: The size of the temporary file to create.
      mem_metadata: If true, store md5 and file size in memory at
                    temp_file_dict[fpath].md5, tempfile_data[fpath].file_size.
      mem_data: If true, store the file data in memory at
                temp_file_dict[fpath].data
      prefix: The prefix to use for the temporary file. Defaults to
              gsutil_test_file.

    Returns:
      The file path of the created temporary file.
    """
    fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix,
                                 dir=self.directory, text=False)
    with os.fdopen(fd, 'wb') as fp:
      random_bytes = os.urandom(min(file_size,
                                    self.MAX_UNIQUE_RANDOM_BYTES))
      total_bytes_written = 0
      while total_bytes_written < file_size:
        num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES,
                        file_size - total_bytes_written)
        fp.write(random_bytes[:num_bytes])
        total_bytes_written += num_bytes

    if mem_metadata or mem_data:
      with open(fpath, 'rb') as fp:
        file_size = GetFileSize(fp) if mem_metadata else None
        md5 = CalculateB64EncodedMd5FromContents(fp) if mem_metadata else None
        data = fp.read() if mem_data else None
        temp_file_dict[fpath] = FileDataTuple(file_size, md5, data)

    self.temporary_files.add(fpath)
    return fpath

  def _SetUp(self):
    """Resets bookkeeping state and creates the local files the tests need."""

    # Results of the diagnostics.
    self.results = {}
    # Paths of the local temporary files created for this run.
    self.temporary_files = set()
    # Names of the test objects that exist in the test bucket.
    self.temporary_objects = set()
    # Count of HTTP requests made.
    self.total_requests = 0
    # Count of HTTP 5xx errors.
    self.request_errors = 0
    # Response counts, keyed by response code.
    self.error_responses_by_code = defaultdict(int)
    # Count of socket errors.
    self.connection_breaks = 0
    # Guards against running the cleanup twice.
    self.teardown_completed = False

    memory_thru_tests = (self.RTHRU, self.WTHRU)
    disk_thru_tests = (self.RTHRU_FILE, self.WTHRU_FILE)

    # Files for the latency test, one per configured size.
    if self.LAT in self.diag_tests:
      self.latency_files = []
      for latency_size in self.test_lat_file_sizes:
        self.latency_files.append(
            self._MakeTempFile(latency_size, mem_metadata=True, mem_data=True))

    # Files for the throughput tests.
    if self.diag_tests.intersection(memory_thru_tests + disk_thru_tests):
      # A 5 MiB file used to warm up the TCP connection.
      self.tcp_warmup_file = self._MakeTempFile(
          5 * 1024 * 1024, mem_metadata=True, mem_data=True)

      # In-memory throughput tests transfer one object N times rather than
      # creating N objects, which avoids excessive memory usage.
      if self.diag_tests.intersection(memory_thru_tests):
        self.mem_thru_file_name = self._MakeTempFile(
            self.thru_filesize, mem_metadata=True, mem_data=True)
        self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name)

      # Tests that use disk I/O need N distinct objects in order to properly
      # measure the performance impact of seeks.
      if self.diag_tests.intersection(disk_thru_tests):
        # File names, and the matching object names, used by the file
        # throughput tests.
        self.thru_file_names = []
        self.thru_object_names = []

        free_disk_space = CheckFreeSpace(self.directory)
        required_space = self.thru_filesize * self.num_objects
        if free_disk_space < required_space:
          raise CommandException(
              'Not enough free disk space for throughput files: '
              '%s of disk space required, but only %s available.'
              % (MakeHumanReadable(self.thru_filesize * self.num_objects),
                 MakeHumanReadable(free_disk_space)))

        self.logger.info('\nCreating %d local files each of size %s.'
                         % (self.num_objects,
                            MakeHumanReadable(self.thru_filesize)))
        self._WarnIfLargeData()
        for _ in range(self.num_objects):
          created_path = self._MakeTempFile(self.thru_filesize,
                                            mem_metadata=True)
          self.thru_file_names.append(created_path)
          self.thru_object_names.append(os.path.basename(created_path))

    # Download target that throws away everything written to it.
    self.discard_sink = DummyFile()

    # Suppress misleading progress callback output and the incorrect
    # suggestion to use gsutil -m perfdiag.
    self.logger.addFilter(self._PerfdiagFilter())

  def _TearDown(self):
    """Performs operations to clean things up after performing diagnostics.

    Clears the in-memory file cache, removes the local temporary files (and
    the temporary directory when self.delete_directory is set) and deletes
    the test objects from the bucket. Local cleanup is best-effort: a failure
    to remove one path is logged at debug level and does not prevent the
    remaining paths from being removed. Does nothing if teardown has already
    completed.
    """
    if not self.teardown_completed:
      temp_file_dict.clear()

      # Handle each removal separately so a single OSError (for example a
      # file that is already gone) does not abort the rest of the cleanup.
      for fpath in self.temporary_files:
        try:
          os.remove(fpath)
        except OSError as e:
          self.logger.debug('Could not remove temporary file %s: %s', fpath, e)
      if self.delete_directory:
        try:
          os.rmdir(self.directory)
        except OSError as e:
          self.logger.debug('Could not remove temporary directory %s: %s',
                            self.directory, e)

      if self.threads > 1 or self.processes > 1:
        # Delete the test objects in parallel when parallelism was requested.
        args = list(self.temporary_objects)
        self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
                   arg_checker=DummyArgChecker,
                   parallel_operations_override=True,
                   process_count=self.processes, thread_count=self.threads)
      else:
        for object_name in self.temporary_objects:
          self.Delete(object_name)
    self.teardown_completed = True

  @contextlib.contextmanager
  def _Time(self, key, bucket):
    """A context manager that measures time.

    A context manager that prints a status message before and after executing
    the inner command and times how long the inner command takes. Keeps track of
    the timing, aggregated by the given key.

    Args:
      key: The key to insert the timing value into a dictionary bucket.
      bucket: A dictionary to place the timing value in.

    Yields:
      For the context manager.
    """
    self.logger.info('%s starting...', key)
    t0 = time.time()
    yield
    t1 = time.time()
    bucket[key].append(t1 - t0)
    self.logger.info('%s done.', key)

  def _RunOperation(self, func):
    """Runs an operation with retry logic.

    Args:
      func: The function to run.

    Returns:
      True if the operation succeeds, False if aborted.
    """
    # We retry on httplib exceptions that can happen if the socket was closed
    # by the remote party or the connection broke because of network issues.
    # Only the BotoServerError is counted as a 5xx error towards the retry
    # limit.
    success = False
    server_error_retried = 0
    total_retried = 0
    i = 0
    return_val = None
    while not success:
      next_sleep = min(random.random() * (2 ** i) + 1, GetMaxRetryDelay())
      try:
        return_val = func()
        self.total_requests += 1
        success = True
      except tuple(self.exceptions) as e:
        total_retried += 1
        if total_retried > self.MAX_TOTAL_RETRIES:
          self.logger.info('Reached maximum total retries. Not retrying.')
          break
        if isinstance(e, ServiceException):
          if e.status >= 500:
            self.error_responses_by_code[e.status] += 1
            self.total_requests += 1
            self.request_errors += 1
            server_error_retried += 1
            time.sleep(next_sleep)
          else:
            raise
          if server_error_retried > self.MAX_SERVER_ERROR_RETRIES:
            self.logger.info(
                'Reached maximum server error retries. Not retrying.')
            break
        else:
          self.connection_breaks += 1
    return return_val

  def _RunLatencyTests(self):
    """Runs latency tests."""
    # Stores timing information for each category of operation.
    self.results['latency'] = defaultdict(list)

    for i in range(self.num_objects):
      self.logger.info('\nRunning latency iteration %d...', i+1)
      for fpath in self.latency_files:
        file_data = temp_file_dict[fpath]
        url = self.bucket_url.Clone()
        url.object_name = os.path.basename(fpath)
        file_size = file_data.size
        readable_file_size = MakeHumanReadable(file_size)

        self.logger.info(
            "\nFile of size %s located on disk at '%s' being diagnosed in the "
            "cloud at '%s'.", readable_file_size, fpath, url)

        upload_target = StorageUrlToUploadObjectMetadata(url)

        def _Upload():
          io_fp = cStringIO.StringIO(file_data.data)
          with self._Time('UPLOAD_%d' % file_size, self.results['latency']):
            self.gsutil_api.UploadObject(
                io_fp, upload_target, size=file_size, provider=self.provider,
                fields=['name'])
        self._RunOperation(_Upload)

        def _Metadata():
          with self._Time('METADATA_%d' % file_size, self.results['latency']):
            return self.gsutil_api.GetObjectMetadata(
                url.bucket_name, url.object_name,
                provider=self.provider, fields=['name', 'contentType',
                                                'mediaLink', 'size'])
        # Download will get the metadata first if we don't pass it in.
        download_metadata = self._RunOperation(_Metadata)
        serialization_data = GetDownloadSerializationData(download_metadata)

        def _Download():
          with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']):
            self.gsutil_api.GetObjectMedia(
                url.bucket_name, url.object_name, self.discard_sink,
                provider=self.provider, serialization_data=serialization_data)
        self._RunOperation(_Download)

        def _Delete():
          with self._Time('DELETE_%d' % file_size, self.results['latency']):
            self.gsutil_api.DeleteObject(url.bucket_name, url.object_name,
                                         provider=self.provider)
        self._RunOperation(_Delete)

  class _PerfdiagFilter(logging.Filter):

    def filter(self, record):
      # Used to prevent unnecessary output when using multiprocessing.
      msg = record.getMessage()
      return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
                  ('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg))

  def _PerfdiagExceptionHandler(self, e):
    """Simple exception handler to allow post-completion status."""
    self.logger.error(str(e))

  def PerformFannedDownload(self, need_to_slice, object_names, file_names,
                            serialization_data):
    """Downloads several objects in parallel using the fan strategy.

    Args:
      need_to_slice: If True, each object in object_names is additionally
                     downloaded using the slice strategy.
      object_names: A list of object names to be downloaded. Each object must
                    already exist in the test bucket.
      file_names: A list, corresponding by index to object_names, of file names
                  for downloaded data. If None, discard downloaded data.
      serialization_data: A list, corresponding by index to object_names,
                          of serialization data for each object.
    """
    download_args = [
        FanDownloadTuple(need_to_slice, object_name,
                         file_names[index] if file_names else None,
                         serialization_data[index])
        for index, object_name in enumerate(object_names)]
    self.Apply(_DownloadObject, download_args, _PerfdiagExceptionHandler,
               ('total_requests', 'request_errors'),
               arg_checker=DummyArgChecker, parallel_operations_override=True,
               process_count=self.processes, thread_count=self.threads)

  def PerformSlicedDownload(self, object_name, file_name, serialization_data):
    """Downloads one object as self.num_slices byte ranges.

    Args:
      object_name: The name of the object to download.
      file_name: The name of the file to download data to, or None if data
                 should be discarded.
      serialization_data: The serialization data for the object.
    """
    if file_name:
      # Size the destination file up front to the full object size.
      with open(file_name, 'ab') as fp:
        fp.truncate(self.thru_filesize)

    component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
    slice_args = []
    for slice_index in range(self.num_slices):
      first_byte = slice_index * component_size
      # Byte ranges are inclusive; the final slice stops at the last byte.
      last_byte = min(first_byte + component_size - 1, self.thru_filesize - 1)
      slice_args.append(SliceDownloadTuple(
          object_name, file_name, serialization_data, first_byte, last_byte))
    self.Apply(_DownloadSlice, slice_args, _PerfdiagExceptionHandler,
               ('total_requests', 'request_errors'),
               arg_checker=DummyArgChecker, parallel_operations_override=True,
               process_count=self.processes, thread_count=self.threads)

  def PerformFannedUpload(self, need_to_slice, file_names, object_names,
                          use_file):
    """Uploads several files in parallel using the fan strategy.

    The metadata for each file name should be present in temp_file_dict prior
    to calling. The file data should also be present in temp_file_dict when
    use_file is specified as False.

    Args:
      need_to_slice: If True, each file in file_names is additionally
                     uploaded using the slice strategy.
      file_names: A list of file names to be uploaded.
      object_names: A list, corresponding by index to file_names, of object
                    names to upload data to.
      use_file: If true, use disk I/O, otherwise read upload data from memory.
    """
    upload_args = [
        FanUploadTuple(need_to_slice, local_name, object_names[index],
                       use_file)
        for index, local_name in enumerate(file_names)]
    self.Apply(_UploadObject, upload_args, _PerfdiagExceptionHandler,
               ('total_requests', 'request_errors'),
               arg_checker=DummyArgChecker, parallel_operations_override=True,
               process_count=self.processes, thread_count=self.threads)

  def PerformSlicedUpload(self, file_name, object_name, use_file):
    """Performs a parallel upload of a file using the slice strategy.

    The file is split into self.num_slices components that are uploaded as
    temporary objects, composed into object_name, and then deleted.

    The metadata for file_name should be present in temp_file_dict prior
    to calling. Also, the data for file_name should be present in
    temp_file_dict if use_file is specified as False.

    Args:
      file_name: The name of the file to upload.
      object_name: The name of the object to upload to.
      use_file: If true, use disk I/O, otherwise read upload data from memory.
    """
    # Divide the file into components.
    component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
    component_object_names = (
        [object_name + str(i) for i in range(self.num_slices)])
    file_size = temp_file_dict[file_name].size

    args = []
    for i in range(self.num_slices):
      # The nominal component_size must stay unchanged across iterations:
      # start offsets are multiples of it. Overwriting it with the (smaller)
      # size of a trailing component would shift the start offset of any
      # component after it. Offsets and sizes are clamped so that components
      # lying entirely past the end of the file are empty rather than
      # negative-sized.
      component_start = min(i * component_size, file_size)
      this_component_size = min(component_size, file_size - component_start)
      args.append(SliceUploadTuple(file_name, component_object_names[i],
                                   use_file, component_start,
                                   this_component_size))

    # Upload the components in parallel.
    try:
      self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler,
                 ('total_requests', 'request_errors'),
                 arg_checker=DummyArgChecker, parallel_operations_override=True,
                 process_count=self.processes, thread_count=self.threads)

      # Compose the components into an object.
      request_components = []
      for i in range(self.num_slices):
        src_obj_metadata = (
            apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
                name=component_object_names[i]))
        request_components.append(src_obj_metadata)

      dst_obj_metadata = apitools_messages.Object()
      dst_obj_metadata.name = object_name
      dst_obj_metadata.bucket = self.bucket_url.bucket_name
      def _Compose():
        self.gsutil_api.ComposeObject(request_components, dst_obj_metadata,
                                      provider=self.provider)
      self._RunOperation(_Compose)
    finally:
      # Delete the temporary components, whether or not the upload and
      # compose steps succeeded.
      self.Apply(_DeleteWrapper, component_object_names,
                 _PerfdiagExceptionHandler,
                 ('total_requests', 'request_errors'),
                 arg_checker=DummyArgChecker, parallel_operations_override=True,
                 process_count=self.processes, thread_count=self.threads)

  def _RunReadThruTests(self, use_file=False):
    """Measures download throughput and stores it in self.results.

    Args:
      use_file: If true, downloaded data is written to files on disk;
                otherwise no file name is passed to the download calls.
    """
    if use_file:
      test_name = 'read_throughput_file'
      file_io_string = 'with file I/O'
    else:
      test_name = 'read_throughput'
      file_io_string = ''
    self.logger.info(
        '\nRunning read throughput tests %s (%s objects of size %s)' %
        (file_io_string, self.num_objects,
         MakeHumanReadable(self.thru_filesize)))
    self._WarnIfLargeData()

    self.results[test_name] = {'file_size': self.thru_filesize,
                               'processes': self.processes,
                               'threads': self.threads,
                               'parallelism': self.parallel_strategy}

    # Make sure the objects to download exist in the test bucket and gather
    # the serialization data that is handed to the download calls.
    if not use_file:
      # In-memory variant: a single object is uploaded once and downloaded
      # num_objects times.
      self.temporary_objects.add(self.mem_thru_object_name)
      uploaded = self.Upload(self.mem_thru_file_name,
                             self.mem_thru_object_name, use_file)
      file_names = None
      object_names = [self.mem_thru_object_name] * self.num_objects
      serialization_data = (
          [GetDownloadSerializationData(uploaded)] * self.num_objects)
    else:
      # File I/O variant: one local file per object.
      file_names = self.thru_file_names
      object_names = self.thru_object_names
      serialization_data = []
      for idx in range(self.num_objects):
        remote_name = self.thru_object_names[idx]
        self.temporary_objects.add(remote_name)
        if self.WTHRU_FILE not in self.diag_tests:
          uploaded = self.Upload(self.thru_file_names[idx], remote_name,
                                 use_file)
        else:
          # The WTHRU_FILE test was selected, so the objects already exist;
          # only their metadata is needed.
          uploaded = self.gsutil_api.GetObjectMetadata(
              self.bucket_url.bucket_name, remote_name,
              fields=['size', 'mediaLink'], provider=self.bucket_url.scheme)

        # Overwriting an existing file hurts sliced download performance, so
        # remove the file and recreate it empty, as a real download would
        # start out.
        local_path = self.thru_file_names[idx]
        os.unlink(local_path)
        open(local_path, 'ab').close()
        serialization_data.append(GetDownloadSerializationData(uploaded))

    # Warmup the TCP connection.
    warmup_obj_name = os.path.basename(self.tcp_warmup_file)
    self.temporary_objects.add(warmup_obj_name)
    self.Upload(self.tcp_warmup_file, warmup_obj_name)
    self.Download(warmup_obj_name)

    started = time.time()
    if self.processes == 1 and self.threads == 1:
      # No parallelism requested: download the objects one after another.
      for idx in range(self.num_objects):
        target = file_names[idx] if use_file else None
        self.Download(object_names[idx], target, serialization_data[idx])
    elif self.parallel_strategy in (self.FAN, self.BOTH):
      self.PerformFannedDownload(self.parallel_strategy == self.BOTH,
                                 object_names, file_names, serialization_data)
    elif self.parallel_strategy == self.SLICE:
      for idx in range(self.num_objects):
        target = file_names[idx] if use_file else None
        self.PerformSlicedDownload(
            object_names[idx], target, serialization_data[idx])
    finished = time.time()

    elapsed = finished - started
    byte_total = self.thru_filesize * self.num_objects
    rate = byte_total / elapsed

    summary = self.results[test_name]
    summary['time_took'] = elapsed
    summary['total_bytes_copied'] = byte_total
    summary['bytes_per_second'] = rate

  def _RunWriteThruTests(self, use_file=False):
    """Measures upload throughput and stores it in self.results.

    Args:
      use_file: If true, upload data is read from files on disk; otherwise it
                is read from memory.
    """
    if use_file:
      test_name = 'write_throughput_file'
      file_io_string = 'with file I/O'
    else:
      test_name = 'write_throughput'
      file_io_string = ''
    self.logger.info(
        '\nRunning write throughput tests %s (%s objects of size %s)' %
        (file_io_string, self.num_objects,
         MakeHumanReadable(self.thru_filesize)))
    self._WarnIfLargeData()

    self.results[test_name] = {'file_size': self.thru_filesize,
                               'processes': self.processes,
                               'threads': self.threads,
                               'parallelism': self.parallel_strategy}

    # Warmup the TCP connection.
    warmup_obj_name = os.path.basename(self.tcp_warmup_file)
    self.temporary_objects.add(warmup_obj_name)
    self.Upload(self.tcp_warmup_file, warmup_obj_name)

    if not use_file:
      # In-memory variant: the same source is uploaded num_objects times,
      # each time to a distinct object name.
      file_names = [self.mem_thru_file_name] * self.num_objects
      object_names = [self.mem_thru_object_name + str(idx)
                      for idx in range(self.num_objects)]
    else:
      # File I/O variant: one local file per object.
      file_names = self.thru_file_names
      object_names = self.thru_object_names

    # Register every destination object as a temporary object.
    for remote_name in object_names:
      self.temporary_objects.add(remote_name)

    started = time.time()
    if self.processes == 1 and self.threads == 1:
      # No parallelism requested: upload the objects one after another.
      for idx in range(self.num_objects):
        self.Upload(file_names[idx], object_names[idx], use_file)
    elif self.parallel_strategy in (self.FAN, self.BOTH):
      self.PerformFannedUpload(self.parallel_strategy == self.BOTH,
                               file_names, object_names, use_file)
    elif self.parallel_strategy == self.SLICE:
      for idx in range(self.num_objects):
        self.PerformSlicedUpload(file_names[idx], object_names[idx], use_file)
    finished = time.time()

    elapsed = finished - started
    byte_total = self.thru_filesize * self.num_objects
    rate = byte_total / elapsed

    summary = self.results[test_name]
    summary['time_took'] = elapsed
    summary['total_bytes_copied'] = byte_total
    summary['bytes_per_second'] = rate

  def _RunListTests(self):
    """Runs eventual consistency listing latency tests.

    Uploads self.num_objects temporary objects, then repeatedly lists the
    bucket until every one of them shows up in a listing. Afterwards deletes
    them and repeatedly lists until none of them shows up any more. Either
    polling phase gives up once self.MAX_LISTING_WAIT_TIME has elapsed.

    The per-call listing latencies, the number of objects seen after each
    call, and the total time of each phase are stored under
    self.results['listing']['insert'] and self.results['listing']['delete'].
    """
    self.results['listing'] = {'num_files': self.num_objects}

    # Generate N random objects to put into the bucket.
    list_prefix = 'gsutil-perfdiag-list-'
    list_fpaths = []
    list_objects = []
    args = []
    for _ in xrange(self.num_objects):
      # NOTE(review): the first argument (0) is presumably the file size, i.e.
      # empty files -- confirm against _MakeTempFile.
      fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True,
                                 prefix=list_prefix)
      list_fpaths.append(fpath)
      # The object is named after the temp file's base name.
      object_name = os.path.basename(fpath)
      list_objects.append(object_name)
      args.append(FanUploadTuple(False, fpath, object_name, False))
      self.temporary_objects.add(object_name)

    # Add the objects to the bucket.
    self.logger.info(
        '\nWriting %s objects for listing test...', self.num_objects)

    self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
               arg_checker=DummyArgChecker)

    # These names are rebound before the delete phase below; the nested
    # functions look them up at call time, so each phase records into its own
    # lists/sets.
    list_latencies = []
    files_seen = []
    total_start_time = time.time()
    expected_objects = set(list_objects)
    found_objects = set()

    def _List():
      """Lists and returns objects in the bucket. Also records latency."""
      t0 = time.time()
      # list() drains the listing iterator so the timing covers the complete
      # listing.
      objects = list(self.gsutil_api.ListObjects(
          self.bucket_url.bucket_name, delimiter='/',
          provider=self.provider, fields=['items/name']))
      t1 = time.time()
      list_latencies.append(t1 - t0)
      return set([obj.data.name for obj in objects])

    self.logger.info(
        'Listing bucket %s waiting for %s objects to appear...',
        self.bucket_url.bucket_name, self.num_objects)
    # Keep listing until every uploaded object has been seen at least once.
    while expected_objects - found_objects:
      def _ListAfterUpload():
        names = _List()
        # Only count names belonging to this test; found_objects accumulates
        # across calls.
        found_objects.update(names & expected_objects)
        files_seen.append(len(found_objects))
      self._RunOperation(_ListAfterUpload)
      if expected_objects - found_objects:
        # Still incomplete: stop polling once the time budget is used up.
        if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
          self.logger.warning('Maximum time reached waiting for listing.')
          break
    total_end_time = time.time()

    self.results['listing']['insert'] = {
        'num_listing_calls': len(list_latencies),
        'list_latencies': list_latencies,
        'files_seen_after_listing': files_seen,
        'time_took': total_end_time - total_start_time,
    }

    args = [object_name for object_name in list_objects]
    self.logger.info(
        'Deleting %s objects for listing test...', self.num_objects)
    self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
               arg_checker=DummyArgChecker)

    self.logger.info(
        'Listing bucket %s waiting for %s objects to disappear...',
        self.bucket_url.bucket_name, self.num_objects)
    # Fresh containers for the delete phase (see note above on rebinding).
    list_latencies = []
    files_seen = []
    total_start_time = time.time()
    # Start from "all objects still visible" and shrink as listings stop
    # returning them.
    found_objects = set(list_objects)
    while found_objects:
      def _ListAfterDelete():
        names = _List()
        # Drop every object that the latest listing no longer returned.
        found_objects.intersection_update(names)
        files_seen.append(len(found_objects))
      self._RunOperation(_ListAfterDelete)
      if found_objects:
        # Some objects are still listed: stop polling once the time budget is
        # used up.
        if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
          self.logger.warning('Maximum time reached waiting for listing.')
          break
    total_end_time = time.time()

    self.results['listing']['delete'] = {
        'num_listing_calls': len(list_latencies),
        'list_latencies': list_latencies,
        'files_seen_after_listing': files_seen,
        'time_took': total_end_time - total_start_time,
    }

  def Upload(self, file_name, object_name, use_file=False, file_start=0,
             file_size=None):
    """Uploads (part of) a temp file to object_name in the test bucket.

    The destination bucket is the one referred to by self.bucket_url.

    Args:
      file_name: Path of the local file; also the key of its entry in
                 temp_file_dict.
      object_name: Name of the destination object.
      use_file: If true, read the data from disk, otherwise from the in-memory
                copy held in temp_file_dict.
      file_start: Offset of the first byte to upload.
                  (only should be specified for sliced uploads)
      file_size: Number of bytes to upload; defaults to the size recorded in
                 temp_file_dict.
                 (only should be specified for sliced uploads)

    Returns:
      Uploaded Object Metadata.
    """
    if file_size is None:
      file_size = temp_file_dict[file_name].size

    destination = self.bucket_url.Clone()
    destination.object_name = object_name
    upload_target = StorageUrlToUploadObjectMetadata(destination)

    fp = None
    try:
      if not use_file:
        stop = file_start + file_size
        fp = cStringIO.StringIO(temp_file_dict[file_name].data[file_start:stop])
      else:
        fp = FilePart(file_name, file_start, file_size)

      def _InnerUpload():
        # Payloads at or above the resumable threshold use a resumable upload;
        # smaller ones use a plain upload.
        if file_size >= ResumableThreshold():
          return self.gsutil_api.UploadObjectResumable(
              fp, upload_target, provider=self.provider, size=file_size,
              fields=['name', 'mediaLink', 'size'],
              tracker_callback=_DummyTrackerCallback)
        return self.gsutil_api.UploadObject(
            fp, upload_target, provider=self.provider, size=file_size,
            fields=['name', 'mediaLink', 'size'])

      return self._RunOperation(_InnerUpload)
    finally:
      # Close the stream on every exit path once it has been created.
      if fp:
        fp.close()

  def Download(self, object_name, file_name=None, serialization_data=None,
               start_byte=0, end_byte=None):
    """Fetches an object (or a byte range of it) from the test bucket.

    Args:
      object_name: Name of the object in the test bucket.
      file_name: Optional path of an existing file to write into. When None,
                 the data is written to self.discard_sink instead.
      serialization_data: Optional serialization data, used so that we don't
                          have to get the metadata before downloading.
      start_byte: First byte of the object to download; also the offset the
                  output file is positioned at before writing.
                  (only should be specified for sliced downloads)
      end_byte: Last byte of the object to download.
                (only should be specified for sliced downloads)
    """
    fp = None
    try:
      if file_name is None:
        fp = self.discard_sink
      else:
        # 'r+b' keeps existing contents, so only the bytes written from
        # start_byte onwards are replaced.
        fp = open(file_name, 'r+b')
        fp.seek(start_byte)

      def _InnerDownload():
        self.gsutil_api.GetObjectMedia(
            self.bucket_url.bucket_name, object_name, fp,
            provider=self.provider, start_byte=start_byte, end_byte=end_byte,
            serialization_data=serialization_data)

      self._RunOperation(_InnerDownload)
    finally:
      # The stream (including the discard sink) is closed on every exit path.
      if fp:
        fp.close()

  def Delete(self, object_name):
    """Removes an object from the test bucket, ignoring missing objects.

    Args:
      object_name: Name of the object to remove.
    """
    def _InnerDelete():
      self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name,
                                   provider=self.provider)

    try:
      self._RunOperation(_InnerDelete)
    except NotFoundException:
      # The object does not exist, so there is nothing left to delete.
      pass

  def _GetDiskCounters(self):
    """Reads per-partition disk I/O counters from the /proc filesystem.

    Adapted from the psutil module's psutil._pslinux.disk_io_counters:
      http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py

    Originally distributed under a BSD license.
    Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola.

    Returns:
      A dictionary mapping each device name from /proc/partitions that ends in
      a digit to a tuple (reads, writes, read_bytes, write_bytes, read_time,
      write_time) built from its /proc/diskstats line.
    """
    # iostat documentation states that sectors are equivalent with blocks and
    # have a size of 512 bytes since 2.4 kernels. This value is needed to
    # calculate the amount of disk I/O in bytes.
    sector_size = 512

    # Collect the names listed in /proc/partitions whose last character is a
    # digit; the first two lines of that file are skipped.
    wanted = set()
    with open('/proc/partitions', 'r') as f:
      for line_no, line in enumerate(f):
        if line_no < 2:
          continue
        _, _, _, name = line.split()
        if name[-1].isdigit():
          wanted.add(name)

    retdict = {}
    with open('/proc/diskstats', 'r') as f:
      for line in f:
        # Only the first 11 columns are used; any further columns are ignored.
        (_, _, name, reads, _, rsectors, rtime,
         writes, _, wsectors, wtime) = line.split()[:11]
        if name not in wanted:
          continue
        retdict[name] = (int(reads), int(writes),
                         int(rsectors) * sector_size,
                         int(wsectors) * sector_size,
                         int(rtime), int(wtime))
    return retdict

  def _GetTcpStats(self):
    """Tries to parse out TCP packet information from netstat output.

    Returns:
       A dictionary containing TCP information, or None if netstat is not
       available.
    """
    # netstat return code is non-zero for -s on Linux, so don't raise on error.
    try:
      netstat_output = self._Exec(['netstat', '-s'], return_output=True,
                                  raise_on_error=False)
    except OSError:
      self.logger.warning('netstat not found on your system; some measurement '
                          'data will be missing')
      return None
    netstat_output = netstat_output.strip().lower()
    found_tcp = False
    tcp_retransmit = None
    tcp_received = None
    tcp_sent = None
    for line in netstat_output.split('\n'):
      # Header for TCP section is "Tcp:" in Linux/Mac and
      # "TCP Statistics for" in Windows.
      if 'tcp:' in line or 'tcp statistics' in line:
        found_tcp = True

      # Linux == "segments retransmited" (sic), Mac == "retransmit timeouts"
      # Windows == "segments retransmitted".
      if (found_tcp and tcp_retransmit is None and
          ('segments retransmited' in line or 'retransmit timeouts' in line or
           'segments retransmitted' in line)):
        tcp_retransmit = ''.join(c for c in line if c in string.digits)

      # Linux+Windows == "segments received", Mac == "packets received".
      if (found_tcp and tcp_received is None and
          ('segments received' in line or 'packets received' in line)):
        tcp_received = ''.join(c for c in line if c in string.digits)

      # Linux == "segments send out" (sic), Mac+Windows == "packets sent".
      if (found_tcp and tcp_sent is None and
          ('segments send out' in line or 'packets sent' in line or
           'segments sent' in line)):
        tcp_sent = ''.join(c for c in line if c in string.digits)

    result = {}
    try:
      result['tcp_retransmit'] = int(tcp_retransmit)
      result['tcp_received'] = int(tcp_received)
      result['tcp_sent'] = int(tcp_sent)
    except (ValueError, TypeError):
      result['tcp_retransmit'] = None
      result['tcp_received'] = None
      result['tcp_sent'] = None

    return result

  def _CollectSysInfo(self):
    """Collects system and network information into self.results['sysinfo'].

    Gathers boto HTTPS/proxy settings, the local IP address, the temporary
    directory, a GMT timestamp, DNS and TCP connect information for
    self.XML_API_HOST (and for the proxy, if one is configured), CPU count,
    load average, /proc/meminfo figures, the upper-case attributes of the
    gsutil config module and a few TCP-related /proc/sys values.

    Every probe is best effort: when a lookup, external command or file read
    fails, the corresponding entry is left out or set to a placeholder value.
    """
    sysinfo = {}

    # All exceptions that might be raised from socket module calls.
    socket_errors = (
        socket.error, socket.herror, socket.gaierror, socket.timeout)

    # Find out whether HTTPS is enabled in Boto.
    sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True)

    # Look up proxy info.
    proxy_host = boto.config.get('Boto', 'proxy', None)
    proxy_port = boto.config.getint('Boto', 'proxy_port', 0)
    sysinfo['using_proxy'] = bool(proxy_host)

    if boto.config.get('Boto', 'proxy_rdns', False):
      self.logger.info('DNS lookups are disallowed in this environment, so '
                       'some information is not included in this perfdiag run.')

    # Get the local IP address from socket lib.
    try:
      sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname())
    except socket_errors:
      sysinfo['ip_address'] = ''
    # Record the temporary directory used since it can affect performance, e.g.
    # when on a networked filesystem.
    sysinfo['tempdir'] = self.directory

    # Produces an RFC 2822 compliant GMT timestamp.
    sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000',
                                             time.gmtime())

    # Execute a CNAME lookup on Google DNS to find what Google server
    # it's routing to.
    cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST]
    try:
      nslookup_cname_output = self._Exec(cmd, return_output=True)
      m = re.search(r' = (?P<googserv>[^.]+)\.', nslookup_cname_output)
      sysinfo['googserv_route'] = m.group('googserv') if m else None
    except (CommandException, OSError):
      sysinfo['googserv_route'] = ''

    # Try to determine the latency of a DNS lookup for the Google hostname
    # endpoint. Note: we don't piggyback on gethostbyname_ex below because
    # the _ex version requires an extra RTT.
    try:
      t0 = time.time()
      socket.gethostbyname(self.XML_API_HOST)
      t1 = time.time()
      sysinfo['google_host_dns_latency'] = t1 - t0
    except socket_errors:
      pass

    # Look up IP addresses for Google Server.
    try:
      (_, _, googserv_ips) = socket.gethostbyname_ex(self.XML_API_HOST)
    except socket_errors:
      googserv_ips = []
    sysinfo['googserv_ips'] = googserv_ips

    # Reverse lookup the hostnames for the Google Server IPs. The address
    # list returned by gethostbyaddr is deliberately discarded: binding it to
    # the same name as the forward-lookup result would make the connect
    # latency loop below probe the wrong set of addresses.
    sysinfo['googserv_hostnames'] = []
    for googserv_ip in googserv_ips:
      try:
        (hostname, _, _) = socket.gethostbyaddr(googserv_ip)
        sysinfo['googserv_hostnames'].append(hostname)
      except socket_errors:
        pass

    # Query o-o to find out what the Google DNS thinks is the user's IP.
    try:
      cmd = ['nslookup', '-type=TXT', 'o-o.myaddr.google.com.']
      nslookup_txt_output = self._Exec(cmd, return_output=True)
      m = re.search(r'text\s+=\s+"(?P<dnsip>[\.\d]+)"', nslookup_txt_output)
      sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None
    except (CommandException, OSError):
      sysinfo['dns_o-o_ip'] = ''

    # Try to determine the latency of connecting to the Google hostname
    # endpoint, once per resolved Google Server IP.
    sysinfo['google_host_connect_latencies'] = {}
    for googserv_ip in googserv_ips:
      try:
        # closing() releases the probe socket whether or not connect succeeds.
        with contextlib.closing(socket.socket()) as sock:
          t0 = time.time()
          sock.connect((googserv_ip, self.XML_API_PORT))
          t1 = time.time()
          sysinfo['google_host_connect_latencies'][googserv_ip] = t1 - t0
      except socket_errors:
        pass

    # If using a proxy, try to determine the latency of a DNS lookup to resolve
    # the proxy hostname and the latency of connecting to the proxy.
    if proxy_host:
      proxy_ip = None
      try:
        t0 = time.time()
        proxy_ip = socket.gethostbyname(proxy_host)
        t1 = time.time()
        sysinfo['proxy_dns_latency'] = t1 - t0
      except socket_errors:
        pass

      try:
        with contextlib.closing(socket.socket()) as sock:
          t0 = time.time()
          # Fall back to the host name if the DNS lookup above failed.
          sock.connect((proxy_ip or proxy_host, proxy_port))
          t1 = time.time()
          sysinfo['proxy_host_connect_latency'] = t1 - t0
      except socket_errors:
        pass

    # Try and find the number of CPUs in the system if available.
    try:
      sysinfo['cpu_count'] = multiprocessing.cpu_count()
    except NotImplementedError:
      sysinfo['cpu_count'] = None

    # For *nix platforms, obtain the CPU load.
    try:
      sysinfo['load_avg'] = list(os.getloadavg())
    except (AttributeError, OSError):
      sysinfo['load_avg'] = None

    # Try and collect memory information from /proc/meminfo if possible.
    mem_total = None
    mem_free = None
    mem_buffers = None
    mem_cached = None

    def _MeminfoValue(meminfo_line):
      """Returns the digits found in a meminfo line, multiplied by 1000."""
      return int(''.join(c for c in meminfo_line if c in string.digits)) * 1000

    try:
      with open('/proc/meminfo', 'r') as f:
        for line in f:
          if line.startswith('MemTotal'):
            mem_total = _MeminfoValue(line)
          elif line.startswith('MemFree'):
            mem_free = _MeminfoValue(line)
          elif line.startswith('Buffers'):
            mem_buffers = _MeminfoValue(line)
          elif line.startswith('Cached'):
            mem_cached = _MeminfoValue(line)
    except (IOError, ValueError):
      pass

    sysinfo['meminfo'] = {'mem_total': mem_total,
                          'mem_free': mem_free,
                          'mem_buffers': mem_buffers,
                          'mem_cached': mem_cached}

    # Get configuration attributes from config module.
    sysinfo['gsutil_config'] = {}
    for attr in dir(config):
      attr_value = getattr(config, attr)
      # Filter out multiline strings that are not useful.
      if attr.isupper() and not (isinstance(attr_value, basestring) and
                                 '\n' in attr_value):
        sysinfo['gsutil_config'][attr] = attr_value

    sysinfo['tcp_proc_values'] = {}
    stats_to_check = [
        '/proc/sys/net/core/rmem_default',
        '/proc/sys/net/core/rmem_max',
        '/proc/sys/net/core/wmem_default',
        '/proc/sys/net/core/wmem_max',
        '/proc/sys/net/ipv4/tcp_timestamps',
        '/proc/sys/net/ipv4/tcp_sack',
        '/proc/sys/net/ipv4/tcp_window_scaling',
    ]
    for fname in stats_to_check:
      try:
        with open(fname, 'r') as f:
          value = f.read()
        # Entries are keyed by the file's base name, e.g. 'rmem_max'.
        sysinfo['tcp_proc_values'][os.path.basename(fname)] = value.strip()
      except IOError:
        pass

    self.results['sysinfo'] = sysinfo

  def _DisplayStats(self, trials):
    """Prints out mean, standard deviation, median, and 90th percentile."""
    n = len(trials)
    mean = float(sum(trials)) / n
    stdev = math.sqrt(sum((x - mean)**2 for x in trials) / n)

    print str(n).rjust(6), '',
    print ('%.1f' % (mean * 1000)).rjust(9), '',
    print ('%.1f' % (stdev * 1000)).rjust(12), '',
    print ('%.1f' % (Percentile(trials, 0.5) * 1000)).rjust(11), '',
    print ('%.1f' % (Percentile(trials, 0.9) * 1000)).rjust(11), ''

  def _DisplayResults(self):
    """Displays results collected from diagnostic run."""
    print
    print '=' * 78
    print 'DIAGNOSTIC RESULTS'.center(78)
    print '=' * 78

    if 'latency' in self.results:
      print
      print '-' * 78
      print 'Latency'.center(78)
      print '-' * 78
      print ('Operation       Size  Trials  Mean (ms)  Std Dev (ms)  '
             'Median (ms)  90th % (ms)')
      print ('=========  =========  ======  =========  ============  '
             '===========  ===========')
      for key in sorted(self.results['latency']):
        trials = sorted(self.results['latency'][key])
        op, numbytes = key.split('_')
        numbytes = int(numbytes)
        if op == 'METADATA':
          print 'Metadata'.rjust(9), '',
          print MakeHumanReadable(numbytes).rjust(9), '',
          self._DisplayStats(trials)
        if op == 'DOWNLOAD':
          print 'Download'.rjust(9), '',
          print MakeHumanReadable(numbytes).rjust(9), '',
          self._DisplayStats(trials)
        if op == 'UPLOAD':
          print 'Upload'.rjust(9), '',
          print MakeHumanReadable(numbytes).rjust(9), '',
          self._DisplayStats(trials)
        if op == 'DELETE':
          print 'Delete'.rjust(9), '',
          print MakeHumanReadable(numbytes).rjust(9), '',
          self._DisplayStats(trials)

    if 'write_throughput' in self.results:
      print
      print '-' * 78
      print 'Write Throughput'.center(78)
      print '-' * 78
      write_thru = self.results['write_throughput']
      print 'Copied %s %s file(s) for a total transfer size of %s.' % (
          self.num_objects,
          MakeHumanReadable(write_thru['file_size']),
          MakeHumanReadable(write_thru['total_bytes_copied']))
      print 'Write throughput: %s/s.' % (
          MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8))
      print 'Parallelism strategy: %s' % write_thru['parallelism']

    if 'write_throughput_file' in self.results:
      print
      print '-' * 78
      print 'Write Throughput With File I/O'.center(78)
      print '-' * 78
      write_thru_file = self.results['write_throughput_file']
      print 'Copied %s %s file(s) for a total transfer size of %s.' % (
          self.num_objects,
          MakeHumanReadable(write_thru_file['file_size']),
          MakeHumanReadable(write_thru_file['total_bytes_copied']))
      print 'Write throughput: %s/s.' % (
          MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8))
      print 'Parallelism strategy: %s' % write_thru_file['parallelism']

    if 'read_throughput' in self.results:
      print
      print '-' * 78
      print 'Read Throughput'.center(78)
      print '-' * 78
      read_thru = self.results['read_throughput']
      print 'Copied %s %s file(s) for a total transfer size of %s.' % (
          self.num_objects,
          MakeHumanReadable(read_thru['file_size']),
          MakeHumanReadable(read_thru['total_bytes_copied']))
      print 'Read throughput: %s/s.' % (
          MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))
      print 'Parallelism strategy: %s' % read_thru['parallelism']

    if 'read_throughput_file' in self.results:
      print
      print '-' * 78
      print 'Read Throughput With File I/O'.center(78)
      print '-' * 78
      read_thru_file = self.results['read_throughput_file']
      print 'Copied %s %s file(s) for a total transfer size of %s.' % (
          self.num_objects,
          MakeHumanReadable(read_thru_file['file_size']),
          MakeHumanReadable(read_thru_file['total_bytes_copied']))
      print 'Read throughput: %s/s.' % (
          MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8))
      print 'Parallelism strategy: %s' % read_thru_file['parallelism']

    if 'listing' in self.results:
      print
      print '-' * 78
      print 'Listing'.center(78)
      print '-' * 78

      listing = self.results['listing']
      insert = listing['insert']
      delete = listing['delete']
      print 'After inserting %s objects:' % listing['num_files']
      print ('  Total time for objects to appear: %.2g seconds' %
             insert['time_took'])
      print '  Number of listing calls made: %s' % insert['num_listing_calls']
      print ('  Individual listing call latencies: [%s]' %
             ', '.join('%.2gs' % lat for lat in insert['list_latencies']))
      print ('  Files reflected after each call: [%s]' %
             ', '.join(map(str, insert['files_seen_after_listing'])))

      print 'After deleting %s objects:' % listing['num_files']
      print ('  Total time for objects to appear: %.2g seconds' %
             delete['time_took'])
      print '  Number of listing calls made: %s' % delete['num_listing_calls']
      print ('  Individual listing call latencies: [%s]' %
             ', '.join('%.2gs' % lat for lat in delete['list_latencies']))
      print ('  Files reflected after each call: [%s]' %
             ', '.join(map(str, delete['files_seen_after_listing'])))

    if 'sysinfo' in self.results:
      print
      print '-' * 78
      print 'System Information'.center(78)
      print '-' * 78
      info = self.results['sysinfo']
      print 'IP Address: \n  %s' % info['ip_address']
      print 'Temporary Directory: \n  %s' % info['tempdir']
      print 'Bucket URI: \n  %s' % self.results['bucket_uri']
      print 'gsutil Version: \n  %s' % self.results.get('gsutil_version',
                                                        'Unknown')
      print 'boto Version: \n  %s' % self.results.get('boto_version', 'Unknown')

      if 'gmt_timestamp' in info:
        ts_string = info['gmt_timestamp']
        timetuple = None
        try:
          # Convert RFC 2822 string to Linux timestamp.
          timetuple = time.strptime(ts_string, '%a, %d %b %Y %H:%M:%S +0000')
        except ValueError:
          pass

        if timetuple:
          # Converts the GMT time tuple to local Linux timestamp.
          localtime = calendar.timegm(timetuple)
          localdt = datetime.datetime.fromtimestamp(localtime)
          print 'Measurement time: \n %s' % localdt.strftime(
              '%Y-%m-%d %I:%M:%S %p %Z')

      print 'Google Server: \n  %s' % info['googserv_route']
      print ('Google Server IP Addresses: \n  %s' %
             ('\n  '.join(info['googserv_ips'])))
      print ('Google Server Hostnames: \n  %s' %
             ('\n  '.join(info['googserv_hostnames'])))
      print 'Google DNS thinks your IP is: \n  %s' % info['dns_o-o_ip']
      print 'CPU Count: \n  %s' % info['cpu_count']
      print 'CPU Load Average: \n  %s' % info['load_avg']
      try:
        print ('Total Memory: \n  %s' %
               MakeHumanReadable(info['meminfo']['mem_total']))
        # Free memory is really MemFree + Buffers + Cached.
        print 'Free Memory: \n  %s' % MakeHumanReadable(
            info['meminfo']['mem_free'] +
            info['meminfo']['mem_buffers'] +
            info['meminfo']['mem_cached'])
      except TypeError:
        pass

      if 'netstat_end' in info and 'netstat_start' in info:
        netstat_after = info['netstat_end']
        netstat_before = info['netstat_start']
        for tcp_type in ('sent', 'received', 'retransmit'):
          try:
            delta = (netstat_after['tcp_%s' % tcp_type] -
                     netstat_before['tcp_%s' % tcp_type])
            print 'TCP segments %s during test:\n  %d' % (tcp_type, delta)
          except TypeError:
            pass
      else:
        print ('TCP segment counts not available because "netstat" was not '
               'found during test runs')

      if 'disk_counters_end' in info and 'disk_counters_start' in info:
        print 'Disk Counter Deltas:\n',
        disk_after = info['disk_counters_end']
        disk_before = info['disk_counters_start']
        print '', 'disk'.rjust(6),
        for colname in ['reads', 'writes', 'rbytes', 'wbytes', 'rtime',
                        'wtime']:
          print colname.rjust(8),
        print
        for diskname in sorted(disk_after):
          before = disk_before[diskname]
          after = disk_after[diskname]
          (reads1, writes1, rbytes1, wbytes1, rtime1, wtime1) = before
          (reads2, writes2, rbytes2, wbytes2, rtime2, wtime2) = after
          print '', diskname.rjust(6),
          deltas = [reads2-reads1, writes2-writes1, rbytes2-rbytes1,
                    wbytes2-wbytes1, rtime2-rtime1, wtime2-wtime1]
          for delta in deltas:
            print str(delta).rjust(8),
          print

      if 'tcp_proc_values' in info:
        print 'TCP /proc values:\n',
        for item in info['tcp_proc_values'].iteritems():
          print '   %s = %s' % item

      if 'boto_https_enabled' in info:
        print 'Boto HTTPS Enabled: \n  %s' % info['boto_https_enabled']

      if 'using_proxy' in info:
        print 'Requests routed through proxy: \n  %s' % info['using_proxy']

      if 'google_host_dns_latency' in info:
        print ('Latency of the DNS lookup for Google Storage server (ms): '
               '\n  %.1f' % (info['google_host_dns_latency'] * 1000.0))

      if 'google_host_connect_latencies' in info:
        print 'Latencies connecting to Google Storage server IPs (ms):'
        for ip, latency in info['google_host_connect_latencies'].iteritems():
          print '  %s = %.1f' % (ip, latency * 1000.0)

      if 'proxy_dns_latency' in info:
        print ('Latency of the DNS lookup for the configured proxy (ms): '
               '\n  %.1f' % (info['proxy_dns_latency'] * 1000.0))

      if 'proxy_host_connect_latency' in info:
        print ('Latency connecting to the configured proxy (ms): \n  %.1f' %
               (info['proxy_host_connect_latency'] * 1000.0))

    if 'request_errors' in self.results and 'total_requests' in self.results:
      print
      print '-' * 78
      print 'In-Process HTTP Statistics'.center(78)
      print '-' * 78
      total = int(self.results['total_requests'])
      numerrors = int(self.results['request_errors'])
      numbreaks = int(self.results['connection_breaks'])
      availability = (((total - numerrors) / float(total)) * 100
                      if total > 0 else 100)
      print 'Total HTTP requests made: %d' % total
      print 'HTTP 5xx errors: %d' % numerrors
      print 'HTTP connections broken: %d' % numbreaks
      print 'Availability: %.7g%%' % availability
      if 'error_responses_by_code' in self.results:
        sorted_codes = sorted(
            self.results['error_responses_by_code'].iteritems())
        if sorted_codes:
          print 'Error responses by code:'
          print '\n'.join('  %s: %s' % c for c in sorted_codes)

    if self.output_file:
      with open(self.output_file, 'w') as f:
        json.dump(self.results, f, indent=2)
      print
      print "Output file written to '%s'." % self.output_file

    print

  def _ParsePositiveInteger(self, val, msg):
    """Tries to convert val argument to a positive integer.

    Args:
      val: The value (as a string) to convert to a positive integer.
      msg: The error message to place in the CommandException on an error.

    Returns:
      A valid positive integer.

    Raises:
      CommandException: If the supplied value is not a valid positive integer.
    """
    try:
      val = int(val)
      if val < 1:
        raise CommandException(msg)
      return val
    except ValueError:
      raise CommandException(msg)

  def _ParseArgs(self):
    """Parses arguments for perfdiag command.

    Resets every option attribute to its default, applies the parsed
    sub-options from self.sub_opts, and then validates the combination of
    options together with the bucket URL argument.

    When -i is encountered the saved results are loaded into self.results and
    this method returns immediately: options that appear after -i are not
    processed, and no bucket URL validation is performed.

    Raises:
      CommandException: If an option value is invalid, the options conflict
          with each other, or the URL argument does not specify a cloud
          bucket.
    """
    # From -n.
    self.num_objects = 5
    # From -c.
    self.processes = 1
    # From -k.
    self.threads = 1
    # From -p
    self.parallel_strategy = None
    # From -y
    self.num_slices = 4
    # From -s.
    self.thru_filesize = 1048576
    # From -d.
    self.directory = tempfile.gettempdir()
    # Keep track of whether or not to delete the directory upon completion.
    self.delete_directory = False
    # From -t.
    self.diag_tests = set(self.DEFAULT_DIAG_TESTS)
    # From -o.
    self.output_file = None
    # From -i.
    self.input_file = None
    # From -m.
    self.metadata_keys = {}

    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-n':
          self.num_objects = self._ParsePositiveInteger(
              a, 'The -n parameter must be a positive integer.')
        elif o == '-c':
          self.processes = self._ParsePositiveInteger(
              a, 'The -c parameter must be a positive integer.')
        elif o == '-k':
          self.threads = self._ParsePositiveInteger(
              a, 'The -k parameter must be a positive integer.')
        elif o == '-p':
          if a.lower() in self.PARALLEL_STRATEGIES:
            self.parallel_strategy = a.lower()
          else:
            raise CommandException(
                "'%s' is not a valid parallelism strategy." % a)
        elif o == '-y':
          self.num_slices = self._ParsePositiveInteger(
              a, 'The -y parameter must be a positive integer.')
        elif o == '-s':
          try:
            self.thru_filesize = HumanReadableToBytes(a)
          except ValueError:
            raise CommandException('Invalid -s parameter.')
        elif o == '-d':
          self.directory = a
          if not os.path.exists(self.directory):
            # Only directories created here are flagged for deletion.
            self.delete_directory = True
            os.makedirs(self.directory)
        elif o == '-t':
          self.diag_tests = set()
          for test_name in a.strip().split(','):
            normalized_name = test_name.lower()
            if normalized_name not in self.ALL_DIAG_TESTS:
              raise CommandException("List of test names (-t) contains invalid "
                                     "test name '%s'." % test_name)
            # Store the same lower-cased name that was validated. Membership
            # checks against this set compare names verbatim, so storing
            # e.g. 'LAT' would pass validation but never select the test.
            self.diag_tests.add(normalized_name)
        elif o == '-m':
          pieces = a.split(':')
          if len(pieces) != 2:
            raise CommandException(
                "Invalid metadata key-value combination '%s'." % a)
          key, value = pieces
          self.metadata_keys[key] = value
        elif o == '-o':
          self.output_file = os.path.abspath(a)
        elif o == '-i':
          self.input_file = os.path.abspath(a)
          if not os.path.isfile(self.input_file):
            raise CommandException("Invalid input file (-i): '%s'." % a)
          try:
            with open(self.input_file, 'r') as f:
              self.results = json.load(f)
              self.logger.info("Read input file: '%s'.", self.input_file)
          except ValueError:
            raise CommandException("Could not decode input file (-i): '%s'." %
                                   a)
          # Results come from the input file, so the remaining options and
          # the bucket URL checks below are intentionally skipped.
          return

    # If parallelism is specified, default parallelism strategy to fan.
    if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy:
      self.parallel_strategy = self.FAN
    elif self.processes == 1 and self.threads == 1 and self.parallel_strategy:
      raise CommandException(
          'Cannot specify parallelism strategy (-p) without also specifying '
          'multiple threads and/or processes (-c and/or -k).')

    if not self.args:
      self.RaiseWrongNumberOfArgumentsException()

    self.bucket_url = StorageUrlFromString(self.args[0])
    self.provider = self.bucket_url.scheme
    # The URL must be both a cloud URL and a bucket. The parentheses matter:
    # "not A and B" would parse as "(not A) and B" and let cloud object URLs
    # through. Short-circuiting also keeps IsBucket() from being called on a
    # non-cloud URL.
    if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
      raise CommandException('The perfdiag command requires a URL that '
                             'specifies a bucket.\n"%s" is not '
                             'valid.' % self.args[0])

    if (self.thru_filesize > HumanReadableToBytes('2GiB') and
        (self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)):
      raise CommandException(
          'For in-memory tests maximum file size is 2GiB. For larger file '
          'sizes, specify rthru_file and/or wthru_file with the -t option.')

    perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH)
    # Pass the write test names as ONE iterable. Passing them as separate
    # arguments makes set.intersection treat each name as an iterable of
    # characters, which can never match a test name.
    write_tests_requested = self.diag_tests.intersection(
        (self.WTHRU, self.WTHRU_FILE))
    slice_not_available = self.provider == 's3' and write_tests_requested
    if perform_slice and slice_not_available:
      raise CommandException('Sliced uploads are not available for s3. '
                             'Use -p fan or sequential uploads for s3.')

    # Ensure the bucket exists.
    self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
                              provider=self.bucket_url.scheme,
                              fields=['id'])
    self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror,
                       socket.timeout, httplib.BadStatusLine,
                       ServiceException]

  # Command entry point.
  def RunCommand(self):
    """Called by gsutil when the command is being invoked.

    Parses the arguments and then either displays previously saved results
    (when an input file was given) or runs the selected diagnostic tests and
    displays their results.

    Returns:
      0 when the command completes without raising.
    """
    self._ParseArgs()

    # An input file means results were already loaded; only display them.
    if self.input_file:
      self._DisplayResults()
      return 0

    def _SnapshotCounters(netstat_key, disk_key):
      """Stores TCP stats and (on Linux) disk counters under sysinfo."""
      tcp_stats = self._GetTcpStats()
      if tcp_stats:
        self.results['sysinfo'][netstat_key] = tcp_stats
      if IS_LINUX:
        self.results['sysinfo'][disk_key] = self._GetDiskCounters()

    # We turn off retries in the underlying boto library because the
    # _RunOperation function handles errors manually so it can count them.
    boto.config.set('Boto', 'num_retries', '0')

    readable_filesize = MakeHumanReadable(self.thru_filesize)
    tests_description = ', '.join(self.diag_tests)
    self.logger.info(
        'Number of iterations to run: %d\n'
        'Base bucket URI: %s\n'
        'Number of processes: %d\n'
        'Number of threads: %d\n'
        'Parallelism strategy: %s\n'
        'Throughput file size: %s\n'
        'Diagnostics to run: %s',
        self.num_objects, self.bucket_url, self.processes, self.threads,
        self.parallel_strategy, readable_filesize, tests_description)

    try:
      self._SetUp()

      # Collect generic system info.
      self._CollectSysInfo()
      # Collect netstat info and disk counters before tests (and again later).
      _SnapshotCounters('netstat_start', 'disk_counters_start')
      # Record bucket URL.
      self.results['bucket_uri'] = str(self.bucket_url)
      self.results['json_format'] = 'perfdiag'
      self.results['metadata'] = self.metadata_keys

      # Tests run in exactly this order. WTHRU_FILE comes before RTHRU_FILE:
      # if data is created in WTHRU_FILE it will be used in RTHRU_FILE to save
      # time and bandwidth.
      ordered_tests = (
          (self.LAT, self._RunLatencyTests, {}),
          (self.RTHRU, self._RunReadThruTests, {}),
          (self.WTHRU_FILE, self._RunWriteThruTests, {'use_file': True}),
          (self.RTHRU_FILE, self._RunReadThruTests, {'use_file': True}),
          (self.WTHRU, self._RunWriteThruTests, {}),
          (self.LIST, self._RunListTests, {}),
      )
      for diag_name, run_diag, diag_kwargs in ordered_tests:
        if diag_name in self.diag_tests:
          run_diag(**diag_kwargs)

      # Collect netstat info and disk counters after tests.
      _SnapshotCounters('netstat_end', 'disk_counters_end')

      summary_fields = (
          ('total_requests', self.total_requests),
          ('request_errors', self.request_errors),
          ('error_responses_by_code', self.error_responses_by_code),
          ('connection_breaks', self.connection_breaks),
          ('gsutil_version', gslib.VERSION),
          ('boto_version', boto.__version__),
      )
      for result_key, result_value in summary_fields:
        self.results[result_key] = result_value

      self._TearDown()
      self._DisplayResults()
    finally:
      # TODO: Install signal handlers so this is performed in response to a
      # terminating signal; consider multi-threaded object deletes during
      # cleanup so it happens quickly.
      self._TearDown()

    return 0


def StorageUrlToUploadObjectMetadata(storage_url):
  """Builds upload object metadata from a cloud object URL.

  Args:
    storage_url: URL object exposing IsCloudUrl(), IsObject(), object_name and
        bucket_name.

  Returns:
    An apitools_messages.Object whose name and bucket are taken from
    storage_url.

  Raises:
    CommandException: If storage_url is not a cloud URL that names an object.
  """
  # Reject anything that is not a cloud object URL before building metadata.
  if not (storage_url.IsCloudUrl() and storage_url.IsObject()):
    raise CommandException('Non-cloud URL upload target %s was created in '
                           'perfdiag implemenation.' % storage_url)

  object_metadata = apitools_messages.Object()
  object_metadata.name = storage_url.object_name
  object_metadata.bucket = storage_url.bucket_name
  return object_metadata