Package omero :: Module processor
[hide private]
[frames] | no frames]

Source Code for Module omero.processor

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # 
  4  # OMERO Grid Processor 
  5  # Copyright 2008 Glencoe Software, Inc.  All Rights Reserved. 
  6  # Use is subject to license terms supplied in LICENSE.txt 
  7  # 
  8   
  9  import os 
 10  import time 
 11  import signal 
 12  import logging 
 13  import traceback 
 14  import killableprocess as subprocess 
 15   
 16  from path import path 
 17   
 18  import Ice 
 19  import omero 
 20  import omero.clients 
 21  import omero.scripts 
 22  import omero.util 
 23  import omero.util.concurrency 
 24   
 25  import omero_ext.uuid as uuid # see ticket:3774 
 26   
 27  from omero.util.temp_files import create_path, remove_path 
 28  from omero.util.decorators import remoted, perf, locked 
 29  from omero.rtypes import * 
 30  from omero.util.decorators import remoted, perf, wraps 
 31   
 32  sys = __import__("sys") 
def with_context(func, context):
    """
    Decorator for invoking Ice methods with a context.

    Returns a wrapper around func which appends the given context
    as the last positional argument of every invocation. Keyword
    arguments are passed through untouched.
    """
    def handler(*args, **kwargs):
        # The context always goes after whatever the caller passed.
        return func(*(list(args) + [context]), **kwargs)
    return wraps(func)(handler)
class WithGroup(object):
    """
    Wraps a ServiceInterfacePrx instance and applies
    a "omero.group" to the passed context on every
    invocation.

    For example, using a job handle as root requires logging
    manually into the group. (ticket:2044)
    """

    def __init__(self, service, group_id):
        """
        service is the proxy to delegate to; group_id is stored as a
        string and used as the "omero.group" context value.
        """
        self._service = service
        self._group_id = str(group_id)

    def _get_ctx(self, group = None):
        """
        Returns a copy of the communicator's implicit context with
        the "omero.group" key set to the given group.
        """
        ctx = self._service.ice_getCommunicator().getImplicitContext().getContext()
        ctx = dict(ctx) # Copy so that the implicit context itself is not modified
        ctx["omero.group"] = group
        return ctx

    def __getattr__(self, name):
        """
        Delegates public attribute lookups to the wrapped service,
        returning the found attribute wrapped by with_context so that
        the group context is appended on invocation.

        Raises AttributeError if the name is private and missing, or
        if the wrapped service has no such attribute.
        """
        if name.startswith("_"):
            # Private names are never delegated. A missing one must be an
            # AttributeError (not KeyError) so that hasattr/getattr-with-default
            # keep working, and so that a missing "_service" cannot recurse.
            try:
                return self.__dict__[name]
            except KeyError:
                raise AttributeError("'%s' object has no attribute '%s'" % (type(self).__name__, name))
        elif hasattr(self._service, name):
            method = getattr(self._service, name)
            ctx = self._get_ctx(self._group_id)
            return with_context(method, ctx)
        # Must use self._service here: "self.service" would re-enter
        # __getattr__ and recurse until the interpreter gives up.
        raise AttributeError("'%s' object has no attribute '%s'" % (self._service, name))
