import httplib2 import pytest import tests from six.moves import urllib def test_credentials(): c = httplib2.Credentials() c.add("joe", "password") assert tuple(c.iter("bitworking.org"))[0] == ("joe", "password") assert tuple(c.iter(""))[0] == ("joe", "password") c.add("fred", "password2", "wellformedweb.org") assert tuple(c.iter("bitworking.org"))[0] == ("joe", "password") assert len(tuple(c.iter("bitworking.org"))) == 1 assert len(tuple(c.iter("wellformedweb.org"))) == 2 assert ("fred", "password2") in tuple(c.iter("wellformedweb.org")) c.clear() assert len(tuple(c.iter("bitworking.org"))) == 0 c.add("fred", "password2", "wellformedweb.org") assert ("fred", "password2") in tuple(c.iter("wellformedweb.org")) assert len(tuple(c.iter("bitworking.org"))) == 0 assert len(tuple(c.iter(""))) == 0 def test_basic(): # Test Basic Authentication http = httplib2.Http() password = tests.gen_password() handler = tests.http_reflect_with_auth( allow_scheme="basic", allow_credentials=(("joe", password),) ) with tests.server_request(handler, request_count=3) as uri: response, content = http.request(uri, "GET") assert response.status == 401 http.add_credentials("joe", password) response, content = http.request(uri, "GET") assert response.status == 200 def test_basic_for_domain(): # Test Basic Authentication http = httplib2.Http() password = tests.gen_password() handler = tests.http_reflect_with_auth( allow_scheme="basic", allow_credentials=(("joe", password),) ) with tests.server_request(handler, request_count=4) as uri: response, content = http.request(uri, "GET") assert response.status == 401 http.add_credentials("joe", password, "example.org") response, content = http.request(uri, "GET") assert response.status == 401 domain = urllib.parse.urlparse(uri)[1] http.add_credentials("joe", password, domain) response, content = http.request(uri, "GET") assert response.status == 200 def test_basic_two_credentials(): # Test Basic Authentication with multiple sets of credentials http = httplib2.Http() password1 = tests.gen_password() password2 = tests.gen_password() allowed = [("joe", password1)] # exploit shared mutable list handler = tests.http_reflect_with_auth( allow_scheme="basic", allow_credentials=allowed ) with tests.server_request(handler, request_count=7) as uri: http.add_credentials("fred", password2) response, content = http.request(uri, "GET") assert response.status == 401 http.add_credentials("joe", password1) response, content = http.request(uri, "GET") assert response.status == 200 allowed[0] = ("fred", password2) response, content = http.request(uri, "GET") assert response.status == 200 def test_digest(): # Test that we support Digest Authentication http = httplib2.Http() password = tests.gen_password() handler = tests.http_reflect_with_auth( allow_scheme="digest", allow_credentials=(("joe", password),) ) with tests.server_request(handler, request_count=3) as uri: response, content = http.request(uri, "GET") assert response.status == 401 http.add_credentials("joe", password) response, content = http.request(uri, "GET") assert response.status == 200, content.decode() def test_digest_next_nonce_nc(): # Test that if the server sets nextnonce that we reset # the nonce count back to 1 http = httplib2.Http() password = tests.gen_password() grenew_nonce = [None] handler = tests.http_reflect_with_auth( allow_scheme="digest", allow_credentials=(("joe", password),), out_renew_nonce=grenew_nonce, ) with tests.server_request(handler, request_count=5) as uri: http.add_credentials("joe", password) response1, _ = http.request(uri, "GET") info = httplib2._parse_www_authenticate(response1, "authentication-info") assert response1.status == 200 assert info.get("digest", {}).get("nc") == "00000001", info assert not info.get("digest", {}).get("nextnonce"), info response2, _ = http.request(uri, "GET") info2 = httplib2._parse_www_authenticate(response2, "authentication-info") assert info2.get("digest", {}).get("nc") == "00000002", info2 grenew_nonce[0]() response3, content = http.request(uri, "GET") info3 = httplib2._parse_www_authenticate(response3, "authentication-info") assert response3.status == 200 assert info3.get("digest", {}).get("nc") == "00000001", info3 def test_digest_auth_stale(): # Test that we can handle a nonce becoming stale http = httplib2.Http() password = tests.gen_password() grenew_nonce = [None] requests = [] handler = tests.http_reflect_with_auth( allow_scheme="digest", allow_credentials=(("joe", password),), out_renew_nonce=grenew_nonce, out_requests=requests, ) with tests.server_request(handler, request_count=4) as uri: http.add_credentials("joe", password) response, _ = http.request(uri, "GET") assert response.status == 200 info = httplib2._parse_www_authenticate( requests[0][1].headers, "www-authenticate" ) grenew_nonce[0]() response, _ = http.request(uri, "GET") assert response.status == 200 assert not response.fromcache assert getattr(response, "_stale_digest", False) info2 = httplib2._parse_www_authenticate( requests[2][1].headers, "www-authenticate" ) nonce1 = info.get("digest", {}).get("nonce", "") nonce2 = info2.get("digest", {}).get("nonce", "") assert nonce1 != "" assert nonce2 != "" assert nonce1 != nonce2, (nonce1, nonce2) @pytest.mark.parametrize( "data", ( ({}, {}), ({"www-authenticate": ""}, {}), ( { "www-authenticate": 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux' }, { "test": { "realm": "test realm", "foo": "foo", "bar": "bar", "baz": "baz", "qux": "qux", } }, ), ( {"www-authenticate": 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'}, {"t*!%#st": {"realm": "to*!%#en", "to*!%#en": "quoted string"}}, ), ( {"www-authenticate": 'Test realm="a \\"test\\" realm"'}, {"test": {"realm": 'a "test" realm'}}, ), ({"www-authenticate": 'Basic realm="me"'}, {"basic": {"realm": "me"}}), ( {"www-authenticate": 'Basic realm="me", algorithm="MD5"'}, {"basic": {"realm": "me", "algorithm": "MD5"}}, ), ( {"www-authenticate": 'Basic realm="me", algorithm=MD5'}, {"basic": {"realm": "me", "algorithm": "MD5"}}, ), ( {"www-authenticate": 'Basic realm="me",other="fred" '}, {"basic": {"realm": "me", "other": "fred"}}, ), ({"www-authenticate": 'Basic REAlm="me" '}, {"basic": {"realm": "me"}}), ( { "www-authenticate": 'Digest realm="digest1", qop="auth,auth-int", nonce="7102dd2", opaque="e9517f"' }, { "digest": { "realm": "digest1", "qop": "auth,auth-int", "nonce": "7102dd2", "opaque": "e9517f", } }, ), # multiple schema choice ( { "www-authenticate": 'Digest realm="multi-d", nonce="8b11d0f6", opaque="cc069c" Basic realm="multi-b" ' }, { "digest": {"realm": "multi-d", "nonce": "8b11d0f6", "opaque": "cc069c"}, "basic": {"realm": "multi-b"}, }, ), # FIXME # comma between schemas (glue for multiple headers with same name) # ({'www-authenticate': 'Digest realm="2-comma-d", qop="auth-int", nonce="c0c8ff1", Basic realm="2-comma-b"'}, # {'digest': {'realm': '2-comma-d', 'qop': 'auth-int', 'nonce': 'c0c8ff1'}, # 'basic': {'realm': '2-comma-b'}}), # FIXME # comma between schemas + WSSE (glue for multiple headers with same name) # ({'www-authenticate': 'Digest realm="com3d", Basic realm="com3b", WSSE realm="com3w", profile="token"'}, # {'digest': {'realm': 'com3d'}, 'basic': {'realm': 'com3b'}, 'wsse': {'realm': 'com3w', profile': 'token'}}), # FIXME # multiple syntax figures # ({'www-authenticate': # 'Digest realm="brig", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc"' + # ', Basic REAlm="zoo", WSSE realm="very", profile="UsernameToken"'}, # {'digest': {'realm': 'brig', 'qop': 'auth,auth-int', 'nonce': '(*)&^&$%#', 'opaque': '5ccc'}, # 'basic': {'realm': 'zoo'}, # 'wsse': {'realm': 'very', 'profile': 'UsernameToken'}}), # more quote combos ( { "www-authenticate": 'Digest realm="myrealm", nonce="KBAA=3", algorithm=MD5, qop="auth", stale=true' }, { "digest": { "realm": "myrealm", "nonce": "KBAA=3", "algorithm": "MD5", "qop": "auth", "stale": "true", } }, ), ), ids=lambda data: str(data[0]), ) @pytest.mark.parametrize("strict", (True, False), ids=("strict", "relax")) def test_parse_www_authenticate_correct(data, strict): headers, info = data # FIXME: move strict to parse argument httplib2.USE_WWW_AUTH_STRICT_PARSING = strict try: assert httplib2._parse_www_authenticate(headers) == info finally: httplib2.USE_WWW_AUTH_STRICT_PARSING = 0 def test_parse_www_authenticate_malformed(): # TODO: test (and fix) header value 'barbqwnbm-bb...:asd' leads to dead loop with tests.assert_raises(httplib2.MalformedHeader): httplib2._parse_www_authenticate( { "www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."' } ) def test_digest_object(): credentials = ("joe", "password") host = None request_uri = "/test/digest/" headers = {} response = { "www-authenticate": 'Digest realm="myrealm", nonce="KBAA=35", algorithm=MD5, qop="auth"' } content = b"" d = httplib2.DigestAuthentication( credentials, host, request_uri, headers, response, content, None ) d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") our_request = "authorization: " + headers["authorization"] working_request = ( 'authorization: Digest username="joe", realm="myrealm", ' 'nonce="KBAA=35", uri="/test/digest/"' + ', algorithm=MD5, response="de6d4a123b80801d0e94550411b6283f", ' 'qop=auth, nc=00000001, cnonce="33033375ec278a46"' ) assert our_request == working_request def test_digest_object_with_opaque(): credentials = ("joe", "password") host = None request_uri = "/digest/opaque/" headers = {} response = { "www-authenticate": 'Digest realm="myrealm", nonce="30352fd", algorithm=MD5, ' 'qop="auth", opaque="atestopaque"' } content = "" d = httplib2.DigestAuthentication( credentials, host, request_uri, headers, response, content, None ) d.request("GET", request_uri, headers, content, cnonce="5ec2") our_request = "authorization: " + headers["authorization"] working_request = ( 'authorization: Digest username="joe", realm="myrealm", ' 'nonce="30352fd", uri="/digest/opaque/", algorithm=MD5' + ', response="a1fab43041f8f3789a447f48018bee48", qop=auth, nc=00000001, ' 'cnonce="5ec2", opaque="atestopaque"' ) assert our_request == working_request def test_digest_object_stale(): credentials = ("joe", "password") host = None request_uri = "/digest/stale/" headers = {} response = httplib2.Response({}) response["www-authenticate"] = ( 'Digest realm="myrealm", nonce="bd669f", ' 'algorithm=MD5, qop="auth", stale=true' ) response.status = 401 content = b"" d = httplib2.DigestAuthentication( credentials, host, request_uri, headers, response, content, None ) # Returns true to force a retry assert d.response(response, content) def test_digest_object_auth_info(): credentials = ("joe", "password") host = None request_uri = "/digest/nextnonce/" headers = {} response = httplib2.Response({}) response["www-authenticate"] = ( 'Digest realm="myrealm", nonce="barney", ' 'algorithm=MD5, qop="auth", stale=true' ) response["authentication-info"] = 'nextnonce="fred"' content = b"" d = httplib2.DigestAuthentication( credentials, host, request_uri, headers, response, content, None ) # Returns true to force a retry assert not d.response(response, content) assert d.challenge["nonce"] == "fred" assert d.challenge["nc"] == 1 def test_wsse_algorithm(): digest = httplib2._wsse_username_token( "d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm" ) expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY=" assert expected == digest