1
2
3
4
5
6
7
8
9
10 import os
11 import sys
12 import Ice
13 import path
14 import time
15 import omero
16 import IcePy
17 import IceGrid
18 import logging
19 import platform
20 import Glacier2
21 import threading
22 import logging.handlers
23 import omero.util.concurrency
24 import omero_ext.uuid as uuid
25 import omero.ObjectFactoryRegistrar as ofr
26
27 from omero.util.decorators import locked
28
29 LOGDIR = os.path.join("var","log")
30 LOGFORMAT = """%(asctime)s %(levelname)-5.5s [%(name)40s] (%(threadName)-10s) %(message)s"""
31 LOGLEVEL = logging.INFO
32 LOGSIZE = 500000000
33 LOGNUM = 9
34 LOGMODE = "a"
35
36 orig_stdout = sys.stdout
37 orig_stderr = sys.stderr
40 """
41 Generates a logname from the given instance using the module and name from its class
42 """
43 log_name = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
44 return log_name
45
69
86
88 """
89 Since all server components should exclusively use the logging module
90 any output to stdout or stderr is caught and logged at "WARN". This is
91 useful, especially in the case of Windows, where stdout/stderr is eaten.
92 """
93
95 self.logger = logger
96 self.internal = logging.getLogger("StreamRedirect")
97 self.softspace = False
98
101
106
109
111 """
112 Centralized logic for declaring and logging a service
113 dependency on a non-shipped library. This is called
114 lazily from the run method of the application to give
115 logging time to be initialized.
116
117 See #4566
118 """
119
122
124 """
125 Get version method which returns a string
126 representing the version. Should be overridden by
127 subclasses for packages/modules with no
128 __version__ field.
129 """
130 return target.__version__
131
def check(self, logger):
    """
    Try to import the module named by self.key and report the outcome.

    On success the value returned by self.get_version() for the imported
    module is logged at INFO and True is returned. If an ImportError is
    raised, the failure is logged at ERROR and False is returned. Any
    other exception propagates to the caller.
    """
    try:
        module = __import__(self.key)
        found = "Loaded dependency %s (%s)" % (self.key, self.get_version(module))
        logger.info(found)
    except ImportError:
        logger.error("Failed to load: '%s'" % self.key)
        return False
    return True
141
def internal_service_factory(communicator, user="root", group=None, retries=6, interval=10, client_uuid=None, stop_event = None):
    """
    Try to return a ServiceFactory from the grid.

    Try a number of times then give up and raise the
    last exception returned. This method will only
    work internally to the grid, i.e. behind the Glacier2
    firewall. It is intended for internal servers to
    be able to create sessions for accessing the database. ::
        communicator := Ice.Communicator used to find the registry
        user         := Username which should have a session created
        group        := Group into which the session should be logged
                        (NOTE: accepted but currently not forwarded to
                        the session manager by this function)
        retries      := Number of session creation retries before throwing
        interval     := Seconds between retries
        client_uuid  := Uuid of the client which should be used. If None
                        and the implicit context has no client uuid yet,
                        a random one is generated.
        stop_event   := Event which aborts the retry loop when set. If
                        None, one is obtained from
                        omero.util.concurrency.get_event

    Returns the ServiceFactoryPrx on success, or None if stop_event is
    set before a session could be created (or if retries is not positive).
    Raises the last exception seen once all retries are used up.
    """
    log = logging.getLogger("omero.utils")
    # Identity comparison: an event object must never be confused with
    # None through a custom __eq__.
    if stop_event is None:
        stop_event = omero.util.concurrency.get_event(name="internal_service_factory")

    tryCount = 0
    excpt = None
    query = communicator.stringToProxy("IceGrid/Query")
    query = IceGrid.QueryPrx.checkedCast(query)

    implicit_ctx = communicator.getImplicitContext()
    implicit_ctx.put(omero.constants.AGENT, "Python service")
    if client_uuid is not None:
        implicit_ctx.put(omero.constants.CLIENTUUID, client_uuid)
    elif not implicit_ctx.containsKey(omero.constants.CLIENTUUID):
        client_uuid = str(uuid.uuid4())
        implicit_ctx.put(omero.constants.CLIENTUUID, client_uuid)

    while tryCount < retries:
        if stop_event.isSet():
            return None
        try:
            blitz = query.findAllObjectsByType("::Glacier2::SessionManager")[0]
            blitz = Glacier2.SessionManagerPrx.checkedCast(blitz)
            sf = blitz.create(user, None)
            # Group currently unused.
            return omero.api.ServiceFactoryPrx.checkedCast(sf)
        except Exception:
            # sys.exc_info() keeps this block valid on every Python
            # version, unlike the "except X, e" / "except X as e" forms.
            excpt = sys.exc_info()[1]
            tryCount += 1
            log.info("Failed to get session on attempt %s", str(tryCount))
            # Doubles as the sleep between retries; returns early if
            # the event gets set.
            stop_event.wait(interval)

    # Compare against None rather than relying on truthiness, so that an
    # exception type defining __len__ / __nonzero__ is never swallowed.
    if excpt is not None:
        log.warning("Reason: %s", str(excpt))
        raise excpt
