1
2
3 """
4 Reconcile and cleanse where necessary an OMERO data directory of orphaned data.
5 """
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 import omero.clients
32 import omero
33 import sys
34 import os
35
36 from Glacier2 import PermissionDeniedException
37 from getopt import getopt, GetoptError
38 from omero.util import get_user
39 from stat import *
40
41
42
43
# Sub-directory of the OMERO data directory -> OMERO object type name.
# NOTE(review): the code consuming this mapping is not visible here;
# presumably each directory is reconciled against the mapped object type.
SEARCH_DIRECTORIES = {
    "Pixels": "Pixels",
    "Files": "OriginalFile",
    "Thumbnails": "Thumbnail",
}
49
def usage(error):
    """
    Prints usage so that we don't have to. :)

    The given error message is printed first, followed by the command line
    synopsis, and the process is then terminated with exit status 2.

    error -- message to show on the first line of the output.
    """
    cmd = sys.argv[0]
    # The synopsis must match what getopt accepts: the long option is
    # "--dry-run" (two dashes) and "-k" requires a session key argument.
    # A single parenthesised argument prints identically whether "print" is
    # a statement or a function.
    print("""%s
Usage: %s [--dry-run] [-u username | -k session_key] <omero.data.dir>
Cleanses files in the OMERO data directory that have no reference in the
OMERO database. NOTE: As this script is designed to be run via cron or in
a scheduled manner it produces NO output unless a dry run is performed.

Options:
  -u          Administrator username to log in to OMERO with
  -k          Session key to log in to OMERO with
  --dry-run   Just prints out what would have been done

Examples:
  %s --dry-run -u root /OMERO

Report bugs to OME Users <ome-users@lists.openmicroscopy.org.uk>"""
          % (error, cmd, cmd))
    sys.exit(2)
