普通文本  |  601行  |  19.81 KB

# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php

"""
A module of many disparate routines.
"""

from __future__ import print_function

# functions which moved to paste.request and paste.response
# Deprecated around 15 Dec 2005
from paste.request import get_cookies, parse_querystring, parse_formvars
from paste.request import construct_url, path_info_split, path_info_pop
from paste.response import HeaderDict, has_header, header_value, remove_header
from paste.response import error_body_response, error_response, error_response_app

from traceback import print_exception
import six
import sys
from six.moves import cStringIO as StringIO
from six.moves.urllib.parse import unquote, urlsplit
import warnings

__all__ = ['add_close', 'add_start_close', 'capture_output', 'catch_errors',
           'catch_errors_app', 'chained_app_iters', 'construct_url',
           'dump_environ', 'encode_unicode_app_iter', 'error_body_response',
           'error_response', 'get_cookies', 'has_header', 'header_value',
           'interactive', 'intercept_output', 'path_info_pop',
           'path_info_split', 'raw_interactive', 'send_file']

class add_close(object):
    """
    An an iterable that iterates over app_iter, then calls
    close_func.
    """

    def __init__(self, app_iterable, close_func):
        self.app_iterable = app_iterable
        self.app_iter = iter(app_iterable)
        self.close_func = close_func
        self._closed = False

    def __iter__(self):
        return self

    def next(self):
        return self.app_iter.next()

    def close(self):
        self._closed = True
        if hasattr(self.app_iterable, 'close'):
            self.app_iterable.close()
        self.close_func()

    def __del__(self):
        if not self._closed:
            # We can't raise an error or anything at this stage
            print("Error: app_iter.close() was not called when finishing "
                "WSGI request. finalization function %s not called"
                  % self.close_func, file=sys.stderr)

class add_start_close(object):
    """
    An an iterable that iterates over app_iter, calls start_func
    before the first item is returned, then calls close_func at the
    end.
    """

    def __init__(self, app_iterable, start_func, close_func=None):
        self.app_iterable = app_iterable
        self.app_iter = iter(app_iterable)
        self.first = True
        self.start_func = start_func
        self.close_func = close_func
        self._closed = False

    def __iter__(self):
        return self

    def next(self):
        if self.first:
            self.start_func()
            self.first = False
        return next(self.app_iter)
    __next__ = next

    def close(self):
        self._closed = True
        if hasattr(self.app_iterable, 'close'):
            self.app_iterable.close()
        if self.close_func is not None:
            self.close_func()

    def __del__(self):
        if not self._closed:
            # We can't raise an error or anything at this stage
            print("Error: app_iter.close() was not called when finishing "
                "WSGI request. finalization function %s not called"
                  % self.close_func, file=sys.stderr)

class chained_app_iters(object):

    """
    Chains several app_iters together, also delegating .close() to each
    of them.
    """

    def __init__(self, *chained):
        self.app_iters = chained
        self.chained = [iter(item) for item in chained]
        self._closed = False

    def __iter__(self):
        return self

    def next(self):
        if len(self.chained) == 1:
            return self.chained[0].next()
        else:
            try:
                return self.chained[0].next()
            except StopIteration:
                self.chained.pop(0)
                return self.next()

    def close(self):
        self._closed = True
        got_exc = None
        for app_iter in self.app_iters:
            try:
                if hasattr(app_iter, 'close'):
                    app_iter.close()
            except:
                got_exc = sys.exc_info()
        if got_exc:
            six.reraise(got_exc[0], got_exc[1], got_exc[2])

    def __del__(self):
        if not self._closed:
            # We can't raise an error or anything at this stage
            print("Error: app_iter.close() was not called when finishing "
                "WSGI request. finalization function %s not called"
                  % self.close_func, file=sys.stderr)

class encode_unicode_app_iter(object):
    """
    Encodes an app_iterable's unicode responses as strings
    """

    def __init__(self, app_iterable, encoding=sys.getdefaultencoding(),
                 errors='strict'):
        self.app_iterable = app_iterable
        self.app_iter = iter(app_iterable)
        self.encoding = encoding
        self.errors = errors

    def __iter__(self):
        return self

    def next(self):
        content = next(self.app_iter)
        if isinstance(content, six.text_type):
            content = content.encode(self.encoding, self.errors)
        return content
    __next__ = next

    def close(self):
        if hasattr(self.app_iterable, 'close'):
            self.app_iterable.close()

