Package omero :: Module callbacks
[hide private]
[frames | no frames]

Source Code for Module omero.callbacks

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  """ 
  4     Callbacks to be used with asynchronous services. The 
  5     ProcessCallbackI is also included in the omero.scripts 
  6     module for backwards compatibility. 
  7   
  8     Copyright 2010 Glencoe Software, Inc. All rights reserved. 
  9     Use is subject to license terms supplied in LICENSE.txt 
 10   
 11  """ 
 12   
 13  import os 
 14  import Ice 
 15  import logging 
 16   
 17  import omero 
 18  import omero.all 
 19  import omero.util.concurrency 
 20  import omero_ext.uuid as uuid # see ticket:3774 
 21   
 22  from omero.rtypes import * 
 23   
 24   
 25  PROC_LOG = logging.getLogger("omero.scripts.ProcessCallback") 
 26  DEL_LOG = logging.getLogger("omero.api.DeleteCallback") 
 27  CMD_LOG = logging.getLogger("omero.cmd.CmdCallback") 
 28   
 29   
def adapter_and_category(adapter_or_client, category):
    """
    Resolves the (adapter, category) pair that a callback servant
    should be registered under.

    @param adapter_or_client Either an Ice.ObjectAdapter or an object
           providing getAdapter() and getCategory() (e.g. an
           omero.client instance).
    @param category Identity category to use. May be empty/None when
           a client is passed, in which case the client's own
           category is used.
    @return tuple of (adapter, category)
    @throws omero.ClientError if an adapter is passed without
            a category.
    """
    if not isinstance(adapter_or_client, Ice.ObjectAdapter):
        # Client-side usage: a client object is available and can
        # supply both values. An explicitly passed category still
        # wins (though that is unlikely to be useful).
        client = adapter_or_client
        if not category:
            category = client.getCategory()
        return client.getAdapter(), category

    # A bare adapter was handed in, which should happen either when
    # the instance is created server-side or when the caller supplied
    # the category explicitly. Without a category there is nothing to
    # fall back on.
    if category:
        return adapter_or_client, category
    raise omero.ClientError("No category available")
