Skip to content

Commit 4653cad

Browse files
rremerRoyce Remer
authored and
Royce Remer
committed
Make LFS http_client parallel within a batch.
Signed-off-by: Royce Remer <[email protected]>
1 parent ec2d159 commit 4653cad

File tree

7 files changed

+103
-68
lines changed

7 files changed

+103
-68
lines changed

Diff for: custom/conf/app.example.ini

+2
Original file line numberDiff line numberDiff line change
@@ -2645,6 +2645,8 @@ LEVEL = Info
26452645
;[lfs_client]
26462646
;; When mirroring an upstream lfs endpoint, limit the number of pointers in each batch request to this number
26472647
;BATCH_SIZE = 20
2648+
;; When mirroring an upstream lfs endpoint, limit the number of concurrent upload/download operations within a batch
2649+
;BATCH_OPERATION_CONCURRENCY = 20
26482650

26492651
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
26502652
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

Diff for: go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ require (
124124
golang.org/x/image v0.21.0
125125
golang.org/x/net v0.30.0
126126
golang.org/x/oauth2 v0.23.0
127+
golang.org/x/sync v0.8.0
127128
golang.org/x/sys v0.26.0
128129
golang.org/x/text v0.19.0
129130
golang.org/x/tools v0.26.0
@@ -316,7 +317,6 @@ require (
316317
go.uber.org/zap v1.27.0 // indirect
317318
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
318319
golang.org/x/mod v0.21.0 // indirect
319-
golang.org/x/sync v0.8.0 // indirect
320320
golang.org/x/time v0.7.0 // indirect
321321
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
322322
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect

Diff for: modules/lfs/http_client.go

+74-51
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"code.gitea.io/gitea/modules/log"
1818
"code.gitea.io/gitea/modules/proxy"
1919
"code.gitea.io/gitea/modules/setting"
20+
21+
"golang.org/x/sync/errgroup"
2022
)
2123

2224
// HTTPClient is used to communicate with the LFS server
@@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
113115
return c.performOperation(ctx, objects, nil, callback)
114116
}
115117

118+
// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
116119
func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
117120
if len(objects) == 0 {
118121
return nil
@@ -133,71 +136,91 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
133136
return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
134137
}
135138

139+
errGroup, groupCtx := errgroup.WithContext(ctx)
140+
errGroup.SetLimit(setting.LFSClient.BatchConcurrency)
136141
for _, object := range result.Objects {
137-
if object.Error != nil {
138-
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
139-
if uc != nil {
140-
if _, err := uc(object.Pointer, object.Error); err != nil {
141-
return err
142-
}
143-
} else {
144-
if err := dc(object.Pointer, nil, object.Error); err != nil {
145-
return err
146-
}
147-
}
148-
continue
149-
}
150-
151-
if uc != nil {
152-
if len(object.Actions) == 0 {
153-
log.Trace("%v already present on server", object.Pointer)
154-
continue
155-
}
142+
errGroup.Go(func() error {
143+
err := performSingleOperation(groupCtx, object, dc, uc, transferAdapter)
144+
return err
145+
})
146+
}
156147

157-
link, ok := object.Actions["upload"]
158-
if !ok {
159-
log.Debug("%+v", object)
160-
return errors.New("missing action 'upload'")
161-
}
148+
// only the first error is returned, preserving legacy behavior before concurrency
149+
return errGroup.Wait()
150+
}
162151

163-
content, err := uc(object.Pointer, nil)
164-
if err != nil {
165-
return err
166-
}
152+
// performSingleOperation performs an LFS upload or download operation on a single object
153+
func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error {
154+
// the response from an lfs batch api request for this specific object id contained an error
155+
if object.Error != nil {
156+
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
167157

168-
err = transferAdapter.Upload(ctx, link, object.Pointer, content)
169-
if err != nil {
158+
// this was an 'upload' request inside the batch request
159+
if uc != nil {
160+
if _, err := uc(object.Pointer, object.Error); err != nil {
170161
return err
171162
}
172-
173-
link, ok = object.Actions["verify"]
174-
if ok {
175-
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
176-
return err
177-
}
178-
}
179163
} else {
180-
link, ok := object.Actions["download"]
181-
if !ok {
182-
// no actions block in response, try legacy response schema
183-
link, ok = object.Links["download"]
184-
}
185-
if !ok {
186-
log.Debug("%+v", object)
187-
return errors.New("missing action 'download'")
164+
// this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
165+
err := dc(object.Pointer, nil, object.Error)
166+
if errors.Is(object.Error, ErrObjectNotExist) {
167+
log.Warn("Ignoring missing upstream LFS object %-v: %v", object.Pointer, err)
168+
return nil
188169
}
189170

190-
content, err := transferAdapter.Download(ctx, link)
191-
if err != nil {
192-
return err
193-
}
171+
// this was a 'download' request which was a legitimate error response from the batch api (not an http/404)
172+
return err
173+
}
174+
}
194175

195-
if err := dc(object.Pointer, content, nil); err != nil {
176+
// the response from an lfs batch api request contained necessary upload/download fields to act upon
177+
if uc != nil {
178+
if len(object.Actions) == 0 {
179+
log.Trace("%v already present on server", object.Pointer)
180+
return nil
181+
}
182+
183+
link, ok := object.Actions["upload"]
184+
if !ok {
185+
return errors.New("missing action 'upload'")
186+
}
187+
188+
content, err := uc(object.Pointer, nil)
189+
if err != nil {
190+
return err
191+
}
192+
193+
err = transferAdapter.Upload(ctx, link, object.Pointer, content)
194+
if err != nil {
195+
return err
196+
}
197+
198+
link, ok = object.Actions["verify"]
199+
if ok {
200+
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
196201
return err
197202
}
198203
}
199-
}
204+
} else {
205+
link, ok := object.Actions["download"]
206+
if !ok {
207+
// no actions block in response, try legacy response schema
208+
link, ok = object.Links["download"]
209+
}
210+
if !ok {
211+
log.Debug("%+v", object)
212+
return errors.New("missing action 'download'")
213+
}
200214

215+
content, err := transferAdapter.Download(ctx, link)
216+
if err != nil {
217+
return err
218+
}
219+
220+
if err := dc(object.Pointer, content, nil); err != nil {
221+
return err
222+
}
223+
}
201224
return nil
202225
}
203226

