#!/usr/bin/env python
# Copyright 2011 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import calendar
import email.utils
import httparchive
import unittest


def create_request(headers):
  """Builds an archived GET request for www.test.com/ with |headers|.

  Args:
    headers: request headers, handed unchanged to ArchivedHttpRequest.

  Returns:
    The httparchive.ArchivedHttpRequest that was constructed.
  """
  command, host, full_path = 'GET', 'www.test.com', '/'
  # The fourth positional argument is presumably the request body (none
  # here) -- confirm against httparchive.ArchivedHttpRequest.
  return httparchive.ArchivedHttpRequest(
      command, host, full_path, None, headers)

def create_response(headers):
  """Builds an archived 200 'OK' response with |headers| and an empty body.

  Args:
    headers: response headers, handed unchanged to ArchivedHttpResponse.

  Returns:
    The httparchive.ArchivedHttpResponse that was constructed.
  """
  # 11 is presumably the HTTP version (1.1) -- confirm against
  # httparchive.ArchivedHttpResponse.
  version, status, reason = 11, 200, 'OK'
  return httparchive.ArchivedHttpResponse(
      version, status, reason, headers, '')


class HttpArchiveTest(unittest.TestCase):
  """Tests for HttpArchive lookups, conditional gets and header trimming.

  setUp stores RESPONSE in a fresh archive under a GET and a POST request
  for www.test.com/, so every test starts from the same two entries.
  """

  # Default fixture request: a GET for www.test.com/ with no headers.
  REQUEST_HEADERS = {}
  REQUEST = create_request(REQUEST_HEADERS)

  # Used for if-(un)modified-since checks.  DATE_PRESENT is also the
  # last-modified value of RESPONSE below; DATE_PAST and DATE_FUTURE lie one
  # week (plus/minus one hour) on either side of it.
  DATE_PAST = 'Wed, 13 Jul 2011 03:58:08 GMT'
  DATE_PRESENT = 'Wed, 20 Jul 2011 04:58:08 GMT'
  DATE_FUTURE = 'Wed, 27 Jul 2011 05:58:08 GMT'
  DATE_INVALID = 'This is an invalid date!!'

  # etag values.  ETAG_VALID is the etag carried by RESPONSE below;
  # ETAG_INVALID is any value that differs from it.
  ETAG_VALID = 'etag'
  ETAG_INVALID = 'This is an invalid etag value!!'

  # Default fixture response, stored in the archive by setUp.
  RESPONSE_HEADERS = [('last-modified', DATE_PRESENT), ('etag', ETAG_VALID)]
  RESPONSE = create_response(RESPONSE_HEADERS)

  def setUp(self):
    """Creates self.archive holding RESPONSE under a GET and a POST key."""
    # Identical to REQUEST except for the command, for testing POST handling.
    post_request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, self.REQUEST_HEADERS)

    self.archive = httparchive.HttpArchive()
    for archived_request in (self.REQUEST, post_request):
      self.archive[archived_request] = self.RESPONSE

  def tearDown(self):
    """Nothing to clean up; setUp rebuilds the archive for every test."""

  def test_init(self):
    """A newly constructed HttpArchive contains no entries."""
    self.assertEqual(len(httparchive.HttpArchive()), 0)

  def test_request__TrimHeaders(self):
    """Checks which request headers _TrimHeaders keeps, drops or rewrites."""
    trim = httparchive.ArchivedHttpRequest._TrimHeaders

    # A plain gzip/deflate accept-encoding header comes back unchanged.
    plain_encoding = {'accept-encoding': 'gzip,deflate'}
    trimmed = trim(plain_encoding)
    self.assertEqual(trimmed, list(plain_encoding.items()))

    expectations = (
        # referer is dropped.
        ({'referer': 'www.google.com'}, []),
        # referer and cookie are dropped, unrelated headers survive.
        ({'referer': 'www.google.com', 'cookie': 'cookie_monster!',
          'hello': 'world'},
         [('hello', 'world')]),
        # Tests that spaces and trailing comma get stripped.
        ({'accept-encoding': 'gzip, deflate,, '},
         [('accept-encoding', 'gzip,deflate')]),
        # Tests that 'lzma' gets stripped.
        ({'accept-encoding': 'gzip, deflate, lzma'},
         [('accept-encoding', 'gzip,deflate')]),
        # Tests that x-client-data gets stripped.
        ({'x-client-data': 'testdata'}, []),
    )
    for raw_headers, expected in expectations:
      self.assertEqual(trim(raw_headers), expected)

  def test_matches(self):
    """Checks ArchivedHttpRequest.matches with and without the query string.

    Also checks that passing None for the host or the path does not prevent
    a match, while a request built entirely from None values never matches.
    """
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?hello=world', None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?foo=bar', None, headers)

    # Same path, different query: only a match when the query is ignored.
    # assertTrue/assertFalse replace the deprecated assert_ alias, which no
    # longer exists in Python 3.12+.
    self.assertFalse(request1.matches(
        request2.command, request2.host, request2.full_path, use_query=True))
    self.assertTrue(request1.matches(
        request2.command, request2.host, request2.full_path, use_query=False))

    # Leaving out the path or the host (None) still matches.
    self.assertTrue(request1.matches(
        request2.command, request2.host, None, use_query=True))
    self.assertTrue(request1.matches(
        request2.command, None, request2.full_path, use_query=False))

    # A request with no command, host or path matches neither combination.
    empty_request = httparchive.ArchivedHttpRequest(
        None, None, None, None, headers)
    self.assertFalse(empty_request.matches(
        request2.command, request2.host, None, use_query=True))
    self.assertFalse(empty_request.matches(
        request2.command, None, request2.full_path, use_query=False))

  def setup_find_closest_request(self):
    """Builds the fixture shared by the find_closest_request tests.

    Returns:
      A tuple (archive, request1, request2, request3, request4).  Only
      request2 ('/a?foo=bar') and request3 ('/b?hello=world') are stored in
      archive; request1 ('/a?hello=world') and request4 ('/c?hello=world')
      are left out so they can be used as lookup probes.
    """
    headers = {}
    full_paths = (
        '/a?hello=world', '/a?foo=bar', '/b?hello=world', '/c?hello=world')
    requests = tuple(
        httparchive.ArchivedHttpRequest(
            'GET', 'www.test.com', full_path, None, headers)
        for full_path in full_paths)

    archive = httparchive.HttpArchive()
    # Add requests 2 and 3 and find closest match with request1
    for stored in requests[1:3]:
      archive[stored] = self.RESPONSE

    return (archive,) + requests

  def test_find_closest_request(self):
    """Checks closest-match lookup with and without strict path matching."""
    fixture = self.setup_find_closest_request()
    archive, request1, request2, request3, request4 = fixture
    find = archive.find_closest_request

    # Always favor requests with same paths, even if use_path=False.
    self.assertEqual(request2, find(request1, use_path=False))
    # If we match strictly on path, request2 is the only match
    self.assertEqual(request2, find(request1, use_path=True))
    # request4 can be matched with request3, if use_path=False
    self.assertEqual(request3, find(request4, use_path=False))
    # ...but None, if use_path=True
    self.assertEqual(None, find(request4, use_path=True))

  def test_find_closest_request_delete_simple(self):
    """Deleting request3 leaves request2 as the match for request1."""
    fixture = self.setup_find_closest_request()
    archive, request1, request2, request3 = fixture[:4]

    del archive[request3]
    # request2 shares request1's path, so it is found in both modes.
    for strict_path in (False, True):
      self.assertEqual(
          request2,
          archive.find_closest_request(request1, use_path=strict_path))

  def test_find_closest_request_delete_complex(self):
    """Deleting request2 leaves only the different-path request3."""
    fixture = self.setup_find_closest_request()
    archive, request1, request2, request3 = fixture[:4]

    del archive[request2]
    find = archive.find_closest_request
    # request3 has a different path, so it only matches when use_path=False.
    self.assertEqual(request3, find(request1, use_path=False))
    self.assertEqual(None, find(request1, use_path=True))

  def test_find_closest_request_timestamp(self):
    """The closest match is chosen by query parameters, not string order."""
    headers = {}

    def make_request(query):
      return httparchive.ArchivedHttpRequest(
          'GET', 'www.test.com', '/index.html?' + query, None, headers)

    request1 = make_request('time=100000000&important=true')
    request2 = make_request('time=99999999&important=true')
    request3 = make_request('time=10000000&important=false')

    # Add requests 2 and 3 and find closest match with request1
    archive = httparchive.HttpArchive()
    for stored in (request2, request3):
      archive[stored] = self.RESPONSE

    # Although request3 is lexicographically closer, request2 is semantically
    # more similar.
    closest = archive.find_closest_request(request1, use_path=True)
    self.assertEqual(request2, closest)

  def test_get_cmp_seq(self):
    """_GetCmpSeq yields sorted query pairs followed by sorted headers."""
    # The order of key-value pairs in query and header respectively should not
    # matter.
    query = 'c=d&a=b;e=f'
    request = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?c=d&a=b;e=f', None,
        {'k2': 'v2', 'k1': 'v1'})

    expected_query_pairs = [('a', 'b'), ('c', 'd'), ('e', 'f')]
    expected_header_pairs = [('k1', 'v1'), ('k2', 'v2')]
    self.assertEqual(expected_query_pairs + expected_header_pairs,
                     request._GetCmpSeq(query))

  def test_get_simple(self):
    """get() returns the stored response, or the default for unknown keys."""
    # The fixture request was archived by setUp.
    self.assertEqual(self.archive.get(self.REQUEST), self.RESPONSE)

    # A request with an extra header was never archived.
    unknown_request = create_request({'foo': 'bar'})
    self.assertEqual(self.archive.get(unknown_request, default=None), None)

  def test_get_modified_headers(self):
    """Checks archive.get() against if-modified-since request headers.

    The archived response's last-modified value is DATE_PRESENT.
    """
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    get_cases = (
        # Fail check and return response again
        (self.DATE_PAST, response),
        # Succeed check and return 304 Not Modified
        (self.DATE_FUTURE, not_modified_response),
        # Succeed check and return 304 Not Modified
        (self.DATE_PRESENT, not_modified_response),
        # Invalid date, fail check and return response again
        (self.DATE_INVALID, response),
    )
    for since, expected in get_cases:
      request = create_request({'if-modified-since': since})
      self.assertEqual(archive.get(request), expected)

    # fail check since the request is not a GET or HEAD request (as per RFC)
    post_request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None,
        {'if-modified-since': self.DATE_FUTURE})
    self.assertEqual(archive.get(post_request), response)

  def test_get_unmodified_headers(self):
    """Checks archive.get() against if-unmodified-since request headers.

    The archived response's last-modified value is DATE_PRESENT.  The
    expectations pin the archive's behaviour as exercised here: dates for
    which the check succeeds produce the 304 response, all others the
    archived response.
    """
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # Succeed check
    request_headers = {'if-unmodified-since': self.DATE_PAST}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Fail check
    request_headers = {'if-unmodified-since': self.DATE_FUTURE}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Succeed check
    request_headers = {'if-unmodified-since': self.DATE_PRESENT}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Fail check
    request_headers = {'if-unmodified-since': self.DATE_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Fail check since the request is not a GET or HEAD request (as per RFC).
    # The header must be if-unmodified-since with DATE_PAST: on a GET that
    # combination yields the 304 above, so getting the full response back
    # here shows that the POST command is what disabled the check.
    request_headers = {'if-unmodified-since': self.DATE_PAST}
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_etags(self):
    """Checks archive.get() against if-match / if-none-match headers.

    The archived response's etag is ETAG_VALID.
    """
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)
    precondition_failed_response = httparchive.create_response(412)

    etag_cases = (
        # if-match headers
        ('if-match', self.ETAG_VALID, response),
        ('if-match', self.ETAG_INVALID, precondition_failed_response),
        # if-none-match headers
        ('if-none-match', self.ETAG_VALID, not_modified_response),
        ('if-none-match', self.ETAG_INVALID, response),
    )
    for header, etag, expected in etag_cases:
      request = create_request({header: etag})
      self.assertEqual(archive.get(request), expected)

  def test_get_multiple_match_headers(self):
    """Checks archive.get() when if-match and if-modified-since combine.

    The archived response has etag ETAG_VALID and last-modified
    DATE_PRESENT.
    """
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)
    precondition_failed_response = httparchive.create_response(412)

    # if-match headers
    # If the request would, without the If-Match header field,
    # result in anything other than a 2xx or 412 status,
    # then the If-Match header MUST be ignored.
    combined_cases = (
        # Matching etag, modified since the given date: full response
        (self.ETAG_VALID, self.DATE_PAST, response),
        # Invalid etag, precondition failed
        (self.ETAG_INVALID, self.DATE_PAST, precondition_failed_response),
        # 304 response; ignore if-match header
        (self.ETAG_VALID, self.DATE_FUTURE, not_modified_response),
        # 304 response; ignore if-match header
        (self.ETAG_INVALID, self.DATE_PRESENT, not_modified_response),
        # Invalid etag, precondition failed
        (self.ETAG_INVALID, self.DATE_INVALID, precondition_failed_response),
    )
    for etag, since, expected in combined_cases:
      request = create_request({
          'if-match': etag,
          'if-modified-since': since,
      })
      self.assertEqual(archive.get(request), expected)

  def test_get_multiple_none_match_headers(self):
    """Checks archive.get() when if-none-match and if-modified-since combine.

    The archived response has etag ETAG_VALID and last-modified
    DATE_PRESENT.  No combination exercised here expects a 412, so only the
    304 response is built.
    """
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # if-none-match headers
    # If the request would, without the If-None-Match header field,
    # result in anything other than a 2xx or 304 status,
    # then the If-None-Match header MUST be ignored.

    # etag match, but modified since the given date: full response
    request_headers = {
        'if-none-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # No etag match and modified since the given date: full response
    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # etag match and not modified since the given date: 304 Not Modified
    request_headers = {
        'if-none-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_FUTURE,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # No etag match, but not modified since the given date: 304 Not Modified
    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PRESENT,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # No etag match and an invalid date: full response
    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_INVALID,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

  def test_response__TrimHeaders(self):
    """Response _TrimHeaders drops content-security-policy, keeps the rest."""
    trim = httparchive.ArchivedHttpResponse._TrimHeaders

    # Header lists without a content-security-policy entry come back as-is.
    image_headers = [('access-control-allow-origin', '*'),
                     ('content-type', 'image/jpeg'),
                     ('content-length', 2878)]
    self.assertEqual(trim(image_headers), image_headers)

    script_headers = [('content-type', 'text/javascript; charset=utf-8'),
                      ('connection', 'keep-alive'),
                      ('cache-control', 'private, must-revalidate, max-age=0'),
                      ('content-encoding', 'gzip')]
    self.assertEqual(trim(script_headers), script_headers)

    # A policy entry mixed with other headers: only the policy is removed.
    first_policy = """\
default-src 'self' http://*.cnn.com:* https://*.cnn.com:* \
*.cnn.net:* *.turner.com:* *.ugdturner.com:* *.vgtf.net:*; \
script-src 'unsafe-inline' 'unsafe-eval' 'self' *; \
style-src 'unsafe-inline' 'self' *; frame-src 'self' *; \
object-src 'self' *; img-src 'self' * data:; media-src 'self' *; \
font-src 'self' *; connect-src 'self' *"""
    surviving_headers = [
        ('access-control-allow-origin', '*'),
        ('content-type', 'text/html; charset=utf-8'),
        ('content-encoding', 'gzip')
    ]
    page_headers = (
        [('content-security-policy', first_policy)] + surviving_headers)
    self.assertEqual(trim(page_headers), surviving_headers)

    # A policy entry on its own: nothing is left.
    second_policy = """\
default-src * data: blob:;script-src *.facebook.com *.fbcdn.net \
*.facebook.net *.google-analytics.com *.virtualearth.net *.google.com \
127.0.0.1:* *.spotilocal.com:* 'unsafe-inline' 'unsafe-eval' \
fbstatic-a.akamaihd.net fbcdn-static-b-a.akamaihd.net *.atlassolutions.com \
blob: chrome-extension://lifbcibllhkdhoafpjfnlhfpfgnpldfl \
*.liverail.com;style-src * 'unsafe-inline' data:;connect-src *.facebook.com \
*.fbcdn.net *.facebook.net *.spotilocal.com:* *.akamaihd.net \
wss://*.facebook.com:* https://fb.scanandcleanlocal.com:* \
*.atlassolutions.com attachment.fbsbx.com ws://localhost:* \
blob: 127.0.0.1:* *.liverail.com"""
    policy_only_headers = [('content-security-policy', second_policy)]
    self.assertEqual(trim(policy_only_headers), [])


class ArchivedHttpResponse(unittest.TestCase):
  """Tests update_date() on an archived response dated PAST_DATE_A.

  Every test calls update_date(date, now=NOW_SECONDS).  The expectations pin
  the result to |date| shifted so that its offset from the response's 'date'
  header is kept relative to NOW_DATE_A, and to |date| unchanged when either
  the argument or the header is 'garbage date'.
  """
  PAST_DATE_A = 'Tue, 13 Jul 2010 03:47:07 GMT'
  PAST_DATE_B = 'Tue, 13 Jul 2010 02:47:07 GMT'  # PAST_DATE_A -1 hour
  PAST_DATE_C = 'Tue, 13 Jul 2010 04:47:07 GMT'  # PAST_DATE_A +1 hour
  NOW_DATE_A = 'Wed, 20 Jul 2011 04:58:08 GMT'
  NOW_DATE_B = 'Wed, 20 Jul 2011 03:58:08 GMT'  # NOW_DATE_A -1 hour
  NOW_DATE_C = 'Wed, 20 Jul 2011 05:58:08 GMT'  # NOW_DATE_A +1 hour
  # NOW_DATE_A as seconds since the epoch, passed as the fake "now".
  NOW_SECONDS = calendar.timegm(email.utils.parsedate(NOW_DATE_A))

  def setUp(self):
    """Creates the response under test with a 'date' header of PAST_DATE_A."""
    date_header = ('date', self.PAST_DATE_A)
    self.response = create_response([date_header])

  def _check_update_date(self, date, expected):
    """Asserts that update_date(date) at NOW_SECONDS returns |expected|."""
    updated = self.response.update_date(date, now=self.NOW_SECONDS)
    self.assertEqual(updated, expected)

  def test_update_date_same_date(self):
    """The header's own date maps onto NOW_DATE_A."""
    self._check_update_date(self.PAST_DATE_A, self.NOW_DATE_A)

  def test_update_date_before_date(self):
    """A date one hour before the header maps to one hour before now."""
    self._check_update_date(self.PAST_DATE_B, self.NOW_DATE_B)

  def test_update_date_after_date(self):
    """A date one hour after the header maps to one hour after now."""
    self._check_update_date(self.PAST_DATE_C, self.NOW_DATE_C)

  def test_update_date_bad_date_param(self):
    """A 'garbage date' argument is handed back untouched."""
    self._check_update_date('garbage date', 'garbage date')

  def test_update_date_bad_date_header(self):
    """With a 'garbage date' header the argument is handed back untouched."""
    self.response.set_header('date', 'garbage date')
    self._check_update_date(self.PAST_DATE_B, self.PAST_DATE_B)


# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()