194
196 """
197 """
198 reg = communicator.stringToProxy("IceGrid/Registry")
199 reg = IceGrid.RegistryPrx.checkedCast(reg)
200 adm = reg.createAdminSession('null', '')
201 return adm
202
204 """
205 """
206 sid = communicator.identityToString(obj.ice_getIdentity())
207 adm = create_admin_session(communicator)
208 prx = adm.getAdmin()
209 try:
210 try:
211 prx.addObject(obj)
212 except IceGrid.ObjectExistsException:
213 prx.updateObject(obj)
214 finally:
215 adm.destroy()
216
218 """
219 Converts a long to a path such that for all directories only
220 a 1000 files and a 1000 subdirectories will be returned.
221
222 This method duplicates the logic in
223 ome.io.nio.AbstractFileSystemService.java:getPath()
224 """
225 suffix = ""
226 remaining = id
227 dirno = 0
228
229 if id is None or id == "":
230 raise Exception("Expecting a not-null id.")
231
232 id = long(id)
233
234 if id < 0:
235 raise Exception("Expecting a non-negative id.")
236
237 while (remaining > 999):
238 remaining /= 1000
239
240 if remaining > 0:
241 dirno = remaining % 1000
242 suffix = os.path.join("Dir-%03d" % dirno, suffix)
243
244 return os.path.join(root, "%s%s" %(suffix,id))
245
246 -class ServerContext(object):
247 """
248 Context passed to all servants.
249
250 server_id, communicator, and stop_event will be
251 constructed by the top-level Server instance.
252
253 A context instance may also be configured to hold
254 on to an internal session (ServiceFactoryPrx) and
255 keep it alive.
256
257 This instance obeys the Resources API and calls
258 sf.keepAlive(None) on every check call, but does
259 nothing on cleanup. The sf instance must be manually
260 cleaned as the final operation of a servant.
261
262 (Note: cleanup of the server context indicates
263 server shutdown, so should be infrequent)
264 """
265
def __init__(self, server_id, communicator, stop_event, on_newsession = None):
    """
    Store the collaborators handed over by the creator of the context.

    server_id, communicator and stop_event are kept as given.
    on_newsession, if callable, is invoked by newSession() with each
    freshly created session.
    """
    self._lock = threading.RLock()
    self.logger = logging.getLogger("omero.util.ServerContext")
    self.server_id = server_id
    self.communicator = communicator
    self.stop_event = stop_event
    # Maps the proxy returned by the adapter to its servant
    # (filled in by add_servant).
    self.servant_map = dict()
    # Previously hard-coded to None, which silently discarded the
    # callback supplied by the caller.
    self.on_newsession = on_newsession
274
275 @locked
def add_servant(self, adapter_or_current, servant, ice_identity = None):
    """
    Register the servant with an object adapter and remember it.

    adapter_or_current may be the adapter itself or an Ice.Current /
    IcePy.Current, in which case its "adapter" attribute is used.
    With ice_identity the servant is added under that identity,
    otherwise under a generated UUID. The resulting proxy is handed to
    servant.setProxy(), recorded in self.servant_map and returned.
    """
    if isinstance(adapter_or_current, (Ice.Current, IcePy.Current)):
        adapter = adapter_or_current.adapter
    else:
        adapter = adapter_or_current

    if ice_identity is not None:
        proxy = adapter.add(servant, ice_identity)
    else:
        proxy = adapter.addWithUUID(servant)

    servant.setProxy(proxy)
    self.servant_map[proxy] = servant
    return proxy
288
def newSession(self):
    """
    Create an internal session via internal_service_factory() and
    store it on self.session. If self.on_newsession is callable, it
    is then called with the new session.
    """
    session = internal_service_factory(self.communicator, stop_event = self.stop_event)
    self.session = session
    callback = self.on_newsession
    if callable(callback):
        callback(session)
293
def hasSession(self):
    """
    Return True if a "session" attribute exists on this context,
    regardless of its value (it may be None).
    """
    configured = hasattr(self, "session")
    return configured
