Package omeroweb :: Package webadmin :: Package tests :: Module request_factory
[hide private]
[frames] | no frames]

Source Code for Module omeroweb.webadmin.tests.request_factory

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # 
  4  # 
  5  # 
  6  # Copyright (c) 2008 University of Dundee.  
  7  # 
  8  # This program is free software: you can redistribute it and/or modify 
  9  # it under the terms of the GNU Affero General Public License as 
 10  # published by the Free Software Foundation, either version 3 of the 
 11  # License, or (at your option) any later version. 
 12  # 
 13  # This program is distributed in the hope that it will be useful, 
 14  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 16  # GNU Affero General Public License for more details. 
 17  # 
 18  # You should have received a copy of the GNU Affero General Public License 
 19  # along with this program.  If not, see <http://www.gnu.org/licenses/>. 
 20  # 
 21   
 22  import urllib 
 23  from urlparse import urlparse, urlunparse, urlsplit 
 24  import sys 
 25  import os 
 26  import re 
 27  import mimetypes 
 28  import warnings 
 29  try: 
 30      from cStringIO import StringIO 
 31  except ImportError: 
 32      from StringIO import StringIO 
 33       
 34  from django.conf import settings 
 35  from django.contrib.auth import authenticate, login 
 36  from django.core.handlers.base import BaseHandler 
 37  from django.core.handlers.wsgi import WSGIRequest 
 38  from django.core.signals import got_request_exception 
 39  from django.core.urlresolvers import reverse 
 40  from django.http import SimpleCookie, HttpRequest, QueryDict 
 41  from django.template import TemplateDoesNotExist 
 42  from django.test import signals 
 43  from django.utils.functional import curry 
 44  from django.utils.encoding import smart_str 
 45  from django.utils.http import urlencode 
 46  from django.utils.importlib import import_module 
 47  from django.utils.itercompat import is_iterable 
 48  from django.db import transaction, close_connection 
 49  from django.test.utils import ContextList 
 50   
 51  from omeroweb.connector import Connector 
 52   
 53  __all__ = ('Client', 'RequestFactory', 'encode_file', 'encode_multipart') 
 54   
 55   
 56  BOUNDARY = 'BoUnDaRyStRiNg' 
 57  MULTIPART_CONTENT = 'multipart/form-data; boundary=%s' % BOUNDARY 
 58  CONTENT_TYPE_RE = re.compile('.*; charset=([\w\d-]+);?') 
