@@ -13,6 +13,8 @@ import (
13
13
"net/url"
14
14
"strings"
15
15
16
+ "golang.org/x/sync/errgroup"
17
+
16
18
"code.gitea.io/gitea/modules/json"
17
19
"code.gitea.io/gitea/modules/log"
18
20
"code.gitea.io/gitea/modules/proxy"
@@ -114,6 +116,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
114
116
return c .performOperation (ctx , objects , nil , callback )
115
117
}
116
118
119
+ // performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
117
120
func (c * HTTPClient ) performOperation (ctx context.Context , objects []Pointer , dc DownloadCallback , uc UploadCallback ) error {
118
121
if len (objects ) == 0 {
119
122
return nil
@@ -134,71 +137,86 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
134
137
return fmt .Errorf ("TransferAdapter not found: %s" , result .Transfer )
135
138
}
136
139
140
+ errGroup , groupCtx := errgroup .WithContext (context .Background ())
137
141
for _ , object := range result .Objects {
138
- if object .Error != nil {
139
- log .Trace ("Error on object %v: %v" , object .Pointer , object .Error )
140
- if uc != nil {
141
- if _ , err := uc (object .Pointer , object .Error ); err != nil {
142
- return err
143
- }
144
- } else {
145
- if err := dc (object .Pointer , nil , object .Error ); err != nil {
146
- return err
147
- }
148
- }
149
- continue
150
- }
151
-
152
- if uc != nil {
153
- if len (object .Actions ) == 0 {
154
- log .Trace ("%v already present on server" , object .Pointer )
155
- continue
156
- }
142
+ func (ctx context.Context , object * ObjectResponse , dc DownloadCallback , uc UploadCallback , transferAdapter TransferAdapter ) {
143
+ errGroup .Go (func () error {
144
+ err := performSingleOperation (groupCtx , object , dc , uc , transferAdapter )
145
+ return err
146
+ })
147
+ }(groupCtx , object , dc , uc , transferAdapter )
148
+ }
157
149
158
- link , ok := object .Actions ["upload" ]
159
- if ! ok {
160
- log .Debug ("%+v" , object )
161
- return errors .New ("missing action 'upload'" )
162
- }
150
+ // only the first error is returned, preserving legacy behavior before concurrency
151
+ return errGroup .Wait ()
152
+ }
163
153
164
- content , err := uc (object .Pointer , nil )
165
- if err != nil {
154
+ // performSingleOperation performs an LFS upload or download operation on a single object
155
+ func performSingleOperation (ctx context.Context , object * ObjectResponse , dc DownloadCallback , uc UploadCallback , transferAdapter TransferAdapter ) error {
156
+ if object .Error != nil {
157
+ log .Trace ("Error on object %v: %v" , object .Pointer , object .Error )
158
+ if uc != nil {
159
+ if _ , err := uc (object .Pointer , object .Error ); err != nil {
166
160
return err
167
161
}
168
-
169
- err = transferAdapter .Upload (ctx , link , object .Pointer , content )
170
- if err != nil {
162
+ } else {
163
+ err := dc (object .Pointer , nil , object .Error )
164
+ if errors .Is (object .Error , ErrObjectNotExist ) {
165
+ log .Warn ("Ignoring missing upstream LFS object %-v: %v" , object .Pointer , err )
166
+ return nil
167
+ } else {
171
168
return err
172
169
}
170
+ }
171
+ }
173
172
174
- link , ok = object .Actions ["verify" ]
175
- if ok {
176
- if err := transferAdapter .Verify (ctx , link , object .Pointer ); err != nil {
177
- return err
178
- }
179
- }
180
- } else {
181
- link , ok := object .Actions ["download" ]
182
- if ! ok {
183
- // no actions block in response, try legacy response schema
184
- link , ok = object .Links ["download" ]
185
- }
186
- if ! ok {
187
- log .Debug ("%+v" , object )
188
- return errors .New ("missing action 'download'" )
189
- }
173
+ if uc != nil {
174
+ if len (object .Actions ) == 0 {
175
+ log .Trace ("%v already present on server" , object .Pointer )
176
+ return nil
177
+ }
190
178
191
- content , err := transferAdapter .Download (ctx , link )
192
- if err != nil {
193
- return err
194
- }
179
+ link , ok := object .Actions ["upload" ]
180
+ if ! ok {
181
+ return errors .New ("missing action 'upload'" )
182
+ }
183
+
184
+ content , err := uc (object .Pointer , nil )
185
+ if err != nil {
186
+ return err
187
+ }
195
188
196
- if err := dc (object .Pointer , content , nil ); err != nil {
189
+ err = transferAdapter .Upload (ctx , link , object .Pointer , content )
190
+ if err != nil {
191
+ return err
192
+ }
193
+
194
+ link , ok = object .Actions ["verify" ]
195
+ if ok {
196
+ if err := transferAdapter .Verify (ctx , link , object .Pointer ); err != nil {
197
197
return err
198
198
}
199
199
}
200
- }
200
+ } else {
201
+ link , ok := object .Actions ["download" ]
202
+ if ! ok {
203
+ // no actions block in response, try legacy response schema
204
+ link , ok = object .Links ["download" ]
205
+ }
206
+ if ! ok {
207
+ log .Debug ("%+v" , object )
208
+ return errors .New ("missing action 'download'" )
209
+ }
201
210
211
+ content , err := transferAdapter .Download (ctx , link )
212
+ if err != nil {
213
+ return err
214
+ }
215
+
216
+ if err := dc (object .Pointer , content , nil ); err != nil {
217
+ return err
218
+ }
219
+ }
202
220
return nil
203
221
}
204
222
0 commit comments