Package omeroweb :: Package webgateway :: Package tests :: Module test_webgateway
[hide private]
[frames] | no frames]

Source Code for Module omeroweb.webgateway.tests.test_webgateway

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # coding=utf-8 
  4   
  5  import time, os, datetime 
  6  import pytest 
  7  import tempfile 
  8   
  9  from webgateway.webgateway_cache import FileCache, WebGatewayCache, WebGatewayTempFile 
 10  from webgateway import views 
 11  import omero 
 12  from omero.gateway.scripts.testdb_create import * 
 13   
 14  from decorators import login_required 
 15   
 16  from django.test.client import Client 
 17  from django.core.handlers.wsgi import WSGIRequest 
 18  from django.conf import settings 
 19  from django.http import QueryDict 
 20   
 21  CLIENT_BASE='test' 
22 23 -def fakeRequest (**kwargs):
24 def bogus_request(self, **request): 25 """ 26 The master request method. Composes the environment dictionary 27 and passes to the handler, returning the result of the handler. 28 Assumes defaults for the query environment, which can be overridden 29 using the arguments to the request. 30 """ 31 environ = { 32 'HTTP_COOKIE': self.cookies, 33 'PATH_INFO': '/', 34 'QUERY_STRING': '', 35 'REQUEST_METHOD': 'GET', 36 'SCRIPT_NAME': '', 37 'SERVER_NAME': 'testserver', 38 'SERVER_PORT': '80', 39 'SERVER_PROTOCOL': 'HTTP/1.1', 40 'HTTP_HOST': 'localhost', 41 'wsgi.version': (1,0), 42 'wsgi.url_scheme': 'http', 43 'wsgi.errors': None,#self.errors, 44 'wsgi.multiprocess': True, 45 'wsgi.multithread': False, 46 'wsgi.run_once': False, 47 'wsgi.input': None, 48 } 49 environ.update(self.defaults) 50 environ.update(request) 51 r = WSGIRequest(environ) 52 if 'django.contrib.sessions' in settings.INSTALLED_APPS: 53 engine = __import__(settings.SESSION_ENGINE, {}, {}, ['']) 54 r.session = engine.SessionStore() 55 qlen = len(r.REQUEST.dicts) 56 def setQuery (**query): 57 r.REQUEST.dicts = r.REQUEST.dicts[:qlen] 58 q = QueryDict('', mutable=True) 59 q.update(query) 60 r.REQUEST.dicts += (q,)
61 r.setQuery = setQuery 62 return r 63 Client.bogus_request = bogus_request 64 c = Client() 65 return c.bogus_request(**kwargs) 66
67 -class TestHelperObjects (object):
68 - def testColorHolder (self):
69 ColorHolder = omero.gateway.ColorHolder 70 c1 = ColorHolder() 71 assert c1._color == {'red': 0, 'green': 0,'blue': 0, 'alpha': 255} 72 c1 = ColorHolder('blue') 73 assert c1.getHtml() == '0000FF' 74 assert c1.getCss() == 'rgba(0,0,255,1.000)' 75 assert c1.getRGB() == (0,0,255) 76 c1.setRed(0xF0) 77 assert c1.getCss() == 'rgba(240,0,255,1.000)' 78 c1.setGreen(0x0F) 79 assert c1.getCss() == 'rgba(240,15,255,1.000)' 80 c1.setBlue(0) 81 assert c1.getCss() == 'rgba(240,15,0,1.000)' 82 c1.setAlpha(0x7F) 83 assert c1.getCss() == 'rgba(240,15,0,0.498)' 84 c1 = ColorHolder.fromRGBA(50,100,200,300) 85 assert c1.getCss() == 'rgba(50,100,200,1.000)'
86
87 - def testOmeroType (self):
88 omero_type = omero.gateway.omero_type 89 assert isinstance(omero_type('rstring'), omero.RString) 90 assert isinstance(omero_type(u'rstring'), omero.RString) 91 assert isinstance(omero_type(1), omero.RInt) 92 assert isinstance(omero_type(1L), omero.RLong) 93 assert not isinstance(omero_type((1,2,'a')), omero.RType)
94
95 - def testSplitHTMLColor (self):
96 splitHTMLColor = omero.gateway.splitHTMLColor 97 assert splitHTMLColor('abc') == [0xAA, 0xBB, 0xCC, 0xFF] 98 assert splitHTMLColor('abcd') == [0xAA, 0xBB, 0xCC, 0xDD] 99 assert splitHTMLColor('abbccd') == [0xAB, 0xBC, 0xCD, 0xFF] 100 assert splitHTMLColor('abbccdde') == [0xAB, 0xBC, 0xCD, 0xDE] 101 assert splitHTMLColor('#$%&%') == None
102
103 104 -def _testCacheFSBlockSize (cache):
105 cache.wipe() 106 c1 = cache._du() 107 cache.set('test/1', 'a') 108 c2 = cache._du() 109 cache.wipe() 110 return c1, c2-c1
111
112 -class TestFileCache(object):
113 @pytest.fixture(autouse=True)
114 - def setUp (self, request):
115 def fin (): 116 os.system('rm -fr test_cache')
117 request.addfinalizer(fin) 118 self.cache = FileCache('test_cache')
119
120 - def testTimeouts (self):
121 assert self.cache.get('date/test/1') == None, 'Key already exists in cache' 122 self.cache.set('date/test/1', '1', timeout=3) 123 assert self.cache.get('date/test/1') == '1', 'Key not properly cached' 124 time.sleep(4) 125 assert self.cache.get('date/test/1') == None, 'Timeout failed' 126 # if _default_timeout is 0, timeouts are simply not checked 127 self.cache.wipe() 128 self.cache._default_timeout = 0 129 assert self.cache.get('date/test/1') == None, 'Key already exists in cache' 130 self.cache.set('date/test/1', '1', timeout=3) 131 assert self.cache.get('date/test/1') == '1', 'Key not properly cached' 132 time.sleep(4) 133 assert self.cache.has_key('date/test/1') 134 assert self.cache.get('date/test/1') == '1', 'Key got timedout and should not'
135
136 - def testMaxSize (self):
137 empty_size, cache_block = _testCacheFSBlockSize(self.cache) 138 self.cache._max_size = empty_size + 4*cache_block + 1 139 # There is an overhead (8 bytes in my system) for timestamp per file, 140 # and the limit is only enforced after we cross over it 141 for i in range(6): 142 self.cache.set('date/test/%d' % i, 'abcdefgh'*127*cache_block) 143 for i in range(4): 144 assert self.cache.get('date/test/%d' % i) == 'abcdefgh'*127*cache_block,\ 145 'Key %d not properly cached' % i 146 assert self.cache.get('date/test/5') == None, 'Size limit failed' 147 self.cache._max_size = 0 148 self.cache.wipe() 149 for i in range(6): 150 self.cache.set('date/test/%d' % i, 'abcdefgh'*127*cache_block) 151 for i in range(6): 152 assert self.cache.get('date/test/%d' % i) == 'abcdefgh'*127*cache_block,\ 153 'Key %d not properly cached' % i
154
155 - def testMaxEntries (self):
156 self.cache._max_entries = 2 157 self.cache.set('date/test/1', '1') 158 self.cache.set('date/test/2', '2') 159 self.cache.set('date/test/3', '3') 160 assert self.cache.get('date/test/1') == '1', 'Key not properly cached' 161 assert self.cache.get('date/test/2') == '2', 'Key not properly cached' 162 assert self.cache.get('date/test/3') == None, 'File number limit failed' 163 self.cache.wipe() 164 self.cache._max_entries = 0 165 self.cache.set('date/test/1', '1') 166 self.cache.set('date/test/2', '2') 167 self.cache.set('date/test/3', '3') 168 assert self.cache.get('date/test/1') == '1', 'Key not properly cached' 169 assert self.cache.get('date/test/2') == '2', 'Key not properly cached' 170 assert self.cache.get('date/test/3') == '3', 'Key not properly cached'
171
172 - def testPurge (self):
173 self.cache._max_entries = 2 174 self.cache._default_timeout = 3 175 self.cache.set('date/test/1', '1') 176 self.cache.set('date/test/2', '2') 177 self.cache.set('date/test/3', '3') 178 assert self.cache.get('date/test/1') == '1', 'Key not properly cached' 179 assert self.cache.get('date/test/2') == '2', 'Key not properly cached' 180 assert self.cache.get('date/test/3') == None, 'File number limit failed' 181 time.sleep(4) 182 self.cache.set('date/test/3', '3') 183 assert self.cache.get('date/test/3') == '3', 'Purge not working'
184
185 - def testOther (self):
186 # set should only accept strings as values 187 pytest.raises(ValueError, self.cache.set, 'date/test/1', 123) 188 # keys can't have .. or start with / 189 pytest.raises(ValueError, self.cache.set, '/date/test/1', '1') 190 pytest.raises(ValueError, self.cache.set, 'date/test/../1', '1') 191 # get some test data in 192 self.cache.set('date/test/1', '1') 193 self.cache.set('date/test/2', '2') 194 self.cache.set('date/test/3', '3') 195 assert self.cache.get('date/test/1') == '1', 'Key not properly cached' 196 assert self.cache.get('date/test/2') == '2', 'Key not properly cached' 197 assert self.cache.get('date/test/3') == '3', 'Key not properly cached' 198 # check has_key 199 assert self.cache.has_key('date/test/1') 200 assert not self.cache.has_key('date/test/bogus') 201 # assert wipe() nukes the whole thing 202 assert self.cache._num_entries == 3 203 self.cache.wipe() 204 assert self.cache._num_entries == 0
205
206 -class TestWebGatewayCacheTempFile(object):
207 @pytest.fixture(autouse=True)
208 - def setUp (self, request):
209 def fin (): 210 os.system('rm -fr test_cache')
211 request.addfinalizer(fin) 212 self.tmpfile = WebGatewayTempFile(tdir='test_cache')
213
214 - def testFilenameSize (self):
215 """ 216 Make sure slashes, dashes, underscores and other chars don't mess things up. 217 Also check for filename size limits. 218 """ 219 fname='1/2_3!"\'#$%&()=@€£‰¶÷[]≠§±+*~^\,.;:' 220 221 try: 222 fpath, rpath, fobj = self.tmpfile.new(fname, key='specialchars') 223 except: 224 raise 225 pytest.fail('WebGatewayTempFile.new not handling special characters properly') 226 # ext2/3/4 limit is 255 bytes, most others are equal to or larger 227 fname = "a"*384 228 try: 229 fpath, rpath, fobj = self.tmpfile.new(fname, key='longname') 230 fobj.close() 231 # is it keeping extensions properly? 232 fpath, rpath, fobj = self.tmpfile.new("1" + fname + '.tif', key='longname') 233 fobj.close() 234 assert fpath[-5:] == 'a.tif' 235 fpath, rpath, fobj = self.tmpfile.new("2" + fname + '.ome.tiff', key='longname') 236 fobj.close() 237 assert fpath[-10:] == 'a.ome.tiff' 238 fpath, rpath, fobj = self.tmpfile.new("3" + fname + 'ome.tiff', key='longname') 239 fobj.close() 240 assert fpath[-6:] == 'a.tiff' 241 fpath, rpath, fobj = self.tmpfile.new("4" + fname + 'somethingverylong.zip', key='longname') 242 fobj.close() 243 assert fpath[-5:] == 'a.zip' 244 fpath, rpath, fobj = self.tmpfile.new("5" + fname + '.tif.somethingverylong', key='longname') 245 fobj.close() 246 assert fpath[-5:] == 'aaaaa' 247 except: 248 pytest.fail('WebGatewayTempFile.new not handling long file names properly')
249
250 251 -class TestWebGatewayCache(object):
252 @pytest.fixture(autouse=True)
253 - def setUp (self, request):
254 def fin (): 255 os.system('rm -fr test_cache')
256 request.addfinalizer(fin) 257 self.wcache = WebGatewayCache(backend=FileCache, basedir='test_cache') 258 class r: 259 def __init__ (self): 260 self.REQUEST = {'c':'1|292:1631$FF0000,2|409:5015$0000FF','m':'c', 'q':'0.9'}
261 def new (self, q): 262 rv = self.__class__() 263 rv.REQUEST.update(q) 264 return rv 265 self.request = r() 266
267 - def testCacheSettings (self):
268 uid = 123 269 #empty_size, cache_block = _testCacheFSBlockSize(self.wcache._thumb_cache) 270 self.wcache._updateCacheSettings(self.wcache._thumb_cache, timeout=2, max_entries=5, max_size=0 ) 271 cachestr = 'abcdefgh'*127 272 self.wcache._thumb_cache.wipe() 273 for i in range(6): 274 self.wcache.setThumb(self.request, 'test', uid, i, cachestr) 275 max_size = self.wcache._thumb_cache._du() 276 self.wcache._updateCacheSettings(self.wcache._thumb_cache, timeout=2, max_entries=5, max_size=max_size ) 277 self.wcache._thumb_cache.wipe() 278 for i in range(6): 279 self.wcache.setThumb(self.request, 'test', uid, i, cachestr) 280 for i in range(4): 281 assert self.wcache.getThumb(self.request, 'test', uid, i) == cachestr,\ 282 'Key %d not properly cached' % i 283 assert self.wcache.getThumb(self.request, 'test', uid, 5) is None, 'Size limit failed' 284 for i in range(10): 285 self.wcache.setThumb(self.request, 'test', uid, i, 'abcdefgh') 286 for i in range(5): 287 assert self.wcache.getThumb(self.request, 'test', uid, i) == 'abcdefgh', 'Key %d not properly cached' % i 288 assert self.wcache.getThumb(self.request, 'test', uid, 5) is None, 'Entries limit failed' 289 time.sleep(2) 290 assert self.wcache.getThumb(self.request, 'test', uid, 0) is None, 'Time limit failed'
291
292 - def testThumbCache (self):
293 uid = 123 294 assert self.wcache.getThumb(self.request, 'test', uid, 1) is None 295 self.wcache.setThumb(self.request, 'test', uid, 1, 'thumbdata') 296 assert self.wcache.getThumb(self.request, 'test', uid, 1) == 'thumbdata',\ 297 'Thumb not properly cached (%s)' % self.wcache.getThumb(self.request, 'test', uid, 1) 298 self.wcache.clearThumb(self.request, 'test', uid, 1) 299 assert self.wcache.getThumb(self.request, 'test', uid, 1) is None 300 # Make sure clear() nukes this 301 self.wcache.setThumb(self.request, 'test', uid, 1, 'thumbdata') 302 assert self.wcache.getThumb(self.request, 'test', uid, 1) == 'thumbdata', 'Thumb not properly cached' 303 assert self.wcache._thumb_cache._num_entries != 0 304 self.wcache.clear() 305 assert self.wcache._thumb_cache._num_entries == 0
306
307 - def testImageCache (self):
308 uid = 123 309 # Also add a thumb, a split channel and a projection, as it should get deleted with image 310 preq = self.request.new({'p':'intmax'}) 311 assert self.wcache.getThumb(self.request, 'test', uid, 1) is None 312 self.wcache.setThumb(self.request, 'test', uid, 1, 'thumbdata') 313 assert self.wcache.getThumb(self.request, 'test', uid, 1) == 'thumbdata' 314 img = omero.gateway.ImageWrapper(None, omero.model.ImageI(1,False)) 315 assert self.wcache.getImage(self.request, 'test', img, 2, 3) is None 316 self.wcache.setImage(self.request, 'test', img, 2, 3, 'imagedata') 317 assert self.wcache.getImage(self.request, 'test', img, 2, 3) == 'imagedata' 318 assert self.wcache.getImage(preq, 'test', img, 2, 3) is None 319 self.wcache.setImage(preq, 'test', img, 2, 3, 'imagedata') 320 assert self.wcache.getImage(preq, 'test', img, 2, 3) == 'imagedata' 321 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) is None 322 self.wcache.setSplitChannelImage(self.request, 'test', img, 2, 3, 'imagedata') 323 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) == 'imagedata' 324 self.wcache.clearImage(self.request, 'test', uid, img) 325 assert self.wcache.getImage(self.request, 'test', img, 2, 3) is None 326 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) is None 327 assert self.wcache.getImage(preq, 'test', img, 2, 3) is None 328 assert self.wcache.getThumb(self.request, 'test', uid, 1) is None 329 # The exact same behaviour, using invalidateObject 330 self.wcache.setThumb(self.request, 'test', uid, 1, 'thumbdata') 331 assert self.wcache.getThumb(self.request, 'test', uid, 1) == 'thumbdata' 332 self.wcache.setImage(self.request, 'test', img, 2, 3, 'imagedata') 333 assert self.wcache.getImage(self.request, 'test', img, 2, 3) == 'imagedata' 334 assert self.wcache.getImage(preq, 'test', img, 2, 3) is None 335 self.wcache.setImage(preq, 'test', img, 2, 3, 'imagedata') 336 assert self.wcache.getImage(preq, 'test', img, 2, 3) == 'imagedata' 337 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) is None 338 self.wcache.setSplitChannelImage(self.request, 'test', img, 2, 3, 'imagedata') 339 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) == 'imagedata' 340 self.wcache.invalidateObject('test', uid, img) 341 assert self.wcache.getImage(self.request, 'test', img, 2, 3) is None 342 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) is None 343 assert self.wcache.getImage(preq, 'test', img, 2, 3) is None 344 assert self.wcache.getThumb(self.request, 'test', uid, 1) is None 345 # Make sure clear() nukes this 346 assert self.wcache.getImage(self.request, 'test', img, 2, 3) is None 347 self.wcache.setImage(self.request, 'test', img, 2, 3, 'imagedata') 348 assert self.wcache.getImage(self.request, 'test', img, 2, 3) == 'imagedata' 349 assert self.wcache._img_cache._num_entries != 0 350 self.wcache.clear() 351 assert self.wcache._img_cache._num_entries == 0
352
353 - def testLocks (self):
354 wcache2 = WebGatewayCache(backend=FileCache, basedir=self.wcache._basedir) 355 #wcache2 will hold the lock 356 assert wcache2.tryLock() 357 assert not self.wcache.tryLock() 358 assert wcache2.tryLock() 359 del wcache2 360 # The lock should have been removed 361 assert self.wcache.tryLock()
362
363 - def testJsonCache (self):
364 uid = 123 365 ds = omero.gateway.DatasetWrapper(None, omero.model.DatasetI(1,False)) 366 assert self.wcache.getDatasetContents(self.request, 'test', ds) is None 367 self.wcache.setDatasetContents(self.request, 'test', ds, 'datasetdata') 368 assert self.wcache.getDatasetContents(self.request, 'test', ds) == 'datasetdata' 369 self.wcache.clearDatasetContents(self.request, 'test', ds) 370 assert self.wcache.getDatasetContents(self.request, 'test', ds) is None 371 # The exact same behaviour, using invalidateObject 372 assert self.wcache.getDatasetContents(self.request, 'test', ds) is None 373 self.wcache.setDatasetContents(self.request, 'test', ds, 'datasetdata') 374 assert self.wcache.getDatasetContents(self.request, 'test', ds) == 'datasetdata' 375 self.wcache.invalidateObject('test', uid, ds) 376 assert self.wcache.getDatasetContents(self.request, 'test', ds) is None 377 # Make sure clear() nukes this 378 assert self.wcache.getDatasetContents(self.request, 'test', ds) is None 379 self.wcache.setDatasetContents(self.request, 'test', ds, 'datasetdata') 380 assert self.wcache.getDatasetContents(self.request, 'test', ds) == 'datasetdata' 381 assert self.wcache._json_cache._num_entries != 0 382 self.wcache.clear() 383 assert self.wcache._json_cache._num_entries == 0
384
385 386 -def testImageDataJson (gatewaywrapper, author_testimg):
387 iid = author_testimg.getId() 388 r = fakeRequest() 389 v = views.imageData_json(r, iid=iid, server_id=1, conn=gatewaywrapper.gateway, _internal=True) 390 assert type(v) == type('') 391 assert '"width": 512' in v 392 assert '"split_channel":' in v 393 assert '"pixel_range": [-32768, 32767]' in v
394
395 -def testListChildrenJson (gatewaywrapper, author_testimg):
396 img = author_testimg 397 did = img.getParent().getId() 398 r = fakeRequest() 399 v = views.listImages_json(r, did=did, server_id=1, conn=gatewaywrapper.gateway, _internal=True) 400 assert type(v) == type('') 401 assert '"id": %d,' % img.getId() in v 402 assert '"tiled: "' not in v 403 r.setQuery(tiled='1') 404 v = views.listImages_json(r, did=did, server_id=1, conn=gatewaywrapper.gateway, _internal=True) 405 assert type(v) == type('') 406 assert '"id": %d,' % img.getId() in v 407 assert '"tiled": false' in v
408
409 -def testUserProxy (gatewaywrapper):
410 gatewaywrapper.loginAsAuthor() 411 user = gatewaywrapper.gateway.getUser() 412 assert user.isAdmin() is False 413 int(user.getId()) 414 assert user.getName() == gatewaywrapper.AUTHOR.name 415 assert user.getFirstName() == gatewaywrapper.AUTHOR.firstname
416