Package omero :: Package util :: Module populate_roi
[hide private]
[frames] | [no frames]

Source Code for Module omero.util.populate_roi

   1  #!/usr/bin/env python 
   2  # -*- coding: utf-8 -*- 
   3  """ 
   4  ... 
   5  """ 
   6   
   7  # 
   8  #  Copyright (C) 2009 University of Dundee. All rights reserved. 
   9  # 
  10  # 
  11  #  This program is free software; you can redistribute it and/or modify 
  12  #  it under the terms of the GNU General Public License as published by 
  13  #  the Free Software Foundation; either version 2 of the License, or 
  14  #  (at your option) any later version. 
  15  #  This program is distributed in the hope that it will be useful, 
  16  #  but WITHOUT ANY WARRANTY; without even the implied warranty of 
  17  #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
  18  #  GNU General Public License for more details. 
  19  # 
  20  #  You should have received a copy of the GNU General Public License along 
  21  #  with this program; if not, write to the Free Software Foundation, Inc., 
  22  #  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
  23  # 
  24   
  25   
  26  import tempfile 
  27  import logging 
  28  import time 
  29  import sys 
  30  import csv 
  31  import re 
  32  from threading import Thread 
  33  from StringIO import StringIO 
  34  from getpass import getpass 
  35  from getopt import getopt, GetoptError 
  36  from Queue import Queue 
  37   
  38  import omero.clients 
  39  from omero.rtypes import rdouble, rstring, rint 
  40  from omero.model import OriginalFileI, PlateI, PlateAnnotationLinkI, ImageI, \ 
  41                          FileAnnotationI, RoiI, EllipseI, PointI 
  42  from omero.grid import ImageColumn, WellColumn, RoiColumn, LongColumn, DoubleColumn 
  43  from omero.util.temp_files import create_path, remove_path 
  44  from omero import client 
  45   
  46  # Handle Python 2.5 built-in ElementTree 
  47  try: 
  48          from xml.etree.cElementTree import XML, Element, SubElement, ElementTree, dump, iterparse 
  49  except ImportError: 
  50          from cElementTree import XML, Element, SubElement, ElementTree, dump, iterparse 
  51   
  52  log = logging.getLogger("omero.util.populate_roi") 
  53   
def usage(error):
    """Prints usage so that we don't have to. :)"""
    cmd = sys.argv[0]
    # Python 2 print statement; the template consumes three substitutions:
    # the error message first, then the command name twice (usage line and
    # example line).
    print """%s
Usage: %s [-s hostname] [-u username | -k session_key] <-p port> [plate_id]
Runs measurement population code for a given plate.

Options:
  -s    OMERO hostname to use
  -p    OMERO port to use [defaults to 4064]
  -u    OMERO username to use
  -k    OMERO session key to use
  -m    Measurement index to populate
  -i    Dump measurement information and exit (no population)
  -d    Print debug statements
  -t    Number of threads to use when populating [defaults to 1]

Examples:
  %s -s localhost -p 4063 -u bob 27

Report bugs to ome-devel@lists.openmicroscopy.org.uk""" % (error, cmd, cmd)
    # Exit code 2: conventional "command line usage error".
    sys.exit(2)
76 77 ### 78 ### Worker and ThreadPool from... 79 ### http://code.activestate.com/recipes/577187-python-thread-pool/ 80 ### 81
class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""

    def __init__(self, tasks):
        """
        Starts the worker immediately as a daemon thread.

        :param tasks: Queue of (func, args, kwargs) tuples to execute.
        """
        Thread.__init__(self)
        self.tasks = tasks
        # Daemon so a hung/idle worker never blocks interpreter exit.
        self.daemon = True
        self.start()

    def run(self):
        """Consumes tasks forever, logging (not propagating) failures."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            # "except Exception as e" instead of the legacy
            # "except Exception, e" (valid from Python 2.6, required by 3).
            except Exception as e:
                # One bad task must not kill the worker thread.
                log.exception(e)
            finally:
                # Always acknowledge the task -- even on failure -- so that
                # Queue.join() callers are never left waiting forever.
                self.tasks.task_done()
class ThreadPool:
    """A fixed-size pool of daemon Worker threads fed from one queue."""

    def __init__(self, num_threads):
        """Creates the shared bounded queue and spawns num_threads workers."""
        self.tasks = Queue(num_threads)
        for _worker_index in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Enqueues func(*args, **kargs) for execution by some worker."""
        task = (func, args, kargs)
        self.tasks.put(task)

    def wait_completion(self):
        """Blocks until every queued task has been executed."""
        self.tasks.join()

# Global thread pool for use by ROI workers
thread_pool = None
def get_thread_pool():
    """Returns the module-level thread pool, creating it on first use."""
    global thread_pool
    if thread_pool is None:
        # Lazily default to a single worker thread.
        thread_pool = ThreadPool(1)
    return thread_pool
122
class MeasurementError(Exception):
    """
    Signals an error condition reached by an analysis or measurement
    context.
    """
    pass
