Skip to content

Commit 2da2000

Browse files
lunnywolfogre
andauthored
Use global lock instead of NewExclusivePool to allow distributed lock between multiple Gitea instances (#31813)
Replace #26486 Fix #19620 --------- Co-authored-by: Jason Song <[email protected]>
1 parent a581847 commit 2da2000

File tree

13 files changed

+185
-107
lines changed

13 files changed

+185
-107
lines changed

assets/go-licenses.json

+15
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

custom/conf/app.example.ini

+6
Original file line numberDiff line numberDiff line change
@@ -2713,3 +2713,9 @@ LEVEL = Info
27132713
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
27142714
;; storage type
27152715
;STORAGE_TYPE = local
2716+
2717+
;[global_lock]
2718+
;; Lock service type, could be memory or redis
2719+
;SERVICE_TYPE = memory
2720+
;; Ignored for the "memory" type. For "redis" use something like `redis://127.0.0.1:6379/0`
2721+
;SERVICE_CONN_STR =

modules/globallock/globallock.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,22 @@ package globallock
66
import (
77
"context"
88
"sync"
9+
10+
"code.gitea.io/gitea/modules/setting"
911
)
1012

1113
var (
1214
defaultLocker Locker
1315
initOnce sync.Once
1416
initFunc = func() {
15-
// TODO: read the setting and initialize the default locker.
16-
// Before implementing this, don't use it.
17+
switch setting.GlobalLock.ServiceType {
18+
case "redis":
19+
defaultLocker = NewRedisLocker(setting.GlobalLock.ServiceConnStr)
20+
case "memory":
21+
fallthrough
22+
default:
23+
defaultLocker = NewMemoryLocker()
24+
}
1725
} // define initFunc as a variable to make it possible to change it in tests
1826
)
1927

modules/setting/gloabl_lock.go

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright 2024 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package setting
5+
6+
import (
7+
"code.gitea.io/gitea/modules/log"
8+
"code.gitea.io/gitea/modules/nosql"
9+
)
10+
11+
// GlobalLock represents configuration of global lock
12+
var GlobalLock = struct {
13+
ServiceType string
14+
ServiceConnStr string
15+
}{
16+
ServiceType: "memory",
17+
}
18+
19+
func loadGlobalLockFrom(rootCfg ConfigProvider) {
20+
sec := rootCfg.Section("global_lock")
21+
GlobalLock.ServiceType = sec.Key("SERVICE_TYPE").MustString("memory")
22+
switch GlobalLock.ServiceType {
23+
case "memory":
24+
case "redis":
25+
connStr := sec.Key("SERVICE_CONN_STR").String()
26+
if connStr == "" {
27+
log.Fatal("SERVICE_CONN_STR is empty for redis")
28+
}
29+
u := nosql.ToRedisURI(connStr)
30+
if u == nil {
31+
log.Fatal("SERVICE_CONN_STR %s is not a valid redis connection string", connStr)
32+
}
33+
GlobalLock.ServiceConnStr = connStr
34+
default:
35+
log.Fatal("Unknown sync lock service type: %s", GlobalLock.ServiceType)
36+
}
37+
}

modules/setting/global_lock_test.go

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2024 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package setting
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
func TestLoadGlobalLockConfig(t *testing.T) {
13+
t.Run("DefaultGlobalLockConfig", func(t *testing.T) {
14+
iniStr := ``
15+
cfg, err := NewConfigProviderFromData(iniStr)
16+
assert.NoError(t, err)
17+
18+
loadGlobalLockFrom(cfg)
19+
assert.EqualValues(t, "memory", GlobalLock.ServiceType)
20+
})
21+
22+
t.Run("RedisGlobalLockConfig", func(t *testing.T) {
23+
iniStr := `
24+
[global_lock]
25+
SERVICE_TYPE = redis
26+
SERVICE_CONN_STR = addrs=127.0.0.1:6379 db=0
27+
`
28+
cfg, err := NewConfigProviderFromData(iniStr)
29+
assert.NoError(t, err)
30+
31+
loadGlobalLockFrom(cfg)
32+
assert.EqualValues(t, "redis", GlobalLock.ServiceType)
33+
assert.EqualValues(t, "addrs=127.0.0.1:6379 db=0", GlobalLock.ServiceConnStr)
34+
})
35+
}

modules/setting/setting.go

+1
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func loadCommonSettingsFrom(cfg ConfigProvider) error {
147147
loadGitFrom(cfg)
148148
loadMirrorFrom(cfg)
149149
loadMarkupFrom(cfg)
150+
loadGlobalLockFrom(cfg)
150151
loadOtherFrom(cfg)
151152
return nil
152153
}

modules/sync/exclusive_pool.go

-69
This file was deleted.

services/pull/check.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
user_model "code.gitea.io/gitea/models/user"
2222
"code.gitea.io/gitea/modules/git"
2323
"code.gitea.io/gitea/modules/gitrepo"
24+
"code.gitea.io/gitea/modules/globallock"
2425
"code.gitea.io/gitea/modules/graceful"
2526
"code.gitea.io/gitea/modules/log"
2627
"code.gitea.io/gitea/modules/process"
@@ -334,9 +335,15 @@ func handler(items ...string) []string {
334335
}
335336

336337
func testPR(id int64) {
337-
pullWorkingPool.CheckIn(fmt.Sprint(id))
338-
defer pullWorkingPool.CheckOut(fmt.Sprint(id))
339-
ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("Test PR[%d] from patch checking queue", id))
338+
ctx := graceful.GetManager().HammerContext()
339+
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(id))
340+
if err != nil {
341+
log.Error("lock.Lock(): %v", err)
342+
return
343+
}
344+
defer releaser()
345+
346+
ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("Test PR[%d] from patch checking queue", id))
340347
defer finished()
341348

