Skip to content

Commit 5263432

Browse files
[release-1.16] JobSink: Delete secrets associated with jobs when jobs are deleted (#8332)
* JobSink: Delete secrets associated with jobs when jobs are deleted

  As reported in #8323, old JobSink secrets lead to old events being processed again while new events are lost. Using an OwnerReference and Kubernetes garbage collection, the secret created for a given event is now bound to the lifecycle of its Job, so that when a Job is deleted, the associated secret is deleted as well.

  Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Fix jobsink name generator + add unit and fuzz tests

  Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Fix e2e test

  Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Lint

  Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
1 parent a164684 commit 5263432

File tree

5 files changed

+250
-56
lines changed

5 files changed

+250
-56
lines changed

Diff for: cmd/jobsink/main.go

+71-53
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"crypto/md5" //nolint:gosec
2222
"crypto/tls"
23+
"encoding/hex"
2324
"fmt"
2425
"log"
2526
"net/http"
@@ -231,11 +232,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
231232
return
232233
}
233234

234-
id := toIdHashLabelValue(event.Source(), event.ID())
235-
logger.Debug("Getting job for event", zap.String("URI", r.RequestURI), zap.String("id", id))
235+
jobName := toJobName(ref.Name, event.Source(), event.ID())
236+
logger.Debug("Getting job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
236237

237238
jobs, err := h.k8s.BatchV1().Jobs(js.GetNamespace()).List(r.Context(), metav1.ListOptions{
238-
LabelSelector: jobLabelSelector(ref, id),
239+
LabelSelector: jobLabelSelector(ref, jobName),
239240
Limit: 1,
240241
})
241242
if err != nil {
@@ -256,56 +257,21 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
256257
return
257258
}
258259

259-
jobName := kmeta.ChildName(ref.Name, id)
260-
261-
logger.Debug("Creating secret for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
262-
263-
jobSinkUID := js.GetUID()
264-
265-
or := metav1.OwnerReference{
266-
APIVersion: sinksv.SchemeGroupVersion.String(),
267-
Kind: sinks.JobSinkResource.Resource,
268-
Name: js.GetName(),
269-
UID: jobSinkUID,
270-
Controller: ptr.Bool(true),
271-
BlockOwnerDeletion: ptr.Bool(false),
272-
}
273-
274-
secret := &corev1.Secret{
275-
TypeMeta: metav1.TypeMeta{},
276-
ObjectMeta: metav1.ObjectMeta{
277-
Name: jobName,
278-
Namespace: ref.Namespace,
279-
Labels: map[string]string{
280-
sinks.JobSinkIDLabel: id,
281-
sinks.JobSinkNameLabel: ref.Name,
282-
},
283-
OwnerReferences: []metav1.OwnerReference{or},
284-
},
285-
Immutable: ptr.Bool(true),
286-
Data: map[string][]byte{"event": eventBytes},
287-
Type: corev1.SecretTypeOpaque,
288-
}
289-
290-
_, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{})
291-
if err != nil && !apierrors.IsAlreadyExists(err) {
292-
logger.Warn("Failed to create secret", zap.Error(err))
293-
294-
w.Header().Add("Reason", err.Error())
295-
w.WriteHeader(http.StatusInternalServerError)
296-
return
297-
}
298-
299-
logger.Debug("Creating job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
300-
301260
job := js.Spec.Job.DeepCopy()
302261
job.Name = jobName
303262
if job.Labels == nil {
304263
job.Labels = make(map[string]string, 4)
305264
}
306-
job.Labels[sinks.JobSinkIDLabel] = id
265+
job.Labels[sinks.JobSinkIDLabel] = jobName
307266
job.Labels[sinks.JobSinkNameLabel] = ref.Name
308-
job.OwnerReferences = append(job.OwnerReferences, or)
267+
job.OwnerReferences = append(job.OwnerReferences, metav1.OwnerReference{
268+
APIVersion: sinksv.SchemeGroupVersion.String(),
269+
Kind: sinks.JobSinkResource.Resource,
270+
Name: js.GetName(),
271+
UID: js.GetUID(),
272+
Controller: ptr.Bool(true),
273+
BlockOwnerDeletion: ptr.Bool(false),
274+
})
309275
var mountPathName string
310276
for i := range job.Spec.Template.Spec.Containers {
311277
found := false
@@ -346,14 +312,66 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
346312
})
347313
}
348314

