Skip to content

Make LFS http_client parallel within a batch. #32369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions custom/conf/app.example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2645,6 +2645,8 @@ LEVEL = Info
;[lfs_client]
;; When mirroring an upstream lfs endpoint, limit the number of pointers in each batch request to this number
;BATCH_SIZE = 20
;; When mirroring an upstream lfs endpoint, limit the number of concurrent upload/download operations within a batch
;BATCH_OPERATION_CONCURRENCY = 20

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ require (
golang.org/x/image v0.21.0
golang.org/x/net v0.30.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
golang.org/x/text v0.19.0
golang.org/x/tools v0.26.0
Expand Down Expand Up @@ -316,7 +317,6 @@ require (
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/time v0.7.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
Expand Down
125 changes: 74 additions & 51 deletions modules/lfs/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/setting"

"golang.org/x/sync/errgroup"
)

// HTTPClient is used to communicate with the LFS server
Expand Down Expand Up @@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
return c.performOperation(ctx, objects, nil, callback)
}

// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more thing, I do not think "batch size" means "concurrency limit" in this case. If I understand correctly:

  • "batch size" means how many objects to be processed in one time.
  • "concurrency limit" means how many concurrent connections to the LFS server.

Maybe "batch size" could be 100 or 1000 without causing problem, but "concurrency limit" should be much smaller, eg: 5 or 10. A lot of connections to a LFS server might trigger their rate-limit protection, or be considered as somewhat DoS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm always in favor of making things more configurable, however as an administrator I think most of the time you would want [lfs_client].BATCH_SIZE = (the number of concurrent threads for upload/download within a batch). I'm happy to add a new config though, what should we call it and what should its default be? [lfs_client].BATCH_MAX_THREADS ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe [lfs_client].MAX_OPERATION_CONCURRENCY ? (since there is no "thread" concept in Golang)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair, how about [lfs_client].BATCH_OPERATION_CONCURRENCY, as I feel there may be other single-threaded operations still in this code that we might want configs for then, and this is specific to lfs client batch operations? I updated this branch with that config, and defaulted it to what [lfs_client].BATCH_SIZE if unset or <1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to match the default git-lfs's behavior:

https://github.com/git-lfs/git-lfs/blob/4cca3f8e7d77c5b86cdfe34c2e6bba2126e3d5cd/docs/man/git-lfs-config.5.ronn#L25-L33

lfs.concurrenttransfers

The number of concurrent uploads/downloads. Default 3.

func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
if len(objects) == 0 {
return nil
Expand All @@ -133,71 +136,91 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
}

errGroup, groupCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(setting.LFSClient.BatchConcurrency)
for _, object := range result.Objects {
if object.Error != nil {
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
if uc != nil {
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}
} else {
if err := dc(object.Pointer, nil, object.Error); err != nil {
return err
}
}
continue
}

if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
continue
}
errGroup.Go(func() error {
err := performSingleOperation(groupCtx, object, dc, uc, transferAdapter)
return err
})
}

link, ok := object.Actions["upload"]
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'upload'")
}
// only the first error is returned, preserving legacy behavior before concurrency
return errGroup.Wait()
}

content, err := uc(object.Pointer, nil)
if err != nil {
return err
}
// performSingleOperation performs an LFS upload or download operation on a single object
func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error {
// the response from an lfs batch api request for this specific object id contained an error
if object.Error != nil {
log.Trace("Error on object %v: %v", object.Pointer, object.Error)

err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
// this was an 'upload' request inside the batch request
if uc != nil {
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}

link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
// this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
err := dc(object.Pointer, nil, object.Error)
if errors.Is(object.Error, ErrObjectNotExist) {
log.Warn("Ignoring missing upstream LFS object %-v: %v", object.Pointer, err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to point out that this actually reduces the usefulness of this logline. I moved it out of modules/repository/repo.go, but there it had access to which repository it was operating on, so you know where to investigate if you want to track down the lfs pointer/blob. Here in modules/lfs/http_client.go, I don't have access to the repository name. I was thinking I could add a value to the Context which is passed down from repo to here?

return nil
}

content, err := transferAdapter.Download(ctx, link)
if err != nil {
return err
}
// this was a 'download' request which was a legitimate error response from the batch api (not an http/404)
return err
}
}

if err := dc(object.Pointer, content, nil); err != nil {
// the response from an lfs batch api request contained necessary upload/download fields to act upon
if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
return nil
}

link, ok := object.Actions["upload"]
if !ok {
return errors.New("missing action 'upload'")
}

content, err := uc(object.Pointer, nil)
if err != nil {
return err
}

err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
return err
}

