Skip to content

Commit b6eef34

Browse files
authored
Fix artifact v4 upload above 8MB (#31664) (#32523)
1 parent d03dd04 commit b6eef34

File tree

3 files changed

+286
-40
lines changed

3 files changed

+286
-40
lines changed

routers/api/actions/artifacts_chunks.go

+49-1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,54 @@ func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chun
123123
return chunksMap, nil
124124
}
125125

126+
func listChunksByRunIDV4(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
127+
storageDir := fmt.Sprintf("tmpv4%d", runID)
128+
var chunks []*chunkFileItem
129+
chunkMap := map[string]*chunkFileItem{}
130+
dummy := &chunkFileItem{}
131+
for _, name := range blist.Latest {
132+
chunkMap[name] = dummy
133+
}
134+
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
135+
baseName := filepath.Base(fpath)
136+
if !strings.HasPrefix(baseName, "block-") {
137+
return nil
138+
}
139+
// when read chunks from storage, it only contains storage dir and basename,
140+
// no matter the subdirectory setting in storage config
141+
item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID}
142+
var size int64
143+
var b64chunkName string
144+
if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &b64chunkName); err != nil {
145+
return fmt.Errorf("parse content range error: %v", err)
146+
}
147+
rchunkName, err := base64.URLEncoding.DecodeString(b64chunkName)
148+
if err != nil {
149+
return fmt.Errorf("failed to parse chunkName: %v", err)
150+
}
151+
chunkName := string(rchunkName)
152+
item.End = item.Start + size - 1
153+
if _, ok := chunkMap[chunkName]; ok {
154+
chunkMap[chunkName] = &item
155+
}
156+
return nil
157+
}); err != nil {
158+
return nil, err
159+
}
160+
for i, name := range blist.Latest {
161+
chunk, ok := chunkMap[name]
162+
if !ok || chunk.Path == "" {
163+
return nil, fmt.Errorf("missing Chunk (%d/%d): %s", i, len(blist.Latest), name)
164+
}
165+
chunks = append(chunks, chunk)
166+
if i > 0 {
167+
chunk.Start = chunkMap[blist.Latest[i-1]].End + 1
168+
chunk.End += chunk.Start
169+
}
170+
}
171+
return chunks, nil
172+
}
173+
126174
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
127175
// read all db artifacts by name
128176
artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
@@ -230,7 +278,7 @@ func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st st
230278
rawChecksum := hash.Sum(nil)
231279
actualChecksum := hex.EncodeToString(rawChecksum)
232280
if !strings.HasSuffix(checksum, actualChecksum) {
233-
return fmt.Errorf("update artifact error checksum is invalid")
281+
return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum)
234282
}
235283
}
236284

routers/api/actions/artifactsv4.go

