#!/usr/bin/env python
# Copyright 2011 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for the httparchive module."""

import calendar
import email.utils
import httparchive
import unittest


def create_request(headers):
  """Return a GET request for host 'www.test.com', path '/', with |headers|."""
  return httparchive.ArchivedHttpRequest(
      'GET', 'www.test.com', '/', None, headers)


def create_response(headers):
  """Return an ArchivedHttpResponse built from (11, 200, 'OK', headers, '')."""
  return httparchive.ArchivedHttpResponse(
      11, 200, 'OK', headers, '')


class HttpArchiveTest(unittest.TestCase):
  """Tests for HttpArchive lookups and ArchivedHttpRequest helpers."""

  REQUEST_HEADERS = {}
  REQUEST = create_request(REQUEST_HEADERS)

  # Used for if-(un)modified-since checks
  DATE_PAST = 'Wed, 13 Jul 2011 03:58:08 GMT'
  DATE_PRESENT = 'Wed, 20 Jul 2011 04:58:08 GMT'
  DATE_FUTURE = 'Wed, 27 Jul 2011 05:58:08 GMT'
  DATE_INVALID = 'This is an invalid date!!'

  # etag values
  ETAG_VALID = 'etag'
  ETAG_INVALID = 'This is an invalid etag value!!'

  RESPONSE_HEADERS = [('last-modified', DATE_PRESENT), ('etag', ETAG_VALID)]
  RESPONSE = create_response(RESPONSE_HEADERS)

  def setUp(self):
    """Build an archive holding RESPONSE for both a GET and a POST request."""
    self.archive = httparchive.HttpArchive()
    self.archive[self.REQUEST] = self.RESPONSE

    # Also add an identical POST request for testing
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, self.REQUEST_HEADERS)
    self.archive[request] = self.RESPONSE

  def tearDown(self):
    pass

  def test_init(self):
    """A freshly constructed archive is empty."""
    archive = httparchive.HttpArchive()
    self.assertEqual(len(archive), 0)

  def test_request__TrimHeaders(self):
    """Checks which request headers _TrimHeaders keeps, drops or rewrites."""
    request = httparchive.ArchivedHttpRequest
    header1 = {'accept-encoding': 'gzip,deflate'}
    self.assertEqual(request._TrimHeaders(header1),
                     list(header1.items()))

    header2 = {'referer': 'www.google.com'}
    self.assertEqual(request._TrimHeaders(header2), [])

    header3 = {'referer': 'www.google.com', 'cookie': 'cookie_monster!',
               'hello': 'world'}
    self.assertEqual(request._TrimHeaders(header3), [('hello', 'world')])

    # Tests that spaces and trailing comma get stripped.
    header4 = {'accept-encoding': 'gzip, deflate,, '}
    self.assertEqual(request._TrimHeaders(header4),
                     [('accept-encoding', 'gzip,deflate')])

    # Tests that 'lzma' gets stripped.
    header5 = {'accept-encoding': 'gzip, deflate, lzma'}
    self.assertEqual(request._TrimHeaders(header5),
                     [('accept-encoding', 'gzip,deflate')])

    # Tests that x-client-data gets stripped.
    header6 = {'x-client-data': 'testdata'}
    self.assertEqual(request._TrimHeaders(header6), [])

  def test_matches(self):
    """Checks ArchivedHttpRequest.matches with and without the query string."""
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?hello=world', None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?foo=bar', None, headers)

    # assertTrue/assertFalse are used instead of the deprecated assert_
    # alias, which no longer exists in Python 3.12+.
    self.assertFalse(request1.matches(
        request2.command, request2.host, request2.full_path, use_query=True))
    self.assertTrue(request1.matches(
        request2.command, request2.host, request2.full_path, use_query=False))

    self.assertTrue(request1.matches(
        request2.command, request2.host, None, use_query=True))
    self.assertTrue(request1.matches(
        request2.command, None, request2.full_path, use_query=False))

    empty_request = httparchive.ArchivedHttpRequest(
        None, None, None, None, headers)
    self.assertFalse(empty_request.matches(
        request2.command, request2.host, None, use_query=True))
    self.assertFalse(empty_request.matches(
        request2.command, None, request2.full_path, use_query=False))

  def setup_find_closest_request(self):
    """Build the fixture shared by the find_closest_request tests.

    Returns:
      A tuple (archive, request1, request2, request3, request4) where only
      request2 and request3 have been added to the archive.
    """
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?hello=world', None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?foo=bar', None, headers)
    request3 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/b?hello=world', None, headers)
    request4 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/c?hello=world', None, headers)

    archive = httparchive.HttpArchive()
    # Add requests 2 and 3 and find closest match with request1
    archive[request2] = self.RESPONSE
    archive[request3] = self.RESPONSE

    return archive, request1, request2, request3, request4

  def test_find_closest_request(self):
    """Checks closest-request lookup with use_path both False and True."""
    archive, request1, request2, request3, request4 = (
        self.setup_find_closest_request())

    # Always favor requests with same paths, even if use_path=False.
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=False))
    # If we match strictly on path, request2 is the only match
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))
    # request4 can be matched with request3, if use_path=False
    self.assertEqual(
        request3, archive.find_closest_request(request4, use_path=False))
    # ...but None, if use_path=True
    self.assertEqual(
        None, archive.find_closest_request(request4, use_path=True))

  def test_find_closest_request_delete_simple(self):
    """Deleting request3 leaves request2 as the match for request1."""
    archive, request1, request2, request3, _ = (
        self.setup_find_closest_request())

    del archive[request3]
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=False))
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))

  def test_find_closest_request_delete_complex(self):
    """Deleting request2 leaves request3, which only matches without path."""
    archive, request1, request2, request3, _ = (
        self.setup_find_closest_request())

    del archive[request2]
    self.assertEqual(
        request3, archive.find_closest_request(request1, use_path=False))
    self.assertEqual(
        None, archive.find_closest_request(request1, use_path=True))

  def test_find_closest_request_timestamp(self):
    """Closest match is expected to be request2, not request3."""
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=100000000&important=true',
        None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=99999999&important=true',
        None, headers)
    request3 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=10000000&important=false',
        None, headers)
    archive = httparchive.HttpArchive()
    # Add requests 2 and 3 and find closest match with request1
    archive[request2] = self.RESPONSE
    archive[request3] = self.RESPONSE

    # Although request3 is lexicographically closer, request2 is semantically
    # more similar.
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))

  def test_get_cmp_seq(self):
    """_GetCmpSeq returns query pairs then header pairs, each sorted."""
    # The order of key-value pairs in query and header respectively should not
    # matter.
    headers = {'k2': 'v2', 'k1': 'v1'}
    request = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?c=d&a=b;e=f', None, headers)
    self.assertEqual([('a', 'b'), ('c', 'd'), ('e', 'f'),
                      ('k1', 'v1'), ('k2', 'v2')],
                     request._GetCmpSeq('c=d&a=b;e=f'))

  def test_get_simple(self):
    """get() returns the stored response, or the default for an unknown one."""
    request = self.REQUEST
    response = self.RESPONSE
    archive = self.archive

    self.assertEqual(archive.get(request), response)

    false_request_headers = {'foo': 'bar'}
    false_request = create_request(false_request_headers)
    self.assertEqual(archive.get(false_request, default=None), None)

  def test_get_modified_headers(self):
    """Checks get() results for requests carrying if-modified-since."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # Fail check and return response again
    request_headers = {'if-modified-since': self.DATE_PAST}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Succeed check and return 304 Not Modified
    request_headers = {'if-modified-since': self.DATE_FUTURE}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Succeed check and return 304 Not Modified
    request_headers = {'if-modified-since': self.DATE_PRESENT}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Invalid date, fail check and return response again
    request_headers = {'if-modified-since': self.DATE_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # fail check since the request is not a GET or HEAD request (as per RFC)
    request_headers = {'if-modified-since': self.DATE_FUTURE}
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_unmodified_headers(self):
    """Checks get() results for requests carrying if-unmodified-since."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # Succeed check
    request_headers = {'if-unmodified-since': self.DATE_PAST}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Fail check
    request_headers = {'if-unmodified-since': self.DATE_FUTURE}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Succeed check
    request_headers = {'if-unmodified-since': self.DATE_PRESENT}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Fail check
    request_headers = {'if-unmodified-since': self.DATE_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Fail check since the request is not a GET or HEAD request (as per RFC)
    # NOTE(review): this case sends 'if-modified-since' even though the test
    # is about 'if-unmodified-since' -- confirm whether that is intentional.
    request_headers = {'if-modified-since': self.DATE_PAST}
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_etags(self):
    """Checks get() results for if-match and if-none-match requests."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)
    precondition_failed_response = httparchive.create_response(412)

    # if-match headers
    request_headers = {'if-match': self.ETAG_VALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    request_headers = {'if-match': self.ETAG_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

    # if-none-match headers
    request_headers = {'if-none-match': self.ETAG_VALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    request_headers = {'if-none-match': self.ETAG_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_multiple_match_headers(self):
    """Checks get() when if-match is combined with if-modified-since."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)
    precondition_failed_response = httparchive.create_response(412)

    # if-match headers
    # If the request would, without the If-Match header field,
    # result in anything other than a 2xx or 412 status,
    # then the If-Match header MUST be ignored.

    request_headers = {
        'if-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Invalid etag, precondition failed
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

    # 304 response; ignore if-match header
    request_headers = {
        'if-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_FUTURE,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # 304 response; ignore if-match header
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PRESENT,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Invalid etag, precondition failed
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_INVALID,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

  def test_get_multiple_none_match_headers(self):
    """Checks get() when if-none-match is combined with if-modified-since."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # if-none-match headers
    # If the request would, without the If-None-Match header field,
    # result in anything other than a 2xx or 304 status,
    # then the If-None-Match header MUST be ignored.

    request_headers = {
        'if-none-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # etag match and date in the future; expect the 304 response
    request_headers = {
        'if-none-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_FUTURE,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PRESENT,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_INVALID,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

  def test_response__TrimHeaders(self):
    """Checks that content-security-policy is dropped from response headers."""
    response = httparchive.ArchivedHttpResponse
    header1 = [('access-control-allow-origin', '*'),
               ('content-type', 'image/jpeg'),
               ('content-length', 2878)]
    self.assertEqual(response._TrimHeaders(header1), header1)

    header2 = [('content-type', 'text/javascript; charset=utf-8'),
               ('connection', 'keep-alive'),
               ('cache-control', 'private, must-revalidate, max-age=0'),
               ('content-encoding', 'gzip')]
    self.assertEqual(response._TrimHeaders(header2), header2)

    header3 = [('content-security-policy', """\
default-src 'self' http://*.cnn.com:* https://*.cnn.com:* \
*.cnn.net:* *.turner.com:* *.ugdturner.com:* *.vgtf.net:*; \
script-src 'unsafe-inline' 'unsafe-eval' 'self' *; \
style-src 'unsafe-inline' 'self' *; frame-src 'self' *; \
object-src 'self' *; img-src 'self' * data:; media-src 'self' *; \
font-src 'self' *; connect-src 'self' *"""),
               ('access-control-allow-origin', '*'),
               ('content-type', 'text/html; charset=utf-8'),
               ('content-encoding', 'gzip')]
    self.assertEqual(response._TrimHeaders(header3), [
        ('access-control-allow-origin', '*'),
        ('content-type', 'text/html; charset=utf-8'),
        ('content-encoding', 'gzip')
    ])

    header4 = [('content-security-policy', """\
default-src * data: blob:;script-src *.facebook.com *.fbcdn.net \
*.facebook.net *.google-analytics.com *.virtualearth.net *.google.com \
127.0.0.1:* *.spotilocal.com:* 'unsafe-inline' 'unsafe-eval' \
fbstatic-a.akamaihd.net fbcdn-static-b-a.akamaihd.net *.atlassolutions.com \
blob: chrome-extension://lifbcibllhkdhoafpjfnlhfpfgnpldfl \
*.liverail.com;style-src * 'unsafe-inline' data:;connect-src *.facebook.com \
*.fbcdn.net *.facebook.net *.spotilocal.com:* *.akamaihd.net \
wss://*.facebook.com:* https://fb.scanandcleanlocal.com:* \
*.atlassolutions.com attachment.fbsbx.com ws://localhost:* \
blob: 127.0.0.1:* *.liverail.com""")]
    self.assertEqual(response._TrimHeaders(header4), [])


class ArchivedHttpResponse(unittest.TestCase):
  """Tests for update_date on an archived response."""

  PAST_DATE_A = 'Tue, 13 Jul 2010 03:47:07 GMT'
  PAST_DATE_B = 'Tue, 13 Jul 2010 02:47:07 GMT'  # PAST_DATE_A -1 hour
  PAST_DATE_C = 'Tue, 13 Jul 2010 04:47:07 GMT'  # PAST_DATE_A +1 hour
  NOW_DATE_A = 'Wed, 20 Jul 2011 04:58:08 GMT'
  NOW_DATE_B = 'Wed, 20 Jul 2011 03:58:08 GMT'  # NOW_DATE_A -1 hour
  NOW_DATE_C = 'Wed, 20 Jul 2011 05:58:08 GMT'  # NOW_DATE_A +1 hour
  # NOW_DATE_A expressed as seconds since the epoch (UTC).
  NOW_SECONDS = calendar.timegm(email.utils.parsedate(NOW_DATE_A))

  def setUp(self):
    """Create a response whose 'date' header is PAST_DATE_A."""
    self.response = create_response([('date', self.PAST_DATE_A)])

  def test_update_date_same_date(self):
    """The response's own date is expected to map to NOW_DATE_A."""
    self.assertEqual(
        self.response.update_date(self.PAST_DATE_A, now=self.NOW_SECONDS),
        self.NOW_DATE_A)

  def test_update_date_before_date(self):
    """A date one hour earlier is expected to map to NOW_DATE_B."""
    self.assertEqual(
        self.response.update_date(self.PAST_DATE_B, now=self.NOW_SECONDS),
        self.NOW_DATE_B)

  def test_update_date_after_date(self):
    """A date one hour later is expected to map to NOW_DATE_C."""
    self.assertEqual(
        self.response.update_date(self.PAST_DATE_C, now=self.NOW_SECONDS),
        self.NOW_DATE_C)

  def test_update_date_bad_date_param(self):
    """An unparseable date argument is expected back unchanged."""
    self.assertEqual(
        self.response.update_date('garbage date', now=self.NOW_SECONDS),
        'garbage date')

  def test_update_date_bad_date_header(self):
    """With an unparseable 'date' header the argument is expected unchanged."""
    self.response.set_header('date', 'garbage date')
    self.assertEqual(
        self.response.update_date(self.PAST_DATE_B, now=self.NOW_SECONDS),
        self.PAST_DATE_B)


if __name__ == '__main__':
  unittest.main()