296
297 @locked
def getSession(self, recreate = True):
    """
    Return the ServiceFactoryPrx held by this context.

    If the context has no "session" attribute at all, an
    omero.ApiUsageException is raised: servants should know whether
    or not they were configured for sessions.
    See Servant(..., needs_session = True)

    An existing session is pinged with keepAlive(None); if that fails
    the session is dropped. When no session is left and recreate is
    True, newSession() is attempted once. If there is still no session
    afterwards, an omero.InternalException is raised.

    TODO : currently no arguments are provided for re-creating these,
    but also not in Servant.__init__
    """
    if not self.hasSession():
        raise omero.ApiUsageException("Not configured for server connection")

    current = self.session
    if current:
        try:
            current.keepAlive(None)
        except Ice.CommunicatorDestroyedException:
            # Expected during shutdown; drop the session quietly.
            self.session = None
        except Exception:
            self.logger.warn("Connection failure: %s" % sys.exc_info()[1])
            self.session = None

    if recreate and self.session is None:
        try:
            self.newSession()
            self.logger.info("Established connection: %s" % self.session)
        except Exception:
            self.logger.warn("Failed to establish connection: %s" % sys.exc_info()[1])

    if self.session is None:
        raise omero.InternalException("No connection to server")

    return self.session
336
338 """
339 Calls getSession() but always returns True. This keeps the context
340 available in the resources for later uses, and tries to re-establish
341 a connection in case Blitz goes down.
342 """
343 try:
344 self.getSession()
345 except:
346 pass
347 return True
348
350 """
351 Does nothing. Context clean up must happen manually
352 since later activities may want to reuse it. Servants using
353 a server connection should cleanup the instance *after* Resources
354 is cleaned up
355 """
356 pass
357
358 -class Server(Ice.Application):
359 """
360 Basic server implementation which can be used for
361 implementing a standalone python server which can
362 be started from icegridnode.
363
364 The servant implementation MUST have a constructor
365 which takes a single ServerContext argument AND
366 have a cleanup() method
367
368 Logging is configured relative to the current directory
369 to be in var/log by default.
370
371 Usage::
372
373 if __name__ == "__main__":
374 app=Server(ServicesI, "ServicesAdapter", Ice.Identity("Services",""))
375 sys.exit(app.main(sys.argv))
376
377 app.impl now points to an instance of ServicesI
378
379 """
380
381 - def __init__(self, impl_class, adapter_name, identity, logdir = LOGDIR, dependencies = ()):
389
391 ms = 10000
392 try:
393 i = os.environ.get("OMERO_STARTUP_WAIT", "10000")
394 ms = int(i)
395 except:
396 self.logger.debug(exc_info=1)
397
398 try:
399 self.logger.info("Waiting %s ms on startup" % ms)
400 self.stop_event.wait(ms/1000)
401 except:
402 self.logger.debug(exc_info=1)
403
404 - def run(self,args):
460
462 """
463 Cleans up all resources that were created by this server.
464 Primarily the one servant instance.
465 """
466 if hasattr(self,"impl"):
467 try:
468 self.impl.cleanup()
469 finally:
470 del self.impl
471
474 """
475 Base servant initialization. Doesn't create or try to cleanup
476 a top-level Resources thread. This is useful for large numbers
477 of servants. For servers and other singleton-like servants,
478 see "Servant"
479 """
481 self._lock = threading.RLock()
482 self.prx = None
483 self.ctx = ctx
484 self.stop_event = ctx.stop_event
485 self.communicator = ctx.communicator
486 self.logger = logging.getLogger(make_logname(self))
487 self.logger.debug("Created")
488
490 """
491 Should be overwritten for post-initialization activities.
492 The reason this method exists is that the implementation
493 must be complete before registering it with the adapter.
494 """
495 self.prx = prx
496
498 """
499 Abstract servant which can be used along with a slice2py
500 generated dispatch class as the base type of high-level servants.
501 These provide resource cleanup as per the omero.util.Server
502 class.
503
504 By passing "needs_session = True" to this constructor, an internal
505 session will be created and stored in ServerContext as well as
506 registered with self.resources
507 """
508
509 - def __init__(self, ctx, needs_session = False):
515
517 """
518 Cleans up all resources created by this servant. Calling
519 cleanup multiple times should be safe.
520 """
521 resources = self.resources
522 self.resources = None
523 if resources != None:
524 self.logger.info("Cleaning up")
525 resources.cleanup()
526 self.logger.info("Done")
527 if self.ctx.hasSession():
528 try:
529 sf = self.ctx.getSession(recreate=False)
530 self.logger.debug("Destroying %s" % sf)
531 sf.destroy()
532 except:
533 pass
534
537
540 """
541 Container class for storing resources which should be
542 cleaned up on close and periodically checked. Use
543 stop_event.set() to stop the internal thread.
544 """
545
def __init__(self, sleeptime = 60, stop_event = None):
    """
    Set up an empty resource list and start the checking thread.

    Resources are registered via add(object). They should have a
    no-arg cleanup() and a check() method.

    Every "sleeptime" seconds (default: 60) the background thread
    runs copyStuff(), checkAll() and removeAll() on this instance.
    If no stop_event is given, one is obtained from
    omero.util.concurrency.get_event. A sleeptime below 5 raises
    an Exception before any thread is started.
    """

    self.stuff = []
    self._lock = threading.RLock()
    self.logger = logging.getLogger("omero.util.Resources")
    self.stop_event = stop_event
    if not self.stop_event:
        self.stop_event = omero.util.concurrency.get_event(name="Resources")

    # Note: a value of exactly 5 is accepted, despite the wording
    # of the message.
    if sleeptime < 5:
        raise Exception("Sleep time should be greater than 5: %s" % sleeptime)

    self.sleeptime = sleeptime

    class Task(threading.Thread):
        """
        Background thread which periodically checks the owner's "stuff"
        """
        def run(self):
            owner = self.ctx
            owner.logger.info("Starting")
            while True:
                if owner.stop_event.isSet():
                    break

                try:
                    owner.logger.debug("Executing")
                    snapshot = owner.copyStuff()
                    failed = owner.checkAll(snapshot)
                    owner.removeAll(failed)
                except:
                    owner.logger.error("Exception during execution", exc_info = True)

                owner.logger.debug("Sleeping %s" % owner.sleeptime)

                try:
                    owner.stop_event.wait(owner.sleeptime)
                except ValueError:
                    pass

                # Leave without the "Halted" message when the event
                # reports that it was triggered at interpreter exit.
                event = owner.stop_event
                if isinstance(event, omero.util.concurrency.AtExitEvent) and event.atexit:
                    return

            owner.logger.info("Halted")

    worker = Task()
    worker.ctx = self
    self.thread = worker
    worker.start()
