#!/usr/bin/env python
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import unittest

import datetime
import dnsproxy
import httparchive
import httpclient
import platformsettings
import script_injector
import test_utils


class RealHttpFetchTest(unittest.TestCase):

  # Initialize test data
  CONTENT_TYPE = 'content-type: image/x-icon'
  COOKIE_1 = ('Set-Cookie: GMAIL_IMP=EXPIRED; '
              'Expires=Thu, 12-Jul-2012 22:41:22 GMT; '
              'Path=/mail; Secure')
  COOKIE_2 = ('Set-Cookie: GMAIL_STAT_205a=EXPIRED; '
              'Expires=Thu, 12-Jul-2012 22:42:24 GMT; '
              'Path=/mail; Secure')
  FIRST_LINE = 'fake-header: first line'
  SECOND_LINE = ' second line'
  THIRD_LINE = '\tthird line'
  BAD_HEADER = 'this is a bad header'

  def test__GetHeaderNameValueBasic(self):
    """Test _GetHeaderNameValue with normal header."""

    real_http_fetch = httpclient.RealHttpFetch
    name_value = real_http_fetch._GetHeaderNameValue(self.CONTENT_TYPE)
    self.assertEqual(name_value, ('content-type', 'image/x-icon'))

  def test__GetHeaderNameValueLowercasesName(self):
    """_GetHeaderNameValue lowercases header name."""

    real_http_fetch = httpclient.RealHttpFetch
    header = 'X-Google-Gfe-Backend-Request-Info: eid=1KMAUMeiK4eMiAL52YyMBg'
    expected = ('x-google-gfe-backend-request-info',
                'eid=1KMAUMeiK4eMiAL52YyMBg')
    name_value = real_http_fetch._GetHeaderNameValue(header)
    self.assertEqual(name_value, expected)

  def test__GetHeaderNameValueBadLineGivesNone(self):
    """_GetHeaderNameValue returns None for a header in wrong format."""

    real_http_fetch = httpclient.RealHttpFetch
    name_value = real_http_fetch._GetHeaderNameValue(self.BAD_HEADER)
    self.assertIsNone(name_value)

  def test__ToTuplesBasic(self):
    """Test _ToTuples with normal input."""

    real_http_fetch = httpclient.RealHttpFetch
    headers = [self.CONTENT_TYPE, self.COOKIE_1, self.FIRST_LINE]
    result = real_http_fetch._ToTuples(headers)
    expected = [('content-type', 'image/x-icon'),
                ('set-cookie', self.COOKIE_1[12:]),
                ('fake-header', 'first line')]
    self.assertEqual(result, expected)

  def test__ToTuplesMultipleHeadersWithSameName(self):
    """Test mulitple headers with the same name."""

    real_http_fetch = httpclient.RealHttpFetch
    headers = [self.CONTENT_TYPE, self.COOKIE_1, self.COOKIE_2, self.FIRST_LINE]
    result = real_http_fetch._ToTuples(headers)
    expected = [('content-type', 'image/x-icon'),
                ('set-cookie', self.COOKIE_1[12:]),
                ('set-cookie', self.COOKIE_2[12:]),
                ('fake-header', 'first line')]
    self.assertEqual(result, expected)

  def test__ToTuplesAppendsContinuationLine(self):
    """Test continuation line is handled."""

    real_http_fetch = httpclient.RealHttpFetch
    headers = [self.CONTENT_TYPE, self.COOKIE_1, self.FIRST_LINE,
               self.SECOND_LINE, self.THIRD_LINE]
    result = real_http_fetch._ToTuples(headers)
    expected = [('content-type', 'image/x-icon'),
                ('set-cookie', self.COOKIE_1[12:]),
                ('fake-header', 'first line\n second line\n third line')]
    self.assertEqual(result, expected)

  def test__ToTuplesIgnoresBadHeader(self):
    """Test bad header is ignored."""

    real_http_fetch = httpclient.RealHttpFetch
    bad_headers = [self.CONTENT_TYPE, self.BAD_HEADER, self.COOKIE_1]
    expected = [('content-type', 'image/x-icon'),
                ('set-cookie', self.COOKIE_1[12:])]
    result = real_http_fetch._ToTuples(bad_headers)
    self.assertEqual(result, expected)

  def test__ToTuplesIgnoresMisplacedContinuationLine(self):
    """Test misplaced continuation line is ignored."""

    real_http_fetch = httpclient.RealHttpFetch
    misplaced_headers = [self.THIRD_LINE, self.CONTENT_TYPE,
                         self.COOKIE_1, self.FIRST_LINE, self.SECOND_LINE]
    result = real_http_fetch._ToTuples(misplaced_headers)
    expected = [('content-type', 'image/x-icon'),
                ('set-cookie', self.COOKIE_1[12:]),
                ('fake-header', 'first line\n second line')]
    self.assertEqual(result, expected)


