Skip to content

Commit 41b3251

Browse files
authored
Merge pull request #10 from 0xsequence/ensure-only-one-open-is-in-flight
Open & Prefetch optimization
2 parents c6fc296 + 56d0951 commit 41b3251

File tree

1 file changed

+37
-6
lines changed

1 file changed

+37
-6
lines changed

common.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,7 @@ func (f *File) Create(ctx context.Context, fs storage.FS) (io.WriteCloser, error
168168
}
169169

170170
func (f *File) Open(ctx context.Context, fs storage.FS) (io.ReadCloser, error) {
171-
f.mu.Lock()
172171
prefetchedRdr := f.prefetched()
173-
f.mu.Unlock()
174-
175172
if prefetchedRdr != nil {
176173
return prefetchedRdr, nil
177174
}
@@ -180,10 +177,25 @@ func (f *File) Open(ctx context.Context, fs storage.FS) (io.ReadCloser, error) {
180177

181178
func (f *File) Prefetch(ctx context.Context, fs storage.FS) error {
182179
f.mu.Lock()
180+
// check if is already prefetched
183181
if f.prefetchBuffer != nil {
184182
f.mu.Unlock()
185183
return nil
186184
}
185+
// check if prefetch is in progress
186+
if f.prefetchCtx != nil {
187+
prefetchCtx := f.prefetchCtx
188+
<-prefetchCtx.Done()
189+
f.mu.Unlock()
190+
return nil
191+
}
192+
193+
// prepare prefetch context
194+
prefetchCtx, cancelPrefetch := context.WithCancel(ctx)
195+
defer cancelPrefetch()
196+
197+
// set prefetch context
198+
f.prefetchCtx = prefetchCtx
187199
f.mu.Unlock()
188200

189201
rdr, err := f.open(ctx, fs)
@@ -236,11 +248,30 @@ func (f *File) open(ctx context.Context, fs storage.FS) (io.ReadCloser, error) {
236248
}
237249

238250
func (f *File) prefetched() io.ReadCloser {
239-
if f.prefetchBuffer != nil {
240-
rdr := io.NopCloser(bytes.NewReader(f.prefetchBuffer))
241-
f.prefetchBuffer = nil
251+
f.mu.Lock()
252+
prefetchCtx := f.prefetchCtx
253+
prefetchBuffer := f.prefetchBuffer
254+
f.prefetchBuffer = nil
255+
f.mu.Unlock()
256+
257+
if prefetchBuffer != nil {
258+
// already prefetched
259+
rdr := io.NopCloser(bytes.NewReader(prefetchBuffer))
242260
return rdr
261+
} else if prefetchCtx != nil {
262+
// prefetch in progress
263+
<-prefetchCtx.Done()
264+
265+
f.mu.Lock()
266+
defer f.mu.Unlock()
267+
// check if prefetch was successful
268+
if f.prefetchBuffer != nil {
269+
rdr := io.NopCloser(bytes.NewReader(f.prefetchBuffer))
270+
f.prefetchBuffer = nil
271+
return rdr
272+
}
243273
}
274+
// no prefetch
244275
return nil
245276
}
246277

0 commit comments

Comments
 (0)