Package omero :: Module tables
[hide private]
[frames] | no frames]

Source Code for Module omero.tables

  1  #!/usr/bin/env python 
  2  # 
  3  # OMERO Tables Interface 
  4  # Copyright 2009 Glencoe Software, Inc.  All Rights Reserved. 
  5  # Use is subject to license terms supplied in LICENSE.txt 
  6  # 
  7   
  8  import os 
  9  import Ice 
 10  import time 
 11  import numpy 
 12  import signal 
 13  import logging 
 14  import threading 
 15  import traceback 
 16  import subprocess 
 17  import exceptions 
 18  import portalocker # Third-party 
 19   
 20  from path import path 
 21   
 22   
 23  import omero # Do we need both?? 
 24  import omero.clients 
 25   
 26  # For ease of use 
 27  from omero.columns import * 
 28  from omero.rtypes import * 
 29  from omero.util.decorators import remoted, locked, perf 
 30  from omero_ext.functional import wraps 
 31   
 32   
 33  sys = __import__("sys") # Python sys 
 34  tables = __import__("tables") # Pytables 
def slen(rv):
    """
    None-tolerant variant of len(): hands back None when given
    None, otherwise the length of the argument.
    """
    if rv is not None:
        return len(rv)
    return None
44
def stamped(func, update = False):
    """
    Decorator which takes the first argument after "self" and compares
    that to the last modification time (self._stamp). If the stamp is
    older, then the method call will throw an
    omero.OptimisticLockException. Otherwise, execution will complete
    normally.

    If update is True, then the last modification time is refreshed
    once the wrapped call has finished. The refresh sits in a "finally"
    clause, so it also takes place when the wrapped call raises.

    Note: stamped implies locked

    """
    def check_and_update_stamp(*args, **kwargs):
        self = args[0]
        stamp = args[1]
        if stamp < self._stamp:
            raise omero.OptimisticLockException(None, None, "Resource modified by another thread")

        try:
            return func(*args, **kwargs)
        finally:
            if update:
                self._stamp = time.time()

    # Keep whatever wraps() hands back. The previous code bound it to a
    # misspelled name which was never read again.
    check_and_update_stamp = wraps(func)(check_and_update_stamp)
    return locked(check_and_update_stamp)
class HdfList(object):
    """
    In-process registry of open HdfStorage instances.

    Since two calls to tables.openFile() return non-equal files
    with equal fileno's, portalocker cannot be used to prevent
    the creation of two HdfStorage instances from the same
    Python process. This registry fills that gap by tracking
    storages both by file number and by path.
    """

    def __init__(self):
        # NOTE(review): presumably the @locked decorator synchronizes
        # on self._lock -- confirm in omero.util.decorators.
        self._lock = threading.RLock()
        self.__filenos = {}     # fileno -> HdfStorage
        self.__paths = {}       # hdfpath -> HdfStorage

    @locked
    def addOrThrow(self, hdfpath, hdffile, hdfstorage, action):
        """
        Registers hdfstorage under the fileno of hdffile and under
        hdfpath, and then invokes the action callback. If the fileno
        is already registered, omero.LockTimeout is thrown and
        nothing is registered or invoked.
        """
        fileno = hdffile.fileno()
        if fileno in self.__filenos:
            raise omero.LockTimeout(None, None, "File already opened by process: %s" % hdfpath, 0)
        self.__filenos[fileno] = hdfstorage
        self.__paths[hdfpath] = hdfstorage
        action()

    @locked
    def getOrCreate(self, hdfpath):
        """
        Returns the storage registered for hdfpath, constructing a
        new HdfStorage if none is known yet.
        """
        if hdfpath in self.__paths:
            return self.__paths[hdfpath]
        return HdfStorage(hdfpath) # Adds itself.

    @locked
    def remove(self, hdfpath, hdffile):
        """
        Drops both registrations (by fileno and by path). A KeyError
        is thrown if either one is missing.
        """
        del self.__filenos[hdffile.fileno()]
        del self.__paths[hdfpath]