class RealHttpFetchGetConnectionTest(unittest.TestCase):
  """Test that a connection is made with request IP/port or proxy IP/port."""

  def setUp(self):
    def real_dns_lookup(host):
      return {
          'example.com': '127.127.127.127',
          'proxy.com': '2.2.2.2',
          }[host]
    self.fetch = httpclient.RealHttpFetch(real_dns_lookup)
    self.https_proxy = None
    self.http_proxy = None
    def get_proxy(is_ssl):
      return self.https_proxy if is_ssl else self.http_proxy
    self.fetch._get_system_proxy = get_proxy

  def set_http_proxy(self, host, port):
    self.http_proxy = platformsettings.SystemProxy(host, port)

  def set_https_proxy(self, host, port):
    self.https_proxy = platformsettings.SystemProxy(host, port)

  def test_get_connection_without_proxy_connects_to_host_ip(self):
    """HTTP connection with no proxy connects to host IP."""
    self.set_http_proxy(host=None, port=None)
    connection = self.fetch._get_connection('example.com', None, is_ssl=False)
    self.assertEqual('127.127.127.127', connection.host)
    self.assertEqual(80, connection.port)  # default HTTP port

  def test_get_connection_without_proxy_uses_nondefault_request_port(self):
    """HTTP connection with no proxy connects with request port."""
    self.set_https_proxy(host=None, port=None)
    connection = self.fetch._get_connection('example.com', 8888, is_ssl=False)
    self.assertEqual('127.127.127.127', connection.host)
    self.assertEqual(8888, connection.port)  # request HTTP port

  def test_get_connection_with_proxy_uses_proxy_port(self):
    """HTTP connection with proxy connects used proxy port."""
    self.set_http_proxy(host='proxy.com', port=None)
    connection = self.fetch._get_connection('example.com', 8888, is_ssl=False)
    self.assertEqual('2.2.2.2', connection.host)  # proxy IP
    self.assertEqual(80, connection.port)  # proxy port (default HTTP)

  def test_ssl_get_connection_without_proxy_connects_to_host_ip(self):
    """HTTPS (SSL) connection with no proxy connects to host IP."""
    self.set_https_proxy(host=None, port=None)
    connection = self.fetch._get_connection('example.com', None, is_ssl=True)
    self.assertEqual('127.127.127.127', connection.host)
    self.assertEqual(443, connection.port)  # default SSL port

  def test_ssl_get_connection_with_proxy_connects_to_proxy_ip(self):
    """HTTPS (SSL) connection with proxy connects to proxy IP."""
    self.set_https_proxy(host='proxy.com', port=8443)
    connection = self.fetch._get_connection('example.com', None, is_ssl=True)
    self.assertEqual('2.2.2.2', connection.host)  # proxy IP
    self.assertEqual(8443, connection.port)  # SSL proxy port

  def test_ssl_get_connection_with_proxy_tunnels_to_host(self):
    """HTTPS (SSL) connection with proxy tunnels to target host."""
    self.set_https_proxy(host='proxy.com', port=8443)
    connection = self.fetch._get_connection('example.com', 9443, is_ssl=True)
    self.assertEqual('example.com', connection._tunnel_host)  # host name
    self.assertEqual(9443, connection._tunnel_port)  # host port