129
class DownloadingOriginalFileProvider(object):
    """
    Provides original file data by downloading it from an OMERO raw file store.
    """

    # Default raw file store buffer size
    BUFFER_SIZE = 1024 * 1024  # 1MB

    def __init__(self, service_factory):
        """
        :param service_factory: active service factory from which the raw
        file store is created; a temporary download directory is also made.
        """
        self.service_factory = service_factory
        self.raw_file_store = self.service_factory.createRawFileStore()
        self.dir = create_path("populate_roi", "dir", folder = True)

    def get_original_file_data(self, original_file):
        """
        Downloads an original file to a temporary file and returns an open
        file handle to that temporary file seeked to zero. The caller is
        responsible for closing the temporary file.
        """
        log.info("Downloading original file: %d" % original_file.id.val)
        self.raw_file_store.setFileId(original_file.id.val)
        temporary_file = tempfile.TemporaryFile(dir=str(self.dir))
        size = original_file.size.val
        # Floor division (// is identical to / for ints under Python 2's
        # classic division, and stays correct under true division); the +1
        # covers the trailing partial buffer.
        for i in range((size // self.BUFFER_SIZE) + 1):
            index = i * self.BUFFER_SIZE
            data = self.raw_file_store.read(index, self.BUFFER_SIZE)
            temporary_file.write(data)
        temporary_file.seek(0)
        # Trim any overshoot from the final full-buffer read.
        temporary_file.truncate(size)
        return temporary_file

    def __del__(self):
        # Fixed from "__delete__": that name is the descriptor-protocol
        # slot and is never invoked on a plain instance, so the raw file
        # store was previously never closed. "__del__" is the finalizer.
        self.raw_file_store.close()
class AbstractPlateAnalysisCtx(object):
    """
    Abstract class which aggregates and represents all measurement runs made on
    a given Plate.
    """

    # Provider class used to download companion/result files.
    DEFAULT_ORIGINAL_FILE_PROVIDER = DownloadingOriginalFileProvider

    def __init__(self, images, original_files, original_file_image_map,
                 plate_id, service_factory):
        """
        :param images: loaded Image objects (with well samples) of the plate.
        :param original_files: set of companion OriginalFile objects.
        :param original_file_image_map: original file ID -> Image map.
        :param plate_id: ID of the plate under analysis.
        :param service_factory: active OMERO service factory.
        """
        super(AbstractPlateAnalysisCtx, self).__init__()
        self.images = images
        self.numcols, self.numrows = self.guess_geometry(self.images)
        self.original_files = original_files
        self.original_file_image_map = original_file_image_map
        self.plate_id = plate_id
        self.service_factory = service_factory
        # Populated by concrete subclasses.
        self.log_files = dict()
        self.detail_files = dict()
        self.measurements = dict()

    def guess_geometry(self, images):
        """Guesses the (columns, rows) plate geometry from well positions."""
        max_col = 0
        max_row = 0
        for image in images:
            ws = image.copyWellSamples()[0] # Using only first well sample link
            well = ws.well
            max_col = max(max_col, well.column.val)
            max_row = max(max_row, well.row.val)
        # Row/column values are 0-based, so the geometry is max + 1.
        return (max_col + 1, max_row + 1)

    def colrow_from_wellnumber(self, width, wellnumber):
        """Converts a 1-based well number to a 0-based (col, row) tuple."""
        x = wellnumber - 1
        col = x % width
        # Floor division: identical to / for ints under Python 2 classic
        # division, and remains an int under true division.
        row = x // width
        return (col, row)

    def image_from_wellnumber(self, wellnumber):
        """Finds the image whose first well sample sits at the well number."""
        col, row = self.colrow_from_wellnumber(self.numcols, wellnumber)
        log.debug("Finding image for %s (%s,%s)..." % (wellnumber, col, row))
        for image in self.images:
            well = image.copyWellSamples()[0].well
            if well.column.val == col and well.row.val == row:
                return image
        raise Exception("Could not find image for (col,row)==(%s,%s)" % (col,row))

    ###
    ### Abstract methods
    ###

    @classmethod
    def is_this_type(klass, original_files):
        """
        Concrete implementations are to return True if the class pertinent
        for the original files associated with the plate.
        """
        raise Exception("To be implemented by concrete implementations.")

    def get_measurement_count(self):
        """Returns the number of recognized measurement runs."""
        raise Exception("To be implemented by concrete implementations.")

    def get_measurement_ctx(self, index):
        """Returns the measurement context for a given index."""
        raise Exception("To be implemented by concrete implementations.")

    def get_result_file_count(self, measurement_index):
        """
        Return the number of result files associated with a measurement run.
        """
        raise Exception("To be implemented by concrete implementations.")
class MIASPlateAnalysisCtx(AbstractPlateAnalysisCtx):
    """
    MIAS dataset concrete class implementation of an analysis context. MIAS
    measurements are aggregated based on a single "log" file. A result
    file is present for each stitched (of multiple fields) mosaic and
    contains the actual measured results and ROI.
    """

    # Python datetime format string of the log filename completion date/time
    datetime_format = '%Y-%m-%d-%Hh%Mm%Ss'

    # Regular expression matching a log filename (raw strings so the \d
    # escapes are passed through to the regex engine untouched)
    log_regex = re.compile(r'.*log(\d+-\d+-\d+-\d+h\d+m\d+s).txt$')

    # Regular expression matching a result filename
    detail_regex = re.compile(
        r'^Well(\d+)_(.*)_detail_(\d+-\d+-\d+-\d+h\d+m\d+s).txt$')

    # Companion file format
    companion_format = 'Companion/MIAS'

    def __init__(self, images, original_files, original_file_image_map,
                 plate_id, service_factory):
        super(MIASPlateAnalysisCtx, self).__init__(
            images, original_files, original_file_image_map, plate_id,
            service_factory)
        self._populate_log_and_detail_files()
        self._populate_measurements()

    def _populate_log_and_detail_files(self):
        """
        Strips out erroneous files and collects the log and result original
        files based on regular expression matching.
        """
        for original_file in self.original_files:
            if original_file.mimetype.val != self.companion_format:
                continue
            name = original_file.name.val
            match = self.log_regex.match(name)
            if match:
                # Keyed by parsed completion timestamp.
                d = time.strptime(match.group(1), self.datetime_format)
                self.log_files[d] = original_file
                continue
            match = self.detail_regex.match(name)
            if match:
                d = time.strptime(match.group(3), self.datetime_format)
                self.detail_files[d] = original_file
                continue

    def _populate_measurements(self):
        """
        Result original files are only recognizable as part of a given
        measurement (declared by a log file) based upon their parsed
        date/time of completion as encoded in the filename. This method
        collects result original files and groups them by collective
        parsed date/time of completion.
        """
        log_timestamps = sorted(self.log_files.keys())
        detail_timestamps = sorted(self.detail_files.keys())
        for log_timestamp in log_timestamps:
            self.measurements[log_timestamp] = list()
        for detail_timestamp in detail_timestamps:
            # Attach each detail file to the earliest log file written
            # after it (log files record completion of a run).
            for log_timestamp in log_timestamps:
                if detail_timestamp < log_timestamp:
                    self.measurements[log_timestamp].append(
                        self.detail_files[detail_timestamp])
                    break

    ###
    ### Abstract method implementations
    ###

    @classmethod
    def is_this_type(klass, original_files):
        for original_file in original_files:
            format = original_file.mimetype.val
            if format == klass.companion_format \
               and klass.log_regex.match(original_file.name.val):
                return True
        # Explicit False; the original fell off the end returning None,
        # inconsistently with the Flex/InCell implementations.
        return False

    def get_measurement_count(self):
        return len(self.measurements)

    def get_measurement_ctx(self, index):
        # Sort so that measurement indexes are deterministic (Python 2
        # dict key order is arbitrary) and chronological.
        key = sorted(self.log_files.keys())[index]
        sf = self.service_factory
        original_file = self.log_files[key]
        result_files = self.measurements[key]
        provider = self.DEFAULT_ORIGINAL_FILE_PROVIDER(sf)
        return MIASMeasurementCtx(self, sf, provider, original_file,
                                  result_files)

    def get_result_file_count(self, measurement_index):
        key = sorted(self.log_files.keys())[measurement_index]
        return len(self.measurements[key])
class FlexPlateAnalysisCtx(AbstractPlateAnalysisCtx):
    """
    Flex dataset concrete class implementation of an analysis context. Flex
    measurements are aggregated in a single ".res" XML file and contain no
    ROI.
    """

    # Companion file format
    companion_format = 'Companion/Flex'

    def __init__(self, images, original_files, original_file_image_map,
                 plate_id, service_factory):
        super(FlexPlateAnalysisCtx, self).__init__(
            images, original_files, original_file_image_map, plate_id,
            service_factory)
        # Keyed by path so duplicate companion files collapse to one entry.
        path_original_file_map = dict()
        for candidate in original_files:
            right_format = candidate.mimetype.val == self.companion_format
            if right_format and candidate.name.val.endswith('.res'):
                path_original_file_map[candidate.path.val] = candidate
        self.measurements = path_original_file_map.values()

    ###
    ### Abstract method implementations
    ###

    @classmethod
    def is_this_type(klass, original_files):
        """True when any original file is a Flex ".res" companion file."""
        for candidate in original_files:
            right_format = candidate.mimetype.val == klass.companion_format
            if right_format and candidate.name.val.endswith('.res'):
                return True
        return False

    def get_measurement_count(self):
        return len(self.measurements)

    def get_measurement_ctx(self, index):
        sf = self.service_factory
        original_file = self.measurements[index]
        result_files = []
        provider = self.DEFAULT_ORIGINAL_FILE_PROVIDER(sf)
        return FlexMeasurementCtx(self, sf, provider, original_file,
                                  result_files)

    def get_result_file_count(self, measurement_index):
        # A Flex measurement is always backed by exactly one ".res" file.
        return 1
class InCellPlateAnalysisCtx(AbstractPlateAnalysisCtx):
    """
    InCell dataset concrete class implementation of an analysis context.
    InCell measurements are from InCell Analyzer and are aggregated in a
    single gargantuan (often larger than 100MB per plate) XML file.
    """

    # Companion file format
    companion_format = 'Companion/InCell'

    def __init__(self, images, original_files, original_file_image_map,
                 plate_id, service_factory):
        super(InCellPlateAnalysisCtx, self).__init__(
            images, original_files, original_file_image_map, plate_id,
            service_factory)
        # Keyed by path so duplicate companion files collapse to one entry.
        path_original_file_map = dict()
        for candidate in original_files:
            right_format = candidate.mimetype.val == self.companion_format
            if right_format and candidate.name.val.endswith('.xml'):
                path_original_file_map[candidate.path.val] = candidate
        self.measurements = path_original_file_map.values()

    ###
    ### Abstract method implementations
    ###

    @classmethod
    def is_this_type(klass, original_files):
        """True when any original file is an InCell ".xml" companion file."""
        for candidate in original_files:
            right_format = candidate.mimetype.val == klass.companion_format
            if right_format and candidate.name.val.endswith('.xml'):
                return True
        return False

    def get_measurement_count(self):
        return len(self.measurements)

    def get_measurement_ctx(self, index):
        sf = self.service_factory
        original_file = self.measurements[index]
        result_files = []
        provider = self.DEFAULT_ORIGINAL_FILE_PROVIDER(sf)
        return InCellMeasurementCtx(self, sf, provider, original_file,
                                    result_files)

    def get_result_file_count(self, measurement_index):
        # A single XML file backs every InCell measurement.
        return 1
class PlateAnalysisCtxFactory(object):
    """
    The plate analysis context factory is responsible for detecting and
    returning a plate analysis context instance for a given plate.
    """

    # Candidate context classes, probed in order via is_this_type().
    implementations = (FlexPlateAnalysisCtx, MIASPlateAnalysisCtx,
                       InCellPlateAnalysisCtx)

    def __init__(self, service_factory):
        # :param service_factory: active OMERO service factory used for
        # all queries and for constructing the chosen analysis context.
        self.service_factory = service_factory
        self.query_service = self.service_factory.getQueryService()

    def find_images_for_plate(self, plate_id):
        """
        Retrieves all the images associated with a given plate. Fetched
        are the Image's WellSample, the WellSample's Well, the annotation
        stack associated with the Image and each annotation's linked
        original file.
        """
        # The query that follows is doublely linked:
        #  * Image --> WellSample --> Well
        #  * Well --> WellSample --> Image
        # This is to facilitate later "ordered" access of fields/well
        # samples required by certain measurement contexts (notably InCell).
        log.debug("Loading image...")
        images = self.query_service.findAllByQuery(
            'select img from Image as img ' \
            'join fetch img.wellSamples as ws ' \
            'join fetch ws.well as w ' \
            'join fetch w.wellSamples as ws2 ' \
            'join w.plate as p ' \
            'left outer join fetch img.annotationLinks as ia_links ' \
            'left outer join fetch ia_links.child as ia ' \
            'left outer join fetch ia.file as i_o_file ' \
            'where p.id = %d' % plate_id, None)
        log.debug("Loading plate...")
        # Second query loads the plate with its own annotations/files
        # (needed for MIAS, whose companions hang off the plate).
        plate = self.query_service.findByQuery(
            'select p from Plate p ' \
            'left outer join fetch p.annotationLinks as pa_links ' \
            'left outer join fetch pa_links.child as pa ' \
            'left outer join fetch pa.file as p_o_file ' \
            'where p.id = %d' % plate_id, None)
        log.debug("Linking plate and images...")
        # Back-link the fully loaded plate onto every well sample's well.
        for image in images:
            for ws in image.copyWellSamples():
                ws.well.plate = plate
        return images

    def gather_original_files(self, obj, original_files, original_file_obj_map):
        # Collects the OriginalFile of every FileAnnotation linked to obj
        # into original_files; when a map is supplied, also records
        # file-ID -> obj so files can later be traced back to their image.
        for annotation_link in obj.copyAnnotationLinks():
            annotation = annotation_link.child
            if isinstance(annotation, FileAnnotationI):
                f = annotation.file
                original_files.add(f)
                if original_file_obj_map is not None:
                    original_file_obj_map[f.id.val] = obj

    def get_analysis_ctx(self, plate_id):
        """Retrieves a plate analysis context for a given plate."""
        # Using a set since 1) no one was using the image.id key and 2)
        # we are now also collecting original files from plates (MIAS)
        # for which there's no clear key. Since all the files are loaded
        # in a single shot, double linking should not cause a problem.
        plates = set()
        original_files = set()
        original_file_image_map = dict()
        images = self.find_images_for_plate(plate_id)
        for i, image in enumerate(images):
            for ws in image.copyWellSamples():
                plate = ws.well.plate
                if plate not in plates:
                    # First time we see this plate: harvest its files too.
                    plates.add(plate)
                    self.gather_original_files(plate, original_files, None)
            # NOTE(review): placement of this call at the image-loop level
            # (outside the well-sample loop) reconstructed from context --
            # the source listing's indentation is ambiguous here; confirm.
            self.gather_original_files(image, original_files, original_file_image_map)
        for klass in self.implementations:
            if klass.is_this_type(original_files):
                return klass(images, original_files,
                             original_file_image_map,
                             plate_id, self.service_factory)
        raise MeasurementError(
            "Unable to find suitable analysis context for plate: %d" % \
            plate_id)
class MeasurementParsingResult(object):
    """
    Holds the results of a measurement parsing event.
    """

    def __init__(self, sets_of_columns=None):
        """Starts with the supplied sets of columns, or none at all."""
        # Build a fresh list per instance when nothing is supplied
        # (avoids a shared mutable default).
        if sets_of_columns is None:
            sets_of_columns = list()
        self.sets_of_columns = sets_of_columns

    def append_columns(self, columns):
        """Adds a set of columns to the parsing result."""
        self.sets_of_columns.append(columns)
class AbstractMeasurementCtx(object):
    """
    Abstract class which aggregates and represents all the results produced
    from a given measurement run. It also provides a scaffold for interacting
    with the OmeroTables infrastructure.
    """

    # The number of ROI to have parsed before streaming them to the server
    ROI_UPDATE_LIMIT = 1000

    def __init__(self, analysis_ctx, service_factory, original_file_provider,
                 original_file, result_files):
        # :param analysis_ctx: owning plate analysis context.
        # :param service_factory: active OMERO service factory.
        # :param original_file_provider: provider used to download files.
        # :param original_file: the file declaring this measurement run.
        # :param result_files: result files belonging to the run.
        super(AbstractMeasurementCtx, self).__init__()
        self.thread_pool = get_thread_pool()
        self.analysis_ctx = analysis_ctx
        self.service_factory = service_factory
        self.original_file_provider = original_file_provider
        self.query_service = self.service_factory.getQueryService()
        self.update_service = self.service_factory.getUpdateService()
        self.original_file = original_file
        self.result_files = result_files

        # Establish the rest of our initial state
        # wellimages[row][column] is a list of images positioned by their
        # field (well-sample) index within the well.
        self.wellimages = dict()
        for image in self.analysis_ctx.images:
            for well_sample in image.copyWellSamples():
                well = well_sample.well
                idx = well.copyWellSamples().index(well_sample)
                row = well.row.val
                column = well.column.val
                if row not in self.wellimages:
                    self.wellimages[row] = dict()
                if column not in self.wellimages[row]:
                    self.wellimages[row][column] = []
                # Now we save the image at it's proper index
                # (pad the list with None up to idx so fields can arrive
                # out of order).
                l = self.wellimages[row][column]
                for x in range(idx - len(l) + 1):
                    l.append(None)
                l[idx] = image

    def get_well_images(self, row, col):
        """
        Takes a row and a col index and returns a tuple
        of Well and image. Either might be None. Uses the
        first image found to find the Well and therefore
        must be loaded (image->wellSample->well)
        """
        try:
            images = self.wellimages[row][col]
            if not images:
                return (None, None)
            image = images[0]
            well = image.copyWellSamples()[0].well
            return (well, images)
        except KeyError:
            # This has the potential to happen alot with the
            # datasets we have given the split machine acquisition
            # ".flex" file storage.
            log.warn("WARNING: Missing data for row %d column %d" % \
                     (row, col))
            return (None, None)

    def update_table(self, columns):
        """Updates the OmeroTables instance backing our results."""
        # Create a new OMERO table to store our measurement results
        sr = self.service_factory.sharedResources()
        name = self.get_name()
        self.table = sr.newTable(1, '/%s.r5' % name)
        if self.table is None:
            raise MeasurementError(
                "Unable to create table: %s" % name)

        # Retrieve the original file corresponding to the table for the
        # measurement, link it to the file annotation representing the
        # umbrella measurement run, link the annotation to the plate from
        # which it belongs and save the file annotation.
        table_original_file = self.table.getOriginalFile()
        table_original_file_id = table_original_file.id.val
        log.info("Created new table: %d" % table_original_file_id)
        unloaded_o_file = OriginalFileI(table_original_file_id, False)
        self.file_annotation.file = unloaded_o_file
        unloaded_plate = PlateI(self.analysis_ctx.plate_id, False)
        plate_annotation_link = PlateAnnotationLinkI()
        plate_annotation_link.parent = unloaded_plate
        plate_annotation_link.child = self.file_annotation
        plate_annotation_link = \
            self.update_service.saveAndReturnObject(plate_annotation_link)
        # Keep the persisted (ID-bearing) annotation for later use.
        self.file_annotation = plate_annotation_link.child

        t0 = int(time.time() * 1000)
        self.table.initialize(columns)
        log.debug("Table init took %sms" % (int(time.time() * 1000) - t0))
        t0 = int(time.time() * 1000)
        # Log how many values each column carries before the bulk add.
        column_report = dict()
        for column in columns:
            column_report[column.name] = len(column.values)
        log.debug("Column report: %r" % column_report)
        self.table.addData(columns)
        self.table.close()
        log.info("Table update took %sms" % (int(time.time() * 1000) - t0))

    def create_file_annotation(self, set_of_columns):
        """
        Creates a file annotation to represent a set of columns from our
        measurment.
        """
        self.file_annotation = FileAnnotationI()
        self.file_annotation.ns = \
            rstring('openmicroscopy.org/omero/measurement')
        name = self.get_name(set_of_columns)
        self.file_annotation.description = rstring(name)

    def update_rois(self, rois, batches, batch_no):
        """
        Updates a set of ROI for a given batch updating the batches
        dictionary with the saved IDs.
        """
        log.debug("Saving %d ROI for batch %d" % (len(rois), batch_no))
        t0 = int(time.time() * 1000)
        roi_ids = self.update_service.saveAndReturnIds(rois)
        log.info("Batch %d ROI update took %sms" % \
                 (batch_no, int(time.time() * 1000) - t0))
        # Record the IDs under the batch number so callers can reassemble
        # them in order once all batches complete.
        batches[batch_no] = roi_ids

    def image_from_original_file(self, original_file):
        """Returns the image from which an original file has originated."""
        m = self.analysis_ctx.original_file_image_map
        return m[original_file.id.val]

    def parse_and_populate(self):
        """
        Calls parse and populate, updating the OmeroTables instance backing
        our results and the OMERO database itself.
        """
        result = self.parse()
        # parse() may return None (e.g. no analysis data present).
        if result is None:
            return
        for i, columns in enumerate(result.sets_of_columns):
            self.create_file_annotation(i)
            self.parse_and_populate_roi(columns)
            self.populate(columns)

    ###
    ### Abstract methods
    ###

    def get_name(self, set_of_columns=None):
        """Returns the name of the measurement, and a set of columns."""
        raise Exception("To be implemented by concrete implementations.")

    def parse(self):
        """Parses result files, returning a MeasurementParsingResult."""
        raise Exception("To be implemented by concrete implementations.")

    def parse_and_populate_roi(self, columns):
        """
        Parses and populates ROI from column data in the OMERO database.
        """
        raise Exception("To be implemented by concrete implementations.")

    def populate(self, columns):
        """
        Populates an OmeroTables instance backing our results and ROI
        linkages.
        """
        raise Exception("To be implemented by concrete implementations.")
class MIASMeasurementCtx(AbstractMeasurementCtx):
    """
    MIAS measurements are a set of tab delimited text files per well. Each
    TSV file's content is prefixed by the analysis parameters.
    """

    # The OmeroTable ImageColumn index
    IMAGE_COL = 0

    # The OmeroTable RoiColumn index
    ROI_COL = 1

    # Expected columns in NEO datasets
    NEO_EXPECTED = ('Image', 'ROI', 'Label', 'Row', 'Col', 'Nucleus Area',
                    'Cell Diam.', 'Cell Type', 'Mean Nucleus Intens.')

    # Expected columns in MNU datasets
    MNU_EXPECTED = ('Image', 'ROI', 'row', 'col', 'type')

    def __init__(self, analysis_ctx, service_factory, original_file_provider,
                 original_file, result_files):
        super(MIASMeasurementCtx, self).__init__(
            analysis_ctx, service_factory, original_file_provider,
            original_file, result_files)

    def get_empty_columns(self, n_columns):
        """
        Retrieves a set of empty OmeroTables columns for the analysis results
        prefixed by an ImageColumn and RoiColumn to handle these linked
        object indexes.
        """
        columns = [ImageColumn('Image', '', list()),
                   RoiColumn('ROI', '', list())]
        for i in range(n_columns):
            columns.append(DoubleColumn('', '', list()))
        return columns

    ###
    ### Overriding abstract implementation
    ###

    def image_from_original_file(self, original_file):
        """
        Overriding the abstract implementation since the companion
        files are no longer attached to the images, but only to the plate
        for MIAS. Instead, we use the filename itself to find the image.
        """
        name = original_file.name.val
        # Copy: '^Well(\d+)_(.*)_detail_(\d+-\d+-\d+-\d+h\d+m\d+s).txt$'
        match = MIASPlateAnalysisCtx.detail_regex.match(name)
        if match:
            # Group 1 is the 1-based well number embedded in the filename.
            well_num = int(match.group(1))
            return self.analysis_ctx.image_from_wellnumber(well_num)
        else:
            raise Exception("Not a detail file")

    ###
    ### Abstract method implementations
    ###

    def get_name(self, set_of_columns=None):
        # Drop the trailing ".txt" from the log file name.
        return self.original_file.name.val[:-4]

    def parse(self):
        columns = None
        for result_file in self.result_files:
            log.info("Parsing: %s" % result_file.name.val)
            image = self.image_from_original_file(result_file)
            provider = self.original_file_provider
            data = provider.get_original_file_data(result_file)
            try:
                rows = list(csv.reader(data, delimiter='\t'))
            finally:
                data.close()
            # Reversed so the numeric data rows are consumed first; the
            # first non-numeric row encountered (the header) raises
            # ValueError below, supplies the column names and ends the file.
            rows.reverse()
            if columns is None:
                columns = self.get_empty_columns(len(rows[0]))
            for row in rows:
                try:
                    for i, value in enumerate(row):
                        value = float(value)
                        # +2 skips the Image and ROI columns.
                        columns[i + 2].values.append(value)
                    columns[self.IMAGE_COL].values.append(image.id.val)
                except ValueError:
                    # Header row: take the column names and stop this file.
                    for i, value in enumerate(row):
                        columns[i + 2].name = value
                    break
        log.debug("Returning %d columns" % len(columns))
        return MeasurementParsingResult([columns])

    def _parse_neo_roi(self, columns):
        """Parses out ROI from OmeroTables columns for 'NEO' datasets."""
        log.debug("Parsing %s NEO ROIs..." % (len(columns[0].values)))
        image_ids = columns[self.IMAGE_COL].values
        rois = list()
        # Save our file annotation to the database so we can use an unloaded
        # annotation for the saveAndReturnIds that will be triggered below.
        self.file_annotation = \
            self.update_service.saveAndReturnObject(self.file_annotation)
        unloaded_file_annotation = \
            FileAnnotationI(self.file_annotation.id.val, False)
        batch_no = 1
        batches = dict()
        for i, image_id in enumerate(image_ids):
            unloaded_image = ImageI(image_id, False)
            # One ellipse ROI per row: centre from columns 4/3, radii from
            # the diameter value in column 6.
            roi = RoiI()
            shape = EllipseI()
            values = columns[6].values
            diameter = rdouble(float(values[i]))
            shape.theZ = rint(0)
            shape.theT = rint(0)
            values = columns[4].values
            shape.cx = rdouble(float(values[i]))
            values = columns[3].values
            shape.cy = rdouble(float(values[i]))
            shape.rx = diameter
            shape.ry = diameter
            roi.addShape(shape)
            roi.image = unloaded_image
            roi.linkAnnotation(unloaded_file_annotation)
            rois.append(roi)
            # Stream full batches to the server via the thread pool.
            if len(rois) == self.ROI_UPDATE_LIMIT:
                self.thread_pool.add_task(self.update_rois, rois, batches, batch_no)
                rois = list()
                batch_no += 1
        self.thread_pool.add_task(self.update_rois, rois, batches, batch_no)
        self.thread_pool.wait_completion()
        # Reassemble saved ROI IDs in batch order into the ROI column.
        batch_keys = batches.keys()
        batch_keys.sort()
        for k in batch_keys:
            columns[self.ROI_COL].values += batches[k]

    def _parse_mnu_roi(self, columns):
        """Parses out ROI from OmeroTables columns for 'MNU' datasets."""
        log.debug("Parsing %s MNU ROIs..." % (len(columns[0].values)))
        image_ids = columns[self.IMAGE_COL].values
        rois = list()
        # Save our file annotation to the database so we can use an unloaded
        # annotation for the saveAndReturnIds that will be triggered below.
        self.file_annotation = \
            self.update_service.saveAndReturnObject(self.file_annotation)
        unloaded_file_annotation = \
            FileAnnotationI(self.file_annotation.id.val, False)
        batch_no = 1
        batches = dict()
        for i, image_id in enumerate(image_ids):
            unloaded_image = ImageI(image_id, False)
            # One point ROI per row: coordinates from columns 3/2.
            roi = RoiI()
            shape = PointI()
            shape.theZ = rint(0)
            shape.theT = rint(0)
            values = columns[3].values
            shape.cx = rdouble(float(values[i]))
            values = columns[2].values
            shape.cy = rdouble(float(values[i]))
            roi.addShape(shape)
            roi.image = unloaded_image
            roi.linkAnnotation(unloaded_file_annotation)
            rois.append(roi)
            # Stream full batches to the server via the thread pool.
            if len(rois) == self.ROI_UPDATE_LIMIT:
                self.thread_pool.add_task(self.update_rois, rois, batches, batch_no)
                rois = list()
                batch_no += 1
        self.thread_pool.add_task(self.update_rois, rois, batches, batch_no)
        self.thread_pool.wait_completion()
        # Reassemble saved ROI IDs in batch order into the ROI column.
        batch_keys = batches.keys()
        batch_keys.sort()
        for k in batch_keys:
            columns[self.ROI_COL].values += batches[k]

    def parse_and_populate_roi(self, columns):
        # Decide NEO vs MNU by column count and whether every parsed
        # column name belongs to the corresponding expected set.
        names = [column.name for column in columns]
        neo = [name in self.NEO_EXPECTED for name in names]
        mnu = [name in self.MNU_EXPECTED for name in names]
        for name in names:
            log.debug("Column: %s" % name)
        if len(columns) == 9 and False not in neo:
            self._parse_neo_roi(columns)
        elif len(columns) == 5 and False not in mnu:
            self._parse_mnu_roi(columns)
        else:
            log.warn("Unknown ROI type for MIAS dataset: %r" % names)

    def populate(self, columns):
        """
        Query performed::
            first_roi = columns[self.ROI_COL].values[0]
            first_roi = self.query_service.findByQuery(
                'select roi from Roi as roi ' \
                'join fetch roi.annotationLinks as link ' \
                'join fetch link.child ' \
                'where roi.id = %d' % first_roi, None)
            self.file_annotation = first_roi.copyAnnotationLinks()[0].child
        """
        self.update_table(columns)
class FlexMeasurementCtx(AbstractMeasurementCtx):
    """
    Flex measurements are located deep within a ".res" XML file container
    and contain no ROI.
    """

    # The XPath to the <Area> which aggregate an acquisition
    AREA_XPATH = './/Areas/Area'

    # The XPath to an analysis <Parameter>; will become a column header
    # and is below AREA_XPATH
    PARAMETER_XPATH = './/Wells/ResultParameters/Parameter'

    # The XPath to a <Well> which has had at least one acquisition event
    # within and is below AREA_XPATH
    WELL_XPATH = './/Wells/Well'

    # The XPath to a <Result> for a given well and is below WELL_XPATH
    RESULT_XPATH = './/Result'

    def __init__(self, analysis_ctx, service_factory, original_file_provider,
                 original_file, result_files):
        super(FlexMeasurementCtx, self).__init__(
            analysis_ctx, service_factory, original_file_provider,
            original_file, result_files)

    def get_empty_columns(self, headers):
        """
        Retrieves a set of empty OmeroTables columns for the analysis results
        prefixed by a WellColumn to handle linked object indexes.
        """
        # NOTE(review): a plain dict is used, so the relative order of the
        # returned columns is arbitrary under Python 2 -- downstream code
        # appears to address columns by name, but confirm before relying
        # on ordering.
        columns = {'Well': WellColumn('Well', '', list())}
        for header in headers:
            columns[header] = DoubleColumn(header, '', list())
        return columns

    ###
    ### Abstract method implementations
    ###

    def get_name(self, set_of_columns=None):
        """Returns the measurement name: the file name minus its extension."""
        return self.original_file.name.val[:-4]

    def parse(self):
        """
        Parses the ".res" XML container into one set of OmeroTables
        columns, one row per acquired well.

        NOTE(review): the method returns from inside the <Area> loop, so
        only the first area carrying result parameters is parsed --
        confirm this is intended before restructuring.
        """
        log.info("Parsing: %s" % self.original_file.name.val)
        provider = self.original_file_provider
        data = provider.get_original_file_data(self.original_file)
        try:
            et = ElementTree(file=data)
        finally:
            data.close()
        root = et.getroot()
        areas = root.findall(self.AREA_XPATH)
        log.debug("Area count: %d" % len(areas))
        for i, area in enumerate(areas):
            result_parameters = area.findall(self.PARAMETER_XPATH)
            log.debug("Area %d result children: %d" % \
                (i, len(result_parameters)))
            if len(result_parameters) == 0:
                log.warn("%s contains no analysis data." % self.get_name())
                return
            # Each <Parameter> text becomes a column header.
            headers = list()
            for result_parameter in result_parameters:
                headers.append(result_parameter.text)
            columns = self.get_empty_columns(headers)
            wells = area.findall(self.WELL_XPATH)
            for well in wells:
                # Rows and columns are 1-indexed, OMERO wells are 0-indexed
                row = int(well.get('row')) - 1
                column = int(well.get('col')) - 1
                try:
                    v = columns['Well'].values
                    wellobj, images = self.get_well_images(row, column)
                    if not wellobj:
                        continue
                    v.append(wellobj.id.val)
                except Exception:
                    # BUG FIX: was a bare "except:", which also swallowed
                    # KeyboardInterrupt/SystemExit. Still best-effort:
                    # log and skip the well.
                    log.exception("ERROR: Failed to get well images")
                    continue
                results = well.findall(self.RESULT_XPATH)
                for result in results:
                    name = result.get('name')
                    columns[name].values.append(float(result.text))
            return MeasurementParsingResult([columns.values()])

    def parse_and_populate_roi(self, columns):
        """Flex results carry no ROI, so there is nothing to do."""
        pass

    def populate(self, columns):
        """Writes the parsed columns out to an OMERO.tables instance."""
        self.update_table(columns)
989
class InCellMeasurementCtx(AbstractMeasurementCtx):
    """
    InCell Analyzer measurements are located deep within an XML file container.
    """

    # Cells expected centre of gravity columns
    CELLS_CG_EXPECTED = ['Cell: cgX', 'Cell: cgY']

    # Nuclei expected centre of gravity columns
    NUCLEI_CG_EXPECTED = ['Nucleus: cgX', 'Nucleus: cgY']

    # Expected source attribute value for cell data
    CELLS_SOURCE = 'Cells'

    # Expected source attribute value for nuclei data
    NUCLEI_SOURCE = 'Nuclei'

    # Expected source attribute value for organelle data
    ORGANELLES_SOURCE = 'Organelles'

    def __init__(self, analysis_ctx, service_factory, original_file_provider,
                 original_file, result_files):
        super(InCellMeasurementCtx, self).__init__(
            analysis_ctx, service_factory, original_file_provider,
            original_file, result_files)

    def check_sparse_data(self, columns):
        """
        Checks a set of columns for sparse data (one column shorter than
        the rest) and adds -1 where appropriate.

        NOTE(review): assumes at most one column lags behind per call and
        that, when the current column is LONGER, it is the immediately
        preceding column (columns[i - 1]) that must be padded -- confirm
        against the InCell XML layout before generalising.
        """
        length = None
        for i, column in enumerate(columns):
            if column.name == 'ROI':
                # ROI are processed late so we don't care if this column
                # is sparse or not.
                continue
            current_length = len(column.values)
            if length is not None:
                if current_length > length:
                    log.debug("%s length %d > %d modding previous column" % \
                        (column.name, current_length, length))
                    columns[i - 1].values.append(-1.0)
                if current_length < length:
                    log.debug("%s length %d < %d modding current column" % \
                        (column.name, current_length, length))
                    column.values.append(-1.0)
            length = len(column.values)

    ###
    ### Abstract method implementations
    ###

    def get_name(self, set_of_columns=None):
        """
        Returns the measurement name for a given column set index; the
        base name is the file name minus its extension. Implicitly
        returns None for indexes other than None, 0, 1 or 2.
        """
        if set_of_columns is None:
            return self.original_file.name.val[:-4]
        if set_of_columns == 0:
            return self.original_file.name.val[:-4] + ' Cells'
        if set_of_columns == 1:
            return self.original_file.name.val[:-4] + ' Nuclei'
        if set_of_columns == 2:
            return self.original_file.name.val[:-4] + ' Organelles'

    def parse(self):
        """
        Streams the InCell XML with iterparse, collecting three parallel
        column sets (cells, nuclei, organelles) with one row per
        non-summary <WellData> element.
        """
        log.info("Parsing: %s" % self.original_file.name.val)
        provider = self.original_file_provider
        data = provider.get_original_file_data(self.original_file)
        try:
            events = ('start', 'end')
            well_data = None
            n_roi = 0
            n_measurements = 0
            cells_columns = {'Image': ImageColumn('Image', '', list()),
                             'Cell': LongColumn('Cell', '', list()),
                             'ROI': RoiColumn('ROI', '', list()),
                             }
            organelles_columns = {'Image': ImageColumn('Image', '', list()),
                                  'Cell': LongColumn('Cell', '', list()),
                                  }
            nuclei_columns = {'Image': ImageColumn('Image', '', list()),
                              'Cell': LongColumn('Cell', '', list()),
                              'ROI': RoiColumn('ROI', '', list()),
                              }
            for event, element in iterparse(data, events=events):
                if event == 'start' and element.tag == 'WellData' \
                   and element.get('cell') != 'Summary':
                    # Rows/columns/fields are 1-indexed in the XML.
                    row = int(element.get('row')) - 1
                    col = int(element.get('col')) - 1
                    i = int(element.get('field')) - 1
                    try:
                        well, images = self.get_well_images(row, col)
                        if not images:
                            continue
                        image = images[i]
                    except Exception:
                        # BUG FIX: was a bare "except:", which also
                        # swallowed KeyboardInterrupt/SystemExit. Still
                        # best-effort: log and skip this well.
                        log.exception("ERROR: Failed to get well images")
                        continue
                    # Close out the previous row before starting a new one.
                    self.check_sparse_data(cells_columns.values())
                    self.check_sparse_data(nuclei_columns.values())
                    self.check_sparse_data(organelles_columns.values())
                    cell = long(element.get('cell'))
                    cells_columns['Cell'].values.append(cell)
                    nuclei_columns['Cell'].values.append(cell)
                    organelles_columns['Cell'].values.append(cell)
                    well_data = element
                    cells_columns['Image'].values.append(image.id.val)
                    nuclei_columns['Image'].values.append(image.id.val)
                    organelles_columns['Image'].values.append(image.id.val)
                elif well_data is not None and event == 'start' \
                        and element.tag == 'Measure':
                    source = element.get('source')
                    key = element.get('key')
                    value = float(element.get('value'))
                    if source == self.CELLS_SOURCE:
                        columns_list = [cells_columns]
                    elif source == self.NUCLEI_SOURCE:
                        columns_list = [nuclei_columns]
                    elif source == self.ORGANELLES_SOURCE:
                        columns_list = [organelles_columns]
                    else:
                        # Unknown sources contribute to all three sets.
                        columns_list = [cells_columns, nuclei_columns,
                                        organelles_columns]
                    for columns in columns_list:
                        if key not in columns:
                            columns[key] = DoubleColumn(key, '', list())
                        columns[key].values.append(value)
                        n_measurements += 1
                elif event == 'end' and element.tag == 'WellData':
                    if well_data is not None:
                        n_roi += 1
                        well_data.clear()
                        well_data = None
                else:
                    element.clear()
            # Final row sparseness check
            self.check_sparse_data(cells_columns.values())
            self.check_sparse_data(nuclei_columns.values())
            self.check_sparse_data(organelles_columns.values())
            log.info("Total ROI: %d" % n_roi)
            log.info("Total measurements: %d" % n_measurements)
            sets_of_columns = [cells_columns.values(), nuclei_columns.values(),
                               organelles_columns.values()]
            return MeasurementParsingResult(sets_of_columns)
        finally:
            data.close()

    def parse_and_populate_roi(self, columns_as_list):
        """
        Creates one Point ROI per row from the cell (or, failing that,
        nucleus) centre-of-gravity columns, saves them in batches via the
        thread pool, and appends the resulting ROI ids to the 'ROI'
        column.
        """
        # First sanity check our provided columns
        names = [column.name for column in columns_as_list]
        log.debug('Parsing columns: %r' % names)
        cells_expected = [name in names for name in self.CELLS_CG_EXPECTED]
        nuclei_expected = [name in names for name in self.NUCLEI_CG_EXPECTED]
        if (False in cells_expected) and (False in nuclei_expected):
            log.warn("Missing CGs for InCell dataset: %r" % names)
            log.warn('Removing resultant empty ROI column.')
            # BUG FIX: the original called columns_as_list.remove() while
            # iterating over the same list, which skips elements. Rebuild
            # the list in place so the caller still sees the removal.
            columns_as_list[:] = [column for column in columns_as_list
                                  if column.__class__ != RoiColumn]
            return
        # Reconstruct a column name to column map
        columns = dict()
        for column in columns_as_list:
            columns[column.name] = column
        image_ids = columns['Image'].values
        rois = list()
        # Save our file annotation to the database so we can use an unloaded
        # annotation for the saveAndReturnIds that will be triggered below.
        self.file_annotation = \
            self.update_service.saveAndReturnObject(self.file_annotation)
        unloaded_file_annotation = \
            FileAnnotationI(self.file_annotation.id.val, False)
        # Parse and append ROI
        batch_no = 1
        batches = dict()
        for i, image_id in enumerate(image_ids):
            unloaded_image = ImageI(image_id, False)
            if False in nuclei_expected:
                # Cell centre of gravity
                roi = RoiI()
                shape = PointI()
                shape.theZ = rint(0)
                shape.theT = rint(0)
                shape.cx = rdouble(float(columns['Cell: cgX'].values[i]))
                shape.cy = rdouble(float(columns['Cell: cgY'].values[i]))
                roi.addShape(shape)
                roi.image = unloaded_image
                roi.linkAnnotation(unloaded_file_annotation)
                rois.append(roi)
            elif False in cells_expected:
                # Nucleus centre of gravity
                roi = RoiI()
                shape = PointI()
                shape.theZ = rint(0)
                shape.theT = rint(0)
                shape.cx = rdouble(float(columns['Nucleus: cgX'].values[i]))
                shape.cy = rdouble(float(columns['Nucleus: cgY'].values[i]))
                roi.addShape(shape)
                roi.image = unloaded_image
                roi.linkAnnotation(unloaded_file_annotation)
                rois.append(roi)
            else:
                raise MeasurementError('Not a nucleus or cell ROI')
            if len(rois) == self.ROI_UPDATE_LIMIT:
                self.thread_pool.add_task(
                    self.update_rois, rois, batches, batch_no)
                rois = list()
                batch_no += 1
        self.thread_pool.add_task(self.update_rois, rois, batches, batch_no)
        self.thread_pool.wait_completion()
        # Stitch the saved ids back together in batch order.
        for k in sorted(batches.keys()):
            columns['ROI'].values += batches[k]

    def populate(self, columns):
        """Writes the parsed columns out to an OMERO.tables instance."""
        self.update_table(columns)
1205 1206 if __name__ == "__main__": 1207 try: 1208 options, args = getopt(sys.argv[1:], "s:p:u:m:k:t:id") 1209 except GetoptError, (msg, opt): 1210 usage(msg) 1211 1212 try: 1213 plate_id, = args 1214 plate_id = long(plate_id) 1215 except ValueError: 1216 usage("Plate ID must be a specified and a number!") 1217 1218 username = None 1219 hostname = None 1220 port = 4064 # SSL 1221 measurement = None 1222 info = False 1223 session_key = None 1224 logging_level = logging.INFO 1225 thread_count = 1 1226 for option, argument in options: 1227 if option == "-u": 1228 username = argument 1229 if option == "-s": 1230 hostname = argument 1231 if option == "-p": 1232 port = int(argument) 1233 if option == "-m": 1234 measurement = int(argument) 1235 if option == "-i": 1236 info = True 1237 if option == "-k": 1238 session_key = argument 1239 if option == "-d": 1240 logging_level = logging.DEBUG 1241 if option == "-t": 1242 thread_count = int(argument) 1243 if session_key is None and username is None: 1244 usage("Username must be specified!") 1245 if session_key is None and hostname is None: 1246 usage("Host name must be specified!") 1247 if session_key is None: 1248 password = getpass() 1249 1250 logging.basicConfig(level = logging_level) 1251 c = client(hostname, port) 1252 c.setAgent("OMERO.populate_roi") 1253 c.enableKeepAlive(60) 1254 try: 1255 if session_key is not None: 1256 service_factory = c.joinSession(session_key) 1257 else: 1258 service_factory = c.createSession(username, password) 1259 1260 log.debug('Creating pool of %d threads' % thread_count) 1261 thread_pool = ThreadPool(thread_count) 1262 factory = PlateAnalysisCtxFactory(service_factory) 1263 analysis_ctx = factory.get_analysis_ctx(plate_id) 1264 n_measurements = analysis_ctx.get_measurement_count() 1265 if measurement is not None and measurement >= n_measurements: 1266 usage("measurement %d not a valid index!") 1267 if info: 1268 for i in range(n_measurements): 1269 n_result_files = 
analysis_ctx.get_result_file_count(i) 1270 print "Measurement %d has %d result files." % \ 1271 (i, n_result_files) 1272 sys.exit(0) 1273 if measurement is not None: 1274 measurement_ctx = analysis_ctx.get_measurement_ctx(measurement) 1275 measurement_ctx.parse_and_populate() 1276 else: 1277 for i in range(n_measurements): 1278 measurement_ctx = analysis_ctx.get_measurement_ctx(i) 1279 measurement_ctx.parse_and_populate() 1280 finally: 1281 c.closeSession() 1282