CLOUDWATCH_CLIENT_USER_AGENT_PREFIX =
+ ConfigOptions.key("aws.cloudwatch.client.user-agent-prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("CloudWatch identifier for user agent prefix.");
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchExceptionClassifiers.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchExceptionClassifiers.java
new file mode 100644
index 000000000..565071633
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchExceptionClassifiers.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException;
+import software.amazon.awssdk.services.cloudwatch.model.ResourceNotFoundException;
+
+/**
+ * Class containing set of {@link FatalExceptionClassifier} for {@link
+ * software.amazon.awssdk.services.cloudwatch.model.CloudWatchException}.
+ */
+@Internal
+public class CloudWatchExceptionClassifiers {
+
+ public static FatalExceptionClassifier getNotAuthorizedExceptionClassifier() {
+ return AWSExceptionClassifierUtil.withAWSServiceErrorCode(
+ CloudWatchException.class,
+ "NotAuthorized",
+ err ->
+ new CloudWatchSinkException(
+ "Encountered non-recoverable exception: NotAuthorized", err));
+ }
+
+ public static FatalExceptionClassifier getAccessDeniedExceptionClassifier() {
+ return AWSExceptionClassifierUtil.withAWSServiceErrorCode(
+ CloudWatchException.class,
+ "AccessDeniedException",
+ err ->
+ new CloudWatchSinkException(
+ "Encountered non-recoverable exception: AccessDeniedException",
+ err));
+ }
+
+ public static FatalExceptionClassifier getResourceNotFoundExceptionClassifier() {
+ return FatalExceptionClassifier.withRootCauseOfType(
+ ResourceNotFoundException.class,
+ err ->
+ new CloudWatchSinkException(
+ "Encountered non-recoverable exception relating to not being able to find the specified resources",
+ err));
+ }
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSink.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSink.java
new file mode 100644
index 000000000..c6bd448fc
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSink.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.cloudwatch.sink.client.CloudWatchAsyncClientProvider;
+import org.apache.flink.connector.cloudwatch.sink.client.SdkClientProvider;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+
+/**
+ * A CloudWatch Sink that performs async requests against a destination CloudWatch using the
+ * buffering protocol specified in {@link AsyncSinkBase}.
+ *
+ * The sink internally uses a {@link CloudWatchAsyncClient} to communicate with the AWS endpoint.
+ *
+ *
Please see the writer implementation in {@link CloudWatchSinkWriter}
+ *
+ *
maxBatchSize is calculated in terms of requestEntries (MetricWriteRequest). In CloudWatch,
+ * each PutMetricDataRequest can have maximum of 1000 MetricWriteRequest, hence the maxBatchSize
+ * cannot be more than 1000.
+ *
+ *
maxBatchSizeInBytes is calculated in terms of size of requestEntries (MetricWriteRequest). In
+ * CloudWatch, each PutMetricDataRequest can have maximum of 1MB of payload, hence the
+ * maxBatchSizeInBytes cannot be more than 1 MB.
+ *
+ * @param Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class CloudWatchSink extends AsyncSinkBase {
+
+ private final String namespace;
+ private final Properties cloudWatchClientProperties;
+ private transient SdkClientProvider asyncClientSdkClientProviderOverride;
+ private final InvalidMetricDataRetryMode invalidMetricDataRetryMode;
+
+ CloudWatchSink(
+ ElementConverter elementConverter,
+ Integer maxBatchSize,
+ Integer maxInFlightRequests,
+ Integer maxBufferedRequests,
+ Long maxBatchSizeInBytes,
+ Long maxTimeInBufferMS,
+ Long maxRecordSizeInBytes,
+ String namespace,
+ Properties cloudWatchClientProperties,
+ InvalidMetricDataRetryMode invalidMetricDataRetryMode) {
+ super(
+ elementConverter,
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBatchSizeInBytes,
+ maxTimeInBufferMS,
+ maxRecordSizeInBytes);
+ this.namespace =
+ Preconditions.checkNotNull(
+ namespace,
+ "The cloudWatch namespace must not be null when initializing the CloudWatch Sink.");
+ this.invalidMetricDataRetryMode = invalidMetricDataRetryMode;
+ Preconditions.checkArgument(
+ !this.namespace.isEmpty(),
+ "The cloudWatch namespace must be set when initializing the CloudWatch Sink.");
+
+ Preconditions.checkArgument(
+ (this.getMaxBatchSize() <= 1000),
+ "The cloudWatch MaxBatchSize must not be greater than 1,000.");
+
+ Preconditions.checkArgument(
+ (this.getMaxBatchSizeInBytes() <= 1000 * 1000),
+ "The cloudWatch MaxBatchSizeInBytes must not be greater than 1,000,000.");
+
+ this.cloudWatchClientProperties = cloudWatchClientProperties;
+ }
+
+ /**
+ * Create a {@link CloudWatchSinkBuilder} to allow the fluent construction of a new {@code
+ * CloudWatchSink}.
+ *
+ * @param type of incoming records
+ * @return {@link CloudWatchSinkBuilder}
+ */
+ public static CloudWatchSinkBuilder builder() {
+ return new CloudWatchSinkBuilder<>();
+ }
+
+ @Override
+ public StatefulSinkWriter> createWriter(
+ InitContext context) throws IOException {
+ return new CloudWatchSinkWriter<>(
+ getElementConverter(),
+ context,
+ getMaxBatchSize(),
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ getMaxBatchSizeInBytes(),
+ getMaxTimeInBufferMS(),
+ getMaxRecordSizeInBytes(),
+ namespace,
+ getAsyncClientProvider(cloudWatchClientProperties),
+ Collections.emptyList(),
+ invalidMetricDataRetryMode);
+ }
+
+ @Override
+ public StatefulSinkWriter> restoreWriter(
+ InitContext context,
+ Collection> recoveredState)
+ throws IOException {
+ return new CloudWatchSinkWriter<>(
+ getElementConverter(),
+ context,
+ getMaxBatchSize(),
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ getMaxBatchSizeInBytes(),
+ getMaxTimeInBufferMS(),
+ getMaxRecordSizeInBytes(),
+ namespace,
+ getAsyncClientProvider(cloudWatchClientProperties),
+ recoveredState,
+ invalidMetricDataRetryMode);
+ }
+
+ private SdkClientProvider getAsyncClientProvider(
+ Properties clientProperties) {
+ if (asyncClientSdkClientProviderOverride != null) {
+ return asyncClientSdkClientProviderOverride;
+ }
+ return new CloudWatchAsyncClientProvider(clientProperties);
+ }
+
+ @Internal
+ @VisibleForTesting
+ void setCloudWatchAsyncClientProvider(
+ SdkClientProvider asyncClientSdkClientProviderOverride) {
+ this.asyncClientSdkClientProviderOverride = asyncClientSdkClientProviderOverride;
+ }
+
+ @Override
+ public SimpleVersionedSerializer>
+ getWriterStateSerializer() {
+ return new CloudWatchStateSerializer();
+ }
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSinkBuilder.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSinkBuilder.java
new file mode 100644
index 000000000..e91db051a
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSinkBuilder.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import software.amazon.awssdk.http.Protocol;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link CloudWatchSink}.
+ *
+ * The following example shows the minimum setup to create a {@link CloudWatchSink} that writes
+ * String values to a CloudWatch named cloudWatchUrl.
+ *
+ *
{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * CloudWatchSink cloudWatchSink =
+ * CloudWatchSink.builder()
+ * .setNamespace("namespace")
+ * .setCloudWatchClientProperties(sinkProperties)
+ * .build();
+ * }
+ *
+ * If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ *
+ * - {@code maxBatchSize} will be 100
+ *
- {@code maxInFlightRequests} will be 50
+ *
- {@code maxBufferedRequests} will be 5000
+ *
- {@code maxBatchSizeInBytes} will be 100 KB
+ *
- {@code maxTimeInBufferMs} will be 5000ms
+ *
- {@code maxRecordSizeInBytes} will be 1 KB
+ *
- {@code invalidMetricDataRetryMode} will be FAIL_ON_ERROR
+ *
+ *
+ * @param type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class CloudWatchSinkBuilder
+ extends AsyncSinkBaseBuilder> {
+
+ private static final int DEFAULT_MAX_BATCH_SIZE = 100;
+ private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+ private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000;
+ private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 100 * 1000;
+ private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+ private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1000;
+ private static final InvalidMetricDataRetryMode DEFAULT_INVALID_METRIC_DATA_RETRY_MODE =
+ InvalidMetricDataRetryMode.FAIL_ON_ERROR;
+ private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+ private String namespace;
+ private Properties cloudWatchClientProperties;
+ private InvalidMetricDataRetryMode invalidMetricDataRetryMode;
+
+ private ElementConverter elementConverter;
+
+ public CloudWatchSinkBuilder setElementConverter(
+ final ElementConverter elementConverter) {
+ this.elementConverter = elementConverter;
+ return this;
+ }
+
+ public CloudWatchSinkBuilder setNamespace(String namespace) {
+ this.namespace = namespace;
+ return this;
+ }
+
+ /**
+ * If writing to CloudWatch results in a failure being returned due to Invalid Metric Data
+ * provided, the retry mode will be determined based on this config.
+ *
+ * @param invalidMetricDataRetryMode retry mode
+ * @return {@link CloudWatchSinkBuilder} itself
+ */
+ public CloudWatchSinkBuilder setInvalidMetricDataRetryMode(
+ InvalidMetricDataRetryMode invalidMetricDataRetryMode) {
+ this.invalidMetricDataRetryMode = invalidMetricDataRetryMode;
+ return this;
+ }
+
+ /**
+ * A set of properties used by the sink to create the CloudWatch client. This may be used to set
+ * the aws region, credentials etc. See the docs for usage and syntax.
+ *
+ * @param cloudWatchClientProps CloudWatch client properties
+ * @return {@link CloudWatchSinkBuilder} itself
+ */
+ public CloudWatchSinkBuilder setCloudWatchClientProperties(
+ final Properties cloudWatchClientProps) {
+ cloudWatchClientProperties = cloudWatchClientProps;
+ return this;
+ }
+
+ protected InvalidMetricDataRetryMode getInvalidMetricDataRetryMode() {
+ return this.invalidMetricDataRetryMode;
+ }
+
+ @VisibleForTesting
+ Properties getClientPropertiesWithDefaultHttpProtocol() {
+ Properties clientProperties =
+ Optional.ofNullable(cloudWatchClientProperties).orElse(new Properties());
+ clientProperties.putIfAbsent(HTTP_PROTOCOL_VERSION, DEFAULT_HTTP_PROTOCOL.toString());
+ return clientProperties;
+ }
+
+ @Override
+ public CloudWatchSink build() {
+ return new CloudWatchSink<>(
+ Optional.ofNullable(elementConverter)
+ .orElse(
+ (DefaultMetricWriteRequestElementConverter)
+ DefaultMetricWriteRequestElementConverter.builder()
+ .build()),
+ Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
+ Optional.ofNullable(getMaxInFlightRequests())
+ .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
+ Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS),
+ Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
+ Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
+ Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
+ namespace,
+ getClientPropertiesWithDefaultHttpProtocol(),
+ Optional.ofNullable(getInvalidMetricDataRetryMode())
+ .orElse(DEFAULT_INVALID_METRIC_DATA_RETRY_MODE));
+ }
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSinkException.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSinkException.java
new file mode 100644
index 000000000..e07b2ce09
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSinkException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A {@link RuntimeException} wrapper indicating the exception was thrown from the CloudWatch Sink.
+ */
+@PublicEvolving
+class CloudWatchSinkException extends RuntimeException {
+
+ public CloudWatchSinkException(final String message) {
+ super(message);
+ }
+
+ public CloudWatchSinkException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * When the flag {@code failOnError} is set in {@link CloudWatchSinkWriter}, this exception is
+ * raised as soon as any exception occurs when writing to CloudWatch.
+ */
+ static class CloudWatchFailFastSinkException extends CloudWatchSinkException {
+
+ private static final String ERROR_MESSAGE =
+ "Encountered an exception while persisting records, not retrying due to {failOnError} being set.";
+
+ public CloudWatchFailFastSinkException() {
+ super(ERROR_MESSAGE);
+ }
+
+ public CloudWatchFailFastSinkException(final String errorMessage) {
+ super(errorMessage);
+ }
+
+ public CloudWatchFailFastSinkException(final String errorMessage, final Throwable cause) {
+ super(errorMessage, cause);
+ }
+
+ public CloudWatchFailFastSinkException(final Throwable cause) {
+ super(ERROR_MESSAGE, cause);
+ }
+ }
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSinkWriter.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSinkWriter.java
new file mode 100644
index 000000000..75e04e2af
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSinkWriter.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.connector.cloudwatch.sink.client.SdkClientProvider;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+import software.amazon.awssdk.services.cloudwatch.model.InvalidFormatException;
+import software.amazon.awssdk.services.cloudwatch.model.InvalidParameterCombinationException;
+import software.amazon.awssdk.services.cloudwatch.model.InvalidParameterValueException;
+import software.amazon.awssdk.services.cloudwatch.model.MissingRequiredParameterException;
+import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
+import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link CloudWatchSink} to write to CloudWatch. More details on the
+ * operation of this sink writer may be found in the doc for {@link CloudWatchSink}. More details on
+ * the internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * The {@link CloudWatchAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ *
+ *
The batching of this sink is in terms of estimated size of a MetricWriteRequest in bytes. The
+ * goal is adaptively increase the number of MetricWriteRequest in each batch, a
+ * PutMetricDataRequest sent to CloudWatch, to a configurable number. This is the parameter
+ * maxBatchSizeInBytes which is calculated based on getSizeInBytes.
+ *
+ *
getSizeInBytes(requestEntry) returns the size of a MetricWriteRequest in bytes which is
+ * estimated by assuming each double takes 8 bytes, and each string char takes 1 byte (UTF_8
+ * encoded).
+ */
+@Internal
+class CloudWatchSinkWriter extends AsyncSinkWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CloudWatchSinkWriter.class);
+
+ private final SdkClientProvider clientProvider;
+
+ private static final AWSExceptionHandler CLOUDWATCH_FATAL_EXCEPTION_HANDLER =
+ AWSExceptionHandler.withClassifier(
+ FatalExceptionClassifier.createChain(
+ getInterruptedExceptionClassifier(),
+ getInvalidCredentialsExceptionClassifier(),
+ CloudWatchExceptionClassifiers.getResourceNotFoundExceptionClassifier(),
+ CloudWatchExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+ CloudWatchExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+ getSdkClientMisconfiguredExceptionClassifier()));
+
+ private static final List CLOUDWATCH_INVALID_METRIC_EXCEPTION =
+ Arrays.asList(
+ InvalidFormatException.class,
+ InvalidParameterCombinationException.class,
+ InvalidParameterValueException.class,
+ MissingRequiredParameterException.class);
+
+ private static final int BYTES_PER_DOUBLE = 8;
+
+ private final Counter numRecordsOutErrorsCounter;
+
+ /* Namespace of CloudWatch metric */
+ private final String namespace;
+
+ /* The sink writer metric group */
+ private final SinkWriterMetricGroup metrics;
+
+ /* The retry mode when an invalid metric caused failure */
+ private final InvalidMetricDataRetryMode invalidMetricDataRetryMode;
+
+ CloudWatchSinkWriter(
+ ElementConverter elementConverter,
+ Sink.InitContext context,
+ int maxBatchSize,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ long maxBatchSizeInBytes,
+ long maxTimeInBufferMS,
+ long maxRecordSizeInBytes,
+ String namespace,
+ SdkClientProvider clientProvider,
+ Collection> initialStates,
+ InvalidMetricDataRetryMode invalidMetricDataRetryMode) {
+ super(
+ elementConverter,
+ context,
+ AsyncSinkWriterConfiguration.builder()
+ .setMaxBatchSize(maxBatchSize)
+ .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+ .setMaxInFlightRequests(maxInFlightRequests)
+ .setMaxBufferedRequests(maxBufferedRequests)
+ .setMaxTimeInBufferMS(maxTimeInBufferMS)
+ .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+ .build(),
+ initialStates);
+ this.namespace = namespace;
+ this.metrics = context.metricGroup();
+ this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
+ this.clientProvider = clientProvider;
+ this.invalidMetricDataRetryMode = invalidMetricDataRetryMode;
+ }
+
+ @Override
+ protected void submitRequestEntries(
+ List requestEntries,
+ Consumer> requestResult) {
+
+ final PutMetricDataRequest putMetricDataRequest =
+ PutMetricDataRequest.builder()
+ .namespace(namespace)
+ .metricData(
+ requestEntries.stream()
+ .map(MetricWriteRequest::toMetricDatum)
+ .collect(Collectors.toList()))
+ .strictEntityValidation(true)
+ .build();
+
+ CompletableFuture future =
+ clientProvider.getClient().putMetricData(putMetricDataRequest);
+
+ // CloudWatchAsyncClient PutMetricDataRequest does not fail partially.
+ // If there is only one poison pill Metric Datum, the whole request fails fully.
+ future.whenComplete(
+ (response, err) -> {
+ if (err != null) {
+ handleFullyFailedRequest(err, requestEntries, requestResult);
+ } else {
+ requestResult.accept(Collections.emptyList());
+ }
+ })
+ .exceptionally(
+ ex -> {
+ getFatalExceptionCons()
+ .accept(
+ new CloudWatchSinkException
+ .CloudWatchFailFastSinkException(
+ ex.getMessage(), ex));
+ return null;
+ });
+ }
+
+ @Override
+ protected long getSizeInBytes(MetricWriteRequest requestEntry) {
+ long sizeInBytes = 0L;
+ sizeInBytes += requestEntry.getMetricName().getBytes(StandardCharsets.UTF_8).length;
+ sizeInBytes += (long) requestEntry.getValues().length * BYTES_PER_DOUBLE;
+ sizeInBytes += (long) requestEntry.getCounts().length * BYTES_PER_DOUBLE;
+
+ sizeInBytes +=
+ Arrays.stream(requestEntry.getDimensions())
+ .map(
+ dimension ->
+ dimension.getName().getBytes(StandardCharsets.UTF_8).length
+ + dimension
+ .getValue()
+ .getBytes(StandardCharsets.UTF_8)
+ .length)
+ .reduce(Integer::sum)
+ .orElse(0);
+
+ sizeInBytes +=
+ Stream.of(
+ requestEntry.getStatisticSum(),
+ requestEntry.getStatisticCount(),
+ requestEntry.getStatisticMax(),
+ requestEntry.getStatisticMin(),
+ requestEntry.getStorageResolution(),
+ requestEntry.getTimestamp())
+ .filter(Objects::nonNull)
+ .count()
+ * BYTES_PER_DOUBLE;
+
+ return sizeInBytes;
+ }
+
+ @Override
+ public void close() {
+ AWSGeneralUtil.closeResources(clientProvider);
+ }
+
+ private void handleFullyFailedRequest(
+ Throwable err,
+ List requestEntries,
+ Consumer> requestResult) {
+
+ numRecordsOutErrorsCounter.inc(requestEntries.size());
+ boolean isFatal =
+ CLOUDWATCH_FATAL_EXCEPTION_HANDLER.consumeIfFatal(err, getFatalExceptionCons());
+ if (isFatal) {
+ return;
+ }
+
+ if (CLOUDWATCH_INVALID_METRIC_EXCEPTION.stream()
+ .anyMatch(clazz -> ExceptionUtils.findThrowable(err, clazz).isPresent())) {
+ handleInvalidMetricRequest(err, requestEntries, requestResult);
+ return;
+ }
+
+ LOG.warn(
+ "CloudWatch Sink failed to write and will retry all {} entries to CloudWatch, First request was {}",
+ requestEntries.size(),
+ requestEntries.get(0).toString(),
+ err);
+ requestResult.accept(requestEntries);
+ }
+
+ private void handleInvalidMetricRequest(
+ Throwable err,
+ List requestEntries,
+ Consumer> requestResult) {
+
+ switch (invalidMetricDataRetryMode) {
+ case SKIP_METRIC_ON_ERROR:
+ LOG.warn(
+ "CloudWatch Sink failed to write and will skip sending all {} entries",
+ requestEntries.size(),
+ err);
+ requestResult.accept(Collections.emptyList());
+ return;
+ case FAIL_ON_ERROR:
+ LOG.warn(
+ "CloudWatch Sink failed to write all {} entries and will fail the job",
+ requestEntries.size(),
+ err);
+ getFatalExceptionCons()
+ .accept(new CloudWatchSinkException.CloudWatchFailFastSinkException(err));
+ return;
+ case RETRY:
+ default:
+ LOG.warn(
+ "CloudWatch Sink failed to write and will retry all {} entries to CloudWatch, First request was {}",
+ requestEntries.size(),
+ requestEntries.get(0).toString(),
+ err);
+ requestResult.accept(requestEntries);
+ }
+ }
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchStateSerializer.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchStateSerializer.java
new file mode 100644
index 000000000..ca5fea94a
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchStateSerializer.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.utils.StringUtils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/** CloudWatch implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public class CloudWatchStateSerializer extends AsyncSinkWriterStateSerializer {
+ @Override
+ protected void serializeRequestToStream(
+ final MetricWriteRequest request, final DataOutputStream out) throws IOException {
+
+ serializeMetricName(request.getMetricName(), out);
+ serializeValues(request, out);
+ serializeCounts(request, out);
+ serializeDimensions(request, out);
+ serializeDoubleValue(request.getStatisticMax(), out);
+ serializeDoubleValue(request.getStatisticMin(), out);
+ serializeDoubleValue(request.getStatisticSum(), out);
+ serializeDoubleValue(request.getStatisticCount(), out);
+ serializeUnit(request.getUnit(), out);
+ serializeStorageResolution(request.getStorageResolution(), out);
+ serializeTimestamp(request.getTimestamp(), out);
+ }
+
+ @Override
+ protected MetricWriteRequest deserializeRequestFromStream(
+ final long requestSize, final DataInputStream in) throws IOException {
+ MetricWriteRequest.Builder builder = MetricWriteRequest.builder();
+
+ builder.withMetricName(deserializeMetricName(in));
+ deserializeValues(in).forEach(builder::addValue);
+ deserializeCounts(in).forEach(builder::addCount);
+ deserializeDimensions(in)
+ .forEach(
+ dimension ->
+ builder.addDimension(dimension.getName(), dimension.getValue()));
+ Optional.ofNullable(deserializeDoubleValue(in)).ifPresent(builder::withStatisticMax);
+ Optional.ofNullable(deserializeDoubleValue(in)).ifPresent(builder::withStatisticMin);
+ Optional.ofNullable(deserializeDoubleValue(in)).ifPresent(builder::withStatisticSum);
+ Optional.ofNullable(deserializeDoubleValue(in)).ifPresent(builder::withStatisticCount);
+ Optional.ofNullable(deserializeUnit(in)).ifPresent(builder::withUnit);
+ Optional.ofNullable(deserializeStorageResolution(in))
+ .ifPresent(builder::withStorageResolution);
+ Optional.ofNullable(deserializeTimestamp(in)).ifPresent(builder::withTimestamp);
+
+ return builder.build();
+ }
+
+ private void serializeMetricName(String metricName, DataOutputStream out) throws IOException {
+ out.writeUTF(metricName);
+ }
+
+ private String deserializeMetricName(DataInputStream in) throws IOException {
+ return in.readUTF();
+ }
+
+ private void serializeValues(MetricWriteRequest request, DataOutputStream out)
+ throws IOException {
+ boolean hasValues = request.getValues() != null && request.getValues().length > 0;
+ out.writeBoolean(hasValues);
+ if (hasValues) {
+ out.writeInt(request.getValues().length);
+ for (Double value : request.getValues()) {
+ out.writeDouble(value);
+ }
+ }
+ }
+
+ private List deserializeValues(DataInputStream in) throws IOException {
+ return getDoubles(in);
+ }
+
+ private void serializeCounts(MetricWriteRequest request, DataOutputStream out)
+ throws IOException {
+ boolean hasCounts = request.getCounts() != null && request.getCounts().length > 0;
+ out.writeBoolean(hasCounts);
+ if (hasCounts) {
+ out.writeInt(request.getCounts().length);
+ for (Double count : request.getCounts()) {
+ out.writeDouble(count);
+ }
+ }
+ }
+
+ private List deserializeCounts(DataInputStream in) throws IOException {
+ return getDoubles(in);
+ }
+
+ private List getDoubles(DataInputStream in) throws IOException {
+ boolean hasDoubles = in.readBoolean();
+ if (!hasDoubles) {
+ return new ArrayList<>();
+ }
+
+ int size = in.readInt();
+ List doubles = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ doubles.add(in.readDouble());
+ }
+ return doubles;
+ }
+
+ private void serializeDimensions(MetricWriteRequest request, DataOutputStream out)
+ throws IOException {
+ boolean hasDimensions =
+ request.getDimensions() != null && request.getDimensions().length > 0;
+ out.writeBoolean(hasDimensions);
+ if (hasDimensions) {
+ out.writeInt(request.getDimensions().length);
+ for (MetricWriteRequest.Dimension dimension : request.getDimensions()) {
+ out.writeUTF(dimension.getName());
+ out.writeUTF(dimension.getValue());
+ }
+ }
+ }
+
+ private List deserializeDimensions(DataInputStream in)
+ throws IOException {
+ boolean hasDimensions = in.readBoolean();
+ if (!hasDimensions) {
+ return new ArrayList<>();
+ }
+
+ int size = in.readInt();
+ List dimensions = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ String name = in.readUTF();
+ String value = in.readUTF();
+ dimensions.add(new MetricWriteRequest.Dimension(name, value));
+ }
+ return dimensions;
+ }
+
+ private void serializeDoubleValue(Double value, DataOutputStream out) throws IOException {
+ boolean hasValue = value != null;
+
+ out.writeBoolean(hasValue);
+ if (hasValue) {
+ out.writeDouble(value);
+ }
+ }
+
+ private Double deserializeDoubleValue(DataInputStream in) throws IOException {
+ boolean hasValue = in.readBoolean();
+ if (!hasValue) {
+ return null;
+ }
+ return in.readDouble();
+ }
+
+ private void serializeUnit(String unit, DataOutputStream out) throws IOException {
+ boolean hasUnit = !StringUtils.isEmpty(unit);
+ out.writeBoolean(hasUnit);
+ if (hasUnit) {
+ out.writeUTF(unit);
+ }
+ }
+
+ private String deserializeUnit(DataInputStream in) throws IOException {
+ boolean hasUnit = in.readBoolean();
+ if (!hasUnit) {
+ return null;
+ }
+ return in.readUTF();
+ }
+
+ private void serializeStorageResolution(Integer storageRes, DataOutputStream out)
+ throws IOException {
+ boolean hasStorageResolution = storageRes != null;
+ out.writeBoolean(hasStorageResolution);
+ if (hasStorageResolution) {
+ out.writeInt(storageRes);
+ }
+ }
+
+ private Integer deserializeStorageResolution(DataInputStream in) throws IOException {
+ boolean hasStorageResolution = in.readBoolean();
+ if (!hasStorageResolution) {
+ return null;
+ }
+ return in.readInt();
+ }
+
+ private void serializeTimestamp(Instant timestamp, DataOutputStream out) throws IOException {
+ boolean hasTimestamp = timestamp != null;
+ out.writeBoolean(hasTimestamp);
+ if (hasTimestamp) {
+ out.writeLong(timestamp.toEpochMilli());
+ }
+ }
+
+ private Instant deserializeTimestamp(DataInputStream in) throws IOException {
+ boolean hasTimestamp = in.readBoolean();
+ if (!hasTimestamp) {
+ return null;
+ }
+ return Instant.ofEpochMilli(in.readLong());
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/DefaultMetricWriteRequestElementConverter.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/DefaultMetricWriteRequestElementConverter.java
new file mode 100644
index 000000000..09bd354f3
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/DefaultMetricWriteRequestElementConverter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import java.util.UnknownFormatConversionException;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS CloudWatch SDK v2. The user
+ * needs to provide the {@code InputT} in the form of {@link MetricWriteRequest}.
+ *
+ * The InputT needs to be structured and only supports {@link MetricWriteRequest}
+ */
+@Internal
+public class DefaultMetricWriteRequestElementConverter
+ extends MetricWriteRequestElementConverter {
+
+ @Override
+ public MetricWriteRequest apply(InputT element, SinkWriter.Context context) {
+ if (!(element instanceof MetricWriteRequest)) {
+ throw new UnknownFormatConversionException(
+ "DefaultMetricWriteRequestElementConverter only supports MetricWriteRequest element.");
+ }
+
+ return (MetricWriteRequest) element;
+ }
+
+ public static Builder builder() {
+ return new Builder<>();
+ }
+
+ /** A builder for the DefaultMetricWriteRequestElementConverter. */
+ public static class Builder {
+
+ public DefaultMetricWriteRequestElementConverter build() {
+ return new DefaultMetricWriteRequestElementConverter<>();
+ }
+ }
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/InvalidMetricDataRetryMode.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/InvalidMetricDataRetryMode.java
new file mode 100644
index 000000000..75c9eb5be
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/InvalidMetricDataRetryMode.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink;
+
+/** RetryMode to handle invalid CloudWatch Metric Data. */
+public enum InvalidMetricDataRetryMode {
+ FAIL_ON_ERROR,
+ SKIP_METRIC_ON_ERROR,
+ RETRY
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/MetricWriteRequest.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/MetricWriteRequest.java
new file mode 100644
index 000000000..d884665d2
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/MetricWriteRequest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import software.amazon.awssdk.annotations.NotNull;
+import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
+import software.amazon.awssdk.services.cloudwatch.model.StatisticSet;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Pojo used as sink input, containing information for a single CloudWatch MetricDatum Object. */
+@PublicEvolving
+public class MetricWriteRequest implements Serializable {
+
+ public String metricName;
+ public Dimension[] dimensions;
+ public Double[] values;
+ public Double[] counts;
+ public Instant timestamp;
+ public String unit;
+ public Integer storageResolution;
+ public Double statisticMax;
+ public Double statisticMin;
+ public Double statisticSum;
+ public Double statisticCount;
+
+ public MetricWriteRequest() {}
+
+ public MetricWriteRequest(
+ @NotNull String metricName,
+ Dimension[] dimensions,
+ Double[] values,
+ Double[] counts,
+ Instant timestamp,
+ String unit,
+ Integer storageResolution,
+ Double statisticMax,
+ Double statisticMin,
+ Double statisticSum,
+ Double statisticCount) {
+ this.metricName = metricName;
+ this.dimensions = dimensions;
+ this.values = values;
+ this.counts = counts;
+ this.timestamp = timestamp;
+ this.unit = unit;
+ this.storageResolution = storageResolution;
+ this.statisticMax = statisticMax;
+ this.statisticMin = statisticMin;
+ this.statisticSum = statisticSum;
+ this.statisticCount = statisticCount;
+ }
+
+ public Dimension[] getDimensions() {
+ return dimensions;
+ }
+
+ public Double[] getValues() {
+ return values;
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public Double[] getCounts() {
+ return counts;
+ }
+
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ public String getUnit() {
+ return unit;
+ }
+
+ public Integer getStorageResolution() {
+ return storageResolution;
+ }
+
+ public Double getStatisticMax() {
+ return statisticMax;
+ }
+
+ public Double getStatisticMin() {
+ return statisticMin;
+ }
+
+ public Double getStatisticSum() {
+ return statisticSum;
+ }
+
+ public Double getStatisticCount() {
+ return statisticCount;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** A single Dimension. */
+ public static class Dimension implements Serializable {
+ public String name;
+ public String value;
+
+ public Dimension() {}
+
+ public Dimension(String name, String value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public software.amazon.awssdk.services.cloudwatch.model.Dimension toCloudWatchDimension() {
+ return software.amazon.awssdk.services.cloudwatch.model.Dimension.builder()
+ .name(this.name)
+ .value(this.value)
+ .build();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Dimension label = (Dimension) o;
+ return new EqualsBuilder()
+ .append(name, label.name)
+ .append(value, label.value)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37).append(name).append(value).toHashCode();
+ }
+ }
+
+ /** Builder for sink input pojo instance. */
+ public static final class Builder {
+ private final List dimensions = new ArrayList<>();
+ private final List values = new ArrayList<>();
+ private final List counts = new ArrayList<>();
+ private String metricName;
+ private Instant timestamp;
+ private String unit;
+ private Integer storageResolution;
+ private Double statisticMax;
+ private Double statisticMin;
+ private Double statisticSum;
+ private Double statisticCount;
+
+ private Builder() {}
+
+ public Builder withMetricName(String metricName) {
+ this.metricName = metricName;
+ return this;
+ }
+
+ public Builder addDimension(String dimensionName, String dimensionValue) {
+ dimensions.add(new Dimension(dimensionName, dimensionValue));
+ return this;
+ }
+
+ public Builder addValue(Double value) {
+ values.add(value);
+ return this;
+ }
+
+ public Builder addCount(Double count) {
+ counts.add(count);
+ return this;
+ }
+
+ public Builder withTimestamp(Instant timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public Builder withUnit(String unit) {
+ this.unit = unit;
+ return this;
+ }
+
+ public Builder withStorageResolution(Integer storageResolution) {
+ this.storageResolution = storageResolution;
+ return this;
+ }
+
+ public Builder withStatisticMax(Double statisticMax) {
+ this.statisticMax = statisticMax;
+ return this;
+ }
+
+ public Builder withStatisticMin(Double statisticMin) {
+ this.statisticMin = statisticMin;
+ return this;
+ }
+
+ public Builder withStatisticSum(Double statisticSum) {
+ this.statisticSum = statisticSum;
+ return this;
+ }
+
+ public Builder withStatisticCount(Double statisticCount) {
+ this.statisticCount = statisticCount;
+ return this;
+ }
+
+ public MetricWriteRequest build() {
+ return new MetricWriteRequest(
+ metricName,
+ dimensions.toArray(new Dimension[dimensions.size()]),
+ values.toArray(new Double[values.size()]),
+ counts.toArray(new Double[counts.size()]),
+ timestamp,
+ unit,
+ storageResolution,
+ statisticMax,
+ statisticMin,
+ statisticSum,
+ statisticCount);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MetricWriteRequest that = (MetricWriteRequest) o;
+ return Arrays.equals(dimensions, that.dimensions)
+ && Arrays.equals(values, that.values)
+ && Objects.equals(metricName, that.metricName)
+ && Objects.equals(timestamp, that.timestamp)
+ && Arrays.equals(counts, that.counts)
+ && Objects.equals(unit, that.unit)
+ && Objects.equals(storageResolution, that.storageResolution)
+ && Objects.equals(statisticMax, that.statisticMax)
+ && Objects.equals(statisticMin, that.statisticMin)
+ && Objects.equals(statisticSum, that.statisticSum)
+ && Objects.equals(statisticCount, that.statisticCount);
+ }
+
+ @Override
+ public int hashCode() {
+ Integer result = Objects.hash(metricName);
+ result = 31 * result + Arrays.hashCode(dimensions);
+ result = 31 * result + Arrays.hashCode(values);
+ result = 31 * result + Arrays.hashCode(counts);
+ result = 31 * result + Objects.hash(timestamp);
+ result = 31 * result + Objects.hashCode(unit);
+ result = 31 * result + Objects.hashCode(storageResolution);
+ result = 31 * result + Objects.hashCode(statisticMax);
+ result = 31 * result + Objects.hashCode(statisticMin);
+ result = 31 * result + Objects.hashCode(statisticSum);
+ result = 31 * result + Objects.hashCode(statisticCount);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return toMetricDatum().toString();
+ }
+
+ public MetricDatum toMetricDatum() {
+ MetricDatum.Builder builder =
+ MetricDatum.builder().metricName(metricName).values(values).counts(counts);
+
+ Optional.ofNullable(timestamp).ifPresent(builder::timestamp);
+ Optional.ofNullable(unit).ifPresent(builder::unit);
+ Optional.ofNullable(storageResolution).ifPresent(builder::storageResolution);
+
+ if (dimensions.length > 0) {
+ builder.dimensions(
+ Arrays.stream(dimensions)
+ .map(Dimension::toCloudWatchDimension)
+ .collect(Collectors.toList()));
+ }
+
+ if (statisticMax != null
+ | statisticMin != null
+ | statisticSum != null
+ | statisticCount != null) {
+ StatisticSet.Builder statisticBuilder = StatisticSet.builder();
+ Optional.ofNullable(statisticMax).ifPresent(statisticBuilder::maximum);
+ Optional.ofNullable(statisticMin).ifPresent(statisticBuilder::minimum);
+ Optional.ofNullable(statisticSum).ifPresent(statisticBuilder::sum);
+ Optional.ofNullable(statisticCount).ifPresent(statisticBuilder::sampleCount);
+ builder.statisticValues(statisticBuilder.build());
+ }
+
+ return builder.build();
+ }
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/MetricWriteRequestElementConverter.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/MetricWriteRequestElementConverter.java
new file mode 100644
index 000000000..1e3f6fc34
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/MetricWriteRequestElementConverter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS CloudWatch SDK v2. The user
+ * only needs to provide the {@code InputT} to transform it into a {@link MetricWriteRequest} that
+ * may be persisted.
+ */
+@Internal
+public abstract class MetricWriteRequestElementConverter
+ implements ElementConverter {
+
+ @Override
+ public abstract MetricWriteRequest apply(InputT element, SinkWriter.Context context);
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/client/CloudWatchAsyncClientProvider.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/client/CloudWatchAsyncClientProvider.java
new file mode 100644
index 000000000..61312af7e
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/client/CloudWatchAsyncClientProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink.client;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.cloudwatch.sink.CloudWatchConfigConstants;
+
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+
+import java.util.Properties;
+
+/** Provides a {@link CloudWatchAsyncClient}. */
+@Internal
+public class CloudWatchAsyncClientProvider implements SdkClientProvider {
+
+ private final SdkAsyncHttpClient httpClient;
+ private final CloudWatchAsyncClient cloudWatchAsyncClient;
+
+ public CloudWatchAsyncClientProvider(Properties clientProperties) {
+ this.httpClient = AWSGeneralUtil.createAsyncHttpClient(clientProperties);
+ this.cloudWatchAsyncClient = buildClient(clientProperties, httpClient);
+ }
+
+ @Override
+ public CloudWatchAsyncClient getClient() {
+ return cloudWatchAsyncClient;
+ }
+
+ @Override
+ public void close() {
+ AWSGeneralUtil.closeResources(httpClient, cloudWatchAsyncClient);
+ }
+
+ private CloudWatchAsyncClient buildClient(
+ Properties cloudWatchClientProperties, SdkAsyncHttpClient httpClient) {
+ AWSGeneralUtil.validateAwsCredentials(cloudWatchClientProperties);
+
+ return AWSClientUtil.createAwsAsyncClient(
+ cloudWatchClientProperties,
+ httpClient,
+ CloudWatchAsyncClient.builder(),
+ CloudWatchConfigConstants.BASE_CLOUDWATCH_USER_AGENT_PREFIX_FORMAT.key(),
+ CloudWatchConfigConstants.CLOUDWATCH_CLIENT_USER_AGENT_PREFIX.key());
+ }
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/client/SdkClientProvider.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/client/SdkClientProvider.java
new file mode 100644
index 000000000..1497d4533
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/client/SdkClientProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink.client;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.utils.SdkAutoCloseable;
+
+/** Provides a {@link SdkClient}. */
+@Internal
+public interface SdkClientProvider extends SdkAutoCloseable {
+
+ /**
+ * Returns {@link T}.
+ *
+ * @return the AWS SDK client
+ */
+ T getClient();
+}
diff --git a/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/table/CloudWatchConnectorOptions.java b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/table/CloudWatchConnectorOptions.java
new file mode 100644
index 000000000..3e7336b2e
--- /dev/null
+++ b/flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/table/CloudWatchConnectorOptions.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.cloudwatch.sink.InvalidMetricDataRetryMode;
+
+import java.util.List;
+import java.util.Map;
+
+/** Options for the CloudWatch connector. */
+@PublicEvolving
+public class CloudWatchConnectorOptions {
+ public static final ConfigOption AWS_REGION =
+ ConfigOptions.key("aws.region")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("AWS region of used Cloudwatch metric.");
+
+ public static final ConfigOption