1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import urllib
23 from urlparse import urlparse, urlunparse, urlsplit
24 import sys
25 import os
26 import re
27 import mimetypes
28 import warnings
29 try:
30 from cStringIO import StringIO
31 except ImportError:
32 from StringIO import StringIO
33
34 from django.conf import settings
35 from django.contrib.auth import authenticate, login
36 from django.core.handlers.base import BaseHandler
37 from django.core.handlers.wsgi import WSGIRequest
38 from django.core.signals import got_request_exception
39 from django.core.urlresolvers import reverse
40 from django.http import SimpleCookie, HttpRequest, QueryDict
41 from django.template import TemplateDoesNotExist
42 from django.test import signals
43 from django.utils.functional import curry
44 from django.utils.encoding import smart_str
45 from django.utils.http import urlencode
46 from django.utils.importlib import import_module
47 from django.utils.itercompat import is_iterable
48 from django.db import transaction, close_connection
49 from django.test.utils import ContextList
50
51 from omeroweb.connector import Connector
52
53 __all__ = ('Client', 'RequestFactory', 'encode_file', 'encode_multipart')
54
55
# Fixed multipart boundary token used for every encoded request body.
BOUNDARY = 'BoUnDaRyStRiNg'
# Default content type for POST/PUT requests; embeds the boundary above.
MULTIPART_CONTENT = 'multipart/form-data; boundary=%s' % BOUNDARY
# Captures the charset name from a Content-Type value such as
# 'text/html; charset=utf-8'. Raw string so the backslashes reach the regex
# engine untouched.
CONTENT_TYPE_RE = re.compile(r'.*; charset=([\w\d-]+);?')
59
def fakeRequest(method, path="/", params=None, **kwargs):
    """
    Build a bare request object without going through the test Client.

    Usage:
    get_request = fakeRequest("get", "/hello/")
    post_request = fakeRequest("post", "/submit/", {'foo': 'bar'})

    method -- 'get' or 'post' (case-insensitive); selects the
              RequestFactory method used to build the request.
    path   -- request path handed to the factory method.
    params -- dictionary of request parameters; an empty dictionary is
              used when omitted.
    kwargs -- accepted for backward compatibility; not used when building
              the request.

    Returns the request built by RequestFactory. When
    'django.contrib.sessions' is listed in settings.INSTALLED_APPS, a new
    SessionStore from settings.SESSION_ENGINE is attached as
    ``request.session``.

    Raises AttributeError if method is neither 'get' nor 'post', or if
    params is not a dictionary.
    """
    # A fresh dictionary per call; a shared ``{}`` default would leak state
    # between calls if anything downstream mutated it.
    if params is None:
        params = {}

    verb = method.lower()
    if verb not in ('post', 'get'):
        raise AttributeError("Method must be 'get' or 'post'")
    if not isinstance(params, dict):
        raise AttributeError("Params must be a dictionary")

    # Build the request straight from a factory instead of patching a
    # method onto the Client class on every call.
    request = getattr(RequestFactory(), verb)(path, params)
    if 'django.contrib.sessions' in settings.INSTALLED_APPS:
        engine = import_module(settings.SESSION_ENGINE)
        request.session = engine.SessionStore()
    return request
82
# NOTE(review): the class header line was not visible in the reviewed
# excerpt; the ``object`` base is an assumption -- confirm.
class FakePayload(object):
    """
    A wrapper around StringIO that restricts what can be read since data from
    the network can't be seeked and cannot be read outside of its content
    length. This makes sure that views can't do anything under the test client
    that wouldn't work in Real Life.
    """
    def __init__(self, content):
        "Wrap ``content`` and remember how many bytes may still be read."
        self.__content = StringIO(content)
        self.__len = len(content)

    def read(self, num_bytes=None):
        """
        Return the next ``num_bytes`` of the payload.

        With no argument the whole remainder is requested; on an exhausted
        payload that request becomes one byte, so it fails the check below.
        Asking for more than remains raises AssertionError.
        """
        remaining = self.__len
        if num_bytes is None:
            num_bytes = remaining or 1
        assert remaining >= num_bytes, "Cannot read more than the available bytes from the HTTP incoming data."
        chunk = self.__content.read(num_bytes)
        self.__len = remaining - num_bytes
        return chunk
