@@ -17,6 +17,8 @@ import (
17
17
"code.gitea.io/gitea/modules/log"
18
18
"code.gitea.io/gitea/modules/proxy"
19
19
"code.gitea.io/gitea/modules/setting"
20
+
21
+ "golang.org/x/sync/errgroup"
20
22
)
21
23
22
24
// HTTPClient is used to communicate with the LFS server
@@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
113
115
return c .performOperation (ctx , objects , nil , callback )
114
116
}
115
117
118
+ // performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
116
119
func (c * HTTPClient ) performOperation (ctx context.Context , objects []Pointer , dc DownloadCallback , uc UploadCallback ) error {
117
120
if len (objects ) == 0 {
118
121
return nil
@@ -133,71 +136,90 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
133
136
return fmt .Errorf ("TransferAdapter not found: %s" , result .Transfer )
134
137
}
135
138
139
+ errGroup , groupCtx := errgroup .WithContext (ctx )
136
140
for _ , object := range result .Objects {
137
- if object .Error != nil {
138
- log .Trace ("Error on object %v: %v" , object .Pointer , object .Error )
139
- if uc != nil {
140
- if _ , err := uc (object .Pointer , object .Error ); err != nil {
141
- return err
142
- }
143
- } else {
144
- if err := dc (object .Pointer , nil , object .Error ); err != nil {
145
- return err
146
- }
147
- }
148
- continue
149
- }
150
-
151
- if uc != nil {
152
- if len (object .Actions ) == 0 {
153
- log .Trace ("%v already present on server" , object .Pointer )
154
- continue
155
- }
141
+ errGroup .Go (func () error {
142
+ err := performSingleOperation (groupCtx , object , dc , uc , transferAdapter )
143
+ return err
144
+ })
145
+ }
156
146
157
- link , ok := object .Actions ["upload" ]
158
- if ! ok {
159
- log .Debug ("%+v" , object )
160
- return errors .New ("missing action 'upload'" )
161
- }
147
+ // only the first error is returned, preserving legacy behavior before concurrency
148
+ return errGroup .Wait ()
149
+ }
162
150
163
- content , err := uc (object .Pointer , nil )
164
- if err != nil {
165
- return err
166
- }
151
+ // performSingleOperation performs an LFS upload or download operation on a single object
152
+ func performSingleOperation (ctx context.Context , object * ObjectResponse , dc DownloadCallback , uc UploadCallback , transferAdapter TransferAdapter ) error {
153
+ // the response from an lfs batch api request for this specific object id contained an error
154
+ if object .Error != nil {
155
+ log .Trace ("Error on object %v: %v" , object .Pointer , object .Error )
167
156
168
- err = transferAdapter .Upload (ctx , link , object .Pointer , content )
169
- if err != nil {
157
+ // this was an 'upload' request inside the batch request
158
+ if uc != nil {
159
+ if _ , err := uc (object .Pointer , object .Error ); err != nil {
170
160
return err
171
161
}
172
-
173
- link , ok = object .Actions ["verify" ]
174
- if ok {
175
- if err := transferAdapter .Verify (ctx , link , object .Pointer ); err != nil {
176
- return err
177
- }
178
- }
179
162
} else {
180
- link , ok := object .Actions ["download" ]
181
- if ! ok {
182
- // no actions block in response, try legacy response schema
183
- link , ok = object .Links ["download" ]
184
- }
185
- if ! ok {
186
- log .Debug ("%+v" , object )
187
- return errors .New ("missing action 'download'" )
163
+ // this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
164
+ err := dc (object .Pointer , nil , object .Error )
165
+ if errors .Is (object .Error , ErrObjectNotExist ) {
166
+ log .Warn ("Ignoring missing upstream LFS object %-v: %v" , object .Pointer , err )
167
+ return nil
188
168
}
189
169
190
- content , err := transferAdapter . Download ( ctx , link )
191
- if err != nil {
192
- return err
193
- }
170
+ // this was a 'download' request which was a legitimate error response from the batch api (not an http/404 )
171
+ return err
172
+ }
173
+ }
194
174
195
- if err := dc (object .Pointer , content , nil ); err != nil {
175
+ // the response from an lfs batch api request contained necessary upload/download fields to act upon
176
+ if uc != nil {
177
+ if len (object .Actions ) == 0 {
178
+ log .Trace ("%v already present on server" , object .Pointer )
179
+ return nil
180
+ }
181
+
182
+ link , ok := object .Actions ["upload" ]
183
+ if ! ok {
184
+ return errors .New ("missing action 'upload'" )
185
+ }
186
+
187
+ content , err := uc (object .Pointer , nil )
188
+ if err != nil {
189
+ return err
190
+ }
191
+
192
+ err = transferAdapter .Upload (ctx , link , object .Pointer , content )
193
+ if err != nil {
194
+ return err
195
+ }
196
+
197
+ link , ok = object .Actions ["verify" ]
198
+ if ok {
199
+ if err := transferAdapter .Verify (ctx , link , object .Pointer ); err != nil {
196
200
return err
197
201
}
198
202
}
199
- }
203
+ } else {
204
+ link , ok := object .Actions ["download" ]
205
+ if ! ok {
206
+ // no actions block in response, try legacy response schema
207
+ link , ok = object .Links ["download" ]
208
+ }
209
+ if ! ok {
210
+ log .Debug ("%+v" , object )
211
+ return errors .New ("missing action 'download'" )
212
+ }
200
213
214
+ content , err := transferAdapter .Download (ctx , link )
215
+ if err != nil {
216
+ return err
217
+ }
218
+
219
+ if err := dc (object .Pointer , content , nil ); err != nil {
220
+ return err
221
+ }
222
+ }
201
223
return nil
202
224
}
203
225
0 commit comments