Skip to content

Make LFS http_client parallel within a batch. #32369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion custom/conf/app.example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2642,9 +2642,15 @@ LEVEL = Info
;; override the azure blob base path if storage type is azureblob
;AZURE_BLOB_BASE_PATH = lfs/

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; settings for Gitea's LFS client (eg: mirroring an upstream lfs endpoint)
;;
;[lfs_client]
;; When mirroring an upstream lfs endpoint, limit the number of pointers in each batch request to this number
;; Limit the number of pointers in each batch request to this number
;BATCH_SIZE = 20
;; Limit the number of concurrent upload/download operations within a batch
;BATCH_OPERATION_CONCURRENCY = 3

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ require (
golang.org/x/image v0.21.0
golang.org/x/net v0.30.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
golang.org/x/text v0.19.0
golang.org/x/tools v0.26.0
Expand Down Expand Up @@ -316,7 +317,6 @@ require (
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/time v0.7.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
Expand Down
119 changes: 69 additions & 50 deletions modules/lfs/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/setting"

"golang.org/x/sync/errgroup"
)

// HTTPClient is used to communicate with the LFS server
Expand Down Expand Up @@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
return c.performOperation(ctx, objects, nil, callback)
}

// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more thing, I do not think "batch size" means "concurrency limit" in this case. If I understand correctly:

  • "batch size" means how many objects to be processed in one time.
  • "concurrency limit" means how many concurrent connections to the LFS server.

Maybe "batch size" could be 100 or 1000 without causing problem, but "concurrency limit" should be much smaller, eg: 5 or 10. A lot of connections to a LFS server might trigger their rate-limit protection, or be considered as somewhat DoS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm always in favor of making things more configurable, however as an administrator I think most of the time you would want [lfs_client].BATCH_SIZE = (the number of concurrent threads for upload/download within a batch). I'm happy to add a new config though, what should we call it and what should its default be? [lfs_client].BATCH_MAX_THREADS ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe [lfs_client].MAX_OPERATION_CONCURRENCY ? (since there is no "thread" concept in Golang)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair, how about [lfs_client].BATCH_OPERATION_CONCURRENCY, as I feel there may be other single-threaded operations still in this code that we might want configs for then, and this is specific to lfs client batch operations? I updated this branch with that config, and defaulted it to what [lfs_client].BATCH_SIZE if unset or <1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to match the default git-lfs's behavior:

https://github.com/git-lfs/git-lfs/blob/4cca3f8e7d77c5b86cdfe34c2e6bba2126e3d5cd/docs/man/git-lfs-config.5.ronn#L25-L33

lfs.concurrenttransfers

The number of concurrent uploads/downloads. Default 3.

func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
if len(objects) == 0 {
return nil
Expand All @@ -133,71 +136,87 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
}

errGroup, groupCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(setting.LFSClient.BatchOperationConcurrency)
for _, object := range result.Objects {
if object.Error != nil {
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
if uc != nil {
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}
} else {
if err := dc(object.Pointer, nil, object.Error); err != nil {
return err
}
}
continue
}
errGroup.Go(func() error {
return performSingleOperation(groupCtx, object, dc, uc, transferAdapter)
})
}

if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
continue
}
// only the first error is returned, preserving legacy behavior before concurrency
return errGroup.Wait()
}

link, ok := object.Actions["upload"]
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'upload'")
}
// performSingleOperation performs an LFS upload or download operation on a single object
func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error {
// the response from a lfs batch api request for this specific object id contained an error
if object.Error != nil {
log.Trace("Error on object %v: %v", object.Pointer, object.Error)

content, err := uc(object.Pointer, nil)
if err != nil {
// this was an 'upload' request inside the batch request
if uc != nil {
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}

err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
} else {
// this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
if err := dc(object.Pointer, nil, object.Error); err != nil {
return err
}
}
// if the callback returns no err, then the error could be ignored, and the operations should continue
return nil
}

link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
}
// the response from an lfs batch api request contained necessary upload/download fields to act upon
if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
return nil
}

content, err := transferAdapter.Download(ctx, link)
if err != nil {
return err
}
link, ok := object.Actions["upload"]
if !ok {
return errors.New("missing action 'upload'")
}

content, err := uc(object.Pointer, nil)
if err != nil {
return err
}

if err := dc(object.Pointer, content, nil); err != nil {
err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
return err
}

link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
}

content, err := transferAdapter.Download(ctx, link)
if err != nil {
return err
}

if err := dc(object.Pointer, content, nil); err != nil {
return err
}
}
return nil
}

Expand Down
Loading
Loading