102
class ClientHandler(BaseHandler):
    """
    A HTTP Handler that can be used for testing purposes.
    Uses the WSGI interface to compose requests, but returns
    the raw HttpResponse object
    """
    def __init__(self, enforce_csrf_checks=True, *args, **kwargs):
        "Record the CSRF policy, then initialise the base handler."
        self.enforce_csrf_checks = enforce_csrf_checks
        super(ClientHandler, self).__init__(*args, **kwargs)

    def __call__(self, environ):
        """
        Build a WSGIRequest from ``environ``, run it through
        ``get_response`` and return the resulting response object.
        """
        from django.conf import settings
        # Local import: inside this method ``signals`` refers to
        # django.core.signals, not the django.test signals module imported
        # at the top of the file.
        from django.core import signals

        # Middleware is loaded on the first call that finds it unset.
        if self._request_middleware is None:
            self.load_middleware()

        sender = self.__class__
        signals.request_started.send(sender=sender)
        try:
            wsgi_request = WSGIRequest(environ)
            # Inverse of the handler's policy, stored on the request.
            wsgi_request._dont_enforce_csrf_checks = not self.enforce_csrf_checks
            response = self.get_response(wsgi_request)
        finally:
            # close_connection is unhooked while request_finished is sent,
            # so this send does not invoke it; it is hooked back right after.
            finished = signals.request_finished
            finished.disconnect(close_connection)
            finished.send(sender=sender)
            finished.connect(close_connection)

        return response
138
# NOTE(review): the def line was not visible in the reviewed excerpt. The
# 'signal' and 'sender' parameters follow Django's signal-receiver calling
# convention and are unused here -- confirm against the original signature.
def store_rendered_templates(store, signal, sender, template, context, **kwargs):
    """
    Stores templates and contexts that are rendered.

    ``store`` is a dictionary: the template is appended to the list under
    'templates' and the context to the ContextList under 'context', each
    created on first use.
    """
    rendered = store.setdefault('templates', [])
    rendered.append(template)
    contexts = store.setdefault('context', ContextList())
    contexts.append(context)
145
def encode_multipart(boundary, data):
    """
    Encodes multipart POST data from a dictionary of form values.

    The key will be used as the form data name; the value will be transmitted
    as content. If the value is a file, the contents of the file will be sent
    as an application/octet-stream; otherwise, str(value) will be sent.

    Non-string iterable values produce one part per item. Returns the parts
    joined with CRLF and terminated by the closing boundary line.
    """
    def to_str(s):
        return smart_str(s, settings.DEFAULT_CHARSET)

    def is_file(thing):
        # Anything exposing a callable read() is treated as a file.
        return hasattr(thing, "read") and callable(thing.read)

    def field_lines(name, content):
        # One plain (non-file) form field.
        return [
            '--' + boundary,
            'Content-Disposition: form-data; name="%s"' % to_str(name),
            '',
            to_str(content)
        ]

    lines = []
    for (key, value) in data.items():
        if is_file(value):
            lines.extend(encode_file(boundary, key, value))
            continue
        # Strings are iterable too, but must be sent as a single field.
        if isinstance(value, basestring) or not is_iterable(value):
            lines.extend(field_lines(key, value))
            continue
        for item in value:
            if is_file(item):
                lines.extend(encode_file(boundary, key, item))
            else:
                lines.extend(field_lines(key, item))

    lines.append('--' + boundary + '--')
    lines.append('')
    return '\r\n'.join(lines)
190
def encode_file(boundary, key, file):
    """
    Return the list of multipart lines for one file field.

    The content type is guessed from ``file.name`` and falls back to
    'application/octet-stream'; the filename sent is the base name of
    ``file.name``; the last element is the result of ``file.read()``.
    """
    def to_str(s):
        return smart_str(s, settings.DEFAULT_CHARSET)

    content_type = mimetypes.guess_type(file.name)[0]
    if content_type is None:
        content_type = 'application/octet-stream'

    opening = '--' + boundary
    filename = os.path.basename(file.name)
    disposition = 'Content-Disposition: form-data; name="%s"; filename="%s"' \
        % (to_str(key), to_str(filename))
    return [
        opening,
        disposition,
        'Content-Type: %s' % content_type,
        '',
        file.read()
    ]
