-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgalaxy-fuse.py
324 lines (281 loc) · 11.8 KB
/
galaxy-fuse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
#!/usr/bin/env python
"""
galaxy-fuse.py will mount Galaxy datasets for direct read access using FUSE.
To do this you will need your Galaxy API key, found by logging into Galaxy and
selecting the menu option User -> API Keys. You can mount your Galaxy datasets
using a command like
python galaxy-fuse.py <api-key> &
This puts the galaxy-fuse process into the background. Galaxy Datasets will then
appear as read-only files, organised by History, under the directory galaxy_files.
galaxy-fuse was written by Dr David Powell and began life at
https://github.com/drpowell/galaxy-fuse .
Modified December 2016 by Madison Flannery.
"""
from errno import ENOENT
from stat import S_IFDIR, S_IFREG, S_IFLNK
from sys import argv, exit
import re
import time
import os
import argparse
from fuse import FUSE, FuseOSError, Operations, LoggingMixIn, fuse_get_context
from bioblend import galaxy
# number of seconds to cache history/dataset lookups
CACHE_TIME = 30
# Split a path into a (type, components) pair
def path_type(path):
    """Classify *path* within the virtual filesystem.

    Returns a tuple ``(type, params)`` where ``type`` is one of
    'root', 'histories', 'datasets', 'historydataorcoll' or
    'collectiondataset', and ``params`` maps component names (h_name,
    c_name, ds_name) to their unescaped values.  Unrecognised paths
    yield ``('', 0)``.
    """
    # List comprehension instead of filter(): on Python 3 filter() is a
    # lazy iterator, so len()/indexing below would fail.
    parts = [p for p in path.split('/') if len(p) > 0]
    if path == '/':
        return ('root', dict())
    elif path == '/histories':
        return ('histories', dict())
    elif len(parts) == 2 and parts[0] == 'histories':
        return ('datasets', dict(h_name=unesc_filename(parts[1])))
    elif len(parts) == 3 and parts[0] == 'histories':
        # Path: histories/<history_name>/<data_name>
        # OR histories/<history_name>/<collection_name>
        return ('historydataorcoll',
                dict(h_name=unesc_filename(parts[1]),
                     ds_name=unesc_filename(parts[2])))
    elif len(parts) == 4 and parts[0] == 'histories':
        # Path: histories/<history_name>/<coll_name>/<dataset_name>
        return ('collectiondataset',
                dict(h_name=unesc_filename(parts[1]),
                     c_name=unesc_filename(parts[2]),
                     ds_name=unesc_filename(parts[3])))
    print("Unknown : %s" % path)
    return ('', 0)
# Escape '%' and '/' in filenames so a dataset name containing slashes
# can be used as a single path component ('%' -> '%%', '/' -> '%-').
def esc_filename(fname):
    """Return *fname* with '%' and '/' escaped for path-component use."""
    replacements = {'%': '%%', '/': '%-'}

    def substitute(match):
        return replacements[match.group(0)]

    return re.sub(r'%|/', substitute, fname)
# Inverse of esc_filename(): expand '%%' back to '%' and '%-' back to '/'.
def unesc_filename(fname):
    """Return *fname* with esc_filename() escapes undone."""
    mapping = {'%%': '%', '%-': '/'}

    def expand(match):
        return mapping.get(match.group(0))

    return re.sub(r'%(.)', expand, fname)
def parse_name_with_id(fname):
    """Split a '<name>-<16-hex-digit-id>' filename into (name, id).

    Duplicate entries are displayed with their Galaxy id appended; a
    filename without such a suffix comes back as (fname, '').
    """
    match = re.match(r"^(?P<name>.*)-(?P<id>[0-9a-f]{16})", fname)
    if match is None:
        return (fname, '')
    return (match.group('name'), match.group('id'))