# Global object for maintaining files
HDFLIST = HdfList()
class HdfStorage(object):
    """
    Provides HDF-storage for measurement results. At most a single
    instance will be available for any given physical HDF5 file.

    Instances register themselves with the module-level HDFLIST during
    construction and unregister again in cleanup().
    """

    def __init__(self, file_path):
        """
        file_path should be the path to a file in a valid directory where
        this HDF instance can be stored (Not None or Empty). Once this
        method is finished, self.__hdf_file is guaranteed to be a PyTables HDF
        file, but not necessarily initialized.

        Throws omero.ValidationException for a None/empty path or a path
        which cannot be opened, and omero.LockTimeout if the file is
        already registered in HDFLIST or cannot be locked exclusively.
        """

        if file_path is None or str(file_path) == "":
            raise omero.ValidationException(None, None, "Invalid file_path")

        self.logger = logging.getLogger("omero.tables.HdfStorage")
        self.__hdf_path = path(file_path)
        self.__hdf_file = self.__openfile("a")
        self.__tables = []

        self._lock = threading.RLock()
        self._stamp = time.time()

        # These are what we'd like to have
        self.__mea = None
        self.__ome = None

        # Now we try to lock the file, if this fails, we rollback
        # any previous initialization (opening the file)
        try:
            HDFLIST.addOrThrow(self.__hdf_path, self.__hdf_file, self,
                lambda: portalocker.lock(self.__hdf_file, portalocker.LOCK_NB|portalocker.LOCK_EX))
        except portalocker.LockException:
            self.cleanup()
            raise omero.LockTimeout(None, None, "Cannot acquire exclusive lock on: %s" % self.__hdf_path, 0)

        # A file which already holds the /OME group counts as initialized;
        # otherwise initialize() must be called before reading.
        try:
            self.__ome = self.__hdf_file.root.OME
            self.__mea = self.__ome.Measurements
            self.__types = self.__ome.ColumnTypes[:]
            self.__descriptions = self.__ome.ColumnDescriptions[:]
            self.__initialized = True
        except tables.NoSuchNodeError:
            self.__initialized = False

    #
    # Non-locked methods
    #

    def __openfile(self, mode):
        """
        Opens self.__hdf_path with PyTables in the given mode. An IOError
        is logged and converted into an omero.ValidationException.
        """
        try:
            # NOTE(review): the title spelling ("Storege") is left as-is;
            # presumably it is persisted in created files -- confirm before
            # correcting it.
            return tables.openFile(self.__hdf_path, mode=mode, title="OMERO HDF Measurement Storege", rootUEP="/")
        except IOError:
            msg = "HDFStorage initialized with bad path: %s" % self.__hdf_path
            self.logger.error(msg)
            raise omero.ValidationException(None, None, msg)

    def __initcheck(self):
        """
        Throws omero.ApiUsageException unless the storage is initialized.
        """
        if not self.__initialized:
            raise omero.ApiUsageException(None, None, "Not yet initialized")

    #
    # Locked methods
    #

    @locked
    def initialize(self, cols, metadata = None):
        """
        Creates the /OME group with the Measurements table and the
        ColumnTypes and ColumnDescriptions arrays as described by cols.
        Any items in the optional metadata mapping are stored as
        attributes on the Measurements table.

        Throws omero.ValidationException if already initialized.
        """

        if self.__initialized:
            raise omero.ValidationException(None, None, "Already initialized.")

        self.__definition = columns2definition(cols)
        self.__ome = self.__hdf_file.createGroup("/", "OME")
        self.__mea = self.__hdf_file.createTable(self.__ome, "Measurements", self.__definition)

        self.__types = [ x.ice_staticId() for x in cols ]
        # None (or otherwise empty) descriptions are stored as ""
        self.__descriptions = [ x.description or "" for x in cols ]
        self.__hdf_file.createArray(self.__ome, "ColumnTypes", self.__types)
        self.__hdf_file.createArray(self.__ome, "ColumnDescriptions", self.__descriptions)

        self.__mea.attrs.version = "v1"
        self.__mea.attrs.initialized = time.time()
        if metadata:
            for k, v in metadata.items():
                self.__mea.attrs[k] = v
                # See attrs._f_list("user") to retrieve these.

        self.__mea.flush()
        self.__hdf_file.flush()
        self.__initialized = True

    @locked
    def incr(self, table):
        """
        Attaches table to this storage and returns the new number of
        attached tables. Throws omero.ApiUsageException if the table
        is already attached.
        """
        sz = len(self.__tables)
        self.logger.info("Size: %s - Attaching %s to %s", sz, table, self.__hdf_path)
        if table in self.__tables:
            self.logger.warn("Already added")
            raise omero.ApiUsageException(None, None, "Already added")
        self.__tables.append(table)
        return sz + 1

    @locked
    def decr(self, table):
        """
        Detaches table from this storage and returns the remaining
        number of attached tables. When the last table is detached
        the storage is cleaned up. Throws omero.ApiUsageException if
        the table was never attached.
        """
        sz = len(self.__tables)
        self.logger.info("Size: %s - Detaching %s from %s", sz, table, self.__hdf_path)
        if table not in self.__tables:
            self.logger.warn("Unknown table")
            raise omero.ApiUsageException(None, None, "Unknown table")
        self.__tables.remove(table)
        if sz <= 1:
            self.cleanup()
        return sz - 1

    @locked
    def uptodate(self, stamp):
        """
        Returns True if stamp is not older than the last modification.
        """
        return self._stamp <= stamp

    @locked
    def rows(self):
        """
        Returns the number of rows in the Measurements table.
        """
        self.__initcheck()
        return self.__mea.nrows

    @locked
    def cols(self, size, current):
        """
        Creates one column object per stored column type via the object
        factories of the communicator behind current.adapter. Each column
        gets its name assigned and has size() called with the given size.

        Throws omero.ValidationException (carrying the formatted
        traceback) if a column cannot be created.
        """
        self.__initcheck()
        ic = current.adapter.getCommunicator()
        types = self.__types
        names = self.__mea.colnames
        cols = []
        for i, t in enumerate(types):
            n = names[i]
            try:
                col = ic.findObjectFactory(t).create(t)
                col.name = n
                col.size(size)
                cols.append(col)
            except Exception:
                msg = traceback.format_exc()
                raise omero.ValidationException(None, msg, "BAD COLUMN TYPE: %s for %s" % (t,n))
        return cols

    @locked
    def meta(self):
        """
        Returns the user attributes of the Measurements table as a
        dictionary of rtypes: numpy.float64 becomes rfloat, numpy.int32
        becomes rint and numpy.string_ becomes rstring. Any other
        attribute type leads to an omero.ValidationException.
        """
        self.__initcheck()
        converters = {
            numpy.float64: rfloat,
            numpy.int32: rint,
            numpy.string_: rstring,
        }
        attr = self.__mea.attrs
        metadata = {}
        for key in list(attr._v_attrnamesuser):
            val = attr[key]
            convert = converters.get(type(val))
            if convert is None:
                raise omero.ValidationException(None, None, "BAD TYPE: %s" % type(val))
            metadata[key] = convert(val)
        # Previously the dictionary was built but never returned.
        return metadata

    @locked
    def append(self, cols):
        """
        Appends the arrays of the given columns as new rows to the
        Measurements table and flushes the table.
        """
        self.__initcheck()
        # Optimize!
        names = [ col.name for col in cols ]
        arrays = [ col.array() for col in cols ]
        data = numpy.rec.fromarrays(arrays, names=names)
        self.__mea.append(data)
        self.__mea.flush()

    #
    # Stamped methods
    #

    @stamped
    def getWhereList(self, stamp, condition, variables, unused, start, stop, step):
        """
        Returns the result of the table's getWhereList() for condition
        as a list. The "unused" argument is ignored; None is passed on
        in its place.
        """
        self.__initcheck()
        return self.__mea.getWhereList(condition, variables, None, start, stop, step).tolist()

    def _data(self, cols, rowNumbers):
        """
        Wraps cols and rowNumbers in an omero.grid.Data instance which
        also carries the last modification time.
        """
        data = omero.grid.Data()
        data.columns = cols
        data.rowNumbers = rowNumbers
        data.lastModification = long(self._stamp*1000) # Convert to millis since epoch
        return data

    @stamped
    def readCoordinates(self, stamp, rowNumbers, current):
        """
        Reads the given rows and returns them, for all columns, as
        an omero.grid.Data instance.
        """
        self.__initcheck()
        rows = self.__mea.readCoordinates(rowNumbers)
        cols = self.cols(None, current)
        for col in cols:
            col.values = rows[col.name].tolist()
        return self._data(cols, rowNumbers)

    @stamped
    def slice(self, stamp, colNumbers, rowNumbers, current):
        """
        Like readCoordinates, but restricted to the columns whose index
        is listed in colNumbers. A None or empty colNumbers selects all
        columns; a None or empty rowNumbers selects all rows.
        """
        self.__initcheck()
        if rowNumbers is None or len(rowNumbers) == 0:
            rows = self.__mea.read()
        else:
            rows = self.__mea.readCoordinates(rowNumbers)
        all_cols = colNumbers is None or len(colNumbers) == 0
        rv = []
        for i, col in enumerate(self.cols(None, current)):
            if all_cols or i in colNumbers:
                col.values = rows[col.name].tolist()
                rv.append(col)
        return self._data(rv, rowNumbers)

    #
    # Lifecycle methods
    #

    def check(self):
        """
        Always returns True.
        """
        return True

    @locked
    def cleanup(self):
        """
        Flushes the Measurements table, unregisters this storage from
        HDFLIST and closes the underlying HDF file.
        """
        self.logger.info("Cleaning storage: %s", self.__hdf_path)
        # Explicit None tests rather than truthiness.
        # NOTE(review): presumably an empty PyTables table is falsy (len 0),
        # which would skip the flush -- confirm.
        if self.__mea is not None:
            self.__mea.flush()
            self.__mea = None
        self.__ome = None
        if self.__hdf_file is not None:
            HDFLIST.remove(self.__hdf_path, self.__hdf_file)
            hdffile = self.__hdf_file
            self.__hdf_file = None
            hdffile.close() # Resources freed
