Commit 8d42a34

Author: Kubernetes Submit Queue

Merge pull request kubernetes#48597 from crassirostris/sd-logging-e2e-fix-soak

Automatic merge from submit-queue (batch tested with PRs 47040, 48597, 48608, 48653)

Fix Stackdriver Logging e2e soak tests: start reading logs in parallel with running pods in soak tests.

Fixed kubernetes#48606

2 parents 9d079c4 + 01df709, commit 8d42a34
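
As the sd_soak.go diff below shows, the fix decouples building the logging pods from starting them: the pods are constructed up front as plain descriptors, a background goroutine then starts them one by one, and the test's main goroutine begins reading ingested logs immediately instead of waiting for every pod to be created first. A minimal, self-contained sketch of that pattern (simplified stand-in types and timings, not the actual e2e framework code):

package main

import (
	"fmt"
	"time"
)

// loggingPod is a simplified stand-in for the test's pod descriptor.
type loggingPod struct {
	Name        string
	RunDuration time.Duration
}

// Start stands in for creating the pod through the Kubernetes API.
func (p *loggingPod) Start() {
	fmt.Printf("starting pod %s\n", p.Name)
}

func main() {
	podRunDelay := 100 * time.Millisecond
	jobDuration := 300 * time.Millisecond

	// Build all pod descriptors first, without starting anything.
	pods := []*loggingPod{}
	for i := 0; i < 3; i++ {
		pods = append(pods, &loggingPod{Name: fmt.Sprintf("pod-%d", i), RunDuration: jobDuration})
	}

	// Start pods in the background, spaced by podRunDelay, so that the
	// log reading below is not blocked until the last pod is created.
	go func() {
		for _, pod := range pods {
			pod.Start()
			time.Sleep(podRunDelay)
		}
	}()

	// Meanwhile the foreground "reads logs" concurrently with pod creation.
	fmt.Println("reading ingested logs while pods are still being started")
	time.Sleep(jobDuration + podRunDelay)
}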

5 files changed: +58 -36 lines changed

test/e2e/cluster-logging/es.go
Lines changed: 1 addition & 1 deletion

@@ -49,7 +49,7 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu
 	framework.ExpectNoError(err, "Fluentd deployed incorrectly")
 
 	By("Running synthetic logger")
-	pod := createLoggingPod(f, podName, "", 10*60, 10*time.Minute)
+	pod := startNewLoggingPod(f, podName, "", 10*60, 10*time.Minute)
 	defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
 	err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name)
 	framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName))

test/e2e/cluster-logging/sd.go
Lines changed: 1 addition & 1 deletion

@@ -47,7 +47,7 @@ var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver
 	framework.ExpectNoError(err, "Fluentd deployed incorrectly")
 
 	By("Running synthetic logger")
-	pod := createLoggingPod(f, podName, "", 10*60, 10*time.Minute)
+	pod := startNewLoggingPod(f, podName, "", 10*60, 10*time.Minute)
 	defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
 	err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name)
 	framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName))

test/e2e/cluster-logging/sd_events.go
Lines changed: 1 addition & 1 deletion

@@ -73,7 +73,7 @@ var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver
 	By("Running pods to generate events while waiting for some of them to be ingested")
 	wait.PollUntil(eventCreationInterval, func() (bool, error) {
 		podName := "synthlogger"
-		createLoggingPod(f, podName, "", 1, 1*time.Second)
+		startNewLoggingPod(f, podName, "", 1, 1*time.Second)
 		defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
 		err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name)
 		if err != nil {

test/e2e/cluster-logging/sd_soak.go
Lines changed: 16 additions & 6 deletions

@@ -34,6 +34,9 @@ const (
 	// considered acceptable. Once per hour is fine for now, as long as it
 	// doesn't loose too much logs.
 	maxAllowedRestartsPerHour = 1.0
+	// lastPodIngestionSlack is the amount of time to wait for the last pod's
+	// logs to be ingested by the logging agent.
+	lastPodIngestionSlack = 5 * time.Minute
 )
 
 var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver [Feature:StackdriverLogging] [Soak]", func() {

@@ -57,21 +60,28 @@ var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver
 		float64(time.Hour) * maxAllowedRestartsPerHour))
 
 	podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
-	podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
+	podRunCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1
 	linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())
 
-	By("Running short-living pods")
 	pods := []*loggingPod{}
 	for runIdx := 0; runIdx < podRunCount; runIdx++ {
 		for nodeIdx, node := range nodes {
 			podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx)
-			pods = append(pods, createLoggingPod(f, podName, node.Name, linesPerPod, jobDuration))
-
-			defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
+			pods = append(pods, newLoggingPod(podName, node.Name, linesPerPod, jobDuration))
 		}
-		time.Sleep(podRunDelay)
 	}
 
+	By("Running short-living pods")
+	go func() {
+		for _, pod := range pods {
+			pod.Start(f)
+			time.Sleep(podRunDelay)
+			defer f.PodClient().Delete(pod.Name, &meta_v1.DeleteOptions{})
+		}
+		// Waiting until the last pod has completed
+		time.Sleep(jobDuration - podRunDelay + lastPodIngestionSlack)
+	}()
+
 	By("Waiting for all log lines to be ingested")
 	config := &loggingTestConfig{
 		LogsProvider: sdLogsProvider,
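
The rewritten scheduling arithmetic above can be sanity-checked in isolation. The constant values below (maxPodCount, jobDuration, testDuration, lastPodIngestionSlack) are illustrative assumptions only; the real constants are defined elsewhere in sd_soak.go and are not part of this diff. The point is that podRunCount is chosen so that the last pod, started after (podRunCount-1) intervals of podRunDelay, still finishes within testDuration, and the goroutine's final sleep covers that last pod's remaining run time plus ingestion slack:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values only; the real constants live elsewhere in sd_soak.go.
	maxPodCount := 10
	jobDuration := 10 * time.Minute
	testDuration := 1 * time.Hour
	lastPodIngestionSlack := 5 * time.Minute

	// Each node starts one pod every podRunDelay, so starting maxPodCount
	// pods back to back takes exactly jobDuration.
	podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount)) // 1m

	// New formula: equivalent to (testDuration-jobDuration)/podRunDelay + 1,
	// i.e. the last pod starts at testDuration-jobDuration and completes
	// right at testDuration.
	podRunCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1 // 10*(6-1)+1 = 51

	// The goroutine's final sleep: the tail of the last pod's lifetime plus
	// slack for its logs to be ingested.
	tail := jobDuration - podRunDelay + lastPodIngestionSlack // 14m

	fmt.Println(podRunDelay, podRunCount, tail)
}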

test/e2e/cluster-logging/utils.go
Lines changed: 39 additions & 27 deletions

@@ -50,20 +50,20 @@ var (
 	logEntryMessageRegex = regexp.MustCompile("(?:I\\d+ \\d+:\\d+:\\d+.\\d+ \\d+ logs_generator.go:67] )?(\\d+) .*")
 )
 
-// Type to track the progress of logs generating pod
-type loggingPod struct {
-	// Name of the pod
-	Name string
-	// Cache of ingested and read entries
-	Occurrences map[int]logEntry
-	// Number of lines expected to be ingested from this pod
-	ExpectedLinesNumber int
-}
-
 type logEntry struct {
 	Payload string
 }
 
+func (entry logEntry) getLogEntryNumber() (int, bool) {
+	submatch := logEntryMessageRegex.FindStringSubmatch(entry.Payload)
+	if submatch == nil || len(submatch) < 2 {
+		return 0, false
+	}
+
+	lineNumber, err := strconv.Atoi(submatch[1])
+	return lineNumber, err == nil
+}
+
 type logsProvider interface {
 	Init() error
 	Cleanup()
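
getLogEntryNumber, moved ahead of its uses in this hunk, parses the leading line number that logs_generator emits, optionally skipping a glog-style prefix. A standalone check of the regex against made-up payloads (the sample strings below are invented for illustration):

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

var logEntryMessageRegex = regexp.MustCompile("(?:I\\d+ \\d+:\\d+:\\d+.\\d+ \\d+ logs_generator.go:67] )?(\\d+) .*")

// getLogEntryNumber mirrors the helper from utils.go, applied to a raw payload.
func getLogEntryNumber(payload string) (int, bool) {
	submatch := logEntryMessageRegex.FindStringSubmatch(payload)
	if submatch == nil || len(submatch) < 2 {
		return 0, false
	}
	lineNumber, err := strconv.Atoi(submatch[1])
	return lineNumber, err == nil
}

func main() {
	// Payload with the glog-style prefix emitted by logs_generator.
	fmt.Println(getLogEntryNumber("I0703 10:15:04.123456 1 logs_generator.go:67] 42 some payload")) // 42 true
	// Bare payload without the prefix.
	fmt.Println(getLogEntryNumber("42 some payload")) // 42 true
	// Payload without a line number.
	fmt.Println(getLogEntryNumber("no leading number")) // 0 false
}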
@@ -79,31 +79,37 @@ type loggingTestConfig struct {
 	MaxAllowedFluentdRestarts int
 }
 
-func (entry logEntry) getLogEntryNumber() (int, bool) {
-	submatch := logEntryMessageRegex.FindStringSubmatch(entry.Payload)
-	if submatch == nil || len(submatch) < 2 {
-		return 0, false
-	}
-
-	lineNumber, err := strconv.Atoi(submatch[1])
-	return lineNumber, err == nil
+// Type to track the progress of logs generating pod
+type loggingPod struct {
+	// Name equals to the pod name and the container name.
+	Name string
+	// NodeName is the name of the node this pod will be
+	// assigned to. Can be empty.
+	NodeName string
+	// Occurrences is a cache of ingested and read entries.
+	Occurrences map[int]logEntry
+	// ExpectedLinesNumber is the number of lines that are
+	// expected to be ingested from this pod.
+	ExpectedLinesNumber int
+	// RunDuration is how long the pod will live.
+	RunDuration time.Duration
 }
 
-func createLoggingPod(f *framework.Framework, podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod {
-	framework.Logf("Starting pod %s", podName)
-	createLogsGeneratorPod(f, podName, nodeName, totalLines, loggingDuration)
-
+func newLoggingPod(podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod {
 	return &loggingPod{
 		Name: podName,
+		NodeName: nodeName,
 		Occurrences: make(map[int]logEntry),
 		ExpectedLinesNumber: totalLines,
+		RunDuration: loggingDuration,
 	}
 }
 
-func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName string, linesCount int, duration time.Duration) {
+func (p *loggingPod) Start(f *framework.Framework) {
+	framework.Logf("Starting pod %s", p.Name)
 	f.PodClient().Create(&api_v1.Pod{
 		ObjectMeta: meta_v1.ObjectMeta{
-			Name: podName,
+			Name: p.Name,
 		},
 		Spec: api_v1.PodSpec{
 			RestartPolicy: api_v1.RestartPolicyNever,

@@ -114,11 +120,11 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName str
 					Env: []api_v1.EnvVar{
 						{
 							Name: "LOGS_GENERATOR_LINES_TOTAL",
-							Value: strconv.Itoa(linesCount),
+							Value: strconv.Itoa(p.ExpectedLinesNumber),
 						},
 						{
 							Name: "LOGS_GENERATOR_DURATION",
-							Value: duration.String(),
+							Value: p.RunDuration.String(),
 						},
 					},
 					Resources: api_v1.ResourceRequirements{

@@ -133,11 +139,17 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName str
 					},
 				},
 			},
-			NodeName: nodeName,
+			NodeName: p.NodeName,
 		},
 	})
 }
 
+func startNewLoggingPod(f *framework.Framework, podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod {
+	pod := newLoggingPod(podName, nodeName, totalLines, loggingDuration)
+	pod.Start(f)
+	return pod
+}
+
 func waitForSomeLogs(f *framework.Framework, config *loggingTestConfig) error {
 	podHasIngestedLogs := make([]bool, len(config.Pods))
 	podWithIngestedLogsCount := 0
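
Taken together, the utils.go changes split the old createLoggingPod into a constructor (newLoggingPod), a Start method, and a startNewLoggingPod wrapper that keeps the old create-immediately behavior for the non-soak tests. A rough usage sketch with simplified stand-ins (no real framework or Kubernetes API calls):

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the test's types; the real ones carry more fields
// and create actual pods through the e2e framework.
type loggingPod struct {
	Name                string
	NodeName            string
	ExpectedLinesNumber int
	RunDuration         time.Duration
}

func newLoggingPod(name, node string, lines int, d time.Duration) *loggingPod {
	return &loggingPod{Name: name, NodeName: node, ExpectedLinesNumber: lines, RunDuration: d}
}

// Start stands in for creating the pod object in the cluster.
func (p *loggingPod) Start() { fmt.Printf("creating pod %s\n", p.Name) }

// startNewLoggingPod mirrors the convenience wrapper: construct and start at once.
func startNewLoggingPod(name, node string, lines int, d time.Duration) *loggingPod {
	p := newLoggingPod(name, node, lines, d)
	p.Start()
	return p
}

func main() {
	// Lazy path, as the soak test now uses: build first, start later.
	lazy := newLoggingPod("soak-pod", "", 100, time.Minute)
	lazy.Start()

	// Eager path, as es.go, sd.go and sd_events.go use via the wrapper.
	_ = startNewLoggingPod("synthlogger", "", 600, 10*time.Minute)
}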