59 60 -def fakeRequest (method, path="/", params={}, **kwargs):
61 def bogus_request(self, **request): 62 """ 63 Usage: 64 rf = RequestFactory() 65 get_request = rf.get('/hello/') 66 post_request = rf.post('/submit/', {'foo': 'bar'}) 67 """ 68 if not method.lower() in ('post', 'get'): 69 raise AttributeError("Method must be 'get' or 'post'") 70 if not isinstance(params, dict): 71 raise AttributeError("Params must be a dictionary") 72 73 rf = RequestFactory() 74 r = getattr(rf, method.lower())(path, params) 75 if 'django.contrib.sessions' in settings.INSTALLED_APPS: 76 engine = __import__(settings.SESSION_ENGINE, {}, {}, ['']) 77 r.session = engine.SessionStore() 78 return r
79 Client.bogus_request = bogus_request 80 c = Client() 81 return c.bogus_request(**kwargs) 82
83 84 -class FakePayload(object):
85 """ 86 A wrapper around StringIO that restricts what can be read since data from 87 the network can't be seeked and cannot be read outside of its content 88 length. This makes sure that views can't do anything under the test client 89 that wouldn't work in Real Life. 90 """
91 - def __init__(self, content):
92 self.__content = StringIO(content) 93 self.__len = len(content)
94
95 - def read(self, num_bytes=None):
96 if num_bytes is None: 97 num_bytes = self.__len or 1 98 assert self.__len >= num_bytes, "Cannot read more than the available bytes from the HTTP incoming data." 99 content = self.__content.read(num_bytes) 100 self.__len -= num_bytes 101 return content
102
103 104 -class ClientHandler(BaseHandler):
105 """ 106 A HTTP Handler that can be used for testing purposes. 107 Uses the WSGI interface to compose requests, but returns 108 the raw HttpResponse object 109 """
110 - def __init__(self, enforce_csrf_checks=True, *args, **kwargs):
111 self.enforce_csrf_checks = enforce_csrf_checks 112 super(ClientHandler, self).__init__(*args, **kwargs)
113
114 - def __call__(self, environ):
115 from django.conf import settings 116 from django.core import signals 117 118 # Set up middleware if needed. We couldn't do this earlier, because 119 # settings weren't available. 120 if self._request_middleware is None: 121 self.load_middleware() 122 123 signals.request_started.send(sender=self.__class__) 124 try: 125 request = WSGIRequest(environ) 126 # sneaky little hack so that we can easily get round 127 # CsrfViewMiddleware. This makes life easier, and is probably 128 # required for backwards compatibility with external tests against 129 # admin views. 130 request._dont_enforce_csrf_checks = not self.enforce_csrf_checks 131 response = self.get_response(request) 132 finally: 133 signals.request_finished.disconnect(close_connection) 134 signals.request_finished.send(sender=self.__class__) 135 signals.request_finished.connect(close_connection) 136 137 return response
138
139 -def store_rendered_templates(store, signal, sender, template, context, **kwargs):
140 """ 141 Stores templates and contexts that are rendered. 142 """ 143 store.setdefault('templates', []).append(template) 144 store.setdefault('context', ContextList()).append(context)
145
146 -def encode_multipart(boundary, data):
147 """ 148 Encodes multipart POST data from a dictionary of form values. 149 150 The key will be used as the form data name; the value will be transmitted 151 as content. If the value is a file, the contents of the file will be sent 152 as an application/octet-stream; otherwise, str(value) will be sent. 153 """ 154 lines = [] 155 to_str = lambda s: smart_str(s, settings.DEFAULT_CHARSET) 156 157 # Not by any means perfect, but good enough for our purposes. 158 is_file = lambda thing: hasattr(thing, "read") and callable(thing.read) 159 160 # Each bit of the multipart form data could be either a form value or a 161 # file, or a *list* of form values and/or files. Remember that HTTP field 162 # names can be duplicated! 163 for (key, value) in data.items(): 164 if is_file(value): 165 lines.extend(encode_file(boundary, key, value)) 166 elif not isinstance(value, basestring) and is_iterable(value): 167 for item in value: 168 if is_file(item): 169 lines.extend(encode_file(boundary, key, item)) 170 else: 171 lines.extend([ 172 '--' + boundary, 173 'Content-Disposition: form-data; name="%s"' % to_str(key), 174 '', 175 to_str(item) 176 ]) 177 else: 178 lines.extend([ 179 '--' + boundary, 180 'Content-Disposition: form-data; name="%s"' % to_str(key), 181 '', 182 to_str(value) 183 ]) 184 185 lines.extend([ 186 '--' + boundary + '--', 187 '', 188 ]) 189 return '\r\n'.join(lines)
190
191 -def encode_file(boundary, key, file):
192 to_str = lambda s: smart_str(s, settings.DEFAULT_CHARSET) 193 content_type = mimetypes.guess_type(file.name)[0] 194 if content_type is None: 195 content_type = 'application/octet-stream' 196 return [ 197 '--' + boundary, 198 'Content-Disposition: form-data; name="%s"; filename="%s"' \ 199 % (to_str(key), to_str(os.path.basename(file.name))), 200 'Content-Type: %s' % content_type, 201 '', 202 file.read() 203 ]
204
205 206 207 -class RequestFactory(object):
208 """ 209 Class that lets you create mock Request objects for use in testing. 210 211 Usage: 212 213 rf = RequestFactory() 214 get_request = rf.get('/hello/') 215 post_request = rf.post('/submit/', {'foo': 'bar'}) 216 217 Once you have a request object you can pass it to any view function, 218 just as if that view had been hooked up using a URLconf. 219 """
220 - def __init__(self, **defaults):
221 self.defaults = defaults 222 self.cookies = SimpleCookie() 223 self.errors = StringIO()
224
225 - def _base_environ(self, **request):
226 """ 227 The base environment for a request. 228 """ 229 environ = { 230 'HTTP_COOKIE': self.cookies.output(header='', sep='; '), 231 'PATH_INFO': '/', 232 'QUERY_STRING': '', 233 'REMOTE_ADDR': '127.0.0.1', 234 'REQUEST_METHOD': 'GET', 235 'SCRIPT_NAME': '', 236 'SERVER_NAME': 'testserver', 237 'SERVER_PORT': '80', 238 'SERVER_PROTOCOL': 'HTTP/1.1', 239 'wsgi.version': (1,0), 240 'wsgi.url_scheme': 'http', 241 'wsgi.errors': self.errors, 242 'wsgi.multiprocess': True, 243 'wsgi.multithread': False, 244 'wsgi.run_once': False, 245 } 246 environ.update(self.defaults) 247 environ.update(request) 248 return environ
249
250 - def request(self, **request):
251 "Construct a generic request object with session." 252 return WSGIRequest(self._base_environ(**request))
253
254 - def get(self, path, data={}, **extra):
255 "Construct a GET request" 256 257 parsed = urlparse(path) 258 r = { 259 'CONTENT_TYPE': 'text/html; charset=utf-8', 260 'PATH_INFO': urllib.unquote(parsed[2]), 261 'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], 262 'REQUEST_METHOD': 'GET', 263 'wsgi.input': FakePayload('') 264 } 265 r.update(extra) 266 return self.request(**r)
267
268 - def post(self, path, data={}, content_type=MULTIPART_CONTENT, 269 **extra):
270 "Construct a POST request." 271 272 if content_type is MULTIPART_CONTENT: 273 post_data = encode_multipart(BOUNDARY, data) 274 else: 275 # Encode the content so that the byte representation is correct. 276 match = CONTENT_TYPE_RE.match(content_type) 277 if match: 278 charset = match.group(1) 279 else: 280 charset = settings.DEFAULT_CHARSET 281 post_data = smart_str(data, encoding=charset) 282 283 parsed = urlparse(path) 284 r = { 285 'CONTENT_LENGTH': len(post_data), 286 'CONTENT_TYPE': content_type, 287 'PATH_INFO': urllib.unquote(parsed[2]), 288 'QUERY_STRING': parsed[4], 289 'REQUEST_METHOD': 'POST', 290 'wsgi.input': FakePayload(post_data), 291 } 292 r.update(extra) 293 return self.request(**r)
294
295 - def head(self, path, data={}, **extra):
296 "Construct a HEAD request." 297 298 parsed = urlparse(path) 299 r = { 300 'CONTENT_TYPE': 'text/html; charset=utf-8', 301 'PATH_INFO': urllib.unquote(parsed[2]), 302 'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], 303 'REQUEST_METHOD': 'HEAD', 304 'wsgi.input': FakePayload('') 305 } 306 r.update(extra) 307 return self.request(**r)
308
309 - def options(self, path, data={}, **extra):
310 "Constrict an OPTIONS request" 311 312 parsed = urlparse(path) 313 r = { 314 'PATH_INFO': urllib.unquote(parsed[2]), 315 'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], 316 'REQUEST_METHOD': 'OPTIONS', 317 'wsgi.input': FakePayload('') 318 } 319 r.update(extra) 320 return self.request(**r)
321
322 - def put(self, path, data={}, content_type=MULTIPART_CONTENT, 323 **extra):
324 "Construct a PUT request." 325 326 if content_type is MULTIPART_CONTENT: 327 post_data = encode_multipart(BOUNDARY, data) 328 else: 329 post_data = data 330 331 # Make `data` into a querystring only if it's not already a string. If 332 # it is a string, we'll assume that the caller has already encoded it. 333 query_string = None 334 if not isinstance(data, basestring): 335 query_string = urlencode(data, doseq=True) 336 337 parsed = urlparse(path) 338 r = { 339 'CONTENT_LENGTH': len(post_data), 340 'CONTENT_TYPE': content_type, 341 'PATH_INFO': urllib.unquote(parsed[2]), 342 'QUERY_STRING': query_string or parsed[4], 343 'REQUEST_METHOD': 'PUT', 344 'wsgi.input': FakePayload(post_data), 345 } 346 r.update(extra) 347 return self.request(**r)
348
349 - def delete(self, path, data={}, **extra):
350 "Construct a DELETE request." 351 352 parsed = urlparse(path) 353 r = { 354 'PATH_INFO': urllib.unquote(parsed[2]), 355 'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], 356 'REQUEST_METHOD': 'DELETE', 357 'wsgi.input': FakePayload('') 358 } 359 r.update(extra) 360 return self.request(**r)
361
362 363 -class Client(RequestFactory):
364 """ 365 A class that can act as a client for testing purposes. 366 367 It allows the user to compose GET and POST requests, and 368 obtain the response that the server gave to those requests. 369 The server Response objects are annotated with the details 370 of the contexts and templates that were rendered during the 371 process of serving the request. 372 373 Client objects are stateful - they will retain cookie (and 374 thus session) details for the lifetime of the Client instance. 375 376 This is not intended as a replacement for Twill/Selenium or 377 the like - it is here to allow testing against the 378 contexts and templates produced by a view, rather than the 379 HTML rendered to the end-user. 380 """
381 - def __init__(self, enforce_csrf_checks=False, **defaults):
382 super(Client, self).__init__(**defaults) 383 self.handler = ClientHandler(enforce_csrf_checks) 384 self.exc_info = None
385
386 - def store_exc_info(self, **kwargs):
387 """ 388 Stores exceptions when they are generated by a view. 389 """ 390 self.exc_info = sys.exc_info()
391
392 - def _session(self):
393 """ 394 Obtains the current session variables. 395 """ 396 if 'django.contrib.sessions' in settings.INSTALLED_APPS: 397 engine = import_module(settings.SESSION_ENGINE) 398 cookie = self.cookies.get(settings.SESSION_COOKIE_NAME, None) 399 if cookie: 400 return engine.SessionStore(cookie.value) 401 return {}
402 session = property(_session) 403 404
405 - def request(self, **request):
406 """ 407 The master request method. Composes the environment dictionary 408 and passes to the handler, returning the result of the handler. 409 Assumes defaults for the query environment, which can be overridden 410 using the arguments to the request. 411 """ 412 environ = self._base_environ(**request) 413 414 # Curry a data dictionary into an instance of the template renderer 415 # callback function. 416 data = {} 417 on_template_render = curry(store_rendered_templates, data) 418 signals.template_rendered.connect(on_template_render, dispatch_uid="template-render") 419 # Capture exceptions created by the handler. 420 got_request_exception.connect(self.store_exc_info, dispatch_uid="request-exception") 421 try: 422 423 try: 424 response = self.handler(environ) 425 except TemplateDoesNotExist, e: 426 # If the view raises an exception, Django will attempt to show 427 # the 500.html template. If that template is not available, 428 # we should ignore the error in favor of re-raising the 429 # underlying exception that caused the 500 error. Any other 430 # template found to be missing during view error handling 431 # should be reported as-is. 432 if e.args != ('500.html',): 433 raise 434 435 # Look for a signalled exception, clear the current context 436 # exception data, then re-raise the signalled exception. 437 # Also make sure that the signalled exception is cleared from 438 # the local cache! 439 if self.exc_info: 440 exc_info = self.exc_info 441 self.exc_info = None 442 raise exc_info[1], None, exc_info[2] 443 444 # Save the client and request that stimulated the response. 445 response.client = self 446 response.request = request 447 448 # Add any rendered template detail to the response. 449 response.templates = data.get("templates", []) 450 response.context = data.get("context") 451 452 # Flatten a single context. Not really necessary anymore thanks to 453 # the __getattr__ flattening in ContextList, but has some edge-case 454 # backwards-compatibility implications. 455 if response.context and len(response.context) == 1: 456 response.context = response.context[0] 457 458 # Provide a backwards-compatible (but pending deprecation) response.template 459 def _get_template(self): 460 warnings.warn("response.template is deprecated; use response.templates instead (which is always a list)", 461 PendingDeprecationWarning, stacklevel=2) 462 if not self.templates: 463 return None 464 elif len(self.templates) == 1: 465 return self.templates[0] 466 return self.templates
467 response.__class__.template = property(_get_template) 468 469 # Update persistent cookie data. 470 if response.cookies: 471 self.cookies.update(response.cookies) 472 473 return response 474 finally: 475 signals.template_rendered.disconnect(dispatch_uid="template-render") 476 got_request_exception.disconnect(dispatch_uid="request-exception")
477
478 - def get(self, path, data={}, follow=False, **extra):
479 """ 480 Requests a response from the server using GET. 481 """ 482 response = super(Client, self).get(path, data=data, **extra) 483 if follow: 484 response = self._handle_redirects(response, **extra) 485 return response
486
487 - def post(self, path, data={}, content_type=MULTIPART_CONTENT, 488 follow=False, **extra):
489 """ 490 Requests a response from the server using POST. 491 """ 492 response = super(Client, self).post(path, data=data, content_type=content_type, **extra) 493 if follow: 494 response = self._handle_redirects(response, **extra) 495 return response
496
497 - def head(self, path, data={}, follow=False, **extra):
498 """ 499 Request a response from the server using HEAD. 500 """ 501 response = super(Client, self).head(path, data=data, **extra) 502 if follow: 503 response = self._handle_redirects(response, **extra) 504 return response
505
506 - def options(self, path, data={}, follow=False, **extra):
507 """ 508 Request a response from the server using OPTIONS. 509 """ 510 response = super(Client, self).options(path, data=data, **extra) 511 if follow: 512 response = self._handle_redirects(response, **extra) 513 return response
514
515 - def put(self, path, data={}, content_type=MULTIPART_CONTENT, 516 follow=False, **extra):
517 """ 518 Send a resource to the server using PUT. 519 """ 520 response = super(Client, self).put(path, data=data, content_type=content_type, **extra) 521 if follow: 522 response = self._handle_redirects(response, **extra) 523 return response
524
525 - def delete(self, path, data={}, follow=False, **extra):
526 """ 527 Send a DELETE request to the server. 528 """ 529 response = super(Client, self).delete(path, data=data, **extra) 530 if follow: 531 response = self._handle_redirects(response, **extra) 532 return response
533
534 - def login(self, login, password, server_id=1, secure=True):
535 """ 536 Sets the Factory to appear as if it has successfully logged into a site. 537 538 Returns True if login is possible; False if the provided credentials 539 are incorrect, or the user is inactive, or if the sessions framework is 540 not available. 541 """ 542 engine = import_module(settings.SESSION_ENGINE) 543 544 params = { 545 'username': login, 546 'password': password, 547 'server':server_id, 548 'ssl':'on' 549 } 550 request = fakeRequest(method="post", path=(reverse(viewname="walogin")), params=params) 551 552 if self.session: 553 request.session = self.session 554 else: 555 request.session = engine.SessionStore() 556 557 # Set the cookie to represent the session. 558 session_cookie = settings.SESSION_COOKIE_NAME 559 self.cookies[session_cookie] = request.session.session_key 560 cookie_data = { 561 'max-age': None, 562 'path': '/', 563 'domain': settings.SESSION_COOKIE_DOMAIN, 564 'secure': settings.SESSION_COOKIE_SECURE or None, 565 'expires': None, 566 } 567 self.cookies[session_cookie].update(cookie_data) 568 569 connector = Connector(request.REQUEST.get('server'), True) 570 conn = connector.create_connection('TEST.webadmin', login, password) 571 572 if conn is not None and conn.isConnected() and conn.keepAlive(): 573 request.session.save() 574 return True 575 else: 576 try: 577 try: 578 conn.seppuku() 579 except: 580 self.fail('Exception during logout.', exc_info=True) 581 finally: 582 request.session.flush() 583 return False
584
585 - def logout(self):
586 """ 587 Removes the authenticated user's cookies and session object. 588 589 Causes the authenticated user to be logged out. 590 """ 591 592 session = import_module(settings.SESSION_ENGINE).SessionStore() 593 session_cookie = self.cookies.get(settings.SESSION_COOKIE_NAME) 594 if session_cookie: 595 session.delete(session_key=session_cookie.value) 596 self.cookies = SimpleCookie() 597 598 from omeroweb.webclient.decorators import login_required 599 600 request = fakeRequest(method="get", path=reverse(viewname="weblogout")) 601 @login_required() 602 def foo(request, conn=None): 603 return conn
604 605 try: 606 conn = foo(request) 607 conn.seppuku() 608 finally: 609 request.session.flush() 610
611 - def _handle_redirects(self, response, **extra):
612 "Follows any redirects by requesting responses from the server using GET." 613 614 response.redirect_chain = [] 615 while response.status_code in (301, 302, 303, 307): 616 url = response['Location'] 617 scheme, netloc, path, query, fragment = urlsplit(url) 618 619 redirect_chain = response.redirect_chain 620 redirect_chain.append((url, response.status_code)) 621 622 if scheme: 623 extra['wsgi.url_scheme'] = scheme 624 625 # The test client doesn't handle external links, 626 # but since the situation is simulated in test_client, 627 # we fake things here by ignoring the netloc portion of the 628 # redirected URL. 629 response = self.get(path, QueryDict(query), follow=False, **extra) 630 response.redirect_chain = redirect_chain 631 632 # Prevent loops 633 if response.redirect_chain[-1] in response.redirect_chain[0:-1]: 634 break 635 return response
636