72
class Cleanser(object):
    """
    Keeps file cleansing state and performs OMERO database reconciliation of
    files within an OMERO binary repository.

    Paths are collected in batches of QUERY_THRESHOLD; each batch is checked
    with a single query and every file whose ID is unknown to the database
    is removed (or merely reported when dry_run is set).

    Attributes:
    cleansed       -- paths removed (or that would be removed) so far
    bytes_cleansed -- total size in bytes of the paths in cleansed
    deferred_paths -- paths waiting for the next database query
    dry_run        -- when True nothing is deleted, decisions are printed
    """

    # Number of paths to defer before a database query is actually made.
    QUERY_THRESHOLD = 25

    # File name suffixes: "<id>_pyramid" for a pyramid, and
    # ".<id>_pyramid....pyr_lock" / ".<id>_pyramid....tmp" for the hidden
    # lock and temporary files that accompany one.
    PYRAMID_FILE = "_pyramid"
    PYRAMID_LOCK = ".pyr_lock"
    PYRAMID_TEMP = ".tmp"

    # ID used for files whose name does not encode an object ID at all.
    # Such files are never removed.
    _UNKNOWN_ID = -1

    def __init__(self, query_service, object_type):
        """
        query_service -- service whose projection() method is used to look
                         up object IDs.
        object_type   -- name of the object type to select from, as it
                         appears in the query ('Pixels' for instance).
        """
        self.query_service = query_service
        self.object_type = object_type
        self.cleansed = list()
        self.bytes_cleansed = 0
        self.deferred_paths = list()
        self.dry_run = False

    def cleanse(self, root):
        """
        Begins a cleansing operation from a given OMERO binary repository
        root directory. /OMERO/Files or /OMERO/Pixels for instance.

        Directories are descended into recursively; every other entry is
        queued for reconciliation.
        """
        for entry in os.listdir(root):
            path = os.path.join(root, entry)
            if os.path.isdir(path):
                self.cleanse(path)
            else:
                self.query_or_defer(path)

    def query_or_defer(self, path):
        """
        Adds a given path to the list of deferred paths. If the number of
        deferred paths has reached the QUERY_THRESHOLD (to reduce database
        hits) a reconciliation check will happen against OMERO.
        """
        self.deferred_paths.append(path)
        # ">=" rather than "==" so that a batch can never silently grow
        # past the threshold.
        if len(self.deferred_paths) >= self.QUERY_THRESHOLD:
            self.do_cleanse()

    def do_cleanse(self):
        """
        Actually performs the reconciliation check against OMERO and
        removes relevant files.

        Does nothing when there are no deferred paths. Otherwise one query
        is issued for the whole batch and the deferred list is emptied
        afterwards.
        """
        if not self.deferred_paths:
            return

        paths = self.deferred_paths
        ids = [self._object_id_for(path) for path in paths]

        parameters = omero.sys.Parameters()
        parameters.map = {
            'ids': omero.rtypes.rlist([omero.rtypes.rlong(i) for i in ids])
        }
        rows = self.query_service.projection(
            "select o.id from %s as o where o.id in (:ids)"
            % self.object_type,
            parameters, {"omero.group": "-1"})
        existing_ids = set([cols[0].val for cols in rows])

        for path, object_id in zip(paths, ids):
            if object_id in existing_ids:
                if self.dry_run:
                    print(" \\_ %s (keep)" % path)
            elif object_id == self._UNKNOWN_ID:
                if self.dry_run:
                    print(" \\_ %s (ignored/keep)" % path)
            else:
                self._remove_orphan(path)
        self.deferred_paths = list()

    def finalize(self):
        """
        Takes the final set of deferred paths and performs a reconciliation
        check against OMERO for them. This method's purpose is basically to
        catch the final set of paths in the deferred path list and/or perform
        any cleanup.
        """
        self.do_cleanse()

    def __str__(self):
        return "Cleansing context: %d files (%d bytes)" % \
            (len(self.cleansed), self.bytes_cleansed)

    def _object_id_for(self, path):
        """
        Returns the object ID encoded in the file name of path, or
        _UNKNOWN_ID when the name is neither a plain number nor a
        recognised pyramid, pyramid lock or pyramid temporary file name.
        """
        file_name = os.path.basename(path)
        # int() promotes to long automatically where needed, so this is
        # numerically the same as long() while not depending on it.
        try:
            return int(file_name)
        except ValueError:
            pass

        if self.PYRAMID_FILE not in file_name:
            return self._UNKNOWN_ID
        id_part = file_name.split("_")[0]
        if file_name.endswith(self.PYRAMID_FILE):
            pass
        elif file_name.endswith(self.PYRAMID_LOCK) or \
                file_name.endswith(self.PYRAMID_TEMP):
            # Lock and temporary files are hidden: drop the leading dot.
            id_part = id_part.lstrip('.')
        else:
            return self._UNKNOWN_ID
        try:
            return int(id_part)
        except ValueError:
            return self._UNKNOWN_ID

    def _remove_orphan(self, path):
        """
        Records path as cleansed and deletes it, or only reports it when
        dry_run is set. A failure to delete is printed, not raised.
        """
        size = os.stat(path).st_size
        self.cleansed.append(path)
        # Accumulate: the total covers every cleansed file, not just the
        # most recent one.
        self.bytes_cleansed += size
        if self.dry_run:
            print(" \\_ %s (remove)" % path)
            return
        try:
            os.unlink(path)
        except OSError:
            # sys.exc_info() avoids the version specific "except X, e" /
            # "except X as e" spellings.
            print(sys.exc_info()[1])