# End class HdfStorage


class TableI(omero.grid.Table, omero.util.SimpleServant):
    """
    Spreadsheet implementation based on pytables.

    Every instance holds a storage object (see the calls made on
    self.storage below) plus the stamp taken at construction, which
    is handed to the stamped storage methods.
    """

    def __init__(self, ctx, file_obj, storage, uuid = "unknown"):
        """
        Records the arguments, takes the creation stamp and attaches
        this table to the storage via storage.incr().
        """
        self.uuid = uuid
        self.file_obj = file_obj
        self.stamp = time.time()
        self.storage = storage
        omero.util.SimpleServant.__init__(self, ctx)
        self.storage.incr(self)

    def check(self):
        """
        Called periodically to check the resource is alive. Returns
        False if this resource can be cleaned up. (Resources API)
        """
        self.logger.debug("Checking %s" % self)
        return False

    def cleanup(self):
        """
        Decrements the counter on the held storage to allow it to
        be cleaned up. The storage reference is dropped even if
        decrementing fails.
        """
        if self.storage:
            try:
                self.storage.decr(self)
            finally:
                self.storage = None

    def __str__(self):
        return "Table-%s" % self.uuid

    @remoted
    @perf
    def close(self, current = None):
        """
        Best-effort cleanup: failures are logged, never propagated.
        """
        try:
            self.cleanup()
            self.logger.info("Closed %s", self)
        except Exception:
            # Still swallowed on purpose, but keep the traceback in the
            # log so that failures can be diagnosed.
            self.logger.warn("Closed %s with errors", self, exc_info = True)

    # TABLES READ API ============================

    @remoted
    @perf
    def getOriginalFile(self, current = None):
        """
        Returns the file object this table was created with.
        """
        self.logger.info("%s.getOriginalFile() => %s", self, self.file_obj)
        return self.file_obj

    @remoted
    @perf
    def getHeaders(self, current = None):
        """
        Returns the storage's columns, created with a size of None.
        """
        rv = self.storage.cols(None, current)
        self.logger.info("%s.getHeaders() => size=%s", self, slen(rv))
        return rv

    @remoted
    @perf
    def getMetadata(self, current = None):
        """
        Returns whatever the storage's meta() call provides.
        """
        rv = self.storage.meta()
        self.logger.info("%s.getMetadata() => size=%s", self, slen(rv))
        return rv

    @remoted
    @perf
    def getNumberOfRows(self, current = None):
        """
        Returns the storage's row count as a long.
        """
        rv = self.storage.rows()
        self.logger.info("%s.getNumberOfRows() => %s", self, rv)
        return long(rv)

    @remoted
    @perf
    def getWhereList(self, condition, variables, start, stop, step, current = None):
        """
        Delegates to the storage's getWhereList(). A stop or step of 0
        is passed on as None.
        """
        if stop == 0:
            stop = None
        if step == 0:
            step = None
        rv = self.storage.getWhereList(self.stamp, condition, variables, None, start, stop, step)
        self.logger.info("%s.getWhereList(%s, %s, %s, %s, %s) => size=%s", self, condition, variables, start, stop, step, slen(rv))
        return rv

    @remoted
    @perf
    def readCoordinates(self, rowNumbers, current = None):
        """
        Delegates to the storage's readCoordinates() using this
        table's stamp.
        """
        self.logger.info("%s.readCoordinates(size=%s)", self, slen(rowNumbers))
        return self.storage.readCoordinates(self.stamp, rowNumbers, current)

    @remoted
    @perf
    def slice(self, colNumbers, rowNumbers, current = None):
        """
        Delegates to the storage's slice() using this table's stamp.
        """
        self.logger.info("%s.slice(size=%s, size=%s)", self, slen(colNumbers), slen(rowNumbers))
        return self.storage.slice(self.stamp, colNumbers, rowNumbers, current)

    # TABLES WRITE API ===========================

    @remoted
    @perf
    def initialize(self, cols, current = None):
        """
        Initializes the storage with the given columns.
        """
        self.storage.initialize(cols)
        if cols:
            self.logger.info("Initialized %s with %s cols", self, slen(cols))

    @remoted
    @perf
    def addColumn(self, col, current = None):
        """
        Not yet implemented; always throws omero.ApiUsageException.
        """
        raise omero.ApiUsageException(None, None, "NYI")

    @remoted
    @perf
    def addData(self, cols, current = None):
        """
        Appends the values held by the given columns to the storage.
        """
        self.storage.append(cols)
        if cols and cols[0].values:
            self.logger.info("Added %s rows of data to %s", slen(cols[0].values), self)