72
73 -class ProcessI(omero.grid.Process, omero.util.SimpleServant):
74 """ 75 Wrapper around a subprocess.Popen instance. Returned by ProcessorI 76 when a job is submitted. This implementation uses the given 77 interpreter to call a file that must be named "script" in the 78 generated temporary directory. 79 80 Call is equivalent to: 81 82 cd TMP_DIR 83 ICE_CONFIG=./config interpreter ./script >out 2>err & 84 85 The properties argument is used to generate the ./config file. 86 87 The params argument may be null in which case this process 88 is being used solely to calculate the parameters for the script 89 ("omero.scripts.parse=true") 90 91 If iskill is True, then on cleanup, this process will reap the 92 attached session completely. 93 """ 94
95 - def __init__(self, ctx, interpreter, properties, params, iskill = False,\ 96 Popen = subprocess.Popen, callback_cast = omero.grid.ProcessCallbackPrx.uncheckedCast,\ 97 omero_home = path.getcwd()):
98 """ 99 Popen and callback_Cast are primarily for testing. 100 """ 101 omero.util.SimpleServant.__init__(self, ctx) 102 self.omero_home = omero_home #: Location for OMERO_HOME/lib/python 103 self.interpreter = interpreter #: Executable which will be used on the script 104 self.properties = properties #: Properties used to create an Ice.Config 105 self.params = params #: JobParams for this script. Possibly None if a ParseJob 106 self.iskill = iskill #: Whether or not, cleanup should kill the session 107 self.Popen = Popen #: Function which should be used for creating processes 108 self.callback_cast = callback_cast #: Function used to cast all ProcessCallback proxies 109 # Non arguments (mutable state) 110 self.rcode = None #: return code from popen 111 self.callbacks = {} #: dictionary from id strings to callback proxies 112 self.popen = None #: process. if None, then this instance isn't alive. 113 self.pid = None #: pid of the process. Once set, isn't nulled. 114 self.started = None #: time the process started 115 self.stopped = None #: time of deactivation 116 self.final_status = None #: status which will be sent on set_job_status 117 # Non arguments (immutable state) 118 self.uuid = properties["omero.user"] #: session this instance is tied to 119 120 # More fields set by these methods 121 self.make_files() 122 self.make_env() 123 self.make_config() 124 self.logger.info("Created %s in %s" % (self.uuid, self.dir))
125 126 # 127 # Initialization methods 128 # 129
130 - def make_env(self):
131 self.env = omero.util.Environment("PATH", "PYTHONPATH",\ 132 "DYLD_LIBRARY_PATH", "LD_LIBRARY_PATH", "MLABRAW_CMD_STR", "HOME",\ 133 "DISPLAY") 134 # WORKAROUND 135 # Currently duplicating the logic here as in the PYTHONPATH 136 # setting of the grid application descriptor (see etc/grid/*.xml) 137 # This should actually be taken care of in the descriptor itself 138 # by having setting PYTHONPATH to an absolute value. This is 139 # not currently possible with IceGrid (without using icepatch -- 140 # see 39.17.2 "node.datadir). 141 self.env.append("PYTHONPATH", str(self.omero_home / "lib" / "python")) 142 self.env.set("ICE_CONFIG", str(self.config_path))
143
144 - def make_files(self):
145 self.dir = create_path("process", ".dir", folder = True) 146 self.script_path = self.dir / "script" 147 self.config_path = self.dir / "config" 148 self.stdout_path = self.dir / "out" 149 self.stderr_path = self.dir / "err"
150
151 - def make_config(self):
152 """ 153 Creates the ICE_CONFIG file used by the client. 154 """ 155 config_file = open(str(self.config_path), "w") 156 try: 157 for key in self.properties.iterkeys(): 158 config_file.write("%s=%s\n"%(key, self.properties[key])) 159 finally: 160 config_file.close()
161
162 - def tmp_client(self):
163 """ 164 Create a client for performing cleanup operations. 165 This client should be closed as soon as possible 166 by the process 167 """ 168 try: 169 client = omero.client(["--Ice.Config=%s" % str(self.config_path)]) 170 client.setAgent("OMERO.process") 171 client.createSession().detachOnDestroy() 172 self.logger.debug("client: %s" % client.sf) 173 return client 174 except: 175 self.logger.error("Failed to create client for %s" % self.uuid) 176 return None
177 178 # 179 # Activation / Deactivation 180 # 181 182 @locked
183 - def activate(self):
184 """ 185 Process creation has to wait until all external downloads, etc 186 are finished. 187 """ 188 189 if self.isActive(): 190 raise omero.ApiUsageException(None, None, "Already activated") 191 192 self.stdout = open(str(self.stdout_path), "w") 193 self.stderr = open(str(self.stderr_path), "w") 194 self.popen = self.Popen([self.interpreter, "./script"], cwd=str(self.dir), env=self.env(), stdout=self.stdout, stderr=self.stderr) 195 self.pid = self.popen.pid 196 self.started = time.time() 197 self.stopped = None 198 self.status("Activated")
199 200 @locked
201 - def deactivate(self):
202 """ 203 Cleans up the temporary directory used by the process, and terminates 204 the Popen process if running. 205 """ 206 207 if not self.isActive(): 208 raise omero.ApiUsageException(None, None, "Not active") 209 210 if self.stopped: 211 # Prevent recursion since we are reusing kill & cancel 212 return 213 214 self.stopped = time.time() 215 d_start = time.time() 216 self.status("Deactivating") 217 218 # None of these should throw, but just in case 219 try: 220 221 self.shutdown() # Calls cancel & kill which recall this method! 222 self.popen = None # Now we are finished 223 224 client = self.tmp_client() 225 try: 226 self.set_job_status(client) 227 self.cleanup_output() 228 self.upload_output(client) # Important! 229 self.cleanup_tmpdir() 230 finally: 231 if client: 232 client.__del__() # Safe closeSession 233 234 except Exception: 235 self.logger.error("FAILED TO CLEANUP pid=%s (%s)", self.pid, self.uuid, exc_info = True) 236 237 d_stop = time.time() 238 elapsed = int(self.stopped - self.started) 239 d_elapsed = int(d_stop - d_start) 240 self.status("Lived %ss. Deactivation took %ss." % (elapsed, d_elapsed))
241 242 @locked
243 - def isActive(self):
244 """ 245 Tests only if this instance has a non-None popen attribute. After activation 246 this method will return True until the popen itself returns a non-None 247 value (self.rcode) at which time it will be nulled and this method will again 248 return False 249 """ 250 return self.popen is not None
251 252 @locked
253 - def wasActivated(self):
254 """ 255 Returns true only if this instance has either a non-null 256 popen or a non-null rcode field. 257 """ 258 return self.popen is not None or self.rcode is not None
259 260 @locked
261 - def isRunning(self):
262 return self.popen is not None and self.rcode is None
263 264 @locked
265 - def isFinished(self):
266 return self.rcode is not None
267 268 @locked
269 - def alreadyDone(self):
270 """ 271 Allows short-cutting various checks if we already 272 have a rcode for this popen. A non-None return value 273 implies that a process was started and returned 274 the given non-None value itself. 275 """ 276 if not self.wasActivated: 277 raise omero.InternalException(None, None, "Process never activated") 278 return self.isFinished()
279 280 # 281 # Cleanup methods 282 # 283
284 - def __del__(self):
285 self.cleanup()
286 287 @perf 288 @locked
289 - def check(self):
290 """ 291 Called periodically to keep the session alive. Returns 292 False if this resource can be cleaned up. (Resources API) 293 """ 294 295 if not self.wasActivated(): 296 return True # This should only happen on startup, so ignore 297 298 try: 299 self.poll() 300 self.ctx.getSession().getSessionService().getSession(self.uuid) 301 return True 302 except: 303 self.status("Keep alive failed") 304 return False
305 306 @perf 307 @locked
308 - def cleanup(self):
309 """ 310 Deactivates the process (if active) and cleanups the server 311 connection. (Resources API) 312 """ 313 314 if self.isRunning(): 315 self.deactivate() 316 317 if not self.iskill: 318 return 319 320 try: 321 sf = self.ctx.getSession(recreate = False) 322 except: 323 self.logger.debug("Can't get session for cleanup") 324 return 325 326 self.status("Killing session") 327 svc = sf.getSessionService() 328 obj = omero.model.SessionI() 329 obj.uuid = omero.rtypes.rstring(self.uuid) 330 try: 331 while svc.closeSession(obj) > 0: 332 pass 333 # No action to be taken when iskill == False if 334 # we don't have an actual client to worry with. 335 except: 336 self.logger.error("Error on session cleanup, kill=%s" % self.iskill, exc_info = True)
337
338 - def cleanup_output(self):
339 """ 340 Flush and close the stderr and stdout streams. 341 """ 342 try: 343 if hasattr(self, "stderr"): 344 self.stderr.flush() 345 self.stderr.close() 346 except: 347 self.logger.error("cleanup of sterr failed", exc_info = True) 348 try: 349 if hasattr(self, "stdout"): 350 self.stdout.flush() 351 self.stdout.close() 352 except: 353 self.logger.error("cleanup of sterr failed", exc_info = True)
354
355 - def set_job_status(self, client):
356 """ 357 Sets the job status 358 """ 359 if not client: 360 self.logger.error("No client: Cannot set job status for pid=%s (%s)", self.pid, self.uuid) 361 return 362 363 gid = client.sf.getAdminService().getEventContext().groupId 364 handle = WithGroup(client.sf.createJobHandle(), gid) 365 try: 366 status = self.final_status 367 if status is None: 368 status = ( self.rcode == 0 and "Finished" or "Error" ) 369 handle.attach(long(self.properties["omero.job"])) 370 oldStatus = handle.setStatus(status) 371 self.status("Changed job status from %s to %s" % (oldStatus, status)) 372 finally: 373 handle.close()
374
375 - def upload_output(self, client):
376 """ 377 If this is not a params calculation (i.e. parms != null) and the 378 stdout or stderr are non-null, they they will be uploaded and 379 attached to the job. 380 """ 381 if not client: 382 self.logger.error("No client: Cannot upload output for pid=%s (%s)", self.pid, self.uuid) 383 return 384 385 if self.params: 386 out_format = self.params.stdoutFormat 387 err_format = self.params.stderrFormat 388 else: 389 out_format = "text/plain" 390 err_format = out_format 391 392 self._upload(client, self.stdout_path, "stdout", out_format) 393 self._upload(client, self.stderr_path, "stderr", err_format)
394
395 - def _upload(self, client, filename, name, format):
396 397 if not format: 398 return 399 400 filename = str(filename) # Might be path.path 401 sz = os.path.getsize(filename) 402 if not sz: 403 self.status("No %s" % name) 404 return 405 406 try: 407 ofile = client.upload(filename, name=name, type=format) 408 jobid = long(client.getProperty("omero.job")) 409 link = omero.model.JobOriginalFileLinkI() 410 if self.params is None: 411 link.parent = omero.model.ParseJobI(rlong(jobid), False) 412 else: 413 link.parent = omero.model.ScriptJobI(rlong(jobid), False) 414 link.child = ofile.proxy() 415 client.getSession().getUpdateService().saveObject(link) 416 self.status("Uploaded %s bytes of %s to %s" % (sz, filename, ofile.id.val)) 417 except: 418 self.logger.error("Error on upload of %s for pid=%s (%s)", filename, self.pid, self.uuid, exc_info = True)
419
420 - def cleanup_tmpdir(self):
421 """ 422 Remove all known files and finally the temporary directory. 423 If other files exist, an exception will be raised. 424 """ 425 try: 426 remove_path(self.dir) 427 except: 428 self.logger.error("Failed to remove dir %s" % self.dir, exc_info = True)
429 430 # 431 # popen methods 432 # 433
434 - def status(self, msg = ""):
435 if self.isRunning(): 436 self.rcode = self.popen.poll() 437 self.logger.info("%s : %s", self, msg)
438 439 @perf 440 @remoted
441 - def poll(self, current = None):
442 """ 443 Checks popen.poll() (if active) and notifies all callbacks 444 if necessary. If this method returns a non-None value, then 445 the process will be marked inactive. 446 """ 447 448 if self.alreadyDone(): 449 return rint(self.rcode) 450 451 self.status("Polling") 452 if self.rcode is None: 453 # Haven't finished yet, so do nothing. 454 return None 455 else: 456 self.deactivate() 457 rv = rint(self.rcode) 458 self.allcallbacks("processFinished", self.rcode) 459 return rv
460 461 @perf 462 @remoted
463 - def wait(self, current = None):
464 """ 465 Waits on popen.wait() to return (if active) and notifies 466 all callbacks. Marks this process as inactive. 467 """ 468 469 if self.alreadyDone(): 470 return self.rcode 471 472 self.status("Waiting") 473 self.rcode = self.popen.wait() 474 self.deactivate() 475 self.allcallbacks("processFinished", self.rcode) 476 return self.rcode
477
478 - def _term(self):
479 """ 480 Attempts to cancel the process by sending SIGTERM 481 (or similar) 482 """ 483 try: 484 self.status("os.kill(TERM)") 485 os.kill(self.popen.pid, signal.SIGTERM) 486 except AttributeError: 487 self.logger.debug("No os.kill(TERM). Skipping cancel")
488
489 - def _send(self, iskill):
490 """ 491 Helper method for sending signals. This method only 492 makes a call is the process is active. 493 """ 494 if self.isRunning(): 495 try: 496 if self.popen.poll() is None: 497 if iskill: 498 self.status("popen.kill(True)") 499 self.popen.kill(True) 500 else: 501 self._term() 502 503 else: 504 self.status("Skipped signal") 505 except OSError, oserr: 506 self.logger.debug("err on pid=%s iskill=%s : %s", self.popen.pid, iskill, oserr)
507 508 @perf 509 @remoted
510 - def cancel(self, current = None):
511 """ 512 Tries to cancel popen (if active) and notifies callbacks. 513 """ 514 515 if self.alreadyDone(): 516 return True 517 518 self.final_status = "Cancelled" 519 self._send(iskill=False) 520 finished = self.isFinished() 521 if finished: 522 self.deactivate() 523 self.allcallbacks("processCancelled", finished) 524 return finished
525 526 @perf 527 @remoted
528 - def kill(self, current = None):
529 530 if self.alreadyDone(): 531 return True 532 533 self.final_status = "Cancelled" 534 self._send(iskill=True) 535 finished = self.isFinished() 536 if finished: 537 self.deactivate() 538 self.allcallbacks("processKilled", finished) 539 return finished
540 541 @perf 542 @remoted
543 - def shutdown(self, current = None):
544 """ 545 If self.popen is active, then first call cancel, wait a period of 546 time, and finally call kill. 547 """ 548 549 if self.alreadyDone(): 550 return 551 552 self.status("Shutdown") 553 try: 554 for i in range(5, 0, -1): 555 if self.cancel(): 556 break 557 else: 558 self.logger.warning("Shutdown: %s (%s). Killing in %s seconds.", self.pid, self.uuid, 6*(i-1)+1) 559 self.stop_event.wait(6) 560 self.kill() 561 except: 562 self.logger.error("Shutdown failed: %s (%s)", self.pid, self.uuid, exc_info = True)
563 564 # 565 # Callbacks 566 # 567 568 @remoted 569 @locked
570 - def registerCallback(self, callback, current = None):
571 try: 572 id = callback.ice_getIdentity() 573 key = "%s/%s" % (id.category, id.name) 574 callback = callback.ice_oneway() 575 callback = self.callback_cast(callback) 576 if not callback: 577 e = "Callback is invalid" 578 else: 579 self.callbacks[key] = callback 580 self.logger.debug("Added callback: %s", key) 581 return 582 except Exception, ex: 583 e = ex 584 # Only reached on failure 585 msg = "Failed to add callback: %s. Reason: %s" % (callback, e) 586 self.logger.debug(msg) 587 raise omero.ApiUsageException(None, None, msg)
588 589 @remoted 590 @locked
591 - def unregisterCallback(self, callback, current = None):
592 try: 593 id = callback.ice_getIdentity() 594 key = "%s/%s" % (id.category, id.name) 595 if not key in self.callback: 596 raise omero.ApiUsageException(None, None, "No callback registered with id: %s" % key) 597 del self.callbacks[key] 598 self.logger.debug("Removed callback: %s", key) 599 except Exception, e: 600 msg = "Failed to remove callback: %s. Reason: %s" % (callback, e) 601 self.logger.debug(msg) 602 raise omero.ApiUsageException(None, None, msg)
603 604 @locked
605 - def allcallbacks(self, method, arg):
606 self.status("Callback %s" % method) 607 for key, cb in self.callbacks.items(): 608 try: 609 m = getattr(cb, method) 610 m(arg) 611 except Ice.LocalException, e: 612 self.logger.debug("LocalException calling callback %s on pid=%s (%s)" % (key, self.pid, self.uuid), exc_info = False) 613 except: 614 self.logger.error("Error calling callback %s on pid=%s (%s)" % (key, self.pid, self.uuid), exc_info = True)
615
616 - def __str__(self):
617 return "<proc:%s,rc=%s,uuid=%s>" % (self.pid, (self.rcode is None and "-" or self.rcode), self.uuid)
618
class UseSessionHolder(object):
    """
    Resource which holds on to a session (service factory) and
    pings it whenever it is checked.
    """

    def __init__(self, sf):
        self.sf = sf

    def check(self):
        """
        Calls keepAlive(None) on the held session. Returns True if
        the call succeeds and False if it raises anything.
        """
        try:
            self.sf.keepAlive(None)
        except:
            return False
        else:
            return True

    def cleanup(self):
        """
        Nothing to clean up.
        """
        return None