class ActualNetworkFetchTest(test_utils.RealNetworkFetchTest):

  def testFetchNonSSLRequest(self):
    real_dns_lookup = dnsproxy.RealDnsLookup(
        name_servers=[platformsettings.get_original_primary_nameserver()],
        dns_forwarding=False, proxy_host='127.0.0.1', proxy_port=5353)
    fetch = httpclient.RealHttpFetch(real_dns_lookup)
    request = httparchive.ArchivedHttpRequest(
        command='GET', host='google.com', full_path='/search?q=dogs',
        request_body=None, headers={}, is_ssl=False)
    response = fetch(request)
    self.assertIsNotNone(response)

  def testFetchSSLRequest(self):
    real_dns_lookup = dnsproxy.RealDnsLookup(
        name_servers=[platformsettings.get_original_primary_nameserver()],
        dns_forwarding=False, proxy_host='127.0.0.1', proxy_port=5353)
    fetch = httpclient.RealHttpFetch(real_dns_lookup)
    request = httparchive.ArchivedHttpRequest(
        command='GET', host='google.com', full_path='/search?q=dogs',
        request_body=None, headers={}, is_ssl=True)
    response = fetch(request)
    self.assertIsNotNone(response)


class HttpArchiveFetchTest(unittest.TestCase):

  TEST_REQUEST_TIME = datetime.datetime(2016, 11, 17, 1, 2, 3, 456)

  def createTestResponse(self):
    return httparchive.ArchivedHttpResponse(
        11, 200, 'OK', [('content-type', 'text/html')],
        ['<body>test</body>'],
        request_time=HttpArchiveFetchTest.TEST_REQUEST_TIME)

  def checkTestResponse(self, actual_response, archive, request):
    self.assertEqual(actual_response, archive[request])
    self.assertEqual(['<body>test</body>'], actual_response.response_data)
    self.assertEqual(HttpArchiveFetchTest.TEST_REQUEST_TIME,
                     actual_response.request_time)

  @staticmethod
  def dummy_injector(_):
    return '<body>test</body>'


class RecordHttpArchiveFetchTest(HttpArchiveFetchTest):

  @mock.patch('httpclient.RealHttpFetch')
  def testFetch(self, real_http_fetch):
    http_fetch_instance = real_http_fetch.return_value
    response = self.createTestResponse()
    http_fetch_instance.return_value = response
    archive = httparchive.HttpArchive()
    fetch = httpclient.RecordHttpArchiveFetch(archive, self.dummy_injector)
    request = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/', None, {})
    self.checkTestResponse(fetch(request), archive, request)


class ReplayHttpArchiveFetchTest(HttpArchiveFetchTest):

  def testFetch(self):
    request = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/', None, {})
    response = self.createTestResponse()
    archive = httparchive.HttpArchive()
    archive[request] = response
    fetch = httpclient.ReplayHttpArchiveFetch(
        archive, None, self.dummy_injector)
    self.checkTestResponse(fetch(request), archive, request)

  @mock.patch('script_injector.util.resource_string')
  @mock.patch('script_injector.util.resource_exists')
  @mock.patch('script_injector.os.path.exists')
  def testInjectedDate(self, os_path, util_exists, util_resource_string):
    os_path.return_value = False
    util_exists.return_value = True
    util_resource_string.return_value = \
        ["""var time_seed={}""".format(script_injector.TIME_SEED_MARKER)]
    request = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/', None, {})
    response = self.createTestResponse()
    archive = httparchive.HttpArchive()
    archive[request] = response

    fetch = httpclient.ReplayHttpArchiveFetch(
        archive, None, script_injector.GetScriptInjector("time_script.js"))
    self.assertEqual(
        ['<script>var time_seed=1479344523000</script><body>test</body>'],
        fetch(request).response_data)


if __name__ == '__main__':
  unittest.main()