# -*- coding: utf-8 -*-
"""
    tests.basic
    ~~~~~~~~~~~~~~~~~~~~~

    The basic functionality.

    :copyright: © 2010 by the Pallets team.
    :license: BSD, see LICENSE for more details.
"""

import re
import time
import uuid
from datetime import datetime
from threading import Thread

import pytest
import werkzeug.serving
from werkzeug.exceptions import BadRequest, Forbidden, NotFound
from werkzeug.http import parse_date
from werkzeug.routing import BuildError

import flask
from flask._compat import text_type


def test_options_work(app, client):
    """OPTIONS on a GET/POST route lists the allowed methods, empty body."""
    @app.route('/', methods=['GET', 'POST'])
    def index():
        return 'Hello World'

    rv = client.open('/', method='OPTIONS')
    assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST']
    assert rv.data == b''


def test_options_on_multiple_rules(app, client):
    """OPTIONS merges the methods of two rules registered on the same URL."""
    @app.route('/', methods=['GET', 'POST'])
    def index():
        return 'Hello World'

    @app.route('/', methods=['PUT'])
    def index_put():
        return 'Aha!'

    rv = client.open('/', method='OPTIONS')
    assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST', 'PUT']


def test_provide_automatic_options_attr():
    """``provide_automatic_options`` set as a view attribute is honoured."""
    app = flask.Flask(__name__)

    def index():
        return 'Hello World!'

    index.provide_automatic_options = False
    app.route('/')(index)
    rv = app.test_client().open('/', method='OPTIONS')
    assert rv.status_code == 405

    app = flask.Flask(__name__)

    def index2():
        return 'Hello World!'

    index2.provide_automatic_options = True
    app.route('/', methods=['OPTIONS'])(index2)
    rv = app.test_client().open('/', method='OPTIONS')
    assert sorted(rv.allow) == ['OPTIONS']


def test_provide_automatic_options_kwarg(app, client):
    """``provide_automatic_options=False`` on add_url_rule disables OPTIONS."""
    def index():
        return flask.request.method

    def more():
        return flask.request.method

    app.add_url_rule('/', view_func=index, provide_automatic_options=False)
    app.add_url_rule(
        '/more', view_func=more, methods=['GET', 'POST'],
        provide_automatic_options=False
    )
    assert client.get('/').data == b'GET'

    rv = client.post('/')
    assert rv.status_code == 405
    assert sorted(rv.allow) == ['GET', 'HEAD']

    # Older versions of Werkzeug.test.Client don't have an options method
    if hasattr(client, 'options'):
        rv = client.options('/')
    else:
        rv = client.open('/', method='OPTIONS')

    assert rv.status_code == 405

    rv = client.head('/')
    assert rv.status_code == 200
    assert not rv.data  # head truncates
    assert client.post('/more').data == b'POST'
    assert client.get('/more').data == b'GET'

    rv = client.delete('/more')
    assert rv.status_code == 405
    assert sorted(rv.allow) == ['GET', 'HEAD', 'POST']

    if hasattr(client, 'options'):
        rv = client.options('/more')
    else:
        rv = client.open('/more', method='OPTIONS')

    assert rv.status_code == 405


def test_request_dispatching(app, client):
    """Requests reach the matching view; other methods answer 405."""
    @app.route('/')
    def index():
        return flask.request.method

    @app.route('/more', methods=['GET', 'POST'])
    def more():
        return flask.request.method

    assert client.get('/').data == b'GET'
    rv = client.post('/')
    assert rv.status_code == 405
    assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS']
    rv = client.head('/')
    assert rv.status_code == 200
    assert not rv.data  # head truncates
    assert client.post('/more').data == b'POST'
    assert client.get('/more').data == b'GET'
    rv = client.delete('/more')
    assert rv.status_code == 405
    assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST']


def test_disallow_string_for_allowed_methods(app):
    """Passing ``methods`` as a single string raises TypeError."""
    with pytest.raises(TypeError):
        @app.route('/', methods='GET POST')
        def index():
            return "Hey"


def test_url_mapping(app, client):
    """Rules added through add_url_rule dispatch like decorated views."""
    random_uuid4 = "7eb41166-9ebf-4d26-b771-ea3f54f8b383"

    def index():
        return flask.request.method

    def more():
        return flask.request.method

    def options():
        return random_uuid4

    app.add_url_rule('/', 'index', index)
    app.add_url_rule('/more', 'more', more, methods=['GET', 'POST'])

    # Issue 1288: Test that automatic options are not added when
    # non-uppercase 'options' in methods
    app.add_url_rule('/options', 'options', options, methods=['options'])

    assert client.get('/').data == b'GET'
    rv = client.post('/')
    assert rv.status_code == 405
    assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS']
    rv = client.head('/')
    assert rv.status_code == 200
    assert not rv.data  # head truncates
    assert client.post('/more').data == b'POST'
    assert client.get('/more').data == b'GET'
    rv = client.delete('/more')
    assert rv.status_code == 405
    assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST']
    rv = client.open('/options', method='OPTIONS')
    assert rv.status_code == 200
    assert random_uuid4 in rv.data.decode("utf-8")


def test_werkzeug_routing(app, client):
    """Werkzeug rules added to url_map dispatch via ``app.view_functions``."""
    from werkzeug.routing import Submount, Rule
    app.url_map.add(Submount('/foo', [
        Rule('/bar', endpoint='bar'),
        Rule('/', endpoint='index')
    ]))

    def bar():
        return 'bar'

    def index():
        return 'index'

    app.view_functions['bar'] = bar
    app.view_functions['index'] = index

    assert client.get('/foo/').data == b'index'
    assert client.get('/foo/bar').data == b'bar'


def test_endpoint_decorator(app, client):
    """``app.endpoint`` binds a view to an endpoint of a pre-added rule."""
    from werkzeug.routing import Submount, Rule
    app.url_map.add(Submount('/foo', [
        Rule('/bar', endpoint='bar'),
        Rule('/', endpoint='index')
    ]))

    @app.endpoint('bar')
    def bar():
        return 'bar'

    @app.endpoint('index')
    def index():
        return 'index'

    assert client.get('/foo/').data == b'index'
    assert client.get('/foo/bar').data == b'bar'


def test_session(app, client):
    """Session ``accessed``/``modified`` flags follow reads and writes."""
    @app.route('/set', methods=['POST'])
    def set():
        assert not flask.session.accessed
        assert not flask.session.modified
        flask.session['value'] = flask.request.form['value']
        assert flask.session.accessed
        assert flask.session.modified
        return 'value set'

    @app.route('/get')
    def get():
        assert not flask.session.accessed
        assert not flask.session.modified
        v = flask.session.get('value', 'None')
        assert flask.session.accessed
        assert not flask.session.modified
        return v

    assert client.post('/set', data={'value': '42'}).data == b'value set'
    assert client.get('/get').data == b'42'


