# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of Unix-like rsync command."""
from __future__ import absolute_import
import errno
import heapq
import io
from itertools import islice
import os
import re
import tempfile
import textwrap
import traceback
import urllib
from boto import config
import crcmod
from gslib import copy_helper
from gslib.bucket_listing_ref import BucketListingObject
from gslib.cloud_api import NotFoundException
from gslib.command import Command
from gslib.command import DummyArgChecker
from gslib.command_argument import CommandArgument
from gslib.copy_helper import CreateCopyHelperOpts
from gslib.copy_helper import SkipUnsupportedObjectError
from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
from gslib.hashing_helper import CalculateB64EncodedCrc32cFromContents
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.hashing_helper import SLOW_CRCMOD_WARNING
from gslib.plurality_checkable_iterator import PluralityCheckableIterator
from gslib.sig_handling import GetCaughtSignals
from gslib.sig_handling import RegisterSignalHandler
from gslib.storage_url import StorageUrlFromString
from gslib.util import GetCloudApiInstance
from gslib.util import IsCloudSubdirPlaceholder
from gslib.util import TEN_MIB
from gslib.util import UsingCrcmodExtension
from gslib.util import UTF8
from gslib.wildcard_iterator import CreateWildcardIterator
_SYNOPSIS = """
gsutil rsync [-c] [-C] [-d] [-e] [-n] [-p] [-r] [-U] [-x] src_url dst_url
"""
_DETAILED_HELP_TEXT = ("""
<B>SYNOPSIS</B>
""" + _SYNOPSIS + """
<B>DESCRIPTION</B>
The gsutil rsync command makes the contents under dst_url the same as the
contents under src_url, by copying any missing files/objects, and (if the
-d option is specified) deleting any extra files/objects. For example, to
make gs://mybucket/data match the contents of the local directory "data"
you could do:
gsutil rsync -d data gs://mybucket/data
To recurse into directories use the -r option:
gsutil rsync -d -r data gs://mybucket/data
To copy only new/changed files without deleting extra files from
gs://mybucket/data leave off the -d option:
gsutil rsync -r data gs://mybucket/data
If you have a large number of objects to synchronize you might want to use the
gsutil -m option, to perform parallel (multi-threaded/multi-processing)
synchronization:
gsutil -m rsync -d -r data gs://mybucket/data
The -m option typically will provide a large performance boost if either the
source or destination (or both) is a cloud URL. If both source and
destination are file URLs the -m option will typically thrash the disk and
slow synchronization down.
To make the local directory "data" the same as the contents of
gs://mybucket/data:
gsutil rsync -d -r gs://mybucket/data data
To make the contents of gs://mybucket2 the same as gs://mybucket1:
gsutil rsync -d -r gs://mybucket1 gs://mybucket2
You can also mirror data across local directories:
gsutil rsync -d -r dir1 dir2
To mirror your content across clouds:
gsutil rsync -d -r gs://my-gs-bucket s3://my-s3-bucket
Note: If you are synchronizing a large amount of data between clouds you might
consider setting up a
`Google Compute Engine <https://cloud.google.com/products/compute-engine>`_
account and running gsutil there. Since cross-provider gsutil data transfers
flow through the machine where gsutil is running, doing this can make your
transfer run significantly faster than running gsutil on your local
workstation.
<B>BE CAREFUL WHEN USING -d OPTION!</B>
The rsync -d option is very useful and commonly used, because it provides a
means of making the contents of a destination bucket or directory match those
of a source bucket or directory. However, please exercise caution when you
use this option: It's possible to delete large amounts of data accidentally
if, for example, you erroneously reverse source and destination. For example,
if you meant to synchronize a local directory from a bucket in the cloud but
instead run the command:
gsutil -m rsync -r -d ./your-dir gs://your-bucket
and your-dir is currently empty, you will quickly delete all of the objects in
gs://your-bucket.
You can also cause large amounts of data to be lost quickly by specifying a
subdirectory of the destination as the source of an rsync. For example, the
command:
gsutil -m rsync -r -d gs://your-bucket/data gs://your-bucket
would cause most or all of the objects in gs://your-bucket to be deleted
(some objects may survive if there are any with names that sort lower than
"data" under gs://your-bucket/data).
In addition to paying careful attention to the source and destination you
specify with the rsync command, there are two more safety measures you can
take when using gsutil rsync -d:
1. Try running the command with the rsync -n option first, to see what it
would do without actually performing the operations. For example, if
you run the command:
gsutil -m rsync -r -d -n gs://your-bucket/data gs://your-bucket
it will be immediately evident that running that command without the -n
option would cause many objects to be deleted.
2. Enable object versioning in your bucket, which will allow you to restore
objects if you accidentally delete them. For more details see
"gsutil help versions".
<B>IMPACT OF BUCKET LISTING EVENTUAL CONSISTENCY</B>
The rsync command operates by listing the source and destination URLs, and
then performing copy and remove operations according to the differences
between these listings. Because bucket listing is eventually (not strongly)
consistent, if you upload new objects or delete objects from a bucket and then
immediately run gsutil rsync with that bucket as the source or destination,
it's possible the rsync command will not see the recent updates and thus
synchronize incorrectly. You can rerun the rsync operation again later to
correct the incorrect synchronization.
<B>CHECKSUM VALIDATION AND FAILURE HANDLING</B>
At the end of every upload or download, the gsutil rsync command validates
that the checksum of the source file/object matches the checksum of the
destination file/object. If the checksums do not match, gsutil will delete
the invalid copy and print a warning message. This very rarely happens, but
if it does, please contact gs-team@google.com.
The rsync command will retry when failures occur, but if enough failures
happen during a particular copy or delete operation the command will skip that
object and move on. At the end of the synchronization run if any failures were
not successfully retried, the rsync command will report the count of failures,
and exit with non-zero status. At this point you can run the rsync command
again, and it will attempt any remaining needed copy and/or delete operations.
Note that there are cases where retrying will never succeed, such as if you
don't have write permission to the destination bucket or if the destination
path for some objects is longer than the maximum allowed length.
For more details about gsutil's retry handling, please see
"gsutil help retries".
<B>CHANGE DETECTION ALGORITHM</B>
To determine if a file or object has changed gsutil rsync first checks whether
the source and destination sizes match. If they match, it next checks if their
checksums match, using checksums if available (see below). Unlike the Unix
rsync command, gsutil rsync does not use timestamps to determine if the
file/object changed, because the GCS API does not permit the caller to set an
object's timestamp (hence, timestamps of identical files/objects cannot be
made to match).
Checksums will not be available in two cases:
1. When synchronizing to or from a file system. By default, gsutil does not
checksum files, because of the slowdown caused when working with large
files. You can cause gsutil to checksum files by using the gsutil rsync -c
option, at the cost of increased local disk I/O and run time when working
with large files. You should consider using the -c option if your files can
change without changing sizes (e.g., if you have files that contain fixed
width data, such as timestamps).
2. When comparing composite GCS objects with objects at a cloud provider that
does not support CRC32C (which is the only checksum available for composite
objects). See 'gsutil help compose' for details about composite objects.
<B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
If both the source and destination URL are cloud URLs from the same provider,
gsutil copies data "in the cloud" (i.e., without downloading to and uploading
from the machine where you run gsutil). In addition to the performance and
cost advantages of doing this, copying in the cloud preserves metadata (like
Content-Type and Cache-Control). In contrast, when you download data from the
cloud it ends up in a file, which has no associated metadata. Thus, unless you
have some way to hold on to or re-create that metadata, synchronizing a bucket
to a directory in the local file system will not retain the metadata.
Note that by default, the gsutil rsync command does not copy the ACLs of
objects being synchronized and instead will use the default bucket ACL (see
"gsutil help defacl"). You can override this behavior with the -p option (see
OPTIONS below).
<B>SLOW CHECKSUMS</B>
If you find that CRC32C checksum computation runs slowly, this is likely
because you don't have a compiled CRC32c on your system. Try running:
gsutil ver -l
If the output contains:
compiled crcmod: False
you are running a Python library for computing CRC32C, which is much slower
than using the compiled code. For information on getting a compiled CRC32C
implementation, see 'gsutil help crc32c'.
<B>LIMITATIONS</B>
1. The gsutil rsync command doesn't make the destination object's timestamps
match those of the source object (it can't; timestamp setting is not
allowed by the GCS API).
2. The gsutil rsync command considers only the current object generations in
the source and destination buckets when deciding what to copy / delete. If
versioning is enabled in the destination bucket then gsutil rsync's
overwriting or deleting objects will end up creating versions, but the
command doesn't try to make the archived generations match in the source
and destination buckets.
<B>OPTIONS</B>
-c Causes the rsync command to compute checksums for files if the
size of source and destination match, and then compare
checksums. This option increases local disk I/O and run time
if either src_url or dst_url are on the local file system.
-C If an error occurs, continue to attempt to copy the remaining
files. If errors occurred, gsutil's exit status will be non-zero
even if this flag is set. This option is implicitly set when
running "gsutil -m rsync...". Note: -C only applies to the
actual copying operation. If an error occurs while iterating
over the files in the local directory (e.g., invalid Unicode
file name) gsutil will print an error message and abort.
-d Delete extra files under dst_url not found under src_url. By
default extra files are not deleted. Note: this option can
delete data quickly if you specify the wrong source/destination
combination. See the help section above,
"BE CAREFUL WHEN USING -d OPTION!".
-e Exclude symlinks. When specified, symbolic links will be
ignored.
-n Causes rsync to run in "dry run" mode, i.e., just outputting
what would be copied or deleted without actually doing any
copying/deleting.
-p Causes ACLs to be preserved when synchronizing in the cloud.
Note that this option has performance and cost implications when
using the XML API, as it requires separate HTTP calls for
interacting with ACLs. The performance issue can be mitigated to
some degree by using gsutil -m rsync to cause parallel
synchronization. Also, this option only works if you have OWNER
access to all of the objects that are copied.
You can avoid the additional performance and cost of using
rsync -p if you want all objects in the destination bucket to
end up with the same ACL by setting a default object ACL on that
bucket instead of using rsync -p. See 'gsutil help defacl'.
-R, -r Causes directories, buckets, and bucket subdirectories to be
synchronized recursively. If you neglect to use this option
gsutil will make only the top-level directory in the source
and destination URLs match, skipping any sub-directories.
-U Skip objects with unsupported object types instead of failing.
Unsupported object types are Amazon S3 Objects in the GLACIER
storage class.
-x pattern Causes files/objects matching pattern to be excluded, i.e., any
matching files/objects will not be copied or deleted. Note that
the pattern is a Python regular expression, not a wildcard (so,
matching any string ending in 'abc' would be specified using
'.*abc' rather than '*abc'). Note also that the exclude path is
always relative (similar to Unix rsync or tar exclude options).
For example, if you run the command:
gsutil rsync -x 'data./.*\\.txt' dir gs://my-bucket
it will skip the file dir/data1/a.txt.
You can use regex alternation to specify multiple exclusions,
for example:
gsutil rsync -x '.*\\.txt|.*\\.jpg' dir gs://my-bucket
""")
class _DiffAction(object):
  """Names of the operations a _DiffToApply entry can request."""
  # Copy the source file/object to the destination URL.
  COPY = 'copy'
  # Remove the destination file/object.
  REMOVE = 'remove'
