Package omero :: Module tables
[hide private]
[frames] | [no frames]

Source Code for Module omero.tables

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # 
  4  # OMERO Tables Interface 
  5  # Copyright 2009 Glencoe Software, Inc.  All Rights Reserved. 
  6  # Use is subject to license terms supplied in LICENSE.txt 
  7  # 
  8   
  9  import os 
 10  import Ice 
 11  import time 
 12  import numpy 
 13  import signal 
 14  import logging 
 15  import threading 
 16  import traceback 
 17  import subprocess 
 18  import portalocker # Third-party 
 19   
 20  from path import path 
 21   
 22   
 23  import omero # Do we need both?? 
 24  import omero.clients 
 25  import omero.callbacks 
 26   
 27  # For ease of use 
 28  from omero.columns import * 
 29  from omero.rtypes import * 
 30  from omero.util.decorators import remoted, locked, perf 
 31  from omero_ext.functional import wraps 
 32   
 33   
 34  sys = __import__("sys") # Python sys 
 35  tables = __import__("tables") # Pytables 
def slen(rv):
    """
    None-safe length helper: returns len(rv), or None when rv is None.
    """
    return None if rv is None else len(rv)
def stamped(func, update = False):
    """
    Decorator which takes the first argument after "self" and compares
    that to the last modification time. If the stamp is older, then the
    method call will throw an omero.OptimisticLockException. Otherwise,
    execution will complete normally. If update is True, then the
    last modification time will be updated after the method call if it
    is successful.

    Note: stamped implies locked
    """
    def check_and_update_stamp(*args, **kwargs):
        self = args[0]
        stamp = args[1]
        if stamp < self._stamp:
            raise omero.OptimisticLockException(None, None, "Resource modified by another thread")
        try:
            return func(*args, **kwargs)
        finally:
            # NOTE(review): runs in finally, so the stamp is refreshed even
            # when func raises — confirm this matches the docstring's
            # "if it is successful" intent.
            if update:
                self._stamp = time.time()
    # Fix: the wraps()-ed function was previously assigned to a typo'd name
    # ("checked_...") and discarded, so the returned proxy lost func's
    # __name__/__doc__. Rebind before passing to locked().
    check_and_update_stamp = wraps(func)(check_and_update_stamp)
    return locked(check_and_update_stamp)