def test_session_using_server_name(app, client):
    """The session cookie domain is derived from SERVER_NAME."""
    app.config.update(
        SERVER_NAME='example.com'
    )

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    rv = client.get('/', 'http://example.com/')
    assert 'domain=.example.com' in rv.headers['set-cookie'].lower()
    assert 'httponly' in rv.headers['set-cookie'].lower()


def test_session_using_server_name_and_port(app, client):
    """The port in SERVER_NAME does not appear in the cookie domain."""
    app.config.update(
        SERVER_NAME='example.com:8080'
    )

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    rv = client.get('/', 'http://example.com:8080/')
    assert 'domain=.example.com' in rv.headers['set-cookie'].lower()
    assert 'httponly' in rv.headers['set-cookie'].lower()


def test_session_using_server_name_port_and_path(app, client):
    """APPLICATION_ROOT is used as the session cookie path."""
    app.config.update(
        SERVER_NAME='example.com:8080',
        APPLICATION_ROOT='/foo'
    )

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    rv = client.get('/', 'http://example.com:8080/foo')
    assert 'domain=example.com' in rv.headers['set-cookie'].lower()
    assert 'path=/foo' in rv.headers['set-cookie'].lower()
    assert 'httponly' in rv.headers['set-cookie'].lower()


def test_session_using_application_root(app, client):
    """The cookie path is '/bar' when mounted under a SCRIPT_NAME prefix."""
    class PrefixPathMiddleware(object):
        def __init__(self, app, prefix):
            self.app = app
            self.prefix = prefix

        def __call__(self, environ, start_response):
            environ['SCRIPT_NAME'] = self.prefix
            return self.app(environ, start_response)

    app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar')
    app.config.update(
        APPLICATION_ROOT='/bar'
    )

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    rv = client.get('/', 'http://example.com:8080/')
    assert 'path=/bar' in rv.headers['set-cookie'].lower()


def test_session_using_session_settings(app, client):
    """Explicit SESSION_COOKIE_* settings show up in the Set-Cookie header."""
    app.config.update(
        SERVER_NAME='www.example.com:8080',
        APPLICATION_ROOT='/test',
        SESSION_COOKIE_DOMAIN='.example.com',
        SESSION_COOKIE_HTTPONLY=False,
        SESSION_COOKIE_SECURE=True,
        SESSION_COOKIE_SAMESITE='Lax',
        SESSION_COOKIE_PATH='/'
    )

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    rv = client.get('/', 'http://www.example.com:8080/test/')
    cookie = rv.headers['set-cookie'].lower()
    assert 'domain=.example.com' in cookie
    assert 'path=/' in cookie
    assert 'secure' in cookie
    assert 'httponly' not in cookie
    assert 'samesite' in cookie


def test_session_using_samesite_attribute(app, client):
    """SESSION_COOKIE_SAMESITE is validated and emitted on the cookie."""
    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    app.config.update(SESSION_COOKIE_SAMESITE='invalid')

    with pytest.raises(ValueError):
        client.get('/')

    app.config.update(SESSION_COOKIE_SAMESITE=None)
    rv = client.get('/')
    cookie = rv.headers['set-cookie'].lower()
    assert 'samesite' not in cookie

    app.config.update(SESSION_COOKIE_SAMESITE='Strict')
    rv = client.get('/')
    cookie = rv.headers['set-cookie'].lower()
    assert 'samesite=strict' in cookie

    app.config.update(SESSION_COOKIE_SAMESITE='Lax')
    rv = client.get('/')
    cookie = rv.headers['set-cookie'].lower()
    assert 'samesite=lax' in cookie


def test_session_localhost_warning(recwarn, app, client):
    """A 'localhost' SERVER_NAME warns and yields no cookie domain."""
    app.config.update(
        SERVER_NAME='localhost:5000',
    )

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'testing'

    rv = client.get('/', 'http://localhost:5000/')
    assert 'domain' not in rv.headers['set-cookie'].lower()
    w = recwarn.pop(UserWarning)
    assert '"localhost" is not a valid cookie domain' in str(w.message)


def test_session_ip_warning(recwarn, app, client):
    """An IP-address SERVER_NAME warns but is still used as cookie domain."""
    app.config.update(
        SERVER_NAME='127.0.0.1:5000',
    )

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'testing'

    rv = client.get('/', 'http://127.0.0.1:5000/')
    assert 'domain=127.0.0.1' in rv.headers['set-cookie'].lower()
    w = recwarn.pop(UserWarning)
    assert 'cookie domain is an IP' in str(w.message)


def test_missing_session(app):
    """Without a secret key, reads work but writes raise RuntimeError."""
    app.secret_key = None

    def expect_exception(f, *args, **kwargs):
        e = pytest.raises(RuntimeError, f, *args, **kwargs)
        assert e.value.args and 'session is unavailable' in e.value.args[0]

    with app.test_request_context():
        assert flask.session.get('missing_key') is None
        expect_exception(flask.session.__setitem__, 'foo', 42)
        expect_exception(flask.session.pop, 'foo')


def test_session_expiration(app, client):
    """Only permanent sessions carry an ``expires`` cookie attribute."""
    permanent = True

    @app.route('/')
    def index():
        flask.session['test'] = 42
        flask.session.permanent = permanent
        return ''

    @app.route('/test')
    def test():
        return text_type(flask.session.permanent)

    rv = client.get('/')
    assert 'set-cookie' in rv.headers
    match = re.search(r'(?i)\bexpires=([^;]+)', rv.headers['set-cookie'])
    assert match is not None
    # Parse only the captured date, not the leading "expires=" text.
    expires = parse_date(match.group(1))
    expected = datetime.utcnow() + app.permanent_session_lifetime
    assert expires.year == expected.year
    assert expires.month == expected.month
    assert expires.day == expected.day

    rv = client.get('/test')
    assert rv.data == b'True'

    permanent = False
    rv = client.get('/')
    assert 'set-cookie' in rv.headers
    # Same case-insensitive pattern as above; a case-sensitive search could
    # never match an "Expires=" attribute and would pass vacuously.
    match = re.search(r'(?i)\bexpires=([^;]+)', rv.headers['set-cookie'])
    assert match is None


def test_session_stored_last(app, client):
    """Session changes made in after_request handlers are still saved."""
    @app.after_request
    def modify_session(response):
        flask.session['foo'] = 42
        return response

    @app.route('/')
    def dump_session_contents():
        return repr(flask.session.get('foo'))

    assert client.get('/').data == b'None'
    assert client.get('/').data == b'42'