349-
_, err = h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{})
350-
if err != nil {
315+
logger.Debug("Creating job for event",
316+
zap.String("URI", r.RequestURI),
317+
zap.String("jobName", jobName),
318+
zap.Any("job", job),
319+
)
320+
321+
createdJob, err := h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{})
322+
if err != nil && !apierrors.IsAlreadyExists(err) {
351323
logger.Warn("Failed to create job", zap.Error(err))
352324

353325
w.Header().Add("Reason", err.Error())
354326
w.WriteHeader(http.StatusInternalServerError)
355327
return
356328
}
329+
if apierrors.IsAlreadyExists(err) {
330+
logger.Debug("Job already exists", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
331+
}
332+
333+
secret := &corev1.Secret{
334+
TypeMeta: metav1.TypeMeta{},
335+
ObjectMeta: metav1.ObjectMeta{
336+
Name: jobName,
337+
Namespace: ref.Namespace,
338+
Labels: map[string]string{
339+
sinks.JobSinkIDLabel: jobName,
340+
sinks.JobSinkNameLabel: ref.Name,
341+
},
342+
OwnerReferences: []metav1.OwnerReference{
343+
{
344+
APIVersion: "batch/v1",
345+
Kind: "Job",
346+
Name: createdJob.Name,
347+
UID: createdJob.UID,
348+
Controller: ptr.Bool(true),
349+
BlockOwnerDeletion: ptr.Bool(false),
350+
},
351+
},
352+
},
353+
Immutable: ptr.Bool(true),
354+
Data: map[string][]byte{"event": eventBytes},
355+
Type: corev1.SecretTypeOpaque,
356+
}
357+
358+
logger.Debug("Creating secret for event",
359+
zap.String("URI", r.RequestURI),
360+
zap.String("jobName", jobName),
361+
zap.Any("secret.metadata", secret.ObjectMeta),
362+
)
363+
364+
_, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{})
365+
if err != nil && !apierrors.IsAlreadyExists(err) {
366+
logger.Warn("Failed to create secret", zap.Error(err))
367+
368+
w.Header().Add("Reason", err.Error())
369+
w.WriteHeader(http.StatusInternalServerError)
370+
return
371+
}
372+
if apierrors.IsAlreadyExists(err) {
373+
logger.Debug("Secret already exists", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
374+
}
357375

358376
w.Header().Add("Location", locationHeader(ref, event.Source(), event.ID()))
359377
w.WriteHeader(http.StatusAccepted)
@@ -391,8 +409,7 @@ func (h *Handler) handleGet(ctx context.Context, w http.ResponseWriter, r *http.
391409
eventSource := parts[6]
392410
eventID := parts[8]
393411

394-
id := toIdHashLabelValue(eventSource, eventID)
395-
jobName := kmeta.ChildName(ref.Name, id)
412+
jobName := toJobName(ref.Name, eventSource, eventID)
396413

397414
job, err := h.k8s.BatchV1().Jobs(ref.Namespace).Get(r.Context(), jobName, metav1.GetOptions{})
398415
if err != nil {
@@ -445,6 +462,7 @@ func jobLabelSelector(ref types.NamespacedName, id string) string {
445462
return fmt.Sprintf("%s=%s,%s=%s", sinks.JobSinkIDLabel, id, sinks.JobSinkNameLabel, ref.Name)
446463
}
447464

448-
func toIdHashLabelValue(source, id string) string {
449-
return utils.ToDNS1123Subdomain(fmt.Sprintf("%s", md5.Sum([]byte(fmt.Sprintf("%s-%s", source, id))))) //nolint:gosec
465+
func toJobName(js string, source, id string) string {
466+
h := md5.Sum([]byte(source + id)) //nolint:gosec
467+
return kmeta.ChildName(js+"-", utils.ToDNS1123Subdomain(hex.EncodeToString(h[:])))
450468
}

Diff for: cmd/jobsink/main_test.go

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
Copyright 2024 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"testing"
21+
22+
"k8s.io/apimachinery/pkg/api/validation"
23+
24+
"knative.dev/eventing/pkg/utils"
25+
)
26+
27+
// testCase is one input triple for toJobName used by the unit and fuzz tests.
type testCase struct {
	// JobSinkName is the name of the JobSink the event was sent to.
	JobSinkName string
	// Source and Id are the event source and event id that get hashed into
	// the generated job name.
	Source, Id string
}
32+
33+
func TestToJobName(t *testing.T) {
34+
testcases := []testCase{
35+
{
36+
JobSinkName: "job-sink-success",
37+
Source: "mysource3/myservice",
38+
Id: "2234-5678",
39+
},
40+
{
41+
JobSinkName: "a",
42+
Source: "0",
43+
Id: "0",
44+
},
45+
}
46+
47+
for _, tc := range testcases {
48+
t.Run(tc.JobSinkName+"_"+tc.Source+"_"+tc.Id, func(t *testing.T) {
49+
if errs := validation.NameIsDNS1035Label(tc.JobSinkName, false); len(errs) != 0 {
50+
t.Errorf("Invalid JobSinkName: %v", errs)
51+
}
52+
53+
name := toJobName(tc.JobSinkName, tc.Source, tc.Id)
54+
doubleName := toJobName(tc.JobSinkName, tc.Source, tc.Id)
55+
if name != doubleName {
56+
t.Errorf("Before: %q, after: %q", name, doubleName)
57+
}
58+
59+
if got := utils.ToDNS1123Subdomain(name); got != name {
60+
t.Errorf("ToDNS1123Subdomain(Want) returns a different result, Want: %q, Got: %q", name, got)
61+
}
62+
63+
if errs := validation.NameIsDNS1035Label(name, false); len(errs) != 0 {
64+
t.Errorf("toJobName produced invalid name %q given %q, %q, %q: errors: %#v", name, tc.JobSinkName, tc.Source, tc.Id, errs)
65+
}
66+
})
67+
}
68+
}
69+
70+
func FuzzToJobName(f *testing.F) {
71+
testcases := []testCase{
72+
{
73+
JobSinkName: "job-sink-success",
74+
Source: "mysource3/myservice",
75+
Id: "2234-5678",
76+
},
77+
{
78+
JobSinkName: "a",
79+
Source: "0",
80+
Id: "0",
81+
},
82+
}
83+
84+
for _, tc := range testcases {
85+
f.Add(tc.JobSinkName, tc.Source, tc.Id) // Use f.Add to provide a seed corpus
86+
}
87+
f.Fuzz(func(t *testing.T, js, source, id string) {
88+
if errs := validation.NameIsDNSLabel(js, false); len(errs) != 0 {
89+
t.Skip("Prerequisite: invalid jobsink name")
90+
}
91+
92+
name := toJobName(js, source, id)
93+
doubleName := toJobName(js, source, id)
94+
if name != doubleName {
95+
t.Errorf("Before: %q, after: %q", name, doubleName)
96+
}
97+
98+
if got := utils.ToDNS1123Subdomain(name); got != name {
99+
t.Errorf("ToDNS1123Subdomain(Want) returns a different result, Want: %q, Got: %q", name, got)
100+
}
101+
102+
if errs := validation.NameIsDNSLabel(name, false); len(errs) != 0 {
103+
t.Errorf("toJobName produced invalid name %q given %q, %q, %q: errors: %#v", name, js, source, id, errs)
104+
}
105+
})
106+
}

Diff for: hack/e2e-debug.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,4 @@ wait_until_pods_running knative-eventing || fail_test "Pods in knative-eventing
3535

3636
header "Running tests"
3737

38-
go test -tags=e2e -v -timeout=30m -run="${test_name}" "${test_dir}" || fail_test "Test(s) failed"
38+
go test -tags=e2e -v -timeout=30m -parallel=12 -run="${test_name}" "${test_dir}" || fail_test "Test(s) failed"

Diff for: test/rekt/features/jobsink/jobsink.go

+55-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
cetest "github.com/cloudevents/sdk-go/v2/test"
2525
"github.com/google/uuid"
2626
batchv1 "k8s.io/api/batch/v1"
27+
corev1 "k8s.io/api/core/v1"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/apimachinery/pkg/runtime/schema"
2930
"k8s.io/apimachinery/pkg/util/wait"
@@ -42,11 +43,14 @@ import (
4243
"knative.dev/eventing/test/rekt/resources/jobsink"
4344
)
4445

45-
func Success() *feature.Feature {
46+
func Success(jobSinkName string) *feature.Feature {
4647
f := feature.NewFeature()
4748

4849
sink := feature.MakeRandomK8sName("sink")
4950
jobSink := feature.MakeRandomK8sName("jobsink")
51+
if jobSinkName != "" {
52+
jobSink = jobSinkName
53+
}
5054
source := feature.MakeRandomK8sName("source")
5155

5256
sinkURL := &apis.URL{Scheme: "http", Host: sink}
@@ -78,6 +82,32 @@ func Success() *feature.Feature {
7882
return f
7983
}
8084

85+
func DeleteJobsCascadeSecretsDeletion(jobSink string) *feature.Feature {
86+
f := feature.NewFeature()
87+
88+
f.Setup("Prerequisite: At least one secret for jobsink present", verifySecretsForJobSink(jobSink, func(secrets *corev1.SecretList) bool {
89+
return len(secrets.Items) > 0
90+
}))
91+
92+
f.Requirement("delete jobs for jobsink", func(ctx context.Context, t feature.T) {
93+
policy := metav1.DeletePropagationBackground
94+
err := kubeclient.Get(ctx).BatchV1().
95+
Jobs(environment.FromContext(ctx).Namespace()).
96+
DeleteCollection(ctx, metav1.DeleteOptions{PropagationPolicy: &policy}, metav1.ListOptions{
97+
LabelSelector: fmt.Sprintf("%s=%s", sinks.JobSinkNameLabel, jobSink),
98+
})
99+
if err != nil {
100+
t.Error(err)
101+
}
102+
})
103+
104+
f.Assert("No secrets for jobsink are present", verifySecretsForJobSink(jobSink, func(secrets *corev1.SecretList) bool {
105+
return len(secrets.Items) == 0
106+
}))
107+
108+
return f
109+
}
110+
81111
func SuccessTLS() *feature.Feature {
82112
f := feature.NewFeature()
83113

@@ -234,3 +264,27 @@ func AtLeastOneJobIsComplete(jobSinkName string) feature.StepFn {
234264
t.Errorf("No job is complete:\n%v", string(bytes))
235265
}
236266
}
267+
268+
func verifySecretsForJobSink(jobSink string, verify func(secrets *corev1.SecretList) bool) feature.StepFn {
269+
return func(ctx context.Context, t feature.T) {
270+
271+
interval, timeout := environment.PollTimingsFromContext(ctx)
272+
lastSecretList := &corev1.SecretList{}
273+
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
274+
var err error
275+
lastSecretList, err = kubeclient.Get(ctx).CoreV1().
276+
Secrets(environment.FromContext(ctx).Namespace()).
277+
List(ctx, metav1.ListOptions{
278+
LabelSelector: fmt.Sprintf("%s=%s", sinks.JobSinkNameLabel, jobSink),
279+
})
280+
if err != nil {
281+
return false, fmt.Errorf("failed to list secrets: %w", err)
282+
}
283+
return verify(lastSecretList), nil
284+
})
285+
if err != nil {
286+
bytes, _ := json.Marshal(lastSecretList)
287+
t.Errorf("failed to wait for no secrets: %v\nSecret list:\n%s", err, string(bytes))
288+
}
289+
}
290+
}

0 commit comments

Comments
 (0)