class Context(LoggingMixIn, Operations):
    """FUSE operations exposing Galaxy histories as a read-only tree.

    Layout:
        /histories/<history>/<dataset>               -> symlink to dataset file
        /histories/<history>/<collection>/<dataset>  -> symlink to dataset file

    Galaxy API lookups are cached for CACHE_TIME seconds.
    """

    def __init__(self, api_key, url='http://127.0.0.1:80/galaxy/'):
        """Connect to Galaxy at *url* using *api_key*.

        *url* keeps the historical hard-coded default but may now be
        overridden by callers.
        """
        self.gi = galaxy.GalaxyInstance(url=url, key=api_key)
        # Per-history caches of {'time': ..., 'contents': ...} entries.
        self.filtered_datasets_cache = {}
        self.full_datasets_cache = {}
        self.histories_cache = {'time': None, 'contents': None}

    @staticmethod
    def _dir_stat(now):
        # Attributes for a read-only directory (0o555 == Py2's 0555).
        st = dict(st_mode=(S_IFDIR | 0o555), st_nlink=2)
        st['st_ctime'] = st['st_mtime'] = st['st_atime'] = now
        return st

    @staticmethod
    def _link_stat(d):
        # Attributes for a symlink pointing at the dataset's file on disk.
        t = time.mktime(time.strptime(d['update_time'], '%Y-%m-%dT%H:%M:%S.%f'))
        fname = esc_filename(d.get('file_path', d['file_name']))
        return dict(st_mode=(S_IFLNK | 0o444), st_nlink=1,
                    st_size=len(fname), st_ctime=t, st_mtime=t, st_atime=t)

    def getattr(self, path, fh=None):
        """Return stat attributes for *path*; raise ENOENT if absent."""
        (typ, kw) = path_type(path)
        now = time.time()
        if typ in ('root', 'histories', 'datasets'):
            # Simple directory.
            return self._dir_stat(now)
        elif typ == 'historydataorcoll':
            # Dataset or collection.
            d = self._dataset(kw)
            if d['history_content_type'] == 'dataset_collection':
                # A collection presents as a simple directory.
                return self._dir_stat(now)
            # A file: a symlink to the real galaxy dataset.
            return self._link_stat(d)
        elif typ == 'collectiondataset':
            # A file within a collection; members may be hidden, so
            # search all datasets (display=False).
            return self._link_stat(self._dataset(kw, display=False))
        raise FuseOSError(ENOENT)

    def readlink(self, path):
        """Return the on-disk target for the symlink at *path*."""
        (typ, kw) = path_type(path)
        if typ == 'historydataorcoll':
            # Dataset inside history.
            d = self._dataset(kw)
        elif typ == 'collectiondataset':
            # Dataset inside collection.
            d = self._dataset(kw, display=False)
        else:
            raise FuseOSError(ENOENT)
        # _dataset() has already verified one of these keys is present.
        return d.get('file_path', d['file_name'])

    def read(self, path, size, offset, fh):
        # Everything is exposed as a symlink; a read through FUSE itself
        # should never happen.
        raise RuntimeError('unexpected path: %r' % path)

    def _histories(self):
        """Return all Galaxy histories, cached for CACHE_TIME seconds."""
        cache = self.histories_cache
        now = time.time()
        if cache['contents'] is None or now - cache['time'] > CACHE_TIME:
            cache['time'] = now
            cache['contents'] = self.gi.histories.get_histories()
        return cache['contents']

    def _history(self, h_name):
        """Find a history by its (possibly id-suffixed) display name."""
        (fixed_name, hist_id) = parse_name_with_id(h_name)
        # List comprehensions (not filter()) so len()/indexing work on Py3.
        matches = [h for h in self._histories() if h['name'] == fixed_name]
        if len(matches) == 0:
            raise FuseOSError(ENOENT)
        if len(matches) > 1:
            # Duplicate names: disambiguate with the id suffix.
            matches = [h for h in self._histories() if h['id'] == hist_id]
            if len(matches) == 0:
                raise FuseOSError(ENOENT)
            if len(matches) > 1:
                print("Too many histories with identical names and IDs")
            return matches[0]
        return matches[0]

    def _filtered_datasets(self, h):
        """Visible, non-deleted datasets of history *h*; cached."""
        return self._cached_history_contents(self.filtered_datasets_cache,
                                             h['id'], visible=True)

    def _all_datasets(self, h):
        """All non-deleted datasets of *h*, hidden included; cached."""
        return self._cached_history_contents(self.full_datasets_cache, h['id'])

    def _cached_history_contents(self, cache, hist_id, **kwargs):
        # Shared cache logic for _filtered_datasets()/_all_datasets().
        now = time.time()
        entry = cache.get(hist_id)
        if entry is None or now - entry['time'] > CACHE_TIME:
            contents = self.gi.histories.show_history(
                hist_id, contents=True, details='all', deleted=False, **kwargs)
            cache[hist_id] = {'time': now, 'contents': contents}
        return cache[hist_id]['contents']

    def _dataset(self, kw, display=True):
        """Find a dataset (or collection) from path components *kw*.

        ``display=True`` searches only visible datasets; ``False``
        searches all (needed for collection members, which may be hidden).
        Raises ENOENT when no match is found or no file path is exposed.
        """
        h = self._history(kw['h_name'])
        ds = self._filtered_datasets(h) if display else self._all_datasets(h)
        (d_name, d_id) = parse_name_with_id(kw['ds_name'])
        matches = [d for d in ds if d['name'] == d_name]
        if len(matches) == 0:
            raise FuseOSError(ENOENT)
        if len(matches) > 1:
            matches = [d for d in ds
                       if d['name'] == d_name and d['id'] == d_id]
            if len(matches) == 0:
                raise FuseOSError(ENOENT)
            if len(matches) > 1:
                print("Too many datasets with that name and ID")
                return matches[0]
        d = matches[0]
        # This is a collection. Deal with it upstream.
        if d['history_content_type'] == 'dataset_collection':
            return d
        # Some versions of the Galaxy API use file_path and some file_name
        if 'file_path' not in d and 'file_name' not in d:
            print("Unable to find file of dataset. Have you set : expose_dataset_path = True")
            raise FuseOSError(ENOENT)
        return d

    @staticmethod
    def _directory_listing(items, counts=None):
        """Escaped display names, id-suffixed where names collide.

        *counts* maps name -> occurrence count; computed from *items*
        when not supplied.
        """
        if counts is None:
            counts = {}
            for item in items:
                # dict.get() instead of try/except KeyError counting.
                counts[item['name']] = counts.get(item['name'], 0) + 1
        results = ['.', '..']
        for item in items:
            if counts.get(item['name'], 0) > 1:
                results.append(esc_filename(item['name'] + '-' + item['id']))
            else:
                results.append(esc_filename(item['name']))
        return results

    def readdir(self, path, fh):
        """List the directory contents for *path*."""
        (typ, kw) = path_type(path)
        if typ == 'root':
            return ['.', '..', 'histories']
        elif typ == 'histories':
            return self._directory_listing(self._histories())
        elif typ == 'datasets':
            h = self._history(kw['h_name'])
            return self._directory_listing(self._filtered_datasets(h))
        elif typ == 'historydataorcoll':
            # This is a dataset collection: list the datasets inside it.
            ds = [x['object'] for x in self._dataset(kw)['elements']]
            # Count duplicates across ALL history datasets (hidden too):
            # a collection member may collide with another history dataset.
            h = self._history(kw['h_name'])
            counts = {}
            for d in self._all_datasets(h):
                counts[d['name']] = counts.get(d['name'], 0) + 1
            return self._directory_listing(ds, counts)

    # Disable unused operations:
    access = None
    flush = None
    getxattr = None
    listxattr = None
    open = None
    opendir = None
    release = None
    releasedir = None
    statfs = None
if __name__ == '__main__':
    # Command-line entry point: mount the given account's Galaxy datasets.
    arg_parser = argparse.ArgumentParser(
        description="Mount Galaxy Datasets for direct read access using FUSE.")
    arg_parser.add_argument("apikey",
                            help="Galaxy API key for the account to read")
    arg_parser.add_argument("-m", "--mountpoint", default="galaxy_files",
                            help="Directory under which to mount the Galaxy Datasets.")
    opts = arg_parser.parse_args()

    # Make sure the mountpoint directory exists before mounting.
    if not os.path.exists(opts.mountpoint):
        os.makedirs(opts.mountpoint)

    # Mount read-only, staying in the foreground.
    fuse = FUSE(Context(opts.apikey),
                opts.mountpoint,
                foreground=True,
                ro=True)