def test_session_special_types(app, client):
    """Tuples, bytes, Markup, UUIDs and datetimes round-trip via session."""
    now = datetime.utcnow().replace(microsecond=0)
    the_uuid = uuid.uuid4()

    @app.route('/')
    def dump_session_contents():
        flask.session['t'] = (1, 2, 3)
        flask.session['b'] = b'\xff'
        flask.session['m'] = flask.Markup('')
        flask.session['u'] = the_uuid
        flask.session['d'] = now
        flask.session['t_tag'] = {' t': 'not-a-tuple'}
        flask.session['di_t_tag'] = {' t__': 'not-a-tuple'}
        flask.session['di_tag'] = {' di': 'not-a-dict'}
        return '', 204

    with client:
        client.get('/')
        s = flask.session
        assert s['t'] == (1, 2, 3)
        assert type(s['b']) is bytes
        assert s['b'] == b'\xff'
        assert type(s['m']) is flask.Markup
        assert s['m'] == flask.Markup('')
        assert s['u'] == the_uuid
        assert s['d'] == now
        assert s['t_tag'] == {' t': 'not-a-tuple'}
        assert s['di_t_tag'] == {' t__': 'not-a-tuple'}
        assert s['di_tag'] == {' di': 'not-a-dict'}


def test_session_cookie_setting(app):
    """Set-Cookie on a read needs a permanent session plus refresh config."""
    is_permanent = True

    @app.route('/bump')
    def bump():
        rv = flask.session['foo'] = flask.session.get('foo', 0) + 1
        flask.session.permanent = is_permanent
        return str(rv)

    @app.route('/read')
    def read():
        return str(flask.session.get('foo', 0))

    def run_test(expect_header):
        with app.test_client() as c:
            assert c.get('/bump').data == b'1'
            assert c.get('/bump').data == b'2'
            assert c.get('/bump').data == b'3'

            rv = c.get('/read')
            set_cookie = rv.headers.get('set-cookie')
            assert (set_cookie is not None) == expect_header
            assert rv.data == b'3'

    is_permanent = True
    app.config['SESSION_REFRESH_EACH_REQUEST'] = True
    run_test(expect_header=True)

    is_permanent = True
    app.config['SESSION_REFRESH_EACH_REQUEST'] = False
    run_test(expect_header=False)

    is_permanent = False
    app.config['SESSION_REFRESH_EACH_REQUEST'] = True
    run_test(expect_header=False)

    is_permanent = False
    app.config['SESSION_REFRESH_EACH_REQUEST'] = False
    run_test(expect_header=False)


def test_session_vary_cookie(app, client):
    """Touching the session adds ``Cookie`` to the Vary header exactly once."""
    @app.route('/set')
    def set_session():
        flask.session['test'] = 'test'
        return ''

    @app.route('/get')
    def get():
        return flask.session.get('test')

    @app.route('/getitem')
    def getitem():
        return flask.session['test']

    @app.route('/setdefault')
    def setdefault():
        return flask.session.setdefault('test', 'default')

    @app.route('/vary-cookie-header-set')
    def vary_cookie_header_set():
        response = flask.Response()
        response.vary.add('Cookie')
        flask.session['test'] = 'test'
        return response

    @app.route('/vary-header-set')
    def vary_header_set():
        response = flask.Response()
        response.vary.update(('Accept-Encoding', 'Accept-Language'))
        flask.session['test'] = 'test'
        return response

    @app.route('/no-vary-header')
    def no_vary_header():
        return ''

    def expect(path, header_value='Cookie'):
        rv = client.get(path)

        if header_value:
            # The 'Vary' key should exist in the headers only once.
            assert len(rv.headers.get_all('Vary')) == 1
            assert rv.headers['Vary'] == header_value
        else:
            assert 'Vary' not in rv.headers

    expect('/set')
    expect('/get')
    expect('/getitem')
    expect('/setdefault')
    expect('/vary-cookie-header-set')
    expect('/vary-header-set', 'Accept-Encoding, Accept-Language, Cookie')
    expect('/no-vary-header', None)


def test_flashes(app, req_ctx):
    """``flash`` marks the session modified and keeps message order."""
    assert not flask.session.modified
    flask.flash('Zap')
    flask.session.modified = False
    flask.flash('Zip')
    assert flask.session.modified
    assert list(flask.get_flashed_messages()) == ['Zap', 'Zip']


def test_extended_flashing(app):
    """Flashed messages can be read with categories and category filters."""
    # Be sure app.testing=True below, else tests can fail silently.
    #
    # Specifically, if app.testing is not set to True, the AssertionErrors
    # in the view functions will cause a 500 response to the test client
    # instead of propagating exceptions.

    @app.route('/')
    def index():
        flask.flash(u'Hello World')
        flask.flash(u'Hello World', 'error')
        flask.flash(flask.Markup(u'Testing'), 'warning')
        return ''

    @app.route('/test/')
    def test():
        messages = flask.get_flashed_messages()
        assert list(messages) == [
            u'Hello World',
            u'Hello World',
            flask.Markup(u'Testing')
        ]
        return ''

    @app.route('/test_with_categories/')
    def test_with_categories():
        messages = flask.get_flashed_messages(with_categories=True)
        assert len(messages) == 3
        assert list(messages) == [
            ('message', u'Hello World'),
            ('error', u'Hello World'),
            ('warning', flask.Markup(u'Testing'))
        ]
        return ''

    @app.route('/test_filter/')
    def test_filter():
        messages = flask.get_flashed_messages(
            category_filter=['message'], with_categories=True)
        assert list(messages) == [('message', u'Hello World')]
        return ''

    @app.route('/test_filters/')
    def test_filters():
        messages = flask.get_flashed_messages(
            category_filter=['message', 'warning'], with_categories=True)
        assert list(messages) == [
            ('message', u'Hello World'),
            ('warning', flask.Markup(u'Testing'))
        ]
        return ''

    @app.route('/test_filters_without_returning_categories/')
    def test_filters2():
        messages = flask.get_flashed_messages(
            category_filter=['message', 'warning'])
        assert len(messages) == 2
        assert messages[0] == u'Hello World'
        assert messages[1] == flask.Markup(u'Testing')
        return ''

    # Create new test client on each test to clean flashed messages.

    client = app.test_client()
    client.get('/')
    client.get('/test_with_categories/')

    client = app.test_client()
    client.get('/')
    client.get('/test_filter/')

    client = app.test_client()
    client.get('/')
    client.get('/test_filters/')

    client = app.test_client()
    client.get('/')
    client.get('/test_filters_without_returning_categories/')


def test_request_processing(app, client):
    """before_request runs before the view and after_request after it."""
    evts = []

    @app.before_request
    def before_request():
        evts.append('before')

    @app.after_request
    def after_request(response):
        response.data += b'|after'
        evts.append('after')
        return response

    @app.route('/')
    def index():
        assert 'before' in evts
        assert 'after' not in evts
        return 'request'

    assert 'after' not in evts
    rv = client.get('/').data
    assert 'after' in evts
    assert rv == b'request|after'