def catch_errors(application, environ, start_response, error_callback,
                 ok_callback=None):
    """
    Runs the application, and returns the application iterator (which should be
    passed upstream).  If an error occurs then error_callback will be called with
    exc_info as its sole argument.  If no errors occur and ok_callback is given,
    then it will be called with no arguments.
    """
    try:
        app_iter = application(environ, start_response)
    except:
        error_callback(sys.exc_info())
        raise
    if type(app_iter) in (list, tuple):
        # These won't produce exceptions
        if ok_callback:
            ok_callback()
        return app_iter
    else:
        return _wrap_app_iter(app_iter, error_callback, ok_callback)

class _wrap_app_iter(object):

    def __init__(self, app_iterable, error_callback, ok_callback):
        self.app_iterable = app_iterable
        self.app_iter = iter(app_iterable)
        self.error_callback = error_callback
        self.ok_callback = ok_callback
        if hasattr(self.app_iterable, 'close'):
            self.close = self.app_iterable.close

    def __iter__(self):
        return self

    def next(self):
        try:
            return self.app_iter.next()
        except StopIteration:
            if self.ok_callback:
                self.ok_callback()
            raise
        except:
            self.error_callback(sys.exc_info())
            raise

def catch_errors_app(application, environ, start_response, error_callback_app,
                     ok_callback=None, catch=Exception):
    """
    Like ``catch_errors``, except error_callback_app should be a
    callable that will receive *three* arguments -- ``environ``,
    ``start_response``, and ``exc_info``.  It should call
    ``start_response`` (*with* the exc_info argument!) and return an
    iterator.
    """
    try:
        app_iter = application(environ, start_response)
    except catch:
        return error_callback_app(environ, start_response, sys.exc_info())
    if type(app_iter) in (list, tuple):
        # These won't produce exceptions
        if ok_callback is not None:
            ok_callback()
        return app_iter
    else:
        return _wrap_app_iter_app(
            environ, start_response, app_iter,
            error_callback_app, ok_callback, catch=catch)

class _wrap_app_iter_app(object):

    def __init__(self, environ, start_response, app_iterable,
                 error_callback_app, ok_callback, catch=Exception):
        self.environ = environ
        self.start_response = start_response
        self.app_iterable = app_iterable
        self.app_iter = iter(app_iterable)
        self.error_callback_app = error_callback_app
        self.ok_callback = ok_callback
        self.catch = catch
        if hasattr(self.app_iterable, 'close'):
            self.close = self.app_iterable.close

    def __iter__(self):
        return self

    def next(self):
        try:
            return self.app_iter.next()
        except StopIteration:
            if self.ok_callback:
                self.ok_callback()
            raise
        except self.catch:
            if hasattr(self.app_iterable, 'close'):
                try:
                    self.app_iterable.close()
                except:
                    # @@: Print to wsgi.errors?
                    pass
            new_app_iterable = self.error_callback_app(
                self.environ, self.start_response, sys.exc_info())
            app_iter = iter(new_app_iterable)
            if hasattr(new_app_iterable, 'close'):
                self.close = new_app_iterable.close
            self.next = app_iter.next
            return self.next()

