Commit f95e7ee

Add ClickHouse monitor e2e test (#66)
Signed-off-by: Yanjun Zhou <[email protected]>
1 parent 0f59f96 commit f95e7ee

File tree (5 files changed: +131 -6 lines)

ci/jenkins/test-vmc.sh
ci/kind/test-e2e-kind.sh
hack/generate-manifest.sh
test/e2e/flowvisibility_test.go
test/e2e/framework.go

Diff for: ci/jenkins/test-vmc.sh

+1-1
@@ -335,7 +335,7 @@ function deliver_antrea {
 
     control_plane_ip="$(kubectl get nodes -o wide --no-headers=true | awk -v role="$CONTROL_PLANE_NODE_ROLE" '$3 ~ role {print $6}')"
 
-    ${GIT_CHECKOUT_DIR}/hack/generate-manifest.sh --no-grafana > ${GIT_CHECKOUT_DIR}/build/yamls/flow-visibility.yml
+    ${GIT_CHECKOUT_DIR}/hack/generate-manifest.sh --no-grafana --ch-size 100Mi --ch-monitor-threshold 0.1 > ${GIT_CHECKOUT_DIR}/build/yamls/flow-visibility.yml
     ${GIT_CHECKOUT_DIR}/hack/generate-manifest.sh --no-grafana --spark-operator > ${GIT_CHECKOUT_DIR}/build/yamls/flow-visibility-with-spark.yml
 
     ${SCP_WITH_ANTREA_CI_KEY} $GIT_CHECKOUT_DIR/build/charts/theia/crds/clickhouse-operator-install-bundle.yaml capv@${control_plane_ip}:~

Diff for: ci/kind/test-e2e-kind.sh

+1-1
@@ -39,7 +39,7 @@ function print_usage {
 TESTBED_CMD=$(dirname $0)"/kind-setup.sh"
 YML_DIR=$(dirname $0)"/../../build/yamls"
 FLOW_AGGREGATOR_CMD=$(dirname $0)"/../../hack/generate-manifest-flow-aggregator.sh"
-FLOW_VISIBILITY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana"
+FLOW_VISIBILITY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana --ch-size 100Mi --ch-monitor-threshold 0.1"
 FLOW_VISIBILITY_WITH_SPARK_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana --spark-operator"
 CH_OPERATOR_YML=$(dirname $0)"/../../build/charts/theia/crds/clickhouse-operator-install-bundle.yaml"

Diff for: hack/generate-manifest.sh

+21-3
@@ -23,9 +23,15 @@ function echoerr {
 _usage="Usage: $0 [--mode (dev|release)] [--keep] [--help|-h]
 Generate a YAML manifest for the Clickhouse-Grafana Flow-visibility Solution, using Helm and
 Kustomize, and print it to stdout.
---mode (dev|release)      Choose the configuration variant that you need (default is 'dev')
---spark-operator          Generate a manifest with Spark Operator enabled.
---no-grafana              Generate a manifest with Grafana disabled.
+--mode (dev|release)                  Choose the configuration variant that you need (default is 'dev')
+--spark-operator                      Generate a manifest with Spark Operator enabled.
+--no-grafana                          Generate a manifest with Grafana disabled.
+--ch-size <size>                      Deploy the ClickHouse with a specific storage size. Can be a
+                                      plain integer or as a fixed-point number using one of these quantity
+                                      suffixes: E, P, T, G, M, K. Or the power-of-two equivalents:
+                                      Ei, Pi, Ti, Gi, Mi, Ki. (default is 8Gi)
+--ch-monitor-threshold <threshold>    Deploy the ClickHouse monitor with a specific threshold. Can
+                                      vary from 0 to 1. (default is 0.5)
 This tool uses Helm 3 (https://helm.sh/) and Kustomize (https://github.com/kubernetes-sigs/kustomize)
 to generate manifests for Theia. You can set the HELM and KUSTOMIZE environment variable to
 the path of the helm and kustomize binary you want us to use. Otherwise we will download the

@@ -43,6 +49,8 @@ function print_help {
 MODE="dev"
 SPARK_OP=false
 GRAFANA=true
+CH_SIZE="8Gi"
+CH_THRESHOLD=0.5
 
 while [[ $# -gt 0 ]]
 do

@@ -61,6 +69,14 @@ case $key in
     GRAFANA=false
     shift 1
     ;;
+    --ch-size)
+    CH_SIZE="$2"
+    shift 2
+    ;;
+    --ch-monitor-threshold)
+    CH_THRESHOLD="$2"
+    shift 2
+    ;;
     -h|--help)
     print_usage
     exit 0

@@ -114,6 +130,8 @@ fi
 
 HELM_VALUES=()
 
+HELM_VALUES+=("clickhouse.storage.size=$CH_SIZE" "clickhouse.monitor.threshold=$CH_THRESHOLD")
+
 if [ "$MODE" == "dev" ] && [ -n "$IMG_NAME" ]; then
     HELM_VALUES+=("clickhouse.monitorImage.repository=$IMG_NAME")
 fi
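
The two new flags feed straight into HELM_VALUES, so the CI invocations above amount to overriding clickhouse.storage.size and clickhouse.monitor.threshold. A minimal sketch of the invocation, run from the repository root with the same illustrative values the CI scripts use (not recommended production settings):

# Render a flow-visibility manifest with a 100Mi ClickHouse volume and a 10% monitor threshold
./hack/generate-manifest.sh --no-grafana --ch-size 100Mi --ch-monitor-threshold 0.1 \
    > build/yamls/flow-visibility.yml

# Defaults when the flags are omitted: --ch-size 8Gi, --ch-monitor-threshold 0.5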

Diff for: test/e2e/flowvisibility_test.go

+88-1
@@ -17,7 +17,6 @@ package e2e
 import (
     "encoding/json"
     "fmt"
-
     "net"
     "strconv"
     "strings"

@@ -113,6 +112,9 @@ const (
     antreaIngressTableInitFlowCount = 6
     ingressTableInitFlowCount = 1
     egressTableInitFlowCount = 1
+    monitorThreshold = 0.1
+    monitorDeletePercentage = 0.5
+    monitorIperfRounds = 80
 )
 
 var (

@@ -462,6 +464,91 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
             checkRecordsForFlows(t, data, podAIPs.ipv4.String(), svcC.Spec.ClusterIP, isServiceIPv6, false, true, false, false)
         }
     })
+
+    // ClickHouseMonitor tests ensure ClickHouse monitor inspects the ClickHouse
+    // Pod storage usage and deletes the data when the storage usage grows above
+    // the threshold.
+    t.Run("ClickHouseMonitor", func(t *testing.T) {
+        var flow testFlow
+        if !isIPv6 {
+            flow = testFlow{
+                srcIP:      podAIPs.ipv4.String(),
+                dstIP:      podBIPs.ipv4.String(),
+                srcPodName: "perftest-a",
+                dstPodName: "perftest-b",
+            }
+        } else {
+            flow = testFlow{
+                srcIP:      podAIPs.ipv6.String(),
+                dstIP:      podBIPs.ipv6.String(),
+                srcPodName: "perftest-a",
+                dstPodName: "perftest-b",
+            }
+        }
+        checkClickHouseMonitor(t, data, isIPv6, flow)
+    })
+}
+
+func checkClickHouseMonitor(t *testing.T, data *TestData, isIPv6 bool, flow testFlow) {
+    checkClickHouseMonitorLogs(t, data, false, 0)
+    var cmdStr string
+    // iperf3 has a limit on maximum parallel streams at 128
+    if !isIPv6 {
+        cmdStr = fmt.Sprintf("iperf3 -u -c %s -P 128 -n 1", flow.dstIP)
+    } else {
+        cmdStr = fmt.Sprintf("iperf3 -u -6 -c %s -P 128 -n 1", flow.dstIP)
+    }
+    log.Infof("Generating flow records to exceed monitor threshold...")
+    for i := 0; i < monitorIperfRounds; i++ {
+        stdout, stderr, err := data.RunCommandFromPod(testNamespace, flow.srcPodName, "perftool", []string{"bash", "-c", cmdStr})
+        require.NoErrorf(t, err, "Error when running iPerf3 client: %v,\nstdout:%s\nstderr:%s", err, stdout, stderr)
+    }
+    log.Infof("Waiting for the flows to be exported...")
+    time.Sleep(30 * time.Second)
+    // Get the number of records in database before the monitor deletes the records
+    stdout, stderr, err := data.RunCommandFromPod(flowVisibilityNamespace, clickHousePodName, "clickhouse", []string{"bash", "-c", "clickhouse client -q \"SELECT COUNT() FROM default.flows\""})
+    require.NoErrorf(t, err, "Error when querying ClickHouse server: %v,\nstdout:%s\nstderr:%s", err, stdout, stderr)
+    numRecord, err := strconv.ParseInt(strings.TrimSuffix(stdout, "\n"), 10, 64)
+    require.NoErrorf(t, err, "Failed when parsing the number of records %v", err)
+    log.Infof("Waiting for the monitor to detect and clean up the ClickHouse storage")
+    time.Sleep(2 * time.Minute)
+    checkClickHouseMonitorLogs(t, data, true, numRecord)
+}
+
+func checkClickHouseMonitorLogs(t *testing.T, data *TestData, deleted bool, numRecord int64) {
+    logString, err := data.GetPodLogs(flowVisibilityNamespace, clickHousePodName,
+        &corev1.PodLogOptions{
+            Container: "clickhouse-monitor",
+        },
+    )
+    require.NoErrorf(t, err, "Error when getting ClickHouse monitor logs: %v", err)
+    // ClickHouse monitor log sample
+    // "Memory usage" total=2062296555 used=42475 percentage=2.0595970980516912e-05
+    logs := strings.Split(logString, "Memory usage")
+    memoryUsageLog := strings.Split(logs[len(logs)-1], "\n")[0]
+    assert.Contains(t, memoryUsageLog, "total=")
+    assert.Contains(t, memoryUsageLog, "used=")
+    assert.Contains(t, memoryUsageLog, "percentage=")
+    percentage, err := strconv.ParseFloat(strings.Split(memoryUsageLog, "percentage=")[1], 64)
+    require.NoErrorf(t, err, "Failed when parsing the memory usage percentage %v", err)
+    if !deleted {
+        assert.LessOrEqual(t, percentage, monitorThreshold)
+    } else {
+        assert.Greater(t, percentage, monitorThreshold)
+        // Monitor deletes records from table flows and related MVs
+        assert.Contains(t, logString, "ALTER TABLE default.flows DELETE WHERE timeInserted < toDateTime", "Monitor should delete records from Table flows")
+        assert.Contains(t, logString, "ALTER TABLE default.flows_pod_view DELETE WHERE timeInserted < toDateTime", "Monitor should delete records from View flows_pod_view")
+        assert.Contains(t, logString, "ALTER TABLE default.flows_node_view DELETE WHERE timeInserted < toDateTime", "Monitor should delete records from View flows_node_view")
+        assert.Contains(t, logString, "ALTER TABLE default.flows_policy_view DELETE WHERE timeInserted < toDateTime", "Monitor should delete records from View flows_policy_view")
+        assert.Contains(t, logString, "Skip rounds after a successful deletion", "Monitor should skip rounds after a successful deletion")
+        require.Contains(t, logString, "SELECT timeInserted FROM default.flows LIMIT 1 OFFSET ", "Monitor should log the deletion SQL command")
+        deletedRecordLog := strings.Split(logString, "SELECT timeInserted FROM default.flows LIMIT 1 OFFSET ")[1]
+        deletedRecordLog = strings.Split(deletedRecordLog, "\n")[0]
+        numDeletedRecord, err := strconv.ParseInt(deletedRecordLog, 10, 64)
+        require.NoErrorf(t, err, "Failed when parsing the number of deleted records %v", err)
+
+        assert.InDeltaf(t, numDeletedRecord, float64(numRecord)*monitorDeletePercentage, float64(numDeletedRecord)*0.15, "Difference between expected and actual number of deleted Records should be lower than 15%")
+    }
 }
 
 func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP string, isIPv6 bool, isIntraNode bool, checkService bool, checkK8sNetworkPolicy bool, checkAntreaNetworkPolicy bool) {
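
The test drives the ClickHouse Pod only through RunCommandFromPod and GetPodLogs, so its checks can be approximated by hand. A minimal sketch, assuming the namespace and Pod names shown below (the test reads the real values from the flowVisibilityNamespace and clickHousePodName constants, which are not part of this diff):

# Assumed names; adjust to your deployment.
NAMESPACE=flow-visibility
CH_POD=chi-clickhouse-clickhouse-0-0-0

# Same row count the test records before waiting for the monitor to clean up
kubectl exec -n "$NAMESPACE" "$CH_POD" -c clickhouse -- \
    clickhouse client -q "SELECT COUNT() FROM default.flows"

# Monitor log lines the test asserts on: the "Memory usage" summaries and the
# ALTER TABLE ... DELETE statements issued once usage crosses the threshold
kubectl logs -n "$NAMESPACE" "$CH_POD" -c clickhouse-monitor | \
    grep -E 'Memory usage|ALTER TABLE default\.flows'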

Diff for: test/e2e/framework.go

+20
@@ -18,6 +18,7 @@ import (
     "bytes"
     "context"
     "fmt"
+    "io"
     "net"
     "regexp"
     "strconv"

@@ -649,6 +650,25 @@ func (data *TestData) PodWaitFor(timeout time.Duration, name, namespace string,
     return pod, nil
 }
 
+// GetPodLogs returns the logs of the specified container in a Pod.
+func (data *TestData) GetPodLogs(namespace, name string, podLogOpts *corev1.PodLogOptions) (string, error) {
+    var logString string
+    req := data.clientset.CoreV1().Pods(namespace).GetLogs(name, podLogOpts)
+    podLogs, err := req.Stream(context.TODO())
+    if err != nil {
+        return logString, fmt.Errorf("error when opening stream: %v", err)
+    }
+    defer podLogs.Close()
+
+    buf := new(bytes.Buffer)
+    _, err = io.Copy(buf, podLogs)
+    if err != nil {
+        return logString, fmt.Errorf("error when copying data from Pod logs to buffer: %v", err)
+    }
+    logString = buf.String()
+    return logString, nil
+}
+
 func parsePodIPs(podIPStrings sets.String) (*PodIPs, error) {
     ips := new(PodIPs)
     for idx := range podIPStrings.List() {