600
601 @locked
603 """
604 Within a lock, copy the "stuff" list and reverse it.
605 The list is reversed so that entries added
606 later, which may depend on earlier added entries
607 get a chance to be cleaned up first.
608 """
609 copy = list(self.stuff)
610 copy.reverse()
611 return copy
612
613
615 """
616 While stop_event is unset, go through the copy
617 of stuff and call the check method on each
618 entry. Any that throws an exception or returns
619 a False value will be returned in the remove list.
620 """
621 remove = []
622 for m in copy:
623 if self.stop_event.isSet():
624 return
625 self.logger.debug("Checking %s" % m[0])
626 method = getattr(m[0],m[2])
627 rv = None
628 try:
629 rv = method()
630 except:
631 self.logger.warn("Error from %s" % method, exc_info = True)
632 if not rv:
633 remove.append(m)
634 return remove
635
636 @locked
638 """
639 Finally, within another lock, call the "cleanup"
640 method on all the entries in remove, and remove
641 them from the official stuff list. (If stop_event
642 is set during execution, we return with the assumption
643 that Resources.cleanup() will take care of them)
644 """
645 for r in remove:
646 if self.stop_event.isSet():
647 return
648 self.logger.debug("Removing %s" % r[0])
649 self.safeClean(r)
650 self.stuff.remove(r)
651
652 @locked
def add(self, object, cleanupMethod = "cleanup", checkMethod = "check"):
    """
    Register a resource. It is stored in self.stuff as the tuple
    (object, cleanupMethod, checkMethod), where the two method
    arguments are the names of the methods to look up on the object.
    """
    self.logger.debug("Adding object %s" % object)
    self.stuff.append((object, cleanupMethod, checkMethod))
657
658 @locked
660 self.stop_event.set()
661 for m in self.stuff:
662 self.safeClean(m)
663 self.stuff = None
664 self.logger.debug("Cleanup done")
665
667 try:
668 self.logger.debug("Cleaning %s" % m[0])
669 method = getattr(m[0],m[1])
670 method()
671 except:
672 self.logger.error("Error cleaning resource: %s" % m[0], exc_info=1)
673
676
678 """
679 Simple class for creating an executable environment
680 """
681
683 """
684 Takes a number of environment variable names which
685 should be copied to the target environment if present
686 in the current execution environment.
687 """
688 if sys.platform == "win32":
689
690 self.env = os.environ.copy()
691 else:
692 self.env = {}
693 for arg in args:
694 if arg in os.environ:
695 self.env[arg] = os.environ[arg]
696
698 """
699 Returns the environment map when called.
700 """
701 return self.env
702
def set(self, key, value):
    """
    Store value under key in the target environment, replacing
    any value already present.
    """
    self.env.update({key: value})