link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
}

content, err := transferAdapter.Download(ctx, link)
if err != nil {
return err
}

if err := dc(object.Pointer, content, nil); err != nil {
return err
}
}
return nil
}

Expand Down
17 changes: 7 additions & 10 deletions modules/lfs/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"

"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/setting"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -211,36 +212,31 @@ func TestHTTPClientDownload(t *testing.T) {
expectederror: "TransferAdapter not found: ",
},
// case 5
{
endpoint: "https://error-in-response-objects.io",
expectederror: "Object not found",
},
// case 6
{
endpoint: "https://empty-actions-map.io",
expectederror: "missing action 'download'",
},
// case 7
// case 6
{
endpoint: "https://download-actions-map.io",
expectederror: "",
},
// case 8
// case 7
{
endpoint: "https://upload-actions-map.io",
expectederror: "missing action 'download'",
},
// case 9
// case 8
{
endpoint: "https://verify-actions-map.io",
expectederror: "missing action 'download'",
},
// case 10
// case 9
{
endpoint: "https://unknown-actions-map.io",
expectederror: "missing action 'download'",
},
// case 11
// case 10
{
endpoint: "https://legacy-batch-request-download.io",
expectederror: "",
Expand All @@ -255,6 +251,7 @@ func TestHTTPClientDownload(t *testing.T) {
"dummy": dummy,
},
}
setting.LFSClient.BatchConcurrency = 1

err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error {
if objectError != nil {
Expand Down
5 changes: 0 additions & 5 deletions modules/repository/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package repository

import (
"context"
"errors"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -182,10 +181,6 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *repo_model.Re
downloadObjects := func(pointers []lfs.Pointer) error {
err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error {
if objectError != nil {
if errors.Is(objectError, lfs.ErrObjectNotExist) {
log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError)
return nil
}
return objectError
}

Expand Down
7 changes: 6 additions & 1 deletion modules/setting/lfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ var LFS = struct {

// LFSClient represents configuration for Gitea's LFS clients, for example: mirroring upstream Git LFS
var LFSClient = struct {
BatchSize int `ini:"BATCH_SIZE"`
BatchSize int `ini:"BATCH_SIZE"`
BatchConcurrency int `ini:"BATCH_OPERATION_CONCURRENCY"`
}{}

func loadLFSFrom(rootCfg ConfigProvider) error {
Expand Down Expand Up @@ -66,6 +67,10 @@ func loadLFSFrom(rootCfg ConfigProvider) error {
LFSClient.BatchSize = 20
}

if LFSClient.BatchConcurrency < 1 {
LFSClient.BatchConcurrency = LFSClient.BatchSize
}

LFS.HTTPAuthExpiry = sec.Key("LFS_HTTP_AUTH_EXPIRY").MustDuration(24 * time.Hour)

if !LFS.StartServer || !InstallLock {
Expand Down
13 changes: 13 additions & 0 deletions modules/setting/lfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,17 @@ BATCH_SIZE = 0
assert.NoError(t, loadLFSFrom(cfg))
assert.EqualValues(t, 100, LFS.MaxBatchSize)
assert.EqualValues(t, 20, LFSClient.BatchSize)
assert.EqualValues(t, 20, LFSClient.BatchConcurrency)

iniStr = `
[lfs_client]
BATCH_SIZE = 50
BATCH_OPERATION_CONCURRENCY = 10
`
cfg, err = NewConfigProviderFromData(iniStr)
assert.NoError(t, err)

assert.NoError(t, loadLFSFrom(cfg))
assert.EqualValues(t, 50, LFSClient.BatchSize)
assert.EqualValues(t, 10, LFSClient.BatchConcurrency)
}
Loading