+107-39
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,15 @@ package actions
2424
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=block
2525
// 1.3. Continue Upload Zip Content to Blobstorage (unauthenticated request), repeat until everything is uploaded
2626
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=appendBlock
27-
// 1.4. Unknown xml payload to Blobstorage (unauthenticated request), ignored for now
27+
// 1.4. BlockList xml payload to Blobstorage (unauthenticated request)
28+
// Files of about 800MB are parallel in parallel and / or out of order, this file is needed to enshure the correct order
2829
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=blockList
30+
// Request
31+
// <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
32+
// <BlockList>
33+
// <Latest>blockId1</Latest>
34+
// <Latest>blockId2</Latest>
35+
// </BlockList>
2936
// 1.5. FinalizeArtifact
3037
// Post: /twirp/github.actions.results.api.v1.ArtifactService/FinalizeArtifact
3138
// Request
@@ -82,6 +89,7 @@ import (
8289
"crypto/hmac"
8390
"crypto/sha256"
8491
"encoding/base64"
92+
"encoding/xml"
8593
"fmt"
8694
"io"
8795
"net/http"
@@ -152,31 +160,34 @@ func ArtifactsV4Routes(prefix string) *web.Route {
152160
return m
153161
}
154162

155-
func (r artifactV4Routes) buildSignature(endp, expires, artifactName string, taskID int64) []byte {
163+
func (r artifactV4Routes) buildSignature(endp, expires, artifactName string, taskID, artifactID int64) []byte {
156164
mac := hmac.New(sha256.New, setting.GetGeneralTokenSigningSecret())
157165
mac.Write([]byte(endp))
158166
mac.Write([]byte(expires))
159167
mac.Write([]byte(artifactName))
160168
mac.Write([]byte(fmt.Sprint(taskID)))
169+
mac.Write([]byte(fmt.Sprint(artifactID)))
161170
return mac.Sum(nil)
162171
}
163172

164-
func (r artifactV4Routes) buildArtifactURL(ctx *ArtifactContext, endp, artifactName string, taskID int64) string {
173+
func (r artifactV4Routes) buildArtifactURL(ctx *ArtifactContext, endp, artifactName string, taskID, artifactID int64) string {
165174
expires := time.Now().Add(60 * time.Minute).Format("2006-01-02 15:04:05.999999999 -0700 MST")
166175
uploadURL := strings.TrimSuffix(httplib.GuessCurrentAppURL(ctx), "/") + strings.TrimSuffix(r.prefix, "/") +
167-
"/" + endp + "?sig=" + base64.URLEncoding.EncodeToString(r.buildSignature(endp, expires, artifactName, taskID)) + "&expires=" + url.QueryEscape(expires) + "&artifactName=" + url.QueryEscape(artifactName) + "&taskID=" + fmt.Sprint(taskID)
176+
"/" + endp + "?sig=" + base64.URLEncoding.EncodeToString(r.buildSignature(endp, expires, artifactName, taskID, artifactID)) + "&expires=" + url.QueryEscape(expires) + "&artifactName=" + url.QueryEscape(artifactName) + "&taskID=" + fmt.Sprint(taskID) + "&artifactID=" + fmt.Sprint(artifactID)
168177
return uploadURL
169178
}
170179

171180
func (r artifactV4Routes) verifySignature(ctx *ArtifactContext, endp string) (*actions.ActionTask, string, bool) {
172181
rawTaskID := ctx.Req.URL.Query().Get("taskID")
182+
rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
173183
sig := ctx.Req.URL.Query().Get("sig")
174184
expires := ctx.Req.URL.Query().Get("expires")
175185
artifactName := ctx.Req.URL.Query().Get("artifactName")
176186
dsig, _ := base64.URLEncoding.DecodeString(sig)
177187
taskID, _ := strconv.ParseInt(rawTaskID, 10, 64)
188+
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64)
178189

179-
expecedsig := r.buildSignature(endp, expires, artifactName, taskID)
190+
expecedsig := r.buildSignature(endp, expires, artifactName, taskID, artifactID)
180191
if !hmac.Equal(dsig, expecedsig) {
181192
log.Error("Error unauthorized")
182193
ctx.Error(http.StatusUnauthorized, "Error unauthorized")
@@ -271,6 +282,8 @@ func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {
271282
return
272283
}
273284
artifact.ContentEncoding = ArtifactV4ContentEncoding
285+
artifact.FileSize = 0
286+
artifact.FileCompressedSize = 0
274287
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
275288
log.Error("Error UpdateArtifactByID: %v", err)
276289
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
@@ -279,7 +292,7 @@ func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {
279292

280293
respData := CreateArtifactResponse{
281294
Ok: true,
282-
SignedUploadUrl: r.buildArtifactURL(ctx, "UploadArtifact", artifactName, ctx.ActionTask.ID),
295+
SignedUploadUrl: r.buildArtifactURL(ctx, "UploadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID),
283296
}
284297
r.sendProtbufBody(ctx, &respData)
285298
}
@@ -293,38 +306,77 @@ func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) {
293306
comp := ctx.Req.URL.Query().Get("comp")
294307
switch comp {
295308
case "block", "appendBlock":
296-
// get artifact by name
297-
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
298-
if err != nil {
299-
log.Error("Error artifact not found: %v", err)
300-
ctx.Error(http.StatusNotFound, "Error artifact not found")
301-
return
309+
blockid := ctx.Req.URL.Query().Get("blockid")
310+
if blockid == "" {
311+
// get artifact by name
312+
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
313+
if err != nil {
314+
log.Error("Error artifact not found: %v", err)
315+
ctx.Error(http.StatusNotFound, "Error artifact not found")
316+
return
317+
}
318+
319+
_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID)
320+
if err != nil {
321+
log.Error("Error runner api getting task: task is not running")
322+
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
323+
return
324+
}
325+
artifact.FileCompressedSize += ctx.Req.ContentLength
326+
artifact.FileSize += ctx.Req.ContentLength
327+
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
328+
log.Error("Error UpdateArtifactByID: %v", err)
329+
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
330+
return
331+
}
332+
} else {
333+
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/block-%d-%d-%s", task.Job.RunID, task.Job.RunID, ctx.Req.ContentLength, base64.URLEncoding.EncodeToString([]byte(blockid))), ctx.Req.Body, -1)
334+
if err != nil {
335+
log.Error("Error runner api getting task: task is not running")
336+
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
337+
return
338+
}
302339
}
303-
304-
if comp == "block" {
305-
artifact.FileSize = 0
306-
artifact.FileCompressedSize = 0
307-
}
308-
309-
_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID)
340+
ctx.JSON(http.StatusCreated, "appended")
341+
case "blocklist":
342+
rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
343+
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64)
344+
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/%d-%d-blocklist", task.Job.RunID, task.Job.RunID, artifactID), ctx.Req.Body, -1)
310345
if err != nil {
311346
log.Error("Error runner api getting task: task is not running")
312347
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
313348
return
314349
}
315-
artifact.FileCompressedSize += ctx.Req.ContentLength
316-
artifact.FileSize += ctx.Req.ContentLength
317-
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
318-
log.Error("Error UpdateArtifactByID: %v", err)
319-
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
320-
return
321-
}
322-
ctx.JSON(http.StatusCreated, "appended")
323-
case "blocklist":
324350
ctx.JSON(http.StatusCreated, "created")
325351
}
326352
}
327353

