# Plain text  |  300 lines  |  10.17 KB

import webapp2
from webapp2_extras import sessions

from webapp2_extras import auth
from webapp2_extras.appengine.auth import models

from google.appengine.ext.ndb import model

import test_base


class TestAuth(test_base.BaseTestCase):

    def setUp(self):
        # Run the base class setUp first, then register the three auth model
        # classes under their own names.
        super(TestAuth, self).setUp()
        for name in ('User', 'UserToken', 'Unique'):
            self.register_model(name, getattr(models, name))

    def _check_token(self, user_id, token, subject='auth'):
        # True when UserToken.get() finds an entry for this user, subject and
        # token; False when it returns None.
        found = models.UserToken.get(user=user_id, subject=subject,
                                     token=token)
        return found is not None

    def test_get_user_by_session(self):
        # Walks one user through the session life cycle: empty session,
        # password login, cookie round trip, token renewal and token expiry.
        app = webapp2.WSGIApplication(config={
            'webapp2_extras.sessions': {
                'secret_key': 'foo',
            }
        })
        req = webapp2.Request.blank('/')
        rsp = webapp2.Response()
        req.app = app
        s = auth.get_store(app=app)
        a = auth.Auth(request=req)
        session_store = sessions.get_store(request=req)

        # Setting empty session data must leave no '_user' entry behind.
        a.set_session_data({})
        self.assertEqual(a.session.get('_user'), None)

        # An empty '_user' entry written by hand is not accepted either: no
        # session data comes back and the entry is gone afterwards.
        a.session['_user'] = {}
        self.assertEqual(a.get_session_data(), None)
        self.assertEqual(a.session.get('_user'), None)

        # Create a user. Check the success flag right away so that a refused
        # creation fails here rather than on the `user.key` access below.
        m = models.User
        success, user = m.create_user(auth_id='auth_id',
                                      password_raw='password')
        self.assertTrue(success)

        user_id = user.key.id()

        # Nobody has logged in yet, so the session yields no user (None).
        rv = a.get_user_by_session()
        self.assertTrue(rv is None)

        # Login with password. User dict is returned.
        rv = a.get_user_by_password('auth_id', 'password')
        self.assertEqual(rv['user_id'], user_id)

        # Save sessions so the response carries the session cookie.
        session_store.save_sessions(rsp)

        # Build a fresh request that sends that cookie back, and a fresh Auth
        # bound to it.
        cookies = rsp.headers.get('Set-Cookie')
        req = webapp2.Request.blank('/', headers=[('Cookie', cookies)])
        req.app = app
        a = auth.Auth(request=req)

        # The cookie alone must now identify the same user.
        rv = a.get_user_by_session()
        self.assertEqual(rv['user_id'], user_id)

        # If we call get_user_by_token() now, the very same dict object is
        # returned.
        rv2 = a.get_user_by_token(rv['user_id'], rv['token'])
        self.assertTrue(rv is rv2)

        # Drop the user held on the Auth object, fetch it again and check
        # that the token is unchanged.
        token = rv['token']
        a._user = None
        rv = a.get_user_by_session()
        self.assertEqual(rv['user_id'], user_id)
        self.assertEqual(rv['token'], token)

        # Now let's force token to be renewed and check that we have a new one.
        s.config['token_new_age'] = -300
        a._user = None
        rv = a.get_user_by_session()
        self.assertEqual(rv['user_id'], user_id)
        self.assertNotEqual(rv['token'], token)

        # Now let's force token to be invalid: no user comes back at all.
        s.config['token_max_age'] = -300
        a._user = None
        rv = a.get_user_by_session()
        self.assertEqual(rv, None)

    def test_get_user_by_password(self):
        # Checks password login, the session cookie max_age chosen for
        # remember=True/False, and that a failed login unsets the user.
        app = webapp2.WSGIApplication(config={
            'webapp2_extras.sessions': {
                'secret_key': 'foo',
            }
        })
        req = webapp2.Request.blank('/')
        req.app = app
        # The returned store is not needed by this test, so it is not bound.
        auth.get_store(app=app)
        a = auth.get_auth(request=req)
        session_store = sessions.get_store(request=req)

        # Create a user. Check the success flag right away so that a refused
        # creation fails here rather than on the `user.key` access below.
        m = models.User
        success, user = m.create_user(auth_id='auth_id',
                                      password_raw='password')
        self.assertTrue(success)

        user_id = user.key.id()
        # remember=True: the auth session's max_age is 86400 * 7 * 3
        # (three weeks' worth of seconds).
        rv = a.get_user_by_password('auth_id', 'password', remember=True)
        self.assertEqual(rv['user_id'], user_id)
        self.assertEqual(session_store.sessions['auth'].session_args['max_age'],
                         86400 * 7 * 3)

        # Default (remember not passed): max_age is reset to None.
        rv = a.get_user_by_password('auth_id', 'password')
        self.assertEqual(rv['user_id'], user_id)
        self.assertEqual(session_store.sessions['auth'].session_args['max_age'],
                         None)

        # User was set so getting it from session will return the same one.
        rv = a.get_user_by_session()
        self.assertEqual(rv['user_id'], user_id)

        # Now try a failed password submission: user will be unset.
        # silent=True makes the call return None instead of raising.
        rv = a.get_user_by_password('auth_id', 'password_2', silent=True)
        self.assertTrue(rv is None)

        # And getting by session will no longer work.
        rv = a.get_user_by_session()
        self.assertTrue(rv is None)

    def test_validate_password(self):
        # validate_password() returns the user dict for correct credentials
        # and raises a dedicated error for a wrong password or unknown auth id.
        app = webapp2.WSGIApplication()
        req = webapp2.Request.blank('/')
        req.app = app
        s = auth.get_store(app=app)

        # Create a user and make sure creation succeeded before comparing
        # against it.
        m = models.User
        success, user = m.create_user(auth_id='auth_id',
                                      password_raw='foo')
        self.assertTrue(success)

        u = s.validate_password('auth_id', 'foo')
        self.assertEqual(u, s.user_to_dict(user))
        # Known auth id, wrong password.
        self.assertRaises(auth.InvalidPasswordError,
                          s.validate_password, 'auth_id', 'bar')
        # Unknown auth id.
        self.assertRaises(auth.InvalidAuthIdError,
                          s.validate_password, 'auth_id_2', 'foo')

    def test_validate_token(self):
        # Exercises validate_token() for unknown, valid, expired and
        # to-be-renewed tokens, checking each time whether the stored token
        # survives.
        app = webapp2.WSGIApplication()
        req = webapp2.Request.blank('/')
        req.app = app
        s = auth.get_store(app=app)

        # No user or token exists yet: nothing validates.
        rv = s.validate_token('auth_id', 'token')
        self.assertEqual(rv, (None, None))

        # Same, with an expired timestamp.
        rv = s.validate_token('auth_id', 'token', -300)
        self.assertEqual(rv, (None, None))

        # Create a user. Check the success flag right away so that a refused
        # creation fails here rather than on the `user.key` access below.
        m = models.User
        success, user = m.create_user(auth_id='auth_id',
                                      password_raw='foo')
        self.assertTrue(success)

        user_id = user.key.id()
        token = m.create_auth_token(user_id)
        rv = s.validate_token(user_id, token)
        self.assertEqual(rv, (s.user_to_dict(user), token))
        # Token must still be there.
        self.assertTrue(self._check_token(user_id, token))

        # Expired timestamp.
        token = m.create_auth_token(user_id)
        rv = s.validate_token(user_id, token, -300)
        self.assertEqual(rv, (None, None))
        # Token must have been deleted.
        self.assertFalse(self._check_token(user_id, token))

        # Force expiration through the store configuration.
        token = m.create_auth_token(user_id)
        s.config['token_max_age'] = -300
        rv = s.validate_token(user_id, token)
        self.assertEqual(rv, (None, None))
        # Token must have been deleted.
        self.assertFalse(self._check_token(user_id, token))

        # Revert expiration, force renewal: the user is still returned, but
        # without a token.
        token = m.create_auth_token(user_id)
        s.config['token_max_age'] = 86400 * 7 * 3
        s.config['token_new_age'] = -300
        rv = s.validate_token(user_id, token)
        self.assertEqual(rv, (s.user_to_dict(user), None))
        # Token must have been deleted.
        self.assertFalse(self._check_token(user_id, token))

    def test_set_auth_store(self):
        # set_store() takes the app registry from 0 to 1 entries, and
        # get_store() then hands back an AuthStore.
        wsgi_app = webapp2.WSGIApplication()
        request = webapp2.Request.blank('/')
        request.app = wsgi_app
        new_store = auth.AuthStore(wsgi_app)

        self.assertEqual(len(wsgi_app.registry), 0)
        auth.set_store(new_store, app=wsgi_app)
        self.assertEqual(len(wsgi_app.registry), 1)

        fetched = auth.get_store(app=wsgi_app)
        self.assertTrue(isinstance(fetched, auth.AuthStore))

    def test_get_auth_store(self):
        # get_store() on an app with an empty registry returns an AuthStore
        # and leaves one entry in that registry.
        wsgi_app = webapp2.WSGIApplication()
        request = webapp2.Request.blank('/')
        request.app = wsgi_app

        self.assertEqual(len(wsgi_app.registry), 0)
        store = auth.get_store(app=wsgi_app)
        self.assertEqual(len(wsgi_app.registry), 1)
        self.assertTrue(isinstance(store, auth.AuthStore))

    def test_set_auth(self):
        # set_auth() takes the request registry from 0 to 1 entries, and
        # get_auth() then hands back an Auth instance.
        wsgi_app = webapp2.WSGIApplication()
        request = webapp2.Request.blank('/')
        request.app = wsgi_app
        new_auth = auth.Auth(request)

        self.assertEqual(len(request.registry), 0)
        auth.set_auth(new_auth, request=request)
        self.assertEqual(len(request.registry), 1)

        fetched = auth.get_auth(request=request)
        self.assertTrue(isinstance(fetched, auth.Auth))

    def test_get_auth(self):
        # get_auth() on a request with an empty registry returns an Auth
        # instance and leaves one entry in that registry.
        wsgi_app = webapp2.WSGIApplication()
        request = webapp2.Request.blank('/')
        request.app = wsgi_app

        self.assertEqual(len(request.registry), 0)
        current_auth = auth.get_auth(request=request)
        self.assertEqual(len(request.registry), 1)
        self.assertTrue(isinstance(current_auth, auth.Auth))

    # Disabled test: the triple-quoted block below is a bare string
    # expression, not a method, so it is never collected or executed.
    # NOTE(review): presumably switched off because the store's
    # set_password_validator / set_token_validator hooks are not available --
    # confirm, then either restore it as a real test or delete it.
    '''
    def test_set_callables(self):
        app = webapp2.WSGIApplication()
        req = webapp2.Request.blank('/')
        req.app = app
        s = auth.get_store(app=app)

        def validate_password(store, auth_id, password):
            self.assertTrue(store is s)
            self.assertEqual(auth_id, 'auth_id')
            self.assertEqual(password, 'password')
            return 'validate_password'

        def validate_token(store, auth_id, token, token_ts=None):
            self.assertTrue(store is s)
            self.assertEqual(auth_id, 'auth_id')
            self.assertEqual(token, 'token')
            self.assertEqual(token_ts, 'token_ts')
            return 'validate_token'

        s.set_password_validator(validate_password)
        rv = s.validate_password('auth_id', 'password')
        self.assertEqual(rv, 'validate_password')

        s.set_token_validator(validate_token)
        rv = s.validate_token('auth_id', 'token', 'token_ts')
        self.assertEqual(rv, 'validate_token')
    '''

    def test_extended_user(self):
        # A User subclass with two extra properties is created, configured as
        # the store's user_model, and read back by its auth id.
        class MyUser(models.User):
            newsletter = model.BooleanProperty()
            age = model.IntegerProperty()

        username = 'own:username'
        created, new_user = MyUser.create_user(username, newsletter=True,
                                               age=22)
        self.assertTrue(created)

        auth_config = {'user_model': MyUser}
        wsgi_app = webapp2.WSGIApplication(config={
            'webapp2_extras.auth': auth_config,
        })
        store = auth.get_store(app=wsgi_app)
        fetched = store.user_model.get_by_auth_id(username)

        # The entity read back equals the one returned by create_user() and
        # carries the extra property values.
        self.assertEqual(new_user, fetched)
        self.assertEqual(fetched.age, 22)
        self.assertTrue(fetched.newsletter is True)


# Run the tests through test_base.main() when this file is executed directly.
if __name__ == '__main__':
    test_base.main()