72 73 -class HdfList(object):
74 """ 75 Since two calls to tables.openFile() return non-equal files 76 with equal fileno's, portalocker cannot be used to prevent 77 the creation of two HdfStorage instances from the same 78 Python process. 79 """ 80
81 - def __init__(self):
82 self.logger = logging.getLogger("omero.tables.HdfList") 83 self._lock = threading.RLock() 84 self.__filenos = {} 85 self.__paths = {} 86 self.__locks = {}
87 88 @locked
89 - def addOrThrow(self, hdfpath, hdfstorage):
90 91 if hdfpath in self.__locks: 92 raise omero.LockTimeout(None, None, "Path already in HdfList: %s" % hdfpath) 93 94 parent = path(hdfpath).parent 95 if not parent.exists(): 96 raise omero.ApiUsageException(None, None, "Parent directory does not exist: %s" % parent) 97 98 lock = None 99 try: 100 lock = open(hdfpath, "a+") 101 portalocker.lock(lock, portalocker.LOCK_NB|portalocker.LOCK_EX) 102 self.__locks[hdfpath] = lock 103 except portalocker.LockException, le: 104 if lock: 105 lock.close() 106 raise omero.LockTimeout(None, None, "Cannot acquire exclusive lock on: %s" % hdfpath, 0) 107 except: 108 if lock: 109 lock.close() 110 raise 111 112 hdffile = hdfstorage.openfile("a") 113 fileno = hdffile.fileno() 114 if fileno in self.__filenos.keys(): 115 hdffile.close() 116 raise omero.LockTimeout(None, None, "File already opened by process: %s" % hdfpath, 0) 117 else: 118 self.__filenos[fileno] = hdfstorage 119 self.__paths[hdfpath] = hdfstorage 120 121 return hdffile
122 123 @locked
124 - def getOrCreate(self, hdfpath):
125 try: 126 return self.__paths[hdfpath] 127 except KeyError: 128 return HdfStorage(hdfpath) # Adds itself.
129 130 @locked
131 - def remove(self, hdfpath, hdffile):
132 del self.__filenos[hdffile.fileno()] 133 del self.__paths[hdfpath] 134 try: 135 if hdfpath in self.__locks: 136 try: 137 lock = self.__locks[hdfpath] 138 lock.close() 139 finally: 140 del self.__locks[hdfpath] 141 except Exception, e: 142 self.logger.warn("Exception on remove(%s)" % hdfpath, exc_info=True)
# Global object for maintaining files
# Module-level singleton registry shared by all HdfStorage instances.
HDFLIST = HdfList()
146 147 -class HdfStorage(object):
148 """ 149 Provides HDF-storage for measurement results. At most a single 150 instance will be available for any given physical HDF5 file. 151 """ 152 153
154 - def __init__(self, file_path):
155 156 """ 157 file_path should be the path to a file in a valid directory where 158 this HDF instance can be stored (Not None or Empty). Once this 159 method is finished, self.__hdf_file is guaranteed to be a PyTables HDF 160 file, but not necessarily initialized. 161 """ 162 163 if file_path is None or str(file_path) == "": 164 raise omero.ValidationException(None, None, "Invalid file_path") 165 166 self.logger = logging.getLogger("omero.tables.HdfStorage") 167 168 self.__hdf_path = path(file_path) 169 # Locking first as described at: 170 # http://www.pytables.org/trac/ticket/185 171 self.__hdf_file = HDFLIST.addOrThrow(file_path, self) 172 self.__tables = [] 173 174 self._lock = threading.RLock() 175 self._stamp = time.time() 176 177 # These are what we'd like to have 178 self.__mea = None 179 self.__ome = None 180 181 try: 182 self.__ome = self.__hdf_file.root.OME 183 self.__mea = self.__ome.Measurements 184 self.__types = self.__ome.ColumnTypes[:] 185 self.__descriptions = self.__ome.ColumnDescriptions[:] 186 self.__initialized = True 187 except tables.NoSuchNodeError: 188 self.__initialized = False
189 190 # 191 # Non-locked methods 192 # 193
194 - def size(self):
195 return self.__hdf_path.size
196
197 - def openfile(self, mode):
198 try: 199 if self.__hdf_path.exists() and self.__hdf_path.size == 0: 200 mode = "w" 201 return tables.openFile(self.__hdf_path, mode=mode,\ 202 title="OMERO HDF Measurement Storage", rootUEP="/") 203 except (tables.HDF5ExtError, IOError), io: 204 msg = "HDFStorage initialized with bad path: %s" % self.__hdf_path 205 self.logger.error(msg) 206 raise omero.ValidationException(None, None, msg)
207
208 - def __initcheck(self):
209 if not self.__initialized: 210 raise omero.ApiUsageException(None, None, "Not yet initialized")
211
212 - def __width(self):
213 return len(self.__types)
214
215 - def __length(self):
216 return self.__mea.nrows
217
218 - def __sizecheck(self, colNumbers, rowNumbers):
219 if colNumbers is not None: 220 if len(colNumbers) > 0: 221 maxcol = max(colNumbers) 222 totcol = self.__width() 223 if maxcol >= totcol: 224 raise omero.ApiUsageException(None, None, "Column overflow: %s >= %s" % (maxcol, totcol)) 225 else: 226 raise omero.ApiUsageException(None, None, "Columns not specified: %s" % colNumbers) 227 228 229 if rowNumbers is not None: 230 if len(rowNumbers) > 0: 231 maxrow = max(rowNumbers) 232 totrow = self.__length() 233 if maxrow >= totrow: 234 raise omero.ApiUsageException(None, None, "Row overflow: %s >= %s" % (maxrow, totrow)) 235 else: 236 raise omero.ApiUsageException(None, None, "Rows not specified: %s" % rowNumbers)
237 238 # 239 # Locked methods 240 # 241 242 @locked
243 - def initialize(self, cols, metadata = None):
244 """ 245 246 """ 247 if metadata is None: metadata = {} 248 249 if self.__initialized: 250 raise omero.ValidationException(None, None, "Already initialized.") 251 252 if not cols: 253 raise omero.ApiUsageException(None, None, "No columns provided") 254 255 for c in cols: 256 if not c.name: 257 raise omero.ApiUsageException(None, None, "Column unnamed: %s" % c) 258 259 self.__definition = columns2definition(cols) 260 self.__ome = self.__hdf_file.createGroup("/", "OME") 261 self.__mea = self.__hdf_file.createTable(self.__ome, "Measurements", self.__definition) 262 263 self.__types = [ x.ice_staticId() for x in cols ] 264 self.__descriptions = [ (x.description != None) and x.description or "" for x in cols ] 265 self.__hdf_file.createArray(self.__ome, "ColumnTypes", self.__types) 266 self.__hdf_file.createArray(self.__ome, "ColumnDescriptions", self.__descriptions) 267 268 self.__mea.attrs.version = "v1" 269 self.__mea.attrs.initialized = time.time() 270 if metadata: 271 for k, v in metadata.items(): 272 self.__mea.attrs[k] = v 273 # See attrs._f_list("user") to retrieve these. 274 275 self.__mea.flush() 276 self.__hdf_file.flush() 277 self.__initialized = True
278 279 @locked
280 - def incr(self, table):
281 sz = len(self.__tables) 282 self.logger.info("Size: %s - Attaching %s to %s" % (sz, table, self.__hdf_path)) 283 if table in self.__tables: 284 self.logger.warn("Already added") 285 raise omero.ApiUsageException(None, None, "Already added") 286 self.__tables.append(table) 287 return sz + 1
288 289 @locked
290 - def decr(self, table):
291 sz = len(self.__tables) 292 self.logger.info("Size: %s - Detaching %s from %s", sz, table, self.__hdf_path) 293 if not (table in self.__tables): 294 self.logger.warn("Unknown table") 295 raise omero.ApiUsageException(None, None, "Unknown table") 296 self.__tables.remove(table) 297 if sz <= 1: 298 self.cleanup() 299 return sz - 1
300 301 @locked
302 - def uptodate(self, stamp):
303 return self._stamp <= stamp
304 305 @locked
306 - def rows(self):
307 self.__initcheck() 308 return self.__mea.nrows
309 310 @locked
311 - def cols(self, size, current):
312 self.__initcheck() 313 ic = current.adapter.getCommunicator() 314 types = self.__types 315 names = self.__mea.colnames 316 cols = [] 317 for i in range(len(types)): 318 t = types[i] 319 n = names[i] 320 try: 321 col = ic.findObjectFactory(t).create(t) 322 col.name = n 323 col.setsize(size) 324 col.settable(self.__mea) 325 cols.append(col) 326 except: 327 msg = traceback.format_exc() 328 raise omero.ValidationException(None, msg, "BAD COLUMN TYPE: %s for %s" % (t,n)) 329 return cols
330 331 @locked
332 - def get_meta_map(self):
333 self.__initcheck() 334 metadata = {} 335 attr = self.__mea.attrs 336 keys = list(self.__mea.attrs._v_attrnamesuser) 337 for key in keys: 338 val = attr[key] 339 if type(val) == numpy.float64: 340 val = rfloat(val) 341 elif type(val) == numpy.int32: 342 val = rint(val) 343 elif type(val) == numpy.int64: 344 val = rlong(val) 345 elif type(val) == numpy.string_: 346 val = rstring(val) 347 else: 348 raise omero.ValidationException("BAD TYPE: %s" % type(val)) 349 metadata[key] = val 350 return metadata
351 352 @locked
353 - def add_meta_map(self, m):
354 if not m: 355 return 356 self.__initcheck() 357 attr = self.__mea.attrs 358 for k, v in m.items(): 359 attr[k] = unwrap(v) 360 self.__mea.flush()
361 362 @locked
363 - def append(self, cols):
364 self.__initcheck() 365 # Optimize! 366 arrays = [] 367 dtypes = [] 368 sz = None 369 for col in cols: 370 if sz is None: 371 sz = col.getsize() 372 else: 373 if sz != col.getsize(): 374 raise omero.ValidationException("Columns are of differing length") 375 arrays.extend(col.arrays()) 376 dtypes.extend(col.dtypes()) 377 col.append(self.__mea) # Potential corruption !!! 378 379 # Convert column-wise data to row-wise records 380 records = numpy.array(zip(*arrays), dtype=dtypes) 381 382 self.__mea.append(records) 383 self.__mea.flush()
384 385 # 386 # Stamped methods 387 # 388 389 @stamped
390 - def update(self, stamp, data):
391 self.__initcheck() 392 if data: 393 for i, rn in enumerate(data.rowNumbers): 394 for col in data.columns: 395 getattr(self.__mea.cols, col.name)[rn] = col.values[i] 396 self.__mea.flush()
397 398 @stamped
399 - def getWhereList(self, stamp, condition, variables, unused, start, stop, step):
400 self.__initcheck() 401 try: 402 return self.__mea.getWhereList(condition, variables, None, start, stop, step).tolist() 403 except (NameError, SyntaxError, TypeError, ValueError), err: 404 aue = omero.ApiUsageException() 405 aue.message = "Bad condition: %s, %s" % (condition, variables) 406 aue.serverStackTrace = "".join(traceback.format_exc()) 407 aue.serverExceptionClass = str(err.__class__.__name__) 408 raise aue
409
410 - def _as_data(self, cols, rowNumbers):
411 """ 412 Constructs a omero.grid.Data object for returning to the client. 413 """ 414 data = omero.grid.Data() 415 data.columns = cols 416 data.rowNumbers = rowNumbers 417 data.lastModification = long(self._stamp*1000) # Convert to millis since epoch 418 return data
419 420 @stamped
421 - def readCoordinates(self, stamp, rowNumbers, current):
422 self.__initcheck() 423 self.__sizecheck(None, rowNumbers) 424 cols = self.cols(None, current) 425 for col in cols: 426 col.readCoordinates(self.__mea, rowNumbers) 427 return self._as_data(cols, rowNumbers)
428 429 @stamped
430 - def read(self, stamp, colNumbers, start, stop, current):
431 self.__initcheck() 432 self.__sizecheck(colNumbers, None) 433 cols = self.cols(None, current) 434 435 rows = self._getrows(start, stop) 436 rv, l = self._rowstocols(rows, colNumbers, cols) 437 return self._as_data(rv, range(start, start+l))
438
439 - def _getrows(self, start, stop):
440 return self.__mea.read(start, stop)
441
442 - def _rowstocols(self, rows, colNumbers, cols):
443 l = 0 444 rv = [] 445 for i in colNumbers: 446 col = cols[i] 447 col.fromrows(rows) 448 rv.append(col) 449 if not l: 450 l = len(col.values) 451 return rv, l
452 453 @stamped
454 - def slice(self, stamp, colNumbers, rowNumbers, current):
455 self.__initcheck() 456 457 if colNumbers is None or len(colNumbers) == 0: 458 colNumbers = range(self.__width()) 459 if rowNumbers is None or len(rowNumbers) == 0: 460 rowNumbers = range(self.__length()) 461 462 self.__sizecheck(colNumbers, rowNumbers) 463 cols = self.cols(None, current) 464 rv = [] 465 for i in colNumbers: 466 col = cols[i] 467 col.readCoordinates(self.__mea, rowNumbers) 468 rv.append(col) 469 return self._as_data(rv, rowNumbers)
470 471 # 472 # Lifecycle methods 473 # 474
475 - def check(self):
476 return True
477 478 @locked
479 - def cleanup(self):
480 self.logger.info("Cleaning storage: %s", self.__hdf_path) 481 if self.__mea: 482 self.__mea.flush() 483 self.__mea = None 484 if self.__ome: 485 self.__ome = None 486 if self.__hdf_file: 487 HDFLIST.remove(self.__hdf_path, self.__hdf_file) 488 hdffile = self.__hdf_file 489 self.__hdf_file = None 490 hdffile.close() # Resources freed
491
492 # End class HdfStorage 493 494 495 -class TableI(omero.grid.Table, omero.util.SimpleServant):
496 """ 497 Spreadsheet implementation based on pytables. 498 """ 499
500 - def __init__(self, ctx, file_obj, factory, storage, uuid = "unknown", \ 501 call_context = None):
502 self.uuid = uuid 503 self.file_obj = file_obj 504 self.factory = factory 505 self.storage = storage 506 self.call_context = call_context 507 self.can_write = factory.getAdminService().canUpdate(file_obj, call_context) 508 omero.util.SimpleServant.__init__(self, ctx) 509 510 self.stamp = time.time() 511 self.storage.incr(self) 512 513 self._closed = False
514
515 - def assert_write(self):
516 """ 517 Checks that the current user can write to the given object 518 at the database level. If not, no FS level writes are permitted 519 either. 520 521 ticket:2910 522 """ 523 if not self.can_write: 524 raise omero.SecurityViolation("Current user cannot write to file %s" % self.file_obj.id.val)
525
526 - def check(self):
527 """ 528 Called periodically to check the resource is alive. Returns 529 False if this resource can be cleaned up. (Resources API) 530 """ 531 self.logger.debug("Checking %s" % self) 532 if self._closed: 533 return False 534 535 idname = 'UNKNOWN' 536 try: 537 idname = self.factory.ice_getIdentity().name 538 clientSession = self.ctx.getSession().getSessionService() \ 539 .getSession(idname) 540 if clientSession.getClosed(): 541 self.logger.debug("Client session closed: %s" % idname) 542 return False 543 return True 544 except Exception: 545 self.logger.debug("Client session not found: %s" % idname) 546 return False
547
548 - def cleanup(self):
549 """ 550 Decrements the counter on the held storage to allow it to 551 be cleaned up. 552 """ 553 if self.storage: 554 try: 555 self.storage.decr(self) 556 finally: 557 self.storage = None
558
559 - def __str__(self):
560 return "Table-%s" % self.uuid
561 562 @remoted 563 @perf
564 - def close(self, current = None):
565 566 size = None 567 if self.storage is not None: 568 size = self.storage.size() # Size to reset the server object to 569 570 try: 571 self.cleanup() 572 self.logger.info("Closed %s", self) 573 except: 574 self.logger.warn("Closed %s with errors", self) 575 576 self._closed = True 577 578 if self.file_obj is not None and self.can_write: 579 fid = self.file_obj.id.val 580 if not self.file_obj.isLoaded() or\ 581 self.file_obj.getDetails() is None or\ 582 self.file_obj.details.group is None: 583 self.logger.warn("Cannot update file object %s since group is none", fid) 584 else: 585 gid = self.file_obj.details.group.id.val 586 client_uuid = self.factory.ice_getIdentity().category[8:] 587 ctx = {"omero.group": str(gid), omero.constants.CLIENTUUID: client_uuid} 588 try: 589 rfs = self.factory.createRawFileStore(ctx) 590 try: 591 rfs.setFileId(fid, ctx) 592 if size: 593 rfs.truncate(size, ctx) # May do nothing 594 rfs.write([], size, 0, ctx) # Force an update 595 else: 596 rfs.write([], 0, 0, ctx) # No-op 597 file_obj = rfs.save(ctx) 598 finally: 599 rfs.close(ctx) 600 self.logger.info("Updated file object %s to sha1=%s (%s bytes)",\ 601 self.file_obj.id.val, file_obj.sha1.val, file_obj.size.val) 602 except: 603 self.logger.warn("Failed to update file object %s", self.file_obj.id.val, exc_info=1)
604 605 # TABLES READ API ============================ 606 607 @remoted 608 @perf
609 - def getOriginalFile(self, current = None):
610 msg = "unknown" 611 if self.file_obj: 612 if self.file_obj.id: 613 msg = self.file_obj.id.val 614 self.logger.info("%s.getOriginalFile() => id=%s", self, msg) 615 return self.file_obj
616 617 @remoted 618 @perf
619 - def getHeaders(self, current = None):
620 rv = self.storage.cols(None, current) 621 self.logger.info("%s.getHeaders() => size=%s", self, slen(rv)) 622 return rv
623 624 @remoted 625 @perf
626 - def getNumberOfRows(self, current = None):
627 rv = self.storage.rows() 628 self.logger.info("%s.getNumberOfRows() => %s", self, rv) 629 return long(rv)
630 631 @remoted 632 @perf
633 - def getWhereList(self, condition, variables, start, stop, step, current = None):
634 variables = unwrap(variables) 635 if stop == 0: 636 stop = None 637 if step == 0: 638 step = None 639 rv = self.storage.getWhereList(self.stamp, condition, variables, None, start, stop, step) 640 self.logger.info("%s.getWhereList(%s, %s, %s, %s, %s) => size=%s", self, condition, variables, start, stop, step, slen(rv)) 641 return rv
642 643 @remoted 644 @perf
645 - def readCoordinates(self, rowNumbers, current = None):
646 self.logger.info("%s.readCoordinates(size=%s)", self, slen(rowNumbers)) 647 try: 648 return self.storage.readCoordinates(self.stamp, rowNumbers, current) 649 except tables.HDF5ExtError, err: 650 aue = omero.ApiUsageException() 651 aue.message = "Error reading coordinates. Most likely out of range" 652 aue.serverStackTrace = "".join(traceback.format_exc()) 653 aue.serverExceptionClass = str(err.__class__.__name__) 654 raise aue
655 656 @remoted 657 @perf
658 - def read(self, colNumbers, start, stop, current = None):
659 self.logger.info("%s.read(%s, %s, %s)", self, colNumbers, start, stop) 660 if start == 0L and stop == 0L: 661 stop = None 662 try: 663 return self.storage.read(self.stamp, colNumbers, start, stop, current) 664 except tables.HDF5ExtError, err: 665 aue = omero.ApiUsageException() 666 aue.message = "Error reading coordinates. Most likely out of range" 667 aue.serverStackTrace = "".join(traceback.format_exc()) 668 aue.serverExceptionClass = str(err.__class__.__name__) 669 raise aue
670 671 @remoted 672 @perf
673 - def slice(self, colNumbers, rowNumbers, current = None):
674 self.logger.info("%s.slice(size=%s, size=%s)", self, slen(colNumbers), slen(rowNumbers)) 675 return self.storage.slice(self.stamp, colNumbers, rowNumbers, current)
676 677 # TABLES WRITE API =========================== 678 679 @remoted 680 @perf
681 - def initialize(self, cols, current = None):
682 self.assert_write() 683 self.storage.initialize(cols) 684 if cols: 685 self.logger.info("Initialized %s with %s col(s)", self, slen(cols))
686 687 @remoted 688 @perf
689 - def addColumn(self, col, current = None):
690 self.assert_write() 691 raise omero.ApiUsageException(None, None, "NYI")
692 693 @remoted 694 @perf
695 - def addData(self, cols, current = None):
696 self.assert_write() 697 self.storage.append(cols) 698 sz = 0 699 if cols and cols[0] and cols[0].getsize(): 700 self.logger.info("Added %s row(s) of data to %s", cols[0].getsize(), self)
701 702 @remoted 703 @perf
704 - def update(self, data, current = None):
705 self.assert_write() 706 if data: 707 self.storage.update(self.stamp, data) 708 self.logger.info("Updated %s row(s) of data to %s", slen(data.rowNumbers), self)
709 710 @remoted 711 @perf
712 - def delete(self, current = None):
713 self.assert_write() 714 self.close() 715 prx = self.factory.getDeleteService() 716 dc = omero.api.delete.DeleteCommand("/OriginalFile", self.file_obj.id.val, None) 717 handle = prx.queueDelete([dc]) 718 self.file_obj = None 719 # TODO: possible just return handle? 720 cb = omero.callbacks.DeleteCallbackI(current.adapter, handle) 721 count = 10 722 while count: 723 count -= 1 724 rv = cb.block(500) 725 if rv is not None: 726 report = handle.report()[0] 727 if rv > 0: 728 raise omero.InternalException(None, None, report.error) 729 else: 730 return 731 raise omero.InternalException(None, None, "delete timed-out")
732 733 734 # TABLES METADATA API =========================== 735 736 @remoted 737 @perf
738 - def getMetadata(self, key, current = None):
739 rv = self.storage.get_meta_map() 740 rv = rv.get(key) 741 self.logger.info("%s.getMetadata() => %s", self, unwrap(rv)) 742 return rv
743 744 @remoted 745 @perf
746 - def getAllMetadata(self, current = None):
747 rv = self.storage.get_meta_map() 748 self.logger.info("%s.getMetadata() => size=%s", self, slen(rv)) 749 return rv
750 751 @remoted 752 @perf
753 - def setMetadata(self, key, value, current = None):
754 self.assert_write() 755 self.storage.add_meta_map({key: value}) 756 self.logger.info("%s.setMetadata() => %s=%s", self, key, unwrap(value))
757 758 @remoted 759 @perf
760 - def setAllMetadata(self, value, current = None):
761 self.assert_write() 762 self.storage.add_meta_map({"key": wrap(value)}) 763 self.logger.info("%s.setMetadata() => number=%s", self, slen(value))
764
# Column methods missing


