1
2
3
4
5
6
7
8
9 import os
10 import time
11 import signal
12 import logging
13 import traceback
14 import killableprocess as subprocess
15
16 from path import path
17
18 import Ice
19 import omero
20 import omero.clients
21 import omero.scripts
22 import omero.util
23 import omero.util.concurrency
24
25 import omero_ext.uuid as uuid
26
27 from omero.util.temp_files import create_path, remove_path
28 from omero.util.decorators import remoted, perf, locked
29 from omero.rtypes import *
30 from omero.util.decorators import remoted, perf, wraps
31
32 sys = __import__("sys")
33
34
35 -def with_context(func, context):
36 """ Decorator for invoking Ice methods with a context """
37 def handler(*args, **kwargs):
38 args = list(args)
39 args.append(context)
40 return func(*args, **kwargs)
41 handler = wraps(func)(handler)
42 return handler
43
45 """
46 Wraps a ServiceInterfacePrx instance and applies
47 a "omero.group" to the passed context on every
48 invocation.
49
50 For example, using a job handle as root requires logging
51 manually into the group. (ticket:2044)
52 """
53
55 self._service = service
56 self._group_id = str(group_id)
57
59 ctx = self._service.ice_getCommunicator().getImplicitContext().getContext()
60 ctx = dict(ctx)
61 ctx["omero.group"] = group
62 return ctx
63
65 if name.startswith("_"):
66 return self.__dict__[name]
67 elif hasattr(self._service, name):
68 method = getattr(self._service, name)
69 ctx = self._get_ctx(self._group_id)
70 return with_context(method, ctx)
71 raise AttributeError("'%s' object has no attribute '%s'" % (self.service, name))
72
73 -class ProcessI(omero.grid.Process, omero.util.SimpleServant):
74 """
75 Wrapper around a subprocess.Popen instance. Returned by ProcessorI
76 when a job is submitted. This implementation uses the given
77 interpreter to call a file that must be named "script" in the
78 generated temporary directory.
79
80 Call is equivalent to:
81
82 cd TMP_DIR
83 ICE_CONFIG=./config interpreter ./script >out 2>err &
84
85 The properties argument is used to generate the ./config file.
86
87 The params argument may be null in which case this process
88 is being used solely to calculate the parameters for the script
89 ("omero.scripts.parse=true")
90
91 If iskill is True, then on cleanup, this process will reap the
92 attached session completely.
93 """
94
95 - def __init__(self, ctx, interpreter, properties, params, iskill = False,\
96 Popen = subprocess.Popen, callback_cast = omero.grid.ProcessCallbackPrx.uncheckedCast,\
97 omero_home = path.getcwd()):
98 """
99 Popen and callback_Cast are primarily for testing.
100 """
101 omero.util.SimpleServant.__init__(self, ctx)
102 self.omero_home = omero_home
103 self.interpreter = interpreter
104 self.properties = properties
105 self.params = params
106 self.iskill = iskill
107 self.Popen = Popen
108 self.callback_cast = callback_cast
109
110 self.rcode = None
111 self.callbacks = {}
112 self.popen = None
113 self.pid = None
114 self.started = None
115 self.stopped = None
116 self.final_status = None
117
118 self.uuid = properties["omero.user"]
119
120
121 self.make_files()
122 self.make_env()
123 self.make_config()
124 self.logger.info("Created %s in %s" % (self.uuid, self.dir))
125
126
127
128
129
131 self.env = omero.util.Environment("PATH", "PYTHONPATH",\
132 "DYLD_LIBRARY_PATH", "LD_LIBRARY_PATH", "MLABRAW_CMD_STR", "HOME",\
133 "DISPLAY")
134
135
136
137
138
139
140
141 self.env.append("PYTHONPATH", str(self.omero_home / "lib" / "python"))
142 self.env.set("ICE_CONFIG", str(self.config_path))
143
145 self.dir = create_path("process", ".dir", folder = True)
146 self.script_path = self.dir / "script"
147 self.config_path = self.dir / "config"
148 self.stdout_path = self.dir / "out"
149 self.stderr_path = self.dir / "err"
150
152 """
153 Creates the ICE_CONFIG file used by the client.
154 """
155 config_file = open(str(self.config_path), "w")
156 try:
157 for key in self.properties.iterkeys():
158 config_file.write("%s=%s\n"%(key, self.properties[key]))
159 finally:
160 config_file.close()
161
163 """
164 Create a client for performing cleanup operations.
165 This client should be closed as soon as possible
166 by the process
167 """
168 try:
169 client = omero.client(["--Ice.Config=%s" % str(self.config_path)])
170 client.setAgent("OMERO.process")
171 client.createSession().detachOnDestroy()
172 self.logger.debug("client: %s" % client.sf)
173 return client
174 except:
175 self.logger.error("Failed to create client for %s" % self.uuid)
176 return None
177
178
179
180
181
182 @locked
184 """
185 Process creation has to wait until all external downloads, etc
186 are finished.
187 """
188
189 if self.isActive():
190 raise omero.ApiUsageException(None, None, "Already activated")
191
192 self.stdout = open(str(self.stdout_path), "w")
193 self.stderr = open(str(self.stderr_path), "w")
194 self.popen = self.Popen([self.interpreter, "./script"], cwd=str(self.dir), env=self.env(), stdout=self.stdout, stderr=self.stderr)
195 self.pid = self.popen.pid
196 self.started = time.time()
197 self.stopped = None
198 self.status("Activated")
199
200 @locked
202 """
203 Cleans up the temporary directory used by the process, and terminates
204 the Popen process if running.
205 """
206
207 if not self.isActive():
208 raise omero.ApiUsageException(None, None, "Not active")
209
210 if self.stopped:
211
212 return
213
214 self.stopped = time.time()
215 d_start = time.time()
216 self.status("Deactivating")
217
218
219 try:
220
221 self.shutdown()
222 self.popen = None
223
224 client = self.tmp_client()
225 try:
226 self.set_job_status(client)
227 self.cleanup_output()
228 self.upload_output(client)
229 self.cleanup_tmpdir()
230 finally:
231 if client:
232 client.__del__()
233
234 except Exception:
235 self.logger.error("FAILED TO CLEANUP pid=%s (%s)", self.pid, self.uuid, exc_info = True)
236
237 d_stop = time.time()
238 elapsed = int(self.stopped - self.started)
239 d_elapsed = int(d_stop - d_start)
240 self.status("Lived %ss. Deactivation took %ss." % (elapsed, d_elapsed))
241
242 @locked
244 """
245 Tests only if this instance has a non-None popen attribute. After activation
246 this method will return True until the popen itself returns a non-None
247 value (self.rcode) at which time it will be nulled and this method will again
248 return False
249 """
250 return self.popen is not None
251
252 @locked
254 """
255 Returns true only if this instance has either a non-null
256 popen or a non-null rcode field.
257 """
258 return self.popen is not None or self.rcode is not None
259
260 @locked
262 return self.popen is not None and self.rcode is None
263
264 @locked
266 return self.rcode is not None
267
268 @locked
270 """
271 Allows short-cutting various checks if we already
272 have a rcode for this popen. A non-None return value
273 implies that a process was started and returned
274 the given non-None value itself.
275 """
276 if not self.wasActivated:
277 raise omero.InternalException(None, None, "Process never activated")
278 return self.isFinished()
279
280
281
282
283
286
287 @perf
288 @locked
290 """
291 Called periodically to keep the session alive. Returns
292 False if this resource can be cleaned up. (Resources API)
293 """
294
295 if not self.wasActivated():
296 return True
297
298 try:
299 self.poll()
300 self.ctx.getSession().getSessionService().getSession(self.uuid)
301 return True
302 except:
303 self.status("Keep alive failed")
304 return False
305
306 @perf
307 @locked
309 """
310 Deactivates the process (if active) and cleanups the server
311 connection. (Resources API)
312 """
313
314 if self.isRunning():
315 self.deactivate()
316
317 if not self.iskill:
318 return
319
320 try:
321 sf = self.ctx.getSession(recreate = False)
322 except:
323 self.logger.debug("Can't get session for cleanup")
324 return
325
326 self.status("Killing session")
327 svc = sf.getSessionService()
328 obj = omero.model.SessionI()
329 obj.uuid = omero.rtypes.rstring(self.uuid)
330 try:
331 while svc.closeSession(obj) > 0:
332 pass
333
334
335 except:
336 self.logger.error("Error on session cleanup, kill=%s" % self.iskill, exc_info = True)
337
339 """
340 Flush and close the stderr and stdout streams.
341 """
342 try:
343 if hasattr(self, "stderr"):
344 self.stderr.flush()
345 self.stderr.close()
346 except:
347 self.logger.error("cleanup of sterr failed", exc_info = True)
348 try:
349 if hasattr(self, "stdout"):
350 self.stdout.flush()
351 self.stdout.close()
352 except:
353 self.logger.error("cleanup of sterr failed", exc_info = True)
354
374
376 """
377 If this is not a params calculation (i.e. parms != null) and the
378 stdout or stderr are non-null, they they will be uploaded and
379 attached to the job.
380 """
381 if not client:
382 self.logger.error("No client: Cannot upload output for pid=%s (%s)", self.pid, self.uuid)
383 return
384
385 if self.params:
386 out_format = self.params.stdoutFormat
387 err_format = self.params.stderrFormat
388 else:
389 out_format = "text/plain"
390 err_format = out_format
391
392 self._upload(client, self.stdout_path, "stdout", out_format)
393 self._upload(client, self.stderr_path, "stderr", err_format)
394
395 - def _upload(self, client, filename, name, format):
396
397 if not format:
398 return
399
400 filename = str(filename)
401 sz = os.path.getsize(filename)
402 if not sz:
403 self.status("No %s" % name)
404 return
405
406 try:
407 ofile = client.upload(filename, name=name, type=format)
408 jobid = long(client.getProperty("omero.job"))
409 link = omero.model.JobOriginalFileLinkI()
410 if self.params is None:
411 link.parent = omero.model.ParseJobI(rlong(jobid), False)
412 else:
413 link.parent = omero.model.ScriptJobI(rlong(jobid), False)
414 link.child = ofile.proxy()
415 client.getSession().getUpdateService().saveObject(link)
416 self.status("Uploaded %s bytes of %s to %s" % (sz, filename, ofile.id.val))
417 except:
418 self.logger.error("Error on upload of %s for pid=%s (%s)", filename, self.pid, self.uuid, exc_info = True)
419
421 """
422 Remove all known files and finally the temporary directory.
423 If other files exist, an exception will be raised.
424 """
425 try:
426 remove_path(self.dir)
427 except:
428 self.logger.error("Failed to remove dir %s" % self.dir, exc_info = True)
429
430
431
432
433
438
439 @perf
440 @remoted
441 - def poll(self, current = None):
442 """
443 Checks popen.poll() (if active) and notifies all callbacks
444 if necessary. If this method returns a non-None value, then
445 the process will be marked inactive.
446 """
447
448 if self.alreadyDone():
449 return rint(self.rcode)
450
451 self.status("Polling")
452 if self.rcode is None:
453
454 return None
455 else:
456 self.deactivate()
457 rv = rint(self.rcode)
458 self.allcallbacks("processFinished", self.rcode)
459 return rv
460
461 @perf
462 @remoted
463 - def wait(self, current = None):
464 """
465 Waits on popen.wait() to return (if active) and notifies
466 all callbacks. Marks this process as inactive.
467 """
468
469 if self.alreadyDone():
470 return self.rcode
471
472 self.status("Waiting")
473 self.rcode = self.popen.wait()
474 self.deactivate()
475 self.allcallbacks("processFinished", self.rcode)
476 return self.rcode
477
479 """
480 Attempts to cancel the process by sending SIGTERM
481 (or similar)
482 """
483 try:
484 self.status("os.kill(TERM)")
485 os.kill(self.popen.pid, signal.SIGTERM)
486 except AttributeError:
487 self.logger.debug("No os.kill(TERM). Skipping cancel")
488
489 - def _send(self, iskill):
490 """
491 Helper method for sending signals. This method only
492 makes a call is the process is active.
493 """
494 if self.isRunning():
495 try:
496 if self.popen.poll() is None:
497 if iskill:
498 self.status("popen.kill(True)")
499 self.popen.kill(True)
500 else:
501 self._term()
502
503 else:
504 self.status("Skipped signal")
505 except OSError, oserr:
506 self.logger.debug("err on pid=%s iskill=%s : %s", self.popen.pid, iskill, oserr)
507
508 @perf
509 @remoted
510 - def cancel(self, current = None):
525
526 @perf
527 @remoted
528 - def kill(self, current = None):
540
541 @perf
542 @remoted
544 """
545 If self.popen is active, then first call cancel, wait a period of
546 time, and finally call kill.
547 """
548
549 if self.alreadyDone():
550 return
551
552 self.status("Shutdown")
553 try:
554 for i in range(5, 0, -1):
555 if self.cancel():
556 break
557 else:
558 self.logger.warning("Shutdown: %s (%s). Killing in %s seconds.", self.pid, self.uuid, 6*(i-1)+1)
559 self.stop_event.wait(6)
560 self.kill()
561 except:
562 self.logger.error("Shutdown failed: %s (%s)", self.pid, self.uuid, exc_info = True)
563
564
565
566
567
568 @remoted
569 @locked
571 try:
572 id = callback.ice_getIdentity()
573 key = "%s/%s" % (id.category, id.name)
574 callback = callback.ice_oneway()
575 callback = self.callback_cast(callback)
576 if not callback:
577 e = "Callback is invalid"
578 else:
579 self.callbacks[key] = callback
580 self.logger.debug("Added callback: %s", key)
581 return
582 except Exception, ex:
583 e = ex
584
585 msg = "Failed to add callback: %s. Reason: %s" % (callback, e)
586 self.logger.debug(msg)
587 raise omero.ApiUsageException(None, None, msg)
588
589 @remoted
590 @locked
592 try:
593 id = callback.ice_getIdentity()
594 key = "%s/%s" % (id.category, id.name)
595 if not key in self.callback:
596 raise omero.ApiUsageException(None, None, "No callback registered with id: %s" % key)
597 del self.callbacks[key]
598 self.logger.debug("Removed callback: %s", key)
599 except Exception, e:
600 msg = "Failed to remove callback: %s. Reason: %s" % (callback, e)
601 self.logger.debug(msg)
602 raise omero.ApiUsageException(None, None, msg)
603
604 @locked
606 self.status("Callback %s" % method)
607 for key, cb in self.callbacks.items():
608 try:
609 m = getattr(cb, method)
610 m(arg)
611 except Ice.LocalException, e:
612 self.logger.debug("LocalException calling callback %s on pid=%s (%s)" % (key, self.pid, self.uuid), exc_info = False)
613 except:
614 self.logger.error("Error calling callback %s on pid=%s (%s)" % (key, self.pid, self.uuid), exc_info = True)
615
617 return "<proc:%s,rc=%s,uuid=%s>" % (self.pid, (self.rcode is None and "-" or self.rcode), self.uuid)
618
620
623
625 try:
626 self.sf.keepAlive(None)
627 return True
628 except:
629 return False
630
633
634 -class ProcessorI(omero.grid.Processor, omero.util.Servant):
635
636 - def __init__(self, ctx, needs_session = True,
637 use_session = None, accepts_list = None, cfg = None,
638 omero_home = path.getcwd(), category = None):
639
640 if accepts_list is None: accepts_list = []
641
642 self.category = category
643 self.omero_home = omero_home
644
645
646
647 self.use_session = use_session
648 """
649 If set, this session will be returned from internal_session and
650 the "needs_session" setting ignored.
651 """
652
653 if self.use_session:
654 needs_session = False
655
656 self.accepts_list = accepts_list
657 """
658 A list of contexts which will be accepted by this user-mode
659 processor.
660 """
661
662 omero.util.Servant.__init__(self, ctx, needs_session = needs_session)
663 if cfg is None:
664 self.cfg = os.path.join(omero_home, "etc", "ice.config")
665 self.cfg = os.path.abspath(self.cfg)
666 else:
667 self.cfg = cfg
668
669
670 self.resources.add( UseSessionHolder(use_session) )
671
686
688 """
689 Creates an omero.client instance for use by
690 users.
691 """
692 args = ["--Ice.Config=%s" % (self.cfg)]
693 rtr = self.internal_session().ice_getRouter()
694 if rtr:
695 args.insert(0, "--Ice.Default.Router=%s" % rtr)
696 client = omero.client(args)
697 client.setAgent(agent)
698 return client
699
701 """
702 Returns the session which should be used for lookups by this instance.
703 Some methods will create a session based on the session parameter.
704 In these cases, the session will belong to the user who is running a
705 script.
706 """
707 if self.use_session:
708 return self.use_session
709 else:
710 return self.ctx.getSession()
711
713 self.logger.info("Registering processor %s", self.prx)
714 prx = omero.grid.ProcessorPrx.uncheckedCast(self.prx)
715 session.sharedResources().addProcessor(prx)
716
735
736 @remoted
737 - def willAccept(self, userContext, groupContext, scriptContext, cb, current = None):
738
739 userID = None
740 if userContext != None:
741 userID = userContext.id.val
742
743 groupID = None
744 if groupContext != None:
745 groupID = groupContext.id.val
746
747 scriptID = None
748 if scriptContext != None:
749 scriptID = scriptContext.id.val
750
751 if scriptID:
752 try:
753 file, handle = self.lookup(scriptContext)
754 handle.close()
755 valid = (file is not None)
756 except:
757 self.logger.error("File lookup failed: user=%s, group=%s, script=%s",\
758 userID, groupID, scriptID, exc_info=1)
759 return
760 else:
761 valid = False
762 for x in self.accepts_list:
763 if isinstance(x, omero.model.Experimenter) and x.id.val == userID:
764 valid = True
765 elif isinstance(x, omero.model.ExperimenterGroup) and x.id.val == groupID:
766 valid = True
767
768 self.logger.debug("Accepts called on: user:%s group:%s scriptjob:%s - Valid: %s",
769 userID, groupID, scriptID, valid)
770
771 try:
772 id = self.internal_session().ice_getIdentity().name
773 cb = cb.ice_oneway()
774 cb = omero.grid.ProcessorCallbackPrx.uncheckedCast(cb)
775 prx = omero.grid.ProcessorPrx.uncheckedCast(self.prx)
776 cb.isProxyAccepted(valid, id, prx)
777 except Exception, e:
778 self.logger.warn("callback failed on willAccept: %s Exception:%s", cb, e)
779
780 return valid
781
782 @remoted
784
785 try:
786 cb = cb.ice_oneway()
787 cb = omero.grid.ProcessorCallbackPrx.uncheckedCast(cb)
788 servants = list(self.ctx.servant_map.values())
789 rv = []
790
791 for x in servants:
792 try:
793 rv.append(long(x.properties["omero.job"]))
794 except:
795 pass
796 cb.responseRunning(rv)
797 except Exception, e:
798 self.logger.warn("callback failed on requestRunning: %s Exception:%s", cb, e)
799
800
801 @remoted
802 - def parseJob(self, session, job, current = None):
822
823 @remoted
824 - def processJob(self, session, params, job, current = None):
836
837
838 @perf
839 - def process(self, client, session, job, current, params, properties = None, iskill = True):
840 """
841 session: session uuid, used primarily if client is None
842 client: an omero.client object which should be attached to a session
843 """
844
845 if properties is None: properties = {}
846
847 if not session or not job or not job.id:
848 raise omero.ApiUsageException("No null arguments")
849
850 file, handle = self.lookup(job)
851
852 try:
853 if not file:
854 raise omero.ApiUsageException(\
855 None, None, "Job should have one executable file attached.")
856
857 sf = self.internal_session()
858 if params:
859 self.logger.debug("Checking params for job %s" % job.id.val)
860 svc = sf.getSessionService()
861 inputs = svc.getInputs(session)
862 errors = omero.scripts.validate_inputs(params, inputs, svc, session)
863 if errors:
864 errors = "Invalid parameters:\n%s" % errors
865 raise omero.ValidationException(None, None, errors)
866
867 properties["omero.job"] = str(job.id.val)
868 properties["omero.user"] = session
869 properties["omero.pass"] = session
870 properties["Ice.Default.Router"] = client.getProperty("Ice.Default.Router")
871
872 process = ProcessI(self.ctx, sys.executable, properties, params, iskill, omero_home = self.omero_home)
873 self.resources.add(process)
874
875
876 scriptText = sf.getScriptService().getScriptText(file.id.val)
877 process.script_path.write_bytes(scriptText)
878
879 self.logger.info("Downloaded file: %s" % file.id.val)
880 s = client.sha1(str(process.script_path))
881 if not s == file.sha1.val:
882 msg = "Sha1s don't match! expected %s, found %s" % (file.sha1.val, s)
883 self.logger.error(msg)
884 process.cleanup()
885 raise omero.InternalException(None, None, msg)
886 else:
887 process.activate()
888 handle.setStatus("Running")
889
890 id = None
891 if self.category:
892 id = Ice.Identity()
893 id.name = "Process-%s" % uuid.uuid4()
894 id.category = self.category
895 prx = self.ctx.add_servant(current, process, ice_identity=id)
896 return omero.grid.ProcessPrx.uncheckedCast(prx), process
897
898 finally:
899 handle.close()
900
901 -def usermode_processor(client, serverid = "UsermodeProcessor",\
902 cfg = None, accepts_list = None, stop_event = None,\
903 omero_home = path.getcwd()):
904 """
905 Creates and activates a usermode processor for the given client.
906 It is the responsibility of the client to call "cleanup()" on
907 the ProcessorI implementation which is returned.
908
909 cfg is the path to an --Ice.Config-valid file or files. If none
910 is given, the value of ICE_CONFIG will be taken from the environment
911 if available. Otherwise, all properties will be taken from the client
912 instance.
913
914 accepts_list is the list of IObject instances which will be passed to
915 omero.api.IScripts.validateScript. If none is given, only the current
916 Experimenter's own object will be passed.
917
918 stop_event is an threading.Event. One will be acquired from
919 omero.util.concurrency.get_event if none is provided.
920 """
921
922 if cfg is None:
923 cfg = os.environ.get("ICE_CONFIG")
924
925 if accepts_list is None:
926 uid = client.sf.getAdminService().getEventContext().userId
927 accepts_list = [omero.model.ExperimenterI(uid, False)]
928
929 if stop_event is None:
930 stop_event = omero.util.concurrency.get_event(name="UsermodeProcessor")
931
932 id = Ice.Identity()
933 id.name = "%s-%s" % (serverid, uuid.uuid4())
934 id.category = client.getCategory()
935
936 ctx = omero.util.ServerContext(serverid, client.ic, stop_event)
937 impl = omero.processor.ProcessorI(ctx,
938 use_session=client.sf, accepts_list=accepts_list, cfg=cfg,
939 omero_home = omero_home, category=id.category)
940 ctx.add_servant(client.adapter, impl, ice_identity=id)
941 return impl
942