342349
pr, err := issues_model.GetPullRequestByID(ctx, id)

services/pull/merge.go

+18-7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
user_model "code.gitea.io/gitea/models/user"
2424
"code.gitea.io/gitea/modules/cache"
2525
"code.gitea.io/gitea/modules/git"
26+
"code.gitea.io/gitea/modules/globallock"
2627
"code.gitea.io/gitea/modules/httplib"
2728
"code.gitea.io/gitea/modules/log"
2829
"code.gitea.io/gitea/modules/references"
@@ -169,9 +170,6 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
169170
return fmt.Errorf("unable to load head repo: %w", err)
170171
}
171172

172-
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
173-
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
174-
175173
prUnit, err := pr.BaseRepo.GetUnit(ctx, unit.TypePullRequests)
176174
if err != nil {
177175
log.Error("pr.BaseRepo.GetUnit(unit.TypePullRequests): %v", err)
@@ -184,11 +182,18 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
184182
return models.ErrInvalidMergeStyle{ID: pr.BaseRepo.ID, Style: mergeStyle}
185183
}
186184

185+
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
186+
if err != nil {
187+
log.Error("lock.Lock(): %v", err)
188+
return fmt.Errorf("lock.Lock: %w", err)
189+
}
190+
defer releaser()
187191
defer func() {
188192
go AddTestPullRequestTask(doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
189193
}()
190194

191195
_, err = doMergeAndPush(ctx, pr, doer, mergeStyle, expectedHeadCommitID, message, repo_module.PushTriggerPRMergeToBase)
196+
releaser()
192197
if err != nil {
193198
return err
194199
}
@@ -487,10 +492,14 @@ func CheckPullBranchProtections(ctx context.Context, pr *issues_model.PullReques
487492

488493
// MergedManually mark pr as merged manually
489494
func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error {
490-
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
491-
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
495+
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
496+
if err != nil {
497+
log.Error("lock.Lock(): %v", err)
498+
return fmt.Errorf("lock.Lock: %w", err)
499+
}
500+
defer releaser()
492501

493-
if err := db.WithTx(ctx, func(ctx context.Context) error {
502+
err = db.WithTx(ctx, func(ctx context.Context) error {
494503
if err := pr.LoadBaseRepo(ctx); err != nil {
495504
return err
496505
}
@@ -540,7 +549,9 @@ func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *use
540549
return fmt.Errorf("SetMerged failed")
541550
}
542551
return nil
543-
}); err != nil {
552+
})
553+
releaser()
554+
if err != nil {
544555
return err
545556
}
546557

services/pull/pull.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,21 @@ import (
2525
"code.gitea.io/gitea/modules/container"
2626
"code.gitea.io/gitea/modules/git"
2727
"code.gitea.io/gitea/modules/gitrepo"
28+
"code.gitea.io/gitea/modules/globallock"
2829
"code.gitea.io/gitea/modules/graceful"
2930
"code.gitea.io/gitea/modules/json"
3031
"code.gitea.io/gitea/modules/log"
3132
repo_module "code.gitea.io/gitea/modules/repository"
3233
"code.gitea.io/gitea/modules/setting"
33-
"code.gitea.io/gitea/modules/sync"
3434
"code.gitea.io/gitea/modules/util"
3535
gitea_context "code.gitea.io/gitea/services/context"
3636
issue_service "code.gitea.io/gitea/services/issue"
3737
notify_service "code.gitea.io/gitea/services/notify"
3838
)
3939

40-
// TODO: use clustered lock (unique queue? or *abuse* cache)
41-
var pullWorkingPool = sync.NewExclusivePool()
40+
func getPullWorkingLockKey(prID int64) string {
41+
return fmt.Sprintf("pull_working_%d", prID)
42+
}
4243

4344
// NewPullRequest creates new pull request with labels for repository.
4445
func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *issues_model.Issue, labelIDs []int64, uuids []string, pr *issues_model.PullRequest, assigneeIDs []int64) error {
@@ -202,8 +203,12 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *iss
202203

203204
// ChangeTargetBranch changes the target branch of this pull request, as the given user.
204205
func ChangeTargetBranch(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, targetBranch string) (err error) {
205-
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
206-
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
206+
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
207+
if err != nil {
208+
log.Error("lock.Lock(): %v", err)
209+
return fmt.Errorf("lock.Lock: %w", err)
210+
}
211+
defer releaser()
207212

208213
// Current target branch is already the same
209214
if pr.BaseBranch == targetBranch {

services/pull/update.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"code.gitea.io/gitea/models/unit"
1515
user_model "code.gitea.io/gitea/models/user"
1616
"code.gitea.io/gitea/modules/git"
17+
"code.gitea.io/gitea/modules/globallock"
1718
"code.gitea.io/gitea/modules/log"
1819
"code.gitea.io/gitea/modules/repository"
1920
)
@@ -25,8 +26,12 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
2526
return fmt.Errorf("update of agit flow pull request's head branch is unsupported")
2627
}
2728

28-
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
29-
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
29+
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
30+
if err != nil {
31+
log.Error("lock.Lock(): %v", err)
32+
return fmt.Errorf("lock.Lock: %w", err)
33+
}
34+
defer releaser()
3035

3136
diffCount, err := GetDiverging(ctx, pr)
3237
if err != nil {

0 commit comments

Comments
 (0)