Skip to content

Commit f458b0a

Browse files
authored
fileservice: fix object storage semaphore (#18589)
fix object storage semaphore Approved by: @sukki37
1 parent 3ec169d commit f458b0a

File tree

3 files changed

+25
-7
lines changed

3 files changed

+25
-7
lines changed

pkg/fileservice/io.go

+8
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,11 @@ var ioBufferPool = NewPool(
7070
nil,
7171
nil,
7272
)
73+
74+
type readerFunc func([]byte) (int, error)
75+
76+
var _ io.Reader = readerFunc(nil)
77+
78+
func (r readerFunc) Read(p []byte) (n int, err error) {
79+
return r(p)
80+
}

pkg/fileservice/object_storage_semaphore.go

+16-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package fileservice
1717
import (
1818
"context"
1919
"io"
20+
"sync"
2021
"time"
2122
)
2223

@@ -70,14 +71,23 @@ func (o *objectStorageSemaphore) Read(ctx context.Context, key string, min *int6
7071
o.release()
7172
return nil, err
7273
}
73-
released := false
74+
75+
release := sync.OnceFunc(func() {
76+
o.release()
77+
})
78+
7479
return &readCloser{
75-
r: r,
76-
closeFunc: func() error {
77-
if !released {
78-
o.release()
79-
released = true
80+
r: readerFunc(func(buf []byte) (n int, err error) {
81+
n, err = r.Read(buf)
82+
if err != nil {
83+
// release if error
84+
release()
8085
}
86+
return
87+
}),
88+
closeFunc: func() error {
89+
// release when close
90+
release()
8191
return r.Close()
8292
},
8393
}, nil

pkg/fileservice/s3_fs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func NewS3FS(
113113
// limit number of concurrent operations
114114
concurrency := args.Concurrency
115115
if concurrency == 0 {
116-
concurrency = 100
116+
concurrency = 1024
117117
}
118118
fs.storage = newObjectStorageSemaphore(
119119
fs.storage,

0 commit comments

Comments
 (0)