class TablesI(omero.grid.Tables, omero.util.Servant):
    """
    Implementation of the omero.grid.Tables API. Provides
    spreadsheet like functionality across the OMERO.grid.
    This servant serves as a session-less, user-less
    resource for obtaining omero.grid.Table proxies.

    The first major step in initialization is getting
    a session. This will block until the Blitz server
    is reachable.
    """

    def __init__(self,
        ctx,
        table_cast = omero.grid.TablePrx.uncheckedCast,
        internal_repo_cast = omero.grid.InternalRepositoryPrx.checkedCast):

        omero.util.Servant.__init__(self, ctx, needs_session = True)

        # Storing these methods, mainly to allow overriding via
        # test methods. Static methods are evil.
        self._table_cast = table_cast
        self._internal_repo_cast = internal_repo_cast

        self.__stores = []
        self._get_dir()
        self._get_uuid()
        self._get_repo()

    def _get_dir(self):
        """
        Second step in initialization is to find the .omero/repository
        directory. If this is not created, then a required server has
        not started, and so this instance will not start.

        The "omero.repo.wait" property (default "1") bounds how long
        the directory is polled for before omero.ResourceError is
        thrown.
        """
        props = self.communicator.getProperties()
        # NOTE(review): presumably seconds, since it is compared against a
        # time.time() difference -- confirm.
        wait = int(props.getPropertyWithDefault("omero.repo.wait", "1"))
        self.repo_dir = props.getProperty("omero.repo.dir")

        if not self.repo_dir:
            # Implies this is the legacy directory. Obtain from server
            self.repo_dir = self.ctx.getSession().getConfigService().getConfigValue("omero.data.dir")

        self.repo_cfg = path(self.repo_dir) / ".omero" / "repository"
        start = time.time()
        # Keep polling while the elapsed time is still below the limit.
        # The comparison used to be inverted, and the loop body
        # decremented an undefined "count" variable.
        while not self.repo_cfg.exists() and (time.time() - start) < wait:
            self.logger.info("%s doesn't exist; waiting 5 seconds..." % self.repo_cfg)
            time.sleep(5)
        if not self.repo_cfg.exists():
            msg = "No repository found: %s" % self.repo_cfg
            self.logger.error(msg)
            raise omero.ResourceError(None, None, msg)

    def _get_uuid(self):
        """
        Third step in initialization is to find the database uuid
        for this grid instance. Multiple OMERO.grids could be watching
        the same directory.
        """
        cfg = self.ctx.getSession().getConfigService()
        self.db_uuid = cfg.getDatabaseUuid()
        self.instance = self.repo_cfg / self.db_uuid

    def _get_repo(self):
        """
        Fourth step in initialization is to find the repository object
        for the UUID found in .omero/repository/<db_uuid>, and then
        create a proxy for the InternalRepository attached to that.

        Throws omero.ResourceError if the stored UUID is not 38
        characters long.
        """

        # Get and parse the uuid from the RandomAccessFile format from FileMaker
        self.repo_uuid = (self.instance / "repo_uuid").lines()[0].strip()
        if len(self.repo_uuid) != 38:
            # Message goes in the third slot, as for every other
            # exception raised in this module.
            raise omero.ResourceError(None, None, "Poorly formed UUID: %s" % self.repo_uuid)
        # Drop the two leading characters, leaving 36 characters
        self.repo_uuid = self.repo_uuid[2:]

        # Using the repo_uuid, find our OriginalFile object
        self.repo_obj = self.ctx.getSession().getQueryService().findByQuery("select f from OriginalFile f where sha1 = :uuid",
            omero.sys.ParametersI().add("uuid", rstring(self.repo_uuid)))
        self.repo_mgr = self.communicator.stringToProxy("InternalRepository-%s" % self.repo_uuid)
        self.repo_mgr = self._internal_repo_cast(self.repo_mgr)
        self.repo_svc = self.repo_mgr.getProxy()

    @remoted
    def getRepository(self, current = None):
        """
        Returns the Repository object for this Tables server.
        """
        return self.repo_svc

    @remoted
    @perf
    def getTable(self, file_obj, current = None):
        """
        Create and/or register a table servant.

        The storage for the file is looked up (or created) via HDFLIST,
        a new TableI is added to the adapter behind "current" under a
        freshly generated identity, and the resulting proxy is returned
        after passing through the configured table cast.
        """

        # Will throw an exception if not allowed.
        self.logger.info("getTable: %s", (file_obj and file_obj.id and file_obj.id.val))
        file_path = self.repo_mgr.getFilePath(file_obj)
        p = path(file_path).dirname()
        if not p.exists():
            p.makedirs()

        storage = HDFLIST.getOrCreate(file_path)
        ident = Ice.Identity() # avoid shadowing the builtin id()
        ident.name = Ice.generateUUID()
        table = TableI(self.ctx, file_obj, storage, uuid = ident.name)
        self.resources.add(table)

        prx = current.adapter.add(table, ident)
        return self._table_cast(prx)