# Placeholder written to (and read back from) the temp listing files when a
# checksum field (crc32c or md5) is not available for a URL.
_NA = '-'
# NOTE(review): not referenced in the visible part of this file - confirm it is
# still used elsewhere.
_OUTPUT_BUFFER_SIZE = 64 * 1024
# A progress message is logged each time this many URLs have been listed.
_PROGRESS_REPORT_LISTING_COUNT = 10000
# Tracks files we need to clean up at end or if interrupted.
_tmp_files = []
# pylint: disable=unused-argument
def _HandleSignals(signal_num, cur_stack_frame):
  """Called when rsync command is killed with SIGINT, SIGQUIT or SIGTERM.

  Only removes the temp files tracked in _tmp_files; both arguments are
  ignored.

  Args:
    signal_num: Unused.
    cur_stack_frame: Unused.
  """
  CleanUpTempFiles()
def CleanUpTempFiles():
  """Cleans up temp files.

  This function allows the main (RunCommand) function to clean up at end of
  operation, or if gsutil rsync is interrupted (e.g., via ^C). This is necessary
  because tempfile.NamedTemporaryFile doesn't allow the created file to be
  re-opened in read mode on Windows, so we have to use tempfile.mkstemp, which
  doesn't automatically delete temp files.

  Removal is best-effort and per file: a file that is already gone (or that
  otherwise can't be removed) is skipped, and the remaining files tracked in
  _tmp_files are still attempted.
  """
  for fname in _tmp_files:
    try:
      os.unlink(fname)
    except OSError:
      # Keep going: one file we can't remove must not prevent cleanup of the
      # others (this function may also run more than once, e.g. from the
      # signal handler and again at the end of the command).
      pass