708
def append(self, key, addition):
    """
    Manually adds a value to the environment string.

    If key is already present, addition is joined onto the existing
    value with os.pathsep; otherwise the key is created via set().
    """
    # "in" replaces dict.has_key(), which is deprecated and no longer
    # exists on Python 3 dictionaries.
    if key in self.env:
        self.env[key] = os.pathsep.join([self.env[key], addition])
    else:
        self.set(key, addition)
717
718
719
720
721
def get_user(default = None):
    """
    Look up the name of the current user.

    getpass.getuser() is tried first. Should that raise a KeyError
    (which may happen when running without a terminal, etc) the
    lookup counts as failed; should it raise an ImportError,
    win32api.GetUserName() is used instead. Whenever the resulting
    name is empty or otherwise false, the default is returned.

    Any other exception is passed on to the caller.

    See ticket:6307
    """
    username = None
    try:
        import getpass
        username = getpass.getuser()
    except ImportError:
        import win32api
        username = win32api.GetUserName()
    except KeyError:
        pass

    return username or default
749
752 exceptions_to_handle = (ImportError)
753 try:
754 from pywintypes import com_error
755 from win32com.shell import shellcon, shell
756 exceptions_to_handle = (ImportError, com_error)
757 homeprop = shell.SHGetFolderPath(0, shellcon.CSIDL_APPDATA, 0, 0)
758 except exceptions_to_handle:
759 homeprop = os.path.expanduser("~")
760
761 if "~" == homeprop:
762 raise Exception("Unexpanded '~' from expanduser: see ticket:5583")
763
764 return homeprop
765
767 f = path.path(path_or_obj)
768 editor = os.getenv("VISUAL") or os.getenv("EDITOR")
769 if not editor:
770 if platform.system() == "Windows":
771 editor = "Notepad.exe"
772 else:
773 editor = "vi"
774 f.write_text(start_text)
775
776
777
778
779 editor_obj = path.path(editor)
780 if editor_obj.isabs():
781 editor_path = editor
782 else:
783 from omero_ext.which import which
784 editor_path = which(editor)
785
786 pid = os.spawnl(os.P_WAIT, editor_path, editor_path, f)
787 if pid:
788 re = RuntimeError("Couldn't spawn editor: %s" % editor)
789 re.pid = pid
790 raise re
791
792
def tail_lines(filename,linesback=10,returnlist=0):
    """Does what "tail -10 filename" would have done
       Parameters::
            filename   file to read
            linesback  Number of lines to read from end of file
            returnlist Return a list containing the lines instead of a string

       Returns the last "linesback" lines, either as a list of lines
       without line terminators (returnlist true) or as one string in
       which every line is terminated by a newline. A final line that
       has no trailing newline in the file is included. A linesback of
       zero or less yields an empty result.
    """
    if linesback <= 0:
        # Nothing was asked for; the read-back loop below could never
        # collect "more than linesback" lines from a zero-sized window.
        if returnlist:
            return []
        return ""

    avgcharsperline = 75

    handle = open(filename, 'r')
    try:
        while True:
            try:
                # The estimate becomes a float once it has been grown,
                # but seek() needs an integer offset.
                handle.seek(-1 * int(avgcharsperline * linesback), 2)
            except IOError:
                # Window larger than the file (or end-relative seeking
                # unsupported): read from the beginning instead.
                handle.seek(0)
            atstart = (handle.tell() == 0)

            lines = handle.read().split("\n")
            if atstart or len(lines) > (linesback + 1):
                break

            # Not enough lines in the window yet; widen it and retry.
            avgcharsperline = avgcharsperline * 1.3
    finally:
        # Close the file even if reading fails.
        handle.close()

    # A trailing newline leaves one empty element at the end. Drop only
    # that, so a last line without a newline is not lost.
    if lines and lines[-1] == "":
        lines.pop()

    # If reading did not start at the top of the file, the first element
    # may be a partial line; the loop above guarantees there are more
    # than "linesback" elements in that case, so it is never selected.
    selected = lines[-linesback:]
    if returnlist:
        return selected

    return "".join([line + "\n" for line in selected])
823