@@ -50,14 +50,37 @@ class MMapCache(BaseCache):
50
50
Ensure there is enough disc space in the temporary location.
51
51
52
52
This cache method might only work on posix
53
+
54
+ Parameters
55
+ ----------
56
+ blocksize: int
57
+ How far to read ahead in number of bytes
58
+ fetcher: func
59
+ Function of the form f(start, end) which gets bytes from remote as
60
+ specified
61
+ size: int
62
+ How big this file is
63
+ location: str
64
+ Where to create the temporary file. If None, a temporary file is
65
+ created using tempfile.TemporaryFile().
66
+ blocks: set
67
+ Set of block numbers that have already been fetched. If None, an empty
68
+ set is created.
69
+ multi_fetcher: func
70
+ Function of the form f([(start, end)]) which gets bytes from remote
71
+ as specified. This function is used to fetch multiple blocks at once.
72
+ If not specified, the fetcher function is used instead.
53
73
"""
54
74
55
75
name = "mmap"
56
76
57
def __init__(
    self, blocksize, fetcher, size, location=None, blocks=None, multi_fetcher=None
):
    """Initialise the cache attributes and create the backing cache object.

    Parameters
    ----------
    blocksize, fetcher, size:
        Forwarded unchanged to the parent class initialiser.
    location:
        Stored as ``self.location``.
    blocks:
        Stored as ``self.blocks``; a new empty set is used when None.
    multi_fetcher:
        Stored as ``self.multi_fetcher``; may be None.
    """
    super().__init__(blocksize, fetcher, size)
    # Fall back to a fresh set only when the caller supplied nothing, so an
    # explicitly passed (even empty) collection is kept as-is.
    if blocks is None:
        blocks = set()
    self.blocks = blocks
    self.location = location
    self.multi_fetcher = multi_fetcher
    # Kept last so every attribute above is already set when _makefile()
    # runs (presumably it reads some of them -- confirm against _makefile).
    self.cache = self._makefile()
62
85
63
86
def _makefile (self ):
@@ -93,16 +116,30 @@ def _fetch(self, start, end):
93
116
start_block = start // self .blocksize
94
117
end_block = end // self .blocksize
95
118
need = [i for i in range (start_block , end_block + 1 ) if i not in self .blocks ]
119
+ ranges = []
96
120
while need :
97
121
# TODO: not a for loop so we can consolidate blocks later to
98
- # make fewer fetch calls; this could be parallel
122
+ # make fewer fetch calls
99
123
i = need .pop (0 )
100
124
sstart = i * self .blocksize
101
125
send = min (sstart + self .blocksize , self .size )
102
- logger .debug (f"MMap get block #{ i } ({ sstart } -{ send } " )
103
- self .cache [sstart :send ] = self .fetcher (sstart , send )
126
+ ranges .append ((sstart , send ))
104
127
self .blocks .add (i )
105
128
129
+ if not ranges :
130
+ return self .cache [start :end ]
131
+
132
+ if self .multi_fetcher :
133
+ logger .debug (f"MMap get blocks { ranges } " )
134
+ for idx , r in enumerate (self .multi_fetcher (ranges )):
135
+ (sstart , send ) = ranges [idx ]
136
+ logger .debug (f"MMap copy block ({ sstart } -{ send } " )
137
+ self .cache [sstart :send ] = r
138
+ else :
139
+ for (sstart , send ) in ranges :
140
+ logger .debug (f"MMap get block ({ sstart } -{ send } " )
141
+ self .cache [sstart :send ] = self .fetcher (sstart , send )
142
+
106
143
return self .cache [start :end ]
107
144
108
145
def __getstate__ (self ):
0 commit comments