1
2
3
4
5
6
7
8
9 import os
10 import sys
11 import Ice
12 import time
13 import uuid
14 import omero
15 import IceGrid
16 import logging
17 import Glacier2
18 import threading
19 import exceptions
20 import logging.handlers
21 import omero.util.concurrency
22
23 from omero.util.decorators import locked
24
25 LOGDIR = os.path.join("var","log")
26 LOGFORMAT = """%(asctime)s %(levelname)-5.5s [%(name)40s] (%(threadName)-10s) %(message)s"""
27 LOGLEVEL = logging.INFO
28 LOGSIZE = 500000000
29 LOGNUM = 9
30 LOGMODE = "a"
31
32 orig_stdout = sys.stdout
33 orig_stderr = sys.stderr
36 """
37 Generates a logname from the given instance using the module and name from its class
38 """
39 log_name = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
40 return log_name
41
65
82
84 """
85 Since all server components should exclusively using the logging module
86 any output to stdout or stderr is caught and logged at "WARN". This is
87 useful, especially in the case of Windows, where stdout/stderr is eaten.
88 """
89
91 self.logger = logger
92 self.internal = logging.getLogger("StreamRedirect")
93 self.softspace = False
94
97
99 msg = msg.strip()
100 if msg:
101 self.logger.warn(msg)
102
104 self.internal.warn("No attribute: %s" % name)
105
106 -def internal_service_factory(communicator, user="root", group=None, retries=6, interval=10, client_uuid=None, stop_event = None):
107 """
108 Try to return a ServiceFactory from the grid.
109
110 Try a number of times then give up and raise the
111 last exception returned. This method will only
112 work internally to the grid, i.e. behind the Glacier2
113 firewall. It is intended for internal servers to
114 be able to create sessions for accessing the database. ::
115 communicator := Ice.Communicator used to find the registry
116 user := Username which should have a session created
117 group := Group into which the session should be logged
118 retries := Number of session creation retries before throwing
119 interval := Seconds between retries
120 client_uuid := Uuid of the client which should be used
121 """
122 log = logging.getLogger("omero.utils")
123 if stop_event == None:
124 stop_event = omero.util.concurrency.get_event()
125
126 tryCount = 0
127 excpt = None
128 query = communicator.stringToProxy("IceGrid/Query")
129 query = IceGrid.QueryPrx.checkedCast(query)
130
131 if client_uuid is None:
132 client_uuid = str(uuid.uuid4())
133
134 while tryCount < retries:
135 if stop_event.isSet():
136 return None
137 try:
138 blitz = query.findAllObjectsByType("::Glacier2::SessionManager")[0]
139 blitz = Glacier2.SessionManagerPrx.checkedCast(blitz)
140 sf = blitz.create(user, None, {"omero.client.uuid":client_uuid})
141
142 return omero.api.ServiceFactoryPrx.checkedCast(sf)
143 except Exception, e:
144 tryCount += 1
145 log.info("Failed to get session on attempt %s", str(tryCount))
146 excpt = e
147 stop_event.wait(interval)
148
149 log.warn("Reason: %s", str(excpt))
150 raise excpt
151
153 """
154 """
155 reg = communicator.stringToProxy("IceGrid/Registry")
156 reg = IceGrid.RegistryPrx.checkedCast(reg)
157 adm = reg.createAdminSession('null', '')
158 return adm
159
161 """
162 """
163 sid = communicator.identityToString(obj.ice_getIdentity())
164 adm = create_admin_session(communicator)
165 prx = adm.getAdmin()
166 try:
167 try:
168 prx.addObject(obj)
169 except IceGrid.ObjectExistsException:
170 prx.updateObject(obj)
171 finally:
172 adm.destroy()
173
175 """
176 Converts a long to a path such that for all directiories only
177 a 1000 files and a 1000 subdirectories will be returned.
178
179 This method duplicates the logic in
180 ome.io.nio.AbstractFileSystemService.java:getPath()
181 """
182 suffix = ""
183 remaining = id
184 dirno = 0
185
186 if id is None or id == "":
187 raise exceptions.Exception("Expecting a not-null id.")
188
189 id = long(id)
190
191 if id < 0:
192 raise exceptions.Exception("Expecting a non-negative id.")
193
194 while (remaining > 999):
195 remaining /= 1000
196
197 if remaining > 0:
198 dirno = remaining % 1000
199 suffix = os.path.join("Dir-%03d" % dirno, suffix)
200
201 return os.path.join(root, "%s%s" %(suffix,id))
202
203 -class ServerContext(object):
204 """
205 Context passed to all servants.
206
207 server_id, communicator, and stop_event will be
208 constructed by the top-level Server instance.
209
210 A context instance may also be configured to hold
211 on to an internal session (ServiceFactoryPrx) and
212 keep it alive.
213
214 This instance obeys the Resources API and calls
215 sf.keepAlive(None) on every check call, but does
216 nothing on cleanup. The sf instance must be manually
217 cleaned as the final operation of a servant.
218
219 (Note: cleanup of the server context indicates
220 server shutdown, so should be infrequent)
221 """
222
223 - def __init__(self, server_id, communicator, stop_event):
224 self._lock = threading.RLock()
225 self.logger = logging.getLogger("omero.util.ServerContext")
226 self.server_id = server_id
227 self.communicator = communicator
228 self.stop_event = stop_event
229
230 - def newSession(self):
231 self.session = internal_service_factory(self.communicator, stop_event = self.stop_event)
232
233 - def hasSession(self):
234 return hasattr(self, "session")
235
236 @locked
237 - def getSession(self, recreate = True):
238 """
239 Returns the ServiceFactoryPrx configured for the context if
240 available. If the context was not configured for sessions,
241 an ApiUsageException will be thrown: servants should know
242 whether or not they were configured for sessions.
243 See Servant(..., needs_session = True)
244
245 Otherwise, if there is no ServiceFactoryPrx, an attempt will
246 be made to create one if recreate == True. If the value is None
247 or non can be recreated, an InternalException will be thrown.
248
249 TODO : currently no arguments are provided for re-creating these,
250 but also not in Servant.__init__
251 """
252 if not self.hasSession():
253 raise omero.ApiUsageException("Not configured for server connection")
254
255 if self.session:
256 try:
257 self.session.keepAlive(None)
258 except exceptions.Exception, e:
259 self.logger.warn("Connection failure: %s" % e)
260 self.session = None
261
262 if self.session is None and recreate:
263 try:
264 self.newSession()
265 except exceptions.Exception, e:
266 self.logger.warn("Failed to re-establish connection: %s" % e)
267
268 if self.session is None:
269 raise omero.InternalException("No conection to server")
270
271 return self.session
272
274 """
275 Calls getSession() but always returns True. This keeps the context
276 available in the resources for later uses, and tries to re-establish
277 a connection in case Blitz goes down.
278 """
279 try:
280 self.getSession()
281 except:
282 pass
283 return True
284
286 """
287 Does nothing. Context clean up must happen manually
288 since later activities may want to reuse it. Servants using
289 a server connection should cleanup the instance *after* Resources
290 is cleaned up
291 """
292 pass
293
294 -class Server(Ice.Application):
295 """
296 Basic server implementation which can be used for
297 implementing a standalone python server which can
298 be started from icegridnode.
299
300 The servant implementation MUST have a constructor
301 which takes a single ServerContext argument AND
302 have a cleanup() method
303
304 Logging is configured relative to the current directory
305 to be in var/log by default.
306
307 Usage::
308
309 if __name__ == "__main__":
310 app=Server(ServicesI, "ServicesAdapter", Ice.Identity("Services",""))
311 sys.exit(app.main(sys.argv))
312
313 app.impl now points to an instance of ServicesI
314
315 """
316
317 - def __init__(self, impl_class, adapter_name, identity, logdir = LOGDIR):
318
319 self.impl_class = impl_class
320 self.adapter_name = adapter_name
321 self.identity = identity
322 self.logdir = logdir
323 self.stop_event = omero.util.concurrency.get_event()
324
325 - def run(self,args):
326
327 from omero.rtypes import ObjectFactories as rFactories
328 from omero.columns import ObjectFactories as cFactories
329
330 props = self.communicator().getProperties()
331 configure_server_logging(props)
332
333 self.logger = logging.getLogger("omero.util.Server")
334 self.logger.info("*"*80)
335 self.logger.info("Starting")
336
337 self.shutdownOnInterrupt()
338
339 try:
340
341 self.objectfactory = omero.clients.ObjectFactory()
342 self.objectfactory.registerObjectFactory(self.communicator())
343 for of in rFactories.values() + cFactories.values():
344 of.register(self.communicator())
345
346 try:
347 serverid = self.communicator().getProperties().getProperty("Ice.ServerId")
348 ctx = ServerContext(serverid, self.communicator(), self.stop_event)
349 self.impl = self.impl_class(ctx)
350 getattr(self.impl, "cleanup")
351 except:
352 self.logger.error("Failed initialization", exc_info=1)
353 sys.exit(100)
354
355 try:
356 self.adapter = self.communicator().createObjectAdapter(self.adapter_name)
357 prx = self.adapter.add(self.impl, self.identity)
358 self.adapter.activate()
359 add_grid_object(self.communicator(), prx)
360 except:
361 self.logger.error("Failed activation", exc_info=1)
362 sys.exit(200)
363
364 self.logger.info("Blocking until shutdown")
365 self.communicator().waitForShutdown()
366 finally:
367 self.stop_event.set()
368 self.logger.info("Cleanup")
369 self.cleanup()
370 self.logger.info("Stopped")
371 self.logger.info("*"*80)
372
374 """
375 Cleans up all resources that were created by this server.
376 Primarily the one servant instance.
377 """
378 if hasattr(self,"impl"):
379 try:
380 self.impl.cleanup()
381 finally:
382 del self.impl
383
386 """
387 Base servant initialization. Doesn't create or try to cleanup
388 a top-level Resources thread. This is useful for large numbers
389 of servants. For servers and other singleton-like servants,
390 see "Servant"
391 """
393 self._lock = threading.RLock()
394 self.ctx = ctx
395 self.stop_event = ctx.stop_event
396 self.communicator = ctx.communicator
397 self.logger = logging.getLogger(make_logname(self))
398 self.logger.debug("Created")
399
401 """
402 Abstract servant which can be used along with a slice2py
403 generated dispatch class as the base type of high-level servants.
404 These provide resource cleanup as per the omero.util.Server
405 class.
406
407 By passing "needs_session = True" to this constructor, an internal
408 session will be created and stored in ServerContext as well as
409 registered with self.resources
410 """
411
412 - def __init__(self, ctx, needs_session = False):
418
420 """
421 Cleanups all resoures created by this servant. Calling
422 cleanup multiple times should be safe.
423 """
424 resources = self.resources
425 self.resources = None
426 if resources != None:
427 self.logger.info("Cleaning up")
428 resources.cleanup()
429 self.logger.info("Done")
430 if self.ctx.hasSession():
431 try:
432 sf = self.ctx.getSession(recreate=False)
433 self.logger.debug("Destroying %s" % sf)
434 sf.destroy()
435 except:
436 pass
437
440
443 """
444 Container class for storing resources which should be
445 cleaned up on close and periodically checked. Use
446 stop_event.set() to stop the internal thread.
447 """
448
449 - def __init__(self, sleeptime = 60, stop_event = None):
450 """
451 Add resources via add(object). They should have a no-arg cleanup()
452 and a check() method.
453
454 The check method will be called periodically (default: 60 seconds)
455 on each resource. The cleanup method will be called on
456 Resources.cleanup()
457 """
458
459 self._lock = threading.RLock()
460 self.logger = logging.getLogger("omero.util.Resources")
461 self.stop_event = stop_event
462 if not self.stop_event:
463 self.stop_event = omero.util.concurrency.get_event()
464
465 if sleeptime < 5:
466 raise exceptions.Exception("Sleep time should be greater than 5: " % sleeptime)
467
468 self.sleeptime = sleeptime
469 self.stuff = []
470
471 class Task(threading.Thread):
472 """
473 Internal thread used for checking "stuff"
474 """
475 def run(self):
476 ctx = self.ctx
477 ctx.logger.info("Starting")
478 while not ctx.stop_event.isSet():
479 try:
480 ctx.logger.debug("Executing")
481 copy = ctx.copyStuff()
482 remove = ctx.checkAll(copy)
483 ctx.removeAll(remove)
484 except:
485 ctx.logger.error("Exception during execution", exc_info = True)
486
487 ctx.logger.debug("Sleeping %s" % ctx.sleeptime)
488
489 try:
490 ctx.stop_event.wait(ctx.sleeptime)
491 except ValueError:
492 pass
493
494 ctx.logger.info("Halted")
495
496 self.thread = Task()
497 self.thread.ctx = self
498 self.thread.start()
499
500 @locked
502 """
503 Within a lock, copy the "stuff" list and reverse it.
504 The list is reversed so that entries added
505 later, which may depend on earlier added entries
506 get a chance to be cleaned up first.
507 """
508 copy = list(self.stuff)
509 copy.reverse()
510 return copy
511
512
514 """
515 While stop_event is unset, go through the copy
516 of stuff and call the check method on each
517 entry. Any that throws an exception or returns
518 a False value will be returned in the remove list.
519 """
520 remove = []
521 for m in copy:
522 if self.stop_event.isSet():
523 return
524 self.logger.debug("Checking %s" % m[0])
525 method = getattr(m[0],m[2])
526 rv = None
527 try:
528 rv = method()
529 except:
530 self.logger.warn("Error from %s" % method, exc_info = True)
531 if not rv:
532 remove.append(m)
533 return remove
534
535 @locked
537 """
538 Finally, within another lock, call the "cleanup"
539 method on all the entries in remove, and remove
540 them from the official stuff list. (If stop_event
541 is set during execution, we return with the assumption
542 that Resources.cleanup() will take care of them)
543 """
544 for r in remove:
545 if self.stop_event.isSet():
546 return
547 self.logger.debug("Removing %s" % r[0])
548 self.safeClean(r)
549 self.stuff.remove(r)
550
551 @locked
552 - def add(self, object, cleanupMethod = "cleanup", checkMethod = "check"):
553 entry = (object,cleanupMethod,checkMethod)
554 self.logger.info("Adding object %s" % object)
555 self.stuff.append(entry)
556
557 @locked
559 self.stop_event.set()
560 for m in self.stuff:
561 self.safeClean(m)
562 self.stuff = None
563 self.logger.info("Cleanup done")
564
566 try:
567 self.logger.debug("Cleaning %s" % m[0])
568 method = getattr(m[0],m[1])
569 method()
570 except:
571 self.logger.error("Error cleaning resource: %s" % m[0], exc_info=1)
572
575
577 """
578 Simple class for creating an executable environment
579 """
580
582 """
583 Takes an number of environment variable names which
584 should be copied to the target environment if present
585 in the current execution environment.
586 """
587 if sys.platform == "win32":
588
589 self.env = os.environ.copy()
590 else:
591 self.env = {}
592 for arg in args:
593 if os.environ.has_key(arg):
594 self.env[arg] = os.environ[arg]
596 """
597 Returns the environment map when called.
598 """
599 return self.env
600
601 - def set(self, key, value):
602 """
603 Manually sets a value in the target environment.
604 """
605 self.env[key] = value
606
607 - def append(self, key, addition):
608 """
609 Manually adds a value to the environment string
610 """
611 if self.env.has_key(key):
612 self.env[key] = os.pathsep.join([self.env[key], addition])
613 else:
614 self.set(key, addition)
615