Skip to content

Fix auto concurrency cancellation skips commit status updates (#33764) #33849

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err

// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) ([]*ActionRunJob, error) {
// Find all runs in the specified repository, reference, and workflow with non-final status
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
RepoID: repoID,
Expand All @@ -204,22 +204,24 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
})
if err != nil {
return err
return nil, err
}

// If there are no runs found, there's no need to proceed with cancellation, so return nil.
if total == 0 {
return nil
return nil, nil
}

cancelledJobs := make([]*ActionRunJob, 0, total)

// Iterate over each found run and cancel its associated jobs.
for _, run := range runs {
// Find all jobs associated with the current run.
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
RunID: run.ID,
})
if err != nil {
return err
return cancelledJobs, err
}

// Iterate over each job and attempt to cancel it.
Expand All @@ -238,27 +240,29 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return err
return cancelledJobs, err
}

// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return fmt.Errorf("job has changed, try again")
return cancelledJobs, fmt.Errorf("job has changed, try again")
}

cancelledJobs = append(cancelledJobs, job)
// Continue with the next job.
continue
}

// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return err
return cancelledJobs, err
}
cancelledJobs = append(cancelledJobs, job)
}
}

// Return nil to indicate successful cancellation of all running and waiting jobs.
return nil
return cancelledJobs, nil
}

// InsertRun inserts a run
Expand Down
13 changes: 7 additions & 6 deletions models/actions/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,22 @@ func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error {
return committer.Commit()
}

func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error {
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) ([]*ActionRunJob, error) {
// If actions disabled when there is schedule task, this will remove the outdated schedule tasks
// There is no other place we can do this because the app.ini will be changed manually
if err := DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
return fmt.Errorf("DeleteCronTaskByRepo: %v", err)
return nil, fmt.Errorf("DeleteCronTaskByRepo: %v", err)
}
// cancel running cron jobs of this repository and delete old schedules
if err := CancelPreviousJobs(
jobs, err := CancelPreviousJobs(
ctx,
repo.ID,
repo.DefaultBranch,
"",
webhook_module.HookEventSchedule,
); err != nil {
return fmt.Errorf("CancelPreviousJobs: %v", err)
)
if err != nil {
return jobs, fmt.Errorf("CancelPreviousJobs: %v", err)
}
return nil
return jobs, nil
}
3 changes: 1 addition & 2 deletions routers/api/v1/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strings"
"time"

