1
2
3
4
5 import time, os, datetime
6 import pytest
7 import tempfile
8
9 from webgateway.webgateway_cache import FileCache, WebGatewayCache, WebGatewayTempFile
10 from webgateway import views
11 import omero
12 from omero.gateway.scripts.testdb_create import *
13
14 from decorators import login_required
15
16 from django.test.client import Client
17 from django.core.handlers.wsgi import WSGIRequest
18 from django.conf import settings
19 from django.http import QueryDict
20
21 CLIENT_BASE='test'
24 def bogus_request(self, **request):
25 """
26 The master request method. Composes the environment dictionary
27 and passes to the handler, returning the result of the handler.
28 Assumes defaults for the query environment, which can be overridden
29 using the arguments to the request.
30 """
31 environ = {
32 'HTTP_COOKIE': self.cookies,
33 'PATH_INFO': '/',
34 'QUERY_STRING': '',
35 'REQUEST_METHOD': 'GET',
36 'SCRIPT_NAME': '',
37 'SERVER_NAME': 'testserver',
38 'SERVER_PORT': '80',
39 'SERVER_PROTOCOL': 'HTTP/1.1',
40 'HTTP_HOST': 'localhost',
41 'wsgi.version': (1,0),
42 'wsgi.url_scheme': 'http',
43 'wsgi.errors': None,
44 'wsgi.multiprocess': True,
45 'wsgi.multithread': False,
46 'wsgi.run_once': False,
47 'wsgi.input': None,
48 }
49 environ.update(self.defaults)
50 environ.update(request)
51 r = WSGIRequest(environ)
52 if 'django.contrib.sessions' in settings.INSTALLED_APPS:
53 engine = __import__(settings.SESSION_ENGINE, {}, {}, [''])
54 r.session = engine.SessionStore()
55 qlen = len(r.REQUEST.dicts)
56 def setQuery (**query):
57 r.REQUEST.dicts = r.REQUEST.dicts[:qlen]
58 q = QueryDict('', mutable=True)
59 q.update(query)
60 r.REQUEST.dicts += (q,)
61 r.setQuery = setQuery
62 return r
63 Client.bogus_request = bogus_request
64 c = Client()
65 return c.bogus_request(**kwargs)
66
69 ColorHolder = omero.gateway.ColorHolder
70 c1 = ColorHolder()
71 assert c1._color == {'red': 0, 'green': 0,'blue': 0, 'alpha': 255}
72 c1 = ColorHolder('blue')
73 assert c1.getHtml() == '0000FF'
74 assert c1.getCss() == 'rgba(0,0,255,1.000)'
75 assert c1.getRGB() == (0,0,255)
76 c1.setRed(0xF0)
77 assert c1.getCss() == 'rgba(240,0,255,1.000)'
78 c1.setGreen(0x0F)
79 assert c1.getCss() == 'rgba(240,15,255,1.000)'
80 c1.setBlue(0)
81 assert c1.getCss() == 'rgba(240,15,0,1.000)'
82 c1.setAlpha(0x7F)
83 assert c1.getCss() == 'rgba(240,15,0,0.498)'
84 c1 = ColorHolder.fromRGBA(50,100,200,300)
85 assert c1.getCss() == 'rgba(50,100,200,1.000)'
86
94
96 splitHTMLColor = omero.gateway.splitHTMLColor
97 assert splitHTMLColor('abc') == [0xAA, 0xBB, 0xCC, 0xFF]
98 assert splitHTMLColor('abcd') == [0xAA, 0xBB, 0xCC, 0xDD]
99 assert splitHTMLColor('abbccd') == [0xAB, 0xBC, 0xCD, 0xFF]
100 assert splitHTMLColor('abbccdde') == [0xAB, 0xBC, 0xCD, 0xDE]
101 assert splitHTMLColor('#$%&%') == None
102
111
113 @pytest.fixture(autouse=True)
114 - def setUp (self, request):
115 def fin ():
116 os.system('rm -fr test_cache')
117 request.addfinalizer(fin)
118 self.cache = FileCache('test_cache')
119
121 assert self.cache.get('date/test/1') == None, 'Key already exists in cache'
122 self.cache.set('date/test/1', '1', timeout=3)
123 assert self.cache.get('date/test/1') == '1', 'Key not properly cached'
124 time.sleep(4)
125 assert self.cache.get('date/test/1') == None, 'Timeout failed'
126
127 self.cache.wipe()
128 self.cache._default_timeout = 0
129 assert self.cache.get('date/test/1') == None, 'Key already exists in cache'
130 self.cache.set('date/test/1', '1', timeout=3)
131 assert self.cache.get('date/test/1') == '1', 'Key not properly cached'
132 time.sleep(4)
133 assert self.cache.has_key('date/test/1')
134 assert self.cache.get('date/test/1') == '1', 'Key got timedout and should not'
135
137 empty_size, cache_block = _testCacheFSBlockSize(self.cache)
138 self.cache._max_size = empty_size + 4*cache_block + 1
139
140
141 for i in range(6):
142 self.cache.set('date/test/%d' % i, 'abcdefgh'*127*cache_block)
143 for i in range(4):
144 assert self.cache.get('date/test/%d' % i) == 'abcdefgh'*127*cache_block,\
145 'Key %d not properly cached' % i
146 assert self.cache.get('date/test/5') == None, 'Size limit failed'
147 self.cache._max_size = 0
148 self.cache.wipe()
149 for i in range(6):
150 self.cache.set('date/test/%d' % i, 'abcdefgh'*127*cache_block)
151 for i in range(6):
152 assert self.cache.get('date/test/%d' % i) == 'abcdefgh'*127*cache_block,\
153 'Key %d not properly cached' % i
154
156 self.cache._max_entries = 2
157 self.cache.set('date/test/1', '1')
158 self.cache.set('date/test/2', '2')
159 self.cache.set('date/test/3', '3')
160 assert self.cache.get('date/test/1') == '1', 'Key not properly cached'
161 assert self.cache.get('date/test/2') == '2', 'Key not properly cached'
162 assert self.cache.get('date/test/3') == None, 'File number limit failed'
163 self.cache.wipe()
164 self.cache._max_entries = 0
165 self.cache.set('date/test/1', '1')
166 self.cache.set('date/test/2', '2')
167 self.cache.set('date/test/3', '3')
168 assert self.cache.get('date/test/1') == '1', 'Key not properly cached'
169 assert self.cache.get('date/test/2') == '2', 'Key not properly cached'
170 assert self.cache.get('date/test/3') == '3', 'Key not properly cached'
171
173 self.cache._max_entries = 2
174 self.cache._default_timeout = 3
175 self.cache.set('date/test/1', '1')
176 self.cache.set('date/test/2', '2')
177 self.cache.set('date/test/3', '3')
178 assert self.cache.get('date/test/1') == '1', 'Key not properly cached'
179 assert self.cache.get('date/test/2') == '2', 'Key not properly cached'
180 assert self.cache.get('date/test/3') == None, 'File number limit failed'
181 time.sleep(4)
182 self.cache.set('date/test/3', '3')
183 assert self.cache.get('date/test/3') == '3', 'Purge not working'
184
186
187 pytest.raises(ValueError, self.cache.set, 'date/test/1', 123)
188
189 pytest.raises(ValueError, self.cache.set, '/date/test/1', '1')
190 pytest.raises(ValueError, self.cache.set, 'date/test/../1', '1')
191
192 self.cache.set('date/test/1', '1')
193 self.cache.set('date/test/2', '2')
194 self.cache.set('date/test/3', '3')
195 assert self.cache.get('date/test/1') == '1', 'Key not properly cached'
196 assert self.cache.get('date/test/2') == '2', 'Key not properly cached'
197 assert self.cache.get('date/test/3') == '3', 'Key not properly cached'
198
199 assert self.cache.has_key('date/test/1')
200 assert not self.cache.has_key('date/test/bogus')
201
202 assert self.cache._num_entries == 3
203 self.cache.wipe()
204 assert self.cache._num_entries == 0
205
207 @pytest.fixture(autouse=True)
208 - def setUp (self, request):
209 def fin ():
210 os.system('rm -fr test_cache')
211 request.addfinalizer(fin)
212 self.tmpfile = WebGatewayTempFile(tdir='test_cache')
213
215 """
216 Make sure slashes, dashes, underscores and other chars don't mess things up.
217 Also check for filename size limits.
218 """
219 fname='1/2_3!"\'#$%&()=@€£‰¶÷[]≠§±+*~^\,.;:'
220
221 try:
222 fpath, rpath, fobj = self.tmpfile.new(fname, key='specialchars')
223 except:
224 raise
225 pytest.fail('WebGatewayTempFile.new not handling special characters properly')
226
227 fname = "a"*384
228 try:
229 fpath, rpath, fobj = self.tmpfile.new(fname, key='longname')
230 fobj.close()
231
232 fpath, rpath, fobj = self.tmpfile.new("1" + fname + '.tif', key='longname')
233 fobj.close()
234 assert fpath[-5:] == 'a.tif'
235 fpath, rpath, fobj = self.tmpfile.new("2" + fname + '.ome.tiff', key='longname')
236 fobj.close()
237 assert fpath[-10:] == 'a.ome.tiff'
238 fpath, rpath, fobj = self.tmpfile.new("3" + fname + 'ome.tiff', key='longname')
239 fobj.close()
240 assert fpath[-6:] == 'a.tiff'
241 fpath, rpath, fobj = self.tmpfile.new("4" + fname + 'somethingverylong.zip', key='longname')
242 fobj.close()
243 assert fpath[-5:] == 'a.zip'
244 fpath, rpath, fobj = self.tmpfile.new("5" + fname + '.tif.somethingverylong', key='longname')
245 fobj.close()
246 assert fpath[-5:] == 'aaaaa'
247 except:
248 pytest.fail('WebGatewayTempFile.new not handling long file names properly')
249
252 @pytest.fixture(autouse=True)
253 - def setUp (self, request):
254 def fin ():
255 os.system('rm -fr test_cache')
256 request.addfinalizer(fin)
257 self.wcache = WebGatewayCache(backend=FileCache, basedir='test_cache')
258 class r:
259 def __init__ (self):
260 self.REQUEST = {'c':'1|292:1631$FF0000,2|409:5015$0000FF','m':'c', 'q':'0.9'}
261 def new (self, q):
262 rv = self.__class__()
263 rv.REQUEST.update(q)
264 return rv
265 self.request = r()
266
268 uid = 123
269
270 self.wcache._updateCacheSettings(self.wcache._thumb_cache, timeout=2, max_entries=5, max_size=0 )
271 cachestr = 'abcdefgh'*127
272 self.wcache._thumb_cache.wipe()
273 for i in range(6):
274 self.wcache.setThumb(self.request, 'test', uid, i, cachestr)
275 max_size = self.wcache._thumb_cache._du()
276 self.wcache._updateCacheSettings(self.wcache._thumb_cache, timeout=2, max_entries=5, max_size=max_size )
277 self.wcache._thumb_cache.wipe()
278 for i in range(6):
279 self.wcache.setThumb(self.request, 'test', uid, i, cachestr)
280 for i in range(4):
281 assert self.wcache.getThumb(self.request, 'test', uid, i) == cachestr,\
282 'Key %d not properly cached' % i
283 assert self.wcache.getThumb(self.request, 'test', uid, 5) is None, 'Size limit failed'
284 for i in range(10):
285 self.wcache.setThumb(self.request, 'test', uid, i, 'abcdefgh')
286 for i in range(5):
287 assert self.wcache.getThumb(self.request, 'test', uid, i) == 'abcdefgh', 'Key %d not properly cached' % i
288 assert self.wcache.getThumb(self.request, 'test', uid, 5) is None, 'Entries limit failed'
289 time.sleep(2)
290 assert self.wcache.getThumb(self.request, 'test', uid, 0) is None, 'Time limit failed'
291
293 uid = 123
294 assert self.wcache.getThumb(self.request, 'test', uid, 1) is None
295 self.wcache.setThumb(self.request, 'test', uid, 1, 'thumbdata')
296 assert self.wcache.getThumb(self.request, 'test', uid, 1) == 'thumbdata',\
297 'Thumb not properly cached (%s)' % self.wcache.getThumb(self.request, 'test', uid, 1)
298 self.wcache.clearThumb(self.request, 'test', uid, 1)
299 assert self.wcache.getThumb(self.request, 'test', uid, 1) is None
300
301 self.wcache.setThumb(self.request, 'test', uid, 1, 'thumbdata')
302 assert self.wcache.getThumb(self.request, 'test', uid, 1) == 'thumbdata', 'Thumb not properly cached'
303 assert self.wcache._thumb_cache._num_entries != 0
304 self.wcache.clear()
305 assert self.wcache._thumb_cache._num_entries == 0
306
308 uid = 123
309
310 preq = self.request.new({'p':'intmax'})
311 assert self.wcache.getThumb(self.request, 'test', uid, 1) is None
312 self.wcache.setThumb(self.request, 'test', uid, 1, 'thumbdata')
313 assert self.wcache.getThumb(self.request, 'test', uid, 1) == 'thumbdata'
314 img = omero.gateway.ImageWrapper(None, omero.model.ImageI(1,False))
315 assert self.wcache.getImage(self.request, 'test', img, 2, 3) is None
316 self.wcache.setImage(self.request, 'test', img, 2, 3, 'imagedata')
317 assert self.wcache.getImage(self.request, 'test', img, 2, 3) == 'imagedata'
318 assert self.wcache.getImage(preq, 'test', img, 2, 3) is None
319 self.wcache.setImage(preq, 'test', img, 2, 3, 'imagedata')
320 assert self.wcache.getImage(preq, 'test', img, 2, 3) == 'imagedata'
321 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) is None
322 self.wcache.setSplitChannelImage(self.request, 'test', img, 2, 3, 'imagedata')
323 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) == 'imagedata'
324 self.wcache.clearImage(self.request, 'test', uid, img)
325 assert self.wcache.getImage(self.request, 'test', img, 2, 3) is None
326 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) is None
327 assert self.wcache.getImage(preq, 'test', img, 2, 3) is None
328 assert self.wcache.getThumb(self.request, 'test', uid, 1) is None
329
330 self.wcache.setThumb(self.request, 'test', uid, 1, 'thumbdata')
331 assert self.wcache.getThumb(self.request, 'test', uid, 1) == 'thumbdata'
332 self.wcache.setImage(self.request, 'test', img, 2, 3, 'imagedata')
333 assert self.wcache.getImage(self.request, 'test', img, 2, 3) == 'imagedata'
334 assert self.wcache.getImage(preq, 'test', img, 2, 3) is None
335 self.wcache.setImage(preq, 'test', img, 2, 3, 'imagedata')
336 assert self.wcache.getImage(preq, 'test', img, 2, 3) == 'imagedata'
337 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) is None
338 self.wcache.setSplitChannelImage(self.request, 'test', img, 2, 3, 'imagedata')
339 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) == 'imagedata'
340 self.wcache.invalidateObject('test', uid, img)
341 assert self.wcache.getImage(self.request, 'test', img, 2, 3) is None
342 assert self.wcache.getSplitChannelImage(self.request, 'test', img, 2, 3) is None
343 assert self.wcache.getImage(preq, 'test', img, 2, 3) is None
344 assert self.wcache.getThumb(self.request, 'test', uid, 1) is None
345
346 assert self.wcache.getImage(self.request, 'test', img, 2, 3) is None
347 self.wcache.setImage(self.request, 'test', img, 2, 3, 'imagedata')
348 assert self.wcache.getImage(self.request, 'test', img, 2, 3) == 'imagedata'
349 assert self.wcache._img_cache._num_entries != 0
350 self.wcache.clear()
351 assert self.wcache._img_cache._num_entries == 0
352
362
364 uid = 123
365 ds = omero.gateway.DatasetWrapper(None, omero.model.DatasetI(1,False))
366 assert self.wcache.getDatasetContents(self.request, 'test', ds) is None
367 self.wcache.setDatasetContents(self.request, 'test', ds, 'datasetdata')
368 assert self.wcache.getDatasetContents(self.request, 'test', ds) == 'datasetdata'
369 self.wcache.clearDatasetContents(self.request, 'test', ds)
370 assert self.wcache.getDatasetContents(self.request, 'test', ds) is None
371
372 assert self.wcache.getDatasetContents(self.request, 'test', ds) is None
373 self.wcache.setDatasetContents(self.request, 'test', ds, 'datasetdata')
374 assert self.wcache.getDatasetContents(self.request, 'test', ds) == 'datasetdata'
375 self.wcache.invalidateObject('test', uid, ds)
376 assert self.wcache.getDatasetContents(self.request, 'test', ds) is None
377
378 assert self.wcache.getDatasetContents(self.request, 'test', ds) is None
379 self.wcache.setDatasetContents(self.request, 'test', ds, 'datasetdata')
380 assert self.wcache.getDatasetContents(self.request, 'test', ds) == 'datasetdata'
381 assert self.wcache._json_cache._num_entries != 0
382 self.wcache.clear()
383 assert self.wcache._json_cache._num_entries == 0
384
394
396 img = author_testimg
397 did = img.getParent().getId()
398 r = fakeRequest()
399 v = views.listImages_json(r, did=did, server_id=1, conn=gatewaywrapper.gateway, _internal=True)
400 assert type(v) == type('')
401 assert '"id": %d,' % img.getId() in v
402 assert '"tiled: "' not in v
403 r.setQuery(tiled='1')
404 v = views.listImages_json(r, did=did, server_id=1, conn=gatewaywrapper.gateway, _internal=True)
405 assert type(v) == type('')
406 assert '"id": %d,' % img.getId() in v
407 assert '"tiled": false' in v
408
416