204
class RequestFactory(object):
    """
    Class that lets you create mock Request objects for use in testing.

    Usage:

    rf = RequestFactory()
    get_request = rf.get('/hello/')
    post_request = rf.post('/submit/', {'foo': 'bar'})

    Once you have a request object you can pass it to any view function,
    just as if that view had been hooked up using a URLconf.
    """
    def __init__(self, **defaults):
        "Keep ``defaults`` as environ overrides applied to every request."
        self.defaults = defaults
        self.cookies = SimpleCookie()
        self.errors = StringIO()

    def _base_environ(self, **request):
        """
        The base environment for a request.

        Instance defaults override the built-in values, and the keyword
        arguments of this call override both.
        """
        environ = {
            'HTTP_COOKIE': self.cookies.output(header='', sep='; '),
            'PATH_INFO': '/',
            'QUERY_STRING': '',
            'REMOTE_ADDR': '127.0.0.1',
            'REQUEST_METHOD': 'GET',
            'SCRIPT_NAME': '',
            'SERVER_NAME': 'testserver',
            'SERVER_PORT': '80',
            'SERVER_PROTOCOL': 'HTTP/1.1',
            'wsgi.version': (1,0),
            'wsgi.url_scheme': 'http',
            'wsgi.errors': self.errors,
            'wsgi.multiprocess': True,
            'wsgi.multithread': False,
            'wsgi.run_once': False,
        }
        for overrides in (self.defaults, request):
            environ.update(overrides)
        return environ

    def request(self, **request):
        "Wrap the merged environ in a WSGIRequest and return it."
        environ = self._base_environ(**request)
        return WSGIRequest(environ)

    def get(self, path, data={}, **extra):
        "Construct a GET request"
        url = urlparse(path)
        path_info = urllib.unquote(url[2])
        # Explicit data wins over any query string embedded in the path.
        query = urlencode(data, doseq=True) or url[4]
        environ = {
            'CONTENT_TYPE': 'text/html; charset=utf-8',
            'PATH_INFO': path_info,
            'QUERY_STRING': query,
            'REQUEST_METHOD': 'GET',
            'wsgi.input': FakePayload('')
        }
        environ.update(extra)
        return self.request(**environ)

    def post(self, path, data={}, content_type=MULTIPART_CONTENT,
             **extra):
        "Construct a POST request."
        # Identity, not equality: only the module's own default constant
        # triggers multipart encoding of ``data``.
        if content_type is MULTIPART_CONTENT:
            body = encode_multipart(BOUNDARY, data)
        else:
            # Encode with the charset named in the content type, if any.
            charset_match = CONTENT_TYPE_RE.match(content_type)
            if charset_match:
                charset = charset_match.group(1)
            else:
                charset = settings.DEFAULT_CHARSET
            body = smart_str(data, encoding=charset)

        url = urlparse(path)
        environ = {
            'CONTENT_LENGTH': len(body),
            'CONTENT_TYPE': content_type,
            'PATH_INFO': urllib.unquote(url[2]),
            'QUERY_STRING': url[4],
            'REQUEST_METHOD': 'POST',
            'wsgi.input': FakePayload(body),
        }
        environ.update(extra)
        return self.request(**environ)

    def head(self, path, data={}, **extra):
        "Construct a HEAD request."
        url = urlparse(path)
        path_info = urllib.unquote(url[2])
        query = urlencode(data, doseq=True) or url[4]
        environ = {
            'CONTENT_TYPE': 'text/html; charset=utf-8',
            'PATH_INFO': path_info,
            'QUERY_STRING': query,
            'REQUEST_METHOD': 'HEAD',
            'wsgi.input': FakePayload('')
        }
        environ.update(extra)
        return self.request(**environ)

    def options(self, path, data={}, **extra):
        "Construct an OPTIONS request."
        url = urlparse(path)
        path_info = urllib.unquote(url[2])
        query = urlencode(data, doseq=True) or url[4]
        environ = {
            'PATH_INFO': path_info,
            'QUERY_STRING': query,
            'REQUEST_METHOD': 'OPTIONS',
            'wsgi.input': FakePayload('')
        }
        environ.update(extra)
        return self.request(**environ)

    # NOTE(review): the def line of this method was not visible in the
    # reviewed excerpt; the signature mirrors post() -- confirm.
    def put(self, path, data={}, content_type=MULTIPART_CONTENT,
            **extra):
        "Construct a PUT request."
        if content_type is MULTIPART_CONTENT:
            body = encode_multipart(BOUNDARY, data)
        else:
            body = data

        # Non-string data is additionally url-encoded into the query string.
        query_string = None
        if not isinstance(data, basestring):
            query_string = urlencode(data, doseq=True)

        url = urlparse(path)
        environ = {
            'CONTENT_LENGTH': len(body),
            'CONTENT_TYPE': content_type,
            'PATH_INFO': urllib.unquote(url[2]),
            'QUERY_STRING': query_string or url[4],
            'REQUEST_METHOD': 'PUT',
            'wsgi.input': FakePayload(body),
        }
        environ.update(extra)
        return self.request(**environ)

    def delete(self, path, data={}, **extra):
        "Construct a DELETE request."
        url = urlparse(path)
        path_info = urllib.unquote(url[2])
        query = urlencode(data, doseq=True) or url[4]
        environ = {
            'PATH_INFO': path_info,
            'QUERY_STRING': query,
            'REQUEST_METHOD': 'DELETE',
            'wsgi.input': FakePayload('')
        }
        environ.update(extra)
        return self.request(**environ)
