# (c) 2005 Ben Bangert
# This module is part of the Python Paste Project and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from nose.tools import assert_raises
from paste.fixture import *
from paste.registry import *
from paste.registry import Registry
from paste.evalexception.middleware import EvalException
regobj = StackedObjectProxy()
secondobj = StackedObjectProxy(default=dict(hi='people'))
def simpleapp(environ, start_response):
status = '200 OK'
response_headers = [('Content-type','text/plain')]
start_response(status, response_headers)
return [b'Hello world!\n']
def simpleapp_withregistry(environ, start_response):
status = '200 OK'
response_headers = [('Content-type','text/plain')]
start_response(status, response_headers)
body = 'Hello world!Value is %s\n' % regobj.keys()
if six.PY3:
body = body.encode('utf8')
return [body]
def simpleapp_withregistry_default(environ, start_response):
status = '200 OK'
response_headers = [('Content-type','text/plain')]
start_response(status, response_headers)
body = 'Hello world!Value is %s\n' % secondobj
if six.PY3:
body = body.encode('utf8')
return [body]
class RegistryUsingApp(object):
def __init__(self, var, value, raise_exc=False):
self.var = var
self.value = value
self.raise_exc = raise_exc
def __call__(self, environ, start_response):
if 'paste.registry' in environ:
environ['paste.registry'].register(self.var, self.value)
if self.raise_exc:
raise self.raise_exc
status = '200 OK'
response_headers = [('Content-type','text/plain')]
start_response(status, response_headers)
body = 'Hello world!\nThe variable is %s' % str(regobj)
if six.PY3:
body = body.encode('utf8')
return [body]
class RegistryUsingIteratorApp(object):
def __init__(self, var, value):
self.var = var
self.value = value
def __call__(self, environ, start_response):
if 'paste.registry' in environ:
environ['paste.registry'].register(self.var, self.value)
status = '200 OK'
response_headers = [('Content-type','text/plain')]
start_response(status, response_headers)
body = 'Hello world!\nThe variable is %s' % str(regobj)
if six.PY3:
body = body.encode('utf8')
return iter([body])
class RegistryMiddleMan(object):
def __init__(self, app, var, value, depth):
self.app = app
self.var = var
self.value = value
self.depth = depth
def __call__(self, environ, start_response):
if 'paste.registry' in environ:
environ['paste.registry'].register(self.var, self.value)
line = ('\nInserted by middleware!\nInsertValue at depth %s is %s'
% (self.depth, str(regobj)))
if six.PY3:
line = line.encode('utf8')
app_response = [line]
app_iter = None
app_iter = self.app(environ, start_response)
if type(app_iter) in (list, tuple):
app_response.extend(app_iter)
else:
response = []
for line in app_iter:
response.append(line)
if hasattr(app_iter, 'close'):
app_iter.close()
app_response.extend(response)
line = ('\nAppended by middleware!\nAppendValue at \
depth %s is %s' % (self.depth, str(regobj)))
if six.PY3:
line = line.encode('utf8')
app_response.append(line)
return app_response
def test_simple():
app = TestApp(simpleapp)
response = app.get('/')
assert 'Hello world' in response
def test_solo_registry():
obj = {'hi':'people'}
wsgiapp = RegistryUsingApp(regobj, obj)
wsgiapp = RegistryManager(wsgiapp)
app = TestApp(wsgiapp)
res = app.get('/')
assert 'Hello world' in res
assert 'The variable is' in res
assert "{'hi': 'people'}" in res
def test_registry_no_object_error():
app = TestApp(simpleapp_withregistry)
assert_raises(TypeError, app.get, '/')
def test_with_default_object():
app = TestApp(simpleapp_withregistry_default)
res = app.get('/')
print(res)
assert 'Hello world' in res
assert "Value is {'hi': 'people'}" in res
def test_double_registry():
obj = {'hi':'people'}
secondobj = {'bye':'friends'}
wsgiapp = RegistryUsingApp(regobj, obj)
wsgiapp = RegistryManager(wsgiapp)
wsgiapp = RegistryMiddleMan(wsgiapp, regobj, secondobj, 0)
wsgiapp = RegistryManager(wsgiapp)
app = TestApp(wsgiapp)
res = app.get('/')
assert 'Hello world' in res
assert 'The variable is' in res
assert "{'hi': 'people'}" in res
assert "InsertValue at depth 0 is {'bye': 'friends'}" in res
assert "AppendValue at depth 0 is {'bye': 'friends'}" in res
def test_really_deep_registry():
keylist = ['fred', 'wilma', 'barney', 'homer', 'marge', 'bart', 'lisa',
'maggie']
valuelist = range(0, len(keylist))
obj = {'hi':'people'}
wsgiapp = RegistryUsingApp(regobj, obj)
wsgiapp = RegistryManager(wsgiapp)
for depth in valuelist:
newobj = {keylist[depth]: depth}
wsgiapp = RegistryMiddleMan(wsgiapp, regobj, newobj, depth)
wsgiapp = RegistryManager(wsgiapp)
app = TestApp(wsgiapp)
res = app.get('/')
assert 'Hello world' in res
assert 'The variable is' in res
assert "{'hi': 'people'}" in res
for depth in valuelist:
assert "InsertValue at depth %s is {'%s': %s}" % \
(depth, keylist[depth], depth) in res
for depth in valuelist:
assert "AppendValue at depth %s is {'%s': %s}" % \
(depth, keylist[depth], depth) in res
def test_iterating_response():
obj = {'hi':'people'}
secondobj = {'bye':'friends'}
wsgiapp = RegistryUsingIteratorApp(regobj, obj)
wsgiapp = RegistryManager(wsgiapp)
wsgiapp = RegistryMiddleMan(wsgiapp, regobj, secondobj, 0)
wsgiapp = RegistryManager(wsgiapp)
app = TestApp(wsgiapp)
res = app.get('/')
assert 'Hello world' in res
assert 'The variable is' in res
assert "{'hi': 'people'}" in res
assert "InsertValue at depth 0 is {'bye': 'friends'}" in res
assert "AppendValue at depth 0 is {'bye': 'friends'}" in res
def _test_restorer(stack, data):
# We need to test the request's specific Registry. Initialize it here so we
# can use it later (RegistryManager will re-use one preexisting in the
# environ)
registry = Registry()
extra_environ={'paste.throw_errors': False,
'paste.registry': registry}
request_id = restorer.get_request_id(extra_environ)
app = TestApp(stack)
res = app.get('/', extra_environ=extra_environ, expect_errors=True)
# Ensure all the StackedObjectProxies are empty after the RegistryUsingApp
# raises an Exception
for stacked, proxied_obj, test_cleanup in data:
only_key = list(proxied_obj.keys())[0]
try:
assert only_key not in stacked
assert False
except TypeError:
# Definitely empty
pass
# Ensure the StackedObjectProxies & Registry 'work' in the simulated
# EvalException context
replace = {'replace': 'dict'}
new = {'new': 'object'}
restorer.restoration_begin(request_id)
try:
for stacked, proxied_obj, test_cleanup in data:
# Ensure our original data magically re-appears in this context
only_key, only_val = list(proxied_obj.items())[0]
assert only_key in stacked and stacked[only_key] == only_val
# Ensure the Registry still works
registry.prepare()
registry.register(stacked, new)
assert 'new' in stacked and stacked['new'] == 'object'
registry.cleanup()
# Back to the original (pre-prepare())
assert only_key in stacked and stacked[only_key] == only_val
registry.replace(stacked, replace)
assert 'replace' in stacked and stacked['replace'] == 'dict'
if test_cleanup:
registry.cleanup()
try:
stacked._current_obj()
assert False
except TypeError:
# Definitely empty
pass
finally:
restorer.restoration_end()
def _restorer_data():
S = StackedObjectProxy
d = [[S(name='first'), dict(top='of the registry stack'), False],
[S(name='second'), dict(middle='of the stack'), False],
[S(name='third'), dict(bottom='of the STACK.'), False]]
return d
def _set_cleanup_test(data):
"""Instruct _test_restorer to check registry cleanup at this level of the stack
"""
data[2] = True
def test_restorer_basic():
data = _restorer_data()[0]
wsgiapp = RegistryUsingApp(data[0], data[1], raise_exc=Exception())
wsgiapp = RegistryManager(wsgiapp)
_set_cleanup_test(data)
wsgiapp = EvalException(wsgiapp)
_test_restorer(wsgiapp, [data])
def test_restorer_basic_manager_outside():
data = _restorer_data()[0]
wsgiapp = RegistryUsingApp(data[0], data[1], raise_exc=Exception())
wsgiapp = EvalException(wsgiapp)
wsgiapp = RegistryManager(wsgiapp)
_set_cleanup_test(data)
_test_restorer(wsgiapp, [data])
def test_restorer_middleman_nested_evalexception():
data = _restorer_data()[:2]
wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
wsgiapp = EvalException(wsgiapp)
wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
wsgiapp = RegistryManager(wsgiapp)
_set_cleanup_test(data[1])
_test_restorer(wsgiapp, data)
def test_restorer_nested_middleman():
data = _restorer_data()[:2]
wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
wsgiapp = RegistryManager(wsgiapp)
_set_cleanup_test(data[0])
wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
wsgiapp = EvalException(wsgiapp)
wsgiapp = RegistryManager(wsgiapp)
_set_cleanup_test(data[1])
_test_restorer(wsgiapp, data)
def test_restorer_middlemen_nested_evalexception():
data = _restorer_data()
wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
wsgiapp = RegistryManager(wsgiapp)
_set_cleanup_test(data[0])
wsgiapp = EvalException(wsgiapp)
wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
wsgiapp = RegistryManager(wsgiapp)
_set_cleanup_test(data[1])
wsgiapp = RegistryMiddleMan(wsgiapp, data[2][0], data[2][1], 1)
wsgiapp = RegistryManager(wsgiapp)
_set_cleanup_test(data[2])
_test_restorer(wsgiapp, data)
def test_restorer_disabled():
# Ensure restoration_begin/end work safely when there's no Registry
wsgiapp = TestApp(simpleapp)
wsgiapp.get('/')
try:
restorer.restoration_begin(1)
finally:
restorer.restoration_end()
# A second call should do nothing
restorer.restoration_end()