def test_request_preprocessing_early_return(app, client):
    """A before_request return value stops later handlers and the view."""
    evts = []

    @app.before_request
    def before_request1():
        evts.append(1)

    @app.before_request
    def before_request2():
        evts.append(2)
        return "hello"

    @app.before_request
    def before_request3():
        evts.append(3)
        return "bye"

    @app.route('/')
    def index():
        evts.append('index')
        return "damnit"

    rv = client.get('/').data.strip()
    assert rv == b'hello'
    assert evts == [1, 2]


def test_after_request_processing(app, client):
    """``after_this_request`` callbacks can modify the response."""
    @app.route('/')
    def index():
        @flask.after_this_request
        def foo(response):
            response.headers['X-Foo'] = 'a header'
            return response

        return 'Test'

    resp = client.get('/')
    assert resp.status_code == 200
    assert resp.headers['X-Foo'] == 'a header'


def test_teardown_request_handler(app, client):
    """A teardown handler runs once and its return value is ignored."""
    called = []

    @app.teardown_request
    def teardown_request(exc):
        called.append(True)
        return "Ignored"

    @app.route('/')
    def root():
        return "Response"

    rv = client.get('/')
    assert rv.status_code == 200
    assert b'Response' in rv.data
    assert len(called) == 1


def test_teardown_request_handler_debug_mode(app, client):
    """Same checks as test_teardown_request_handler, under a second name."""
    called = []

    @app.teardown_request
    def teardown_request(exc):
        called.append(True)
        return "Ignored"

    @app.route('/')
    def root():
        return "Response"

    rv = client.get('/')
    assert rv.status_code == 200
    assert b'Response' in rv.data
    assert len(called) == 1


def test_teardown_request_handler_error(app, client):
    """Every teardown handler receives the view's original exception."""
    called = []
    app.testing = False

    @app.teardown_request
    def teardown_request1(exc):
        assert type(exc) is ZeroDivisionError
        called.append(True)
        # This raises a new error and blows away sys.exc_info(), so we can
        # test that all teardown_requests get passed the same original
        # exception.
        try:
            raise TypeError()
        except TypeError:
            pass

    @app.teardown_request
    def teardown_request2(exc):
        assert type(exc) is ZeroDivisionError
        called.append(True)
        # This raises a new error and blows away sys.exc_info(), so we can
        # test that all teardown_requests get passed the same original
        # exception.
        try:
            raise TypeError()
        except TypeError:
            pass

    @app.route('/')
    def fails():
        1 // 0

    rv = client.get('/')
    assert rv.status_code == 500
    assert b'Internal Server Error' in rv.data
    assert len(called) == 2


def test_before_after_request_order(app, client):
    """before handlers run in order; after and teardown run in reverse."""
    called = []

    @app.before_request
    def before1():
        called.append(1)

    @app.before_request
    def before2():
        called.append(2)

    @app.after_request
    def after1(response):
        called.append(4)
        return response

    @app.after_request
    def after2(response):
        called.append(3)
        return response

    @app.teardown_request
    def finish1(exc):
        called.append(6)

    @app.teardown_request
    def finish2(exc):
        called.append(5)

    @app.route('/')
    def index():
        return '42'

    rv = client.get('/')
    assert rv.data == b'42'
    assert called == [1, 2, 3, 4, 5, 6]


def test_error_handling(app, client):
    """Handlers registered by code or exception class build the response."""
    app.testing = False

    @app.errorhandler(404)
    def not_found(e):
        return 'not found', 404

    @app.errorhandler(500)
    def internal_server_error(e):
        return 'internal server error', 500

    @app.errorhandler(Forbidden)
    def forbidden(e):
        return 'forbidden', 403

    @app.route('/')
    def index():
        flask.abort(404)

    @app.route('/error')
    def error():
        1 // 0

    @app.route('/forbidden')
    def error2():
        flask.abort(403)

    rv = client.get('/')
    assert rv.status_code == 404
    assert rv.data == b'not found'
    rv = client.get('/error')
    assert rv.status_code == 500
    assert b'internal server error' == rv.data
    rv = client.get('/forbidden')
    assert rv.status_code == 403
    assert b'forbidden' == rv.data


def test_error_handler_unknown_code(app):
    """Registering a handler for an unknown status code raises KeyError."""
    with pytest.raises(KeyError) as exc_info:
        app.register_error_handler(999, lambda e: ('999', 999))

    assert 'Use a subclass' in exc_info.value.args[0]


def test_error_handling_processing(app, client):
    """after_request handlers also run on error-handler responses."""
    app.testing = False

    @app.errorhandler(500)
    def internal_server_error(e):
        return 'internal server error', 500

    @app.route('/')
    def broken_func():
        1 // 0

    @app.after_request
    def after_request(resp):
        resp.mimetype = 'text/x-special'
        return resp

    resp = client.get('/')
    assert resp.mimetype == 'text/x-special'
    assert resp.data == b'internal server error'


def test_baseexception_error_handling(app, client):
    """A BaseException from a view propagates and is kept on the context."""
    app.testing = False

    @app.route('/')
    def broken_func():
        raise KeyboardInterrupt()

    with pytest.raises(KeyboardInterrupt):
        client.get('/')

    ctx = flask._request_ctx_stack.top
    assert ctx.preserved
    assert type(ctx._preserved_exc) is KeyboardInterrupt


def test_before_request_and_routing_errors(app, client):
    """before_request handlers run before a routing 404 is handled."""
    @app.before_request
    def attach_something():
        flask.g.something = 'value'

    @app.errorhandler(404)
    def return_something(error):
        return flask.g.something, 404

    rv = client.get('/')
    assert rv.status_code == 404
    assert rv.data == b'value'


def test_user_error_handling(app, client):
    """A handler registered for a custom exception class receives it."""
    class MyException(Exception):
        pass

    @app.errorhandler(MyException)
    def handle_my_exception(e):
        assert isinstance(e, MyException)
        return '42'

    @app.route('/')
    def index():
        raise MyException()

    assert client.get('/').data == b'42'


def test_http_error_subclass_handling(app, client):
    """A Forbidden subclass handler wins over the generic 403 handler."""
    class ForbiddenSubclass(Forbidden):
        pass

    @app.errorhandler(ForbiddenSubclass)
    def handle_forbidden_subclass(e):
        assert isinstance(e, ForbiddenSubclass)
        return 'banana'

    # Distinct name so this handler does not shadow the one above.
    @app.errorhandler(403)
    def handle_403(e):
        assert not isinstance(e, ForbiddenSubclass)
        assert isinstance(e, Forbidden)
        return 'apple'

    @app.route('/1')
    def index1():
        raise ForbiddenSubclass()

    @app.route('/2')
    def index2():
        flask.abort(403)

    @app.route('/3')
    def index3():
        raise Forbidden()

    assert client.get('/1').data == b'banana'
    assert client.get('/2').data == b'apple'
    assert client.get('/3').data == b'apple'