633
634 -class ProcessorI(omero.grid.Processor, omero.util.Servant):
635
636 - def __init__(self, ctx, needs_session = True, 637 use_session = None, accepts_list = None, cfg = None, 638 omero_home = path.getcwd(), category = None):
639 640 if accepts_list is None: accepts_list = [] 641 642 self.category = category #: Category to be used w/ ProcessI 643 self.omero_home = omero_home 644 645 # Extensions for user-mode processors (ticket:1672) 646 647 self.use_session = use_session 648 """ 649 If set, this session will be returned from internal_session and 650 the "needs_session" setting ignored. 651 """ 652 653 if self.use_session: 654 needs_session = False 655 656 self.accepts_list = accepts_list 657 """ 658 A list of contexts which will be accepted by this user-mode 659 processor. 660 """ 661 662 omero.util.Servant.__init__(self, ctx, needs_session = needs_session) 663 if cfg is None: 664 self.cfg = os.path.join(omero_home, "etc", "ice.config") 665 self.cfg = os.path.abspath(self.cfg) 666 else: 667 self.cfg = cfg 668 669 # Keep this session alive until the processor is finished 670 self.resources.add( UseSessionHolder(use_session) )
671
672 - def setProxy(self, prx):
673 """ 674 Overrides the default action in order to register this proxy 675 with the session's sharedResources to register for callbacks. 676 The on_newsession handler will also keep new sessions informed. 677 678 See ticket:2304 679 """ 680 omero.util.Servant.setProxy(self, prx) 681 session = self.internal_session() 682 self.register_session(session) 683 684 # Keep other session informed 685 self.ctx.on_newsession = self.register_session
686
687 - def user_client(self, agent):
688 """ 689 Creates an omero.client instance for use by 690 users. 691 """ 692 args = ["--Ice.Config=%s" % (self.cfg)] 693 rtr = self.internal_session().ice_getRouter() 694 if rtr: 695 args.insert(0, "--Ice.Default.Router=%s" % rtr) # FIXME : How do we find an internal router? 696 client = omero.client(args) 697 client.setAgent(agent) 698 return client
699
700 - def internal_session(self):
701 """ 702 Returns the session which should be used for lookups by this instance. 703 Some methods will create a session based on the session parameter. 704 In these cases, the session will belong to the user who is running a 705 script. 706 """ 707 if self.use_session: 708 return self.use_session 709 else: 710 return self.ctx.getSession()
711
712 - def register_session(self, session):
713 self.logger.info("Registering processor %s", self.prx) 714 prx = omero.grid.ProcessorPrx.uncheckedCast(self.prx) 715 session.sharedResources().addProcessor(prx)
716
717 - def lookup(self, job):
718 sf = self.internal_session() 719 gid = job.details.group.id.val 720 handle = WithGroup(sf.createJobHandle(), gid) 721 try: 722 handle.attach(job.id.val) 723 if handle.jobFinished(): 724 handle.close() 725 raise omero.ApiUsageException("Job already finished.") 726 727 prx = WithGroup(sf.getScriptService(), gid) 728 file = prx.validateScript(job, self.accepts_list) 729 730 except omero.SecurityViolation, sv: 731 self.logger.debug("SecurityViolation on validate job %s from group %s", job.id.val, gid) 732 file = None 733 734 return file, handle
735 736 @remoted
737 - def willAccept(self, userContext, groupContext, scriptContext, cb, current = None):
738 739 userID = None 740 if userContext != None: 741 userID = userContext.id.val 742 743 groupID = None 744 if groupContext != None: 745 groupID = groupContext.id.val 746 747 scriptID = None 748 if scriptContext != None: 749 scriptID = scriptContext.id.val 750 751 if scriptID: 752 try: 753 file, handle = self.lookup(scriptContext) 754 handle.close() 755 valid = (file is not None) 756 except: 757 self.logger.error("File lookup failed: user=%s, group=%s, script=%s",\ 758 userID, groupID, scriptID, exc_info=1) 759 return # EARlY EXIT ! 760 else: 761 valid = False 762 for x in self.accepts_list: 763 if isinstance(x, omero.model.Experimenter) and x.id.val == userID: 764 valid = True 765 elif isinstance(x, omero.model.ExperimenterGroup) and x.id.val == groupID: 766 valid = True 767 768 self.logger.debug("Accepts called on: user:%s group:%s scriptjob:%s - Valid: %s", 769 userID, groupID, scriptID, valid) 770 771 try: 772 id = self.internal_session().ice_getIdentity().name 773 cb = cb.ice_oneway() 774 cb = omero.grid.ProcessorCallbackPrx.uncheckedCast(cb) 775 prx = omero.grid.ProcessorPrx.uncheckedCast(self.prx) 776 cb.isProxyAccepted(valid, id, prx) 777 except Exception, e: 778 self.logger.warn("callback failed on willAccept: %s Exception:%s", cb, e) 779 780 return valid
781 782 @remoted
783 - def requestRunning(self, cb, current = None):
784 785 try: 786 cb = cb.ice_oneway() 787 cb = omero.grid.ProcessorCallbackPrx.uncheckedCast(cb) 788 servants = list(self.ctx.servant_map.values()) 789 rv = [] 790 791 for x in servants: 792 try: 793 rv.append(long(x.properties["omero.job"])) 794 except: 795 pass 796 cb.responseRunning(rv) 797 except Exception, e: 798 self.logger.warn("callback failed on requestRunning: %s Exception:%s", cb, e)
799 800 801 @remoted
802 - def parseJob(self, session, job, current = None):
803 self.logger.info("parseJob: Session = %s, JobId = %s" % (session, job.id.val)) 804 client = self.user_client("OMERO.parseJob") 805 806 try: 807 iskill = False 808 client.joinSession(session).detachOnDestroy() 809 properties = {} 810 properties["omero.scripts.parse"] = "true" 811 prx, process = self.process(client, session, job, current, None, properties, iskill) 812 process.wait() 813 rv = client.getOutput("omero.scripts.parse") 814 if rv != None: 815 return rv.val 816 else: 817 self.logger.warning("No output found for omero.scripts.parse. Keys: %s" % client.getOutputKeys()) 818 return None 819 finally: 820 client.closeSession() 821 del client
822 823 @remoted
824 - def processJob(self, session, params, job, current = None):
825 """ 826 """ 827 self.logger.info("processJob: Session = %s, JobId = %s" % (session, job.id.val)) 828 client = self.user_client("OMERO.processJob") 829 try: 830 client.joinSession(session).detachOnDestroy() 831 prx, process = self.process(client, session, job, current, params, iskill = True) 832 return prx 833 finally: 834 client.closeSession() 835 del client
836 837 838 @perf
839 - def process(self, client, session, job, current, params, properties = None, iskill = True):
840 """ 841 session: session uuid, used primarily if client is None 842 client: an omero.client object which should be attached to a session 843 """ 844 845 if properties is None: properties = {} 846 847 if not session or not job or not job.id: 848 raise omero.ApiUsageException("No null arguments") 849 850 file, handle = self.lookup(job) 851 852 try: 853 if not file: 854 raise omero.ApiUsageException(\ 855 None, None, "Job should have one executable file attached.") 856 857 sf = self.internal_session() 858 if params: 859 self.logger.debug("Checking params for job %s" % job.id.val) 860 svc = sf.getSessionService() 861 inputs = svc.getInputs(session) 862 errors = omero.scripts.validate_inputs(params, inputs, svc, session) 863 if errors: 864 errors = "Invalid parameters:\n%s" % errors 865 raise omero.ValidationException(None, None, errors) 866 867 properties["omero.job"] = str(job.id.val) 868 properties["omero.user"] = session 869 properties["omero.pass"] = session 870 properties["Ice.Default.Router"] = client.getProperty("Ice.Default.Router") 871 872 process = ProcessI(self.ctx, sys.executable, properties, params, iskill, omero_home = self.omero_home) 873 self.resources.add(process) 874 875 # client.download(file, str(process.script_path)) 876 scriptText = sf.getScriptService().getScriptText(file.id.val) 877 process.script_path.write_bytes(scriptText) 878 879 self.logger.info("Downloaded file: %s" % file.id.val) 880 s = client.sha1(str(process.script_path)) 881 if not s == file.sha1.val: 882 msg = "Sha1s don't match! 
expected %s, found %s" % (file.sha1.val, s) 883 self.logger.error(msg) 884 process.cleanup() 885 raise omero.InternalException(None, None, msg) 886 else: 887 process.activate() 888 handle.setStatus("Running") 889 890 id = None 891 if self.category: 892 id = Ice.Identity() 893 id.name = "Process-%s" % uuid.uuid4() 894 id.category = self.category 895 prx = self.ctx.add_servant(current, process, ice_identity=id) 896 return omero.grid.ProcessPrx.uncheckedCast(prx), process 897 898 finally: 899 handle.close()
900
def usermode_processor(client, serverid = "UsermodeProcessor",
                       cfg = None, accepts_list = None, stop_event = None,
                       omero_home = path.getcwd()):
    """
    Creates and activates a usermode processor for the given client.
    It is the responsibility of the client to call "cleanup()" on
    the ProcessorI implementation which is returned.

    cfg is the path to an --Ice.Config-valid file or files. If none
    is given, the value of ICE_CONFIG will be taken from the environment
    if available. Otherwise, all properties will be taken from the client
    instance.

    accepts_list is the list of IObject instances which will be passed to
    omero.api.IScripts.validateScript. If none is given, only the current
    Experimenter's own object will be passed.

    stop_event is an threading.Event. One will be acquired from
    omero.util.concurrency.get_event if none is provided.
    """

    # Fill in whichever optional arguments the caller left out
    if cfg is None:
        cfg = os.environ.get("ICE_CONFIG")

    if accepts_list is None:
        event_context = client.sf.getAdminService().getEventContext()
        accepts_list = [omero.model.ExperimenterI(event_context.userId, False)]

    if stop_event is None:
        stop_event = omero.util.concurrency.get_event(name="UsermodeProcessor")

    # Unique identity under the client's own category
    identity = Ice.Identity()
    identity.name = "%s-%s" % (serverid, uuid.uuid4())
    identity.category = client.getCategory()

    server_context = omero.util.ServerContext(serverid, client.ic, stop_event)
    processor = omero.processor.ProcessorI(
        server_context,
        use_session=client.sf,
        accepts_list=accepts_list,
        cfg=cfg,
        omero_home=omero_home,
        category=identity.category)
    server_context.add_servant(client.adapter, processor, ice_identity=identity)
    return processor
942