Diff for: modules/lfs/http_client_test.go

+7-10
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"testing"
1313

1414
"code.gitea.io/gitea/modules/json"
15+
"code.gitea.io/gitea/modules/setting"
1516

1617
"github.com/stretchr/testify/assert"
1718
)
@@ -211,36 +212,31 @@ func TestHTTPClientDownload(t *testing.T) {
211212
expectederror: "TransferAdapter not found: ",
212213
},
213214
// case 5
214-
{
215-
endpoint: "https://error-in-response-objects.io",
216-
expectederror: "Object not found",
217-
},
218-
// case 6
219215
{
220216
endpoint: "https://empty-actions-map.io",
221217
expectederror: "missing action 'download'",
222218
},
223-
// case 7
219+
// case 6
224220
{
225221
endpoint: "https://download-actions-map.io",
226222
expectederror: "",
227223
},
228-
// case 8
224+
// case 7
229225
{
230226
endpoint: "https://upload-actions-map.io",
231227
expectederror: "missing action 'download'",
232228
},
233-
// case 9
229+
// case 8
234230
{
235231
endpoint: "https://verify-actions-map.io",
236232
expectederror: "missing action 'download'",
237233
},
238-
// case 10
234+
// case 9
239235
{
240236
endpoint: "https://unknown-actions-map.io",
241237
expectederror: "missing action 'download'",
242238
},
243-
// case 11
239+
// case 10
244240
{
245241
endpoint: "https://legacy-batch-request-download.io",
246242
expectederror: "",
@@ -255,6 +251,7 @@ func TestHTTPClientDownload(t *testing.T) {
255251
"dummy": dummy,
256252
},
257253
}
254+
setting.LFSClient.BatchConcurrency = 1
258255

259256
err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error {
260257
if objectError != nil {

Diff for: modules/repository/repo.go

-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package repository
55

66
import (
77
"context"
8-
"errors"
98
"fmt"
109
"io"
1110
"strings"
@@ -182,10 +181,6 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *repo_model.Re
182181
downloadObjects := func(pointers []lfs.Pointer) error {
183182
err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error {
184183
if objectError != nil {
185-
if errors.Is(objectError, lfs.ErrObjectNotExist) {
186-
log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError)
187-
return nil
188-
}
189184
return objectError
190185
}
191186

Diff for: modules/setting/lfs.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ var LFS = struct {
2828

2929
// LFSClient represents configuration for Gitea's LFS clients, for example: mirroring upstream Git LFS
3030
var LFSClient = struct {
31-
BatchSize int `ini:"BATCH_SIZE"`
31+
BatchSize int `ini:"BATCH_SIZE"`
32+
BatchConcurrency int `ini:"BATCH_OPERATION_CONCURRENCY"`
3233
}{}
3334

3435
func loadLFSFrom(rootCfg ConfigProvider) error {
@@ -66,6 +67,10 @@ func loadLFSFrom(rootCfg ConfigProvider) error {
6667
LFSClient.BatchSize = 20
6768
}
6869

70+
if LFSClient.BatchConcurrency < 1 {
71+
LFSClient.BatchConcurrency = LFSClient.BatchSize
72+
}
73+
6974
LFS.HTTPAuthExpiry = sec.Key("LFS_HTTP_AUTH_EXPIRY").MustDuration(24 * time.Hour)
7075

7176
if !LFS.StartServer || !InstallLock {

Diff for: modules/setting/lfs_test.go

+13
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,17 @@ BATCH_SIZE = 0
114114
assert.NoError(t, loadLFSFrom(cfg))
115115
assert.EqualValues(t, 100, LFS.MaxBatchSize)
116116
assert.EqualValues(t, 20, LFSClient.BatchSize)
117+
assert.EqualValues(t, 20, LFSClient.BatchConcurrency)
118+
119+
iniStr = `
120+
[lfs_client]
121+
BATCH_SIZE = 50
122+
BATCH_OPERATION_CONCURRENCY = 10
123+
`
124+
cfg, err = NewConfigProviderFromData(iniStr)
125+
assert.NoError(t, err)
126+
127+
assert.NoError(t, loadLFSFrom(cfg))
128+
assert.EqualValues(t, 50, LFSClient.BatchSize)
129+
assert.EqualValues(t, 10, LFSClient.BatchConcurrency)
117130
}

0 commit comments

Comments
 (0)