def test_errorhandler_precedence(app, client):
    """The handler for the nearest class in the exception's MRO is used."""
    class E1(Exception):
        pass

    class E2(Exception):
        pass

    class E3(E1, E2):
        pass

    @app.errorhandler(E2)
    def handle_e2(e):
        return 'E2'

    @app.errorhandler(Exception)
    def handle_exception(e):
        return 'Exception'

    @app.route('/E1')
    def raise_e1():
        raise E1

    @app.route('/E3')
    def raise_e3():
        raise E3

    rv = client.get('/E1')
    assert rv.data == b'Exception'

    rv = client.get('/E3')
    assert rv.data == b'E2'


def test_trapping_of_bad_request_key_errors(app, client):
    """Missing form keys give 400 unless debug or trapping re-raises them."""
    @app.route('/key')
    def fail():
        flask.request.form['missing_key']

    @app.route('/abort')
    def allow_abort():
        flask.abort(400)

    rv = client.get('/key')
    assert rv.status_code == 400
    assert b'missing_key' not in rv.data
    rv = client.get('/abort')
    assert rv.status_code == 400

    app.debug = True
    with pytest.raises(KeyError) as e:
        client.get("/key")
    assert e.errisinstance(BadRequest)
    assert 'missing_key' in e.value.description
    rv = client.get('/abort')
    assert rv.status_code == 400

    app.debug = False
    app.config['TRAP_BAD_REQUEST_ERRORS'] = True
    with pytest.raises(KeyError):
        client.get('/key')
    with pytest.raises(BadRequest):
        client.get('/abort')


def test_trapping_of_all_http_exceptions(app, client):
    """TRAP_HTTP_EXCEPTIONS makes HTTP exceptions propagate to the caller."""
    app.config['TRAP_HTTP_EXCEPTIONS'] = True

    @app.route('/fail')
    def fail():
        flask.abort(404)

    with pytest.raises(NotFound):
        client.get('/fail')


def test_error_handler_after_processor_error(app, client):
    """Errors in before/after request handlers reach the 500 handler."""
    app.testing = False

    @app.before_request
    def before_request():
        if trigger == 'before':
            1 // 0

    @app.after_request
    def after_request(response):
        if trigger == 'after':
            1 // 0
        return response

    @app.route('/')
    def index():
        return 'Foo'

    @app.errorhandler(500)
    def internal_server_error(e):
        return 'Hello Server Error', 500

    # The handlers above read ``trigger`` from this enclosing scope.
    for trigger in 'before', 'after':
        rv = client.get('/')
        assert rv.status_code == 500
        assert rv.data == b'Hello Server Error'


def test_enctype_debug_helper(app, client):
    """Debug mode explains a missing upload when the form had no files."""
    from flask.debughelpers import DebugFilesKeyError
    app.debug = True

    @app.route('/fail', methods=['POST'])
    def index():
        return flask.request.files['foo'].filename

    # with statement is important because we leave an exception on the
    # stack otherwise and we want to ensure that this is not the case
    # to not negatively affect other tests.
    with client:
        with pytest.raises(DebugFilesKeyError) as e:
            client.post('/fail', data={'foo': 'index.txt'})
        assert 'no file contents were transmitted' in str(e.value)
        assert 'This was submitted: "index.txt"' in str(e.value)


def test_response_types(app, client):
    """Views may return text, bytes, tuples, responses or WSGI callables."""
    @app.route('/text')
    def from_text():
        return u'Hällo Wörld'

    @app.route('/bytes')
    def from_bytes():
        return u'Hällo Wörld'.encode('utf-8')

    @app.route('/full_tuple')
    def from_full_tuple():
        return 'Meh', 400, {
            'X-Foo': 'Testing',
            'Content-Type': 'text/plain; charset=utf-8'
        }

    @app.route('/text_headers')
    def from_text_headers():
        return 'Hello', {
            'X-Foo': 'Test',
            'Content-Type': 'text/plain; charset=utf-8'
        }

    @app.route('/text_status')
    def from_text_status():
        return 'Hi, status!', 400

    @app.route('/response_headers')
    def from_response_headers():
        return flask.Response('Hello world', 404, {'X-Foo': 'Baz'}), {
            "X-Foo": "Bar",
            "X-Bar": "Foo"
        }

    @app.route('/response_status')
    def from_response_status():
        return app.response_class('Hello world', 400), 500

    @app.route('/wsgi')
    def from_wsgi():
        return NotFound()

    assert client.get('/text').data == u'Hällo Wörld'.encode('utf-8')
    assert client.get('/bytes').data == u'Hällo Wörld'.encode('utf-8')

    rv = client.get('/full_tuple')
    assert rv.data == b'Meh'
    assert rv.headers['X-Foo'] == 'Testing'
    assert rv.status_code == 400
    assert rv.mimetype == 'text/plain'

    rv = client.get('/text_headers')
    assert rv.data == b'Hello'
    assert rv.headers['X-Foo'] == 'Test'
    assert rv.status_code == 200
    assert rv.mimetype == 'text/plain'

    rv = client.get('/text_status')
    assert rv.data == b'Hi, status!'
    assert rv.status_code == 400
    assert rv.mimetype == 'text/html'

    rv = client.get('/response_headers')
    assert rv.data == b'Hello world'
    assert rv.headers.getlist('X-Foo') == ['Baz', 'Bar']
    assert rv.headers['X-Bar'] == 'Foo'
    assert rv.status_code == 404

    rv = client.get('/response_status')
    assert rv.data == b'Hello world'
    assert rv.status_code == 500

    rv = client.get('/wsgi')
    assert b'Not Found' in rv.data
    assert rv.status_code == 404


def test_response_type_errors():
    """Invalid view return values raise TypeError."""
    app = flask.Flask(__name__)
    app.testing = True

    @app.route('/none')
    def from_none():
        pass

    @app.route('/small_tuple')
    def from_small_tuple():
        return 'Hello',

    @app.route('/large_tuple')
    def from_large_tuple():
        return 'Hello', 234, {'X-Foo': 'Bar'}, '???'

    @app.route('/bad_type')
    def from_bad_type():
        return True

    @app.route('/bad_wsgi')
    def from_bad_wsgi():
        return lambda: None

    c = app.test_client()

    # The message assertions sit outside the ``raises`` blocks so that they
    # actually run, and inspect the raised exception through ``e.value``.
    with pytest.raises(TypeError) as e:
        c.get('/none')
    assert 'returned None' in str(e.value)

    with pytest.raises(TypeError) as e:
        c.get('/small_tuple')
    assert 'tuple must have the form' in str(e.value)

    pytest.raises(TypeError, c.get, '/large_tuple')

    with pytest.raises(TypeError) as e:
        c.get('/bad_type')
    assert 'it was a bool' in str(e.value)

    pytest.raises(TypeError, c.get, '/bad_wsgi')


def test_make_response(app, req_ctx):
    """``make_response`` builds responses from nothing, text and status."""
    rv = flask.make_response()
    assert rv.status_code == 200
    assert rv.data == b''
    assert rv.mimetype == 'text/html'

    rv = flask.make_response('Awesome')
    assert rv.status_code == 200
    assert rv.data == b'Awesome'
    assert rv.mimetype == 'text/html'

    rv = flask.make_response('W00t', 404)
    assert rv.status_code == 404
    assert rv.data == b'W00t'
    assert rv.mimetype == 'text/html'


