3
3
4
4
import os
5
5
import ssl
6
- import json
7
6
import time
8
7
import sys
9
8
import logging
13
12
import multiprocessing
14
13
import six
15
14
16
- import irods
17
- import irods .data_object
18
- from irods .data_object import iRODSDataObjectFileRaw , iRODSDataObject
15
+ from irods .data_object import iRODSDataObject
19
16
from irods .exception import DataObjectDoesNotExist
20
- from irods .message import ( StringStringMap , FileOpenRequest , iRODSMessage )
21
- from irods .api_number import api_number
22
17
import irods .keywords as kw
23
- from collections import OrderedDict
24
18
from six .moves .queue import Queue ,Full ,Empty
25
19
26
20
@@ -44,6 +38,7 @@ def __init__(self, n):
44
38
self .barrier = threading .Semaphore (0 )
45
39
def wait (self ):
46
40
"""Per-thread wait function.
41
+
47
42
As in Python3.2 threading, returns 0 <= wait_serial_int < n
48
43
"""
49
44
self .mutex .acquire ()
@@ -58,6 +53,7 @@ def wait(self):
58
53
@contextlib .contextmanager
59
54
def enableLogging (handlerType ,args ,level_ = logging .INFO ):
60
55
"""Context manager for temporarily enabling a logger. For debug or test.
56
+
61
57
Usage Example -
62
58
with irods.parallel.enableLogging(logging.FileHandler,('/tmp/logfile.txt',)):
63
59
# parallel put/get code here
@@ -98,6 +94,10 @@ def set_transfer_done_callback( self, callback ):
98
94
self .done_callback = callback
99
95
100
96
def __init__ (self , futuresList , callback = None , progress_Queue = None , total = None , keep_ = ()):
97
+ """AsyncNotify initialization (used internally to the io.parallel library).
98
+ The casual user will only be concerned with the callback parameter, called when all threads
99
+ of the parallel PUT or GET have been terminated and the data object closed.
100
+ """
101
101
self ._futures = set (futuresList )
102
102
self ._futures_done = dict ()
103
103
self .keep = dict (keep_ )
@@ -173,15 +173,21 @@ def futures_done(self): return dict(self._futures_done)
173
173
174
174
175
175
class Oper (object ):
176
-
177
176
"""A custom enum-type class with utility methods. """
178
177
179
178
GET = 0
180
179
PUT = 1
181
180
NONBLOCKING = 2
182
181
183
- def __int__ (self ): return self ._opr
184
- def __init__ (self , rhs ): self ._opr = int (rhs )
182
+ def __int__ (self ):
183
+ """Return the stored flags as an integer bitmask. """
184
+ return self ._opr
185
+
186
+ def __init__ (self , rhs ):
187
+ """Initialize with a bit mask of flags ie. whether Operation PUT or GET,
188
+ and whether NONBLOCKING."""
189
+ self ._opr = int (rhs )
190
+
185
191
def isPut (self ): return 0 != (self ._opr & self .PUT )
186
192
def isGet (self ): return not self .isPut ()
187
193
def isNonBlocking (self ): return 0 != (self ._opr & self .NONBLOCKING )
@@ -290,7 +296,6 @@ def _io_multipart_threaded(operation_ , dataObj_and_IO, replica_token, hier_str,
290
296
total_size , num_threads = 0 , ** extra_options ):
291
297
"""Called by _io_main.
292
298
Carve up (0,total_size) range into `num_threads` parts and initiate a transfer thread for each one."""
293
-
294
299
(D , Io ) = dataObj_and_IO
295
300
Operation = Oper ( operation_ )
296
301
0 commit comments