354+
type BlockList struct {
355+
Latest []string `xml:"Latest"`
356+
}
357+
358+
type Latest struct {
359+
Value string `xml:",chardata"`
360+
}
361+
362+
func (r *artifactV4Routes) readBlockList(runID, artifactID int64) (*BlockList, error) {
363+
blockListName := fmt.Sprintf("tmpv4%d/%d-%d-blocklist", runID, runID, artifactID)
364+
s, err := r.fs.Open(blockListName)
365+
if err != nil {
366+
return nil, err
367+
}
368+
369+
xdec := xml.NewDecoder(s)
370+
blockList := &BlockList{}
371+
err = xdec.Decode(blockList)
372+
373+
delerr := r.fs.Delete(blockListName)
374+
if delerr != nil {
375+
log.Warn("Failed to delete blockList %s: %v", blockListName, delerr)
376+
}
377+
return blockList, err
378+
}
379+
328380
func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
329381
var req FinalizeArtifactRequest
330382

@@ -343,18 +395,34 @@ func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
343395
ctx.Error(http.StatusNotFound, "Error artifact not found")
344396
return
345397
}
346-
chunkMap, err := listChunksByRunID(r.fs, runID)
398+
399+
var chunks []*chunkFileItem
400+
blockList, err := r.readBlockList(runID, artifact.ID)
347401
if err != nil {
348-
log.Error("Error merge chunks: %v", err)
349-
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
350-
return
351-
}
352-
chunks, ok := chunkMap[artifact.ID]
353-
if !ok {
354-
log.Error("Error merge chunks")
355-
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
356-
return
402+
log.Warn("Failed to read BlockList, fallback to old behavior: %v", err)
403+
chunkMap, err := listChunksByRunID(r.fs, runID)
404+
if err != nil {
405+
log.Error("Error merge chunks: %v", err)
406+
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
407+
return
408+
}
409+
chunks, ok = chunkMap[artifact.ID]
410+
if !ok {
411+
log.Error("Error merge chunks")
412+
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
413+
return
414+
}
415+
} else {
416+
chunks, err = listChunksByRunIDV4(r.fs, runID, artifact.ID, blockList)
417+
if err != nil {
418+
log.Error("Error merge chunks: %v", err)
419+
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
420+
return
421+
}
422+
artifact.FileSize = chunks[len(chunks)-1].End + 1
423+
artifact.FileCompressedSize = chunks[len(chunks)-1].End + 1
357424
}
425+
358426
checksum := ""
359427
if req.Hash != nil {
360428
checksum = req.Hash.Value
@@ -455,7 +523,7 @@ func (r *artifactV4Routes) getSignedArtifactURL(ctx *ArtifactContext) {
455523
}
456524
}
457525
if respData.SignedUrl == "" {
458-
respData.SignedUrl = r.buildArtifactURL(ctx, "DownloadArtifact", artifactName, ctx.ActionTask.ID)
526+
respData.SignedUrl = r.buildArtifactURL(ctx, "DownloadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID)
459527
}
460528
r.sendProtbufBody(ctx, &respData)
461529
}

0 commit comments

Comments
 (0)