def test_make_response_with_response_instance(app, req_ctx):
    """``make_response`` applies status and headers to a given response."""
    rv = flask.make_response(
        flask.jsonify({'msg': 'W00t'}), 400)
    assert rv.status_code == 400
    assert rv.data == b'{"msg":"W00t"}\n'
    assert rv.mimetype == 'application/json'

    rv = flask.make_response(
        flask.Response(''), 400)
    assert rv.status_code == 400
    assert rv.data == b''
    assert rv.mimetype == 'text/html'

    rv = flask.make_response(
        flask.Response('', headers={'Content-Type': 'text/html'}),
        400, [('X-Foo', 'bar')])
    assert rv.status_code == 400
    assert rv.headers['Content-Type'] == 'text/html'
    assert rv.headers['X-Foo'] == 'bar'


def test_jsonify_no_prettyprint(app, req_ctx):
    """With pretty-printing off, jsonify emits compact JSON."""
    app.config.update({"JSONIFY_PRETTYPRINT_REGULAR": False})
    compressed_msg = b'{"msg":{"submsg":"W00t"},"msg2":"foobar"}\n'
    uncompressed_msg = {
        "msg": {
            "submsg": "W00t"
        },
        "msg2": "foobar"
    }

    rv = flask.make_response(
        flask.jsonify(uncompressed_msg), 200)
    assert rv.data == compressed_msg


def test_jsonify_prettyprint(app, req_ctx):
    """With pretty-printing on, jsonify emits indented JSON."""
    app.config.update({"JSONIFY_PRETTYPRINT_REGULAR": True})
    compressed_msg = {"msg": {"submsg": "W00t"}, "msg2": "foobar"}
    # NOTE(review): runs of spaces inside this literal were collapsed in the
    # copy under review; two-space indentation restored -- confirm against
    # the actual jsonify output.
    pretty_response = \
        b'{\n  "msg": {\n    "submsg": "W00t"\n  }, \n  "msg2": "foobar"\n}\n'

    rv = flask.make_response(
        flask.jsonify(compressed_msg), 200)
    assert rv.data == pretty_response


def test_jsonify_mimetype(app, req_ctx):
    """JSONIFY_MIMETYPE sets the mimetype of jsonify responses."""
    app.config.update({"JSONIFY_MIMETYPE": 'application/vnd.api+json'})
    msg = {
        "msg": {"submsg": "W00t"},
    }
    rv = flask.make_response(
        flask.jsonify(msg), 200)
    assert rv.mimetype == 'application/vnd.api+json'


def test_jsonify_args_and_kwargs_check(app, req_ctx):
    """Mixing positional and keyword arguments in jsonify raises TypeError."""
    with pytest.raises(TypeError) as e:
        flask.jsonify('fake args', kwargs='fake')
    assert 'behavior undefined' in str(e.value)


def test_url_generation(app, req_ctx):
    """``url_for`` fills and quotes URL variables, optionally external."""
    # The '<name>' variable is required for the URLs asserted below; it had
    # been stripped from the rule together with other angle-bracket text.
    @app.route('/hello/<name>', methods=['POST'])
    def hello():
        pass

    assert flask.url_for('hello', name='test x') == '/hello/test%20x'
    assert flask.url_for('hello', name='test x', _external=True) == \
        'http://localhost/hello/test%20x'


def test_build_error_handler(app):
    """BuildError is raised by default and can be handled by a callback."""
    # Test base case, a URL which results in a BuildError.
    with app.test_request_context():
        pytest.raises(BuildError, flask.url_for, 'spam')

    # Verify the error is re-raised if not the current exception.
    try:
        with app.test_request_context():
            flask.url_for('spam')
    except BuildError as err:
        error = err
    try:
        raise RuntimeError('Test case where BuildError is not current.')
    except RuntimeError:
        pytest.raises(
            BuildError, app.handle_url_build_error, error, 'spam', {})

    # Test a custom handler.
    def handler(error, endpoint, values):
        # Just a test.
        return '/test_handler/'

    app.url_build_error_handlers.append(handler)
    with app.test_request_context():
        assert flask.url_for('spam') == '/test_handler/'


def test_build_error_handler_reraise(app):
    """A build-error handler may re-raise the BuildError it was given."""
    # Test a custom handler which reraises the BuildError
    def handler_raises_build_error(error, endpoint, values):
        raise error

    app.url_build_error_handlers.append(handler_raises_build_error)

    with app.test_request_context():
        pytest.raises(BuildError, flask.url_for, 'not.existing')


def test_url_for_passes_special_values_to_build_error_handler(app):
    """Build-error handlers receive url_for's underscore-prefixed options."""
    @app.url_build_error_handlers.append
    def handler(error, endpoint, values):
        assert values == {
            '_external': False,
            '_anchor': None,
            '_method': None,
            '_scheme': None,
        }
        return 'handled'

    with app.test_request_context():
        flask.url_for('/')


def test_custom_converters(app, client):
    """A converter registered on the URL map is usable in route rules."""
    from werkzeug.routing import BaseConverter

    class ListConverter(BaseConverter):
        def to_python(self, value):
            return value.split(',')

        def to_url(self, value):
            base_to_url = super(ListConverter, self).to_url
            return ','.join(base_to_url(x) for x in value)

    app.url_map.converters['list'] = ListConverter

    # The view takes ``args`` and the converter is registered as 'list', so
    # the rule needs the '<list:args>' variable that had been stripped.
    @app.route('/<list:args>')
    def index(args):
        return '|'.join(args)

    assert client.get('/1,2,3').data == b'1|2|3'


def test_static_files(app, client):
    """The static folder is served and reachable through ``url_for``."""
    rv = client.get('/static/index.html')
    assert rv.status_code == 200
    # NOTE(review): the markup around "Hello World!" was lost in the copy
    # under review; <h1> is assumed here -- confirm against the
    # static/index.html fixture.
    assert rv.data.strip() == b'<h1>Hello World!</h1>'
    with app.test_request_context():
        assert flask.url_for('static', filename='index.html') == \
            '/static/index.html'
    rv.close()


def test_static_url_path():
    """``static_url_path`` changes the URL prefix of static files."""
    app = flask.Flask(__name__, static_url_path='/foo')
    app.testing = True
    rv = app.test_client().get('/foo/index.html')
    assert rv.status_code == 200
    rv.close()

    with app.test_request_context():
        assert flask.url_for('static', filename='index.html') == \
            '/foo/index.html'


