#!/usr/bin/env python
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import unittest
import datetime
import dnsproxy
import httparchive
import httpclient
import platformsettings
import script_injector
import test_utils
class RealHttpFetchTest(unittest.TestCase):
# Initialize test data
CONTENT_TYPE = 'content-type: image/x-icon'
COOKIE_1 = ('Set-Cookie: GMAIL_IMP=EXPIRED; '
'Expires=Thu, 12-Jul-2012 22:41:22 GMT; '
'Path=/mail; Secure')
COOKIE_2 = ('Set-Cookie: GMAIL_STAT_205a=EXPIRED; '
'Expires=Thu, 12-Jul-2012 22:42:24 GMT; '
'Path=/mail; Secure')
FIRST_LINE = 'fake-header: first line'
SECOND_LINE = ' second line'
THIRD_LINE = '\tthird line'
BAD_HEADER = 'this is a bad header'
def test__GetHeaderNameValueBasic(self):
"""Test _GetHeaderNameValue with normal header."""
real_http_fetch = httpclient.RealHttpFetch
name_value = real_http_fetch._GetHeaderNameValue(self.CONTENT_TYPE)
self.assertEqual(name_value, ('content-type', 'image/x-icon'))
def test__GetHeaderNameValueLowercasesName(self):
"""_GetHeaderNameValue lowercases header name."""
real_http_fetch = httpclient.RealHttpFetch
header = 'X-Google-Gfe-Backend-Request-Info: eid=1KMAUMeiK4eMiAL52YyMBg'
expected = ('x-google-gfe-backend-request-info',
'eid=1KMAUMeiK4eMiAL52YyMBg')
name_value = real_http_fetch._GetHeaderNameValue(header)
self.assertEqual(name_value, expected)
def test__GetHeaderNameValueBadLineGivesNone(self):
"""_GetHeaderNameValue returns None for a header in wrong format."""
real_http_fetch = httpclient.RealHttpFetch
name_value = real_http_fetch._GetHeaderNameValue(self.BAD_HEADER)
self.assertIsNone(name_value)
def test__ToTuplesBasic(self):
"""Test _ToTuples with normal input."""
real_http_fetch = httpclient.RealHttpFetch
headers = [self.CONTENT_TYPE, self.COOKIE_1, self.FIRST_LINE]
result = real_http_fetch._ToTuples(headers)
expected = [('content-type', 'image/x-icon'),
('set-cookie', self.COOKIE_1[12:]),
('fake-header', 'first line')]
self.assertEqual(result, expected)
def test__ToTuplesMultipleHeadersWithSameName(self):
"""Test mulitple headers with the same name."""
real_http_fetch = httpclient.RealHttpFetch
headers = [self.CONTENT_TYPE, self.COOKIE_1, self.COOKIE_2, self.FIRST_LINE]
result = real_http_fetch._ToTuples(headers)
expected = [('content-type', 'image/x-icon'),
('set-cookie', self.COOKIE_1[12:]),
('set-cookie', self.COOKIE_2[12:]),
('fake-header', 'first line')]
self.assertEqual(result, expected)
def test__ToTuplesAppendsContinuationLine(self):
"""Test continuation line is handled."""
real_http_fetch = httpclient.RealHttpFetch
headers = [self.CONTENT_TYPE, self.COOKIE_1, self.FIRST_LINE,
self.SECOND_LINE, self.THIRD_LINE]
result = real_http_fetch._ToTuples(headers)
expected = [('content-type', 'image/x-icon'),
('set-cookie', self.COOKIE_1[12:]),
('fake-header', 'first line\n second line\n third line')]
self.assertEqual(result, expected)
def test__ToTuplesIgnoresBadHeader(self):
"""Test bad header is ignored."""
real_http_fetch = httpclient.RealHttpFetch
bad_headers = [self.CONTENT_TYPE, self.BAD_HEADER, self.COOKIE_1]
expected = [('content-type', 'image/x-icon'),
('set-cookie', self.COOKIE_1[12:])]
result = real_http_fetch._ToTuples(bad_headers)
self.assertEqual(result, expected)
def test__ToTuplesIgnoresMisplacedContinuationLine(self):
"""Test misplaced continuation line is ignored."""
real_http_fetch = httpclient.RealHttpFetch
misplaced_headers = [self.THIRD_LINE, self.CONTENT_TYPE,
self.COOKIE_1, self.FIRST_LINE, self.SECOND_LINE]
result = real_http_fetch._ToTuples(misplaced_headers)
expected = [('content-type', 'image/x-icon'),
('set-cookie', self.COOKIE_1[12:]),
('fake-header', 'first line\n second line')]
self.assertEqual(result, expected)
class RealHttpFetchGetConnectionTest(unittest.TestCase):
"""Test that a connection is made with request IP/port or proxy IP/port."""
def setUp(self):
def real_dns_lookup(host):
return {
'example.com': '127.127.127.127',
'proxy.com': '2.2.2.2',
}[host]
self.fetch = httpclient.RealHttpFetch(real_dns_lookup)
self.https_proxy = None
self.http_proxy = None
def get_proxy(is_ssl):
return self.https_proxy if is_ssl else self.http_proxy
self.fetch._get_system_proxy = get_proxy
def set_http_proxy(self, host, port):
self.http_proxy = platformsettings.SystemProxy(host, port)
def set_https_proxy(self, host, port):
self.https_proxy = platformsettings.SystemProxy(host, port)
def test_get_connection_without_proxy_connects_to_host_ip(self):
"""HTTP connection with no proxy connects to host IP."""
self.set_http_proxy(host=None, port=None)
connection = self.fetch._get_connection('example.com', None, is_ssl=False)
self.assertEqual('127.127.127.127', connection.host)
self.assertEqual(80, connection.port) # default HTTP port
def test_get_connection_without_proxy_uses_nondefault_request_port(self):
"""HTTP connection with no proxy connects with request port."""
self.set_https_proxy(host=None, port=None)
connection = self.fetch._get_connection('example.com', 8888, is_ssl=False)
self.assertEqual('127.127.127.127', connection.host)
self.assertEqual(8888, connection.port) # request HTTP port
def test_get_connection_with_proxy_uses_proxy_port(self):
"""HTTP connection with proxy connects used proxy port."""
self.set_http_proxy(host='proxy.com', port=None)
connection = self.fetch._get_connection('example.com', 8888, is_ssl=False)
self.assertEqual('2.2.2.2', connection.host) # proxy IP
self.assertEqual(80, connection.port) # proxy port (default HTTP)
def test_ssl_get_connection_without_proxy_connects_to_host_ip(self):
"""HTTPS (SSL) connection with no proxy connects to host IP."""
self.set_https_proxy(host=None, port=None)
connection = self.fetch._get_connection('example.com', None, is_ssl=True)
self.assertEqual('127.127.127.127', connection.host)
self.assertEqual(443, connection.port) # default SSL port
def test_ssl_get_connection_with_proxy_connects_to_proxy_ip(self):
"""HTTPS (SSL) connection with proxy connects to proxy IP."""
self.set_https_proxy(host='proxy.com', port=8443)
connection = self.fetch._get_connection('example.com', None, is_ssl=True)
self.assertEqual('2.2.2.2', connection.host) # proxy IP
self.assertEqual(8443, connection.port) # SSL proxy port
def test_ssl_get_connection_with_proxy_tunnels_to_host(self):
"""HTTPS (SSL) connection with proxy tunnels to target host."""
self.set_https_proxy(host='proxy.com', port=8443)
connection = self.fetch._get_connection('example.com', 9443, is_ssl=True)
self.assertEqual('example.com', connection._tunnel_host) # host name
self.assertEqual(9443, connection._tunnel_port) # host port
class ActualNetworkFetchTest(test_utils.RealNetworkFetchTest):
def testFetchNonSSLRequest(self):
real_dns_lookup = dnsproxy.RealDnsLookup(
name_servers=[platformsettings.get_original_primary_nameserver()],
dns_forwarding=False, proxy_host='127.0.0.1', proxy_port=5353)
fetch = httpclient.RealHttpFetch(real_dns_lookup)
request = httparchive.ArchivedHttpRequest(
command='GET', host='google.com', full_path='/search?q=dogs',
request_body=None, headers={}, is_ssl=False)
response = fetch(request)
self.assertIsNotNone(response)
def testFetchSSLRequest(self):
real_dns_lookup = dnsproxy.RealDnsLookup(
name_servers=[platformsettings.get_original_primary_nameserver()],
dns_forwarding=False, proxy_host='127.0.0.1', proxy_port=5353)
fetch = httpclient.RealHttpFetch(real_dns_lookup)
request = httparchive.ArchivedHttpRequest(
command='GET', host='google.com', full_path='/search?q=dogs',
request_body=None, headers={}, is_ssl=True)
response = fetch(request)
self.assertIsNotNone(response)
class HttpArchiveFetchTest(unittest.TestCase):
TEST_REQUEST_TIME = datetime.datetime(2016, 11, 17, 1, 2, 3, 456)
def createTestResponse(self):
return httparchive.ArchivedHttpResponse(
11, 200, 'OK', [('content-type', 'text/html')],
['<body>test</body>'],
request_time=HttpArchiveFetchTest.TEST_REQUEST_TIME)
def checkTestResponse(self, actual_response, archive, request):
self.assertEqual(actual_response, archive[request])
self.assertEqual(['<body>test</body>'], actual_response.response_data)
self.assertEqual(HttpArchiveFetchTest.TEST_REQUEST_TIME,
actual_response.request_time)
@staticmethod
def dummy_injector(_):
return '<body>test</body>'
class RecordHttpArchiveFetchTest(HttpArchiveFetchTest):
@mock.patch('httpclient.RealHttpFetch')
def testFetch(self, real_http_fetch):
http_fetch_instance = real_http_fetch.return_value
response = self.createTestResponse()
http_fetch_instance.return_value = response
archive = httparchive.HttpArchive()
fetch = httpclient.RecordHttpArchiveFetch(archive, self.dummy_injector)
request = httparchive.ArchivedHttpRequest(
'GET', 'www.test.com', '/', None, {})
self.checkTestResponse(fetch(request), archive, request)
class ReplayHttpArchiveFetchTest(HttpArchiveFetchTest):
def testFetch(self):
request = httparchive.ArchivedHttpRequest(
'GET', 'www.test.com', '/', None, {})
response = self.createTestResponse()
archive = httparchive.HttpArchive()
archive[request] = response
fetch = httpclient.ReplayHttpArchiveFetch(
archive, None, self.dummy_injector)
self.checkTestResponse(fetch(request), archive, request)
@mock.patch('script_injector.util.resource_string')
@mock.patch('script_injector.util.resource_exists')
@mock.patch('script_injector.os.path.exists')
def testInjectedDate(self, os_path, util_exists, util_resource_string):
os_path.return_value = False
util_exists.return_value = True
util_resource_string.return_value = \
["""var time_seed={}""".format(script_injector.TIME_SEED_MARKER)]
request = httparchive.ArchivedHttpRequest(
'GET', 'www.test.com', '/', None, {})
response = self.createTestResponse()
archive = httparchive.HttpArchive()
archive[request] = response
fetch = httpclient.ReplayHttpArchiveFetch(
archive, None, script_injector.GetScriptInjector("time_script.js"))
self.assertEqual(
['<script>var time_seed=1479344523000</script><body>test</body>'],
fetch(request).response_data)
if __name__ == '__main__':
unittest.main()