361
362
363 -class Client(RequestFactory):
364 """
365 A class that can act as a client for testing purposes.
366
367 It allows the user to compose GET and POST requests, and
368 obtain the response that the server gave to those requests.
369 The server Response objects are annotated with the details
370 of the contexts and templates that were rendered during the
371 process of serving the request.
372
373 Client objects are stateful - they will retain cookie (and
374 thus session) details for the lifetime of the Client instance.
375
376 This is not intended as a replacement for Twill/Selenium or
377 the like - it is here to allow testing against the
378 contexts and templates produced by a view, rather than the
379 HTML rendered to the end-user.
380 """
381 - def __init__(self, enforce_csrf_checks=False, **defaults):
385
def store_exc_info(self, **kwargs):
    """
    Stores exceptions when they are generated by a view.

    Saves the ``sys.exc_info()`` triple of the exception currently being
    handled on ``self.exc_info``; keyword arguments are ignored.
    """
    exc_type, exc_value, exc_traceback = sys.exc_info()
    self.exc_info = (exc_type, exc_value, exc_traceback)
391
def _session(self):
    """
    Obtains the current session variables.

    Returns a SessionStore keyed by the session cookie held by this
    client, or an empty dictionary when the sessions app is not installed
    or no session cookie is set.
    """
    if 'django.contrib.sessions' not in settings.INSTALLED_APPS:
        return {}
    engine = import_module(settings.SESSION_ENGINE)
    cookie = self.cookies.get(settings.SESSION_COOKIE_NAME, None)
    if not cookie:
        return {}
    return engine.SessionStore(cookie.value)
session = property(_session)
403
404
def request(self, **request):
    """
    The master request method. Composes the environment dictionary
    and passes to the handler, returning the result of the handler.
    Assumes defaults for the query environment, which can be overridden
    using the arguments to the request.

    The returned response is annotated with ``client``, ``request``,
    ``templates`` and ``context``, and any cookies it carries are merged
    into this client's cookie jar.
    """
    environ = self._base_environ(**request)

    # Bind a fresh dictionary into the template-rendered callback so the
    # templates and contexts rendered during this request are collected.
    data = {}
    on_template_render = curry(store_rendered_templates, data)
    signals.template_rendered.connect(on_template_render, dispatch_uid="template-render")

    # Record any exception signalled while the handler runs.
    got_request_exception.connect(self.store_exc_info, dispatch_uid="request-exception")
    try:

        try:
            response = self.handler(environ)
        except TemplateDoesNotExist, e:
            # Only a missing '500.html' is swallowed here; any other
            # missing template propagates unchanged.
            if e.args != ('500.html',):
                raise

        # Re-raise an exception captured by store_exc_info, with its
        # original traceback, clearing the stored copy first.
        if self.exc_info:
            exc_info = self.exc_info
            self.exc_info = None
            raise exc_info[1], None, exc_info[2]

        # Save the client and the request arguments that produced the
        # response.
        response.client = self
        response.request = request

        # Attach whatever the template-rendered callback collected.
        response.templates = data.get("templates", [])
        response.context = data.get("context")

        # A single collected context is exposed directly rather than as a
        # one-element list.
        if response.context and len(response.context) == 1:
            response.context = response.context[0]

        # Deprecated ``response.template`` accessor: None, the single
        # template, or the full list. Installed on the response's class.
        def _get_template(self):
            warnings.warn("response.template is deprecated; use response.templates instead (which is always a list)",
                          PendingDeprecationWarning, stacklevel=2)
            if not self.templates:
                return None
            elif len(self.templates) == 1:
                return self.templates[0]
            return self.templates
        response.__class__.template = property(_get_template)

        # Carry response cookies over to subsequent requests.
        if response.cookies:
            self.cookies.update(response.cookies)

        return response
    finally:
        # Always unhook both receivers connected above.
        signals.template_rendered.disconnect(dispatch_uid="template-render")
        got_request_exception.disconnect(dispatch_uid="request-exception")
477
478 - def get(self, path, data={}, follow=False, **extra):
486
def post(self, path, data={}, content_type=MULTIPART_CONTENT,
         follow=False, **extra):
    """
    Requests a response from the server using POST.

    With ``follow`` set, redirects in the response are followed and the
    final response is returned.
    """
    response = super(Client, self).post(path, data=data, content_type=content_type, **extra)
    if not follow:
        return response
    return self._handle_redirects(response, **extra)