def test_static_route_with_host_matching():
    """With host matching, the static route is bound to ``static_host``."""
    app = flask.Flask(__name__, host_matching=True, static_host='example.com')
    c = app.test_client()
    rv = c.get('http://example.com/static/index.html')
    assert rv.status_code == 200
    rv.close()
    with app.test_request_context():
        rv = flask.url_for('static', filename='index.html', _external=True)
        assert rv == 'http://example.com/static/index.html'
    # Providing static_host without host_matching=True should error.
    with pytest.raises(Exception):
        flask.Flask(__name__, static_host='example.com')
    # Providing host_matching=True with static_folder but without
    # static_host should error.
    with pytest.raises(Exception):
        flask.Flask(__name__, host_matching=True)
    # Providing host_matching=True without static_host but with
    # static_folder=None should not error.
    flask.Flask(__name__, host_matching=True, static_folder=None)


def test_request_locals():
    """Outside of an application context ``flask.g`` is unbound and falsy."""
    # NOTE(review): the expected repr was stripped to '' in the copy under
    # review; '<LocalProxy unbound>' is assumed -- confirm against the
    # installed Werkzeug.
    assert repr(flask.g) == '<LocalProxy unbound>'
    assert not flask.g


def test_test_app_proper_environ():
    """The test client's base URL is matched against SERVER_NAME."""
    app = flask.Flask(__name__, subdomain_matching=True)
    app.config.update(
        SERVER_NAME='localhost.localdomain:5000'
    )
    client = app.test_client()

    @app.route('/')
    def index():
        return 'Foo'

    @app.route('/', subdomain='foo')
    def subdomain():
        return 'Foo SubDomain'

    rv = client.get('/')
    assert rv.data == b'Foo'

    rv = client.get('/', 'http://localhost.localdomain:5000')
    assert rv.data == b'Foo'

    rv = client.get('/', 'https://localhost.localdomain:5000')
    assert rv.data == b'Foo'

    app.config.update(SERVER_NAME='localhost.localdomain')
    rv = client.get('/', 'https://localhost.localdomain')
    assert rv.data == b'Foo'

    try:
        app.config.update(SERVER_NAME='localhost.localdomain:443')
        rv = client.get('/', 'https://localhost.localdomain')
        # Werkzeug 0.8
        assert rv.status_code == 404
    except ValueError as e:
        # Werkzeug 0.7
        assert str(e) == (
            "the server name provided "
            "('localhost.localdomain:443') does not match the "
            "server name from the WSGI environment ('localhost.localdomain')"
        )

    try:
        app.config.update(SERVER_NAME='localhost.localdomain')
        rv = client.get('/', 'http://foo.localhost')
        # Werkzeug 0.8
        assert rv.status_code == 404
    except ValueError as e:
        # Werkzeug 0.7
        assert str(e) == (
            "the server name provided "
            "('localhost.localdomain') does not match the "
            "server name from the WSGI environment ('foo.localhost')"
        )

    rv = client.get('/', 'http://foo.localhost.localdomain')
    assert rv.data == b'Foo SubDomain'


def test_exception_propagation(app, client):
    def apprunner(config_key):
        @app.route('/')
        def index():
            1 // 0

        if config_key is not None:
            app.config[config_key] = True
            with pytest.raises(Exception):
                client.get('/')
        else:
            assert client.get('/').status_code == 500

    # we have to run this test in an isolated thread because if the
    # debug flag is set to true and an exception happens the context is
    # not torn down.
