# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Media helper functions and classes for Google Cloud Storage JSON API."""
from __future__ import absolute_import
import copy
import cStringIO
import httplib
import logging
import socket
import types
import urlparse
from apitools.base.py import exceptions as apitools_exceptions
import httplib2
from httplib2 import parse_uri
from gslib.cloud_api import BadRequestException
from gslib.progress_callback import ProgressCallbackWithBackoff
from gslib.util import SSL_TIMEOUT
from gslib.util import TRANSFER_BUFFER_SIZE
class BytesTransferredContainer(object):
"""Container class for passing number of bytes transferred to lower layers.
For resumed transfers or connection rebuilds in the middle of a transfer, we
need to rebuild the connection class with how much we've transferred so far.
For uploads, we don't know the total number of bytes uploaded until we've
queried the server, but we need to create the connection class to pass to
httplib2 before we can query the server. This container object allows us to
pass a reference into Upload/DownloadCallbackConnection.
"""
def __init__(self):
self.__bytes_transferred = 0
@property
def bytes_transferred(self):
return self.__bytes_transferred
@bytes_transferred.setter
def bytes_transferred(self, value):
self.__bytes_transferred = value
class UploadCallbackConnectionClassFactory(object):
"""Creates a class that can override an httplib2 connection.
This is used to provide progress callbacks and disable dumping the upload
payload during debug statements. It can later be used to provide on-the-fly
hash digestion during upload.
"""
def __init__(self, bytes_uploaded_container,
buffer_size=TRANSFER_BUFFER_SIZE,
total_size=0, progress_callback=None):
self.bytes_uploaded_container = bytes_uploaded_container
self.buffer_size = buffer_size
self.total_size = total_size
self.progress_callback = progress_callback
def GetConnectionClass(self):
"""Returns a connection class that overrides send."""
outer_bytes_uploaded_container = self.bytes_uploaded_container
outer_buffer_size = self.buffer_size
outer_total_size = self.total_size
outer_progress_callback = self.progress_callback
class UploadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
"""Connection class override for uploads."""
bytes_uploaded_container = outer_bytes_uploaded_container
# After we instantiate this class, apitools will check with the server
# to find out how many bytes remain for a resumable upload. This allows
# us to update our progress once based on that number.
processed_initial_bytes = False
GCS_JSON_BUFFER_SIZE = outer_buffer_size
callback_processor = None
size = outer_total_size
def __init__(self, *args, **kwargs):
kwargs['timeout'] = SSL_TIMEOUT
httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)
def send(self, data):
"""Overrides HTTPConnection.send."""
if not self.processed_initial_bytes:
self.processed_initial_bytes = True
if outer_progress_callback:
self.callback_processor = ProgressCallbackWithBackoff(
outer_total_size, outer_progress_callback)
self.callback_processor.Progress(
self.bytes_uploaded_container.bytes_transferred)
# httplib.HTTPConnection.send accepts either a string or a file-like
# object (anything that implements read()).
if isinstance(data, basestring):
full_buffer = cStringIO.StringIO(data)
else:
full_buffer = data
partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
while partial_buffer:
httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
send_length = len(partial_buffer)
if self.callback_processor:
# This is the only place where gsutil has control over making a
# callback, but here we can't differentiate the metadata bytes
# (such as headers and OAuth2 refreshes) sent during an upload
# from the actual upload bytes, so we will actually report
# slightly more bytes than desired to the callback handler.
#
# One considered/rejected alternative is to move the callbacks
# into the HashingFileUploadWrapper which only processes reads on
# the bytes. This has the disadvantages of being removed from
# where we actually send the bytes and unnecessarily
# multi-purposing that class.
self.callback_processor.Progress(send_length)
partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
return UploadCallbackConnection
def WrapUploadHttpRequest(upload_http):
"""Wraps upload_http so we only use our custom connection_type on PUTs.
POSTs are used to refresh oauth tokens, and we don't want to process the
data sent in those requests.
Args:
upload_http: httplib2.Http instance to wrap
"""
request_orig = upload_http.request
def NewRequest(uri, method='GET', body=None, headers=None,
redirections=httplib2.DEFAULT_MAX_REDIRECTS,
connection_type=None):
if method == 'PUT' or method == 'POST':
override_connection_type = connection_type
else:
override_connection_type = None
return request_orig(uri, method=method, body=body,
headers=headers, redirections=redirections,
connection_type=override_connection_type)
# Replace the request method with our own closure.
upload_http.request = NewRequest
class DownloadCallbackConnectionClassFactory(object):
"""Creates a class that can override an httplib2 connection.
This is used to provide progress callbacks, disable dumping the download
payload during debug statements, and provide on-the-fly hash digestion during
download. On-the-fly digestion is particularly important because httplib2
will decompress gzipped content on-the-fly, thus this class provides our
only opportunity to calculate the correct hash for an object that has a
gzip hash in the cloud.
"""
def __init__(self, bytes_downloaded_container,
buffer_size=TRANSFER_BUFFER_SIZE, total_size=0,
progress_callback=None, digesters=None):
self.buffer_size = buffer_size
self.total_size = total_size
self.progress_callback = progress_callback
self.digesters = digesters
self.bytes_downloaded_container = bytes_downloaded_container
def GetConnectionClass(self):
"""Returns a connection class that overrides getresponse."""
class DownloadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
"""Connection class override for downloads."""
outer_total_size = self.total_size
outer_digesters = self.digesters
outer_progress_callback = self.progress_callback
outer_bytes_downloaded_container = self.bytes_downloaded_container
processed_initial_bytes = False
callback_processor = None
def __init__(self, *args, **kwargs):
kwargs['timeout'] = SSL_TIMEOUT
httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)
def getresponse(self, buffering=False):
"""Wraps an HTTPResponse to perform callbacks and hashing.
In this function, self is a DownloadCallbackConnection.
Args:
buffering: Unused. This function uses a local buffer.
Returns:
HTTPResponse object with wrapped read function.
"""
orig_response = httplib.HTTPConnection.getresponse(self)
if orig_response.status not in (httplib.OK, httplib.PARTIAL_CONTENT):
return orig_response
orig_read_func = orig_response.read
def read(amt=None): # pylint: disable=invalid-name
"""Overrides HTTPConnection.getresponse.read.
This function only supports reads of TRANSFER_BUFFER_SIZE or smaller.
Args:
amt: Integer n where 0 < n <= TRANSFER_BUFFER_SIZE. This is a
keyword argument to match the read function it overrides,
but it is required.
Returns:
Data read from HTTPConnection.
"""
if not amt or amt > TRANSFER_BUFFER_SIZE:
raise BadRequestException(
'Invalid HTTP read size %s during download, expected %s.' %
(amt, TRANSFER_BUFFER_SIZE))
else:
amt = amt or TRANSFER_BUFFER_SIZE
if not self.processed_initial_bytes:
self.processed_initial_bytes = True
if self.outer_progress_callback:
self.callback_processor = ProgressCallbackWithBackoff(
self.outer_total_size, self.outer_progress_callback)
self.callback_processor.Progress(
self.outer_bytes_downloaded_container.bytes_transferred)
data = orig_read_func(amt)
read_length = len(data)
if self.callback_processor:
self.callback_processor.Progress(read_length)
if self.outer_digesters:
for alg in self.outer_digesters:
self.outer_digesters[alg].update(data)
return data
orig_response.read = read
return orig_response
return DownloadCallbackConnection
def WrapDownloadHttpRequest(download_http):
"""Overrides download request functions for an httplib2.Http object.
Args:
download_http: httplib2.Http.object to wrap / override.
Returns:
Wrapped / overridden httplib2.Http object.
"""
# httplib2 has a bug https://code.google.com/p/httplib2/issues/detail?id=305
# where custom connection_type is not respected after redirects. This
# function is copied from httplib2 and overrides the request function so that
# the connection_type is properly passed through.
# pylint: disable=protected-access,g-inconsistent-quotes,unused-variable
# pylint: disable=g-equals-none,g-doc-return-or-yield
# pylint: disable=g-short-docstring-punctuation,g-doc-args
# pylint: disable=too-many-statements
def OverrideRequest(self, conn, host, absolute_uri, request_uri, method,
body, headers, redirections, cachekey):
"""Do the actual request using the connection object.
Also follow one level of redirects if necessary.
"""
auths = ([(auth.depth(request_uri), auth) for auth in self.authorizations
if auth.inscope(host, request_uri)])
auth = auths and sorted(auths)[0][1] or None
if auth:
auth.request(method, request_uri, headers, body)
(response, content) = self._conn_request(conn, request_uri, method, body,
headers)
if auth:
if auth.response(response, body):
auth.request(method, request_uri, headers, body)
(response, content) = self._conn_request(conn, request_uri, method,
body, headers)
response._stale_digest = 1
if response.status == 401:
for authorization in self._auth_from_challenge(
host, request_uri, headers, response, content):
authorization.request(method, request_uri, headers, body)
(response, content) = self._conn_request(conn, request_uri, method,
body, headers)
if response.status != 401:
self.authorizations.append(authorization)
authorization.response(response, body)
break
if (self.follow_all_redirects or (method in ["GET", "HEAD"])
or response.status == 303):
if self.follow_redirects and response.status in [300, 301, 302,
303, 307]:
# Pick out the location header and basically start from the beginning
# remembering first to strip the ETag header and decrement our 'depth'
if redirections:
if not response.has_key('location') and response.status != 300:
raise httplib2.RedirectMissingLocation(
"Redirected but the response is missing a Location: header.",
response, content)
# Fix-up relative redirects (which violate an RFC 2616 MUST)
if response.has_key('location'):
location = response['location']
(scheme, authority, path, query, fragment) = parse_uri(location)
if authority == None:
response['location'] = urlparse.urljoin(absolute_uri, location)
if response.status == 301 and method in ["GET", "HEAD"]:
response['-x-permanent-redirect-url'] = response['location']
if not response.has_key('content-location'):
response['content-location'] = absolute_uri
httplib2._updateCache(headers, response, content, self.cache,
cachekey)
if headers.has_key('if-none-match'):
del headers['if-none-match']
if headers.has_key('if-modified-since'):
del headers['if-modified-since']
if ('authorization' in headers and
not self.forward_authorization_headers):
del headers['authorization']
if response.has_key('location'):
location = response['location']
old_response = copy.deepcopy(response)
if not old_response.has_key('content-location'):
old_response['content-location'] = absolute_uri
redirect_method = method
if response.status in [302, 303]:
redirect_method = "GET"
body = None
(response, content) = self.request(
location, redirect_method, body=body, headers=headers,
redirections=redirections-1,
connection_type=conn.__class__)
response.previous = old_response
else:
raise httplib2.RedirectLimit(
"Redirected more times than redirection_limit allows.",
response, content)
elif response.status in [200, 203] and method in ["GET", "HEAD"]:
# Don't cache 206's since we aren't going to handle byte range
# requests
if not response.has_key('content-location'):
response['content-location'] = absolute_uri
httplib2._updateCache(headers, response, content, self.cache,
cachekey)
return (response, content)
# Wrap download_http so we do not use our custom connection_type
# on POSTS, which are used to refresh oauth tokens. We don't want to
# process the data received in those requests.
request_orig = download_http.request
def NewRequest(uri, method='GET', body=None, headers=None,
redirections=httplib2.DEFAULT_MAX_REDIRECTS,
connection_type=None):
if method == 'POST':
return request_orig(uri, method=method, body=body,
headers=headers, redirections=redirections,
connection_type=None)
else:
return request_orig(uri, method=method, body=body,
headers=headers, redirections=redirections,
connection_type=connection_type)
# Replace the request methods with our own closures.
download_http._request = types.MethodType(OverrideRequest, download_http)
download_http.request = NewRequest
return download_http
class HttpWithNoRetries(httplib2.Http):
"""httplib2.Http variant that does not retry.
httplib2 automatically retries requests according to httplib2.RETRIES, but
in certain cases httplib2 ignores the RETRIES value and forces a retry.
Because httplib2 does not handle the case where the underlying request body
is a stream, a retry may cause a non-idempotent write as the stream is
partially consumed and not reset before the retry occurs.
Here we override _conn_request to disable retries unequivocally, so that
uploads may be retried at higher layers that properly handle stream request
bodies.
"""
def _conn_request(self, conn, request_uri, method, body, headers): # pylint: disable=too-many-statements
try:
if hasattr(conn, 'sock') and conn.sock is None:
conn.connect()
conn.request(method, request_uri, body, headers)
except socket.timeout:
raise
except socket.gaierror:
conn.close()
raise httplib2.ServerNotFoundError(
'Unable to find the server at %s' % conn.host)
except httplib2.ssl_SSLError:
conn.close()
raise
except socket.error, e:
err = 0
if hasattr(e, 'args'):
err = getattr(e, 'args')[0]
else:
err = e.errno
if err == httplib2.errno.ECONNREFUSED: # Connection refused
raise
except httplib.HTTPException:
conn.close()
raise
try:
response = conn.getresponse()
except (socket.error, httplib.HTTPException):
conn.close()
raise
else:
content = ''
if method == 'HEAD':
conn.close()
else:
content = response.read()
response = httplib2.Response(response)
if method != 'HEAD':
# pylint: disable=protected-access
content = httplib2._decompressContent(response, content)
return (response, content)
class HttpWithDownloadStream(httplib2.Http):
"""httplib2.Http variant that only pushes bytes through a stream.
httplib2 handles media by storing entire chunks of responses in memory, which
is undesirable particularly when multiple instances are used during
multi-threaded/multi-process copy. This class copies and then overrides some
httplib2 functions to use a streaming copy approach that uses small memory
buffers.
Also disables httplib2 retries (for reasons stated in the HttpWithNoRetries
class doc).
"""
def __init__(self, *args, **kwds):
self._stream = None
self._logger = logging.getLogger()
super(HttpWithDownloadStream, self).__init__(*args, **kwds)
@property
def stream(self):
return self._stream
@stream.setter
def stream(self, value):
self._stream = value
def _conn_request(self, conn, request_uri, method, body, headers): # pylint: disable=too-many-statements
try:
if hasattr(conn, 'sock') and conn.sock is None:
conn.connect()
conn.request(method, request_uri, body, headers)
except socket.timeout:
raise
except socket.gaierror:
conn.close()
raise httplib2.ServerNotFoundError(
'Unable to find the server at %s' % conn.host)
except httplib2.ssl_SSLError:
conn.close()
raise
except socket.error, e:
err = 0
if hasattr(e, 'args'):
err = getattr(e, 'args')[0]
else:
err = e.errno
if err == httplib2.errno.ECONNREFUSED: # Connection refused
raise
except httplib.HTTPException:
# Just because the server closed the connection doesn't apparently mean
# that the server didn't send a response.
conn.close()
raise
try:
response = conn.getresponse()
except (socket.error, httplib.HTTPException):
conn.close()
raise
else:
content = ''
if method == 'HEAD':
conn.close()
response = httplib2.Response(response)
else:
if response.status in (httplib.OK, httplib.PARTIAL_CONTENT):
content_length = None
if hasattr(response, 'msg'):
content_length = response.getheader('content-length')
http_stream = response
bytes_read = 0
while True:
new_data = http_stream.read(TRANSFER_BUFFER_SIZE)
if new_data:
if self.stream is None:
raise apitools_exceptions.InvalidUserInputError(
'Cannot exercise HttpWithDownloadStream with no stream')
self.stream.write(new_data)
bytes_read += len(new_data)
else:
break
if (content_length is not None and
long(bytes_read) != long(content_length)):
# The input stream terminated before we were able to read the
# entire contents, possibly due to a network condition. Set
# content-length to indicate how many bytes we actually read.
self._logger.log(
logging.DEBUG, 'Only got %s bytes out of content-length %s '
'for request URI %s. Resetting content-length to match '
'bytes read.', bytes_read, content_length, request_uri)
response.msg['content-length'] = str(bytes_read)
response = httplib2.Response(response)
else:
# We fall back to the current httplib2 behavior if we're
# not processing bytes (eg it's a redirect).
content = response.read()
response = httplib2.Response(response)
# pylint: disable=protected-access
content = httplib2._decompressContent(response, content)
return (response, content)