# (c) 2005 Ian Bicking and contributors; written for Paste
# (http://pythonpaste.org)
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""
Routines for testing WSGI applications.

Most interesting is TestApp
"""
from __future__ import unicode_literals

import os
import re
import json
import random
import fnmatch
import mimetypes

from base64 import b64encode

from six import StringIO
from six import BytesIO
from six import string_types
from six import binary_type
from six import text_type
from six.moves import http_cookiejar

from webtest.compat import urlparse
from webtest.compat import urlencode
from webtest.compat import to_bytes
from webtest.compat import escape_cookie_value
from webtest.response import TestResponse
from webtest import forms
from webtest import lint
from webtest import utils

import webob


__all__ = ['TestApp', 'TestRequest']


class AppError(Exception):
    """Error raised when a response does not match what the test expected.

    ``message`` is a ``%``-style format string and ``args`` are the values
    interpolated into it.  Before interpolation, bytes arguments are decoded
    as UTF-8 (falling back to their ``repr``), and :class:`webob.Response`
    arguments whose body is bytes are replaced by that body, decoded with the
    response charset (or its ``repr`` when the response has no charset).
    """

    def __init__(self, message, *args):
        if isinstance(message, binary_type):
            message = message.decode('utf8')
        str_args = ()
        for arg in args:
            if isinstance(arg, webob.Response):
                body = arg.body
                if isinstance(body, binary_type):
                    if arg.charset:
                        arg = body.decode(arg.charset)
                    else:
                        arg = repr(body)
            elif isinstance(arg, binary_type):
                try:
                    arg = arg.decode('utf8')
                except UnicodeDecodeError:
                    arg = repr(arg)
            str_args += (arg,)
        message = message % str_args
        Exception.__init__(self, message)


class CookiePolicy(http_cookiejar.DefaultCookiePolicy):
    """A subclass of DefaultCookiePolicy to allow cookie set for
    Domain=localhost."""

    def return_ok_domain(self, cookie, request):
        # Accept the cookie whenever its domain ends with the request host;
        # otherwise defer to the default policy.
        if cookie.domain.endswith(request.origin_req_host):
            return True
        return http_cookiejar.DefaultCookiePolicy.return_ok_domain(
            self, cookie, request)

    def set_ok_domain(self, cookie, request):
        # Same shortcut as return_ok_domain, applied when storing cookies.
        if cookie.domain.endswith(request.origin_req_host):
            return True
        return http_cookiejar.DefaultCookiePolicy.set_ok_domain(
            self, cookie, request)


class TestRequest(webob.BaseRequest):
    """A subclass of webob.Request"""
    ResponseClass = TestResponse


class TestApp(object):
    """
    Wraps a WSGI application in a more convenient interface for
    testing. It uses extended version of :class:`webob.BaseRequest`
    and :class:`webob.Response`.

    :param app:
        May be a WSGI application or Paste Deploy app,
        like ``'config:filename.ini#test'``.

        .. versionadded:: 2.0

        It can also be an actual full URL to an http server and webtest
        will proxy requests with WSGIProxy2.
    :type app:
        WSGI application
    :param extra_environ:
        A dictionary of values that should go
        into the environment for each request. These can provide a
        communication channel with the application.
    :type extra_environ:
        dict
    :param relative_to:
        A directory that file uploads are calculated relative to.
        Also used for ``config:`` URIs that aren't absolute.
    :type relative_to:
        string
    :param cookiejar:
        :class:`cookielib.CookieJar` alike API that keeps cookies
        across requests.
    :type cookiejar:
        CookieJar instance

    .. attribute:: cookies

        A convenient shortcut for a dict of all cookies in
        ``cookiejar``.

    :param parser_features:
        Passed to BeautifulSoup when parsing responses.
    :type parser_features:
        string or list
    :param json_encoder:
        Passed to json.dumps when encoding json
    :type json_encoder:
        A subclass of json.JSONEncoder
    :param lint:
        If True (default) then check that the application is WSGI compliant
    :type lint:
        A boolean
    """

    RequestClass = TestRequest

    # Class-level default so that reading ``authorization`` before it has
    # ever been assigned returns None instead of raising AttributeError.
    authorization_value = None

    def __init__(self, app, extra_environ=None, relative_to=None,
                 use_unicode=True, cookiejar=None, parser_features=None,
                 json_encoder=None, lint=True):

        # An environment variable, when present, overrides the ``app``
        # argument entirely.
        if 'WEBTEST_TARGET_URL' in os.environ:
            app = os.environ['WEBTEST_TARGET_URL']
        if isinstance(app, string_types):
            if app.startswith('http'):
                try:
                    from wsgiproxy import HostProxy
                except ImportError:  # pragma: no cover
                    raise ImportError((
                        'Using webtest with a real url requires WSGIProxy2. '
                        'Please install it with: '
                        'pip install WSGIProxy2'))
                # The part after '#' names the proxy client to use.
                if '#' not in app:
                    app += '#httplib'
                url, client = app.split('#', 1)
                app = HostProxy(url, client=client)
            else:
                from paste.deploy import loadapp
                # @@: Should pick up relative_to from calling module's
                # __file__
                app = loadapp(app, relative_to=relative_to)
        self.app = app
        self.lint = lint
        self.relative_to = relative_to
        if extra_environ is None:
            extra_environ = {}
        self.extra_environ = extra_environ
        self.use_unicode = use_unicode
        if cookiejar is None:
            cookiejar = http_cookiejar.CookieJar(policy=CookiePolicy())
        self.cookiejar = cookiejar
        if parser_features is None:
            parser_features = 'html.parser'
        # NOTE: this is stored on the response *class*, so it is shared by
        # every TestApp using the same RequestClass.ResponseClass.
        self.RequestClass.ResponseClass.parser_features = parser_features
        if json_encoder is None:
            json_encoder = json.JSONEncoder
        self.JSONEncoder = json_encoder

    def get_authorization(self):
        """Allow to set the HTTP_AUTHORIZATION environ key. Value should look
        like ``('Basic', ('user', 'password'))``, ``('Bearer', 'token')`` or
        ``('JWT', 'token')``.

        If value is None then the HTTP_AUTHORIZATION is removed.
        """
        return self.authorization_value

    def set_authorization(self, value):
        """Validate ``value`` and store the matching HTTP_AUTHORIZATION
        entry in ``extra_environ``.

        :raises ValueError: if ``value`` is neither None nor a supported
            ``(authtype, credentials)`` pair.  Nothing is modified in that
            case.
        """
        if value is None:
            self.authorization_value = None
            if 'HTTP_AUTHORIZATION' in self.extra_environ:
                del self.extra_environ['HTTP_AUTHORIZATION']
            return

        invalid_value = (
            "You should use a value like ('Basic', ('user', 'password'))"
            " OR ('Bearer', 'token') OR ('JWT', 'token')"
        )
        if not (isinstance(value, (list, tuple)) and len(value) == 2):
            raise ValueError(invalid_value)

        authtype, val = value
        if authtype == 'Basic' and val and \
                isinstance(val, (list, tuple)):
            val = ':'.join(list(val))
            val = b64encode(to_bytes(val)).strip()
            val = val.decode('latin1')
        elif authtype in ('Bearer', 'JWT') and val and \
                isinstance(val, (str, text_type)):
            val = val.strip()
        else:
            raise ValueError(invalid_value)

        # Only record the new value once it is known to be valid, so a
        # rejected assignment leaves the previous state intact.
        self.authorization_value = value
        self.extra_environ.update({
            'HTTP_AUTHORIZATION': str('%s %s' % (authtype, val)),
        })

    authorization = property(get_authorization, set_authorization)

    @property
    def cookies(self):
        """A dict mapping cookie names to values for every cookie in
        ``cookiejar``."""
        return {cookie.name: cookie.value for cookie in self.cookiejar}

    def set_cookie(self, name, value):
        """
        Sets a cookie to be passed through with requests.

        The cookie domain is derived from the ``HTTP_HOST`` entry of
        ``extra_environ`` (``localhost`` when absent), without the port.
        """
        cookie_domain = self.extra_environ.get('HTTP_HOST', 'localhost')
        cookie_domain = cookie_domain.split(':', 1)[0]
        cookie_domain = '.' + cookie_domain
        value = escape_cookie_value(value)
        cookie = http_cookiejar.Cookie(
            version=0,
            name=name,
            value=value,
            port=None,
            port_specified=False,
            domain=cookie_domain,
            domain_specified=True,
            domain_initial_dot=False,
            path='/',
            path_specified=True,
            secure=False,
            expires=None,
            discard=False,
            comment=None,
            comment_url=None,
            rest=None
        )
        self.cookiejar.set_cookie(cookie)

    def reset(self):
        """
        Resets the state of the application; currently just clears
        saved cookies.
        """
        self.cookiejar.clear()

    def set_parser_features(self, parser_features):
        """
        Changes the parser used by BeautifulSoup. See its documentation to
        know the supported parsers.
        """
        self.RequestClass.ResponseClass.parser_features = parser_features

    def get(self, url, params=None, headers=None, extra_environ=None,
            status=None, expect_errors=False, xhr=False):
        """
        Do a GET request given the url path.

        :param params:
            A query string, or a dictionary that will be encoded
            into a query string.  You may also include a URL query
            string on the ``url``.
        :param headers:
            Extra headers to send.
        :type headers:
            dictionary
        :param extra_environ:
            Environmental variables that should be added to the request.
        :type extra_environ:
            dictionary
        :param status:
            The HTTP status code you expect in response (if not 200 or 3xx).
            You can also use a wildcard, like ``'3*'`` or ``'*'``.
        :type status:
            integer or string
        :param expect_errors:
            If this is False, then if anything is written to
            environ ``wsgi.errors`` it will be an error.
            If it is True, then non-200/3xx responses are also okay.
        :type expect_errors:
            boolean
        :param xhr:
            If this is true, then marks response as ajax. The same as
            headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }
        :type xhr:
            boolean

        :returns: :class:`webtest.TestResponse` instance.

        """
        environ = self._make_environ(extra_environ)
        url = str(url)
        url = self._remove_fragment(url)
        if params:
            if not isinstance(params, string_types):
                params = urlencode(params, doseq=True)
            if str('?') in url:
                url += str('&')
            else:
                url += str('?')
            url += params
        # The query string travels through the environ, not the path.
        if str('?') in url:
            url, environ['QUERY_STRING'] = url.split(str('?'), 1)
        else:
            environ['QUERY_STRING'] = str('')
        req = self.RequestClass.blank(url, environ)
        if xhr:
            headers = self._add_xhr_header(headers)
        if headers:
            req.headers.update(headers)
        return self.do_request(req, status=status,
                               expect_errors=expect_errors)

    def post(self, url, params='', headers=None, extra_environ=None,
             status=None, upload_files=None, expect_errors=False,
             content_type=None, xhr=False):
        """
        Do a POST request. Similar to :meth:`~webtest.TestApp.get`.

        :param params:
            Are put in the body of the request. If params is an
            iterator it will be urlencoded, if it is a string it will not
            be encoded, but placed in the body directly.

            Can be a collections.OrderedDict with
            :class:`webtest.forms.Upload` fields included::


                app.post('/myurl', collections.OrderedDict([
                    ('textfield1', 'value1'),
                    ('uploadfield', webtest.Upload('filename.txt',
                                                   b'contents')),
                    ('textfield2', 'value2')]))

        :param upload_files:
            It should be a list of ``(fieldname, filename, file_content)``.
            You can also use just ``(fieldname, filename)`` and the file
            contents will be read from disk.
        :type upload_files:
            list
        :param content_type:
            HTTP content type, for example `application/json`.
        :type content_type:
            string

        :param xhr:
            If this is true, then marks response as ajax. The same as
            headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }
        :type xhr:
            boolean

        :returns: :class:`webtest.TestResponse` instance.

        """
        if xhr:
            headers = self._add_xhr_header(headers)
        return self._gen_request('POST', url, params=params, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=upload_files,
                                 expect_errors=expect_errors,
                                 content_type=content_type)

    def put(self, url, params='', headers=None, extra_environ=None,
            status=None, upload_files=None, expect_errors=False,
            content_type=None, xhr=False):
        """
        Do a PUT request. Similar to :meth:`~webtest.TestApp.post`.

        :returns: :class:`webtest.TestResponse` instance.

        """
        if xhr:
            headers = self._add_xhr_header(headers)
        return self._gen_request('PUT', url, params=params, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=upload_files,
                                 expect_errors=expect_errors,
                                 content_type=content_type,
                                 )

    def patch(self, url, params='', headers=None, extra_environ=None,
              status=None, upload_files=None, expect_errors=False,
              content_type=None, xhr=False):
        """
        Do a PATCH request. Similar to :meth:`~webtest.TestApp.post`.

        :returns: :class:`webtest.TestResponse` instance.

        """
        if xhr:
            headers = self._add_xhr_header(headers)
        return self._gen_request('PATCH', url, params=params,
                                 headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=upload_files,
                                 expect_errors=expect_errors,
                                 content_type=content_type)

    def delete(self, url, params='', headers=None,
               extra_environ=None, status=None, expect_errors=False,
               content_type=None, xhr=False):
        """
        Do a DELETE request. Similar to :meth:`~webtest.TestApp.get`.

        :returns: :class:`webtest.TestResponse` instance.

        """
        if xhr:
            headers = self._add_xhr_header(headers)
        return self._gen_request('DELETE', url, params=params,
                                 headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=None,
                                 expect_errors=expect_errors,
                                 content_type=content_type)

    def options(self, url, headers=None, extra_environ=None,
                status=None, expect_errors=False, xhr=False):
        """
        Do an OPTIONS request. Similar to :meth:`~webtest.TestApp.get`.

        :returns: :class:`webtest.TestResponse` instance.

        """
        if xhr:
            headers = self._add_xhr_header(headers)
        return self._gen_request('OPTIONS', url, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=None,
                                 expect_errors=expect_errors)

    def head(self, url, headers=None, extra_environ=None,
             status=None, expect_errors=False, xhr=False):
        """
        Do a HEAD request. Similar to :meth:`~webtest.TestApp.get`.

        :returns: :class:`webtest.TestResponse` instance.

        """
        if xhr:
            headers = self._add_xhr_header(headers)
        return self._gen_request('HEAD', url, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=None,
                                 expect_errors=expect_errors)

    post_json = utils.json_method('POST')
    put_json = utils.json_method('PUT')
    patch_json = utils.json_method('PATCH')
    delete_json = utils.json_method('DELETE')

    def encode_multipart(self, params, files):
        """
        Encodes a set of parameters (typically a name/value list) and
        a set of files (a list of (name, filename, file_body, mimetype)) into a
        typical POST body, returning the (content_type, body).

        Text field names must be ASCII-encodable and text file names must be
        UTF-8-encodable; the encoding error propagates otherwise.
        """
        # Drop the leading "0." of the random float to build the boundary.
        boundary = to_bytes(str(random.random()))[2:]
        boundary = b'----------a_BoUnDaRy' + boundary + b'$'
        lines = []

        def _append_file(file_info):
            key, filename, value, fcontent = self._get_file_info(file_info)
            if isinstance(key, text_type):
                # field names must be ascii
                key = key.encode('ascii')
            if isinstance(filename, text_type):
                # file names must be ascii or utf8
                filename = filename.encode('utf8')
            if not fcontent:
                fcontent = mimetypes.guess_type(filename.decode('utf8'))[0]
            fcontent = to_bytes(fcontent)
            fcontent = fcontent or b'application/octet-stream'
            lines.extend([
                b'--' + boundary,
                b'Content-Disposition: form-data; ' +
                b'name="' + key + b'"; filename="' + filename + b'"',
                b'Content-Type: ' + fcontent, b'', value])

        for key, value in params:
            if isinstance(key, text_type):
                # field names are always ascii
                key = key.encode('ascii')
            if isinstance(value, forms.File):
                if value.value:
                    _append_file([key] + list(value.value))
                else:
                    # If no file was uploaded simulate an empty file with no
                    # name like real browsers do:
                    _append_file([key, b'', b''])
            elif isinstance(value, forms.Upload):
                file_info = [key, value.filename]
                if value.content is not None:
                    file_info.append(value.content)
                    # The content type is positional after the content, so
                    # it can only be passed along when content is present.
                    if value.content_type is not None:
                        file_info.append(value.content_type)
                _append_file(file_info)
            else:
                if isinstance(value, text_type):
                    value = value.encode('utf8')
                lines.extend([
                    b'--' + boundary,
                    b'Content-Disposition: form-data; name="' + key + b'"',
                    b'', value])

        for file_info in files:
            _append_file(file_info)

        lines.extend([b'--' + boundary + b'--', b''])
        body = b'\r\n'.join(lines)
        boundary = boundary.decode('ascii')
        content_type = 'multipart/form-data; boundary=%s' % boundary
        return content_type, body

    def request(self, url_or_req, status=None, expect_errors=False,
                **req_params):
        """
        Creates and executes a request. You may either pass in an
        instantiated :class:`TestRequest` object, or you may pass in a
        URL and keyword arguments to be passed to
        :meth:`TestRequest.blank`.

        You can use this to run a request without the intermediary
        functioning of :meth:`TestApp.get` etc.  For instance, to
        test a WebDAV method::

            resp = app.request('/new-col', method='MKCOL')

        Note that the request won't have a body unless you specify it,
        like::

            resp = app.request('/test.txt', method='PUT', body='test')

        You can use :class:`webtest.TestRequest`::

            req = webtest.TestRequest.blank('/url/', method='GET')
            resp = app.do_request(req)

        """
        if isinstance(url_or_req, text_type):
            url_or_req = str(url_or_req)
        # Iterate over a snapshot because values are rewritten in place.
        for (k, v) in list(req_params.items()):
            if isinstance(v, text_type):
                req_params[k] = str(v)
        if isinstance(url_or_req, string_types):
            req = self.RequestClass.blank(url_or_req, **req_params)
        else:
            # Work on a copy so the caller's request object is untouched.
            req = url_or_req.copy()
            for name, value in req_params.items():
                setattr(req, name, value)
        req.environ['paste.throw_errors'] = True
        for name, value in self.extra_environ.items():
            req.environ.setdefault(name, value)
        return self.do_request(req,
                               status=status,
                               expect_errors=expect_errors,
                               )

    def do_request(self, req, status=None, expect_errors=None):
        """
        Executes the given webob Request (``req``), with the expected
        ``status``.  Generally :meth:`~webtest.TestApp.get` and
        :meth:`~webtest.TestApp.post` are used instead.

        To use this::

            req = webtest.TestRequest.blank('url', ...args...)
            resp = app.do_request(req)

        .. note::

            You can pass any keyword arguments to
            ``TestRequest.blank()``, which will be set on the request.
            These can be arguments like ``content_type``, ``accept``, etc.

        :raises ValueError: if a name in ``paste.testing_variables`` clashes
            with an existing attribute of the response.
        :raises AppError: unless ``expect_errors`` is true, when the status
            is not the expected one or something was written to
            ``wsgi.errors``.
        """

        errors = StringIO()
        req.environ['wsgi.errors'] = errors
        script_name = req.environ.get('SCRIPT_NAME', '')
        if script_name and req.path_info.startswith(script_name):
            req.path_info = req.path_info[len(script_name):]

        # set framework hooks
        req.environ['paste.testing'] = True
        req.environ['paste.testing_variables'] = {}

        # set request cookies
        self.cookiejar.add_cookie_header(utils._RequestCookieAdapter(req))

        # verify wsgi compatibility
        app = lint.middleware(self.app) if self.lint else self.app

        # FIXME: should it be an option to not catch exc_info?
        res = req.get_response(app, catch_exc_info=True)

        # be sure to decode the content
        res.decode_content()

        # set a few handy attributes
        res._use_unicode = self.use_unicode
        res.request = req
        res.app = app
        res.test_app = self

        # We do this to make sure the app_iter is exhausted:
        try:
            res.body
        except TypeError:  # pragma: no cover
            pass
        res.errors = errors.getvalue()

        for name, value in req.environ['paste.testing_variables'].items():
            if hasattr(res, name):
                raise ValueError(
                    "paste.testing_variables contains the variable %r, but "
                    "the response object already has an attribute by that "
                    "name" % name)
            setattr(res, name, value)
        if not expect_errors:
            self._check_status(status, res)
            self._check_errors(res)

        # merge cookies back in
        self.cookiejar.extract_cookies(utils._ResponseCookieAdapter(res),
                                       utils._RequestCookieAdapter(req))

        return res

    def _check_status(self, status, res):
        """Raise :class:`AppError` if ``res`` does not match ``status``.

        ``status`` may be ``'*'`` (anything goes), a wildcard string, an
        exact status line, a list/tuple of accepted integer codes, an
        integer code, or None (meaning any 2xx or 3xx).
        """
        if status == '*':
            return
        res_status = res.status
        if (isinstance(status, string_types) and '*' in status):
            if re.match(fnmatch.translate(status), res_status, re.I):
                return
        if isinstance(status, string_types):
            if status == res_status:
                return
        if isinstance(status, (list, tuple)):
            if res.status_int not in status:
                raise AppError(
                    "Bad response: %s (not one of %s for %s)\n%s",
                    res_status, ', '.join(map(str, status)),
                    res.request.url, res)
            return
        if status is None:
            if res.status_int >= 200 and res.status_int < 400:
                return
            raise AppError(
                "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s",
                res_status, res.request.url,
                res)
        if status != res.status_int:
            raise AppError(
                "Bad response: %s (not %s)\n%s", res_status, status, res)

    def _check_errors(self, res):
        """Raise :class:`AppError` if anything was captured in
        ``res.errors``."""
        errors = res.errors
        if errors:
            raise AppError(
                "Application had errors logged:\n%s", errors)

    def _make_environ(self, extra_environ=None):
        """Return a copy of ``self.extra_environ`` with
        ``paste.throw_errors`` enabled and ``extra_environ`` merged on
        top."""
        environ = self.extra_environ.copy()
        environ['paste.throw_errors'] = True
        if extra_environ:
            environ.update(extra_environ)
        return environ

    def _remove_fragment(self, url):
        """Return ``url`` with its ``#fragment`` part stripped."""
        scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
        return urlparse.urlunsplit((scheme, netloc, path, query, ""))

    def _gen_request(self, method, url, params=utils.NoDefault,
                     headers=None, extra_environ=None, status=None,
                     upload_files=None, expect_errors=False,
                     content_type=None):
        """
        Do a generic request.

        Builds the request body from ``params`` (and ``upload_files``),
        using a multipart encoding when uploads are involved or when
        ``content_type`` starts with ``multipart``, then delegates to
        :meth:`do_request`.
        """

        environ = self._make_environ(extra_environ)

        inline_uploads = []

        # this supports OrderedDict
        if isinstance(params, dict) or hasattr(params, 'items'):
            params = list(params.items())

        if isinstance(params, (list, tuple)):
            inline_uploads = [v for (k, v) in params
                              if isinstance(v, (forms.File, forms.Upload))]

        if len(inline_uploads) > 0:
            content_type, params = self.encode_multipart(
                params, upload_files or ())
            environ['CONTENT_TYPE'] = content_type
        else:
            params = utils.encode_params(params, content_type)
            if upload_files or \
                (content_type and
                 to_bytes(content_type).startswith(b'multipart')):
                params = urlparse.parse_qsl(params, keep_blank_values=True)
                content_type, params = self.encode_multipart(
                    params, upload_files or ())
                environ['CONTENT_TYPE'] = content_type
            elif params:
                environ.setdefault('CONTENT_TYPE',
                                   str('application/x-www-form-urlencoded'))

        if content_type is not None:
            environ['CONTENT_TYPE'] = content_type
        environ['REQUEST_METHOD'] = str(method)
        url = str(url)
        url = self._remove_fragment(url)
        req = self.RequestClass.blank(url, environ)
        if isinstance(params, text_type):
            params = params.encode(req.charset or 'utf8')
        req.environ['wsgi.input'] = BytesIO(params)
        req.content_length = len(params)
        if headers:
            req.headers.update(headers)
        return self.do_request(req, status=status,
                               expect_errors=expect_errors)

    def _get_file_info(self, file_info):
        """Normalize an upload description to a 4-item
        ``(fieldname, filename, content, mimetype)`` sequence.

        A 2-item ``(fieldname, filename)`` is completed by reading the file
        from disk (relative to ``self.relative_to`` when set).

        :raises ValueError: if the content is not bytes or ``file_info`` has
            an unsupported length.
        """
        if len(file_info) == 2:
            # It only has a filename
            filename = file_info[1]
            if self.relative_to:
                filename = os.path.join(self.relative_to, filename)
            # Context manager guarantees the handle is closed even if
            # read() fails.
            with open(filename, 'rb') as f:
                content = f.read()
            return (file_info[0], filename, content, None)
        elif 3 <= len(file_info) <= 4:
            content = file_info[2]
            if not isinstance(content, binary_type):
                raise ValueError('File content must be %s not %s'
                                 % (binary_type, type(content)))
            if len(file_info) == 3:
                return tuple(file_info) + (None,)
            else:
                return file_info
        else:
            raise ValueError(
                "upload_files need to be a list of tuples of (fieldname, "
                "filename, filecontent, mimetype) or (fieldname, "
                "filename, filecontent) or (fieldname, filename); "
                "you gave: %r"
                % repr(file_info)[:100])

    @staticmethod
    def _add_xhr_header(headers):
        """Return a copy of ``headers`` with the ``X-REQUESTED-WITH``
        header added; the caller's mapping is left untouched."""
        headers = dict(headers) if headers else {}
        # the native str() is required here, otherwise lint.middleware
        # reports an error
        headers.update({'X-REQUESTED-WITH': str('XMLHttpRequest')})
        return headers