This causes other tests that run after this fail # when they expect no exception on the stack. for config_key in 'TESTING', 'PROPAGATE_EXCEPTIONS', 'DEBUG', None: t = Thread(target=apprunner, args=(config_key,)) t.start() t.join() @pytest.mark.parametrize('debug', [True, False]) @pytest.mark.parametrize('use_debugger', [True, False]) @pytest.mark.parametrize('use_reloader', [True, False]) @pytest.mark.parametrize('propagate_exceptions', [None, True, False]) def test_werkzeug_passthrough_errors(monkeypatch, debug, use_debugger, use_reloader, propagate_exceptions, app): rv = {} # Mocks werkzeug.serving.run_simple method def run_simple_mock(*args, **kwargs): rv['passthrough_errors'] = kwargs.get('passthrough_errors') monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) app.config['PROPAGATE_EXCEPTIONS'] = propagate_exceptions app.run(debug=debug, use_debugger=use_debugger, use_reloader=use_reloader) def test_max_content_length(app, client): app.config['MAX_CONTENT_LENGTH'] = 64 @app.before_request def always_first(): flask.request.form['myfile'] assert False @app.route('/accept', methods=['POST']) def accept_file(): flask.request.form['myfile'] assert False @app.errorhandler(413) def catcher(error): return '42' rv = client.post('/accept', data={'myfile': 'foo' * 100}) assert rv.data == b'42' def test_url_processors(app, client): @app.url_defaults def add_language_code(endpoint, values): if flask.g.lang_code is not None and \ app.url_map.is_endpoint_expecting(endpoint, 'lang_code'): values.setdefault('lang_code', flask.g.lang_code) @app.url_value_preprocessor def pull_lang_code(endpoint, values): flask.g.lang_code = values.pop('lang_code', None) @app.route('//') def index(): return flask.url_for('about') @app.route('//about') def about(): return flask.url_for('something_else') @app.route('/foo') def something_else(): return flask.url_for('about', lang_code='en') assert client.get('/de/').data == b'/de/about' assert client.get('/de/about').data == 
b'/foo' assert client.get('/foo').data == b'/en/about' def test_inject_blueprint_url_defaults(app): bp = flask.Blueprint('foo.bar.baz', __name__, template_folder='template') @bp.url_defaults def bp_defaults(endpoint, values): values['page'] = 'login' @bp.route('/') def view(page): pass app.register_blueprint(bp) values = dict() app.inject_url_defaults('foo.bar.baz.view', values) expected = dict(page='login') assert values == expected with app.test_request_context('/somepage'): url = flask.url_for('foo.bar.baz.view') expected = '/login' assert url == expected def test_nonascii_pathinfo(app, client): @app.route(u'/киртест') def index(): return 'Hello World!' rv = client.get(u'/киртест') assert rv.data == b'Hello World!' def test_debug_mode_complains_after_first_request(app, client): app.debug = True @app.route('/') def index(): return 'Awesome' assert not app.got_first_request assert client.get('/').data == b'Awesome' with pytest.raises(AssertionError) as e: @app.route('/foo') def broken(): return 'Meh' assert 'A setup function was called' in str(e) app.debug = False @app.route('/foo') def working(): return 'Meh' assert client.get('/foo').data == b'Meh' assert app.got_first_request def test_before_first_request_functions(app, client): got = [] @app.before_first_request def foo(): got.append(42) client.get('/') assert got == [42] client.get('/') assert got == [42] assert app.got_first_request def test_before_first_request_functions_concurrent(app, client): got = [] @app.before_first_request def foo(): time.sleep(0.2) got.append(42) def get_and_assert(): client.get("/") assert got == [42] t = Thread(target=get_and_assert) t.start() get_and_assert() t.join() assert app.got_first_request def test_routing_redirect_debugging(app, client): app.debug = True @app.route('/foo/', methods=['GET', 'POST']) def foo(): return 'success' with client: with pytest.raises(AssertionError) as e: client.post('/foo', data={}) assert 'http://localhost/foo/' in str(e) assert ('Make sure to 
directly send ' 'your POST-request to this URL') in str(e) rv = client.get('/foo', data={}, follow_redirects=True) assert rv.data == b'success' app.debug = False with client: rv = client.post('/foo', data={}, follow_redirects=True) assert rv.data == b'success' def test_route_decorator_custom_endpoint(app, client): app.debug = True @app.route('/foo/') def foo(): return flask.request.endpoint @app.route('/bar/', endpoint='bar') def for_bar(): return flask.request.endpoint @app.route('/bar/123', endpoint='123') def for_bar_foo(): return flask.request.endpoint with app.test_request_context(): assert flask.url_for('foo') == '/foo/' assert flask.url_for('bar') == '/bar/' assert flask.url_for('123') == '/bar/123' assert client.get('/foo/').data == b'foo' assert client.get('/bar/').data == b'bar' assert client.get('/bar/123').data == b'123' def test_preserve_only_once(app, client): app.debug = True @app.route('/fail') def fail_func(): 1 // 0 for x in range(3): with pytest.raises(ZeroDivisionError): client.get('/fail') assert flask._request_ctx_stack.top is not None assert flask._app_ctx_stack.top is not None # implicit appctx disappears too flask._request_ctx_stack.top.pop() assert flask._request_ctx_stack.top is None assert flask._app_ctx_stack.top is None def test_preserve_remembers_exception(app, client): app.debug = True errors = [] @app.route('/fail') def fail_func(): 1 // 0 @app.route('/success') def success_func(): return 'Okay' @app.teardown_request def teardown_handler(exc): errors.append(exc) # After this failure we did not yet call the teardown handler with pytest.raises(ZeroDivisionError): client.get('/fail') assert errors == [] # But this request triggers it, and it's an error client.get('/success') assert len(errors) == 2 assert isinstance(errors[0], ZeroDivisionError) # At this point another request does nothing. 
client.get('/success') assert len(errors) == 3 assert errors[1] is None def test_get_method_on_g(app_ctx): assert flask.g.get('x') is None assert flask.g.get('x', 11) == 11 flask.g.x = 42 assert flask.g.get('x') == 42 assert flask.g.x == 42 def test_g_iteration_protocol(app_ctx): flask.g.foo = 23 flask.g.bar = 42 assert 'foo' in flask.g assert 'foos' not in flask.g assert sorted(flask.g) == ['bar', 'foo'] def test_subdomain_basic_support(): app = flask.Flask(__name__, subdomain_matching=True) app.config['SERVER_NAME'] = 'localhost.localdomain' client = app.test_client() @app.route('/') def normal_index(): return 'normal index' @app.route('/', subdomain='test') def test_index(): return 'test index' rv = client.get('/', 'http://localhost.localdomain/') assert rv.data == b'normal index' rv = client.get('/', 'http://test.localhost.localdomain/') assert rv.data == b'test index' def test_subdomain_matching(): app = flask.Flask(__name__, subdomain_matching=True) client = app.test_client() app.config['SERVER_NAME'] = 'localhost.localdomain' @app.route('/', subdomain='') def index(user): return 'index for %s' % user rv = client.get('/', 'http://mitsuhiko.localhost.localdomain/') assert rv.data == b'index for mitsuhiko' def test_subdomain_matching_with_ports(): app = flask.Flask(__name__, subdomain_matching=True) app.config['SERVER_NAME'] = 'localhost.localdomain:3000' client = app.test_client() @app.route('/', subdomain='') def index(user): return 'index for %s' % user rv = client.get('/', 'http://mitsuhiko.localhost.localdomain:3000/') assert rv.data == b'index for mitsuhiko' @pytest.mark.parametrize('matching', (False, True)) def test_subdomain_matching_other_name(matching): app = flask.Flask(__name__, subdomain_matching=matching) app.config['SERVER_NAME'] = 'localhost.localdomain:3000' client = app.test_client() @app.route('/') def index(): return '', 204 # ip address can't match name rv = client.get('/', 'http://127.0.0.1:3000/') assert rv.status_code == 404 if matching 
else 204 # allow all subdomains if matching is disabled rv = client.get('/', 'http://www.localhost.localdomain:3000/') assert rv.status_code == 404 if matching else 204 def test_multi_route_rules(app, client): @app.route('/') @app.route('//') def index(test='a'): return test rv = client.open('/') assert rv.data == b'a' rv = client.open('/b/') assert rv.data == b'b' def test_multi_route_class_views(app, client): class View(object): def __init__(self, app): app.add_url_rule('/', 'index', self.index) app.add_url_rule('//', 'index', self.index) def index(self, test='a'): return test _ = View(app) rv = client.open('/') assert rv.data == b'a' rv = client.open('/b/') assert rv.data == b'b' def test_run_defaults(monkeypatch, app): rv = {} # Mocks werkzeug.serving.run_simple method def run_simple_mock(*args, **kwargs): rv['result'] = 'running...' monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) app.run() assert rv['result'] == 'running...' def test_run_server_port(monkeypatch, app): rv = {} # Mocks werkzeug.serving.run_simple method def run_simple_mock(hostname, port, application, *args, **kwargs): rv['result'] = 'running on %s:%s ...' % (hostname, port) monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) hostname, port = 'localhost', 8000 app.run(hostname, port, debug=True) assert rv['result'] == 'running on %s:%s ...' 
% (hostname, port) @pytest.mark.parametrize('host,port,expect_host,expect_port', ( (None, None, 'pocoo.org', 8080), ('localhost', None, 'localhost', 8080), (None, 80, 'pocoo.org', 80), ('localhost', 80, 'localhost', 80), )) def test_run_from_config(monkeypatch, host, port, expect_host, expect_port, app): def run_simple_mock(hostname, port, *args, **kwargs): assert hostname == expect_host assert port == expect_port monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) app.config['SERVER_NAME'] = 'pocoo.org:8080' app.run(host, port) def test_max_cookie_size(app, client, recwarn): app.config['MAX_COOKIE_SIZE'] = 100 # outside app context, default to Werkzeug static value, # which is also the default config response = flask.Response() default = flask.Flask.default_config['MAX_COOKIE_SIZE'] assert response.max_cookie_size == default # inside app context, use app config with app.app_context(): assert flask.Response().max_cookie_size == 100 @app.route('/') def index(): r = flask.Response('', status=204) r.set_cookie('foo', 'bar' * 100) return r client.get('/') assert len(recwarn) == 1 w = recwarn.pop() assert 'cookie is too large' in str(w.message) app.config['MAX_COOKIE_SIZE'] = 0 client.get('/') assert len(recwarn) == 0