184
185
187
188
189
def initial_check(config_service):
    """
    Checks that the server is version 4.2.1 or later before any work is
    done, and exits the process with status 3 when it is older.

    When no config service is given the version cannot be checked; a
    warning is printed and, after a 10 second pause that lets the user
    cancel, the function returns without checking.

    config_service -- object providing getVersion(), or None.

    Raises ValueError when the reported version contains no number.
    """
    if config_service is None:
        print("No config service provided! "
              "Waiting 10 seconds to allow cancellation")
        from threading import Event
        Event().wait(10)
        # There is nothing to ask for a version: carry on unchecked rather
        # than fail on None.getVersion().
        return

    import re
    server_version = config_service.getVersion()
    # Take the first dotted run of digits so that decorated versions such
    # as "5.0.0-rc1" are understood too.
    found = re.search(r"\d+(?:\.\d+)*", server_version)
    if found is None:
        raise ValueError(
            "Unable to parse server version: %r" % (server_version,))
    server_tuple = tuple([int(x) for x in found.group(0).split(".")])
    if server_tuple < (4, 2, 1):
        print("Server version is too old! (%s) Aborting..." % server_version)
        sys.exit(3)
201
202
203 -def cleanse(data_dir, query_service, dry_run=False, config_service=None):
219
220
def fixpyramids(data_dir, query_service, dry_run=False, config_service=None):
    """
    Removes zero-length pyramid files below <data_dir>/Pixels.

    A zero-length "*_pyramid" file is kept when a matching hidden
    ".<name>....tmp" or ".<name>....pyr_lock" file sits next to it.
    Otherwise it is removed, or only reported when dry_run is True.

    data_dir       -- OMERO data directory, /OMERO for instance.
    query_service  -- not used by this function.
    dry_run        -- when True nothing is deleted.
    config_service -- passed to initial_check() for the version check.
    """
    initial_check(config_service)

    pixels_dir = os.path.join(data_dir, "Pixels")
    for root, dirs, files in os.walk(pixels_dir):
        for f in files:
            # Cheap name test first; only pyramid files need a size lookup.
            if not f.endswith("_pyramid"):
                continue
            pixels_file = os.path.join(root, f)
            if os.path.getsize(pixels_file) != 0:
                continue

            # Lock and temporary files live beside the pyramid file, so
            # look in the directory being walked, not the top-level
            # Pixels directory.
            lock_prefix = "." + f
            in_progress = False
            for lockfile in os.listdir(root):
                if lockfile.startswith(lock_prefix) and \
                        (lockfile.endswith(".tmp") or
                         lockfile.endswith(".pyr_lock")):
                    in_progress = True
                    break
            if in_progress:
                continue

            if dry_run:
                print("Would remove %s" % f)
            else:
                print("Removing %s" % f)
                os.remove(pixels_file)
248
249
def main():
    """
    Default main() that performs OMERO data directory cleansing.

    Parses the command line, logs in to the server on localhost with either
    a username and prompted password or a session key, and runs cleanse()
    on the given data directory.

    Exit statuses: 2 for a usage error or a cancelled password prompt,
    1 when the login is refused.
    """
    try:
        options, args = getopt(sys.argv[1:], "u:k:", ["dry-run"])
    except GetoptError:
        # usage() terminates the process.
        usage(sys.exc_info()[1].msg)

    try:
        data_dir, = args
    except ValueError:
        # Raised for zero or for several positional arguments.
        usage('Expecting single OMERO data directory!')

    username = get_user("root")
    session_key = None
    dry_run = False
    for option, argument in options:
        if option == "-u":
            username = argument
        if option == "-k":
            session_key = argument
        if option == "--dry-run":
            dry_run = True

    password = None
    if session_key is None:
        # Imported here because the module level imports do not provide it.
        import getpass
        print("Username: %s" % username)
        try:
            password = getpass.getpass()
        except KeyboardInterrupt:
            sys.exit(2)

    try:
        client = omero.client('localhost')
        client.setAgent("OMERO.cleanse")
        session = None
        if session_key is None:
            session = client.createSession(username, password)
        else:
            session = client.createSession(session_key)
    except PermissionDeniedException:
        print("%s: Permission denied" % sys.argv[0])
        print("Sorry.")
        sys.exit(1)

    query_service = session.getQueryService()
    config_service = session.getConfigService()
    try:
        cleanse(data_dir, query_service, dry_run, config_service)
    finally:
        # Only a session created here is closed; one joined through a
        # session key is left open.
        if session_key is None:
            client.closeSession()
# Run the cleanse only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
306