class TablesI(omero.grid.Tables, omero.util.Servant):
    """
    Implementation of the omero.grid.Tables API. Provides
    spreadsheet like functionality across the OMERO.grid.
    This servant serves as a session-less, user-less
    resource for obtaining omero.grid.Table proxies.

    The first major step in initialization is getting
    a session. This will block until the Blitz server
    is reachable.
    """

    def __init__(self,\
                 ctx,\
                 table_cast = omero.grid.TablePrx.uncheckedCast,\
                 internal_repo_cast = omero.grid.InternalRepositoryPrx.checkedCast):

        omero.util.Servant.__init__(self, ctx, needs_session = True)

        # Storing these methods, mainly to allow overriding via
        # test methods. Static methods are evil.
        self._table_cast = table_cast
        self._internal_repo_cast = internal_repo_cast

        self.__stores = []
        self._get_dir()
        self._get_uuid()
        self._get_repo()

    def _get_dir(self):
        """
        Second step in initialization is to find the .omero/repository
        directory. If this is not created, then a required server has
        not started, and so this instance will not start.
        """
        wait = int(self.communicator.getProperties().getPropertyWithDefault("omero.repo.wait", "1"))
        self.repo_dir = self.communicator.getProperties().getProperty("omero.repo.dir")

        if not self.repo_dir:
            # Implies this is the legacy directory. Obtain from server
            self.repo_dir = self.ctx.getSession().getConfigService().getConfigValue("omero.data.dir")

        self.repo_cfg = path(self.repo_dir) / ".omero" / "repository"
        start = time.time()
        # Fix: the comparison was inverted (wait < elapsed), so the loop
        # never executed and the servant never waited for the repository
        # directory. Poll until it appears or the timeout elapses.
        while not self.repo_cfg.exists() and (time.time() - start) < wait:
            self.logger.info("%s doesn't exist; waiting 5 seconds..." % self.repo_cfg)
            self.stop_event.wait(5)
        if not self.repo_cfg.exists():
            msg = "No repository found: %s" % self.repo_cfg
            self.logger.error(msg)
            raise omero.ResourceError(None, None, msg)

    def _get_uuid(self):
        """
        Third step in initialization is to find the database uuid
        for this grid instance. Multiple OMERO.grids could be watching
        the same directory.
        """
        cfg = self.ctx.getSession().getConfigService()
        self.db_uuid = cfg.getDatabaseUuid()
        self.instance = self.repo_cfg / self.db_uuid

    def _get_repo(self):
        """
        Fourth step in initialization is to find the repository object
        for the UUID found in .omero/repository/<db_uuid>, and then
        create a proxy for the InternalRepository attached to that.
        """

        # Get and parse the uuid from the RandomAccessFile format from FileMaker
        self.repo_uuid = (self.instance / "repo_uuid").lines()[0].strip()
        if len(self.repo_uuid) != 38:
            raise omero.ResourceError("Poorly formed UUID: %s" % self.repo_uuid)
        self.repo_uuid = self.repo_uuid[2:]

        # Using the repo_uuid, find our OriginalFile object
        self.repo_obj = self.ctx.getSession().getQueryService().findByQuery("select f from OriginalFile f where sha1 = :uuid",
                                                                            omero.sys.ParametersI().add("uuid", rstring(self.repo_uuid)))
        self.repo_mgr = self.communicator.stringToProxy("InternalRepository-%s" % self.repo_uuid)
        self.repo_mgr = self._internal_repo_cast(self.repo_mgr)
        self.repo_svc = self.repo_mgr.getProxy()

    @remoted
    def getRepository(self, current = None):
        """
        Returns the Repository object for this Tables server.
        """
        return self.repo_svc

    @remoted
    @perf
    def getTable(self, file_obj, factory, current = None):
        """
        Create and/or register a table servant.
        """

        # Will throw an exception if not allowed.
        file_id = None
        if file_obj is not None and file_obj.id is not None:
            file_id = file_obj.id.val
        self.logger.info("getTable: %s %s", file_id, current.ctx)

        file_path = self.repo_mgr.getFilePath(file_obj)
        p = path(file_path).dirname()
        if not p.exists():
            p.makedirs()

        storage = HDFLIST.getOrCreate(file_path)
        # Fix: renamed local `id` -> `ident` to avoid shadowing the builtin.
        ident = Ice.Identity()
        ident.name = Ice.generateUUID()
        table = TableI(self.ctx, file_obj, factory, storage, uuid = ident.name, \
                       call_context=current.ctx)
        self.resources.add(table)

        prx = current.adapter.add(table, ident)
        return self._table_cast(prx)
883