1
2
3
4
5
6
7
8
9 import os
10 import Ice
11 import time
12 import numpy
13 import signal
14 import logging
15 import threading
16 import traceback
17 import subprocess
18 import portalocker
19
20 from path import path
21
22
23 import omero
24 import omero.clients
25 import omero.callbacks
26
27
28 from omero.columns import *
29 from omero.rtypes import *
30 from omero.util.decorators import remoted, locked, perf
31 from omero_ext.functional import wraps
32
33
34 sys = __import__("sys")
35 tables = __import__("tables")
38 """
39 Returns the length of the argument or None
40 if the argument is None
41 """
42 if rv is None:
43 return None
44 return len(rv)
45
47 """
48 Decorator which takes the first argument after "self" and compares
49 that to the last modification time. If the stamp is older, then the
50 method call will throw an omero.OptimisticLockException. Otherwise,
51 execution will complete normally. If update is True, then the
52 last modification time will be updated after the method call if it
53 is successful.
54
55 Note: stamped implies locked
56
57 """
58 def check_and_update_stamp(*args, **kwargs):
59 self = args[0]
60 stamp = args[1]
61 if stamp < self._stamp:
62 raise omero.OptimisticLockException(None, None, "Resource modified by another thread")
63
64 try:
65 return func(*args, **kwargs)
66 finally:
67 if update:
68 self._stamp = time.time()
69 checked_and_update_stamp = wraps(func)(check_and_update_stamp)
70 return locked(check_and_update_stamp)
71
74 """
75 Since two calls to tables.openFile() return non-equal files
76 with equal fileno's, portalocker cannot be used to prevent
77 the creation of two HdfStorage instances from the same
78 Python process.
79 """
80
82 self.logger = logging.getLogger("omero.tables.HdfList")
83 self._lock = threading.RLock()
84 self.__filenos = {}
85 self.__paths = {}
86 self.__locks = {}
87
88 @locked
90
91 if hdfpath in self.__locks:
92 raise omero.LockTimeout(None, None, "Path already in HdfList: %s" % hdfpath)
93
94 parent = path(hdfpath).parent
95 if not parent.exists():
96 raise omero.ApiUsageException(None, None, "Parent directory does not exist: %s" % parent)
97
98 lock = None
99 try:
100 lock = open(hdfpath, "a+")
101 portalocker.lock(lock, portalocker.LOCK_NB|portalocker.LOCK_EX)
102 self.__locks[hdfpath] = lock
103 except portalocker.LockException, le:
104 if lock:
105 lock.close()
106 raise omero.LockTimeout(None, None, "Cannot acquire exclusive lock on: %s" % hdfpath, 0)
107 except:
108 if lock:
109 lock.close()
110 raise
111
112 hdffile = hdfstorage.openfile("a")
113 fileno = hdffile.fileno()
114 if fileno in self.__filenos.keys():
115 hdffile.close()
116 raise omero.LockTimeout(None, None, "File already opened by process: %s" % hdfpath, 0)
117 else:
118 self.__filenos[fileno] = hdfstorage
119 self.__paths[hdfpath] = hdfstorage
120
121 return hdffile
122
123 @locked
125 try:
126 return self.__paths[hdfpath]
127 except KeyError:
128 return HdfStorage(hdfpath)
129
130 @locked
131 - def remove(self, hdfpath, hdffile):
132 del self.__filenos[hdffile.fileno()]
133 del self.__paths[hdfpath]
134 try:
135 if hdfpath in self.__locks:
136 try:
137 lock = self.__locks[hdfpath]
138 lock.close()
139 finally:
140 del self.__locks[hdfpath]
141 except Exception, e:
142 self.logger.warn("Exception on remove(%s)" % hdfpath, exc_info=True)
143
144
145 HDFLIST = HdfList()
148 """
149 Provides HDF-storage for measurement results. At most a single
150 instance will be available for any given physical HDF5 file.
151 """
152
153
155
156 """
157 file_path should be the path to a file in a valid directory where
158 this HDF instance can be stored (Not None or Empty). Once this
159 method is finished, self.__hdf_file is guaranteed to be a PyTables HDF
160 file, but not necessarily initialized.
161 """
162
163 if file_path is None or str(file_path) == "":
164 raise omero.ValidationException(None, None, "Invalid file_path")
165
166 self.logger = logging.getLogger("omero.tables.HdfStorage")
167
168 self.__hdf_path = path(file_path)
169
170
171 self.__hdf_file = HDFLIST.addOrThrow(file_path, self)
172 self.__tables = []
173
174 self._lock = threading.RLock()
175 self._stamp = time.time()
176
177
178 self.__mea = None
179 self.__ome = None
180
181 try:
182 self.__ome = self.__hdf_file.root.OME
183 self.__mea = self.__ome.Measurements
184 self.__types = self.__ome.ColumnTypes[:]
185 self.__descriptions = self.__ome.ColumnDescriptions[:]
186 self.__initialized = True
187 except tables.NoSuchNodeError:
188 self.__initialized = False
189
190
191
192
193
195 return self.__hdf_path.size
196
198 try:
199 if self.__hdf_path.exists() and self.__hdf_path.size == 0:
200 mode = "w"
201 return tables.openFile(self.__hdf_path, mode=mode,\
202 title="OMERO HDF Measurement Storage", rootUEP="/")
203 except (tables.HDF5ExtError, IOError), io:
204 msg = "HDFStorage initialized with bad path: %s" % self.__hdf_path
205 self.logger.error(msg)
206 raise omero.ValidationException(None, None, msg)
207
209 if not self.__initialized:
210 raise omero.ApiUsageException(None, None, "Not yet initialized")
211
213 return len(self.__types)
214
216 return self.__mea.nrows
217
219 if colNumbers is not None:
220 if len(colNumbers) > 0:
221 maxcol = max(colNumbers)
222 totcol = self.__width()
223 if maxcol >= totcol:
224 raise omero.ApiUsageException(None, None, "Column overflow: %s >= %s" % (maxcol, totcol))
225 else:
226 raise omero.ApiUsageException(None, None, "Columns not specified: %s" % colNumbers)
227
228
229 if rowNumbers is not None:
230 if len(rowNumbers) > 0:
231 maxrow = max(rowNumbers)
232 totrow = self.__length()
233 if maxrow >= totrow:
234 raise omero.ApiUsageException(None, None, "Row overflow: %s >= %s" % (maxrow, totrow))
235 else:
236 raise omero.ApiUsageException(None, None, "Rows not specified: %s" % rowNumbers)
237
238
239
240
241
242 @locked
244 """
245
246 """
247 if metadata is None: metadata = {}
248
249 if self.__initialized:
250 raise omero.ValidationException(None, None, "Already initialized.")
251
252 if not cols:
253 raise omero.ApiUsageException(None, None, "No columns provided")
254
255 for c in cols:
256 if not c.name:
257 raise omero.ApiUsageException(None, None, "Column unnamed: %s" % c)
258
259 self.__definition = columns2definition(cols)
260 self.__ome = self.__hdf_file.createGroup("/", "OME")
261 self.__mea = self.__hdf_file.createTable(self.__ome, "Measurements", self.__definition)
262
263 self.__types = [ x.ice_staticId() for x in cols ]
264 self.__descriptions = [ (x.description != None) and x.description or "" for x in cols ]
265 self.__hdf_file.createArray(self.__ome, "ColumnTypes", self.__types)
266 self.__hdf_file.createArray(self.__ome, "ColumnDescriptions", self.__descriptions)
267
268 self.__mea.attrs.version = "v1"
269 self.__mea.attrs.initialized = time.time()
270 if metadata:
271 for k, v in metadata.items():
272 self.__mea.attrs[k] = v
273
274
275 self.__mea.flush()
276 self.__hdf_file.flush()
277 self.__initialized = True
278
279 @locked
280 - def incr(self, table):
281 sz = len(self.__tables)
282 self.logger.info("Size: %s - Attaching %s to %s" % (sz, table, self.__hdf_path))
283 if table in self.__tables:
284 self.logger.warn("Already added")
285 raise omero.ApiUsageException(None, None, "Already added")
286 self.__tables.append(table)
287 return sz + 1
288
289 @locked
290 - def decr(self, table):
291 sz = len(self.__tables)
292 self.logger.info("Size: %s - Detaching %s from %s", sz, table, self.__hdf_path)
293 if not (table in self.__tables):
294 self.logger.warn("Unknown table")
295 raise omero.ApiUsageException(None, None, "Unknown table")
296 self.__tables.remove(table)
297 if sz <= 1:
298 self.cleanup()
299 return sz - 1
300
301 @locked
303 return self._stamp <= stamp
304
305 @locked
309
310 @locked
311 - def cols(self, size, current):
330
331 @locked
351
352 @locked
361
362 @locked
384
385
386
387
388
389 @stamped
390 - def update(self, stamp, data):
397
398 @stamped
399 - def getWhereList(self, stamp, condition, variables, unused, start, stop, step):
400 self.__initcheck()
401 try:
402 return self.__mea.getWhereList(condition, variables, None, start, stop, step).tolist()
403 except (NameError, SyntaxError, TypeError, ValueError), err:
404 aue = omero.ApiUsageException()
405 aue.message = "Bad condition: %s, %s" % (condition, variables)
406 aue.serverStackTrace = "".join(traceback.format_exc())
407 aue.serverExceptionClass = str(err.__class__.__name__)
408 raise aue
409
411 """
412 Constructs a omero.grid.Data object for returning to the client.
413 """
414 data = omero.grid.Data()
415 data.columns = cols
416 data.rowNumbers = rowNumbers
417 data.lastModification = long(self._stamp*1000)
418 return data
419
420 @stamped
428
429 @stamped
430 - def read(self, stamp, colNumbers, start, stop, current):
438
441
452
453 @stamped
454 - def slice(self, stamp, colNumbers, rowNumbers, current):
455 self.__initcheck()
456
457 if colNumbers is None or len(colNumbers) == 0:
458 colNumbers = range(self.__width())
459 if rowNumbers is None or len(rowNumbers) == 0:
460 rowNumbers = range(self.__length())
461
462 self.__sizecheck(colNumbers, rowNumbers)
463 cols = self.cols(None, current)
464 rv = []
465 for i in colNumbers:
466 col = cols[i]
467 col.readCoordinates(self.__mea, rowNumbers)
468 rv.append(col)
469 return self._as_data(rv, rowNumbers)
470
471
472
473
474
477
478 @locked
480 self.logger.info("Cleaning storage: %s", self.__hdf_path)
481 if self.__mea:
482 self.__mea.flush()
483 self.__mea = None
484 if self.__ome:
485 self.__ome = None
486 if self.__hdf_file:
487 HDFLIST.remove(self.__hdf_path, self.__hdf_file)
488 hdffile = self.__hdf_file
489 self.__hdf_file = None
490 hdffile.close()
491
492
493
494
495 -class TableI(omero.grid.Table, omero.util.SimpleServant):
496 """
497 Spreadsheet implementation based on pytables.
498 """
499
500 - def __init__(self, ctx, file_obj, factory, storage, uuid = "unknown", \
501 call_context = None):
502 self.uuid = uuid
503 self.file_obj = file_obj
504 self.factory = factory
505 self.storage = storage
506 self.call_context = call_context
507 self.can_write = factory.getAdminService().canUpdate(file_obj, call_context)
508 omero.util.SimpleServant.__init__(self, ctx)
509
510 self.stamp = time.time()
511 self.storage.incr(self)
512
513 self._closed = False
514
516 """
517 Checks that the current user can write to the given object
518 at the database level. If not, no FS level writes are permitted
519 either.
520
521 ticket:2910
522 """
523 if not self.can_write:
524 raise omero.SecurityViolation("Current user cannot write to file %s" % self.file_obj.id.val)
525
527 """
528 Called periodically to check the resource is alive. Returns
529 False if this resource can be cleaned up. (Resources API)
530 """
531 self.logger.debug("Checking %s" % self)
532 if self._closed:
533 return False
534
535 idname = 'UNKNOWN'
536 try:
537 idname = self.factory.ice_getIdentity().name
538 clientSession = self.ctx.getSession().getSessionService() \
539 .getSession(idname)
540 if clientSession.getClosed():
541 self.logger.debug("Client session closed: %s" % idname)
542 return False
543 return True
544 except Exception:
545 self.logger.debug("Client session not found: %s" % idname)
546 return False
547
549 """
550 Decrements the counter on the held storage to allow it to
551 be cleaned up.
552 """
553 if self.storage:
554 try:
555 self.storage.decr(self)
556 finally:
557 self.storage = None
558
560 return "Table-%s" % self.uuid
561
562 @remoted
563 @perf
564 - def close(self, current = None):
565
566 size = None
567 if self.storage is not None:
568 size = self.storage.size()
569
570 try:
571 self.cleanup()
572 self.logger.info("Closed %s", self)
573 except:
574 self.logger.warn("Closed %s with errors", self)
575
576 self._closed = True
577
578 if self.file_obj is not None and self.can_write:
579 fid = self.file_obj.id.val
580 if not self.file_obj.isLoaded() or\
581 self.file_obj.getDetails() is None or\
582 self.file_obj.details.group is None:
583 self.logger.warn("Cannot update file object %s since group is none", fid)
584 else:
585 gid = self.file_obj.details.group.id.val
586 client_uuid = self.factory.ice_getIdentity().category[8:]
587 ctx = {"omero.group": str(gid), omero.constants.CLIENTUUID: client_uuid}
588 try:
589 rfs = self.factory.createRawFileStore(ctx)
590 try:
591 rfs.setFileId(fid, ctx)
592 if size:
593 rfs.truncate(size, ctx)
594 rfs.write([], size, 0, ctx)
595 else:
596 rfs.write([], 0, 0, ctx)
597 file_obj = rfs.save(ctx)
598 finally:
599 rfs.close(ctx)
600 self.logger.info("Updated file object %s to sha1=%s (%s bytes)",\
601 self.file_obj.id.val, file_obj.sha1.val, file_obj.size.val)
602 except:
603 self.logger.warn("Failed to update file object %s", self.file_obj.id.val, exc_info=1)
604
605
606
607 @remoted
608 @perf
610 msg = "unknown"
611 if self.file_obj:
612 if self.file_obj.id:
613 msg = self.file_obj.id.val
614 self.logger.info("%s.getOriginalFile() => id=%s", self, msg)
615 return self.file_obj
616
617 @remoted
618 @perf
620 rv = self.storage.cols(None, current)
621 self.logger.info("%s.getHeaders() => size=%s", self, slen(rv))
622 return rv
623
624 @remoted
625 @perf
627 rv = self.storage.rows()
628 self.logger.info("%s.getNumberOfRows() => %s", self, rv)
629 return long(rv)
630
631 @remoted
632 @perf
633 - def getWhereList(self, condition, variables, start, stop, step, current = None):
634 variables = unwrap(variables)
635 if stop == 0:
636 stop = None
637 if step == 0:
638 step = None
639 rv = self.storage.getWhereList(self.stamp, condition, variables, None, start, stop, step)
640 self.logger.info("%s.getWhereList(%s, %s, %s, %s, %s) => size=%s", self, condition, variables, start, stop, step, slen(rv))
641 return rv
642
643 @remoted
644 @perf
646 self.logger.info("%s.readCoordinates(size=%s)", self, slen(rowNumbers))
647 try:
648 return self.storage.readCoordinates(self.stamp, rowNumbers, current)
649 except tables.HDF5ExtError, err:
650 aue = omero.ApiUsageException()
651 aue.message = "Error reading coordinates. Most likely out of range"
652 aue.serverStackTrace = "".join(traceback.format_exc())
653 aue.serverExceptionClass = str(err.__class__.__name__)
654 raise aue
655
656 @remoted
657 @perf
658 - def read(self, colNumbers, start, stop, current = None):
659 self.logger.info("%s.read(%s, %s, %s)", self, colNumbers, start, stop)
660 if start == 0L and stop == 0L:
661 stop = None
662 try:
663 return self.storage.read(self.stamp, colNumbers, start, stop, current)
664 except tables.HDF5ExtError, err:
665 aue = omero.ApiUsageException()
666 aue.message = "Error reading coordinates. Most likely out of range"
667 aue.serverStackTrace = "".join(traceback.format_exc())
668 aue.serverExceptionClass = str(err.__class__.__name__)
669 raise aue
670
671 @remoted
672 @perf
673 - def slice(self, colNumbers, rowNumbers, current = None):
674 self.logger.info("%s.slice(size=%s, size=%s)", self, slen(colNumbers), slen(rowNumbers))
675 return self.storage.slice(self.stamp, colNumbers, rowNumbers, current)
676
677
678
679 @remoted
680 @perf
686
687 @remoted
688 @perf
692
693 @remoted
694 @perf
695 - def addData(self, cols, current = None):
701
702 @remoted
703 @perf
704 - def update(self, data, current = None):
709
710 @remoted
711 @perf
712 - def delete(self, current = None):
732
733
734
735
736 @remoted
737 @perf
743
744 @remoted
745 @perf
750
751 @remoted
752 @perf
757
758 @remoted
759 @perf
764
765
766
767 -class TablesI(omero.grid.Tables, omero.util.Servant):
768 """
769 Implementation of the omero.grid.Tables API. Provides
770 spreadsheet like functionality across the OMERO.grid.
771 This servant serves as a session-less, user-less
772 resource for obtaining omero.grid.Table proxies.
773
774 The first major step in initialization is getting
775 a session. This will block until the Blitz server
776 is reachable.
777 """
778
779 - def __init__(self,\
780 ctx,\
781 table_cast = omero.grid.TablePrx.uncheckedCast,\
782 internal_repo_cast = omero.grid.InternalRepositoryPrx.checkedCast):
783
784 omero.util.Servant.__init__(self, ctx, needs_session = True)
785
786
787
788 self._table_cast = table_cast
789 self._internal_repo_cast = internal_repo_cast
790
791 self.__stores = []
792 self._get_dir()
793 self._get_uuid()
794 self._get_repo()
795
797 """
798 Second step in initialization is to find the .omero/repository
799 directory. If this is not created, then a required server has
800 not started, and so this instance will not start.
801 """
802 wait = int(self.communicator.getProperties().getPropertyWithDefault("omero.repo.wait", "1"))
803 self.repo_dir = self.communicator.getProperties().getProperty("omero.repo.dir")
804
805 if not self.repo_dir:
806
807 self.repo_dir = self.ctx.getSession().getConfigService().getConfigValue("omero.data.dir")
808
809 self.repo_cfg = path(self.repo_dir) / ".omero" / "repository"
810 start = time.time()
811 while not self.repo_cfg.exists() and wait < (time.time() - start):
812 self.logger.info("%s doesn't exist; waiting 5 seconds..." % self.repo_cfg)
813 self.stop_event.wait(5)
814 if not self.repo_cfg.exists():
815 msg = "No repository found: %s" % self.repo_cfg
816 self.logger.error(msg)
817 raise omero.ResourceError(None, None, msg)
818
820 """
821 Third step in initialization is to find the database uuid
822 for this grid instance. Multiple OMERO.grids could be watching
823 the same directory.
824 """
825 cfg = self.ctx.getSession().getConfigService()
826 self.db_uuid = cfg.getDatabaseUuid()
827 self.instance = self.repo_cfg / self.db_uuid
828
830 """
831 Fourth step in initialization is to find the repository object
832 for the UUID found in .omero/repository/<db_uuid>, and then
833 create a proxy for the InternalRepository attached to that.
834 """
835
836
837 self.repo_uuid = (self.instance / "repo_uuid").lines()[0].strip()
838 if len(self.repo_uuid) != 38:
839 raise omero.ResourceError("Poorly formed UUID: %s" % self.repo_uuid)
840 self.repo_uuid = self.repo_uuid[2:]
841
842
843 self.repo_obj = self.ctx.getSession().getQueryService().findByQuery("select f from OriginalFile f where sha1 = :uuid",
844 omero.sys.ParametersI().add("uuid", rstring(self.repo_uuid)))
845 self.repo_mgr = self.communicator.stringToProxy("InternalRepository-%s" % self.repo_uuid)
846 self.repo_mgr = self._internal_repo_cast(self.repo_mgr)
847 self.repo_svc = self.repo_mgr.getProxy()
848
849 @remoted
851 """
852 Returns the Repository object for this Tables server.
853 """
854 return self.repo_svc
855
856 @remoted
857 @perf
858 - def getTable(self, file_obj, factory, current = None):
859 """
860 Create and/or register a table servant.
861 """
862
863
864 file_id = None
865 if file_obj is not None and file_obj.id is not None:
866 file_id = file_obj.id.val
867 self.logger.info("getTable: %s %s", file_id, current.ctx)
868
869 file_path = self.repo_mgr.getFilePath(file_obj)
870 p = path(file_path).dirname()
871 if not p.exists():
872 p.makedirs()
873
874 storage = HDFLIST.getOrCreate(file_path)
875 id = Ice.Identity()
876 id.name = Ice.generateUUID()
877 table = TableI(self.ctx, file_obj, factory, storage, uuid = id.name, \
878 call_context=current.ctx)
879 self.resources.add(table)
880
881 prx = current.adapter.add(table, id)
882 return self._table_cast(prx)
883