class _DiffToApply(object):
  """Describes one pending sync operation: the action and the URLs involved."""

  def __init__(self, src_url_str, dst_url_str, diff_action):
    """Records the URLs and action for a single diff entry.

    Args:
      src_url_str: Source URL string; None when diff_action is REMOVE.
      dst_url_str: Destination URL string.
      diff_action: The _DiffAction value to carry out.
    """
    (self.src_url_str, self.dst_url_str, self.diff_action) = (
        src_url_str, dst_url_str, diff_action)
def _DiffToApplyArgChecker(command_instance, diff_to_apply):
  """Decides whether a diff entry should be processed.

  An entry is rejected only when the -e flag (exclude_symlinks) is set, the
  entry is not a REMOVE, and its source is a local symbolic link.

  Args:
    command_instance: Command instance; read for exclude_symlinks and logger.
    diff_to_apply: The _DiffToApply being checked.

  Returns:
    False if the entry should be skipped, True otherwise.
  """
  # REMOVE entries carry no source URL, so there is nothing to inspect.
  is_removal = diff_to_apply.diff_action == _DiffAction.REMOVE
  if is_removal or not command_instance.exclude_symlinks:
    return True
  source = StorageUrlFromString(diff_to_apply.src_url_str)
  is_local_symlink = source.IsFileUrl() and os.path.islink(source.object_name)
  if not is_local_symlink:
    return True
  command_instance.logger.info('Skipping symbolic link %s...', source)
  return False
