普通文本  |  2359行  |  98.53 KB

# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Oauth2client tests

Unit tests for oauth2client.
"""

import base64
import contextlib
import copy
import datetime
import json
import os
import socket
import sys
import tempfile

import httplib2
import mock
import six
from six.moves import http_client
from six.moves import urllib
import unittest2

import oauth2client
from oauth2client import _helpers
from oauth2client import client
from oauth2client import clientsecrets
from oauth2client import service_account
from oauth2client import util
from .http_mock import CacheMock
from .http_mock import HttpMock
from .http_mock import HttpMockSequence

__author__ = 'jcgregorio@google.com (Joe Gregorio)'

DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')


# TODO(craigcitro): This is duplicated from
# googleapiclient.test_discovery; consolidate these definitions.
def assertUrisEqual(testcase, expected, actual):
    """Test that URIs are the same, up to reordering of query parameters."""
    expected = urllib.parse.urlparse(expected)
    actual = urllib.parse.urlparse(actual)
    testcase.assertEqual(expected.scheme, actual.scheme)
    testcase.assertEqual(expected.netloc, actual.netloc)
    testcase.assertEqual(expected.path, actual.path)
    testcase.assertEqual(expected.params, actual.params)
    testcase.assertEqual(expected.fragment, actual.fragment)
    expected_query = urllib.parse.parse_qs(expected.query)
    actual_query = urllib.parse.parse_qs(actual.query)
    for name in expected_query.keys():
        testcase.assertEqual(expected_query[name], actual_query[name])
    for name in actual_query.keys():
        testcase.assertEqual(expected_query[name], actual_query[name])


def datafile(filename):
    return os.path.join(DATA_DIR, filename)


def load_and_cache(existing_file, fakename, cache_mock):
    client_type, client_info = clientsecrets._loadfile(datafile(existing_file))
    cache_mock.cache[fakename] = {client_type: client_info}


class CredentialsTests(unittest2.TestCase):

    def test_to_from_json(self):
        credentials = client.Credentials()
        json = credentials.to_json()
        client.Credentials.new_from_json(json)

    def test_authorize_abstract(self):
        credentials = client.Credentials()
        http = object()
        with self.assertRaises(NotImplementedError):
            credentials.authorize(http)

    def test_refresh_abstract(self):
        credentials = client.Credentials()
        http = object()
        with self.assertRaises(NotImplementedError):
            credentials.refresh(http)

    def test_revoke_abstract(self):
        credentials = client.Credentials()
        http = object()
        with self.assertRaises(NotImplementedError):
            credentials.revoke(http)

    def test_apply_abstract(self):
        credentials = client.Credentials()
        headers = {}
        with self.assertRaises(NotImplementedError):
            credentials.apply(headers)

    def test__to_json_basic(self):
        credentials = client.Credentials()
        json_payload = credentials._to_json([])
        # str(bytes) in Python2 and str(unicode) in Python3
        self.assertIsInstance(json_payload, str)
        payload = json.loads(json_payload)
        expected_payload = {
            '_class': client.Credentials.__name__,
            '_module': client.Credentials.__module__,
            'token_expiry': None,
        }
        self.assertEqual(payload, expected_payload)

    def test__to_json_with_strip(self):
        credentials = client.Credentials()
        credentials.foo = 'bar'
        credentials.baz = 'quux'
        to_strip = ['foo']
        json_payload = credentials._to_json(to_strip)
        # str(bytes) in Python2 and str(unicode) in Python3
        self.assertIsInstance(json_payload, str)
        payload = json.loads(json_payload)
        expected_payload = {
            '_class': client.Credentials.__name__,
            '_module': client.Credentials.__module__,
            'token_expiry': None,
            'baz': credentials.baz,
        }
        self.assertEqual(payload, expected_payload)

    def test__to_json_to_serialize(self):
        credentials = client.Credentials()
        to_serialize = {
            'foo': b'bar',
            'baz': u'quux',
            'st': set(['a', 'b']),
        }
        orig_vals = to_serialize.copy()
        json_payload = credentials._to_json([], to_serialize=to_serialize)
        # str(bytes) in Python2 and str(unicode) in Python3
        self.assertIsInstance(json_payload, str)
        payload = json.loads(json_payload)
        expected_payload = {
            '_class': client.Credentials.__name__,
            '_module': client.Credentials.__module__,
            'token_expiry': None,
        }
        expected_payload.update(to_serialize)
        # Special-case the set.
        expected_payload['st'] = list(expected_payload['st'])
        # Special-case the bytes.
        expected_payload['foo'] = u'bar'
        self.assertEqual(payload, expected_payload)
        # Make sure the method call didn't modify our dictionary.
        self.assertEqual(to_serialize, orig_vals)

    @mock.patch.object(client.Credentials, '_to_json',
                       return_value=object())
    def test_to_json(self, to_json):
        credentials = client.Credentials()
        self.assertEqual(credentials.to_json(), to_json.return_value)
        to_json.assert_called_once_with(
            client.Credentials.NON_SERIALIZED_MEMBERS)

    def test_new_from_json_no_data(self):
        creds_data = {}
        json_data = json.dumps(creds_data)
        with self.assertRaises(KeyError):
            client.Credentials.new_from_json(json_data)

    def test_new_from_json_basic_data(self):
        creds_data = {
            '_module': 'oauth2client.client',
            '_class': 'Credentials',
        }
        json_data = json.dumps(creds_data)
        credentials = client.Credentials.new_from_json(json_data)
        self.assertIsInstance(credentials, client.Credentials)

    def test_new_from_json_old_name(self):
        creds_data = {
            '_module': 'oauth2client.googleapiclient.client',
            '_class': 'Credentials',
        }
        json_data = json.dumps(creds_data)
        credentials = client.Credentials.new_from_json(json_data)
        self.assertIsInstance(credentials, client.Credentials)

    def test_new_from_json_bad_module(self):
        creds_data = {
            '_module': 'oauth2client.foobar',
            '_class': 'Credentials',
        }
        json_data = json.dumps(creds_data)
        with self.assertRaises(ImportError):
            client.Credentials.new_from_json(json_data)

    def test_new_from_json_bad_class(self):
        creds_data = {
            '_module': 'oauth2client.client',
            '_class': 'NopeNotCredentials',
        }
        json_data = json.dumps(creds_data)
        with self.assertRaises(AttributeError):
            client.Credentials.new_from_json(json_data)

    def test_from_json(self):
        unused_data = {}
        credentials = client.Credentials.from_json(unused_data)
        self.assertIsInstance(credentials, client.Credentials)
        self.assertEqual(credentials.__dict__, {})


class TestStorage(unittest2.TestCase):

    def test_locked_get_abstract(self):
        storage = client.Storage()
        with self.assertRaises(NotImplementedError):
            storage.locked_get()

    def test_locked_put_abstract(self):
        storage = client.Storage()
        credentials = object()
        with self.assertRaises(NotImplementedError):
            storage.locked_put(credentials)

    def test_locked_delete_abstract(self):
        storage = client.Storage()
        with self.assertRaises(NotImplementedError):
            storage.locked_delete()


@contextlib.contextmanager
def mock_module_import(module):
    """Place a dummy objects in sys.modules to mock an import test."""
    parts = module.split('.')
    entries = ['.'.join(parts[:i + 1]) for i in range(len(parts))]
    for entry in entries:
        sys.modules[entry] = object()

    try:
        yield

    finally:
        for entry in entries:
            del sys.modules[entry]


class GoogleCredentialsTests(unittest2.TestCase):

    def setUp(self):
        self.os_name = os.name
        client.SETTINGS.env_name = None

    def tearDown(self):
        self.reset_env('SERVER_SOFTWARE')
        self.reset_env(client.GOOGLE_APPLICATION_CREDENTIALS)
        self.reset_env('APPDATA')
        os.name = self.os_name

    def reset_env(self, env):
        """Set the environment variable 'env' to 'value'."""
        os.environ.pop(env, None)

    def validate_service_account_credentials(self, credentials):
        self.assertIsInstance(
            credentials, service_account.ServiceAccountCredentials)
        self.assertEqual('123', credentials.client_id)
        self.assertEqual('dummy@google.com',
                         credentials._service_account_email)
        self.assertEqual('ABCDEF', credentials._private_key_id)
        self.assertEqual('', credentials._scopes)

    def validate_google_credentials(self, credentials):
        self.assertIsInstance(credentials, client.GoogleCredentials)
        self.assertEqual(None, credentials.access_token)
        self.assertEqual('123', credentials.client_id)
        self.assertEqual('secret', credentials.client_secret)
        self.assertEqual('alabalaportocala', credentials.refresh_token)
        self.assertEqual(None, credentials.token_expiry)
        self.assertEqual(oauth2client.GOOGLE_TOKEN_URI, credentials.token_uri)
        self.assertEqual('Python client library', credentials.user_agent)

    def get_a_google_credentials_object(self):
        return client.GoogleCredentials(None, None, None, None,
                                        None, None, None, None)

    def test_create_scoped_required(self):
        self.assertFalse(
            self.get_a_google_credentials_object().create_scoped_required())

    def test_create_scoped(self):
        credentials = self.get_a_google_credentials_object()
        self.assertEqual(credentials, credentials.create_scoped(None))
        self.assertEqual(credentials,
                         credentials.create_scoped(['dummy_scope']))

    @mock.patch.object(client.GoogleCredentials,
                       '_implicit_credentials_from_files',
                       return_value=None)
    @mock.patch.object(client.GoogleCredentials,
                       '_implicit_credentials_from_gce')
    @mock.patch.object(client, '_in_gae_environment',
                       return_value=True)
    @mock.patch.object(client, '_get_application_default_credential_GAE',
                       return_value=object())
    def test_get_application_default_in_gae(self, gae_adc, in_gae,
                                            from_gce, from_files):
        credentials = client.GoogleCredentials.get_application_default()
        self.assertEqual(credentials, gae_adc.return_value)
        from_files.assert_called_once_with()
        in_gae.assert_called_once_with()
        from_gce.assert_not_called()

    @mock.patch.object(client.GoogleCredentials,
                       '_implicit_credentials_from_gae',
                       return_value=None)
    @mock.patch.object(client.GoogleCredentials,
                       '_implicit_credentials_from_files',
                       return_value=None)
    @mock.patch.object(client, '_in_gce_environment',
                       return_value=True)
    @mock.patch.object(client, '_get_application_default_credential_GCE',
                       return_value=object())
    def test_get_application_default_in_gce(self, gce_adc, in_gce,
                                            from_files, from_gae):
        credentials = client.GoogleCredentials.get_application_default()
        self.assertEqual(credentials, gce_adc.return_value)
        in_gce.assert_called_once_with()
        from_gae.assert_called_once_with()
        from_files.assert_called_once_with()

    def test_environment_check_gae_production(self):
        with mock_module_import('google.appengine'):
            self._environment_check_gce_helper(
                server_software='Google App Engine/XYZ')

    def test_environment_check_gae_local(self):
        with mock_module_import('google.appengine'):
            self._environment_check_gce_helper(
                server_software='Development/XYZ')

    def test_environment_check_fastpath(self):
        with mock_module_import('google.appengine'):
            self._environment_check_gce_helper(
                server_software='Development/XYZ')

    def test_environment_caching(self):
        os.environ['SERVER_SOFTWARE'] = 'Development/XYZ'
        with mock_module_import('google.appengine'):
            self.assertTrue(client._in_gae_environment())
            os.environ['SERVER_SOFTWARE'] = ''
            # Even though we no longer pass the environment check, it
            # is cached.
            self.assertTrue(client._in_gae_environment())

    def _environment_check_gce_helper(self, status_ok=True, socket_error=False,
                                      server_software=''):
        response = mock.MagicMock()
        if status_ok:
            response.status = http_client.OK
            response.getheader = mock.MagicMock(
                name='getheader',
                return_value=client._DESIRED_METADATA_FLAVOR)
        else:
            response.status = http_client.NOT_FOUND

        connection = mock.MagicMock()
        connection.getresponse = mock.MagicMock(name='getresponse',
                                                return_value=response)
        if socket_error:
            connection.getresponse.side_effect = socket.error()

        with mock.patch('oauth2client.client.os') as os_module:
            os_module.environ = {client._SERVER_SOFTWARE: server_software}
            with mock.patch('oauth2client.client.six') as six_module:
                http_client_module = six_module.moves.http_client
                http_client_module.HTTPConnection = mock.MagicMock(
                    name='HTTPConnection', return_value=connection)

                if server_software == '':
                    self.assertFalse(client._in_gae_environment())
                else:
                    self.assertTrue(client._in_gae_environment())

                if status_ok and not socket_error and server_software == '':
                    self.assertTrue(client._in_gce_environment())
                else:
                    self.assertFalse(client._in_gce_environment())

                if server_software == '':
                    http_client_module.HTTPConnection.assert_called_once_with(
                        client._GCE_METADATA_HOST,
                        timeout=client.GCE_METADATA_TIMEOUT)
                    connection.getresponse.assert_called_once_with()
                    # Remaining calls are not "getresponse"
                    headers = {
                        client._METADATA_FLAVOR_HEADER: (
                            client._DESIRED_METADATA_FLAVOR),
                    }
                    self.assertEqual(connection.method_calls, [
                        mock.call.request('GET', '/',
                                          headers=headers),
                        mock.call.close(),
                    ])
                    self.assertEqual(response.method_calls, [])
                    if status_ok and not socket_error:
                        response.getheader.assert_called_once_with(
                            client._METADATA_FLAVOR_HEADER)
                else:
                    self.assertEqual(
                        http_client_module.HTTPConnection.mock_calls, [])
                    self.assertEqual(connection.getresponse.mock_calls, [])
                    # Remaining calls are not "getresponse"
                    self.assertEqual(connection.method_calls, [])
                    self.assertEqual(response.method_calls, [])
                    self.assertEqual(response.getheader.mock_calls, [])

    def test_environment_check_gce_production(self):
        self._environment_check_gce_helper(status_ok=True)

    def test_environment_check_gce_prod_with_working_gae_imports(self):
        with mock_module_import('google.appengine'):
            self._environment_check_gce_helper(status_ok=True)

    def test_environment_check_gce_timeout(self):
        self._environment_check_gce_helper(socket_error=True)

    def test_environ_check_gae_module_unknown(self):
        with mock_module_import('google.appengine'):
            self._environment_check_gce_helper(status_ok=False)

    def test_environment_check_unknown(self):
        self._environment_check_gce_helper(status_ok=False)

    def test_get_environment_variable_file(self):
        environment_variable_file = datafile(
            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
        os.environ[client.GOOGLE_APPLICATION_CREDENTIALS] = (
            environment_variable_file)
        self.assertEqual(environment_variable_file,
                         client._get_environment_variable_file())

    def test_get_environment_variable_file_error(self):
        nonexistent_file = datafile('nonexistent')
        os.environ[client.GOOGLE_APPLICATION_CREDENTIALS] = nonexistent_file
        expected_err_msg = (
            'File {0} \(pointed by {1} environment variable\) does not '
            'exist!'.format(
                nonexistent_file, client.GOOGLE_APPLICATION_CREDENTIALS))
        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                     expected_err_msg):
            client._get_environment_variable_file()

    @mock.patch.dict(os.environ, {}, clear=True)
    def test_get_environment_variable_file_without_env_var(self):
        self.assertIsNone(client._get_environment_variable_file())

    @mock.patch('os.name', new='nt')
    @mock.patch.dict(os.environ, {'APPDATA': DATA_DIR}, clear=True)
    def test_get_well_known_file_on_windows(self):
        well_known_file = datafile(
            os.path.join(client._CLOUDSDK_CONFIG_DIRECTORY,
                         client._WELL_KNOWN_CREDENTIALS_FILE))
        self.assertEqual(well_known_file, client._get_well_known_file())

    @mock.patch('os.name', new='nt')
    @mock.patch.dict(os.environ, {'SystemDrive': 'G:'}, clear=True)
    def test_get_well_known_file_on_windows_without_appdata(self):
        well_known_file = os.path.join('G:', '\\',
                                       client._CLOUDSDK_CONFIG_DIRECTORY,
                                       client._WELL_KNOWN_CREDENTIALS_FILE)
        self.assertEqual(well_known_file, client._get_well_known_file())

    @mock.patch.dict(os.environ,
                     {client._CLOUDSDK_CONFIG_ENV_VAR: 'CUSTOM_DIR'},
                     clear=True)
    def test_get_well_known_file_with_custom_config_dir(self):
        CUSTOM_DIR = os.environ[client._CLOUDSDK_CONFIG_ENV_VAR]
        EXPECTED_FILE = os.path.join(CUSTOM_DIR,
                                     client._WELL_KNOWN_CREDENTIALS_FILE)
        well_known_file = client._get_well_known_file()
        self.assertEqual(well_known_file, EXPECTED_FILE)

    def test_get_adc_from_file_service_account(self):
        credentials_file = datafile(
            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
        credentials = client._get_application_default_credential_from_file(
            credentials_file)
        self.validate_service_account_credentials(credentials)

    def test_save_to_well_known_file_service_account(self):
        credential_file = datafile(
            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
        credentials = client._get_application_default_credential_from_file(
            credential_file)
        temp_credential_file = datafile(
            os.path.join('gcloud',
                         'temp_well_known_file_service_account.json'))
        client.save_to_well_known_file(credentials, temp_credential_file)
        with open(temp_credential_file) as f:
            d = json.load(f)
        self.assertEqual('service_account', d['type'])
        self.assertEqual('123', d['client_id'])
        self.assertEqual('dummy@google.com', d['client_email'])
        self.assertEqual('ABCDEF', d['private_key_id'])
        os.remove(temp_credential_file)

    @mock.patch('os.path.isdir', return_value=False)
    def test_save_well_known_file_with_non_existent_config_dir(self,
                                                               isdir_mock):
        credential_file = datafile(
            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
        credentials = client._get_application_default_credential_from_file(
            credential_file)
        with self.assertRaises(OSError):
            client.save_to_well_known_file(credentials)
        config_dir = os.path.join(os.path.expanduser('~'), '.config', 'gcloud')
        isdir_mock.assert_called_once_with(config_dir)

    def test_get_adc_from_file_authorized_user(self):
        credentials_file = datafile(os.path.join(
            'gcloud',
            'application_default_credentials_authorized_user.json'))
        credentials = client._get_application_default_credential_from_file(
            credentials_file)
        self.validate_google_credentials(credentials)

    def test_save_to_well_known_file_authorized_user(self):
        credentials_file = datafile(os.path.join(
            'gcloud',
            'application_default_credentials_authorized_user.json'))
        credentials = client._get_application_default_credential_from_file(
            credentials_file)
        temp_credential_file = datafile(
            os.path.join('gcloud',
                         'temp_well_known_file_authorized_user.json'))
        client.save_to_well_known_file(credentials, temp_credential_file)
        with open(temp_credential_file) as f:
            d = json.load(f)
        self.assertEqual('authorized_user', d['type'])
        self.assertEqual('123', d['client_id'])
        self.assertEqual('secret', d['client_secret'])
        self.assertEqual('alabalaportocala', d['refresh_token'])
        os.remove(temp_credential_file)

    def test_get_application_default_credential_from_malformed_file_1(self):
        credentials_file = datafile(
            os.path.join('gcloud',
                         'application_default_credentials_malformed_1.json'))
        expected_err_msg = (
            "'type' field should be defined \(and have one of the '{0}' or "
            "'{1}' values\)".format(client.AUTHORIZED_USER,
                                    client.SERVICE_ACCOUNT))

        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                     expected_err_msg):
            client._get_application_default_credential_from_file(
                credentials_file)

    def test_get_application_default_credential_from_malformed_file_2(self):
        credentials_file = datafile(
            os.path.join('gcloud',
                         'application_default_credentials_malformed_2.json'))
        expected_err_msg = (
            'The following field\(s\) must be defined: private_key_id')
        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                     expected_err_msg):
            client._get_application_default_credential_from_file(
                credentials_file)

    def test_get_application_default_credential_from_malformed_file_3(self):
        credentials_file = datafile(
            os.path.join('gcloud',
                         'application_default_credentials_malformed_3.json'))
        with self.assertRaises(ValueError):
            client._get_application_default_credential_from_file(
                credentials_file)

    def test_raise_exception_for_missing_fields(self):
        missing_fields = ['first', 'second', 'third']
        expected_err_msg = ('The following field\(s\) must be defined: ' +
                            ', '.join(missing_fields))
        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                     expected_err_msg):
            client._raise_exception_for_missing_fields(missing_fields)

    def test_raise_exception_for_reading_json(self):
        credential_file = 'any_file'
        extra_help = ' be good'
        error = client.ApplicationDefaultCredentialsError('stuff happens')
        expected_err_msg = ('An error was encountered while reading '
                            'json file: ' + credential_file +
                            extra_help + ': ' + str(error))
        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                     expected_err_msg):
            client._raise_exception_for_reading_json(
                credential_file, extra_help, error)

    @mock.patch('oauth2client.client._in_gce_environment')
    @mock.patch('oauth2client.client._in_gae_environment', return_value=False)
    @mock.patch('oauth2client.client._get_environment_variable_file')
    @mock.patch('oauth2client.client._get_well_known_file')
    def test_get_adc_from_env_var_service_account(self, *stubs):
        # Set up stubs.
        get_well_known, get_env_file, in_gae, in_gce = stubs
        get_env_file.return_value = datafile(
            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))

        credentials = client.GoogleCredentials.get_application_default()
        self.validate_service_account_credentials(credentials)

        get_env_file.assert_called_once_with()
        get_well_known.assert_not_called()
        in_gae.assert_not_called()
        in_gce.assert_not_called()

    def test_env_name(self):
        self.assertEqual(None, client.SETTINGS.env_name)
        self.test_get_adc_from_env_var_service_account()
        self.assertEqual(client.DEFAULT_ENV_NAME, client.SETTINGS.env_name)

    @mock.patch('oauth2client.client._in_gce_environment')
    @mock.patch('oauth2client.client._in_gae_environment', return_value=False)
    @mock.patch('oauth2client.client._get_environment_variable_file')
    @mock.patch('oauth2client.client._get_well_known_file')
    def test_get_adc_from_env_var_authorized_user(self, *stubs):
        # Set up stubs.
        get_well_known, get_env_file, in_gae, in_gce = stubs
        get_env_file.return_value = datafile(os.path.join(
            'gcloud',
            'application_default_credentials_authorized_user.json'))

        credentials = client.GoogleCredentials.get_application_default()
        self.validate_google_credentials(credentials)

        get_env_file.assert_called_once_with()
        get_well_known.assert_not_called()
        in_gae.assert_not_called()
        in_gce.assert_not_called()

    @mock.patch('oauth2client.client._in_gce_environment')
    @mock.patch('oauth2client.client._in_gae_environment', return_value=False)
    @mock.patch('oauth2client.client._get_environment_variable_file')
    @mock.patch('oauth2client.client._get_well_known_file')
    def test_get_adc_from_env_var_malformed_file(self, *stubs):
        # Set up stubs.
        get_well_known, get_env_file, in_gae, in_gce = stubs
        get_env_file.return_value = datafile(
            os.path.join('gcloud',
                         'application_default_credentials_malformed_3.json'))

        expected_err = client.ApplicationDefaultCredentialsError
        with self.assertRaises(expected_err) as exc_manager:
            client.GoogleCredentials.get_application_default()

        self.assertTrue(str(exc_manager.exception).startswith(
            'An error was encountered while reading json file: ' +
            get_env_file.return_value + ' (pointed to by ' +
            client.GOOGLE_APPLICATION_CREDENTIALS + ' environment variable):'))

        get_env_file.assert_called_once_with()
        get_well_known.assert_not_called()
        in_gae.assert_not_called()
        in_gce.assert_not_called()

    @mock.patch('oauth2client.client._in_gce_environment', return_value=False)
    @mock.patch('oauth2client.client._in_gae_environment', return_value=False)
    @mock.patch('oauth2client.client._get_environment_variable_file',
                return_value=None)
    @mock.patch('oauth2client.client._get_well_known_file',
                return_value='BOGUS_FILE')
    def test_get_adc_env_not_set_up(self, *stubs):
        # Unpack stubs.
        get_well_known, get_env_file, in_gae, in_gce = stubs
        # Make sure the well-known file actually doesn't exist.
        self.assertFalse(os.path.exists(get_well_known.return_value))

        expected_err = client.ApplicationDefaultCredentialsError
        with self.assertRaises(expected_err) as exc_manager:
            client.GoogleCredentials.get_application_default()

        self.assertEqual(client.ADC_HELP_MSG, str(exc_manager.exception))
        get_env_file.assert_called_once_with()
        get_well_known.assert_called_once_with()
        in_gae.assert_called_once_with()
        in_gce.assert_called_once_with()

    @mock.patch('oauth2client.client._in_gce_environment', return_value=False)
    @mock.patch('oauth2client.client._in_gae_environment', return_value=False)
    @mock.patch('oauth2client.client._get_environment_variable_file',
                return_value=None)
    @mock.patch('oauth2client.client._get_well_known_file')
    def test_get_adc_env_from_well_known(self, *stubs):
        # Unpack stubs.
        get_well_known, get_env_file, in_gae, in_gce = stubs
        # Make sure the well-known file is an actual file.
        get_well_known.return_value = __file__
        # Make sure the well-known file actually doesn't exist.
        self.assertTrue(os.path.exists(get_well_known.return_value))

        method_name = \
            'oauth2client.client._get_application_default_credential_from_file'
        result_creds = object()
        with mock.patch(method_name,
                        return_value=result_creds) as get_from_file:
            result = client.GoogleCredentials.get_application_default()
            self.assertEqual(result, result_creds)
            get_from_file.assert_called_once_with(__file__)

        get_env_file.assert_called_once_with()
        get_well_known.assert_called_once_with()
        in_gae.assert_not_called()
        in_gce.assert_not_called()

    def test_from_stream_service_account(self):
        credentials_file = datafile(
            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
        credentials = self.get_a_google_credentials_object().from_stream(
            credentials_file)
        self.validate_service_account_credentials(credentials)

    def test_from_stream_authorized_user(self):
        credentials_file = datafile(os.path.join(
            'gcloud',
            'application_default_credentials_authorized_user.json'))
        credentials = self.get_a_google_credentials_object().from_stream(
            credentials_file)
        self.validate_google_credentials(credentials)

    def test_from_stream_missing_file(self):
        credentials_filename = None
        expected_err_msg = (r'The parameter passed to the from_stream\(\) '
                            r'method should point to a file.')
        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                     expected_err_msg):
            self.get_a_google_credentials_object().from_stream(
                credentials_filename)

    def test_from_stream_malformed_file_1(self):
        credentials_file = datafile(
            os.path.join('gcloud',
                         'application_default_credentials_malformed_1.json'))
        expected_err_msg = (
            'An error was encountered while reading json file: ' +
            credentials_file +
            ' \(provided as parameter to the from_stream\(\) method\): ' +
            "'type' field should be defined \(and have one of the '" +
            client.AUTHORIZED_USER + "' or '" + client.SERVICE_ACCOUNT +
            "' values\)")
        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                     expected_err_msg):
            self.get_a_google_credentials_object().from_stream(
                credentials_file)

    def test_from_stream_malformed_file_2(self):
        credentials_file = datafile(
            os.path.join('gcloud',
                         'application_default_credentials_malformed_2.json'))
        expected_err_msg = (
            'An error was encountered while reading json file: ' +
            credentials_file +
            ' \(provided as parameter to the from_stream\(\) method\): '
            'The following field\(s\) must be defined: '
            'private_key_id')
        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                     expected_err_msg):
            self.get_a_google_credentials_object().from_stream(
                credentials_file)

    def test_from_stream_malformed_file_3(self):
        credentials_file = datafile(
            os.path.join('gcloud',
                         'application_default_credentials_malformed_3.json'))
        with self.assertRaises(client.ApplicationDefaultCredentialsError):
            self.get_a_google_credentials_object().from_stream(
                credentials_file)

    def test_to_from_json_authorized_user(self):
        filename = 'application_default_credentials_authorized_user.json'
        credentials_file = datafile(os.path.join('gcloud', filename))
        creds = client.GoogleCredentials.from_stream(credentials_file)
        json = creds.to_json()
        creds2 = client.GoogleCredentials.from_json(json)

        self.assertEqual(creds.__dict__, creds2.__dict__)

    def test_to_from_json_service_account(self):
        credentials_file = datafile(
            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
        creds1 = client.GoogleCredentials.from_stream(credentials_file)
        # Convert to and then back from json.
        creds2 = client.GoogleCredentials.from_json(creds1.to_json())

        creds1_vals = creds1.__dict__
        creds1_vals.pop('_signer')
        creds2_vals = creds2.__dict__
        creds2_vals.pop('_signer')
        self.assertEqual(creds1_vals, creds2_vals)

    def test_to_from_json_service_account_scoped(self):
        credentials_file = datafile(
            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
        creds1 = client.GoogleCredentials.from_stream(credentials_file)
        creds1 = creds1.create_scoped(['dummy_scope'])
        # Convert to and then back from json.
        creds2 = client.GoogleCredentials.from_json(creds1.to_json())

        creds1_vals = creds1.__dict__
        creds1_vals.pop('_signer')
        creds2_vals = creds2.__dict__
        creds2_vals.pop('_signer')
        self.assertEqual(creds1_vals, creds2_vals)

    def test_parse_expiry(self):
        dt = datetime.datetime(2016, 1, 1)
        parsed_expiry = client._parse_expiry(dt)
        self.assertEqual('2016-01-01T00:00:00Z', parsed_expiry)

    def test_bad_expiry(self):
        dt = object()
        parsed_expiry = client._parse_expiry(dt)
        self.assertEqual(None, parsed_expiry)


class DummyDeleteStorage(client.Storage):
    delete_called = False

    def locked_delete(self):
        self.delete_called = True


def _token_revoke_test_helper(testcase, status, revoke_raise,
                              valid_bool_value, token_attr):
    current_store = getattr(testcase.credentials, 'store', None)

    dummy_store = DummyDeleteStorage()
    testcase.credentials.set_store(dummy_store)

    actual_do_revoke = testcase.credentials._do_revoke
    testcase.token_from_revoke = None

    def do_revoke_stub(http_request, token):
        testcase.token_from_revoke = token
        return actual_do_revoke(http_request, token)
    testcase.credentials._do_revoke = do_revoke_stub

    http = HttpMock(headers={'status': status})
    if revoke_raise:
        testcase.assertRaises(client.TokenRevokeError,
                              testcase.credentials.revoke, http)
    else:
        testcase.credentials.revoke(http)

    testcase.assertEqual(getattr(testcase.credentials, token_attr),
                         testcase.token_from_revoke)
    testcase.assertEqual(valid_bool_value, testcase.credentials.invalid)
    testcase.assertEqual(valid_bool_value, dummy_store.delete_called)

    testcase.credentials.set_store(current_store)


class BasicCredentialsTests(unittest2.TestCase):

    def setUp(self):
        access_token = 'foo'
        client_id = 'some_client_id'
        client_secret = 'cOuDdkfjxxnv+'
        refresh_token = '1/0/a.df219fjls0'
        token_expiry = datetime.datetime.utcnow()
        user_agent = 'refresh_checker/1.0'
        self.credentials = client.OAuth2Credentials(
            access_token, client_id, client_secret,
            refresh_token, token_expiry, oauth2client.GOOGLE_TOKEN_URI,
            user_agent, revoke_uri=oauth2client.GOOGLE_REVOKE_URI,
            scopes='foo', token_info_uri=oauth2client.GOOGLE_TOKEN_INFO_URI)

        # Provoke a failure if @util.positional is not respected.
        self.old_positional_enforcement = (
            util.positional_parameters_enforcement)
        util.positional_parameters_enforcement = (
            util.POSITIONAL_EXCEPTION)

    def tearDown(self):
        util.positional_parameters_enforcement = (
            self.old_positional_enforcement)

    def test_token_refresh_success(self):
        for status_code in client.REFRESH_STATUS_CODES:
            token_response = {'access_token': '1/3w', 'expires_in': 3600}
            http = HttpMockSequence([
                ({'status': status_code}, b''),
                ({'status': '200'}, json.dumps(token_response).encode(
                    'utf-8')),
                ({'status': '200'}, 'echo_request_headers'),
            ])
            http = self.credentials.authorize(http)
            resp, content = http.request('http://example.com')
            self.assertEqual(b'Bearer 1/3w', content[b'Authorization'])
            self.assertFalse(self.credentials.access_token_expired)
            self.assertEqual(token_response, self.credentials.token_response)

    def test_recursive_authorize(self):
        """Tests that OAuth2Credentials doesn't intro. new method constraints.

        Formerly, OAuth2Credentials.authorize monkeypatched the request method
        of its httplib2.Http argument with a wrapper annotated with
        @util.positional(1). Since the original method has no such annotation,
        that meant that the wrapper was violating the contract of the original
        method by adding a new requirement to it. And in fact the wrapper
        itself doesn't even respect that requirement. So before the removal of
        the annotation, this test would fail.
        """
        token_response = {'access_token': '1/3w', 'expires_in': 3600}
        encoded_response = json.dumps(token_response).encode('utf-8')
        http = HttpMockSequence([
            ({'status': '200'}, encoded_response),
        ])
        http = self.credentials.authorize(http)
        http = self.credentials.authorize(http)
        http.request('http://example.com')

    def test_token_refresh_failure(self):
        for status_code in client.REFRESH_STATUS_CODES:
            http = HttpMockSequence([
                ({'status': status_code}, b''),
                ({'status': http_client.BAD_REQUEST},
                 b'{"error":"access_denied"}'),
            ])
            http = self.credentials.authorize(http)
            with self.assertRaises(
                    client.HttpAccessTokenRefreshError) as exc_manager:
                http.request('http://example.com')
            self.assertEqual(http_client.BAD_REQUEST,
                             exc_manager.exception.status)
            self.assertTrue(self.credentials.access_token_expired)
            self.assertEqual(None, self.credentials.token_response)

    def test_token_revoke_success(self):
        _token_revoke_test_helper(
            self, '200', revoke_raise=False,
            valid_bool_value=True, token_attr='refresh_token')

    def test_token_revoke_failure(self):
        _token_revoke_test_helper(
            self, '400', revoke_raise=True,
            valid_bool_value=False, token_attr='refresh_token')

    def test_token_revoke_fallback(self):
        original_credentials = self.credentials.to_json()
        self.credentials.refresh_token = None
        _token_revoke_test_helper(
            self, '200', revoke_raise=False,
            valid_bool_value=True, token_attr='access_token')
        self.credentials = self.credentials.from_json(original_credentials)

    def test_non_401_error_response(self):
        http = HttpMockSequence([
            ({'status': '400'}, b''),
        ])
        http = self.credentials.authorize(http)
        resp, content = http.request('http://example.com')
        self.assertEqual(http_client.BAD_REQUEST, resp.status)
        self.assertEqual(None, self.credentials.token_response)

    def test_to_from_json(self):
        json = self.credentials.to_json()
        instance = client.OAuth2Credentials.from_json(json)
        self.assertEqual(client.OAuth2Credentials, type(instance))
        instance.token_expiry = None
        self.credentials.token_expiry = None

        self.assertEqual(instance.__dict__, self.credentials.__dict__)

    def test_from_json_token_expiry(self):
        data = json.loads(self.credentials.to_json())
        data['token_expiry'] = None
        instance = client.OAuth2Credentials.from_json(json.dumps(data))
        self.assertIsInstance(instance, client.OAuth2Credentials)

    def test_from_json_bad_token_expiry(self):
        data = json.loads(self.credentials.to_json())
        data['token_expiry'] = 'foobar'
        instance = client.OAuth2Credentials.from_json(json.dumps(data))
        self.assertIsInstance(instance, client.OAuth2Credentials)

    def test_unicode_header_checks(self):
        access_token = u'foo'
        client_id = u'some_client_id'
        client_secret = u'cOuDdkfjxxnv+'
        refresh_token = u'1/0/a.df219fjls0'
        token_expiry = str(datetime.datetime.utcnow())
        token_uri = str(oauth2client.GOOGLE_TOKEN_URI)
        revoke_uri = str(oauth2client.GOOGLE_REVOKE_URI)
        user_agent = u'refresh_checker/1.0'
        credentials = client.OAuth2Credentials(
            access_token, client_id, client_secret, refresh_token,
            token_expiry, token_uri, user_agent, revoke_uri=revoke_uri)

        # First, test that we correctly encode basic objects, making sure
        # to include a bytes object. Note that oauth2client will normalize
        # everything to bytes, no matter what python version we're in.
        http = credentials.authorize(HttpMock())
        headers = {u'foo': 3, b'bar': True, 'baz': b'abc'}
        cleaned_headers = {b'foo': b'3', b'bar': b'True', b'baz': b'abc'}
        http.request(u'http://example.com', method=u'GET', headers=headers)
        for k, v in cleaned_headers.items():
            self.assertTrue(k in http.headers)
            self.assertEqual(v, http.headers[k])

        # Next, test that we do fail on unicode.
        unicode_str = six.unichr(40960) + 'abcd'
        with self.assertRaises(client.NonAsciiHeaderError):
            http.request(u'http://example.com', method=u'GET',
                         headers={u'foo': unicode_str})

    def test_no_unicode_in_request_params(self):
        access_token = u'foo'
        client_id = u'some_client_id'
        client_secret = u'cOuDdkfjxxnv+'
        refresh_token = u'1/0/a.df219fjls0'
        token_expiry = str(datetime.datetime.utcnow())
        token_uri = str(oauth2client.GOOGLE_TOKEN_URI)
        revoke_uri = str(oauth2client.GOOGLE_REVOKE_URI)
        user_agent = u'refresh_checker/1.0'
        credentials = client.OAuth2Credentials(
            access_token, client_id, client_secret, refresh_token,
            token_expiry, token_uri, user_agent, revoke_uri=revoke_uri)

        http = HttpMock()
        http = credentials.authorize(http)
        http.request(u'http://example.com', method=u'GET',
                     headers={u'foo': u'bar'})
        for k, v in six.iteritems(http.headers):
            self.assertIsInstance(k, six.binary_type)
            self.assertIsInstance(v, six.binary_type)

        # Test again with unicode strings that can't simply be converted
        # to ASCII.
        with self.assertRaises(client.NonAsciiHeaderError):
            http.request(
                u'http://example.com', method=u'GET',
                headers={u'foo': u'\N{COMET}'})

        self.credentials.token_response = 'foobar'
        instance = client.OAuth2Credentials.from_json(
            self.credentials.to_json())
        self.assertEqual('foobar', instance.token_response)

    def test__expires_in_no_expiry(self):
        credentials = client.OAuth2Credentials(None, None, None, None,
                                               None, None, None)
        self.assertIsNone(credentials.token_expiry)
        self.assertIsNone(credentials._expires_in())

    @mock.patch('oauth2client.client._UTCNOW')
    def test__expires_in_expired(self, utcnow):
        credentials = client.OAuth2Credentials(None, None, None, None,
                                               None, None, None)
        credentials.token_expiry = datetime.datetime.utcnow()
        now = credentials.token_expiry + datetime.timedelta(seconds=1)
        self.assertLess(credentials.token_expiry, now)
        utcnow.return_value = now
        self.assertEqual(credentials._expires_in(), 0)
        utcnow.assert_called_once_with()

    @mock.patch('oauth2client.client._UTCNOW')
    def test__expires_in_not_expired(self, utcnow):
        credentials = client.OAuth2Credentials(None, None, None, None,
                                               None, None, None)
        credentials.token_expiry = datetime.datetime.utcnow()
        seconds = 1234
        now = credentials.token_expiry - datetime.timedelta(seconds=seconds)
        self.assertLess(now, credentials.token_expiry)
        utcnow.return_value = now
        self.assertEqual(credentials._expires_in(), seconds)
        utcnow.assert_called_once_with()

    @mock.patch('oauth2client.client._UTCNOW')
    def test_get_access_token(self, utcnow):
        # Configure the patch.
        seconds = 11
        NOW = datetime.datetime(1992, 12, 31, second=seconds)
        utcnow.return_value = NOW

        lifetime = 2  # number of seconds in which the token expires
        EXPIRY_TIME = datetime.datetime(1992, 12, 31,
                                        second=seconds + lifetime)

        token1 = u'first_token'
        token_response_first = {
            'access_token': token1,
            'expires_in': lifetime,
        }
        token2 = u'second_token'
        token_response_second = {
            'access_token': token2,
            'expires_in': lifetime,
        }
        http = HttpMockSequence([
            ({'status': '200'}, json.dumps(token_response_first).encode(
                'utf-8')),
            ({'status': '200'}, json.dumps(token_response_second).encode(
                'utf-8')),
        ])

        # Use the current credentials but unset the expiry and
        # the access token.
        credentials = copy.deepcopy(self.credentials)
        credentials.access_token = None
        credentials.token_expiry = None

        # Get Access Token, First attempt.
        self.assertEqual(credentials.access_token, None)
        self.assertFalse(credentials.access_token_expired)
        self.assertEqual(credentials.token_expiry, None)
        token = credentials.get_access_token(http=http)
        self.assertEqual(credentials.token_expiry, EXPIRY_TIME)
        self.assertEqual(token1, token.access_token)
        self.assertEqual(lifetime, token.expires_in)
        self.assertEqual(token_response_first, credentials.token_response)
        # Two utcnow calls are expected:
        # - get_access_token() -> _do_refresh_request (setting expires in)
        # - get_access_token() -> _expires_in()
        expected_utcnow_calls = [mock.call()] * 2
        self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)

        # Get Access Token, Second Attempt (not expired)
        self.assertEqual(credentials.access_token, token1)
        self.assertFalse(credentials.access_token_expired)
        token = credentials.get_access_token(http=http)
        # Make sure no refresh occurred since the token was not expired.
        self.assertEqual(token1, token.access_token)
        self.assertEqual(lifetime, token.expires_in)
        self.assertEqual(token_response_first, credentials.token_response)
        # Three more utcnow calls are expected:
        # - access_token_expired
        # - get_access_token() -> access_token_expired
        # - get_access_token -> _expires_in
        expected_utcnow_calls = [mock.call()] * (2 + 3)
        self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)

        # Get Access Token, Third Attempt (force expiration)
        self.assertEqual(credentials.access_token, token1)
        credentials.token_expiry = NOW  # Manually force expiry.
        self.assertTrue(credentials.access_token_expired)
        token = credentials.get_access_token(http=http)
        # Make sure refresh occurred since the token was not expired.
        self.assertEqual(token2, token.access_token)
        self.assertEqual(lifetime, token.expires_in)
        self.assertFalse(credentials.access_token_expired)
        self.assertEqual(token_response_second,
                         credentials.token_response)
        # Five more utcnow calls are expected:
        # - access_token_expired
        # - get_access_token -> access_token_expired
        # - get_access_token -> _do_refresh_request
        # - get_access_token -> _expires_in
        # - access_token_expired
        expected_utcnow_calls = [mock.call()] * (2 + 3 + 5)
        self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)

    @mock.patch.object(client.OAuth2Credentials, 'refresh')
    @mock.patch.object(client.OAuth2Credentials, '_expires_in',
                       return_value=1835)
    def test_get_access_token_without_http(self, expires_in, refresh_mock):
        credentials = client.OAuth2Credentials(None, None, None, None,
                                               None, None, None)
        # Make sure access_token_expired returns True
        credentials.invalid = True
        # Specify a token so we can use it in the response.
        credentials.access_token = 'ya29-s3kr3t'

        with mock.patch('httplib2.Http',
                        return_value=object) as http_kls:
            token_info = credentials.get_access_token()
            expires_in.assert_called_once_with()
            refresh_mock.assert_called_once_with(http_kls.return_value)

        self.assertIsInstance(token_info, client.AccessTokenInfo)
        self.assertEqual(token_info.access_token,
                         credentials.access_token)
        self.assertEqual(token_info.expires_in,
                         expires_in.return_value)

    @mock.patch.object(client.OAuth2Credentials, 'refresh')
    @mock.patch.object(client.OAuth2Credentials, '_expires_in',
                       return_value=1835)
    def test_get_access_token_with_http(self, expires_in, refresh_mock):
        credentials = client.OAuth2Credentials(None, None, None, None,
                                               None, None, None)
        # Make sure access_token_expired returns True
        credentials.invalid = True
        # Specify a token so we can use it in the response.
        credentials.access_token = 'ya29-s3kr3t'

        http_obj = object()
        token_info = credentials.get_access_token(http_obj)
        self.assertIsInstance(token_info, client.AccessTokenInfo)
        self.assertEqual(token_info.access_token,
                         credentials.access_token)
        self.assertEqual(token_info.expires_in,
                         expires_in.return_value)

        expires_in.assert_called_once_with()
        refresh_mock.assert_called_once_with(http_obj)

    @mock.patch.object(client.OAuth2Credentials,
                       '_generate_refresh_request_headers',
                       return_value=object())
    @mock.patch.object(client.OAuth2Credentials,
                       '_generate_refresh_request_body',
                       return_value=object())
    @mock.patch('oauth2client.client.logger')
    def _do_refresh_request_test_helper(self, response, content,
                                        error_msg, logger, gen_body,
                                        gen_headers, store=None):
        credentials = client.OAuth2Credentials(None, None, None, None,
                                               None, None, None)
        credentials.store = store
        http_request = mock.Mock()
        http_request.return_value = response, content

        with self.assertRaises(
                client.HttpAccessTokenRefreshError) as exc_manager:
            credentials._do_refresh_request(http_request)

        self.assertEqual(exc_manager.exception.args, (error_msg,))
        self.assertEqual(exc_manager.exception.status, response.status)
        http_request.assert_called_once_with(None, body=gen_body.return_value,
                                             headers=gen_headers.return_value,
                                             method='POST')

        call1 = mock.call('Refreshing access_token')
        failure_template = 'Failed to retrieve access token: %s'
        call2 = mock.call(failure_template, content)
        self.assertEqual(logger.info.mock_calls, [call1, call2])
        if store is not None:
            store.locked_put.assert_called_once_with(credentials)

    def test__do_refresh_request_non_json_failure(self):
        response = httplib2.Response({
            'status': int(http_client.BAD_REQUEST),
        })
        content = u'Bad request'
        error_msg = 'Invalid response {0}.'.format(int(response.status))
        self._do_refresh_request_test_helper(response, content, error_msg)

    def test__do_refresh_request_basic_failure(self):
        response = httplib2.Response({
            'status': int(http_client.INTERNAL_SERVER_ERROR),
        })
        content = u'{}'
        error_msg = 'Invalid response {0}.'.format(int(response.status))
        self._do_refresh_request_test_helper(response, content, error_msg)

    def test__do_refresh_request_failure_w_json_error(self):
        response = httplib2.Response({
            'status': http_client.BAD_GATEWAY,
        })
        error_msg = 'Hi I am an error not a bearer'
        content = json.dumps({'error': error_msg})
        self._do_refresh_request_test_helper(response, content, error_msg)

    def test__do_refresh_request_failure_w_json_error_and_store(self):
        response = httplib2.Response({
            'status': http_client.BAD_GATEWAY,
        })
        error_msg = 'Where are we going wearer?'
        content = json.dumps({'error': error_msg})
        store = mock.MagicMock()
        self._do_refresh_request_test_helper(response, content, error_msg,
                                             store=store)

    def test__do_refresh_request_failure_w_json_error_and_desc(self):
        response = httplib2.Response({
            'status': http_client.SERVICE_UNAVAILABLE,
        })
        base_error = 'Ruckus'
        error_desc = 'Can you describe the ruckus'
        content = json.dumps({
            'error': base_error,
            'error_description': error_desc,
        })
        error_msg = '{0}: {1}'.format(base_error, error_desc)
        self._do_refresh_request_test_helper(response, content, error_msg)

    @mock.patch('oauth2client.client.logger')
    def _do_revoke_test_helper(self, response, content,
                               error_msg, logger, store=None):
        credentials = client.OAuth2Credentials(
            None, None, None, None, None, None, None,
            revoke_uri=oauth2client.GOOGLE_REVOKE_URI)
        credentials.store = store
        http_request = mock.Mock()
        http_request.return_value = response, content
        token = u's3kr3tz'

        if response.status == http_client.OK:
            self.assertFalse(credentials.invalid)
            self.assertIsNone(credentials._do_revoke(http_request, token))
            self.assertTrue(credentials.invalid)
            if store is not None:
                store.delete.assert_called_once_with()
        else:
            self.assertFalse(credentials.invalid)
            with self.assertRaises(client.TokenRevokeError) as exc_manager:
                credentials._do_revoke(http_request, token)
            # Make sure invalid was not flipped on.
            self.assertFalse(credentials.invalid)
            self.assertEqual(exc_manager.exception.args, (error_msg,))
            if store is not None:
                store.delete.assert_not_called()

        revoke_uri = oauth2client.GOOGLE_REVOKE_URI + '?token=' + token
        http_request.assert_called_once_with(revoke_uri)

        logger.info.assert_called_once_with('Revoking token')

    def test__do_revoke_success(self):
        response = httplib2.Response({
            'status': http_client.OK,
        })
        self._do_revoke_test_helper(response, b'', None)

    def test__do_revoke_success_with_store(self):
        response = httplib2.Response({
            'status': http_client.OK,
        })
        store = mock.MagicMock()
        self._do_revoke_test_helper(response, b'', None, store=store)

    def test__do_revoke_non_json_failure(self):
        response = httplib2.Response({
            'status': http_client.BAD_REQUEST,
        })
        content = u'Bad request'
        error_msg = 'Invalid response {0}.'.format(response.status)
        self._do_revoke_test_helper(response, content, error_msg)

    def test__do_revoke_basic_failure(self):
        response = httplib2.Response({
            'status': http_client.INTERNAL_SERVER_ERROR,
        })
        content = u'{}'
        error_msg = 'Invalid response {0}.'.format(response.status)
        self._do_revoke_test_helper(response, content, error_msg)

    def test__do_revoke_failure_w_json_error(self):
        response = httplib2.Response({
            'status': http_client.BAD_GATEWAY,
        })
        error_msg = 'Hi I am an error not a bearer'
        content = json.dumps({'error': error_msg})
        self._do_revoke_test_helper(response, content, error_msg)

    def test__do_revoke_failure_w_json_error_and_store(self):
        response = httplib2.Response({
            'status': http_client.BAD_GATEWAY,
        })
        error_msg = 'Where are we going wearer?'
        content = json.dumps({'error': error_msg})
        store = mock.MagicMock()
        self._do_revoke_test_helper(response, content, error_msg,
                                    store=store)

    @mock.patch('oauth2client.client.logger')
    def _do_retrieve_scopes_test_helper(self, response, content,
                                        error_msg, logger, scopes=None):
        credentials = client.OAuth2Credentials(
            None, None, None, None, None, None, None,
            token_info_uri=oauth2client.GOOGLE_TOKEN_INFO_URI)
        http_request = mock.Mock()
        http_request.return_value = response, content
        token = u's3kr3tz'

        if response.status == http_client.OK:
            self.assertEqual(credentials.scopes, set())
            self.assertIsNone(
                credentials._do_retrieve_scopes(http_request, token))
            self.assertEqual(credentials.scopes, scopes)
        else:
            self.assertEqual(credentials.scopes, set())
            with self.assertRaises(client.Error) as exc_manager:
                credentials._do_retrieve_scopes(http_request, token)
            # Make sure scopes were not changed.
            self.assertEqual(credentials.scopes, set())
            self.assertEqual(exc_manager.exception.args, (error_msg,))

        token_uri = client._update_query_params(
            oauth2client.GOOGLE_TOKEN_INFO_URI,
            {'fields': 'scope', 'access_token': token})
        self.assertEqual(len(http_request.mock_calls), 1)
        scopes_call = http_request.mock_calls[0]
        call_args = scopes_call[1]
        self.assertEqual(len(call_args), 1)
        called_uri = call_args[0]
        assertUrisEqual(self, token_uri, called_uri)
        logger.info.assert_called_once_with('Refreshing scopes')

    def test__do_retrieve_scopes_success_bad_json(self):
        response = httplib2.Response({
            'status': http_client.OK,
        })
        invalid_json = b'{'
        with self.assertRaises(ValueError):
            self._do_retrieve_scopes_test_helper(response, invalid_json, None)

    def test__do_retrieve_scopes_success(self):
        response = httplib2.Response({
            'status': http_client.OK,
        })
        content = b'{"scope": "foo bar"}'
        self._do_retrieve_scopes_test_helper(response, content, None,
                                             scopes=set(['foo', 'bar']))

    def test__do_retrieve_scopes_non_json_failure(self):
        response = httplib2.Response({
            'status': http_client.BAD_REQUEST,
        })
        content = u'Bad request'
        error_msg = 'Invalid response {0}.'.format(response.status)
        self._do_retrieve_scopes_test_helper(response, content, error_msg)

    def test__do_retrieve_scopes_basic_failure(self):
        response = httplib2.Response({
            'status': http_client.INTERNAL_SERVER_ERROR,
        })
        content = u'{}'
        error_msg = 'Invalid response {0}.'.format(response.status)
        self._do_retrieve_scopes_test_helper(response, content, error_msg)

    def test__do_retrieve_scopes_failure_w_json_error(self):
        response = httplib2.Response({
            'status': http_client.BAD_GATEWAY,
        })
        error_msg = 'Error desc I sit at a desk'
        content = json.dumps({'error_description': error_msg})
        self._do_retrieve_scopes_test_helper(response, content, error_msg)

    def test_has_scopes(self):
        self.assertTrue(self.credentials.has_scopes('foo'))
        self.assertTrue(self.credentials.has_scopes(['foo']))
        self.assertFalse(self.credentials.has_scopes('bar'))
        self.assertFalse(self.credentials.has_scopes(['bar']))

        self.credentials.scopes = set(['foo', 'bar'])
        self.assertTrue(self.credentials.has_scopes('foo'))
        self.assertTrue(self.credentials.has_scopes('bar'))
        self.assertFalse(self.credentials.has_scopes('baz'))
        self.assertTrue(self.credentials.has_scopes(['foo', 'bar']))
        self.assertFalse(self.credentials.has_scopes(['foo', 'baz']))

        self.credentials.scopes = set([])
        self.assertFalse(self.credentials.has_scopes('foo'))

    def test_retrieve_scopes(self):
        info_response_first = {'scope': 'foo bar'}
        info_response_second = {'error_description': 'abcdef'}
        http = HttpMockSequence([
            ({'status': '200'}, json.dumps(info_response_first).encode(
                'utf-8')),
            ({'status': '400'}, json.dumps(info_response_second).encode(
                'utf-8')),
            ({'status': '500'}, b''),
        ])

        self.credentials.retrieve_scopes(http)
        self.assertEqual(set(['foo', 'bar']), self.credentials.scopes)

        with self.assertRaises(client.Error):
            self.credentials.retrieve_scopes(http)

        with self.assertRaises(client.Error):
            self.credentials.retrieve_scopes(http)

    def test_refresh_updates_id_token(self):
        for status_code in client.REFRESH_STATUS_CODES:
            body = {'foo': 'bar'}
            body_json = json.dumps(body).encode('ascii')
            payload = base64.urlsafe_b64encode(body_json).strip(b'=')
            jwt = b'stuff.' + payload + b'.signature'

            token_response = (b'{'
                              b'  "access_token":"1/3w",'
                              b'  "expires_in":3600,'
                              b'  "id_token": "' + jwt + b'"'
                              b'}')
            http = HttpMockSequence([
                ({'status': status_code}, b''),
                ({'status': '200'}, token_response),
                ({'status': '200'}, 'echo_request_headers'),
            ])
            http = self.credentials.authorize(http)
            resp, content = http.request('http://example.com')
            self.assertEqual(self.credentials.id_token, body)


class AccessTokenCredentialsTests(unittest2.TestCase):

    def setUp(self):
        access_token = 'foo'
        user_agent = 'refresh_checker/1.0'
        self.credentials = client.AccessTokenCredentials(
            access_token, user_agent,
            revoke_uri=oauth2client.GOOGLE_REVOKE_URI)

    def test_token_refresh_success(self):
        for status_code in client.REFRESH_STATUS_CODES:
            http = HttpMockSequence([
                ({'status': status_code}, b''),
            ])
            http = self.credentials.authorize(http)
            with self.assertRaises(client.AccessTokenCredentialsError):
                resp, content = http.request('http://example.com')

    def test_token_revoke_success(self):
        _token_revoke_test_helper(
            self, '200', revoke_raise=False,
            valid_bool_value=True, token_attr='access_token')

    def test_token_revoke_failure(self):
        _token_revoke_test_helper(
            self, '400', revoke_raise=True,
            valid_bool_value=False, token_attr='access_token')

    def test_non_401_error_response(self):
        http = HttpMockSequence([
            ({'status': '400'}, b''),
        ])
        http = self.credentials.authorize(http)
        resp, content = http.request('http://example.com')
        self.assertEqual(http_client.BAD_REQUEST, resp.status)

    def test_auth_header_sent(self):
        http = HttpMockSequence([
            ({'status': '200'}, 'echo_request_headers'),
        ])
        http = self.credentials.authorize(http)
        resp, content = http.request('http://example.com')
        self.assertEqual(b'Bearer foo', content[b'Authorization'])


class TestAssertionCredentials(unittest2.TestCase):
    assertion_text = 'This is the assertion'
    assertion_type = 'http://www.google.com/assertionType'

    class AssertionCredentialsTestImpl(client.AssertionCredentials):

        def _generate_assertion(self):
            return TestAssertionCredentials.assertion_text

    def setUp(self):
        user_agent = 'fun/2.0'
        self.credentials = self.AssertionCredentialsTestImpl(
            self.assertion_type, user_agent=user_agent)

    def test__generate_assertion_abstract(self):
        credentials = client.AssertionCredentials(None)
        with self.assertRaises(NotImplementedError):
            credentials._generate_assertion()

    def test_assertion_body(self):
        body = urllib.parse.parse_qs(
            self.credentials._generate_refresh_request_body())
        self.assertEqual(self.assertion_text, body['assertion'][0])
        self.assertEqual('urn:ietf:params:oauth:grant-type:jwt-bearer',
                         body['grant_type'][0])

    def test_assertion_refresh(self):
        http = HttpMockSequence([
            ({'status': '200'}, b'{"access_token":"1/3w"}'),
            ({'status': '200'}, 'echo_request_headers'),
        ])
        http = self.credentials.authorize(http)
        resp, content = http.request('http://example.com')
        self.assertEqual(b'Bearer 1/3w', content[b'Authorization'])

    def test_token_revoke_success(self):
        _token_revoke_test_helper(
            self, '200', revoke_raise=False,
            valid_bool_value=True, token_attr='access_token')

    def test_token_revoke_failure(self):
        _token_revoke_test_helper(
            self, '400', revoke_raise=True,
            valid_bool_value=False, token_attr='access_token')

    def test_sign_blob_abstract(self):
        credentials = client.AssertionCredentials(None)
        with self.assertRaises(NotImplementedError):
            credentials.sign_blob(b'blob')


class UpdateQueryParamsTest(unittest2.TestCase):
    def test_update_query_params_no_params(self):
        uri = 'http://www.google.com'
        updated = client._update_query_params(uri, {'a': 'b'})
        self.assertEqual(updated, uri + '?a=b')

    def test_update_query_params_existing_params(self):
        uri = 'http://www.google.com?x=y'
        updated = client._update_query_params(uri, {'a': 'b', 'c': 'd&'})
        hardcoded_update = uri + '&a=b&c=d%26'
        assertUrisEqual(self, updated, hardcoded_update)


class ExtractIdTokenTest(unittest2.TestCase):
    """Tests client._extract_id_token()."""

    def test_extract_success(self):
        body = {'foo': 'bar'}
        body_json = json.dumps(body).encode('ascii')
        payload = base64.urlsafe_b64encode(body_json).strip(b'=')
        jwt = b'stuff.' + payload + b'.signature'

        extracted = client._extract_id_token(jwt)
        self.assertEqual(extracted, body)

    def test_extract_failure(self):
        body = {'foo': 'bar'}
        body_json = json.dumps(body).encode('ascii')
        payload = base64.urlsafe_b64encode(body_json).strip(b'=')
        jwt = b'stuff.' + payload
        with self.assertRaises(client.VerifyJwtTokenError):
            client._extract_id_token(jwt)


class OAuth2WebServerFlowTest(unittest2.TestCase):

    def setUp(self):
        self.flow = client.OAuth2WebServerFlow(
            client_id='client_id+1',
            client_secret='secret+1',
            scope='foo',
            redirect_uri=client.OOB_CALLBACK_URN,
            user_agent='unittest-sample/1.0',
            revoke_uri='dummy_revoke_uri',
        )

    def test_construct_authorize_url(self):
        authorize_url = self.flow.step1_get_authorize_url(state='state+1')

        parsed = urllib.parse.urlparse(authorize_url)
        q = urllib.parse.parse_qs(parsed[4])
        self.assertEqual('client_id+1', q['client_id'][0])
        self.assertEqual('code', q['response_type'][0])
        self.assertEqual('foo', q['scope'][0])
        self.assertEqual(client.OOB_CALLBACK_URN, q['redirect_uri'][0])
        self.assertEqual('offline', q['access_type'][0])
        self.assertEqual('state+1', q['state'][0])

    def test_override_flow_via_kwargs(self):
        """Passing kwargs to override defaults."""
        flow = client.OAuth2WebServerFlow(
            client_id='client_id+1',
            client_secret='secret+1',
            scope='foo',
            redirect_uri=client.OOB_CALLBACK_URN,
            user_agent='unittest-sample/1.0',
            access_type='online',
            response_type='token'
        )
        authorize_url = flow.step1_get_authorize_url()

        parsed = urllib.parse.urlparse(authorize_url)
        q = urllib.parse.parse_qs(parsed[4])
        self.assertEqual('client_id+1', q['client_id'][0])
        self.assertEqual('token', q['response_type'][0])
        self.assertEqual('foo', q['scope'][0])
        self.assertEqual(client.OOB_CALLBACK_URN, q['redirect_uri'][0])
        self.assertEqual('online', q['access_type'][0])

    def test__oauth2_web_server_flow_params(self):
        params = client._oauth2_web_server_flow_params({})
        self.assertEqual(params['access_type'], 'offline')
        self.assertEqual(params['response_type'], 'code')

        params = client._oauth2_web_server_flow_params({
            'approval_prompt': 'force'})
        self.assertEqual(params['prompt'], 'consent')
        self.assertNotIn('approval_prompt', params)

        params = client._oauth2_web_server_flow_params({
            'approval_prompt': 'other'})
        self.assertEqual(params['approval_prompt'], 'other')

    @mock.patch('oauth2client.client.logger')
    def test_step1_get_authorize_url_redirect_override(self, logger):
        flow = client.OAuth2WebServerFlow('client_id+1', scope='foo',
                                          redirect_uri=client.OOB_CALLBACK_URN)
        alt_redirect = 'foo:bar'
        self.assertEqual(flow.redirect_uri, client.OOB_CALLBACK_URN)
        result = flow.step1_get_authorize_url(redirect_uri=alt_redirect)
        # Make sure the redirect value was updated.
        self.assertEqual(flow.redirect_uri, alt_redirect)
        query_params = {
            'client_id': flow.client_id,
            'redirect_uri': alt_redirect,
            'scope': flow.scope,
            'access_type': 'offline',
            'response_type': 'code',
        }
        expected = client._update_query_params(flow.auth_uri, query_params)
        assertUrisEqual(self, expected, result)
        # Check stubs.
        self.assertEqual(logger.warning.call_count, 1)

    def test_step1_get_authorize_url_without_redirect(self):
        flow = client.OAuth2WebServerFlow('client_id+1', scope='foo',
                                          redirect_uri=None)
        with self.assertRaises(ValueError):
            flow.step1_get_authorize_url(redirect_uri=None)

    def test_step1_get_authorize_url_without_login_hint(self):
        login_hint = 'There are wascally wabbits nearby'
        flow = client.OAuth2WebServerFlow('client_id+1', scope='foo',
                                          redirect_uri=client.OOB_CALLBACK_URN,
                                          login_hint=login_hint)
        result = flow.step1_get_authorize_url()
        query_params = {
            'client_id': flow.client_id,
            'login_hint': login_hint,
            'redirect_uri': client.OOB_CALLBACK_URN,
            'scope': flow.scope,
            'access_type': 'offline',
            'response_type': 'code',
        }
        expected = client._update_query_params(flow.auth_uri, query_params)
        assertUrisEqual(self, expected, result)

    def test_step1_get_device_and_user_codes_wo_device_uri(self):
        flow = client.OAuth2WebServerFlow('CID', scope='foo', device_uri=None)
        with self.assertRaises(ValueError):
            flow.step1_get_device_and_user_codes()

    def _step1_get_device_and_user_codes_helper(
            self, extra_headers=None, user_agent=None, default_http=False,
            content=None):
        flow = client.OAuth2WebServerFlow('CID', scope='foo',
                                          user_agent=user_agent)
        device_code = 'bfc06756-062e-430f-9f0f-460ca44724e5'
        user_code = '5faf2780-fc83-11e5-9bc2-00c2c63e5792'
        ver_url = 'http://foo.bar'
        if content is None:
            content = json.dumps({
                'device_code': device_code,
                'user_code': user_code,
                'verification_url': ver_url,
            })
        http = HttpMockSequence([
            ({'status': http_client.OK}, content),
        ])
        if default_http:
            with mock.patch('httplib2.Http', return_value=http):
                result = flow.step1_get_device_and_user_codes()
        else:
            result = flow.step1_get_device_and_user_codes(http=http)

        expected = client.DeviceFlowInfo(
            device_code, user_code, None, ver_url, None)
        self.assertEqual(result, expected)
        self.assertEqual(len(http.requests), 1)
        self.assertEqual(
            http.requests[0]['uri'], oauth2client.GOOGLE_DEVICE_URI)
        body = http.requests[0]['body']
        self.assertEqual(urllib.parse.parse_qs(body),
                         {'client_id': [flow.client_id],
                          'scope': [flow.scope]})
        headers = {'content-type': 'application/x-www-form-urlencoded'}
        if extra_headers is not None:
            headers.update(extra_headers)
        self.assertEqual(http.requests[0]['headers'], headers)

    def test_step1_get_device_and_user_codes(self):
        self._step1_get_device_and_user_codes_helper()

    def test_step1_get_device_and_user_codes_w_user_agent(self):
        user_agent = 'spiderman'
        extra_headers = {'user-agent': user_agent}
        self._step1_get_device_and_user_codes_helper(
            user_agent=user_agent, extra_headers=extra_headers)

    def test_step1_get_device_and_user_codes_w_default_http(self):
        self._step1_get_device_and_user_codes_helper(default_http=True)

    def test_step1_get_device_and_user_codes_bad_payload(self):
        non_json_content = b'{'
        with self.assertRaises(client.OAuth2DeviceCodeError):
            self._step1_get_device_and_user_codes_helper(
                content=non_json_content)

    def _step1_get_device_and_user_codes_fail_helper(self, status,
                                                     content, error_msg):
        flow = client.OAuth2WebServerFlow('CID', scope='foo')
        http = HttpMockSequence([
            ({'status': status}, content),
        ])
        with self.assertRaises(client.OAuth2DeviceCodeError) as exc_manager:
            flow.step1_get_device_and_user_codes(http=http)

        self.assertEqual(exc_manager.exception.args, (error_msg,))

    def test_step1_get_device_and_user_codes_non_json_failure(self):
        status = int(http_client.BAD_REQUEST)
        content = 'Nope not JSON.'
        error_msg = 'Invalid response {0}.'.format(status)
        self._step1_get_device_and_user_codes_fail_helper(status, content,
                                                          error_msg)

    def test_step1_get_device_and_user_codes_basic_failure(self):
        status = int(http_client.INTERNAL_SERVER_ERROR)
        content = b'{}'
        error_msg = 'Invalid response {0}.'.format(status)
        self._step1_get_device_and_user_codes_fail_helper(status, content,
                                                          error_msg)

    def test_step1_get_device_and_user_codes_failure_w_json_error(self):
        status = int(http_client.BAD_GATEWAY)
        base_error = 'ZOMG user codes failure.'
        content = json.dumps({'error': base_error})
        error_msg = 'Invalid response {0}. Error: {1}'.format(status,
                                                              base_error)
        self._step1_get_device_and_user_codes_fail_helper(status, content,
                                                          error_msg)

    def test_step2_exchange_no_input(self):
        flow = client.OAuth2WebServerFlow('client_id+1', scope='foo')
        with self.assertRaises(ValueError):
            flow.step2_exchange()

    def test_step2_exchange_code_and_device_flow(self):
        flow = client.OAuth2WebServerFlow('client_id+1', scope='foo')
        with self.assertRaises(ValueError):
            flow.step2_exchange(code='code', device_flow_info='dfi')

    def test_scope_is_required(self):
        with self.assertRaises(TypeError):
            client.OAuth2WebServerFlow('client_id+1')

    def test_exchange_failure(self):
        http = HttpMockSequence([
            ({'status': '400'}, b'{"error":"invalid_request"}'),
        ])

        with self.assertRaises(client.FlowExchangeError):
            self.flow.step2_exchange(code='some random code', http=http)

    def test_urlencoded_exchange_failure(self):
        http = HttpMockSequence([
            ({'status': '400'}, b'error=invalid_request'),
        ])

        with self.assertRaisesRegexp(client.FlowExchangeError,
                                     'invalid_request'):
            self.flow.step2_exchange(code='some random code', http=http)

    def test_exchange_failure_with_json_error(self):
        # Some providers have 'error' attribute as a JSON object
        # in place of regular string.
        # This test makes sure no strange object-to-string coversion
        # exceptions are being raised instead of FlowExchangeError.
        payload = (b'{'
                   b'  "error": {'
                   b'    "message": "Error validating verification code.",'
                   b'    "type": "OAuthException"'
                   b'  }'
                   b'}')
        http = HttpMockSequence([({'status': '400'}, payload)])

        with self.assertRaises(client.FlowExchangeError):
            self.flow.step2_exchange(code='some random code', http=http)

    def _exchange_success_test_helper(self, code=None, device_flow_info=None):
        payload = (b'{'
                   b'  "access_token":"SlAV32hkKG",'
                   b'  "expires_in":3600,'
                   b'  "refresh_token":"8xLOxBtZp8"'
                   b'}')
        http = HttpMockSequence([({'status': '200'}, payload)])
        credentials = self.flow.step2_exchange(
            code=code, device_flow_info=device_flow_info, http=http)
        self.assertEqual('SlAV32hkKG', credentials.access_token)
        self.assertNotEqual(None, credentials.token_expiry)
        self.assertEqual('8xLOxBtZp8', credentials.refresh_token)
        self.assertEqual('dummy_revoke_uri', credentials.revoke_uri)
        self.assertEqual(set(['foo']), credentials.scopes)

    def test_exchange_success(self):
        self._exchange_success_test_helper(code='some random code')

    def test_exchange_success_with_device_flow_info(self):
        device_flow_info = client.DeviceFlowInfo(
            'some random code', None, None, None, None)
        self._exchange_success_test_helper(device_flow_info=device_flow_info)

    def test_exchange_success_binary_code(self):
        binary_code = b'some random code'
        access_token = 'SlAV32hkKG'
        expires_in = '3600'
        refresh_token = '8xLOxBtZp8'
        revoke_uri = 'dummy_revoke_uri'

        payload = ('{'
                   '  "access_token":"' + access_token + '",'
                   '  "expires_in":' + expires_in + ','
                   '  "refresh_token":"' + refresh_token + '"'
                   '}')
        http = HttpMockSequence(
            [({'status': '200'}, _helpers._to_bytes(payload))])
        credentials = self.flow.step2_exchange(code=binary_code, http=http)
        self.assertEqual(access_token, credentials.access_token)
        self.assertIsNotNone(credentials.token_expiry)
        self.assertEqual(refresh_token, credentials.refresh_token)
        self.assertEqual(revoke_uri, credentials.revoke_uri)
        self.assertEqual(set(['foo']), credentials.scopes)

    def test_exchange_dictlike(self):
        class FakeDict(object):
            def __init__(self, d):
                self.d = d

            def __getitem__(self, name):
                return self.d[name]

            def __contains__(self, name):
                return name in self.d

        code = 'some random code'
        not_a_dict = FakeDict({'code': code})
        payload = (b'{'
                   b'  "access_token":"SlAV32hkKG",'
                   b'  "expires_in":3600,'
                   b'  "refresh_token":"8xLOxBtZp8"'
                   b'}')
        http = HttpMockSequence([({'status': '200'}, payload)])

        credentials = self.flow.step2_exchange(code=not_a_dict, http=http)
        self.assertEqual('SlAV32hkKG', credentials.access_token)
        self.assertNotEqual(None, credentials.token_expiry)
        self.assertEqual('8xLOxBtZp8', credentials.refresh_token)
        self.assertEqual('dummy_revoke_uri', credentials.revoke_uri)
        self.assertEqual(set(['foo']), credentials.scopes)
        request_code = urllib.parse.parse_qs(
            http.requests[0]['body'])['code'][0]
        self.assertEqual(code, request_code)

    def test_exchange_using_authorization_header(self):
        auth_header = 'Basic Y2xpZW50X2lkKzE6c2Vjexc_managerV0KzE=',
        flow = client.OAuth2WebServerFlow(
            client_id='client_id+1',
            authorization_header=auth_header,
            scope='foo',
            redirect_uri=client.OOB_CALLBACK_URN,
            user_agent='unittest-sample/1.0',
            revoke_uri='dummy_revoke_uri',
        )
        http = HttpMockSequence([
            ({'status': '200'}, b'access_token=SlAV32hkKG'),
        ])

        credentials = flow.step2_exchange(code='some random code', http=http)
        self.assertEqual('SlAV32hkKG', credentials.access_token)

        test_request = http.requests[0]
        # Did we pass the Authorization header?
        self.assertEqual(test_request['headers']['Authorization'], auth_header)
        # Did we omit client_secret from POST body?
        self.assertTrue('client_secret' not in test_request['body'])

    def test_urlencoded_exchange_success(self):
        http = HttpMockSequence([
            ({'status': '200'}, b'access_token=SlAV32hkKG&expires_in=3600'),
        ])

        credentials = self.flow.step2_exchange(code='some random code',
                                               http=http)
        self.assertEqual('SlAV32hkKG', credentials.access_token)
        self.assertNotEqual(None, credentials.token_expiry)

    def test_urlencoded_expires_param(self):
        http = HttpMockSequence([
            # Note the 'expires=3600' where you'd normally
            # have if named 'expires_in'
            ({'status': '200'}, b'access_token=SlAV32hkKG&expires=3600'),
        ])

        credentials = self.flow.step2_exchange(code='some random code',
                                               http=http)
        self.assertNotEqual(None, credentials.token_expiry)

    def test_exchange_no_expires_in(self):
        payload = (b'{'
                   b'  "access_token":"SlAV32hkKG",'
                   b'  "refresh_token":"8xLOxBtZp8"'
                   b'}')
        http = HttpMockSequence([({'status': '200'}, payload)])

        credentials = self.flow.step2_exchange(code='some random code',
                                               http=http)
        self.assertEqual(None, credentials.token_expiry)

    def test_urlencoded_exchange_no_expires_in(self):
        http = HttpMockSequence([
            # This might be redundant but just to make sure
            # urlencoded access_token gets parsed correctly
            ({'status': '200'}, b'access_token=SlAV32hkKG'),
        ])

        credentials = self.flow.step2_exchange(code='some random code',
                                               http=http)
        self.assertEqual(None, credentials.token_expiry)

    def test_exchange_fails_if_no_code(self):
        payload = (b'{'
                   b'  "access_token":"SlAV32hkKG",'
                   b'  "refresh_token":"8xLOxBtZp8"'
                   b'}')
        http = HttpMockSequence([({'status': '200'}, payload)])

        code = {'error': 'thou shall not pass'}
        with self.assertRaisesRegexp(
                client.FlowExchangeError, 'shall not pass'):
            self.flow.step2_exchange(code=code, http=http)

    def test_exchange_id_token_fail(self):
        payload = (b'{'
                   b'  "access_token":"SlAV32hkKG",'
                   b'  "refresh_token":"8xLOxBtZp8",'
                   b'  "id_token": "stuff.payload"'
                   b'}')
        http = HttpMockSequence([({'status': '200'}, payload)])

        with self.assertRaises(client.VerifyJwtTokenError):
            self.flow.step2_exchange(code='some random code', http=http)

    def test_exchange_id_token(self):
        body = {'foo': 'bar'}
        body_json = json.dumps(body).encode('ascii')
        payload = base64.urlsafe_b64encode(body_json).strip(b'=')
        jwt = (base64.urlsafe_b64encode(b'stuff') + b'.' + payload + b'.' +
               base64.urlsafe_b64encode(b'signature'))

        payload = (b'{'
                   b'  "access_token":"SlAV32hkKG",'
                   b'  "refresh_token":"8xLOxBtZp8",'
                   b'  "id_token": "' + jwt + b'"'
                   b'}')
        http = HttpMockSequence([({'status': '200'}, payload)])
        credentials = self.flow.step2_exchange(code='some random code',
                                               http=http)
        self.assertEqual(credentials.id_token, body)


class FlowFromCachedClientsecrets(unittest2.TestCase):

    def test_flow_from_clientsecrets_cached(self):
        cache_mock = CacheMock()
        load_and_cache('client_secrets.json', 'some_secrets', cache_mock)

        flow = client.flow_from_clientsecrets(
            'some_secrets', '', redirect_uri='oob', cache=cache_mock)
        self.assertEqual('foo_client_secret', flow.client_secret)

    @mock.patch('oauth2client.clientsecrets.loadfile')
    def _flow_from_clientsecrets_success_helper(self, loadfile_mock,
                                                device_uri=None,
                                                revoke_uri=None):
        client_type = clientsecrets.TYPE_WEB
        client_info = {
            'auth_uri': 'auth_uri',
            'token_uri': 'token_uri',
            'client_id': 'client_id',
            'client_secret': 'client_secret',
        }
        if revoke_uri is not None:
            client_info['revoke_uri'] = revoke_uri
        loadfile_mock.return_value = client_type, client_info
        filename = object()
        scope = ['baz']
        cache = object()

        if device_uri is not None:
            result = client.flow_from_clientsecrets(
                filename, scope, cache=cache, device_uri=device_uri)
            self.assertEqual(result.device_uri, device_uri)
        else:
            result = client.flow_from_clientsecrets(
                filename, scope, cache=cache)

        self.assertIsInstance(result, client.OAuth2WebServerFlow)
        loadfile_mock.assert_called_once_with(filename, cache=cache)

    def test_flow_from_clientsecrets_success(self):
        self._flow_from_clientsecrets_success_helper()

    def test_flow_from_clientsecrets_success_w_device_uri(self):
        device_uri = 'http://device.uri'
        self._flow_from_clientsecrets_success_helper(device_uri=device_uri)

    def test_flow_from_clientsecrets_success_w_revoke_uri(self):
        revoke_uri = 'http://revoke.uri'
        self._flow_from_clientsecrets_success_helper(revoke_uri=revoke_uri)

    @mock.patch('oauth2client.clientsecrets.loadfile',
                side_effect=clientsecrets.InvalidClientSecretsError)
    def test_flow_from_clientsecrets_invalid(self, loadfile_mock):
        filename = object()
        cache = object()
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            client.flow_from_clientsecrets(
                filename, None, cache=cache, message=None)
        loadfile_mock.assert_called_once_with(filename, cache=cache)

    @mock.patch('oauth2client.clientsecrets.loadfile',
                side_effect=clientsecrets.InvalidClientSecretsError)
    @mock.patch('sys.exit')
    def test_flow_from_clientsecrets_invalid_w_msg(self, sys_exit,
                                                   loadfile_mock):
        filename = object()
        cache = object()
        message = 'hi mom'

        client.flow_from_clientsecrets(
            filename, None, cache=cache, message=message)
        sys_exit.assert_called_once_with(message)
        loadfile_mock.assert_called_once_with(filename, cache=cache)

    @mock.patch('oauth2client.clientsecrets.loadfile',
                side_effect=clientsecrets.InvalidClientSecretsError('foobar'))
    @mock.patch('sys.exit')
    def test_flow_from_clientsecrets_invalid_w_msg_and_text(self, sys_exit,
                                                            loadfile_mock):
        filename = object()
        cache = object()
        message = 'hi mom'
        expected = ('The client secrets were invalid: '
                    '\n{0}\n{1}'.format('foobar', 'hi mom'))

        client.flow_from_clientsecrets(
            filename, None, cache=cache, message=message)
        sys_exit.assert_called_once_with(expected)
        loadfile_mock.assert_called_once_with(filename, cache=cache)

    @mock.patch('oauth2client.clientsecrets.loadfile')
    def test_flow_from_clientsecrets_unknown_flow(self, loadfile_mock):
        client_type = 'UNKNOWN'
        loadfile_mock.return_value = client_type, None
        filename = object()
        cache = object()

        err_msg = ('This OAuth 2.0 flow is unsupported: '
                   '{0!r}'.format(client_type))
        with self.assertRaisesRegexp(client.UnknownClientSecretsFlowError,
                                     err_msg):
            client.flow_from_clientsecrets(filename, None, cache=cache)

        loadfile_mock.assert_called_once_with(filename, cache=cache)


class CredentialsFromCodeTests(unittest2.TestCase):

    def setUp(self):
        self.client_id = 'client_id_abc'
        self.client_secret = 'secret_use_code'
        self.scope = 'foo'
        self.code = '12345abcde'
        self.redirect_uri = 'postmessage'

    def test_exchange_code_for_token(self):
        token = 'asdfghjkl'
        payload = json.dumps({'access_token': token, 'expires_in': 3600})
        http = HttpMockSequence([
            ({'status': '200'}, payload.encode('utf-8')),
        ])
        credentials = client.credentials_from_code(
            self.client_id, self.client_secret, self.scope,
            self.code, http=http, redirect_uri=self.redirect_uri)
        self.assertEqual(credentials.access_token, token)
        self.assertNotEqual(None, credentials.token_expiry)
        self.assertEqual(set(['foo']), credentials.scopes)

    def test_exchange_code_for_token_fail(self):
        http = HttpMockSequence([
            ({'status': '400'}, b'{"error":"invalid_request"}'),
        ])

        with self.assertRaises(client.FlowExchangeError):
            client.credentials_from_code(
                self.client_id, self.client_secret, self.scope,
                self.code, http=http, redirect_uri=self.redirect_uri)

    def test_exchange_code_and_file_for_token(self):
        payload = (b'{'
                   b'  "access_token":"asdfghjkl",'
                   b'  "expires_in":3600'
                   b'}')
        http = HttpMockSequence([({'status': '200'}, payload)])
        credentials = client.credentials_from_clientsecrets_and_code(
            datafile('client_secrets.json'), self.scope,
            self.code, http=http)
        self.assertEqual(credentials.access_token, 'asdfghjkl')
        self.assertNotEqual(None, credentials.token_expiry)
        self.assertEqual(set(['foo']), credentials.scopes)

    def test_exchange_code_and_cached_file_for_token(self):
        http = HttpMockSequence([
            ({'status': '200'}, b'{ "access_token":"asdfghjkl"}'),
        ])
        cache_mock = CacheMock()
        load_and_cache('client_secrets.json', 'some_secrets', cache_mock)

        credentials = client.credentials_from_clientsecrets_and_code(
            'some_secrets', self.scope,
            self.code, http=http, cache=cache_mock)
        self.assertEqual(credentials.access_token, 'asdfghjkl')
        self.assertEqual(set(['foo']), credentials.scopes)

    def test_exchange_code_and_file_for_token_fail(self):
        http = HttpMockSequence([
            ({'status': '400'}, b'{"error":"invalid_request"}'),
        ])

        with self.assertRaises(client.FlowExchangeError):
            client.credentials_from_clientsecrets_and_code(
                datafile('client_secrets.json'), self.scope,
                self.code, http=http)


class Test__save_private_file(unittest2.TestCase):

    def _save_helper(self, filename):
        contents = []
        contents_str = '[]'
        client._save_private_file(filename, contents)
        with open(filename, 'r') as f:
            stored_contents = f.read()
        self.assertEqual(stored_contents, contents_str)

        stat_mode = os.stat(filename).st_mode
        # Octal 777, only last 3 positions matter for permissions mask.
        stat_mode &= 0o777
        self.assertEqual(stat_mode, 0o600)

    def test_new(self):
        filename = tempfile.mktemp()
        self.assertFalse(os.path.exists(filename))
        self._save_helper(filename)

    def test_existing(self):
        filename = tempfile.mktemp()
        with open(filename, 'w') as f:
            f.write('a bunch of nonsense longer than []')
        self.assertTrue(os.path.exists(filename))
        self._save_helper(filename)


class Test__get_application_default_credential_GAE(unittest2.TestCase):

    @mock.patch.dict('sys.modules', {
        'oauth2client.contrib.appengine': mock.Mock()})
    def test_it(self):
        gae_mod = sys.modules['oauth2client.contrib.appengine']
        gae_mod.AppAssertionCredentials = creds_kls = mock.Mock()
        creds_kls.return_value = object()
        credentials = client._get_application_default_credential_GAE()
        self.assertEqual(credentials, creds_kls.return_value)
        creds_kls.assert_called_once_with([])


class Test__get_application_default_credential_GCE(unittest2.TestCase):

    @mock.patch.dict('sys.modules', {
        'oauth2client.contrib.gce': mock.Mock()})
    def test_it(self):
        gce_mod = sys.modules['oauth2client.contrib.gce']
        gce_mod.AppAssertionCredentials = creds_kls = mock.Mock()
        creds_kls.return_value = object()
        credentials = client._get_application_default_credential_GCE()
        self.assertEqual(credentials, creds_kls.return_value)
        creds_kls.assert_called_once_with()


class Test__require_crypto_or_die(unittest2.TestCase):

    @mock.patch.object(client, 'HAS_CRYPTO', new=True)
    def test_with_crypto(self):
        self.assertIsNone(client._require_crypto_or_die())

    @mock.patch.object(client, 'HAS_CRYPTO', new=False)
    def test_without_crypto(self):
        with self.assertRaises(client.CryptoUnavailableError):
            client._require_crypto_or_die()


class TestDeviceFlowInfo(unittest2.TestCase):

    DEVICE_CODE = 'e80ff179-fd65-416c-9dbf-56a23e5d23e4'
    USER_CODE = '4bbd8b82-fc73-11e5-adf3-00c2c63e5792'
    VER_URL = 'http://foo.bar'

    def test_FromResponse(self):
        response = {
            'device_code': self.DEVICE_CODE,
            'user_code': self.USER_CODE,
            'verification_url': self.VER_URL,
        }
        result = client.DeviceFlowInfo.FromResponse(response)
        expected_result = client.DeviceFlowInfo(
            self.DEVICE_CODE, self.USER_CODE, None, self.VER_URL, None)
        self.assertEqual(result, expected_result)

    def test_FromResponse_fallback_to_uri(self):
        response = {
            'device_code': self.DEVICE_CODE,
            'user_code': self.USER_CODE,
            'verification_uri': self.VER_URL,
        }
        result = client.DeviceFlowInfo.FromResponse(response)
        expected_result = client.DeviceFlowInfo(
            self.DEVICE_CODE, self.USER_CODE, None, self.VER_URL, None)
        self.assertEqual(result, expected_result)

    def test_FromResponse_missing_url(self):
        response = {
            'device_code': self.DEVICE_CODE,
            'user_code': self.USER_CODE,
        }
        with self.assertRaises(client.OAuth2DeviceCodeError):
            client.DeviceFlowInfo.FromResponse(response)

    @mock.patch('oauth2client.client._UTCNOW')
    def test_FromResponse_with_expires_in(self, utcnow):
        expires_in = 23
        response = {
            'device_code': self.DEVICE_CODE,
            'user_code': self.USER_CODE,
            'verification_url': self.VER_URL,
            'expires_in': expires_in,
        }
        now = datetime.datetime(1999, 1, 1, 12, 30, 27)
        expire = datetime.datetime(1999, 1, 1, 12, 30, 27 + expires_in)
        utcnow.return_value = now

        result = client.DeviceFlowInfo.FromResponse(response)
        expected_result = client.DeviceFlowInfo(
            self.DEVICE_CODE, self.USER_CODE, None, self.VER_URL, expire)
        self.assertEqual(result, expected_result)