actions_model "code.gitea.io/gitea/models/actions"
activities_model "code.gitea.io/gitea/models/activities"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/organization"
Expand Down Expand Up @@ -1050,7 +1049,7 @@ func updateRepoArchivedState(ctx *context.APIContext, opts api.EditRepoOption) e
ctx.Error(http.StatusInternalServerError, "ArchiveRepoState", err)
return err
}
if err := actions_model.CleanRepoScheduleTasks(ctx, repo); err != nil {
if err := actions_service.CleanRepoScheduleTasks(ctx, repo); err != nil {
log.Error("CleanRepoScheduleTasks for archived repo %s/%s: %v", ctx.Repo.Owner.Name, repo.Name, err)
}
log.Trace("Repository was archived: %s/%s", ctx.Repo.Owner.Name, repo.Name)
Expand Down
2 changes: 1 addition & 1 deletion routers/web/repo/actions/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ func Run(ctx *context_module.Context) {
}

// cancel running jobs of the same workflow
if err := actions_model.CancelPreviousJobs(
if err := actions_service.CancelPreviousJobs(
ctx,
run.RepoID,
run.Ref,
Expand Down
3 changes: 1 addition & 2 deletions routers/web/repo/setting/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"code.gitea.io/gitea/models"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/organization"
"code.gitea.io/gitea/models/perm"
Expand Down Expand Up @@ -902,7 +901,7 @@ func SettingsPost(ctx *context.Context) {
return
}

if err := actions_model.CleanRepoScheduleTasks(ctx, repo); err != nil {
if err := actions_service.CleanRepoScheduleTasks(ctx, repo); err != nil {
log.Error("CleanRepoScheduleTasks for archived repo %s/%s: %v", ctx.Repo.Owner.Name, repo.Name, err)
}

Expand Down
22 changes: 21 additions & 1 deletion services/actions/clear_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (

actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/actions"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
webhook_module "code.gitea.io/gitea/modules/webhook"
)

// StopZombieTasks stops the task which have running status, but haven't been updated for a long time
Expand All @@ -32,6 +34,24 @@ func StopEndlessTasks(ctx context.Context) error {
})
}

func notifyWorkflowJobStatusUpdate(ctx context.Context, jobs []*actions_model.ActionRunJob) {
if len(jobs) > 0 {
CreateCommitStatus(ctx, jobs...)
}
}

func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
jobs, err := actions_model.CancelPreviousJobs(ctx, repoID, ref, workflowID, event)
notifyWorkflowJobStatusUpdate(ctx, jobs)
return err
}

func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error {
jobs, err := actions_model.CleanRepoScheduleTasks(ctx, repo)
notifyWorkflowJobStatusUpdate(ctx, jobs)
return err
}

func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
tasks, err := db.Find[actions_model.ActionTask](ctx, opts)
if err != nil {
Expand Down Expand Up @@ -67,7 +87,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
remove()
}

CreateCommitStatus(ctx, jobs...)
notifyWorkflowJobStatusUpdate(ctx, jobs)

return nil
}
Expand Down
6 changes: 3 additions & 3 deletions services/actions/notifier_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func notify(ctx context.Context, input *notifyInput) error {
return nil
}
if unit_model.TypeActions.UnitGlobalDisabled() {
if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo); err != nil {
if err := CleanRepoScheduleTasks(ctx, input.Repo); err != nil {
log.Error("CleanRepoScheduleTasks: %v", err)
}
return nil
Expand Down Expand Up @@ -341,7 +341,7 @@ func handleWorkflows(
// cancel running jobs if the event is push or pull_request_sync
if run.Event == webhook_module.HookEventPush ||
run.Event == webhook_module.HookEventPullRequestSync {
if err := actions_model.CancelPreviousJobs(
if err := CancelPreviousJobs(
ctx,
run.RepoID,
run.Ref,
Expand Down Expand Up @@ -472,7 +472,7 @@ func handleSchedules(
log.Error("CountSchedules: %v", err)
return err
} else if count > 0 {
if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo); err != nil {
if err := CleanRepoScheduleTasks(ctx, input.Repo); err != nil {
log.Error("CleanRepoScheduleTasks: %v", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion services/actions/schedule_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func startTasks(ctx context.Context) error {
// cancel running jobs if the event is push
if row.Schedule.Event == webhook_module.HookEventPush {
// cancel running jobs of the same workflow
if err := actions_model.CancelPreviousJobs(
if err := CancelPreviousJobs(
ctx,
row.RepoID,
row.Schedule.Ref,
Expand Down
5 changes: 3 additions & 2 deletions services/repository/branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
webhook_module "code.gitea.io/gitea/modules/webhook"
actions_service "code.gitea.io/gitea/services/actions"
notify_service "code.gitea.io/gitea/services/notify"
files_service "code.gitea.io/gitea/services/repository/files"

Expand Down Expand Up @@ -428,7 +429,7 @@ func RenameBranch(ctx context.Context, repo *repo_model.Repository, doer *user_m
log.Error("DeleteCronTaskByRepo: %v", err)
}
// cancel running cron jobs of this repository and delete old schedules
if err := actions_model.CancelPreviousJobs(
if err := actions_service.CancelPreviousJobs(
ctx,
repo.ID,
from,
Expand Down Expand Up @@ -609,7 +610,7 @@ func SetRepoDefaultBranch(ctx context.Context, repo *repo_model.Repository, gitR
log.Error("DeleteCronTaskByRepo: %v", err)
}
// cancel running cron jobs of this repository and delete old schedules
if err := actions_model.CancelPreviousJobs(
if err := actions_service.CancelPreviousJobs(
ctx,
repo.ID,
oldDefaultBranchName,
Expand Down
3 changes: 1 addition & 2 deletions services/repository/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"slices"

actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unit"
Expand All @@ -29,7 +28,7 @@ func UpdateRepositoryUnits(ctx context.Context, repo *repo_model.Repository, uni
}

if slices.Contains(deleteUnitTypes, unit.TypeActions) {
if err := actions_model.CleanRepoScheduleTasks(ctx, repo); err != nil {
if err := actions_service.CleanRepoScheduleTasks(ctx, repo); err != nil {
log.Error("CleanRepoScheduleTasks: %v", err)
}
}
Expand Down