48 49
50 -class ProcessCallbackI(omero.grid.ProcessCallback):
51 """ 52 Simple callback which registers itself with the given process. 53 """ 54 55 FINISHED = "FINISHED" 56 CANCELLED = "CANCELLED" 57 KILLED = "KILLED" 58
59 - def __init__(self, adapter_or_client, process, poll = True, category=None):
60 self.event = omero.util.concurrency.get_event(name="ProcessCallbackI") 61 self.result = None 62 self.poll = poll 63 self.process = process 64 self.adapter, self.category = \ 65 adapter_and_category(adapter_or_client, category) 66 67 self.id = Ice.Identity(str(uuid.uuid4()), self.category) 68 self.prx = self.adapter.add(self, self.id) # OK ADAPTER USAGE 69 self.prx = omero.grid.ProcessCallbackPrx.uncheckedCast(self.prx) 70 process.registerCallback(self.prx)
71
72 - def block(self, ms):
73 """ 74 Should only be used if the default logic of the process methods is kept 75 in place. If "event.set" does not get called, this method will always 76 block for the given milliseconds. 77 """ 78 if self.poll: 79 try: 80 rc = self.process.poll() 81 if rc is not None: 82 self.processFinished(rc.getValue()) 83 except Exception, e: 84 PROC_LOG.warn("Error calling poll: %s" % e) 85 86 self.event.wait(float(ms) / 1000) 87 if self.event.isSet(): 88 return self.result 89 return None
90
91 - def processCancelled(self, success, current = None):
92 self.result = ProcessCallbackI.CANCELLED 93 self.event.set()
94
95 - def processFinished(self, returncode, current = None):
96 self.result = ProcessCallbackI.FINISHED 97 self.event.set()
98
99 - def processKilled(self, success, current = None):
100 self.result = ProcessCallbackI.KILLED 101 self.event.set()
102
103 - def close(self):
104 self.adapter.remove(self.id) # OK ADAPTER USAGE
105 106
107 -class DeleteCallbackI(object):
108 """ 109 Callback used for waiting until DeleteHandlePrx will return true on 110 finished(). The block(long) method will wait the given number of 111 milliseconds and then return the number of errors if any or None 112 if the delete is not yet complete. 113 114 Example usage: 115 116 cb = DeleteCallbackI(client, handle) 117 errors = None 118 while (errors is None): 119 errors = cb.block(500) 120 """ 121
122 - def __init__(self, adapter_or_client, handle, poll = True):
123 self.event = omero.util.concurrency.get_event(name="DeleteCallbackI") 124 self.result = None 125 self.poll = poll 126 self.handle = handle 127 self.adapter = adapter_or_client 128 self.id = Ice.Identity(str(uuid.uuid4()), "DeleteHandleCallback") 129 if not isinstance(self.adapter, Ice.ObjectAdapter): 130 self.adapter = self.adapter.adapter
131 #self.prx = self.adapter.add(self, self.id) # OK ADAPTER USAGE 132 #self.prx = omero.grid.ProcessCallbackPrx.uncheckedCast(self.prx) 133 #process.registerCallback(self.prx) 134
135 - def loop(self, loops, ms):
136 """ 137 Calls block(long) "loops" number of times with the "ms" 138 argument. This means the total wait time for the delete to occur 139 is: loops X ms. Sensible values might be 10 loops for 500 ms, or 140 5 seconds. 141 142 @param loops Number of times to call block(long) 143 @param ms Number of milliseconds to pass to block(long 144 @throws omero.LockTimeout if block(long) does not return 145 a non-null value after loops calls. 146 """ 147 148 count = 0 149 errors = None 150 while errors is None and count < loops: 151 errors = self.block(ms) 152 count += 1 153 154 if errors is None: 155 waited = (ms / 1000) * loops 156 raise omero.LockTimeout(None, None, 157 "Delete unfinished after %s seconds" % waited, 158 5000L, waited) 159 else: 160 return self.handle.report()
161
162 - def block(self, ms):
163 """ 164 Should only be used if the default logic of the handle methods is kept 165 in place. If "event.set" does not get called, this method will always 166 block for the given milliseconds. 167 """ 168 if self.poll: 169 try: 170 if self.handle.finished(): 171 try: 172 self.finished(self.handle.errors()) 173 except Exception, e: 174 DEL_LOG.warn("Error calling DeleteCallbackI.finished: %s" % e, exc_info=True) 175 except Ice.ObjectNotExistException, onee: 176 raise omero.ClientError("Handle is gone! %s" % self.handle) 177 except: 178 DEL_LOG.warn("Error polling DeleteHandle:" + str(self.handle), exc_info=True) 179 180 181 self.event.wait(float(ms) / 1000) 182 if self.event.isSet(): 183 return self.result 184 return None
185 186
187 - def finished(self, errors):
188 self.result = errors 189 self.event.set()
190
191 - def close(self):
192 #self.adapter.remove(self.id) # OK ADAPTER USAGE 193 try: 194 self.handle.close() # ticket:2978 195 except Exception, e: 196 DEL_LOG.warn("Error calling DeleteHandlePrx.close: %s" % self.handle, exc_info=True)
197 198
199 -class CmdCallbackI(omero.cmd.CmdCallback):
200 """ 201 Callback servant used to wait until a HandlePrx would 202 return non-null on getReponse. The server will notify 203 of completion to prevent constantly polling on 204 getResponse. Subclasses can override methods for handling 205 based on the completion status. 206 207 Example usage: 208 209 cb = CmdCallbackI(client, handle) 210 response = None 211 while (response is None): 212 response = cb.block(500) 213 214 # or 215 216 response = cb.loop(5, 500) 217 """ 218
219 - def __init__(self, adapter_or_client, handle, category=None):
220 221 if adapter_or_client is None: 222 raise omero.ClientError("Null client") 223 224 if handle is None: 225 raise omero.ClientError("Null handle") 226 227 self.event = omero.util.concurrency.get_event(name="CmdCallbackI") 228 self.state = (None, None) # (Response, Status) 229 self.handle = handle 230 self.adapter, self.category = \ 231 adapter_and_category(adapter_or_client, category) 232 233 self.id = Ice.Identity(str(uuid.uuid4()), self.category) 234 self.prx = self.adapter.add(self, self.id) # OK ADAPTER USAGE 235 self.prx = omero.cmd.CmdCallbackPrx.uncheckedCast(self.prx) 236 handle.addCallback(self.prx) 237 238 # Now check just in case the process exited VERY quickly 239 self.poll()
240 241 # 242 # Local invocations 243 # 244
245 - def getResponse(self):
246 """ 247 Returns possibly null Response value. If null, then neither has 248 the remote server nor the local poll method called finish 249 with non-null values. 250 """ 251 return self.state[0]
252
253 - def getStatus(self):
254 """ 255 Returns possibly null Status value. If null, then neither has 256 the remote server nor the local poll method called finish 257 with non-null values. 258 """ 259 return self.state[1]
260
261 - def getStatusOrThrow(self):
262 s = self.getStatus() 263 if not s: 264 raise omero.ClientError("Status not present!") 265 return s
266
267 - def isCancelled(self):
268 """ 269 Returns whether Status::CANCELLED is contained in 270 the flags variable of the Status instance. If no 271 Status is available, a ClientError will be thrown. 272 """ 273 s = self.getStatusOrThrow() 274 try: 275 s.flags.index(omero.cmd.State.CANCELLED) 276 return True 277 except: 278 return False
279
280 - def isFailure(self):
281 """ 282 Returns whether Status::FAILURE is contained in 283 the flags variable of the Status instance. If no 284 Status is available, a ClientError will be thrown. 285 """ 286 s = self.getStatusOrThrow() 287 try: 288 s.flags.index(omero.cmd.State.FAILURE) 289 return True 290 except: 291 return False
292
293 - def loop(self, loops, ms):
294 """ 295 Calls block(long) "loops" number of times with the "ms" 296 argument. This means the total wait time for the delete to occur 297 is: loops X ms. Sensible values might be 10 loops for 500 ms, or 298 5 seconds. 299 300 @param loops Number of times to call block(long) 301 @param ms Number of milliseconds to pass to block(long 302 @throws omero.LockTimeout if block(long) does not return 303 a non-null value after loops calls. 304 """ 305 306 count = 0 307 found = False 308 rsp = None 309 while count < loops: 310 count += 1 311 found = self.block(ms) 312 if found: 313 break 314 315 if found: 316 return self.getResponse() 317 else: 318 waited = (ms / 1000.0) * loops 319 raise omero.LockTimeout(None, None, 320 "Command unfinished after %s seconds" % waited, 321 5000L, waited)
322
323 - def block(self, ms):
324 """ 325 Blocks for the given number of milliseconds unless 326 finished(Response, Status, Current) has been called in 327 which case it returns immediately with true. If false 328 is returned, then the timeout was reached. 329 """ 330 self.event.wait(float(ms) / 1000) 331 return self.event.isSet()
332 333 # 334 # Remote invocations 335 # 336
337 - def poll(self):
338 """ 339 Calls HandlePrx#getResponse in order to check for a 340 non-null value. If so, {@link Handle#getStatus} is also called, and the 341 two non-null values are passed to finished(Response, Status, Current). 342 This should typically not be used. Instead, favor the use of block and 343 loop. 344 """ 345 rsp = self.handle.getResponse() 346 if rsp is not None: 347 s = self.handle.getStatus() 348 self.finished(rsp, s, None) # Only time that current should be null
349
350 - def step(self, complete, total, current=None):
351 """ 352 Called periodically by the server to signal that processing is 353 moving forward. Default implementation does nothing. 354 """ 355 pass
356
357 - def finished(self, rsp, status, current=None):
358 """ 359 Called when the command has completed whether with 360 a cancellation or a completion. 361 """ 362 self.state = (rsp, status) 363 self.event.set() 364 self.onFinished(rsp, status, current)
365
366 - def onFinished(self, rsp, status, current):
367 """ 368 Method intended to be overridden by subclasses. Default logic does 369 nothing. 370 """ 371 pass
372
373 - def close(self, closeHandle):
374 """ 375 First removes self from the adapter so as to no longer receive 376 notifications, and the calls close on the remote handle if requested. 377 """ 378 self.adapter.remove(self.id) # OK ADAPTER USAGE 379 if closeHandle: 380 self.handle.close();
381