-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
56 lines (49 loc) · 2.05 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import cloudpickle
import lz4.frame
import functools
import inspect
import concurrent.futures
def compress_and_dumps(obj):
    """Pickle ``obj`` with cloudpickle and compress the bytes as an LZ4 frame.

    The compression level passed to ``lz4.frame.compress`` is
    ``lz4.frame.COMPRESSIONLEVEL_MINHC``. The returned payload can be turned
    back into an object with ``decompress_and_loads``.
    """
    pickled = cloudpickle.dumps(obj)
    level = lz4.frame.COMPRESSIONLEVEL_MINHC
    return lz4.frame.compress(pickled, compression_level=level)
def decompress_and_loads(obj):
    """Decompress the LZ4 frame ``obj`` and unpickle the result.

    This is the inverse of ``compress_and_dumps``.

    NOTE(review): unpickling can execute arbitrary code, so ``obj`` should
    only ever come from a trusted producer.
    """
    pickled = lz4.frame.decompress(obj)
    return cloudpickle.loads(pickled)
def run_one_worker(args):
    """Build a ``worker.Worker`` for ``config.REDIS_URL`` and run it.

    ``args`` is accepted but never read; ``start_local_workers`` passes the
    worker index here. ``worker`` and ``config`` are imported inside the
    function rather than at module level.
    """
    import worker
    from config import REDIS_URL

    local_worker = worker.Worker(REDIS_URL, verbose=False)
    local_worker.start_pubsub()
    local_worker.run()
def start_local_workers(n):
    """Launch ``n`` worker tasks in a process pool of size ``n``.

    Each task calls ``run_one_worker`` with its index (0 .. n-1).

    Returns a 2-tuple ``(executor, worker_futures)``: the
    ``ProcessPoolExecutor`` and the list of futures, one per submitted
    worker. The executor is not shut down here; that is left to the caller.
    """
    pool = concurrent.futures.ProcessPoolExecutor(n)
    futures = []
    for index in range(n):
        futures.append(pool.submit(run_one_worker, index))
    return pool, futures
@functools.lru_cache(maxsize=256)
def get_chunking(filelist, chunksize, treename="Events", workers=12, skip_bad_files=False):
    """
    Split the entries of every file in `filelist` into chunks of at most
    `chunksize` entries.

    Parameters
    - filelist: file names to scan. Because this function is wrapped in
      `functools.lru_cache`, it (like every other argument) must be hashable,
      e.g. a tuple rather than a list.
    - chunksize: maximum number of entries per chunk. Must be a positive integer.
    - treename: name of the tree whose entries are counted.
    - workers: upper bound on the number of threads used to count entries
      when `skip_bad_files` is False and `filelist` has at least 5 files.
    - skip_bad_files: if True, files are counted one at a time (under a tqdm
      progress bar) and any file for which `uproot.numentries` raises
      IndexError or ValueError is reported with a message and skipped.

    Return 2-tuple of
    - chunks: triplets of (filename,entrystart,entrystop) calculated with input `chunksize` and `filelist`.
      Every chunk is non-empty (entrystart < entrystop); files with zero
      entries contribute no chunks.
    - total_nevents: total event count over `filelist` (skipped files excluded)

    NOTE: results are cached, so repeated calls with the same arguments
    return the same `chunks` list object; copy it before mutating.
    """
    import uproot
    from tqdm.auto import tqdm

    # Step 1: collect (filename, nentries) pairs, preserving file order.
    entries_per_file = []
    if skip_bad_files:
        # slightly slower (serial loop), but can skip bad files
        for fname in tqdm(filelist):
            try:
                counts = uproot.numentries(fname, treename, total=False)
            except (IndexError, ValueError):
                print("Skipping bad file", fname)
                continue
            entries_per_file.extend(counts.items())
    elif len(filelist) < 5:
        # Too few files for a thread pool to be worthwhile.
        counts = uproot.numentries(filelist, treename, total=False, executor=None)
        entries_per_file.extend(counts.items())
    else:
        # The context manager shuts the pool down once counting is finished,
        # so its threads are not leaked on every uncached call.
        with concurrent.futures.ThreadPoolExecutor(min(workers, len(filelist))) as pool:
            counts = uproot.numentries(filelist, treename, total=False, executor=pool)
        entries_per_file.extend(counts.items())

    # Step 2: cut each file into [start, stop) ranges.
    chunks = []
    nevents = 0
    for fn, nentries in entries_per_file:
        nevents += nentries
        # Stepping by `chunksize` stops before `nentries`, so no trailing
        # empty (fn, N, N) chunk is produced when N is a multiple of chunksize.
        for start in range(0, nentries, chunksize):
            chunks.append((fn, start, min(start + chunksize, nentries)))
    return chunks, nevents