Skip to content

Commit aa7efc3

Browse files
committed
[FLINK-37688] Implement Amazon CloudWatch Metric Sink Connector
1 parent 9d6746b commit aa7efc3

File tree

48 files changed

+4818
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+4818
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one
4+
~ or more contributor license agreements. See the NOTICE file
5+
~ distributed with this work for additional information
6+
~ regarding copyright ownership. The ASF licenses this file
7+
~ to you under the Apache License, Version 2.0 (the
8+
~ "License"); you may not use this file except in compliance
9+
~ with the License. You may obtain a copy of the License at
10+
~
11+
~ http://www.apache.org/licenses/LICENSE-2.0
12+
~
13+
~ Unless required by applicable law or agreed to in writing, software
14+
~ distributed under the License is distributed on an "AS IS" BASIS,
15+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
~ See the License for the specific language governing permissions and
17+
~ limitations under the License.
18+
-->
19+
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
24+
<parent>
25+
<artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
26+
<groupId>org.apache.flink</groupId>
27+
<version>5.1-SNAPSHOT</version>
28+
</parent>
29+
30+
<modelVersion>4.0.0</modelVersion>
31+
32+
<artifactId>flink-connector-aws-cloudwatch-e2e-tests</artifactId>
33+
<name>Flink : Connectors : AWS : E2E Tests : Amazon CloudWatch</name>
34+
<packaging>jar</packaging>
35+
36+
<properties>
37+
<aws.sdkv2.version>2.31.18</aws.sdkv2.version>
38+
</properties>
39+
40+
<dependencies>
41+
<dependency>
42+
<groupId>org.apache.flink</groupId>
43+
<artifactId>flink-streaming-java</artifactId>
44+
<version>${flink.version}</version>
45+
</dependency>
46+
47+
<dependency>
48+
<groupId>org.apache.flink</groupId>
49+
<artifactId>flink-sql-connector-cloudwatch</artifactId>
50+
<version>${project.version}</version>
51+
<scope>test</scope>
52+
</dependency>
53+
54+
<dependency>
55+
<groupId>org.apache.flink</groupId>
56+
<artifactId>flink-connector-cloudwatch</artifactId>
57+
<version>${project.version}</version>
58+
</dependency>
59+
60+
<dependency>
61+
<groupId>org.apache.flink</groupId>
62+
<artifactId>flink-connector-aws-base</artifactId>
63+
<version>${project.version}</version>
64+
</dependency>
65+
66+
<dependency>
67+
<groupId>org.apache.flink</groupId>
68+
<artifactId>flink-connector-aws-base</artifactId>
69+
<version>${project.version}</version>
70+
<type>test-jar</type>
71+
</dependency>
72+
73+
<dependency>
74+
<groupId>org.apache.flink</groupId>
75+
<artifactId>flink-connector-cloudwatch</artifactId>
76+
<version>${project.version}</version>
77+
<type>test-jar</type>
78+
</dependency>
79+
80+
<!-- Other third-party dependencies -->
81+
<dependency>
82+
<groupId>com.google.guava</groupId>
83+
<artifactId>guava</artifactId>
84+
<scope>test</scope>
85+
</dependency>
86+
87+
<dependency>
88+
<groupId>com.fasterxml.jackson.core</groupId>
89+
<artifactId>jackson-databind</artifactId>
90+
<scope>test</scope>
91+
</dependency>
92+
93+
<dependency>
94+
<groupId>com.fasterxml.jackson.datatype</groupId>
95+
<artifactId>jackson-datatype-jsr310</artifactId>
96+
<scope>test</scope>
97+
</dependency>
98+
99+
<dependency>
100+
<groupId>software.amazon.awssdk</groupId>
101+
<artifactId>s3</artifactId>
102+
<scope>test</scope>
103+
</dependency>
104+
105+
<dependency>
106+
<groupId>software.amazon.awssdk</groupId>
107+
<artifactId>cloudwatch</artifactId>
108+
<scope>test</scope>
109+
</dependency>
110+
</dependencies>
111+
112+
<build>
113+
<plugins>
114+
<plugin>
115+
<groupId>org.apache.maven.plugins</groupId>
116+
<artifactId>maven-dependency-plugin</artifactId>
117+
<executions>
118+
<execution>
119+
<id>copy</id>
120+
<phase>pre-integration-test</phase>
121+
<goals>
122+
<goal>copy</goal>
123+
</goals>
124+
</execution>
125+
</executions>
126+
<configuration>
127+
<artifactItems>
128+
<artifactItem>
129+
<groupId>org.apache.flink</groupId>
130+
<artifactId>flink-sql-connector-cloudwatch</artifactId>
131+
<version>${project.version}</version>
132+
<destFileName>sql-cloudwatch.jar</destFileName>
133+
<type>jar</type>
134+
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
135+
</artifactItem>
136+
</artifactItems>
137+
</configuration>
138+
</plugin>
139+
</plugins>
140+
</build>
141+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.cloudwatch.sink.test;
19+
20+
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
21+
import org.apache.flink.connector.aws.testutils.LocalstackContainer;
22+
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
23+
import org.apache.flink.connector.cloudwatch.sink.CloudWatchSink;
24+
import org.apache.flink.connector.cloudwatch.sink.MetricWriteRequest;
25+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.apache.flink.test.junit5.MiniClusterExtension;
27+
28+
import org.junit.jupiter.api.AfterEach;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
31+
import org.junit.jupiter.api.extension.ExtendWith;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
import org.testcontainers.containers.Network;
35+
import org.testcontainers.junit.jupiter.Container;
36+
import org.testcontainers.junit.jupiter.Testcontainers;
37+
import org.testcontainers.utility.DockerImageName;
38+
import software.amazon.awssdk.core.SdkSystemSetting;
39+
import software.amazon.awssdk.http.SdkHttpClient;
40+
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
41+
import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataRequest;
42+
import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataResponse;
43+
import software.amazon.awssdk.services.cloudwatch.model.Metric;
44+
import software.amazon.awssdk.services.cloudwatch.model.MetricDataQuery;
45+
import software.amazon.awssdk.services.cloudwatch.model.MetricStat;
46+
47+
import java.time.Instant;
48+
import java.util.UUID;
49+
50+
import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
51+
import static org.assertj.core.api.Assertions.assertThat;
52+
53+
/**
54+
* Integration test for {@link CloudWatchSink}.
55+
*/
56+
@Testcontainers
57+
@ExtendWith(MiniClusterExtension.class)
58+
public class CloudWatchSinkITCase {
59+
private static final Logger LOG = LoggerFactory.getLogger(CloudWatchSinkITCase.class);
60+
61+
private static String testMetricName;
62+
private static final int NUMBER_OF_ELEMENTS = 50;
63+
64+
private static StreamExecutionEnvironment env;
65+
66+
private CloudWatchClient cloudWatchClient;
67+
private SdkHttpClient httpClient;
68+
private static final Network network = Network.newNetwork();
69+
private static final String LOCALSTACK_DOCKER_IMAGE_VERSION = "localstack/localstack:3.7.2";
70+
private static final String TEST_NAMESPACE = "test_namespace";
71+
72+
@Container
73+
private static final LocalstackContainer MOCK_CLOUDWATCH_CONTAINER =
74+
new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION))
75+
.withNetwork(network)
76+
.withNetworkAliases("localstack");
77+
78+
@BeforeEach
79+
public void setup() {
80+
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
81+
82+
testMetricName = UUID.randomUUID().toString();
83+
84+
env = StreamExecutionEnvironment.getExecutionEnvironment();
85+
env.setParallelism(1);
86+
87+
httpClient = AWSServicesTestUtils.createHttpClient();
88+
89+
cloudWatchClient = AWSServicesTestUtils.createAwsSyncClient(
90+
MOCK_CLOUDWATCH_CONTAINER.getEndpoint(), httpClient, CloudWatchClient.builder());
91+
92+
LOG.info("Done setting up the localstack.");
93+
}
94+
95+
@AfterEach
96+
public void teardown() {
97+
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
98+
AWSGeneralUtil.closeResources(httpClient, cloudWatchClient);
99+
}
100+
101+
@Test
102+
public void testRandomDataSuccessfullyWritten() throws Exception {
103+
CloudWatchSink<MetricWriteRequest> cloudWatchSink = CloudWatchSink.<MetricWriteRequest>builder()
104+
.setNamespace(TEST_NAMESPACE)
105+
.setCloudWatchClientProperties(createConfig(MOCK_CLOUDWATCH_CONTAINER.getEndpoint()))
106+
.build();
107+
108+
Instant testTimestamp = Instant.now();
109+
110+
env.fromSequence(1, NUMBER_OF_ELEMENTS)
111+
.map(data -> MetricWriteRequest.builder()
112+
.withMetricName(testMetricName)
113+
.addValue(1.0d)
114+
.withTimestamp(testTimestamp)
115+
.build())
116+
.sinkTo(cloudWatchSink);
117+
118+
env.execute("Integration Test");
119+
120+
GetMetricDataResponse response = cloudWatchClient.getMetricData(GetMetricDataRequest.builder()
121+
.metricDataQueries(
122+
MetricDataQuery.builder().metricStat(getMetricStat("Sum")).build(),
123+
MetricDataQuery.builder().metricStat(getMetricStat("SampleCount")).build()
124+
)
125+
.startTime(testTimestamp.minusSeconds(300))
126+
.endTime(testTimestamp.plusSeconds(300))
127+
.build());
128+
129+
response.metricDataResults().forEach(
130+
result -> assertThat(result.values()).containsExactly(Double.valueOf(NUMBER_OF_ELEMENTS))
131+
);
132+
}
133+
134+
private static MetricStat getMetricStat(String stat) {
135+
return MetricStat.builder()
136+
.metric(
137+
Metric.builder()
138+
.namespace(TEST_NAMESPACE)
139+
.metricName(testMetricName)
140+
.build()
141+
)
142+
.stat(stat)
143+
.period(300)
144+
.build();
145+
}
146+
}

0 commit comments

Comments
 (0)