Skip to content

Commit 16e3d7b

Browse files
authored
Merge pull request #24841 from chaodaiG/crier-gerrit-multiple-reporters
Crier gerrit reporter: worker can be more than 1
2 parents 56b8c7e + b7ebf19 commit 16e3d7b

File tree

14 files changed

+892
-201
lines changed

14 files changed

+892
-201
lines changed

prow/cmd/crier/main.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,6 @@ type options struct {
7878
}
7979

8080
func (o *options) validate() error {
81-
82-
// TODO(krzyzacy): gerrit && github report are actually stateful..
83-
// Need a better design to re-enable parallel reporting
84-
if o.gerritWorkers > 1 {
85-
logrus.Warn("gerrit reporter only supports one worker")
86-
o.gerritWorkers = 1
87-
}
88-
8981
if o.gerritWorkers+o.pubsubWorkers+o.githubWorkers+o.slackWorkers+o.gcsWorkers+o.k8sGCSWorkers+o.blobStorageWorkers+o.k8sBlobStorageWorkers <= 0 {
9082
return errors.New("crier need to have at least one report worker to start")
9183
}
@@ -253,7 +245,7 @@ func main() {
253245
}
254246

255247
if o.gerritWorkers > 0 {
256-
gerritReporter, err := gerritreporter.NewReporter(cfg, o.cookiefilePath, o.gerritProjects, mgr.GetCache())
248+
gerritReporter, err := gerritreporter.NewReporter(cfg, o.cookiefilePath, o.gerritProjects, mgr.GetClient())
257249
if err != nil {
258250
logrus.WithError(err).Fatal("Error starting gerrit reporter")
259251
}

prow/cmd/crier/main_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ func TestOptions(t *testing.T) {
5151
},
5252
//Gerrit Reporter
5353
{
54-
name: "gerrit only support one worker",
54+
name: "gerrit supports multiple workers",
5555
args: []string{"--gerrit-workers=99", "--gerrit-projects=foo=bar", "--cookiefile=foobar", "--config-path=foo"},
5656
expected: &options{
57-
gerritWorkers: 1,
57+
gerritWorkers: 99,
5858
cookiefilePath: "foobar",
5959
gerritProjects: map[string][]string{
6060
"foo": {"bar"},
@@ -74,7 +74,7 @@ func TestOptions(t *testing.T) {
7474
name: "gerrit missing --cookiefile",
7575
args: []string{"--gerrit-workers=5", "--gerrit-projects=foo=bar", "--config-path=foo"},
7676
expected: &options{
77-
gerritWorkers: 1,
77+
gerritWorkers: 5,
7878
gerritProjects: map[string][]string{
7979
"foo": {"bar"},
8080
},

prow/crier/BUILD.bazel

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,10 @@ go_library(
1010
visibility = ["//visibility:public"],
1111
deps = [
1212
"//prow/apis/prowjobs/v1:go_default_library",
13+
"//prow/crier/reporters/criercommonlib:go_default_library",
1314
"@com_github_prometheus_client_golang//prometheus:go_default_library",
1415
"@com_github_sirupsen_logrus//:go_default_library",
1516
"@io_k8s_apimachinery//pkg/api/errors:go_default_library",
16-
"@io_k8s_apimachinery//pkg/types:go_default_library",
17-
"@io_k8s_apimachinery//pkg/util/wait:go_default_library",
18-
"@io_k8s_client_go//util/retry:go_default_library",
1917
"@io_k8s_sigs_controller_runtime//pkg/builder:go_default_library",
2018
"@io_k8s_sigs_controller_runtime//pkg/client:go_default_library",
2119
"@io_k8s_sigs_controller_runtime//pkg/controller:go_default_library",
@@ -35,6 +33,7 @@ filegroup(
3533
name = "all-srcs",
3634
srcs = [
3735
":package-srcs",
36+
"//prow/crier/reporters/criercommonlib:all-srcs",
3837
"//prow/crier/reporters/gcs:all-srcs",
3938
"//prow/crier/reporters/gerrit:all-srcs",
4039
"//prow/crier/reporters/github:all-srcs",

prow/crier/controller.go

Lines changed: 2 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,14 @@ import (
2424

2525
"github.com/sirupsen/logrus"
2626
"k8s.io/apimachinery/pkg/api/errors"
27-
"k8s.io/apimachinery/pkg/types"
28-
"k8s.io/apimachinery/pkg/util/wait"
29-
"k8s.io/client-go/util/retry"
3027
"sigs.k8s.io/controller-runtime/pkg/builder"
3128
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
3229
"sigs.k8s.io/controller-runtime/pkg/controller"
3330
"sigs.k8s.io/controller-runtime/pkg/manager"
3431
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3532

3633
prowv1 "k8s.io/test-infra/prow/apis/prowjobs/v1"
34+
"k8s.io/test-infra/prow/crier/reporters/criercommonlib"
3735
)
3836

3937
type ReportClient interface {
@@ -80,66 +78,6 @@ func New(
8078
return nil
8179
}
8280

83-
func (r *reconciler) updateReportState(ctx context.Context, pj *prowv1.ProwJob, log *logrus.Entry, reportedState prowv1.ProwJobState) error {
84-
// update pj report status
85-
newpj := pj.DeepCopy()
86-
// we set omitempty on PrevReportStates, so here we need to init it if is nil
87-
if newpj.Status.PrevReportStates == nil {
88-
newpj.Status.PrevReportStates = map[string]prowv1.ProwJobState{}
89-
}
90-
newpj.Status.PrevReportStates[r.reporter.GetName()] = reportedState
91-
92-
if err := r.pjclientset.Patch(ctx, newpj, ctrlruntimeclient.MergeFrom(pj)); err != nil {
93-
return fmt.Errorf("failed to patch: %w", err)
94-
}
95-
96-
// Block until the update is in the lister to make sure that events from another controller
97-
// that also does reporting dont trigger another report because our lister doesn't yet contain
98-
// the updated Status
99-
name := types.NamespacedName{Namespace: pj.Namespace, Name: pj.Name}
100-
if err := wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
101-
if err := r.pjclientset.Get(ctx, name, pj); err != nil {
102-
return false, err
103-
}
104-
if pj.Status.PrevReportStates != nil &&
105-
pj.Status.PrevReportStates[r.reporter.GetName()] == reportedState {
106-
return true, nil
107-
}
108-
return false, nil
109-
}); err != nil {
110-
return fmt.Errorf("failed to wait for updated report status to be in lister: %w", err)
111-
}
112-
return nil
113-
}
114-
115-
func (r *reconciler) updateReportStateWithRetries(ctx context.Context, pj *prowv1.ProwJob, log *logrus.Entry) error {
116-
reportState := pj.Status.State
117-
log = log.WithFields(logrus.Fields{
118-
"prowjob": pj.Name,
119-
"jobName": pj.Spec.Job,
120-
"jobStatus": reportState,
121-
})
122-
// We have to retry here, if we return we lose the information that we already reported this job.
123-
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
124-
// Get it first, this is very cheap
125-
name := types.NamespacedName{Namespace: pj.Namespace, Name: pj.Name}
126-
if err := r.pjclientset.Get(ctx, name, pj); err != nil {
127-
return err
128-
}
129-
// Must not wrap until we have kube 1.19, otherwise the RetryOnConflict won't recognize conflicts
130-
// correctly
131-
return r.updateReportState(ctx, pj, log, reportState)
132-
}); err != nil {
133-
// Very subpar, we will report again. But even if we didn't do that now, we would do so
134-
// latest when crier gets restarted. In an ideal world, all reporters are idempotent and
135-
// reporting has no cost.
136-
return fmt.Errorf("failed to update report state on prowjob: %w", err)
137-
}
138-
139-
log.Info("Successfully updated report state on prowjob")
140-
return nil
141-
}
142-
14381
// Reconcile retrieves each queued item and takes the necessary handler action based off of if
14482
// the item was created or deleted.
14583
func (r *reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
@@ -207,7 +145,7 @@ func (r *reconciler) reconcile(ctx context.Context, log *logrus.Entry, req recon
207145
log.WithField("job-count", len(pjs)).Info("Reported job(s), now will update pj(s).")
208146
var lastErr error
209147
for _, pjob := range pjs {
210-
if err := r.updateReportStateWithRetries(ctx, pjob, log); err != nil {
148+
if err := criercommonlib.UpdateReportStateWithRetries(ctx, pjob, log, r.pjclientset, r.reporter.GetName()); err != nil {
211149
log.WithError(err).Error("Failed to update report state on prowjob")
212150
// The error above is already logged, so it would be duplicated
213151
// effort to combine all errors to return, only capture the last
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "go_default_library",
5+
srcs = [
6+
"shardedlock.go",
7+
"updatereportstatus.go",
8+
],
9+
importpath = "k8s.io/test-infra/prow/crier/reporters/criercommonlib",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//prow/apis/prowjobs/v1:go_default_library",
13+
"@com_github_sirupsen_logrus//:go_default_library",
14+
"@io_k8s_apimachinery//pkg/types:go_default_library",
15+
"@io_k8s_apimachinery//pkg/util/wait:go_default_library",
16+
"@io_k8s_client_go//util/retry:go_default_library",
17+
"@io_k8s_sigs_controller_runtime//pkg/client:go_default_library",
18+
"@org_golang_x_sync//semaphore:go_default_library",
19+
],
20+
)
21+
22+
filegroup(
23+
name = "package-srcs",
24+
srcs = glob(["**"]),
25+
tags = ["automanaged"],
26+
visibility = ["//visibility:private"],
27+
)
28+
29+
filegroup(
30+
name = "all-srcs",
31+
srcs = [":package-srcs"],
32+
tags = ["automanaged"],
33+
visibility = ["//visibility:public"],
34+
)
35+
36+
go_test(
37+
name = "go_default_test",
38+
srcs = ["shardedlock_test.go"],
39+
embed = [":go_default_library"],
40+
tags = ["manual"],
41+
deps = ["@org_golang_x_sync//semaphore:go_default_library"],
42+
)
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package criercommonlib contains shared lib used by reporters
18+
package criercommonlib
19+
20+
import (
21+
"context"
22+
"time"
23+
24+
"github.com/sirupsen/logrus"
25+
"golang.org/x/sync/semaphore"
26+
)
27+
28+
// SimplePull identifies a single pull request (change) and serves as the
// shard key of ShardedLock. All fields are comparable, so values of this
// type can be used directly as map keys.
type SimplePull struct {
	// org is the organization that owns the repository.
	org string
	// repo is the repository the pull request belongs to.
	repo string
	// number is the pull request number.
	number int
}
33+
34+
// NewSimplePull creates SimplePull
35+
func NewSimplePull(org, repo string, number int) *SimplePull {
36+
return &SimplePull{org: org, repo: repo, number: number}
37+
}
38+
39+
// ShardedLock contains sharding information based on PRs
40+
type ShardedLock struct {
41+
// semaphore is chosed over mutex, as Acquire from semaphore respects
42+
// context timeout while mutex doesn't
43+
mapLock *semaphore.Weighted
44+
locks map[SimplePull]*semaphore.Weighted
45+
}
46+
47+
// NewShardedLock creates ShardedLock
48+
func NewShardedLock() *ShardedLock {
49+
return &ShardedLock{
50+
mapLock: semaphore.NewWeighted(1),
51+
locks: map[SimplePull]*semaphore.Weighted{},
52+
}
53+
}
54+
55+
// GetLock aquires the lock for a PR
56+
func (s *ShardedLock) GetLock(ctx context.Context, key SimplePull) (*semaphore.Weighted, error) {
57+
if err := s.mapLock.Acquire(ctx, 1); err != nil {
58+
return nil, err
59+
}
60+
defer s.mapLock.Release(1)
61+
if _, exists := s.locks[key]; !exists {
62+
s.locks[key] = semaphore.NewWeighted(1)
63+
}
64+
return s.locks[key], nil
65+
}
66+
67+
// Cleanup deletes all locks by acquiring first
68+
// the mapLock and then each individual lock before
69+
// deleting it. The individual lock must be acquired
70+
// because otherwise it may be held, we delete it from
71+
// the map, it gets recreated and acquired and two
72+
// routines report in parallel for the same job.
73+
// Note that while this function is running, no new
74+
// presubmit reporting can happen, as we hold the mapLock.
75+
func (s *ShardedLock) Cleanup() {
76+
ctx := context.Background()
77+
s.mapLock.Acquire(ctx, 1)
78+
defer s.mapLock.Release(1)
79+
80+
for key, lock := range s.locks {
81+
// There is a very low chance of race condition, that two threads got
82+
// different locks from the same PR, which would end up with duplicated
83+
// report once. Since this is very complicated to fix and the impact is
84+
// really low, would just keep it as is.
85+
// For details see: https://github.com/kubernetes/test-infra/pull/20343
86+
lock.Acquire(ctx, 1)
87+
delete(s.locks, key)
88+
lock.Release(1)
89+
}
90+
}
91+
92+
// RunCleanup asynchronously runs the cleanup once per hour.
93+
func (s *ShardedLock) RunCleanup() {
94+
go func() {
95+
for range time.Tick(time.Hour) {
96+
logrus.Debug("Starting to clean up presubmit locks")
97+
startTime := time.Now()
98+
s.Cleanup()
99+
logrus.WithField("duration", time.Since(startTime).String()).Debug("Finished cleaning up presubmit locks")
100+
}
101+
}()
102+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package criercommonlib contains shared lib used by reporters
18+
package criercommonlib
19+
20+
import (
21+
"testing"
22+
23+
"golang.org/x/sync/semaphore"
24+
)
25+
26+
func TestShardedLockCleanup(t *testing.T) {
27+
t.Parallel()
28+
sl := &ShardedLock{mapLock: semaphore.NewWeighted(1), locks: map[SimplePull]*semaphore.Weighted{}}
29+
key := SimplePull{"org", "repo", 1}
30+
sl.locks[key] = semaphore.NewWeighted(1)
31+
sl.Cleanup()
32+
if _, exists := sl.locks[key]; exists {
33+
t.Error("lock didn't get cleaned up")
34+
}
35+
36+
}

0 commit comments

Comments
 (0)