def raw_interactive(application, path='', raise_on_wsgi_error=False,
                    **environ):
    """
    Runs the application in a fake environment.
    """
    assert "path_info" not in environ, "argument list changed"
    if raise_on_wsgi_error:
        errors = ErrorRaiser()
    else:
        errors = six.BytesIO()
    basic_environ = {
        # mandatory CGI variables
        'REQUEST_METHOD': 'GET',     # always mandatory
        'SCRIPT_NAME': '',           # may be empty if app is at the root
        'PATH_INFO': '',             # may be empty if at root of app
        'SERVER_NAME': 'localhost',  # always mandatory
        'SERVER_PORT': '80',         # always mandatory
        'SERVER_PROTOCOL': 'HTTP/1.0',
        # mandatory wsgi variables
        'wsgi.version': (1, 0),
        'wsgi.url_scheme': 'http',
        'wsgi.input': six.BytesIO(),
        'wsgi.errors': errors,
        'wsgi.multithread': False,
        'wsgi.multiprocess': False,
        'wsgi.run_once': False,
        }
    if path:
        (_, _, path_info, query, fragment) = urlsplit(str(path))
        path_info = unquote(path_info)
        # urlsplit returns unicode so coerce it back to str
        path_info, query = str(path_info), str(query)
        basic_environ['PATH_INFO'] = path_info
        if query:
            basic_environ['QUERY_STRING'] = query
    for name, value in environ.items():
        name = name.replace('__', '.')
        basic_environ[name] = value
    if ('SERVER_NAME' in basic_environ
        and 'HTTP_HOST' not in basic_environ):
        basic_environ['HTTP_HOST'] = basic_environ['SERVER_NAME']
    istream = basic_environ['wsgi.input']
    if isinstance(istream, bytes):
        basic_environ['wsgi.input'] = six.BytesIO(istream)
        basic_environ['CONTENT_LENGTH'] = len(istream)
    data = {}
    output = []
    headers_set = []
    headers_sent = []
    def start_response(status, headers, exc_info=None):
        if exc_info:
            try:
                if headers_sent:
                    # Re-raise original exception only if headers sent
                    six.reraise(exc_info[0], exc_info[1], exc_info[2])
            finally:
                # avoid dangling circular reference
                exc_info = None
        elif headers_set:
            # You cannot set the headers more than once, unless the
            # exc_info is provided.
            raise AssertionError("Headers already set and no exc_info!")
        headers_set.append(True)
        data['status'] = status
        data['headers'] = headers
        return output.append
    app_iter = application(basic_environ, start_response)
    try:
        try:
            for s in app_iter:
                if not isinstance(s, six.binary_type):
                    raise ValueError(
                        "The app_iter response can only contain bytes (not "
                        "unicode); got: %r" % s)
                headers_sent.append(True)
                if not headers_set:
                    raise AssertionError("Content sent w/o headers!")
                output.append(s)
        except TypeError as e:
            # Typically "iteration over non-sequence", so we want
            # to give better debugging information...
            e.args = ((e.args[0] + ' iterable: %r' % app_iter),) + e.args[1:]
            raise
    finally:
        if hasattr(app_iter, 'close'):
            app_iter.close()
    return (data['status'], data['headers'], b''.join(output),
            errors.getvalue())

class ErrorRaiser(object):

    def flush(self):
        pass

    def write(self, value):
        if not value:
            return
        raise AssertionError(
            "No errors should be written (got: %r)" % value)

    def writelines(self, seq):
        raise AssertionError(
            "No errors should be written (got lines: %s)" % list(seq))

    def getvalue(self):
        return ''

def interactive(*args, **kw):
    """
    Runs the application interatively, wrapping `raw_interactive` but
    returning the output in a formatted way.
    """
    status, headers, content, errors = raw_interactive(*args, **kw)
    full = StringIO()
    if errors:
        full.write('Errors:\n')
        full.write(errors.strip())
        full.write('\n----------end errors\n')
    full.write(status + '\n')
    for name, value in headers:
        full.write('%s: %s\n' % (name, value))
    full.write('\n')
    full.write(content)
    return full.getvalue()
interactive.proxy = 'raw_interactive'

def dump_environ(environ, start_response):
    """
    Application which simply dumps the current environment
    variables out as a plain text response.
    """
    output = []
    keys = list(environ.keys())
    keys.sort()
    for k in keys:
        v = str(environ[k]).replace("\n","\n    ")
        output.append("%s: %s\n" % (k, v))
    output.append("\n")
    content_length = environ.get("CONTENT_LENGTH", '')
    if content_length:
        output.append(environ['wsgi.input'].read(int(content_length)))
        output.append("\n")
    output = "".join(output)
    if six.PY3:
        output = output.encode('utf8')
    headers = [('Content-Type', 'text/plain'),
               ('Content-Length', str(len(output)))]
    start_response("200 OK", headers)
    return [output]

def send_file(filename):
    warnings.warn(
        "wsgilib.send_file has been moved to paste.fileapp.FileApp",
        DeprecationWarning, 2)
    from paste import fileapp
    return fileapp.FileApp(filename)