def _ComputeNeededFileChecksums(logger, src_url_str, src_size, src_crc32c,
                                src_md5, dst_url_str, dst_size, dst_crc32c,
                                dst_md5):
  """Computes any file checksums needed by _ObjectsMatch.

  Only local files are hashed, and only with the algorithm the other side can
  be compared against: CRC32C is preferred, MD5 is used when the other side
  has no CRC32C but does have an MD5. A progress message naming the hash and
  the file being read is logged for files larger than TEN_MIB.

  Args:
    logger: logging.logger for outputting log messages.
    src_url_str: Source URL string.
    src_size: Source size
    src_crc32c: Source CRC32c.
    src_md5: Source MD5.
    dst_url_str: Destination URL string.
    dst_size: Destination size
    dst_crc32c: Destination CRC32c.
    dst_md5: Destination MD5.

  Returns:
    (src_crc32c, src_md5, dst_crc32c, dst_md5)
  """
  src_url = StorageUrlFromString(src_url_str)
  dst_url = StorageUrlFromString(dst_url_str)
  if src_url.IsFileUrl():
    if dst_crc32c != _NA or dst_url.IsFileUrl():
      # Destination either already has a CRC32C or is a file we can compute
      # one for below, so hash the source file with CRC32C.
      if src_size > TEN_MIB:
        logger.info('Computing CRC32C for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif dst_md5 != _NA:
      # Reaching here means the destination is not a file URL (that case is
      # handled by the branch above), so only its MD5 can be compared.
      if src_size > TEN_MIB:
        logger.info('Computing MD5 for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_md5 = CalculateB64EncodedMd5FromContents(fp)
  if dst_url.IsFileUrl():
    if src_crc32c != _NA:
      if dst_size > TEN_MIB:
        logger.info('Computing CRC32C for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif src_md5 != _NA:
      if dst_size > TEN_MIB:
        logger.info('Computing MD5 for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_md5 = CalculateB64EncodedMd5FromContents(fp)
  return (src_crc32c, src_md5, dst_crc32c, dst_md5)
def _ListUrlRootFunc(cls, args_tuple, thread_state=None):
  """Worker function for listing the files/objects under a URL to be sync'd.

  Outputs sorted list to out_filename, formatted per _BuildTmpOutputLine. We
  sort the listed URLs because we don't want to depend on consistent sort
  order across file systems and cloud providers.

  If listing fails with an exception, the error is logged and
  cls.non_retryable_listing_failures is set to 1; the exception is not
  re-raised.

  Args:
    cls: Command instance.
    args_tuple: (base_url_str, out_filename, desc), where base_url_str is
                top-level URL string to list; out_filename is name of file to
                which sorted output should be written; desc is 'source' or
                'destination'.
    thread_state: gsutil Cloud API instance to use.
  """
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  (base_url_str, out_filename, desc) = args_tuple
  # We sort while iterating over base_url_str, allowing parallelism of batched
  # sorting with collecting the listing. The with-block guarantees the output
  # file is closed even if something other than Exception (e.g.,
  # KeyboardInterrupt) propagates out.
  with io.open(out_filename, mode='w', encoding=UTF8) as out_file:
    try:
      _BatchSort(_FieldedListingIterator(cls, gsutil_api, base_url_str, desc),
                 out_file)
    except Exception as e:  # pylint: disable=broad-except
      # Abandon rsync if an exception percolates up to this layer - retryable
      # exceptions are handled in the lower layers, so we got a non-retryable
      # exception (like 404 bucket not found) and proceeding would either be
      # futile or could result in data loss - for example:
      #     gsutil rsync -d gs://non-existent-bucket ./localdir
      # would delete files from localdir.
      cls.logger.error(
          'Caught non-retryable exception while listing %s: %s' %
          (base_url_str, e))
      cls.non_retryable_listing_failures = 1
def _LocalDirIterator(base_url):
  """Yields a BucketListingObject for every regular file directly in a dir.

  Used in place of WildcardIterator for non-recursive listing of a local
  directory, because the glob-based matching WildcardIterator relies on skips
  "dot" files, and those must be included when synchronizing to or from a
  local directory.

  Args:
    base_url: URL for the directory over which to iterate.

  Yields:
    BucketListingObject for each file in the directory.
  """
  dir_path = base_url.object_name
  for entry in os.listdir(dir_path):
    full_path = os.path.join(dir_path, entry)
    # Sub-directories (and anything else that isn't a file) are not listed.
    if not os.path.isfile(full_path):
      continue
    yield BucketListingObject(StorageUrlFromString(full_path), None)
def _FieldedListingIterator(cls, gsutil_api, base_url_str, desc):
  """Lists base_url_str and yields one _BuildTmpOutputLine line per URL kept.

  Cloud sub-directory placeholder objects, symlinks (when exclude_symlinks is
  set) and URLs whose path relative to base_url_str matches exclude_pattern
  are left out of the listing.

  Args:
    cls: Command instance.
    gsutil_api: gsutil Cloud API instance to use for bucket listing.
    base_url_str: The top-level URL string over which to iterate.
    desc: 'source' or 'destination'.

  Yields:
    Output line formatted per _BuildTmpOutputLine.
  """
  base_url = StorageUrlFromString(base_url_str)
  if base_url.scheme == 'file' and not cls.recursion_requested:
    listing = _LocalDirIterator(base_url)
  else:
    suffix = '**' if cls.recursion_requested else '*'
    wildcard = '%s/%s' % (base_url_str.rstrip('/\\'), suffix)
    wildcard_iterator = CreateWildcardIterator(
        wildcard, gsutil_api, debug=cls.debug, project_id=cls.project_id)
    # Request just the needed fields, to reduce bandwidth usage.
    listing = wildcard_iterator.IterObjects(
        bucket_listing_fields=['crc32c', 'md5Hash', 'name', 'size'])

  base_len = len(base_url_str)
  num_listed = 0
  for blr in listing:
    url = blr.storage_url
    # Various GUI tools (like the GCS web console) create placeholder objects
    # ending with '/' when the user creates an empty directory, and sometimes
    # leave them behind. They must be dropped here; otherwise an rsync from
    # GCS to a local directory would hit a directory/file conflict (e.g.,
    # downloading an object called "mydata/" where the local directory
    # "mydata" exists). No message is logged for these because it used to
    # cause customer confusion.
    if IsCloudSubdirPlaceholder(url, blr=blr):
      continue
    if (cls.exclude_symlinks and url.IsFileUrl()
        and os.path.islink(url.object_name)):
      continue
    if cls.exclude_pattern:
      # The exclude pattern is matched against the path relative to the base
      # URL, without a leading delimiter.
      relative_path = url.url_string[base_len:]
      if relative_path.startswith(url.delim):
        relative_path = relative_path[1:]
      if cls.exclude_pattern.match(relative_path):
        continue
    num_listed += 1
    if num_listed % _PROGRESS_REPORT_LISTING_COUNT == 0:
      cls.logger.info('At %s listing %d...', desc, num_listed)
    yield _BuildTmpOutputLine(blr)
def _BuildTmpOutputLine(blr):
  """Formats one listing entry as a line for the temp listing file.

  Args:
    blr: The BucketListingRef.

  Returns:
    The output line, formatted as _EncodeUrl(URL)<sp>size<sp>crc32c<sp>md5
    where crc32c will only be present for GCS URLs, and md5 will only be
    present for cloud URLs that aren't composite objects. A missing field is
    populated with '-'.

  Raises:
    CommandException: if the URL is neither a file URL nor a cloud URL.
  """
  url = blr.storage_url
  if url.IsFileUrl():
    # Local files carry no stored hashes; only the size is recorded.
    size = os.path.getsize(url.object_name)
    crc32c = md5 = _NA
  elif url.IsCloudUrl():
    cloud_object = blr.root_object
    size = cloud_object.size
    crc32c = cloud_object.crc32c or _NA
    md5 = cloud_object.md5Hash or _NA
  else:
    raise CommandException('Got unexpected URL type (%s)' % url.scheme)
  fields = (_EncodeUrl(url.url_string), size, crc32c, md5)
  return '%s %d %s %s\n' % fields
def _EncodeUrl(url_string):
  """Encodes url_string with quote plus encoding and UTF8 character encoding.

  We use this for all URL encodings. The inverse operation is _DecodeUrl.

  Args:
    url_string: String URL to encode.

  Returns:
    encoded URL.
  """
  return urllib.quote_plus(url_string.encode(UTF8))
def _DecodeUrl(enc_url_string):
  """Inverts encoding from _EncodeUrl.

  Args:
    enc_url_string: String URL to decode.

  Returns:
    decoded URL.
  """
  return urllib.unquote_plus(enc_url_string).decode(UTF8)
# pylint: disable=bare-except
def _BatchSort(in_iter, out_file):
  """Sorts input lines from in_iter and outputs to out_file.

  Sorts in batches as input arrives, so input file does not need to be loaded
  into memory all at once. Derived from Python Recipe 466302: Sorting big
  files the Python 2.4 way by Nicolas Lehuen.

  Sorted format is per _BuildTmpOutputLine. We're sorting on the entire line
  when we could just sort on the first record (URL); but the sort order is
  identical either way.

  Each batch is written, sorted, to its own chunk file named
  '<out_file.name>-NNNNNN'; the chunk files are then merged into out_file and
  removed.

  Args:
    in_iter: Input iterator.
    out_file: Output file.

  Raises:
    CommandException: if too many chunk files had to be open at once (EMFILE).
    IOError: for any other I/O failure.
  """
  # Note: If chunk_files gets very large we can run out of open FDs. See .boto
  # file comments about rsync_buffer_lines. If increasing rsync_buffer_lines
  # doesn't suffice (e.g., for someone synchronizing with a really large
  # bucket), an option would be to make gsutil merge in passes, never
  # opening all chunk files simultaneously.
  buffer_size = config.getint('GSUtil', 'rsync_buffer_lines', 32000)
  chunk_files = []
  try:
    while True:
      current_chunk = sorted(islice(in_iter, buffer_size))
      if not current_chunk:
        break
      output_chunk = io.open('%s-%06i' % (out_file.name, len(chunk_files)),
                             mode='w+', encoding=UTF8)
      chunk_files.append(output_chunk)
      # Use write(), not writelines(): the chunk is joined into one string, and
      # writelines() on a string would iterate it character by character.
      output_chunk.write(unicode(''.join(current_chunk)))
      output_chunk.flush()
      # Rewind so the merge below reads the chunk from its beginning.
      output_chunk.seek(0)
    out_file.writelines(heapq.merge(*chunk_files))
  except IOError as e:
    if e.errno == errno.EMFILE:
      raise CommandException('\n'.join(textwrap.wrap(
          'Synchronization failed because too many open file handles were '
          'needed while building synchronization state. Please see the '
          'comments about rsync_buffer_lines in your .boto config file for a '
          'possible way to address this problem.')))
    raise
  finally:
    # Best-effort cleanup. Closing and removing are attempted independently so
    # a failed close doesn't leave the chunk file behind.
    for chunk_file in chunk_files:
      try:
        chunk_file.close()
      except (IOError, OSError):
        pass
      try:
        os.remove(chunk_file.name)
      except OSError:
        pass
class _DiffIterator(object):
  """Iterator yielding sequence of _DiffToApply objects.

  Construction lists the source and destination URLs into two sorted temp
  files. Iteration then walks the two sorted listings in step and yields the
  COPY / REMOVE operations implied by their differences.
  """

  def __init__(self, command_obj, base_src_url, base_dst_url):
    """Builds the sorted source and destination listings.

    Args:
      command_obj: Command instance; read for compute_file_checksums,
                   delete_extras, recursion_requested and logger, and used to
                   run the listings via Apply().
      base_src_url: Top-level source URL object.
      base_dst_url: Top-level destination URL object.

    Raises:
      CommandException: if a listing reported a non-retryable failure.
    """
    self.command_obj = command_obj
    self.compute_file_checksums = command_obj.compute_file_checksums
    self.delete_extras = command_obj.delete_extras
    self.recursion_requested = command_obj.recursion_requested
    self.logger = self.command_obj.logger
    self.base_src_url = base_src_url
    self.base_dst_url = base_dst_url
    self.logger.info('Building synchronization state...')

    # mkstemp doesn't delete the files it creates, so register them in
    # _tmp_files for CleanUpTempFiles.
    (src_fh, self.sorted_list_src_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-src-')
    _tmp_files.append(self.sorted_list_src_file_name)
    (dst_fh, self.sorted_list_dst_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-dst-')
    _tmp_files.append(self.sorted_list_dst_file_name)
    # Close the file handles; the file will be opened in write mode by
    # _ListUrlRootFunc.
    os.close(src_fh)
    os.close(dst_fh)

    # Build sorted lists of src and dst URLs in parallel. To do this, pass args
    # to _ListUrlRootFunc as tuple (base_url_str, out_filename, desc)
    # where base_url_str is the starting URL string for listing.
    args_iter = iter([
        (self.base_src_url.url_string, self.sorted_list_src_file_name,
         'source'),
        (self.base_dst_url.url_string, self.sorted_list_dst_file_name,
         'destination')
    ])

    # Flag that _ListUrlRootFunc sets to 1 when a listing fails with a
    # non-retryable exception (the error message itself is only logged).
    command_obj.non_retryable_listing_failures = 0
    shared_attrs = ['non_retryable_listing_failures']
    command_obj.Apply(_ListUrlRootFunc, args_iter, _RootListingExceptionHandler,
                      shared_attrs, arg_checker=DummyArgChecker,
                      parallel_operations_override=True,
                      fail_on_error=True)

    if command_obj.non_retryable_listing_failures:
      raise CommandException('Caught non-retryable exception - aborting rsync')

    # NOTE(review): these two file objects are not closed anywhere in this
    # class - confirm whether the caller is expected to close them.
    self.sorted_list_src_file = open(self.sorted_list_src_file_name, 'r')
    self.sorted_list_dst_file = open(self.sorted_list_dst_file_name, 'r')

    # Wrap iterators in PluralityCheckableIterator so we can check emptiness.
    self.sorted_src_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_src_file))
    self.sorted_dst_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_dst_file))

  def _ParseTmpFileLine(self, line):
    """Parses output from _BuildTmpOutputLine.

    Parses into tuple:
      (URL, size, crc32c, md5)
    where crc32c and/or md5 can be _NA.

    Args:
      line: The line to parse.

    Returns:
      Parsed tuple: (url, size, crc32c, md5)
    """
    # The line is split on whitespace into exactly four fields; the URL field
    # is decoded with _DecodeUrl and the size converted to int.
    (encoded_url, size, crc32c, md5) = line.split()
    return (_DecodeUrl(encoded_url), int(size), crc32c, md5.strip())

  def _WarnIfMissingCloudHash(self, url_str, crc32c, md5):
    """Warns if given url_str is a cloud URL and is missing both crc32c and md5.

    Args:
      url_str: URL string of the object to check.
      crc32c: CRC32c of that object, or _NA.
      md5: MD5 of that object, or _NA.

    Returns:
      True if issued warning.
    """
    # One known way this can currently happen is when rsync'ing objects larger
    # than 5 GB from S3 (for which the etag is not an MD5).
    if (StorageUrlFromString(url_str).IsCloudUrl()
        and crc32c == _NA and md5 == _NA):
      self.logger.warn(
          'Found no hashes to validate %s. Integrity cannot be assured without '
          'hashes.', url_str)
      return True
    return False

  def _ObjectsMatch(self, src_url_str, src_size, src_crc32c, src_md5,
                    dst_url_str, dst_size, dst_crc32c, dst_md5):
    """Returns True if src and dst objects are the same.

    Uses size plus whatever checksums are available: MD5 is compared when both
    sides have one, otherwise CRC32C when both sides have one. If no common
    checksum is available the objects are treated as matching on size alone.

    Args:
      src_url_str: Source URL string.
      src_size: Source size
      src_crc32c: Source CRC32c.
      src_md5: Source MD5.
      dst_url_str: Destination URL string.
      dst_size: Destination size
      dst_crc32c: Destination CRC32c.
      dst_md5: Destination MD5.

    Returns:
      True/False.
    """
    # Note: This function is called from __iter__, which is called from the
    # Command.Apply driver. Thus, all checksum computation will be run in a
    # single thread, which is good (having multiple threads concurrently
    # computing checksums would thrash the disk).
    if src_size != dst_size:
      return False
    if self.compute_file_checksums:
      (src_crc32c, src_md5, dst_crc32c, dst_md5) = _ComputeNeededFileChecksums(
          self.logger, src_url_str, src_size, src_crc32c, src_md5, dst_url_str,
          dst_size, dst_crc32c, dst_md5)
    if src_md5 != _NA and dst_md5 != _NA:
      self.logger.debug('Comparing md5 for %s and %s', src_url_str, dst_url_str)
      return src_md5 == dst_md5
    if src_crc32c != _NA and dst_crc32c != _NA:
      self.logger.debug(
          'Comparing crc32c for %s and %s', src_url_str, dst_url_str)
      return src_crc32c == dst_crc32c
    # Warn about at most one side: the destination is only checked if no
    # warning was issued for the source.
    if not self._WarnIfMissingCloudHash(src_url_str, src_crc32c, src_md5):
      self._WarnIfMissingCloudHash(dst_url_str, dst_crc32c, dst_md5)
    # Without checksums to compare we depend only on basic size comparison.
    return True

  def __iter__(self):
    """Iterates over src/dst URLs and produces a _DiffToApply sequence.

    Yields:
      The _DiffToApply.
    """
    # Strip trailing slashes, if any, so we compute tail length against
    # consistent position regardless of whether trailing slashes were included
    # or not in URL.
    base_src_url_len = len(self.base_src_url.url_string.rstrip('/\\'))
    base_dst_url_len = len(self.base_dst_url.url_string.rstrip('/\\'))
    src_url_str = dst_url_str = None
    # Invariant: After each yield, the URLs in src_url_str, dst_url_str,
    # self.sorted_src_urls_it, and self.sorted_dst_urls_it are not yet
    # processed. Each time we encounter None in src_url_str or dst_url_str we
    # populate from the respective iterator, and we reset one or the other value
    # to None after yielding an action that disposes of that URL.
    while not self.sorted_src_urls_it.IsEmpty() or src_url_str is not None:
      if src_url_str is None:
        (src_url_str, src_size, src_crc32c, src_md5) = self._ParseTmpFileLine(
            self.sorted_src_urls_it.next())
        # Skip past base URL and normalize slashes so we can compare across
        # clouds/file systems (including Windows).
        src_url_str_to_check = _EncodeUrl(
            src_url_str[base_src_url_len:].replace('\\', '/'))
        dst_url_str_would_copy_to = copy_helper.ConstructDstUrl(
            self.base_src_url, StorageUrlFromString(src_url_str), True, True,
            self.base_dst_url, False, self.recursion_requested).url_string
      if self.sorted_dst_urls_it.IsEmpty():
        # We've reached end of dst URLs, so copy src to dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
        continue
      if not dst_url_str:
        (dst_url_str, dst_size, dst_crc32c, dst_md5) = (
            self._ParseTmpFileLine(self.sorted_dst_urls_it.next()))
        # Skip past base URL and normalize slashes so we can compare across
        # clouds/file systems (including Windows).
        dst_url_str_to_check = _EncodeUrl(
            dst_url_str[base_dst_url_len:].replace('\\', '/'))

      # The relative paths are compared in their encoded form, which is also
      # the form the listing files were sorted in.
      if src_url_str_to_check < dst_url_str_to_check:
        # There's no dst object corresponding to src object, so copy src to dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
      elif src_url_str_to_check > dst_url_str_to_check:
        # dst object without a corresponding src object, so remove dst if -d
        # option was specified.
        if self.delete_extras:
          yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
        dst_url_str = None
      else:
        # There is a dst object corresponding to src object, so check if objects
        # match.
        if self._ObjectsMatch(
            src_url_str, src_size, src_crc32c, src_md5,
            dst_url_str, dst_size, dst_crc32c, dst_md5):
          # Continue iterating without yielding a _DiffToApply.
          pass
        else:
          yield _DiffToApply(src_url_str, dst_url_str, _DiffAction.COPY)
        src_url_str = None
        dst_url_str = None

    # If -d option specified any files/objects left in dst iteration should be
    # removed.
    if not self.delete_extras:
      return
    # A dst URL that was already read but not yet disposed of goes first.
    if dst_url_str:
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
      dst_url_str = None
    for line in self.sorted_dst_urls_it:
      (dst_url_str, _, _, _) = self._ParseTmpFileLine(line)
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
def _RsyncFunc(cls, diff_to_apply, thread_state=None):
    """Worker function for performing the actual copy and remove operations.

    Args:
      cls: Calling command instance; its logger, dryrun flag and headers are
           read here.
      diff_to_apply: Object carrying diff_action, dst_url_str and (for copies)
                     src_url_str.
      thread_state: Passed through to GetCloudApiInstance.

    Raises:
      CommandException: if diff_to_apply.diff_action is neither
                        _DiffAction.REMOVE nor _DiffAction.COPY.
    """
    gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
    dst_url_str = diff_to_apply.dst_url_str
    dst_url = StorageUrlFromString(dst_url_str)
    if diff_to_apply.diff_action == _DiffAction.REMOVE:
        if cls.dryrun:
            cls.logger.info('Would remove %s', dst_url)
        else:
            cls.logger.info('Removing %s', dst_url)
            if dst_url.IsFileUrl():
                os.unlink(dst_url.object_name)
            else:
                try:
                    gsutil_api.DeleteObject(
                        dst_url.bucket_name, dst_url.object_name,
                        generation=dst_url.generation,
                        provider=dst_url.scheme)
                except NotFoundException:
                    # If the object happened to be deleted by an external
                    # process, this is fine because it moves us closer to the
                    # desired state.
                    pass
    elif diff_to_apply.diff_action == _DiffAction.COPY:
        src_url_str = diff_to_apply.src_url_str
        src_url = StorageUrlFromString(src_url_str)
        if cls.dryrun:
            cls.logger.info('Would copy %s to %s', src_url, dst_url)
        else:
            try:
                copy_helper.PerformCopy(cls.logger, src_url, dst_url,
                                        gsutil_api, cls,
                                        _RsyncExceptionHandler,
                                        headers=cls.headers)
            except SkipUnsupportedObjectError as e:
                cls.logger.info(
                    'Skipping item %s with unsupported object type %s',
                    src_url, e.unsupported_type)
    else:
        # Format with %s rather than %d so that a non-numeric action value
        # cannot trigger a TypeError that would mask this CommandException.
        raise CommandException('Got unexpected DiffAction (%s)'
                               % diff_to_apply.diff_action)
def _RootListingExceptionHandler(cls, e):
    """Reports an exception hit while listing the URLs to sync.

    Args:
      cls: Object exposing a `logger`; the message is logged at error level.
      e: The exception to report; only its string form is logged.
    """
    error_text = str(e)
    cls.logger.error(error_text)
def _RsyncExceptionHandler(cls, e):
    """Records a failed sync operation so status can be reported afterwards.

    Logs the exception text at error level, bumps cls.op_failure_count by
    one, then logs the current traceback at debug level.

    Args:
      cls: Object exposing `logger` and a numeric `op_failure_count`.
      e: The exception raised by the sync operation.
    """
    log = cls.logger
    log.error(str(e))
    cls.op_failure_count += 1
    stack_text = traceback.format_exc()
    log.debug('\n\nEncountered exception while syncing:\n%s\n', stack_text)
class RsyncCommand(Command):
    """Implementation of gsutil rsync command."""

    # Command specification. See base class for documentation.
    command_spec = Command.CreateCommandSpec(
        'rsync',
        command_name_aliases=[],
        usage_synopsis=_SYNOPSIS,
        min_args=2,
        max_args=2,
        supported_sub_args='cCdenprRUx:',
        file_url_ok=True,
        provider_url_ok=False,
        urls_start_arg=0,
        gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
        gs_default_api=ApiSelector.JSON,
        argparse_arguments=[
            CommandArgument.MakeNCloudOrFileURLsArgument(2)
        ]
    )
    # Help specification. See help_provider.py for documentation.
    help_spec = Command.HelpSpec(
        help_name='rsync',
        help_name_aliases=['sync', 'synchronize'],
        help_type='command_help',
        help_one_line_summary='Synchronize content of two buckets/directories',
        help_text=_DETAILED_HELP_TEXT,
        subcommand_help_text={},
    )
    total_bytes_transferred = 0

    def _InsistContainer(self, url_str, treat_nonexistent_object_as_subdir):
        """Sanity checks that URL names an existing container.

        Args:
          url_str: URL string to check.
          treat_nonexistent_object_as_subdir: indicates if should treat a
                                              non-existent object as a subdir.

        Returns:
          URL for checked string.

        Raises:
          CommandException if url_str doesn't name an existing container.
        """
        (url, have_existing_container) = (
            copy_helper.ExpandUrlToSingleBlr(
                url_str, self.gsutil_api, self.debug, self.project_id,
                treat_nonexistent_object_as_subdir))
        if not have_existing_container:
            raise CommandException(
                'arg (%s) does not name a directory, bucket, or bucket subdir.'
                % url_str)
        return url

    def RunCommand(self):
        """Command entry point for the rsync command.

        Parses the sub-options, validates the source and destination
        arguments, then applies every diff produced by _DiffIterator via
        self.Apply. Temp files are cleaned up even if Apply raises.

        Raises:
          CommandException: if either argument fails _InsistContainer, or if
                            op_failure_count is non-zero after the sync.
        """
        self._ParseOpts()
        if self.compute_file_checksums and not UsingCrcmodExtension(crcmod):
            # warn() is a deprecated alias of warning().
            self.logger.warning(SLOW_CRCMOD_WARNING)

        src_url = self._InsistContainer(self.args[0], False)
        dst_url = self._InsistContainer(self.args[1], True)

        # Tracks if any copy or rm operations failed.
        self.op_failure_count = 0

        # List of attributes to share/manage across multiple processes in
        # parallel (-m) mode.
        shared_attrs = ['op_failure_count']

        for signal_num in GetCaughtSignals():
            RegisterSignalHandler(signal_num, _HandleSignals)

        # Perform sync requests in parallel (-m) mode, if requested, using
        # configured number of parallel processes and threads. Otherwise,
        # perform requests with sequential function calls in current process.
        diff_iterator = _DiffIterator(self, src_url, dst_url)
        self.logger.info('Starting synchronization')
        try:
            self.Apply(_RsyncFunc, diff_iterator, _RsyncExceptionHandler,
                       shared_attrs, arg_checker=_DiffToApplyArgChecker,
                       fail_on_error=True)
        finally:
            CleanUpTempFiles()

        if self.op_failure_count:
            # Pluralize only when more than one operation failed; this branch
            # is entered for any non-zero count, so a plain truthiness test
            # would always yield 's'.
            plural_str = 's' if self.op_failure_count > 1 else ''
            raise CommandException(
                '%d file%s/object%s could not be copied/removed.' %
                (self.op_failure_count, plural_str, plural_str))

    def _ParseOpts(self):
        """Parses self.sub_opts into command state.

        Resets every option attribute to its default, then applies each
        (option, argument) pair found in self.sub_opts.

        Returns:
          The result of CreateCopyHelperOpts called with the parsed
          preserve_acl and skip_unsupported_objects values.

        Raises:
          CommandException: if the -x argument is blank or is not a valid
                            regular expression.
        """
        # exclude_symlinks is handled by Command parent class, so save in
        # Command state rather than CopyHelperOpts.
        self.exclude_symlinks = False
        # continue_on_error is handled by Command parent class, so save in
        # Command state rather than CopyHelperOpts.
        self.continue_on_error = False
        self.delete_extras = False
        preserve_acl = False
        self.compute_file_checksums = False
        self.dryrun = False
        self.exclude_pattern = None
        self.skip_unsupported_objects = False
        # self.recursion_requested is initialized in command.py (so it can be
        # checked in parent class for all commands).

        if self.sub_opts:
            for o, a in self.sub_opts:
                if o == '-c':
                    self.compute_file_checksums = True
                # Note: In gsutil cp command this is specified using -c but
                # here we use -C so we can use -c for checksum arg (to be
                # consistent with Unix rsync command options).
                elif o == '-C':
                    self.continue_on_error = True
                elif o == '-d':
                    self.delete_extras = True
                elif o == '-e':
                    self.exclude_symlinks = True
                elif o == '-n':
                    self.dryrun = True
                elif o == '-p':
                    preserve_acl = True
                elif o == '-r' or o == '-R':
                    self.recursion_requested = True
                elif o == '-U':
                    self.skip_unsupported_objects = True
                elif o == '-x':
                    if not a:
                        raise CommandException('Invalid blank exclude filter')
                    try:
                        self.exclude_pattern = re.compile(a)
                    except re.error:
                        raise CommandException(
                            'Invalid exclude filter (%s)' % a)
        return CreateCopyHelperOpts(
            preserve_acl=preserve_acl,
            skip_unsupported_objects=self.skip_unsupported_objects)