496
497 - def head(self, path, data={}, follow=False, **extra):
505
506 - def options(self, path, data={}, follow=False, **extra):
514
524
525 - def delete(self, path, data={}, follow=False, **extra):
533
def login(self, login, password, server_id=1, secure=True):
    """
    Sets the Factory to appear as if it has successfully logged into a site.

    login     -- user name used to create the connection.
    password  -- password used to create the connection.
    server_id -- value sent as the 'server' parameter of the fake login
                 request and handed to Connector.
    secure    -- not used by this method: the fake request always sends
                 'ssl': 'on' and Connector is always built with True.
                 NOTE(review): confirm whether this should be honoured.

    Returns True if a connection was created, reports itself connected and
    answers keepAlive(); the session is saved in that case. Returns False
    otherwise, after closing the connection (if any) and flushing the
    session.
    """
    engine = import_module(settings.SESSION_ENGINE)

    params = {
        'username': login,
        'password': password,
        'server':server_id,
        'ssl':'on'
    }
    request = fakeRequest(method="post", path=(reverse(viewname="walogin")), params=params)

    # Reuse the client's current session when it has one. The property is
    # read once so only one session object is built.
    current_session = self.session
    if current_session:
        request.session = current_session
    else:
        request.session = engine.SessionStore()

    # Set the cookie to represent the session.
    session_cookie = settings.SESSION_COOKIE_NAME
    self.cookies[session_cookie] = request.session.session_key
    cookie_data = {
        'max-age': None,
        'path': '/',
        'domain': settings.SESSION_COOKIE_DOMAIN,
        'secure': settings.SESSION_COOKIE_SECURE or None,
        'expires': None,
    }
    self.cookies[session_cookie].update(cookie_data)

    connector = Connector(request.REQUEST.get('server'), True)
    conn = connector.create_connection('TEST.webadmin', login, password)

    if conn is not None and conn.isConnected() and conn.keepAlive():
        request.session.save()
        return True

    # Failed login: close whatever connection exists, then drop the session.
    # create_connection may have returned None, so guard before closing.
    try:
        if conn is not None:
            try:
                conn.seppuku()
            except Exception:
                # Best-effort cleanup; report the problem but still
                # return False as documented.
                warnings.warn('Exception during logout: %s'
                              % (sys.exc_info()[1],))
    finally:
        request.session.flush()
    return False
584
# NOTE(review): the def line was not visible in the reviewed excerpt; the
# name ``logout`` is inferred from the docstring and the 'weblogout' URL --
# confirm.
def logout(self):
    """
    Removes the authenticated user's cookies and session object.

    Causes the authenticated user to be logged out.
    """
    # Delete the stored session named by the session cookie, if any, then
    # start over with an empty cookie jar.
    session = import_module(settings.SESSION_ENGINE).SessionStore()
    session_cookie = self.cookies.get(settings.SESSION_COOKIE_NAME)
    if session_cookie:
        session.delete(session_key=session_cookie.value)
    self.cookies = SimpleCookie()

    from omeroweb.webclient.decorators import login_required

    request = fakeRequest(method="get", path=reverse(viewname="weblogout"))
    # Throwaway view: returns whatever ``conn`` the login_required
    # decorator passes in -- presumably the active connection; verify
    # against the decorator.
    @login_required()
    def foo(request, conn=None):
        return conn

    try:
        conn = foo(request)
        conn.seppuku()
    finally:
        # The fake request's session is flushed whether or not closing the
        # connection succeeded.
        request.session.flush()
610
def _handle_redirects(self, response, **extra):
    "Follows any redirects by requesting responses from the server using GET."
    response.redirect_chain = []
    while response.status_code in (301, 302, 303, 307):
        location = response['Location']
        scheme, netloc, path, query, fragment = urlsplit(location)

        # Carry the chain of (url, status) pairs from hop to hop.
        chain = response.redirect_chain
        chain.append((location, response.status_code))

        if scheme:
            extra['wsgi.url_scheme'] = scheme

        # Only the path and query of the target are requested; the
        # redirect is always followed with a GET.
        response = self.get(path, QueryDict(query), follow=False, **extra)
        response.redirect_chain = chain

        # Stop when the latest hop repeats an earlier one (redirect loop).
        latest = response.redirect_chain[-1]
        if latest in response.redirect_chain[0:-1]:
            break
    return response
636