def capture_output(environ, start_response, application):
    """
    Runs application with environ and start_response, and captures
    status, headers, and body.

    Sends status and header, but *not* body.  Returns (status,
    headers, body).  Typically this is used like:

    .. code-block:: python

        def dehtmlifying_middleware(application):
            def replacement_app(environ, start_response):
                status, headers, body = capture_output(
                    environ, start_response, application)
                content_type = header_value(headers, 'content-type')
                if (not content_type
                    or not content_type.startswith('text/html')):
                    return [body]
                body = re.sub(r'<.*?>', '', body)
                return [body]
            return replacement_app

    """
    warnings.warn(
        'wsgilib.capture_output has been deprecated in favor '
        'of wsgilib.intercept_output',
        DeprecationWarning, 2)
    data = []
    output = StringIO()
    def replacement_start_response(status, headers, exc_info=None):
        if data:
            data[:] = []
        data.append(status)
        data.append(headers)
        start_response(status, headers, exc_info)
        return output.write
    app_iter = application(environ, replacement_start_response)
    try:
        for item in app_iter:
            output.write(item)
    finally:
        if hasattr(app_iter, 'close'):
            app_iter.close()
    if not data:
        data.append(None)
    if len(data) < 2:
        data.append(None)
    data.append(output.getvalue())
    return data

def intercept_output(environ, application, conditional=None,
                     start_response=None):
    """
    Runs application with environ and captures status, headers, and
    body.  None are sent on; you must send them on yourself (unlike
    ``capture_output``)

    Typically this is used like:

    .. code-block:: python

        def dehtmlifying_middleware(application):
            def replacement_app(environ, start_response):
                status, headers, body = intercept_output(
                    environ, application)
                start_response(status, headers)
                content_type = header_value(headers, 'content-type')
                if (not content_type
                    or not content_type.startswith('text/html')):
                    return [body]
                body = re.sub(r'<.*?>', '', body)
                return [body]
            return replacement_app

    A third optional argument ``conditional`` should be a function
    that takes ``conditional(status, headers)`` and returns False if
    the request should not be intercepted.  In that case
    ``start_response`` will be called and ``(None, None, app_iter)``
    will be returned.  You must detect that in your code and return
    the app_iter, like:

    .. code-block:: python

        def dehtmlifying_middleware(application):
            def replacement_app(environ, start_response):
                status, headers, body = intercept_output(
                    environ, application,
                    lambda s, h: header_value(headers, 'content-type').startswith('text/html'),
                    start_response)
                if status is None:
                    return body
                start_response(status, headers)
                body = re.sub(r'<.*?>', '', body)
                return [body]
            return replacement_app
    """
    if conditional is not None and start_response is None:
        raise TypeError(
            "If you provide conditional you must also provide "
            "start_response")
    data = []
    output = StringIO()
    def replacement_start_response(status, headers, exc_info=None):
        if conditional is not None and not conditional(status, headers):
            data.append(None)
            return start_response(status, headers, exc_info)
        if data:
            data[:] = []
        data.append(status)
        data.append(headers)
        return output.write
    app_iter = application(environ, replacement_start_response)
    if data[0] is None:
        return (None, None, app_iter)
    try:
        for item in app_iter:
            output.write(item)
    finally:
        if hasattr(app_iter, 'close'):
            app_iter.close()
    if not data:
        data.append(None)
    if len(data) < 2:
        data.append(None)
    data.append(output.getvalue())
    return data

## Deprecation warning wrapper:

class ResponseHeaderDict(HeaderDict):

    def __init__(self, *args, **kw):
        warnings.warn(
            "The class wsgilib.ResponseHeaderDict has been moved "
            "to paste.response.HeaderDict",
            DeprecationWarning, 2)
        HeaderDict.__init__(self, *args, **kw)

def _warn_deprecated(new_func):
    new_name = new_func.func_name
    new_path = new_func.func_globals['__name__'] + '.' + new_name
    def replacement(*args, **kw):
        warnings.warn(
            "The function wsgilib.%s has been moved to %s"
            % (new_name, new_path),
            DeprecationWarning, 2)
        return new_func(*args, **kw)
    try:
        replacement.func_name = new_func.func_name
    except:
        pass
    return replacement

# Put warnings wrapper in place for all public functions that
# were imported from elsewhere:

for _name in __all__:
    _func = globals()[_name]
    if (hasattr(_func, 'func_globals')
        and _func.func_globals['__name__'] != __name__):
        globals()[_name] = _warn_deprecated(_func)

if __name__ == '__main__':
    import doctest
    doctest.testmod()