diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java new file mode 100644 index 00000000..f3dadc59 --- /dev/null +++ b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper; + +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. 
*/ +public class EmulatedPubSubSourceTest extends GCloudUnitTestBase { + private static final String PROJECT_NAME = "FLProject"; + private static final String TOPIC_NAME = "FLTopic"; + private static final String SUBSCRIPTION_NAME = "FLSubscription"; + + private static PubsubHelper pubsubHelper; + + @Before + public void setUp() throws Exception { + pubsubHelper = getPubsubHelper(); + pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME); + pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME); + } + + @After + public void tearDown() throws Exception { + pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME); + pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME); + } + + public void testFlinkSource(boolean testWithFailure) throws Exception { + // Create some messages and put them into pubsub + List input = + Arrays.asList( + "One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine", + "Ten"); + + List messagesToSend = new ArrayList<>(input); + + // Publish the messages into PubSub + Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME); + messagesToSend.forEach( + s -> { + try { + publisher + .publish( + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(s)) + .build()) + .get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(100); + env.setParallelism(1); + if (testWithFailure) { + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000)); + } else { + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + PubSubSource source = + PubSubSource.builder() + .setDeserializationSchema(new SimpleStringSchema()) + .setProjectName(PROJECT_NAME) + .setSubscriptionName(SUBSCRIPTION_NAME) + .setCredentials(EmulatorCredentials.getInstance()) + .setPubSubSubscriberFactory( + new PubSubSubscriberFactoryForEmulator( + getPubSubHostPort(), + PROJECT_NAME, + SUBSCRIPTION_NAME, + 10, + Duration.ofSeconds(1), + 3)) + .build(); + + DataStream fromPubSub = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "test-pubsub-new-source"); + + if (testWithFailure) { + fromPubSub = fromPubSub.map(new FailureMapFunction<>(3)); + } + + // Asking for any more elements would wait forever, and there isn't a graceful way to + // indicate end of stream. + List output = fromPubSub.executeAndCollect(input.size()); + + assertEquals("Wrong number of elements", input.size(), output.size()); + for (String test : input) { + assertTrue("Missing " + test, output.contains(test)); + } + } + + private static class FailureMapFunction extends RichMapFunction { + private final long numberOfRecordsUntilFailure; + private long numberOfRecordsProcessed; + + private FailureMapFunction(long numberOfRecordsBeforeFailure) { + this.numberOfRecordsUntilFailure = numberOfRecordsBeforeFailure; + } + + @Override + public T map(T value) throws Exception { + numberOfRecordsProcessed++; + + if (shouldThrowException()) { + throw new Exception( + "Deliberately thrown exception to induce crash for failure recovery testing."); + } + return value; + } + + private boolean shouldThrowException() { + return getRuntimeContext().getAttemptNumber() <= 1 + && (numberOfRecordsProcessed >= numberOfRecordsUntilFailure); + } + } + + // IMPORTANT: This test makes use of things that happen in the emulated PubSub that + // are GUARANTEED to be different in the real Google hosted PubSub. 
+    // So running these tests against the real thing will have a very high probability of
+    // failing.
+    // The assumptions:
+    //   1) The ordering of the messages is maintained.
+    //   2) Exactly once: We assume that every message we put in comes out exactly once.
+    //      In the real PubSub there are a lot of situations (mostly failure/retry) where this is
+    //      not true.
+    @Test
+    public void testFlinkSourceOk() throws Exception {
+        testFlinkSource(false);
+    }
+
+    @Test
+    public void testFlinkSourceFailure() throws Exception {
+        testFlinkSource(true);
+    }
+}
diff --git a/flink-connector-gcp-pubsub/README.md b/flink-connector-gcp-pubsub/README.md
new file mode 100644
index 00000000..06f2dfdc
--- /dev/null
+++ b/flink-connector-gcp-pubsub/README.md
@@ -0,0 +1,98 @@
+# Flink Source for Google Cloud Pub/Sub
+
+This is a source implementation for receiving Google Cloud Pub/Sub messages in Flink with an
+at-least-once guarantee.
+
+## Installation
+
+Add this dependency entry to your `pom.xml` to use the Google Cloud Pub/Sub source:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-gcp-pubsub</artifactId>
+    <version>${project.version}</version>
+</dependency>
+```
+
+## Architecture
+
+### Split Enumerator
+
+An enumerator is supposed to discover splits and assign them to the readers so that they can do the
+actual reading. However, the `PubSubSourceEnumerator` doesn't do any real work discovery because
+Pub/Sub doesn't expose any partitions from which splits could be constructed. Instead, the
+enumerator assigns a static `PubSubSplit` to every `PubSubSourceReader` that joins so that the
+readers can start pulling messages. The static source split doesn't contain split-specific
+information like partitions/offsets because this information cannot be obtained from Pub/Sub.
+Because of the static source split, the enumerator holds no state that would need to be
+snapshotted when checkpointing.
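+
+The enumerator's whole job therefore fits in a few lines. The following excerpt mirrors
+`PubSubSourceEnumerator#addReader` as added later in this change (shown here out of context as an
+illustration, not as standalone runnable code):
+
+```java
+// When a new reader registers, hand it the one static split so it can start pulling.
+@Override
+public void addReader(int subtaskId) {
+    // Pub/Sub exposes no partitions or offsets, so the split carries no
+    // split-specific information and is identical for every reader.
+    context.assignSplit(new PubSubSplit(), subtaskId);
+}
+```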
+
+### Source Reader
+
+A `PubSubSourceReader` uses Pub/Sub's pull mechanism to read new messages from the Pub/Sub
+subscription specified by the user. When source readers run in parallel in Flink, every source
+reader is passed the same source split by the enumerator. Because of this, all source readers use
+the same connection details and the same Pub/Sub subscription to receive messages. In this case,
+Pub/Sub automatically load-balances messages between all source readers which pull from the same
+subscription. This way, parallel processing is achieved in the source. The source reader is
+notified when a checkpoint completes so that it can trigger the acknowledgement of successfully
+received Pub/Sub messages through the split reader. As a result, when a checkpoint completes, all
+messages which were successfully pulled since the previous checkpoint are acknowledged to Pub/Sub
+to ensure they won't be redelivered.
+
+## Delivery Guarantee
+
+Google Cloud Pub/Sub only guarantees at-least-once message delivery, and the source upholds the
+same guarantee. To achieve this, the source makes use of Pub/Sub's expectation that a message
+should be acknowledged by the subscriber to signal that the message has been consumed
+successfully. Any message that has not been acknowledged yet will be automatically redelivered by
+Pub/Sub once an ack deadline has passed.
+
+When a checkpoint completes, all messages which were successfully pulled since the previous
+checkpoint are acknowledged to Pub/Sub. This ensures at-least-once delivery in the source because
+in the case of failure, non-checkpointed messages have not yet been acknowledged and will
+therefore be redelivered by the Pub/Sub service.
+
+## Usage
+
+To keep up the Google Cloud Pub/Sub at-least-once guarantee, messages are acknowledged against
+Pub/Sub when checkpointing succeeds. If a message is not acknowledged within an acknowledge
+deadline, Pub/Sub will attempt redelivery. To avoid unnecessary redelivery of successfully received
+messages, the checkpointing interval should always be configured (much) *lower* than the Google
+Cloud Pub/Sub acknowledge deadline.
+
+```java
+import org.apache.flink.connector.gcp.pubsub.source.PubSubSource;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Checkpointing must be enabled for the source to work so that messages can be acknowledged towards Pub/Sub
+env.enableCheckpointing(1000);
+
+// Parallelism > 1 may be set
+// env.setParallelism(4);
+
+PubSubSource<String> source =
+        PubSubSource.<String>builder()
+                // The deserialization schema to deserialize Pub/Sub messages
+                .setDeserializationSchema(new SimpleStringSchema())
+                // The name string of your Pub/Sub project
+                .setProjectName(PROJECT_NAME)
+                // The name string of the subscription you would like to receive messages from
+                .setSubscriptionName(SUBSCRIPTION_NAME)
+                // An instance of the com.google.auth.Credentials class to authenticate against Google Cloud
+                .setCredentials(CREDENTIALS)
+                .setPubSubSubscriberFactory(
+                        // The maximum number of messages that should be pulled in one go
+                        3,
+                        // The timeout after which a message pull request is deemed a failure
+                        Duration.ofSeconds(1),
+                        // The number of times the reception of a message should be retried in case of failure
+                        10)
+                .build();
+
+DataStream<String> fromPubSub =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "pubsub-source");
+```
diff --git a/flink-connector-gcp-pubsub/pom.xml b/flink-connector-gcp-pubsub/pom.xml
index ee9299c2..5ad64033 100644
--- a/flink-connector-gcp-pubsub/pom.xml
+++ b/flink-connector-gcp-pubsub/pom.xml
@@ -35,6 +35,12 @@ under the License.
 	<packaging>jar</packaging>
 
 	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java</artifactId>
diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java
new file mode 100644
index 00000000..48c3a2c2
--- /dev/null
+++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * + */ + +package org.apache.flink.connector.gcp.pubsub.source; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ProjectSubscriptionName; + +import java.io.IOException; +import java.time.Duration; +import java.util.Properties; +import java.util.function.Supplier; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * A source implementation to pull messages from GCP Pub/Sub into Flink. + * + *

<p>The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} to every {@link
+ * PubSubSourceReader} that joins. The split does not contain any split-specific information because
+ * Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing
+ * partitions or offsets. However, Pub/Sub automatically load-balances messages between multiple
+ * readers that use the same subscription.
+ *
+ *

<p>A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so:
+ *
+ *

<pre>{@code
+ * PubSubSource.builder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .setDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .setProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive messages from
+ *         .setSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of com.google.auth.Credentials to authenticate against Google Cloud
+ *         .setCredentials(CREDENTIALS)
+ *         .setPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in one go
+ *                 3,
+ *                 // The timeout after which a message pull request is deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be retried in case of failure
+ *                 10)
+ *         .build();
+ * }</pre>
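+ *
+ * <p>The source is then attached to a job like any other FLIP-27 source. A minimal sketch,
+ * assuming a {@code StreamExecutionEnvironment env} with checkpointing enabled (completed
+ * checkpoints drive the acknowledgement of messages):
+ *
+ * <pre>{@code
+ * DataStream<String> fromPubSub =
+ *         env.fromSource(source, WatermarkStrategy.noWatermarks(), "pubsub-source");
+ * }</pre>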
+ * + *

<p>More details can be found at {@link PubSubSourceBuilder}.
+ *
+ * @param <OUT> The output type of the source.
+ */
+public class PubSubSource<OUT>
+        implements Source<OUT, PubSubSplit, PubSubEnumeratorState>, ResultTypeQueryable<OUT> {
+    private final PubSubDeserializationSchema<OUT> deserializationSchema;
+    private final PubSubSubscriberFactory pubSubSubscriberFactory;
+    private final Properties props;
+    private final Credentials credentials;
+
+    private PubSubSource(
+            PubSubDeserializationSchema<OUT> deserializationSchema,
+            PubSubSubscriberFactory pubSubSubscriberFactory,
+            Properties props,
+            Credentials credentials) {
+        this.deserializationSchema = deserializationSchema;
+        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+        this.props = props;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, PubSubSplit> createReader(SourceReaderContext readerContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<OUT, Long>>> elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        Supplier<PubSubSplitReader<OUT>> splitReaderSupplier =
+                () ->
+                        new PubSubSplitReader<>(
+                                deserializationSchema, pubSubSubscriberFactory, credentials);
+        PubSubRecordEmitter<OUT> recordEmitter = new PubSubRecordEmitter<>();
+
+        return new PubSubSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                toConfiguration(props),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorState> createEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<PubSubSplit, PubSubEnumeratorState> restoreEnumerator(
+            SplitEnumeratorContext<PubSubSplit> enumContext, PubSubEnumeratorState checkpoint) {
+        return new PubSubSourceEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubSplit> getSplitSerializer() {
+        return new PubSubSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PubSubEnumeratorState> getEnumeratorCheckpointSerializer() {
+        return new PubSubEnumeratorStateSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * Get a builder to build a {@link PubSubSource}.
+     *
+     * @return A builder for a {@link PubSubSource}.
+     */
+    public static <OUT> PubSubSourceBuilder<OUT> builder() {
+        return new PubSubSourceBuilder<>();
+    }
+
+    /** @param <OUT> The output type of the {@link PubSubSource} this builder creates. */
+    public static class PubSubSourceBuilder<OUT> {
+        private static final int DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES = 3;
+        private static final int DEFAULT_PUBSUB_SUBSCRIBER_MAX_MESSAGES_PER_PULL = 100;
+
+        private PubSubDeserializationSchema<OUT> deserializationSchema;
+        private String projectName;
+        private String subscriptionName;
+        private PubSubSubscriberFactory pubSubSubscriberFactory;
+        private Properties props;
+        private Credentials credentials;
+
+        private PubSubSourceBuilder() {
+            this.props = new Properties();
+        }
+
+        /**
+         * Set the {@link DeserializationSchema} used to deserialize incoming Pub/Sub messages.
+         * Any {@link DeserializationSchema} can be used; the schema is wrapped automatically for
+         * compatibility with the source.
+         *
+         * @param deserializationSchema a deserialization schema to use.
+         */
+        public PubSubSourceBuilder<OUT> setDeserializationSchema(
+                DeserializationSchema<OUT> deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = new DeserializationSchemaWrapper<>(deserializationSchema);
+            return this;
+        }
+
+        /**
+         * Set the {@link PubSubDeserializationSchema} used to deserialize incoming Pub/Sub messages.
+         *
+         * @param deserializationSchema a deserialization schema to use.
+         */
+        public PubSubSourceBuilder<OUT> setDeserializationSchema(
+                PubSubDeserializationSchema<OUT> deserializationSchema) {
+            Preconditions.checkNotNull(deserializationSchema);
+            this.deserializationSchema = deserializationSchema;
+            return this;
+        }
+
+        /** @param projectName the name string of your Pub/Sub project */
+        public PubSubSourceBuilder<OUT> setProjectName(String projectName) {
+            Preconditions.checkNotNull(projectName);
+            this.projectName = projectName;
+            return this;
+        }
+
+        /**
+         * @param subscriptionName the name string of the subscription you would like to receive
+         *     messages from
+         */
+        public PubSubSourceBuilder<OUT> setSubscriptionName(String subscriptionName) {
+            Preconditions.checkNotNull(subscriptionName);
+            this.subscriptionName = subscriptionName;
+            return this;
+        }
+
+        /**
+         * @param credentials an instance of {@link com.google.auth.Credentials} to authenticate
+         *     against Google Cloud
+         */
+        public PubSubSourceBuilder<OUT> setCredentials(Credentials credentials) {
+            this.credentials = credentials;
+            return this;
+        }
+
+        /** @param pubSubSubscriberFactory a custom factory to create Pub/Sub subscribers from */
+        public PubSubSourceBuilder<OUT> setPubSubSubscriberFactory(
+                PubSubSubscriberFactory pubSubSubscriberFactory) {
+            this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+            return this;
+        }
+
+        /**
+         * Create a parameterized {@link DefaultPubSubSubscriberFactory} and set it on the builder.
+         * Note that this overload reads the project and subscription names, so {@link
+         * #setProjectName(String)} and {@link #setSubscriptionName(String)} must be called first.
+         *
+         * @param maxMessagesPerPull The maximum number of messages that should be pulled in one go.
+         * @param perRequestTimeout The timeout per request from the subscriber.
+         * @param retries The number of times the reception of a message should be retried in case
+         *     of failure.
+         */
+        public PubSubSourceBuilder<OUT> setPubSubSubscriberFactory(
+                int maxMessagesPerPull, Duration perRequestTimeout, int retries) {
+            this.pubSubSubscriberFactory =
+                    new DefaultPubSubSubscriberFactory(
+                            ProjectSubscriptionName.format(projectName, subscriptionName),
+                            retries,
+                            perRequestTimeout,
+                            maxMessagesPerPull);
+            return this;
+        }
+
+        public PubSubSourceBuilder<OUT> setProps(Properties props) {
+            this.props = props;
+            return this;
+        }
+
+        public PubSubSource<OUT> build() throws IOException {
+            Preconditions.checkNotNull(
+                    deserializationSchema, "Deserialization schema must be provided.");
+            Preconditions.checkNotNull(
+                    projectName, "Google Cloud Pub/Sub projectName must be set.");
+            Preconditions.checkNotNull(
+                    subscriptionName, "Google Cloud Pub/Sub subscriptionName must be set.");
+
+            if (credentials == null) {
+                credentials = defaultCredentialsProviderBuilder().build().getCredentials();
+            }
+
+            if (pubSubSubscriberFactory == null) {
+                pubSubSubscriberFactory =
+                        new DefaultPubSubSubscriberFactory(
+                                ProjectSubscriptionName.format(projectName, subscriptionName),
+                                DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES,
+                                Duration.ofSeconds(15),
+                                DEFAULT_PUBSUB_SUBSCRIBER_MAX_MESSAGES_PER_PULL);
+            }
+
+            return new PubSubSource<>(
+                    deserializationSchema, pubSubSubscriberFactory, props, credentials);
+        }
+    }
+
+    // ----------- private helper methods ---------------
+
+    private Configuration toConfiguration(Properties props) {
+        Configuration config = new Configuration();
+        props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key)));
+        return config;
+    }
+}
diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java
b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java new file mode 100644 index 00000000..bde9e457 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.enumerator; + +/** + * A stub to contain the checkpoint data of a {@link PubSubSourceEnumerator}. GCP Pub/Sub does not + * expose any partitions or similar concepts which would need handling by the enumerator. Therefore, + * there are no offsets or other data that could be saved in a checkpoint. + */ +public class PubSubEnumeratorState {} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java new file mode 100644 index 00000000..6b06dc81 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.enumerator; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; + +/** + * A stub to serialize the contents of a {@link PubSubEnumeratorState}. Because no data is stored in + * such a checkpoint, no proper serialization is necessary. 
+ */ +public class PubSubEnumeratorStateSerializer + implements SimpleVersionedSerializer { + private static final int CURRENT_VERSION = 0; + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(PubSubEnumeratorState enumeratorCheckpoint) throws IOException { + return new byte[0]; + } + + @Override + public PubSubEnumeratorState deserialize(int version, byte[] serialized) throws IOException { + if (version == 0) { + return new PubSubEnumeratorState(); + } + throw new IOException( + String.format( + "The bytes are serialized with version %d, " + + "while this deserializer only supports version up to %d", + version, CURRENT_VERSION)); + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java new file mode 100644 index 00000000..300a50c9 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +/** + * The enumerator for the {@link PubSubSource}. It does not do any work discovery as envisioned by + * FLIP-27 because GCP Pub/Sub hides partitions and other implementation details. + */ +public class PubSubSourceEnumerator implements SplitEnumerator { + private final SplitEnumeratorContext context; + + public PubSubSourceEnumerator(SplitEnumeratorContext context) { + this.context = context; + } + + @Override + public void start() {} + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {} + + @Override + public void addSplitsBack(List splits, int subtaskId) {} + + /** + * When a new reader joins, the enumerator actively assigns it with a generic {@link + * PubSubSplit} so that it can start pulling messages. + * + * @param subtaskId the subtask ID of the new source reader. + */ + @Override + public void addReader(int subtaskId) { + context.assignSplit(new PubSubSplit(), subtaskId); + } + + /** + * In the case of GCP Pub/Sub, there's no state of the enumerator that could be captured in a + * checkpoint. + * + * @return A checkpoint stub. 
+ * @throws Exception + */ + @Override + public PubSubEnumeratorState snapshotState(long checkpointId) throws Exception { + return new PubSubEnumeratorState(); + } + + @Override + public void close() throws IOException {} +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubRecordEmitter.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubRecordEmitter.java new file mode 100644 index 00000000..53ae2bc0 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubRecordEmitter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitState; + +/** + * A custom {@link RecordEmitter} to emit a record which includes the data of the received GCP + * Pub/Sub message and the publication time of the message. + * + * @param The type of the record data to be emitted. + */ +public class PubSubRecordEmitter implements RecordEmitter, T, PubSubSplitState> { + + @Override + public void emitRecord( + Tuple2 element, SourceOutput output, PubSubSplitState splitState) { + output.collect(element.f0, element.f1); + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java new file mode 100644 index 00000000..c41cf4fc --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.function.Supplier; + +/** + * A custom {@link SingleThreadFetcherManager} so that the reception of GCP Pub/Sub messages can be + * acknowledged towards GCP Pub/Sub once they have been successfully checkpointed in Flink. As long + * as a received message has not been acknowledged, GCP Pub/Sub will attempt to deliver it again. + */ +class PubSubSourceFetcherManager + extends SingleThreadFetcherManager, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSourceFetcherManager.class); + + PubSubSourceFetcherManager( + FutureCompletingBlockingQueue>> elementsQueue, + Supplier, PubSubSplit>> splitReaderSupplier, + Configuration config) { + super(elementsQueue, splitReaderSupplier, config); + } + + void prepareForAcknowledgement(long checkpointId) { + SplitFetcher, PubSubSplit> splitFetcher = fetchers.get(0); + + if (splitFetcher != null) { + enqueuePrepareForAcknowledgementTask(splitFetcher, checkpointId); + } else { + splitFetcher = createSplitFetcher(); + enqueuePrepareForAcknowledgementTask(splitFetcher, checkpointId); + startFetcher(splitFetcher); + } + } + + private void enqueuePrepareForAcknowledgementTask( + SplitFetcher, PubSubSplit> splitFetcher, long checkpointId) { + PubSubSplitReader pubSubSplitReader = + (PubSubSplitReader) splitFetcher.getSplitReader(); + + splitFetcher.enqueueTask( + new SplitFetcherTask() { + @Override + public boolean run() { + pubSubSplitReader.prepareForAcknowledgement(checkpointId); + return true; + } + + @Override + public void wakeUp() {} + }); + } + + /** + * Creates a {@link SplitFetcher} if there's none available yet and enqueues a task to + * acknowledge GCP Pub/Sub messages. + */ + void acknowledgeMessages(long checkpointId) { + SplitFetcher, PubSubSplit> splitFetcher = fetchers.get(0); + + if (splitFetcher != null) { + enqueueAcknowledgeMessagesTask(splitFetcher, checkpointId); + } else { + splitFetcher = createSplitFetcher(); + enqueueAcknowledgeMessagesTask(splitFetcher, checkpointId); + startFetcher(splitFetcher); + } + } + + /** + * Enqueues a task that, when run, notifies a {@link PubSubSplitReader} of a successful + * checkpoint so that GCP Pub/Sub messages received since the previous checkpoint can be + * acknowledged. + * + * @param splitFetcher the split fetcher on which the acknowledge task should be enqueued. 
+ */ + private void enqueueAcknowledgeMessagesTask( + SplitFetcher, PubSubSplit> splitFetcher, long checkpointId) { + PubSubSplitReader pubSubSplitReader = + (PubSubSplitReader) splitFetcher.getSplitReader(); + + splitFetcher.enqueueTask( + new SplitFetcherTask() { + @Override + public boolean run() throws IOException { + pubSubSplitReader.acknowledgeMessages(checkpointId); + return true; + } + + @Override + public void wakeUp() {} + }); + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java new file mode 100644 index 00000000..ae92f5fb --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitState; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** The source reader to read from GCP Pub/Sub. 
*/ +public class PubSubSourceReader + extends SingleThreadMultiplexSourceReaderBase< + Tuple2, T, PubSubSplit, PubSubSplitState> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSourceReader.class); + + public PubSubSourceReader( + FutureCompletingBlockingQueue>> elementsQueue, + Supplier> splitReaderSupplier, + RecordEmitter, T, PubSubSplitState> recordEmitter, + Configuration config, + SourceReaderContext context) { + super( + elementsQueue, + new PubSubSourceFetcherManager<>(elementsQueue, splitReaderSupplier::get, config), + recordEmitter, + config, + context); + } + + @Override + protected void onSplitFinished(Map finishedSplitIds) {} + + @Override + public List snapshotState(long checkpointId) { + ((PubSubSourceFetcherManager) splitFetcherManager) + .prepareForAcknowledgement(checkpointId); + return Collections.singletonList(new PubSubSplit()); + } + + /** + * Communicates with the {@link PubSubSourceFetcherManager} about the completion of a checkpoint + * so that messages received from GCP Pub/Sub can be acknowledged from a {@link + * PubSubSplitReader}. + * + * @param checkpointId the checkpoint ID. + */ + @Override + public void notifyCheckpointComplete(long checkpointId) { + LOG.info("Acknowledging received GCP Pub/Sub messages for checkpoint {}", checkpointId); + ((PubSubSourceFetcherManager) splitFetcherManager).acknowledgeMessages(checkpointId); + } + + @Override + protected PubSubSplitState initializedState(PubSubSplit split) { + return new PubSubSplitState(); + } + + /** + * Simply returns a new instance of {@link PubSubSplit} because GCP Pub/Sub offers no control of + * pulling messages beyond the configuration of project name and subscription name. + * + * @param splitId the split ID + * @param splitState the split state + * @return a fresh instance of {@link PubSubSplit} + */ + @Override + protected PubSubSplit toSplitType(String splitId, PubSubSplitState splitState) { + return new PubSubSplit(); + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java new file mode 100644 index 00000000..c754eef8 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.connector.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Collector; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ReceivedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * A {@link SplitReader} to read from a given {@link PubSubSubscriber}. + * + * @param the type of the record. + */ +public class PubSubSplitReader implements SplitReader, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class); + private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5; + private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000; + private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000; + private final PubSubDeserializationSchema deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Credentials credentials; + private volatile PubSubSubscriber subscriber; + private final PubSubCollector collector; + + // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread + // processes messages and the fetcher thread acknowledges them, the thread-safe queue + // decouples them. + private final BlockingQueue ackIdsQueue = + new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY); + private final Map> messageIdsToAcknowledge = new HashMap<>(); + + /** + * @param deserializationSchema a deserialization schema to apply to incoming message payloads. + * @param pubSubSubscriberFactory a factory from which a new subscriber can be created from + * @param credentials the credentials to use for creating a new subscriber + */ + public PubSubSplitReader( + PubSubDeserializationSchema deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Credentials credentials) { + + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.collector = new PubSubCollector(); + } + + @Override + public RecordsWithSplitIds> fetch() throws IOException { + RecordsBySplits.Builder> recordsBySplits = new RecordsBySplits.Builder<>(); + if (subscriber == null) { + synchronized (this) { + if (subscriber == null) { + subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + } + } + } + + for (ReceivedMessage receivedMessage : subscriber.pull()) { + try { + // Deserialize messages into a collector so that logic in the user-provided + // deserialization schema decides how to map GCP Pub/Sub messages to records in + // Flink. 
This allows e.g. batching together multiple Flink records in a single GCP + // Pub/Sub message. + deserializationSchema.deserialize(receivedMessage.getMessage(), collector); + collector + .getMessages() + .forEach( + message -> + recordsBySplits.add( + PubSubSplit.SPLIT_ID, + new Tuple2<>( + message, + // A timestamp provided by GCP Pub/Sub + // indicating when the message was initially + // published + receivedMessage + .getMessage() + .getPublishTime() + .getSeconds()))); + } catch (Exception e) { + throw new IOException("Failed to deserialize received message due to", e); + } finally { + collector.reset(); + } + + enqueueAcknowledgementId(receivedMessage.getAckId()); + } + + return recordsBySplits.build(); + } + + /** + * Enqueue an acknowledgment ID to be acknowledged towards GCP Pub/Sub with retries. + * + * @param ackId the ID of the message to acknowledge + */ + public void enqueueAcknowledgementId(String ackId) { + int retryCount = 0; + + while (retryCount < RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT) { + boolean enqueued = ackIdsQueue.offer(ackId); + if (!enqueued) { + retryCount++; + try { + Thread.sleep(RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Thread interrupted while waiting to enqueue acknowledgment ID.", e); + return; + } + } else { + return; + } + } + + LOG.warn( + "Queue is full. Unable to enqueue acknowledgment ID after " + + RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT + + " retries."); + } + + @Override + public void handleSplitsChanges(SplitsChange splitsChanges) {} + + @Override + public void wakeUp() {} + + @Override + public void close() throws Exception { + if (subscriber != null) { + subscriber.close(); + } + } + + private class PubSubCollector implements Collector { + private final List messages = new ArrayList<>(); + + @Override + public void collect(T message) { + messages.add(message); + } + + @Override + public void close() {} + + private List getMessages() { + return messages; + } + + private void reset() { + messages.clear(); + } + } + + /** + * Prepare for acknowledging messages received since the last checkpoint by draining the {@link + * #ackIdsQueue} into {@link #messageIdsToAcknowledge}. + * + *

<p>Calling this method is enqueued by the {@link PubSubSourceFetcherManager} to snapshot
+     * state before a checkpoint.
+     *
+     * @param checkpointId the ID of the checkpoint for which to prepare for acknowledging messages
+     */
+    public void prepareForAcknowledgement(long checkpointId) {
+        List<String> ackIds = new ArrayList<>();
+        ackIdsQueue.drainTo(ackIds);
+        messageIdsToAcknowledge.put(checkpointId, ackIds);
+    }
+
+    /**
+     * Acknowledge the reception of messages towards GCP Pub/Sub since the last checkpoint. If a
+     * received message is not acknowledged before the subscription's acknowledgment timeout, GCP
+     * Pub/Sub will attempt to deliver it again.
+     *
+     *

Calling this method is enqueued by the {@link PubSubSourceFetcherManager} on checkpoint. + * + * @param checkpointId the ID of the checkpoint for which to acknowledge messages + */ + void acknowledgeMessages(long checkpointId) throws IOException { + if (subscriber == null) { + synchronized (this) { + if (subscriber == null) { + subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + } + } + } + + if (!messageIdsToAcknowledge.containsKey(checkpointId)) { + LOG.error( + "Checkpoint {} not found in set of in-flight checkpoints {}.", + checkpointId, + messageIdsToAcknowledge.keySet().stream() + .map(String::valueOf) + .collect(Collectors.joining(","))); + return; + } + + List messageIdsForCheckpoint = messageIdsToAcknowledge.remove(checkpointId); + if (!messageIdsForCheckpoint.isEmpty()) { + LOG.debug( + "Acknowledging {} messages for checkpoint {}.", + messageIdsForCheckpoint.size(), + checkpointId); + subscriber.acknowledge(messageIdsForCheckpoint); + } else { + LOG.debug("No messages to acknowledge for checkpoint {}.", checkpointId); + } + + // Handle the case where a checkpoint is aborted and the messages for that checkpoint are + // never acknowledged. Here, we log any remaining checkpointIds and clear them. This relies + // on GCP Pub/Sub to redeliver the unacked messages. + if (!messageIdsToAcknowledge.isEmpty()) { + // Loop through any remaining checkpointIds in messageIdsToAcknowledge, and then clear + // them. + for (Map.Entry> entry : messageIdsToAcknowledge.entrySet()) { + LOG.warn( + "Checkpoint {} was not acknowledged - clearing {} unacked messages.", + entry.getKey(), + entry.getValue().size()); + } + messageIdsToAcknowledge.clear(); + } + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplit.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplit.java new file mode 100644 index 00000000..6f2a6dc0 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplit.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source.split; + +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; + +/** + * A {@link SourceSplit} implementation for the {@link PubSubSource}. It only saves a generic split + * ID because a split does not carry any specific information for pulling from a project + * subscription. That is because GCP Pub/Sub hides partitions and other implementation details. 
diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplit.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplit.java
new file mode 100644
index 00000000..6f2a6dc0
--- /dev/null
+++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplit.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.split;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.gcp.pubsub.source.PubSubSource;
+
+/**
+ * A {@link SourceSplit} implementation for the {@link PubSubSource}. It carries only a fixed split
+ * ID, because a split holds no information specific to the project subscription it is pulled from:
+ * GCP Pub/Sub hides partitions and other implementation details.
+ */
+public class PubSubSplit implements SourceSplit {
+    public static final String SPLIT_ID = "0";
+
+    @Override
+    public String splitId() {
+        return SPLIT_ID;
+    }
+}
diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitSerializer.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitSerializer.java
new file mode 100644
index 00000000..42c1fc7d
--- /dev/null
+++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.split;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/**
+ * A stub to serialize instances of {@link PubSubSplit}. No real serialization or deserialization is
+ * carried out because a {@link PubSubSplit} holds no state beyond its fixed split ID.
+ */
+public class PubSubSplitSerializer implements SimpleVersionedSerializer<PubSubSplit> {
+    private static final int CURRENT_VERSION = 0;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PubSubSplit obj) throws IOException {
+        return new byte[0];
+    }
+
+    @Override
+    public PubSubSplit deserialize(int version, byte[] serialized) throws IOException {
+        if (version == 0) {
+            return new PubSubSplit();
+        }
+        throw new IOException(
+                String.format(
+                        "The bytes are serialized with version %d, "
+                                + "while this deserializer only supports versions up to %d",
+                        version, CURRENT_VERSION));
+    }
+}
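Because the split carries no state, (de)serialization reduces to a version check. A round-trip behaves as below (a sketch against the two classes above; both serializer calls declare IOException):

    PubSubSplitSerializer serializer = new PubSubSplitSerializer();

    byte[] bytes = serializer.serialize(new PubSubSplit()); // always a zero-length array
    PubSubSplit restored = serializer.deserialize(serializer.getVersion(), bytes);
    assert PubSubSplit.SPLIT_ID.equals(restored.splitId()); // every split is "0"

    // Any version other than 0 fails fast with an IOException:
    // serializer.deserialize(1, new byte[0]);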
diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitState.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitState.java
new file mode 100644
index 00000000..ca4cf268
--- /dev/null
+++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source.split;
+
+/** A stub for the always-empty state of a {@link PubSubSplit}. */
+public class PubSubSplitState extends PubSubSplit {}
diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java
index 790e1e1e..9f7ae631 100644
--- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java
+++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.connectors.gcp.pubsub;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
 
 import com.google.pubsub.v1.AcknowledgeRequest;
@@ -29,10 +28,10 @@
 import io.grpc.StatusRuntimeException;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static java.util.Collections.emptyList;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
@@ -83,22 +82,30 @@ private List<PubsubMessage> pull(int retriesRemaining) {
 
     @Override
     public void acknowledge(List<String> acknowledgementIds) {
-        if (acknowledgementIds.isEmpty()) {
-            return;
-        }
-
-        // grpc servers won't accept acknowledge requests that are too large so we split the ackIds
-        Tuple2<List<String>, List<String>> splittedAckIds = splitAckIds(acknowledgementIds);
-        while (!splittedAckIds.f0.isEmpty()) {
+        List<List<String>> splittedAckIds = splitAckIds(acknowledgementIds);
+        while (!splittedAckIds.isEmpty()) {
             AcknowledgeRequest acknowledgeRequest =
                     AcknowledgeRequest.newBuilder()
                             .setSubscription(projectSubscriptionName)
-                            .addAllAckIds(splittedAckIds.f0)
+                            .addAllAckIds(splittedAckIds.remove(0))
                             .build();
 
-            stub.withDeadlineAfter(60, SECONDS).acknowledge(acknowledgeRequest);
+            acknowledgeWithRetries(acknowledgeRequest, retries);
+        }
+    }
+
+    private void acknowledgeWithRetries(
+            AcknowledgeRequest acknowledgeRequest, int retriesRemaining) {
+        try {
+            stub.withDeadlineAfter(timeout.toMillis(), TimeUnit.MILLISECONDS)
+                    .acknowledge(acknowledgeRequest);
+        } catch (StatusRuntimeException e) {
+            if (retriesRemaining > 0) {
+                acknowledgeWithRetries(acknowledgeRequest, retriesRemaining - 1);
+                return;
+            }
 
-            splittedAckIds = splitAckIds(splittedAckIds.f1);
+            throw e;
         }
     }
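The recursion above bounds the attempt count at retries + 1 and retries immediately, without backoff. An equivalent iterative form, shown only to make that bound explicit (not part of the patch; stub, timeout, retries, and acknowledgeRequest as in the hunk above):

    for (int attempt = 0; ; attempt++) {
        try {
            stub.withDeadlineAfter(timeout.toMillis(), TimeUnit.MILLISECONDS)
                    .acknowledge(acknowledgeRequest);
            return;
        } catch (StatusRuntimeException e) {
            if (attempt >= retries) {
                throw e; // attempts exhausted: surface the failure to the caller
            }
            // no backoff between attempts, matching the recursive version
        }
    }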
@@ -114,21 +121,32 @@ public void acknowledge(List<String> acknowledgementIds) {
      * configurable on the server). We know from experience that it is 512K.
-     * @return First list contains no more than 512k bytes, second list contains remaining ids
+     *
+     * @return a list of ack ID batches, each of which stays below the 512K request size limit
      */
-    private Tuple2<List<String>, List<String>> splitAckIds(List<String> ackIds) {
-        final int maxPayload = 500 * 1024; // little below 512k bytes to be on the safe side
+    private List<List<String>> splitAckIds(List<String> ackIds) {
+        int queueSize = ackIds.size();
+        final int maxPayload = 500 * 1024; // slightly below 512k bytes to be on the safe side
         final int fixedOverheadPerCall = 100;
         final int overheadPerId = 3;
 
+        List<List<String>> outputLists = new ArrayList<>();
+        List<String> currentList = new ArrayList<>();
         int totalBytes = fixedOverheadPerCall;
 
-        for (int i = 0; i < ackIds.size(); i++) {
-            totalBytes += ackIds.get(i).length() + overheadPerId;
-            if (totalBytes > maxPayload) {
-                return Tuple2.of(ackIds.subList(0, i), ackIds.subList(i, ackIds.size()));
+        for (String ackId : ackIds) {
+            if (totalBytes + ackId.length() + overheadPerId > maxPayload) {
+                outputLists.add(currentList);
+                currentList = new ArrayList<>();
+                totalBytes = fixedOverheadPerCall;
             }
+            currentList.add(ackId);
+            totalBytes += ackId.length() + overheadPerId;
+        }
+        if (!currentList.isEmpty()) {
+            outputLists.add(currentList);
         }
 
-        return Tuple2.of(ackIds, emptyList());
+        // Sanity check: batching must not drop or duplicate any ack ID.
+        assert outputLists.stream().map(List::size).reduce(0, Integer::sum) == queueSize;
+
+        return outputLists;
     }
 
     @Override
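A worked example of the packing bound (numbers illustrative): with 180-byte ack IDs, one request fits about (500 * 1024 - 100) / (180 + 3), roughly 2797 IDs, so 10,000 IDs yield four batches. As a usage sketch against the method above:

    List<String> ackIds = new ArrayList<>();
    for (int i = 0; i < 10_000; i++) {
        ackIds.add("a".repeat(180)); // 180-byte ack IDs (illustrative)
    }

    List<List<String>> batches = splitAckIds(ackIds);
    // batches.size() == 4; sizes 2797, 2797, 2797, 1609, each below the payload cap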
diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java
index 3c32ad5f..fb6a5256 100644
--- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java
+++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.gcp.pubsub;
 
+import org.apache.flink.connector.gcp.pubsub.source.PubSubSource;
 import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
 import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
 
@@ -34,13 +35,26 @@
 import java.io.IOException;
 import java.time.Duration;
 
-class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory {
+/**
+ * A default {@link PubSubSubscriberFactory} used by the {@link PubSubSource.PubSubSourceBuilder} to
+ * obtain a subscriber with which messages can be pulled from GCP Pub/Sub.
+ */
+public class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory {
     private final int retries;
     private final Duration timeout;
     private final int maxMessagesPerPull;
     private final String projectSubscriptionName;
 
-    DefaultPubSubSubscriberFactory(
+    /**
+     * @param projectSubscriptionName The formatted name of the Pub/Sub project and subscription to
+     *     pull messages from. Can be obtained via {@link
+     *     com.google.pubsub.v1.ProjectSubscriptionName}.
+     * @param retries The number of times the reception of a message should be retried in case of
+     *     failure.
+     * @param pullTimeout The timeout after which a message pull request is deemed a failure.
+     * @param maxMessagesPerPull The maximum number of messages that should be pulled in one go.
+     */
+    public DefaultPubSubSubscriberFactory(
             String projectSubscriptionName,
             int retries,
             Duration pullTimeout,
diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DeserializationSchemaWrapper.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DeserializationSchemaWrapper.java
index 1e56df36..47228503 100644
--- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DeserializationSchemaWrapper.java
+++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DeserializationSchemaWrapper.java
@@ -29,10 +29,10 @@
  * This class wraps a {@link DeserializationSchema} so it can be used in a {@link PubSubSource} as a
  * {@link PubSubDeserializationSchema}.
  */
-class DeserializationSchemaWrapper<T> implements PubSubDeserializationSchema<T> {
+public class DeserializationSchemaWrapper<T> implements PubSubDeserializationSchema<T> {
     private final DeserializationSchema<T> deserializationSchema;
 
-    DeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+    public DeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
         this.deserializationSchema = deserializationSchema;
     }
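With the factory, the wrapper, and their constructors now public, a pipeline can wire the source explicitly instead of relying on builder defaults. A minimal sketch (builder methods as used elsewhere in this patch; the generic typing and the constant values are assumptions, not taken from the patch):

    PubSubSubscriberFactory subscriberFactory =
            new DefaultPubSubSubscriberFactory(
                    ProjectSubscriptionName.format("my-project", "my-subscription"),
                    3,                      // retries
                    Duration.ofSeconds(15), // pull timeout
                    100);                   // max messages per pull

    PubSubSource<String> source =
            PubSubSource.builder()
                    .setDeserializationSchema(new SimpleStringSchema())
                    .setProjectName("my-project")
                    .setSubscriptionName("my-subscription")
                    .setPubSubSubscriberFactory(subscriberFactory)
                    .build();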