From a412a0081c7e13af97654ec2c4d7694f0210854a Mon Sep 17 00:00:00 2001 From: jakob-ed <15202881+jakob-ed@users.noreply.github.com> Date: Mon, 18 Jan 2021 15:43:00 +0100 Subject: [PATCH 1/8] [FLINK-20625][pubsub,e2e] Add basic Pub/Sub source using new interface * WIP * WIP * Working WIP * Clean up * Place new Pub/Sub source into existing Pub/Sub connector module * Clean up * Apply Spotless code formatting [FLINK-20625][pubsub,e2e] Attempt to support stopping the reader when stopmark is encountered [FLINK-20625][pubsub,e2e] Add checkpointing and do some refactorings * Simplify fetching from Pub/Sub in SplitReader * Allow Pub/Sub source to be only continuous unbounded * Add basic PubSubSource builder * Add configuration options for SubscriberFactory to PubSubSource, remove unused collector * Add checkpointing [FLINK-20625][pubsub,e2e] Allow multiple records inside single Pub/Sub message for deserialization [FLINK-20625][pubsub,e2e] Add Javadocs, README and clean up [FLINK-20625][pubsub,e2e] Reduce visibility of classes and their members [FLINK-20625][pubsub,e2e] Propagate Pub/Sub subscriber creation errors from SplitReader [FLINK-20625][pubsub,e2e] Use constants for default Pub/Sub subscriber parameters [FLINK-20625][pubsub,e2e] Fix dynamic Scala version in artifact example [FLINK-20625][pubsub,e2e] Rename PubSubEnumeratorCheckpoint -> PubSubEnumeratorState [FLINK-20625][pubsub,e2e] Add version checks for deserialization [FLINK-20625][pubsub,e2e] Remove unnecessary declaration of exception-throwing [FLINK-20625][pubsub,e2e] Remove disfunctional end-of-stream logic [FLINK-20625][pubsub,e2e] Avoid concurrency issues with list of Pub/Sub messages to acknowledge [FLINK-20625][pubsub,e2e] Refactor PubSubSourceBuilder [FLINK-20625][pubsub,e2e] Clarify consistency guarantee description [FLINK-20625][pubsub,e2e] Clarify Pub/Sub request timeout [FLINK-20625][pubsub,e2e] Restructure and extend readme, add basic architecture info to docstring [FLINK-20625][pubsub,e2e] Attempt to solve concurrency issues with checkpointing --- .../pubsub/EmulatedPubSubNewSourceTest.java | 180 +++++++++++ flink-connector-gcp-pubsub/pom.xml | 6 + .../DefaultPubSubSubscriberFactory.java | 18 +- .../pubsub/DeserializationSchemaWrapper.java | 4 +- .../gcp/pubsub/source/PubSubSource.java | 303 ++++++++++++++++++ .../connectors/gcp/pubsub/source/README.md | 69 ++++ .../enumerator/PubSubEnumeratorState.java | 26 ++ .../PubSubEnumeratorStateSerializer.java | 54 ++++ .../enumerator/PubSubSourceEnumerator.java | 77 +++++ .../source/reader/PubSubRecordEmitter.java | 40 +++ .../reader/PubSubSourceFetcherManager.java | 120 +++++++ .../source/reader/PubSubSourceReader.java | 100 ++++++ .../source/reader/PubSubSplitReader.java | 184 +++++++++++ .../gcp/pubsub/source/split/PubSubSplit.java | 36 +++ .../source/split/PubSubSplitSerializer.java | 53 +++ .../pubsub/source/split/PubSubSplitState.java | 22 ++ 16 files changed, 1288 insertions(+), 4 deletions(-) create mode 100644 flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubRecordEmitter.java create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceReader.java create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplit.java create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitSerializer.java create mode 100644 flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitState.java diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java new file mode 100644 index 00000000..83ade583 --- /dev/null +++ b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource; + +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */ +public class EmulatedPubSubNewSourceTest extends GCloudUnitTestBase { + private static final String PROJECT_NAME = "FLProject"; + private static final String TOPIC_NAME = "FLTopic"; + private static final String SUBSCRIPTION_NAME = "FLSubscription"; + + private static PubsubHelper pubsubHelper; + + @Before + public void setUp() throws Exception { + pubsubHelper = getPubsubHelper(); + pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME); + pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME); + } + + @After + public void tearDown() throws Exception { + pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME); + pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME); + } + + public void testFlinkSource(boolean testWithFailure) throws Exception { + // Create some messages and put them into pubsub + List input = + Arrays.asList( + "One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine", + "Ten"); + + List messagesToSend = new ArrayList<>(input); + + // Publish the messages into PubSub + Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME); + messagesToSend.forEach( + s -> { + try { + publisher + .publish( + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(s)) + .build()) + .get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(100); + env.setParallelism(1); + if (testWithFailure) { + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000)); + } else { + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + PubSubSource source = + PubSubSource.builder() + .setDeserializationSchema(new SimpleStringSchema()) + .setProjectName(PROJECT_NAME) + .setSubscriptionName(SUBSCRIPTION_NAME) + .setCredentials(EmulatorCredentials.getInstance()) + .setPubSubSubscriberFactory( + new PubSubSubscriberFactoryForEmulator( + getPubSubHostPort(), + PROJECT_NAME, + SUBSCRIPTION_NAME, + 10, + Duration.ofSeconds(1), + 3)) + .build(); + + DataStream fromPubSub = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "test-pubsub-new-source"); + + if (testWithFailure) { + fromPubSub = fromPubSub.map(new FailureMapFunction<>(3)); + } + + List output = new ArrayList<>(); + DataStreamUtils.collect(fromPubSub).forEachRemaining(output::add); + + assertEquals("Wrong number of elements", input.size(), output.size()); + for (String test : input) { + assertTrue("Missing " + test, output.contains(test)); + } + } + + private class FailureMapFunction extends RichMapFunction { + private final long numberOfRecordsUntilFailure; + private long numberOfRecordsProcessed; + + private FailureMapFunction(long numberOfRecordsBeforeFailure) { + this.numberOfRecordsUntilFailure = numberOfRecordsBeforeFailure; + } + + @Override + public T map(T value) throws Exception { + numberOfRecordsProcessed++; + + if (shouldThrowException()) { + throw new Exception( + "Deliberately thrown exception to induce crash for failure recovery testing."); + } + return value; + } + + private boolean shouldThrowException() { + return getRuntimeContext().getAttemptNumber() <= 1 + && (numberOfRecordsProcessed >= numberOfRecordsUntilFailure); + } + } + + // IMPORTANT: This test makes use of things that happen in the emulated PubSub that + // are GUARANTEED to be different in the real Google hosted PubSub. + // So running these tests against the real thing will have a very high probability of + // failing. + // The assumptions: + // 1) The ordering of the messages is maintained. + // 2) Exactly once: We assume that every message we put in comes out exactly once. + // In the real PubSub there are a lot of situations (mostly failure/retry) where this is not + // true. + @Test + public void testFlinkSourceOk() throws Exception { + testFlinkSource(false); + } + + @Test + public void testFlinkSourceFailure() throws Exception { + testFlinkSource(true); + } +} diff --git a/flink-connector-gcp-pubsub/pom.xml b/flink-connector-gcp-pubsub/pom.xml index ee9299c2..0cc99da7 100644 --- a/flink-connector-gcp-pubsub/pom.xml +++ b/flink-connector-gcp-pubsub/pom.xml @@ -35,6 +35,12 @@ under the License. jar + + org.apache.flink + flink-connector-base + ${project.version} + + org.apache.flink flink-streaming-java diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java index 3c32ad5f..a0bf68b8 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java @@ -20,6 +20,7 @@ import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource; import com.google.auth.Credentials; import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; @@ -34,13 +35,26 @@ import java.io.IOException; import java.time.Duration; -class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory { +/** + * A default {@link PubSubSubscriberFactory} used by the {@link PubSubSource.PubSubSourceBuilder} to + * obtain a subscriber with which messages can be pulled from GCP Pub/Sub. + */ +public class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory { private final int retries; private final Duration timeout; private final int maxMessagesPerPull; private final String projectSubscriptionName; - DefaultPubSubSubscriberFactory( + /** + * @param projectSubscriptionName The formatted name of the Pub/Sub project and subscription to + * pull messages from. Can be easily obtained through {@link + * com.google.pubsub.v1.ProjectSubscriptionName}. + * @param retries The number of times the reception of a message should be retried in case of + * failure. + * @param pullTimeout The timeout after which a message pull request is deemed a failure + * @param maxMessagesPerPull The maximum number of messages that should be pulled in one go. + */ + public DefaultPubSubSubscriberFactory( String projectSubscriptionName, int retries, Duration pullTimeout, diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DeserializationSchemaWrapper.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DeserializationSchemaWrapper.java index 1e56df36..47228503 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DeserializationSchemaWrapper.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DeserializationSchemaWrapper.java @@ -29,10 +29,10 @@ * This class wraps a {@link DeserializationSchema} so it can be used in a {@link PubSubSource} as a * {@link PubSubDeserializationSchema}. */ -class DeserializationSchemaWrapper implements PubSubDeserializationSchema { +public class DeserializationSchemaWrapper implements PubSubDeserializationSchema { private final DeserializationSchema deserializationSchema; - DeserializationSchemaWrapper(DeserializationSchema deserializationSchema) { + public DeserializationSchemaWrapper(DeserializationSchema deserializationSchema) { this.deserializationSchema = deserializationSchema; } diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java new file mode 100644 index 00000000..5506d6ea --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.source; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorState; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer; +import org.apache.flink.util.Preconditions; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ProjectSubscriptionName; + +import java.io.IOException; +import java.time.Duration; +import java.util.Properties; +import java.util.function.Supplier; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * A source implementation to pull messages from GCP Pub/Sub into Flink. + * + *

The {@link PubSubSourceEnumerator} assigns a static {@PubSubSplit} to every + * {@PubSubSourceReader} that joins. The split does not contain any split-specific information + * because Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing + * partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple + * readers which use the same subscription. + * + *

A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so: + * + *

{@code
+ * PubSubSource.builder()
+ *         // The deserialization schema to deserialize Pub/Sub messages
+ *         .setDeserializationSchema(new SimpleStringSchema())
+ *         // The name string of your Pub/Sub project
+ *         .setProjectName(PROJECT_NAME)
+ *         // The name string of the subscription you would like to receive messages from
+ *         .setSubscriptionName(SUBSCRIPTION_NAME)
+ *         // An instance of com.google.auth.Credentials to authenticate against Google Cloud
+ *         .setCredentials(CREDENTIALS)
+ *         .setPubSubSubscriberFactory(
+ *                 // The maximum number of messages that should be pulled in one go
+ *                 3,
+ *                 // The timeout after which a message pull request is deemed a failure
+ *                 Duration.ofSeconds(1),
+ *                 // The number of times the reception of a message should be retried in case of failure
+ *                 10)
+ *         .build();
+ * }
+ * + *

More details can be found at {@link PubSubSourceBuilder} + * + * @param The output type of the source. + */ +public class PubSubSource + implements Source, ResultTypeQueryable { + private final PubSubDeserializationSchema deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Properties props; + private final Credentials credentials; + + private PubSubSource( + PubSubDeserializationSchema deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Properties props, + Credentials credentials) { + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.props = props; + this.credentials = credentials; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) { + FutureCompletingBlockingQueue>> elementsQueue = + new FutureCompletingBlockingQueue<>(); + Supplier> splitReaderSupplier = + () -> + new PubSubSplitReader<>( + deserializationSchema, pubSubSubscriberFactory, credentials); + PubSubRecordEmitter recordEmitter = new PubSubRecordEmitter<>(); + + return new PubSubSourceReader<>( + elementsQueue, + splitReaderSupplier, + recordEmitter, + toConfiguration(props), + readerContext); + } + + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) { + return new PubSubSourceEnumerator(enumContext); + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, PubSubEnumeratorState checkpoint) { + return new PubSubSourceEnumerator(enumContext); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return new PubSubSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return new PubSubEnumeratorStateSerializer(); + } + + @Override + public TypeInformation getProducedType() { + return deserializationSchema.getProducedType(); + } + + /** + * Get a builder to build a {@link PubSubSource}. + * + * @return A builder for a {@link PubSubSource}. + */ + public static PubSubSourceBuilder builder() { + return new PubSubSourceBuilder<>(); + } + + /** @param */ + public static class PubSubSourceBuilder { + private static final int DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES = 3; + private static final int DEFAULT_PUBSUB_SUBSCRIBER_MAX_MESSAGES_PER_PULL = 100; + + private PubSubDeserializationSchema deserializationSchema; + private String projectName; + private String subscriptionName; + private PubSubSubscriberFactory pubSubSubscriberFactory; + private Properties props; + private Credentials credentials; + + private PubSubSourceBuilder() { + this.props = new Properties(); + } + + /** + * Set the DeserializationSchema used to deserialize incoming Pub/Sub messages. Use any + * {@link DeserializationSchema} to use in the {@link PubSubSource}. The schema will be + * wrapped automatically for compatibility with the source. + * + * @param deserializationSchema a deserialization schema to use. + */ + public PubSubSourceBuilder setDeserializationSchema( + DeserializationSchema deserializationSchema) { + Preconditions.checkNotNull(deserializationSchema); + this.deserializationSchema = new DeserializationSchemaWrapper<>(deserializationSchema); + return this; + } + + /** + * Set the PubSubDeserializationSchema used to deserialize incoming Pub/Sub messages. + * + * @param deserializationSchema a deserialization schema to use. + */ + public PubSubSourceBuilder setDeserializationSchema( + PubSubDeserializationSchema deserializationSchema) { + Preconditions.checkNotNull(deserializationSchema); + this.deserializationSchema = deserializationSchema; + return this; + } + + /** @param projectName the name string of your Pub/Sub project */ + public PubSubSourceBuilder setProjectName(String projectName) { + Preconditions.checkNotNull(projectName); + this.projectName = projectName; + return this; + } + + /** + * @param subscriptionName the name string of the subscription you would like to receive + * messages from + */ + public PubSubSourceBuilder setSubscriptionName(String subscriptionName) { + Preconditions.checkNotNull(subscriptionName); + this.subscriptionName = subscriptionName; + return this; + } + + /** + * @param credentials an instance of {@com.google.auth.Credentials} to authenticate against + * Google Cloud + */ + public PubSubSourceBuilder setCredentials(Credentials credentials) { + this.credentials = credentials; + return this; + } + + /** @param pubSubSubscriberFactory a custom factory to create Pub/Sub subscribers from */ + public PubSubSourceBuilder setPubSubSubscriberFactory( + PubSubSubscriberFactory pubSubSubscriberFactory) { + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + return this; + } + + /** + * Create a parameterized {@DefaultPubSubSubscriberFactory} and set it on the builder. + * + * @param maxMessagesPerPull The maximum number of messages that should be pulled in one go. + * @param perRequestTimeout The timeout per request from the subscriber + * @param retries The number of times the reception of a message should be retried in case + * of * failure. + */ + public PubSubSourceBuilder setPubSubSubscriberFactory( + int maxMessagesPerPull, Duration perRequestTimeout, int retries) { + this.pubSubSubscriberFactory = + new DefaultPubSubSubscriberFactory( + ProjectSubscriptionName.format(projectName, subscriptionName), + retries, + perRequestTimeout, + maxMessagesPerPull); + return this; + } + + public PubSubSourceBuilder setProps(Properties props) { + this.props = props; + return this; + } + + public PubSubSource build() throws IOException { + Preconditions.checkNotNull( + deserializationSchema, "Deserialization schema must be provided."); + Preconditions.checkNotNull( + projectName, "Google Cloud Pub/Sub projectName must be set."); + Preconditions.checkNotNull( + subscriptionName, "Google Cloud Pub/Sub subscriptionName must be set."); + + if (credentials == null) { + credentials = defaultCredentialsProviderBuilder().build().getCredentials(); + } + + if (pubSubSubscriberFactory == null) { + pubSubSubscriberFactory = + new DefaultPubSubSubscriberFactory( + ProjectSubscriptionName.format(projectName, subscriptionName), + DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES, + Duration.ofSeconds(15), + DEFAULT_PUBSUB_SUBSCRIBER_MAX_MESSAGES_PER_PULL); + } + + return new PubSubSource( + deserializationSchema, pubSubSubscriberFactory, props, credentials); + } + } + + // ----------- private helper methods --------------- + + private Configuration toConfiguration(Properties props) { + Configuration config = new Configuration(); + props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key))); + return config; + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md new file mode 100644 index 00000000..fa927b23 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md @@ -0,0 +1,69 @@ +# Flink Source for Google Cloud Pub/Sub + +This is a source implementation for receiving Google Cloud Pub/Sub messages in Flink with an at-least-once guarantee. + +## Installation + +Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub source: + +```xml + + org.apache.flink + flink-connector-gcp-pubsub_${scala.binary.version} + 1.13-SNAPSHOT + +``` + +## Architecture + +### Split Enumerator + +An enumerator is supposed to discover splits and assign them to the readers so that they can do the actual reading. However, the implementation of the `PubSubSourceEnumerator` doesn't do any real work discovery because Pub/Sub doesn't expose any partitions from which splits could be constructed. Instead, the enumerator assigns a static `PubSubSplit` to every `PubSubSourceReader` that joins so that the readers can start pulling messages. The static source split doesn't contain split-specific information like partitions/offsets because this information can not be obtained from Pub/Sub. Because of the static source split, there is no state in the enumerator which would have to be snapshot when checkpointing. + +### Source Reader + +A `PubSubSourceReader` uses Pub/Sub's pull mechanism to read new messages from the Pub/Sub subscription specified by the user. In the case of parallel-running source readers in Flink, every source reader is passed the same source split from the enumerator. Because of this, all source readers use the same connection details and the same Pub/Sub subscription to receive messages. In this case, Pub/Sub automatically load-balances messages between all source readers which pull from the same subscription. This way, parallel processing is achieved in the source. The source reader is notified when a checkpoint completes so that it can trigger the acknowledgement of successfully received Pub/Sub messages through the split reader. As a result, when a checkpoint completes, all messages which were successfully pulled since the previous checkpoint are acknowledged to Pub/Sub to ensure they won't be redelivered. + +## Delivery Guarantee + +Google Cloud Pub/Sub only guarantees at-least-once message delivery. This guarantee is kept up by the source as well. To achieve this, the source makes use of Pub/Sub's expectation that a message should be acknowledged by the subscriber to signal that the message has been consumed successfully. Any message that has not been acknowledged yet will be automatically redelivered by Pub/Sub once an ack deadline has passed. + +When a checkpoint completes, all messages which were successfully pulled since the previous checkpoint are acknowledged to Pub/Sub. This ensures at-least-once delivery in the source because in the case of failure, non-checkpointed messages have not yet been acknowledged and will therefore be redelivered by the Pub/Sub service. + +## Usage + +To keep up the Google Cloud Pub/Sub at-least-once guarantee, messages are acknowledged against Pub/Sub when checkpointing succeeds. If a message is not acknowledged within an acknowledge deadline, Pub/Sub will attempt redelivery. To avoid unnecessary redelivery of successfully received messages, the checkpointing interval should always be configured (much) *lower* than the Google Cloud Pub/Sub acknowledge deadline. + +```java +import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource; + +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// Checkpointing must be enabled for the source to work so that messages can be acknowledged towards Pub/Sub +env.enableCheckpointing(1000); + +// Parallelism > 1 may be set +// env.setParallelism(4); + +PubSubSource source = + PubSubSource.builder() + // The deserialization schema to deserialize Pub/Sub messages + .setDeserializationSchema(new SimpleStringSchema()) + // The name string of your Pub/Sub project + .setProjectName(PROJECT_NAME) + // The name string of the subscription you would like to receive messages from + .setSubscriptionName(SUBSCRIPTION_NAME) + // An instance of the com.google.auth.Credentials class to authenticate against Google Cloud + .setCredentials(CREDENTIALS) + .setPubSubSubscriberFactory( + // The maximum number of messages that should be pulled in one go + 3, + // The timeout after which a message pull request is deemed a failure + Duration.ofSeconds(1), + // The number of times the reception of a message should be retried in case of failure + 10) + .build(); + +DataStream fromPubSub = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "pubsub-source"); +``` diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java new file mode 100644 index 00000000..9d5fc06f --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator; + +/** + * A stub to contain the checkpoint data of a {@link PubSubSourceEnumerator}. GCP Pub/Sub does not + * expose any partitions or similar concepts which would need handling by the enumerator. Therefore, + * there are no offsets or other data that could be saved in a checkpoint. + */ +public class PubSubEnumeratorState {} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java new file mode 100644 index 00000000..22d28231 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; + +/** + * A stub to serialize the contents of a {@link PubSubEnumeratorState}. Because no data is stored in + * such a checkpoint, no proper serialization is necessary. + */ +public class PubSubEnumeratorStateSerializer + implements SimpleVersionedSerializer { + private static final int CURRENT_VERSION = 0; + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(PubSubEnumeratorState enumeratorCheckpoint) throws IOException { + return new byte[0]; + } + + @Override + public PubSubEnumeratorState deserialize(int version, byte[] serialized) throws IOException { + if (version == 0) { + return new PubSubEnumeratorState(); + } + throw new IOException( + String.format( + "The bytes are serialized with version %d, " + + "while this deserializer only supports version up to %d", + version, CURRENT_VERSION)); + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java new file mode 100644 index 00000000..c3c48a5f --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +/** + * The enumerator for the {@link + * org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource}. It does not do any work + * discovery as envisioned by FLIP-27 because GCP Pub/Sub hides partitions and other implementation + * details. + */ +public class PubSubSourceEnumerator implements SplitEnumerator { + private final SplitEnumeratorContext context; + + public PubSubSourceEnumerator(SplitEnumeratorContext context) { + this.context = context; + } + + @Override + public void start() {} + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {} + + @Override + public void addSplitsBack(List splits, int subtaskId) {} + + /** + * When a new reader joins, the enumerator actively assigns it with a generic {@link + * PubSubSplit} so that it can start pulling messages. + * + * @param subtaskId the subtask ID of the new source reader. + */ + @Override + public void addReader(int subtaskId) { + context.assignSplit(new PubSubSplit(), subtaskId); + } + + /** + * In the case of GCP Pub/Sub, there's no state of the enumerator that could be captured in a + * checkpoint. + * + * @return A checkpoint stub. + * @throws Exception + */ + @Override + public PubSubEnumeratorState snapshotState() throws Exception { + return new PubSubEnumeratorState(); + } + + @Override + public void close() throws IOException {} +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubRecordEmitter.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubRecordEmitter.java new file mode 100644 index 00000000..aad6cb95 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubRecordEmitter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitState; + +/** + * A custom {@link RecordEmitter} to emit a record which includes the data of the received GCP + * Pub/Sub message and the publication time of the message. + * + * @param The type of the record data to be emitted. + */ +public class PubSubRecordEmitter implements RecordEmitter, T, PubSubSplitState> { + + @Override + public void emitRecord( + Tuple2 element, SourceOutput output, PubSubSplitState splitState) + throws Exception { + output.collect(element.f0, element.f1); + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java new file mode 100644 index 00000000..5452b757 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Supplier; + +/** + * A custom {@link SingleThreadFetcherManager} so that the reception of GCP Pub/Sub messages can be + * acknowledged towards GCP Pub/Sub once they have been successfully checkpointed in Flink. As long + * as a received message has not been acknowledged, GCP Pub/Sub will attempt to deliver it again. + */ +class PubSubSourceFetcherManager + extends SingleThreadFetcherManager, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSourceFetcherManager.class); + + PubSubSourceFetcherManager( + FutureCompletingBlockingQueue>> elementsQueue, + Supplier, PubSubSplit>> splitReaderSupplier) { + super(elementsQueue, splitReaderSupplier); + } + + void prepareForAcknowledgement(long checkpointId) { + SplitFetcher, PubSubSplit> splitFetcher = fetchers.get(0); + + if (splitFetcher != null) { + enqueuePrepareForAcknowledgementTask(splitFetcher, checkpointId); + } else { + splitFetcher = createSplitFetcher(); + enqueuePrepareForAcknowledgementTask(splitFetcher, checkpointId); + startFetcher(splitFetcher); + } + } + + private void enqueuePrepareForAcknowledgementTask( + SplitFetcher, PubSubSplit> splitFetcher, long checkpointId) { + PubSubSplitReader pubSubSplitReader = + (PubSubSplitReader) splitFetcher.getSplitReader(); + + splitFetcher.enqueueTask( + new SplitFetcherTask() { + @Override + public boolean run() { + pubSubSplitReader.prepareForAcknowledgement(checkpointId); + return true; + } + + @Override + public void wakeUp() {} + }); + } + + /** + * Creates a {@link SplitFetcher} if there's none available yet and enqueues a task to + * acknowledge GCP Pub/Sub messages. + */ + void acknowledgeMessages(long checkpointId) { + SplitFetcher, PubSubSplit> splitFetcher = fetchers.get(0); + + if (splitFetcher != null) { + enqueueAcknowledgeMessagesTask(splitFetcher, checkpointId); + } else { + splitFetcher = createSplitFetcher(); + enqueueAcknowledgeMessagesTask(splitFetcher, checkpointId); + startFetcher(splitFetcher); + } + } + + /** + * Enqueues a task that, when run, notifies a {@link PubSubSplitReader} of a successful + * checkpoint so that GCP Pub/Sub messages received since the previous checkpoint can be + * acknowledged. + * + * @param splitFetcher the split fetcher on which the acknowledge task should be enqueued. + */ + private void enqueueAcknowledgeMessagesTask( + SplitFetcher, PubSubSplit> splitFetcher, long checkpointId) { + PubSubSplitReader pubSubSplitReader = + (PubSubSplitReader) splitFetcher.getSplitReader(); + + splitFetcher.enqueueTask( + new SplitFetcherTask() { + @Override + public boolean run() { + pubSubSplitReader.acknowledgeMessages(checkpointId); + return true; + } + + @Override + public void wakeUp() {} + }); + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceReader.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceReader.java new file mode 100644 index 00000000..3d608f83 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceReader.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader; + +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitState; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** The source reader to read from GCP Pub/Sub. */ +public class PubSubSourceReader + extends SingleThreadMultiplexSourceReaderBase< + Tuple2, T, PubSubSplit, PubSubSplitState> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSourceReader.class); + + public PubSubSourceReader( + FutureCompletingBlockingQueue>> elementsQueue, + Supplier> splitReaderSupplier, + RecordEmitter, T, PubSubSplitState> recordEmitter, + Configuration config, + SourceReaderContext context) { + super( + elementsQueue, + new PubSubSourceFetcherManager<>(elementsQueue, splitReaderSupplier::get), + recordEmitter, + config, + context); + } + + @Override + protected void onSplitFinished(Map finishedSplitIds) {} + + @Override + public List snapshotState(long checkpointId) { + ((PubSubSourceFetcherManager) splitFetcherManager) + .prepareForAcknowledgement(checkpointId); + return Arrays.asList(new PubSubSplit()); + } + + /** + * Communicates with the {@link PubSubSourceFetcherManager} about the completion of a checkpoint + * so that messages received from GCP Pub/Sub can be acknowledged from a {@link + * PubSubSplitReader}. + * + * @param checkpointId the checkpoint ID. + * @throws Exception + */ + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + LOG.info("Acknowledging received GCP Pub/Sub messages for checkpoint {}", checkpointId); + ((PubSubSourceFetcherManager) splitFetcherManager).acknowledgeMessages(checkpointId); + } + + @Override + protected PubSubSplitState initializedState(PubSubSplit split) { + return new PubSubSplitState(); + } + + /** + * Simply returns a new instance of {@link PubSubSplit} because GCP Pub/Sub offers no control of + * pulling messages beyond the configuration of project name and subscription name. + * + * @param splitId the split ID + * @param splitState the split state + * @return a fresh instance of {@link PubSubSplit} + */ + @Override + protected PubSubSplit toSplitType(String splitId, PubSubSplitState splitState) { + return new PubSubSplit(); + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java new file mode 100644 index 00000000..769f60c0 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.util.Collector; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ReceivedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * A {@link SplitReader} to read from a given {@link PubSubSubscriber}. + * + * @param the type of the record. + */ +public class PubSubSplitReader implements SplitReader, PubSubSplit> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class); + private static final long UPCOMING_CHECKPOINT = 0; + private final PubSubDeserializationSchema deserializationSchema; + private final PubSubSubscriberFactory pubSubSubscriberFactory; + private final Credentials credentials; + private PubSubSubscriber subscriber; + private final PubSubCollector collector; + // Store the IDs of GCP Pub/Sub messages that yet have to be acknowledged so that they are not + // resent. Must be synchronized because it's accessed both by the fetcher and the reader thread. + private final SortedMap> messageIdsToAcknowledge = + Collections.synchronizedSortedMap(new TreeMap<>()); + + /** + * @param deserializationSchema a deserialization schema to apply to incoming message payloads. + * @param pubSubSubscriberFactory a factory from which a new subscriber can be created from + * @param credentials the credentials to use for creating a new subscriber + */ + public PubSubSplitReader( + PubSubDeserializationSchema deserializationSchema, + PubSubSubscriberFactory pubSubSubscriberFactory, + Credentials credentials) { + + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.collector = new PubSubCollector(); + + this.messageIdsToAcknowledge.put(UPCOMING_CHECKPOINT, new ArrayList<>()); + } + + @Override + public RecordsWithSplitIds> fetch() throws IOException { + RecordsBySplits.Builder> recordsBySplits = new RecordsBySplits.Builder<>(); + if (subscriber == null) { + subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + } + + for (ReceivedMessage receivedMessage : subscriber.pull()) { + try { + // Deserialize messages into a collector so that logic in the user-provided + // deserialization schema decides how to map GCP Pub/Sub messages to records in + // Flink. This allows e.g. batching together multiple Flink records in a single GCP + // Pub/Sub message. + deserializationSchema.deserialize(receivedMessage.getMessage(), collector); + collector + .getMessages() + .forEach( + message -> + recordsBySplits.add( + PubSubSplit.SPLIT_ID, + new Tuple2<>( + message, + // A timestamp provided by GCP Pub/Sub + // indicating when the message was initially + // published + receivedMessage + .getMessage() + .getPublishTime() + .getSeconds()))); + } catch (Exception e) { + throw new IOException("Failed to deserialize received message due to", e); + } finally { + collector.reset(); + } + + messageIdsToAcknowledge.get(UPCOMING_CHECKPOINT).add(receivedMessage.getAckId()); + } + + return recordsBySplits.build(); + } + + @Override + public void handleSplitsChanges(SplitsChange splitsChanges) {} + + @Override + public void wakeUp() {} + + @Override + public void close() throws Exception { + if (subscriber != null) { + subscriber.close(); + } + } + + private class PubSubCollector implements Collector { + private final List messages = new ArrayList<>(); + + @Override + public void collect(T message) { + messages.add(message); + } + + @Override + public void close() {} + + private List getMessages() { + return messages; + } + + private void reset() { + messages.clear(); + } + } + + // ------------------------------------------------------ + + void prepareForAcknowledgement(long checkpointId) { + synchronized (messageIdsToAcknowledge) { + messageIdsToAcknowledge.put( + checkpointId, messageIdsToAcknowledge.remove(UPCOMING_CHECKPOINT)); + messageIdsToAcknowledge.put(UPCOMING_CHECKPOINT, new ArrayList<>()); + } + } + + /** + * Acknowledge the reception of messages towards GCP Pub/Sub since the last checkpoint. As long + * as a received message has not been acknowledged, GCP Pub/Sub will attempt to deliver it + * again. + * + *

Calling this message is enqueued by the {@link PubSubSourceFetcherManager} on checkpoint. + */ + void acknowledgeMessages(long checkpointId) { + synchronized (messageIdsToAcknowledge) { + List messageIdsForCheckpoint = messageIdsToAcknowledge.get(checkpointId); + if (!messageIdsForCheckpoint.isEmpty() && subscriber != null) { + LOG.info( + "Acknowledging messages for checkpoint {} with IDs {}", + checkpointId, + messageIdsForCheckpoint); + subscriber.acknowledge(messageIdsForCheckpoint); + } + messageIdsToAcknowledge.remove(checkpointId); + } + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplit.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplit.java new file mode 100644 index 00000000..30065b4c --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplit.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.source.split; + +import org.apache.flink.api.connector.source.SourceSplit; + +/** + * A {@link SourceSplit} implementation for the {@link + * org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource}. It only saves a generic + * split ID because a split does not carry any specific information for pulling from a project + * subscription. That is because GCP Pub/Sub hides partitions and other implementation details. + */ +public class PubSubSplit implements SourceSplit { + public static final String SPLIT_ID = "0"; + + @Override + public String splitId() { + return SPLIT_ID; + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitSerializer.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitSerializer.java new file mode 100644 index 00000000..ca0f9e0e --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitSerializer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.source.split; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; + +/** + * A stub to serialize instances of {@link PubSubSplit}. No real deserialization or serialization is + * carried out because of how generic the {@link PubSubSplit} is. + */ +public class PubSubSplitSerializer implements SimpleVersionedSerializer { + private static final int CURRENT_VERSION = 0; + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(PubSubSplit obj) throws IOException { + return new byte[0]; + } + + @Override + public PubSubSplit deserialize(int version, byte[] serialized) throws IOException { + if (version == 0) { + return new PubSubSplit(); + } + throw new IOException( + String.format( + "The bytes are serialized with version %d, " + + "while this deserializer only supports version up to %d", + version, CURRENT_VERSION)); + } +} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitState.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitState.java new file mode 100644 index 00000000..d6194141 --- /dev/null +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitState.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.source.split; + +/** A stub for the always empty state of a {@link PubSubSplit}. */ +public class PubSubSplitState extends PubSubSplit {} From 81775b05a7850bce6ea059dd7eda7b00a496a353 Mon Sep 17 00:00:00 2001 From: Ryan Skraba Date: Mon, 10 Jan 2022 10:22:36 +0100 Subject: [PATCH 2/8] [FLINK-20625][pubsub,e2e] Update parameter for EnumeratorState --- .../gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java index c3c48a5f..64d860d0 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java @@ -68,7 +68,7 @@ public void addReader(int subtaskId) { * @throws Exception */ @Override - public PubSubEnumeratorState snapshotState() throws Exception { + public PubSubEnumeratorState snapshotState(long checkpointId) throws Exception { return new PubSubEnumeratorState(); } From e81bac423f0fee6cb8ac60d0cd73481bb466e660 Mon Sep 17 00:00:00 2001 From: Ryan Skraba Date: Thu, 17 Feb 2022 13:48:40 +0100 Subject: [PATCH 3/8] [FLINK-20625][pubsub] Align new source with Kafka package names --- .../gcp/pubsub/source => }/README.md | 0 .../gcp/pubsub/source/PubSubSource.java | 21 ++++++++++--------- .../enumerator/PubSubEnumeratorState.java | 5 +++-- .../PubSubEnumeratorStateSerializer.java | 5 +++-- .../enumerator/PubSubSourceEnumerator.java | 14 ++++++------- .../source/reader/PubSubRecordEmitter.java | 7 ++++--- .../reader/PubSubSourceFetcherManager.java | 7 ++++--- .../source/reader/PubSubSourceReader.java | 9 ++++---- .../source/reader/PubSubSplitReader.java | 7 ++++--- .../gcp/pubsub/source/split/PubSubSplit.java | 11 +++++----- .../source/split/PubSubSplitSerializer.java | 5 +++-- .../pubsub/source/split/PubSubSplitState.java | 5 +++-- .../DefaultPubSubSubscriberFactory.java | 2 +- .../gcp/pubsub/EmulatedPubSubSourceTest.java | 9 ++++---- 14 files changed, 59 insertions(+), 48 deletions(-) rename flink-connector-gcp-pubsub/{src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source => }/README.md (100%) rename flink-connector-gcp-pubsub/src/main/java/org/apache/flink/{streaming/connectors => connector}/gcp/pubsub/source/PubSubSource.java (93%) rename flink-connector-gcp-pubsub/src/main/java/org/apache/flink/{streaming/connectors => connector}/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java (89%) rename flink-connector-gcp-pubsub/src/main/java/org/apache/flink/{streaming/connectors => connector}/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java (93%) rename flink-connector-gcp-pubsub/src/main/java/org/apache/flink/{streaming/connectors => connector}/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java (83%) rename flink-connector-gcp-pubsub/src/main/java/org/apache/flink/{streaming/connectors => connector}/gcp/pubsub/source/reader/PubSubRecordEmitter.java (87%) rename flink-connector-gcp-pubsub/src/main/java/org/apache/flink/{streaming/connectors => connector}/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java (95%) rename flink-connector-gcp-pubsub/src/main/java/org/apache/flink/{streaming/connectors => connector}/gcp/pubsub/source/reader/PubSubSourceReader.java (92%) rename flink-connector-gcp-pubsub/src/main/java/org/apache/flink/{streaming/connectors => connector}/gcp/pubsub/source/reader/PubSubSplitReader.java (97%) rename flink-connector-gcp-pubsub/src/main/java/org/apache/flink/{streaming/connectors => connector}/gcp/pubsub/source/split/PubSubSplit.java (74%) rename flink-connector-gcp-pubsub/src/main/java/org/apache/flink/{streaming/connectors => connector}/gcp/pubsub/source/split/PubSubSplitSerializer.java (93%) rename flink-connector-gcp-pubsub/src/main/java/org/apache/flink/{streaming/connectors => connector}/gcp/pubsub/source/split/PubSubSplitState.java (87%) rename flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java => flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java (96%) diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md b/flink-connector-gcp-pubsub/README.md similarity index 100% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/README.md rename to flink-connector-gcp-pubsub/README.md diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java similarity index 93% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java rename to flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java index 5506d6ea..68ca5fea 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/PubSubSource.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub.source; +package org.apache.flink.connector.gcp.pubsub.source; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -31,19 +32,19 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory; import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorState; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator.PubSubSourceEnumerator; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubRecordEmitter; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSourceReader; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.reader.PubSubSplitReader; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitSerializer; import org.apache.flink.util.Preconditions; import com.google.auth.Credentials; diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java similarity index 89% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java rename to flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java index 9d5fc06f..bde9e457 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorState.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator; +package org.apache.flink.connector.gcp.pubsub.source.enumerator; /** * A stub to contain the checkpoint data of a {@link PubSubSourceEnumerator}. GCP Pub/Sub does not diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java similarity index 93% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java rename to flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java index 22d28231..6b06dc81 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubEnumeratorStateSerializer.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator; +package org.apache.flink.connector.gcp.pubsub.source.enumerator; import org.apache.flink.core.io.SimpleVersionedSerializer; diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java similarity index 83% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java rename to flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java index 64d860d0..300a50c9 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/enumerator/PubSubSourceEnumerator.java @@ -7,20 +7,22 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub.source.enumerator; +package org.apache.flink.connector.gcp.pubsub.source.enumerator; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; import javax.annotation.Nullable; @@ -28,10 +30,8 @@ import java.util.List; /** - * The enumerator for the {@link - * org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource}. It does not do any work - * discovery as envisioned by FLIP-27 because GCP Pub/Sub hides partitions and other implementation - * details. + * The enumerator for the {@link PubSubSource}. It does not do any work discovery as envisioned by + * FLIP-27 because GCP Pub/Sub hides partitions and other implementation details. */ public class PubSubSourceEnumerator implements SplitEnumerator { private final SplitEnumeratorContext context; diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubRecordEmitter.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubRecordEmitter.java similarity index 87% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubRecordEmitter.java rename to flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubRecordEmitter.java index aad6cb95..311743c5 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubRecordEmitter.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubRecordEmitter.java @@ -7,21 +7,22 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader; +package org.apache.flink.connector.gcp.pubsub.source.reader; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.base.source.reader.RecordEmitter; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitState; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitState; /** * A custom {@link RecordEmitter} to emit a record which includes the data of the received GCP diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java similarity index 95% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java rename to flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java index 5452b757..36893438 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader; +package org.apache.flink.connector.gcp.pubsub.source.reader; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -25,7 +26,7 @@ import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceReader.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java similarity index 92% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceReader.java rename to flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java index 3d608f83..76363e3b 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSourceReader.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader; +package org.apache.flink.connector.gcp.pubsub.source.reader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.java.tuple.Tuple2; @@ -25,8 +26,8 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplitState; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java similarity index 97% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java rename to flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java index 769f60c0..2c99325d 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/reader/PubSubSplitReader.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java @@ -7,26 +7,27 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub.source.reader; +package org.apache.flink.connector.gcp.pubsub.source.reader; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.base.source.reader.RecordsBySplits; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.split.PubSubSplit; import org.apache.flink.util.Collector; import com.google.auth.Credentials; diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplit.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplit.java similarity index 74% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplit.java rename to flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplit.java index 30065b4c..6f2a6dc0 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplit.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplit.java @@ -7,23 +7,24 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub.source.split; +package org.apache.flink.connector.gcp.pubsub.source.split; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; /** - * A {@link SourceSplit} implementation for the {@link - * org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource}. It only saves a generic - * split ID because a split does not carry any specific information for pulling from a project + * A {@link SourceSplit} implementation for the {@link PubSubSource}. It only saves a generic split + * ID because a split does not carry any specific information for pulling from a project * subscription. That is because GCP Pub/Sub hides partitions and other implementation details. */ public class PubSubSplit implements SourceSplit { diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitSerializer.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitSerializer.java similarity index 93% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitSerializer.java rename to flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitSerializer.java index ca0f9e0e..42c1fc7d 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitSerializer.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitSerializer.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub.source.split; +package org.apache.flink.connector.gcp.pubsub.source.split; import org.apache.flink.core.io.SimpleVersionedSerializer; diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitState.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitState.java similarity index 87% rename from flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitState.java rename to flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitState.java index d6194141..ca4cf268 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/source/split/PubSubSplitState.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/split/PubSubSplitState.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub.source.split; +package org.apache.flink.connector.gcp.pubsub.source.split; /** A stub for the always empty state of a {@link PubSubSplit}. */ public class PubSubSplitState extends PubSubSplit {} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java index a0bf68b8..fb6a5256 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.connectors.gcp.pubsub; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource; import com.google.auth.Credentials; import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java b/flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java similarity index 96% rename from flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java rename to flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java index 83ade583..8d9a421a 100644 --- a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java +++ b/flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java @@ -7,21 +7,23 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.flink.streaming.connectors.gcp.pubsub; +package org.apache.flink.connector.gcp.pubsub; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamUtils; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -29,7 +31,6 @@ import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper; -import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; @@ -48,7 +49,7 @@ import static org.junit.Assert.assertTrue; /** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */ -public class EmulatedPubSubNewSourceTest extends GCloudUnitTestBase { +public class EmulatedPubSubSourceTest extends GCloudUnitTestBase { private static final String PROJECT_NAME = "FLProject"; private static final String TOPIC_NAME = "FLTopic"; private static final String SUBSCRIPTION_NAME = "FLSubscription"; From 459d773189c30ad5416af82375117dd530c30f9f Mon Sep 17 00:00:00 2001 From: Ryan Skraba Date: Thu, 17 Feb 2022 13:50:47 +0100 Subject: [PATCH 4/8] [FLINK-20625] Fix end-to-end test stopping condition --- .../connector/gcp/pubsub/EmulatedPubSubSourceTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java b/flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java index 8d9a421a..f3dadc59 100644 --- a/flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java +++ b/flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamUtils; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; @@ -126,8 +125,9 @@ public void testFlinkSource(boolean testWithFailure) throws Exception { fromPubSub = fromPubSub.map(new FailureMapFunction<>(3)); } - List output = new ArrayList<>(); - DataStreamUtils.collect(fromPubSub).forEachRemaining(output::add); + // Asking for any more elements would wait forever, and there isn't a graceful way to + // indicate end of stream. + List output = fromPubSub.executeAndCollect(input.size()); assertEquals("Wrong number of elements", input.size(), output.size()); for (String test : input) { @@ -135,7 +135,7 @@ public void testFlinkSource(boolean testWithFailure) throws Exception { } } - private class FailureMapFunction extends RichMapFunction { + private static class FailureMapFunction extends RichMapFunction { private final long numberOfRecordsUntilFailure; private long numberOfRecordsProcessed; From 3b33d92aca33f5e93bcf03fa724651eb4b6eb25b Mon Sep 17 00:00:00 2001 From: Ryan Skraba Date: Thu, 17 Feb 2022 14:24:16 +0100 Subject: [PATCH 5/8] [docs] Minor documentation fixes --- .../gcp/pubsub/EmulatedPubSubSourceTest.java | 181 ++++++++++++++++++ .../EmulatedPubSubNewSourceTest.java.orig | 180 +++++++++++++++++ flink-connector-gcp-pubsub/README.md | 49 ++++- flink-connector-gcp-pubsub/pom.xml | 2 +- .../gcp/pubsub/source/PubSubSource.java | 8 +- .../pubsub/EmulatedPubSubSourceTest.java.orig | 181 ++++++++++++++++++ 6 files changed, 586 insertions(+), 15 deletions(-) create mode 100644 flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java create mode 100644 flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java.orig create mode 100644 flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java.orig diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java new file mode 100644 index 00000000..f3dadc59 --- /dev/null +++ b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper; + +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */ +public class EmulatedPubSubSourceTest extends GCloudUnitTestBase { + private static final String PROJECT_NAME = "FLProject"; + private static final String TOPIC_NAME = "FLTopic"; + private static final String SUBSCRIPTION_NAME = "FLSubscription"; + + private static PubsubHelper pubsubHelper; + + @Before + public void setUp() throws Exception { + pubsubHelper = getPubsubHelper(); + pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME); + pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME); + } + + @After + public void tearDown() throws Exception { + pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME); + pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME); + } + + public void testFlinkSource(boolean testWithFailure) throws Exception { + // Create some messages and put them into pubsub + List input = + Arrays.asList( + "One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine", + "Ten"); + + List messagesToSend = new ArrayList<>(input); + + // Publish the messages into PubSub + Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME); + messagesToSend.forEach( + s -> { + try { + publisher + .publish( + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(s)) + .build()) + .get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(100); + env.setParallelism(1); + if (testWithFailure) { + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000)); + } else { + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + PubSubSource source = + PubSubSource.builder() + .setDeserializationSchema(new SimpleStringSchema()) + .setProjectName(PROJECT_NAME) + .setSubscriptionName(SUBSCRIPTION_NAME) + .setCredentials(EmulatorCredentials.getInstance()) + .setPubSubSubscriberFactory( + new PubSubSubscriberFactoryForEmulator( + getPubSubHostPort(), + PROJECT_NAME, + SUBSCRIPTION_NAME, + 10, + Duration.ofSeconds(1), + 3)) + .build(); + + DataStream fromPubSub = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "test-pubsub-new-source"); + + if (testWithFailure) { + fromPubSub = fromPubSub.map(new FailureMapFunction<>(3)); + } + + // Asking for any more elements would wait forever, and there isn't a graceful way to + // indicate end of stream. + List output = fromPubSub.executeAndCollect(input.size()); + + assertEquals("Wrong number of elements", input.size(), output.size()); + for (String test : input) { + assertTrue("Missing " + test, output.contains(test)); + } + } + + private static class FailureMapFunction extends RichMapFunction { + private final long numberOfRecordsUntilFailure; + private long numberOfRecordsProcessed; + + private FailureMapFunction(long numberOfRecordsBeforeFailure) { + this.numberOfRecordsUntilFailure = numberOfRecordsBeforeFailure; + } + + @Override + public T map(T value) throws Exception { + numberOfRecordsProcessed++; + + if (shouldThrowException()) { + throw new Exception( + "Deliberately thrown exception to induce crash for failure recovery testing."); + } + return value; + } + + private boolean shouldThrowException() { + return getRuntimeContext().getAttemptNumber() <= 1 + && (numberOfRecordsProcessed >= numberOfRecordsUntilFailure); + } + } + + // IMPORTANT: This test makes use of things that happen in the emulated PubSub that + // are GUARANTEED to be different in the real Google hosted PubSub. + // So running these tests against the real thing will have a very high probability of + // failing. + // The assumptions: + // 1) The ordering of the messages is maintained. + // 2) Exactly once: We assume that every message we put in comes out exactly once. + // In the real PubSub there are a lot of situations (mostly failure/retry) where this is not + // true. + @Test + public void testFlinkSourceOk() throws Exception { + testFlinkSource(false); + } + + @Test + public void testFlinkSourceFailure() throws Exception { + testFlinkSource(true); + } +} diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java.orig b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java.orig new file mode 100644 index 00000000..83ade583 --- /dev/null +++ b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubNewSourceTest.java.orig @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper; +import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource; + +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */ +public class EmulatedPubSubNewSourceTest extends GCloudUnitTestBase { + private static final String PROJECT_NAME = "FLProject"; + private static final String TOPIC_NAME = "FLTopic"; + private static final String SUBSCRIPTION_NAME = "FLSubscription"; + + private static PubsubHelper pubsubHelper; + + @Before + public void setUp() throws Exception { + pubsubHelper = getPubsubHelper(); + pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME); + pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME); + } + + @After + public void tearDown() throws Exception { + pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME); + pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME); + } + + public void testFlinkSource(boolean testWithFailure) throws Exception { + // Create some messages and put them into pubsub + List input = + Arrays.asList( + "One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine", + "Ten"); + + List messagesToSend = new ArrayList<>(input); + + // Publish the messages into PubSub + Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME); + messagesToSend.forEach( + s -> { + try { + publisher + .publish( + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(s)) + .build()) + .get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(100); + env.setParallelism(1); + if (testWithFailure) { + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000)); + } else { + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + PubSubSource source = + PubSubSource.builder() + .setDeserializationSchema(new SimpleStringSchema()) + .setProjectName(PROJECT_NAME) + .setSubscriptionName(SUBSCRIPTION_NAME) + .setCredentials(EmulatorCredentials.getInstance()) + .setPubSubSubscriberFactory( + new PubSubSubscriberFactoryForEmulator( + getPubSubHostPort(), + PROJECT_NAME, + SUBSCRIPTION_NAME, + 10, + Duration.ofSeconds(1), + 3)) + .build(); + + DataStream fromPubSub = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "test-pubsub-new-source"); + + if (testWithFailure) { + fromPubSub = fromPubSub.map(new FailureMapFunction<>(3)); + } + + List output = new ArrayList<>(); + DataStreamUtils.collect(fromPubSub).forEachRemaining(output::add); + + assertEquals("Wrong number of elements", input.size(), output.size()); + for (String test : input) { + assertTrue("Missing " + test, output.contains(test)); + } + } + + private class FailureMapFunction extends RichMapFunction { + private final long numberOfRecordsUntilFailure; + private long numberOfRecordsProcessed; + + private FailureMapFunction(long numberOfRecordsBeforeFailure) { + this.numberOfRecordsUntilFailure = numberOfRecordsBeforeFailure; + } + + @Override + public T map(T value) throws Exception { + numberOfRecordsProcessed++; + + if (shouldThrowException()) { + throw new Exception( + "Deliberately thrown exception to induce crash for failure recovery testing."); + } + return value; + } + + private boolean shouldThrowException() { + return getRuntimeContext().getAttemptNumber() <= 1 + && (numberOfRecordsProcessed >= numberOfRecordsUntilFailure); + } + } + + // IMPORTANT: This test makes use of things that happen in the emulated PubSub that + // are GUARANTEED to be different in the real Google hosted PubSub. + // So running these tests against the real thing will have a very high probability of + // failing. + // The assumptions: + // 1) The ordering of the messages is maintained. + // 2) Exactly once: We assume that every message we put in comes out exactly once. + // In the real PubSub there are a lot of situations (mostly failure/retry) where this is not + // true. + @Test + public void testFlinkSourceOk() throws Exception { + testFlinkSource(false); + } + + @Test + public void testFlinkSourceFailure() throws Exception { + testFlinkSource(true); + } +} diff --git a/flink-connector-gcp-pubsub/README.md b/flink-connector-gcp-pubsub/README.md index fa927b23..06f2dfdc 100644 --- a/flink-connector-gcp-pubsub/README.md +++ b/flink-connector-gcp-pubsub/README.md @@ -1,16 +1,18 @@ # Flink Source for Google Cloud Pub/Sub -This is a source implementation for receiving Google Cloud Pub/Sub messages in Flink with an at-least-once guarantee. +This is a source implementation for receiving Google Cloud Pub/Sub messages in Flink with an +at-least-once guarantee. ## Installation Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub source: ```xml + - org.apache.flink - flink-connector-gcp-pubsub_${scala.binary.version} - 1.13-SNAPSHOT + org.apache.flink + flink-connector-gcp-pubsub + ${project.version} ``` @@ -18,24 +20,51 @@ Add this dependency entry to your pom.xml to use the Google Cloud Pub/Sub source ### Split Enumerator -An enumerator is supposed to discover splits and assign them to the readers so that they can do the actual reading. However, the implementation of the `PubSubSourceEnumerator` doesn't do any real work discovery because Pub/Sub doesn't expose any partitions from which splits could be constructed. Instead, the enumerator assigns a static `PubSubSplit` to every `PubSubSourceReader` that joins so that the readers can start pulling messages. The static source split doesn't contain split-specific information like partitions/offsets because this information can not be obtained from Pub/Sub. Because of the static source split, there is no state in the enumerator which would have to be snapshot when checkpointing. +An enumerator is supposed to discover splits and assign them to the readers so that they can do the +actual reading. However, the implementation of the `PubSubSourceEnumerator` doesn't do any real work +discovery because Pub/Sub doesn't expose any partitions from which splits could be constructed. +Instead, the enumerator assigns a static `PubSubSplit` to every `PubSubSourceReader` that joins so +that the readers can start pulling messages. The static source split doesn't contain split-specific +information like partitions/offsets because this information can not be obtained from Pub/Sub. +Because of the static source split, there is no state in the enumerator which would have to be +snapshot when checkpointing. ### Source Reader -A `PubSubSourceReader` uses Pub/Sub's pull mechanism to read new messages from the Pub/Sub subscription specified by the user. In the case of parallel-running source readers in Flink, every source reader is passed the same source split from the enumerator. Because of this, all source readers use the same connection details and the same Pub/Sub subscription to receive messages. In this case, Pub/Sub automatically load-balances messages between all source readers which pull from the same subscription. This way, parallel processing is achieved in the source. The source reader is notified when a checkpoint completes so that it can trigger the acknowledgement of successfully received Pub/Sub messages through the split reader. As a result, when a checkpoint completes, all messages which were successfully pulled since the previous checkpoint are acknowledged to Pub/Sub to ensure they won't be redelivered. +A `PubSubSourceReader` uses Pub/Sub's pull mechanism to read new messages from the Pub/Sub +subscription specified by the user. In the case of parallel-running source readers in Flink, every +source reader is passed the same source split from the enumerator. Because of this, all source +readers use the same connection details and the same Pub/Sub subscription to receive messages. In +this case, Pub/Sub automatically load-balances messages between all source readers which pull from +the same subscription. This way, parallel processing is achieved in the source. The source reader is +notified when a checkpoint completes so that it can trigger the acknowledgement of successfully +received Pub/Sub messages through the split reader. As a result, when a checkpoint completes, all +messages which were successfully pulled since the previous checkpoint are acknowledged to Pub/Sub to +ensure they won't be redelivered. ## Delivery Guarantee -Google Cloud Pub/Sub only guarantees at-least-once message delivery. This guarantee is kept up by the source as well. To achieve this, the source makes use of Pub/Sub's expectation that a message should be acknowledged by the subscriber to signal that the message has been consumed successfully. Any message that has not been acknowledged yet will be automatically redelivered by Pub/Sub once an ack deadline has passed. +Google Cloud Pub/Sub only guarantees at-least-once message delivery. This guarantee is kept up by +the source as well. To achieve this, the source makes use of Pub/Sub's expectation that a message +should be acknowledged by the subscriber to signal that the message has been consumed successfully. +Any message that has not been acknowledged yet will be automatically redelivered by Pub/Sub once an +ack deadline has passed. -When a checkpoint completes, all messages which were successfully pulled since the previous checkpoint are acknowledged to Pub/Sub. This ensures at-least-once delivery in the source because in the case of failure, non-checkpointed messages have not yet been acknowledged and will therefore be redelivered by the Pub/Sub service. +When a checkpoint completes, all messages which were successfully pulled since the previous +checkpoint are acknowledged to Pub/Sub. This ensures at-least-once delivery in the source because in +the case of failure, non-checkpointed messages have not yet been acknowledged and will therefore be +redelivered by the Pub/Sub service. ## Usage -To keep up the Google Cloud Pub/Sub at-least-once guarantee, messages are acknowledged against Pub/Sub when checkpointing succeeds. If a message is not acknowledged within an acknowledge deadline, Pub/Sub will attempt redelivery. To avoid unnecessary redelivery of successfully received messages, the checkpointing interval should always be configured (much) *lower* than the Google Cloud Pub/Sub acknowledge deadline. +To keep up the Google Cloud Pub/Sub at-least-once guarantee, messages are acknowledged against +Pub/Sub when checkpointing succeeds. If a message is not acknowledged within an acknowledge +deadline, Pub/Sub will attempt redelivery. To avoid unnecessary redelivery of successfully received +messages, the checkpointing interval should always be configured (much) *lower* than the Google +Cloud Pub/Sub acknowledge deadline. ```java -import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-connector-gcp-pubsub/pom.xml b/flink-connector-gcp-pubsub/pom.xml index 0cc99da7..5ad64033 100644 --- a/flink-connector-gcp-pubsub/pom.xml +++ b/flink-connector-gcp-pubsub/pom.xml @@ -38,7 +38,7 @@ under the License. org.apache.flink flink-connector-base - ${project.version} + ${flink.version} diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java index 68ca5fea..43eb5a59 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java @@ -60,9 +60,9 @@ /** * A source implementation to pull messages from GCP Pub/Sub into Flink. * - *

The {@link PubSubSourceEnumerator} assigns a static {@PubSubSplit} to every - * {@PubSubSourceReader} that joins. The split does not contain any split-specific information - * because Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing + *

The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} to every {@link + * PubSubSourceReader} that joins. The split does not contain any split-specific information because + * Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing * partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple * readers which use the same subscription. * @@ -250,7 +250,7 @@ public PubSubSourceBuilder setPubSubSubscriberFactory( * @param maxMessagesPerPull The maximum number of messages that should be pulled in one go. * @param perRequestTimeout The timeout per request from the subscriber * @param retries The number of times the reception of a message should be retried in case - * of * failure. + * of failure. */ public PubSubSourceBuilder setPubSubSubscriberFactory( int maxMessagesPerPull, Duration perRequestTimeout, int retries) { diff --git a/flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java.orig b/flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java.orig new file mode 100644 index 00000000..8d9a421a --- /dev/null +++ b/flink-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/EmulatedPubSubSourceTest.java.orig @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper; + +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */ +public class EmulatedPubSubSourceTest extends GCloudUnitTestBase { + private static final String PROJECT_NAME = "FLProject"; + private static final String TOPIC_NAME = "FLTopic"; + private static final String SUBSCRIPTION_NAME = "FLSubscription"; + + private static PubsubHelper pubsubHelper; + + @Before + public void setUp() throws Exception { + pubsubHelper = getPubsubHelper(); + pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME); + pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME); + } + + @After + public void tearDown() throws Exception { + pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME); + pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME); + } + + public void testFlinkSource(boolean testWithFailure) throws Exception { + // Create some messages and put them into pubsub + List input = + Arrays.asList( + "One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine", + "Ten"); + + List messagesToSend = new ArrayList<>(input); + + // Publish the messages into PubSub + Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME); + messagesToSend.forEach( + s -> { + try { + publisher + .publish( + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(s)) + .build()) + .get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(100); + env.setParallelism(1); + if (testWithFailure) { + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000)); + } else { + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + PubSubSource source = + PubSubSource.builder() + .setDeserializationSchema(new SimpleStringSchema()) + .setProjectName(PROJECT_NAME) + .setSubscriptionName(SUBSCRIPTION_NAME) + .setCredentials(EmulatorCredentials.getInstance()) + .setPubSubSubscriberFactory( + new PubSubSubscriberFactoryForEmulator( + getPubSubHostPort(), + PROJECT_NAME, + SUBSCRIPTION_NAME, + 10, + Duration.ofSeconds(1), + 3)) + .build(); + + DataStream fromPubSub = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "test-pubsub-new-source"); + + if (testWithFailure) { + fromPubSub = fromPubSub.map(new FailureMapFunction<>(3)); + } + + List output = new ArrayList<>(); + DataStreamUtils.collect(fromPubSub).forEachRemaining(output::add); + + assertEquals("Wrong number of elements", input.size(), output.size()); + for (String test : input) { + assertTrue("Missing " + test, output.contains(test)); + } + } + + private class FailureMapFunction extends RichMapFunction { + private final long numberOfRecordsUntilFailure; + private long numberOfRecordsProcessed; + + private FailureMapFunction(long numberOfRecordsBeforeFailure) { + this.numberOfRecordsUntilFailure = numberOfRecordsBeforeFailure; + } + + @Override + public T map(T value) throws Exception { + numberOfRecordsProcessed++; + + if (shouldThrowException()) { + throw new Exception( + "Deliberately thrown exception to induce crash for failure recovery testing."); + } + return value; + } + + private boolean shouldThrowException() { + return getRuntimeContext().getAttemptNumber() <= 1 + && (numberOfRecordsProcessed >= numberOfRecordsUntilFailure); + } + } + + // IMPORTANT: This test makes use of things that happen in the emulated PubSub that + // are GUARANTEED to be different in the real Google hosted PubSub. + // So running these tests against the real thing will have a very high probability of + // failing. + // The assumptions: + // 1) The ordering of the messages is maintained. + // 2) Exactly once: We assume that every message we put in comes out exactly once. + // In the real PubSub there are a lot of situations (mostly failure/retry) where this is not + // true. + @Test + public void testFlinkSourceOk() throws Exception { + testFlinkSource(false); + } + + @Test + public void testFlinkSourceFailure() throws Exception { + testFlinkSource(true); + } +} From 7e8e4abee14df77bcb944e289e94dd7ed8e0d9d8 Mon Sep 17 00:00:00 2001 From: David Christle Date: Mon, 13 Nov 2023 04:49:09 -0800 Subject: [PATCH 6/8] Improvements to FLINK-20625 FLIP-27 connector (#1) * Add retry mechanism to acknowledging pubsub messages * Remove synchronized locks and instead decouple reader and fetcher threads with ArrayBlockingQueue. This fixes a concurrency bug between separate threads calling fetch/acknowledgeMessages. * Handle failed checkpoints and ensure subscriber is not null in acknowledgeMessages. * Implement splitAckIds with iteration instead of recursion. Fixes bug where payload can exceed maxPayload. --------- Co-authored-by: Richard Deurwaarder --- .../reader/PubSubSourceFetcherManager.java | 3 +- .../source/reader/PubSubSplitReader.java | 138 +++++++++++++----- .../pubsub/BlockingGrpcPubSubSubscriber.java | 92 +++++++----- 3 files changed, 161 insertions(+), 72 deletions(-) diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java index 36893438..0b72b5e5 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.function.Supplier; /** @@ -109,7 +110,7 @@ private void enqueueAcknowledgeMessagesTask( splitFetcher.enqueueTask( new SplitFetcherTask() { @Override - public boolean run() { + public boolean run() throws IOException { pubSubSplitReader.acknowledgeMessages(checkpointId); return true; } diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java index 2c99325d..974cdf33 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java @@ -37,10 +37,12 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; -import java.util.SortedMap; -import java.util.TreeMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; /** * A {@link SplitReader} to read from a given {@link PubSubSubscriber}. @@ -49,16 +51,21 @@ */ public class PubSubSplitReader implements SplitReader, PubSubSplit> { private static final Logger LOG = LoggerFactory.getLogger(PubSubSplitReader.class); - private static final long UPCOMING_CHECKPOINT = 0; + private static final int RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT = 5; + private static final int RECEIVED_MESSAGE_QUEUE_CAPACITY = 500000; + private static final long RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS = 1000; private final PubSubDeserializationSchema deserializationSchema; private final PubSubSubscriberFactory pubSubSubscriberFactory; private final Credentials credentials; - private PubSubSubscriber subscriber; + private volatile PubSubSubscriber subscriber; private final PubSubCollector collector; - // Store the IDs of GCP Pub/Sub messages that yet have to be acknowledged so that they are not - // resent. Must be synchronized because it's accessed both by the fetcher and the reader thread. - private final SortedMap> messageIdsToAcknowledge = - Collections.synchronizedSortedMap(new TreeMap<>()); + + // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread + // processes messages and the fetcher thread acknowledges them, the thread-safe queue + // decouples them. + private final BlockingQueue ackIdsQueue = new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY); + private final Map> messageIdsToAcknowledge = new HashMap<>(); + /** * @param deserializationSchema a deserialization schema to apply to incoming message payloads. @@ -74,15 +81,17 @@ public PubSubSplitReader( this.pubSubSubscriberFactory = pubSubSubscriberFactory; this.credentials = credentials; this.collector = new PubSubCollector(); - - this.messageIdsToAcknowledge.put(UPCOMING_CHECKPOINT, new ArrayList<>()); } @Override public RecordsWithSplitIds> fetch() throws IOException { RecordsBySplits.Builder> recordsBySplits = new RecordsBySplits.Builder<>(); if (subscriber == null) { - subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + synchronized (this) { + if (subscriber == null) { + subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + } + } } for (ReceivedMessage receivedMessage : subscriber.pull()) { @@ -113,12 +122,39 @@ public RecordsWithSplitIds> fetch() throws IOException { collector.reset(); } - messageIdsToAcknowledge.get(UPCOMING_CHECKPOINT).add(receivedMessage.getAckId()); + enqueueAcknowledgementId(receivedMessage.getAckId()); } return recordsBySplits.build(); } + + /** + * Enqueue an acknowledgment ID to be acknowledged towards GCP Pub/Sub with retries. + * @param ackId the ID of the message to acknowledge + */ + public void enqueueAcknowledgementId(String ackId) { + int retryCount = 0; + + while (retryCount < RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT) { + boolean enqueued = ackIdsQueue.offer(ackId); + if (!enqueued) { + retryCount++; + try { + Thread.sleep(RECEIVED_MESSAGE_QUEUE_RETRY_SLEEP_MILLIS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Thread interrupted while waiting to enqueue acknowledgment ID.", e); + return; + } + } else { + return; + } + } + + LOG.warn("Queue is full. Unable to enqueue acknowledgment ID after " + RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT + " retries."); + } + @Override public void handleSplitsChanges(SplitsChange splitsChanges) {} @@ -152,34 +188,70 @@ private void reset() { } } - // ------------------------------------------------------ - void prepareForAcknowledgement(long checkpointId) { - synchronized (messageIdsToAcknowledge) { - messageIdsToAcknowledge.put( - checkpointId, messageIdsToAcknowledge.remove(UPCOMING_CHECKPOINT)); - messageIdsToAcknowledge.put(UPCOMING_CHECKPOINT, new ArrayList<>()); - } + /** + * Prepare for acknowledging messages received since the last checkpoint by draining the + * {@link #ackIdsQueue} into {@link #messageIdsToAcknowledge}. + * + *

Calling this method is enqueued by the {@link PubSubSourceFetcherManager} to snapshot + * state before a checkpoint. + * + * @param checkpointId the ID of the checkpoint for which to prepare for acknowledging messages + */ + public void prepareForAcknowledgement(long checkpointId) { + List ackIds = new ArrayList<>(); + ackIdsQueue.drainTo(ackIds); + messageIdsToAcknowledge.put(checkpointId, ackIds); } /** - * Acknowledge the reception of messages towards GCP Pub/Sub since the last checkpoint. As long - * as a received message has not been acknowledged, GCP Pub/Sub will attempt to deliver it + * Acknowledge the reception of messages towards GCP Pub/Sub since the last checkpoint. If a received message + * is not acknowledged before the subscription's acknowledgment timeout, GCP Pub/Sub will attempt to deliver it * again. * - *

Calling this message is enqueued by the {@link PubSubSourceFetcherManager} on checkpoint. + *

Calling this method is enqueued by the {@link PubSubSourceFetcherManager} on checkpoint. + * + * @param checkpointId the ID of the checkpoint for which to acknowledge messages */ - void acknowledgeMessages(long checkpointId) { - synchronized (messageIdsToAcknowledge) { - List messageIdsForCheckpoint = messageIdsToAcknowledge.get(checkpointId); - if (!messageIdsForCheckpoint.isEmpty() && subscriber != null) { - LOG.info( - "Acknowledging messages for checkpoint {} with IDs {}", - checkpointId, - messageIdsForCheckpoint); - subscriber.acknowledge(messageIdsForCheckpoint); + void acknowledgeMessages(long checkpointId) throws IOException { + if (subscriber == null) { + synchronized (this) { + if (subscriber == null) { + subscriber = pubSubSubscriberFactory.getSubscriber(credentials); + } + } + } + + if (!messageIdsToAcknowledge.containsKey(checkpointId)) { + LOG.error( + "Checkpoint {} not found in set of in-flight checkpoints {}.", + checkpointId, + messageIdsToAcknowledge.keySet().stream() + .map(String::valueOf) + .collect(Collectors.joining(","))); + return; + } + + List messageIdsForCheckpoint = messageIdsToAcknowledge.remove(checkpointId); + if (!messageIdsForCheckpoint.isEmpty()) { + LOG.debug( + "Acknowledging {} messages for checkpoint {}.", + messageIdsForCheckpoint.size(), + checkpointId); + subscriber.acknowledge(messageIdsForCheckpoint); + } else { + LOG.debug("No messages to acknowledge for checkpoint {}.", checkpointId); + } + + // Handle the case where a checkpoint is aborted and the messages for that checkpoint are + // never acknowledged. Here, we log any remaining checkpointIds and clear them. This relies + // on GCP Pub/Sub to redeliver the unacked messages. + if (!messageIdsToAcknowledge.isEmpty()) { + // Loop through any remaining checkpointIds in messageIdsToAcknowledge, and then clear them. + for (Map.Entry> entry : messageIdsToAcknowledge.entrySet()) { + LOG.warn("Checkpoint {} was not acknowledged - clearing {} unacked messages.", entry.getKey(), entry.getValue().size()); } - messageIdsToAcknowledge.remove(checkpointId); + messageIdsToAcknowledge.clear(); } } } diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java index 790e1e1e..ec341297 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.connectors.gcp.pubsub; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber; import com.google.pubsub.v1.AcknowledgeRequest; @@ -29,10 +28,10 @@ import io.grpc.StatusRuntimeException; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import static java.util.Collections.emptyList; import static java.util.concurrent.TimeUnit.SECONDS; /** @@ -83,52 +82,69 @@ private List pull(int retriesRemaining) { @Override public void acknowledge(List acknowledgementIds) { - if (acknowledgementIds.isEmpty()) { - return; - } - - // grpc servers won't accept acknowledge requests that are too large so we split the ackIds - Tuple2, List> splittedAckIds = splitAckIds(acknowledgementIds); - while (!splittedAckIds.f0.isEmpty()) { - AcknowledgeRequest acknowledgeRequest = - AcknowledgeRequest.newBuilder() - .setSubscription(projectSubscriptionName) - .addAllAckIds(splittedAckIds.f0) - .build(); - - stub.withDeadlineAfter(60, SECONDS).acknowledge(acknowledgeRequest); - - splittedAckIds = splitAckIds(splittedAckIds.f1); - } - } + List> splittedAckIds = splitAckIds(acknowledgementIds); + while (!splittedAckIds.isEmpty()) { + AcknowledgeRequest acknowledgeRequest = + AcknowledgeRequest.newBuilder() + .setSubscription(projectSubscriptionName) + .addAllAckIds(splittedAckIds.remove(0)) + .build(); + + acknowledgeWithRetries(acknowledgeRequest, retries); + } + } + + private void acknowledgeWithRetries(AcknowledgeRequest acknowledgeRequest, int retriesRemaining) { + try { + stub.withDeadlineAfter(timeout.toMillis(), TimeUnit.MILLISECONDS).acknowledge(acknowledgeRequest); + } catch (StatusRuntimeException e) { + if (retriesRemaining > 0) { + acknowledgeWithRetries(acknowledgeRequest, retriesRemaining - 1); + return; + } + + throw e; + } + } /* maxPayload is the maximum number of bytes to devote to actual ids in - * acknowledgement or modifyAckDeadline requests. A serialized - * AcknowledgeRequest grpc call has a small constant overhead, plus the size of the - * subscription name, plus 3 bytes per ID (a tag byte and two size bytes). A - * ModifyAckDeadlineRequest has an additional few bytes for the deadline. We - * don't know the subscription name here, so we just assume the size exclusive - * of ids is 100 bytes. - - * With gRPC there is no way for the client to know the server's max message size (it is - * configurable on the server). We know from experience that it is 512K. - * @return First list contains no more than 512k bytes, second list contains remaining ids - */ - private Tuple2, List> splitAckIds(List ackIds) { - final int maxPayload = 500 * 1024; // little below 512k bytes to be on the safe side + * acknowledgement or modifyAckDeadline requests. A serialized + * AcknowledgeRequest grpc call has a small constant overhead, plus the size of the + * subscription name, plus 3 bytes per ID (a tag byte and two size bytes). A + * ModifyAckDeadlineRequest has an additional few bytes for the deadline. We + * don't know the subscription name here, so we just assume the size exclusive + * of ids is 100 bytes. + + * With gRPC there is no way for the client to know the server's max message size (it is + * configurable on the server). We know from experience that it is 512K. + * @return First list contains no more than 512k bytes, second list contains remaining ids + */ + private List> splitAckIds(List ackIds) { + int queueSize = ackIds.size(); + final int maxPayload = 500 * 1024; // slightly below 512k bytes to be on the safe side final int fixedOverheadPerCall = 100; final int overheadPerId = 3; + List> outputLists = new ArrayList<>(); + List currentList = new ArrayList<>(); int totalBytes = fixedOverheadPerCall; - for (int i = 0; i < ackIds.size(); i++) { - totalBytes += ackIds.get(i).length() + overheadPerId; - if (totalBytes > maxPayload) { - return Tuple2.of(ackIds.subList(0, i), ackIds.subList(i, ackIds.size())); + for (String ackId : ackIds) { + if (totalBytes + ackId.length() + overheadPerId > maxPayload) { + outputLists.add(currentList); + currentList = new ArrayList<>(); + totalBytes = fixedOverheadPerCall; } + currentList.add(ackId); + totalBytes += ackId.length() + overheadPerId; } + if (!currentList.isEmpty()) { + outputLists.add(currentList); + } + + assert outputLists.stream().map(List::size).reduce(0, Integer::sum) == queueSize; - return Tuple2.of(ackIds, emptyList()); + return outputLists; } @Override From 55f0cf1e9ab05d2397623416d84367eacf8ab537 Mon Sep 17 00:00:00 2001 From: Ryan Skraba Date: Wed, 15 Nov 2023 14:00:20 +0100 Subject: [PATCH 7/8] [hotfix] Spotless code --- .../source/reader/PubSubSplitReader.java | 30 ++++---- .../pubsub/BlockingGrpcPubSubSubscriber.java | 72 ++++++++++--------- 2 files changed, 55 insertions(+), 47 deletions(-) diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java index 974cdf33..7d404fab 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java @@ -63,10 +63,10 @@ public class PubSubSplitReader implements SplitReader, PubSub // Store the IDs of GCP Pub/Sub messages we have fetched & processed. Since the reader thread // processes messages and the fetcher thread acknowledges them, the thread-safe queue // decouples them. - private final BlockingQueue ackIdsQueue = new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY); + private final BlockingQueue ackIdsQueue = + new ArrayBlockingQueue<>(RECEIVED_MESSAGE_QUEUE_CAPACITY); private final Map> messageIdsToAcknowledge = new HashMap<>(); - /** * @param deserializationSchema a deserialization schema to apply to incoming message payloads. * @param pubSubSubscriberFactory a factory from which a new subscriber can be created from @@ -128,9 +128,9 @@ public RecordsWithSplitIds> fetch() throws IOException { return recordsBySplits.build(); } - /** * Enqueue an acknowledgment ID to be acknowledged towards GCP Pub/Sub with retries. + * * @param ackId the ID of the message to acknowledge */ public void enqueueAcknowledgementId(String ackId) { @@ -152,7 +152,10 @@ public void enqueueAcknowledgementId(String ackId) { } } - LOG.warn("Queue is full. Unable to enqueue acknowledgment ID after " + RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT + " retries."); + LOG.warn( + "Queue is full. Unable to enqueue acknowledgment ID after " + + RECEIVED_MESSAGE_QUEUE_MAX_RETRY_COUNT + + " retries."); } @Override @@ -188,10 +191,9 @@ private void reset() { } } - /** - * Prepare for acknowledging messages received since the last checkpoint by draining the - * {@link #ackIdsQueue} into {@link #messageIdsToAcknowledge}. + * Prepare for acknowledging messages received since the last checkpoint by draining the {@link + * #ackIdsQueue} into {@link #messageIdsToAcknowledge}. * *

Calling this method is enqueued by the {@link PubSubSourceFetcherManager} to snapshot * state before a checkpoint. @@ -205,9 +207,9 @@ public void prepareForAcknowledgement(long checkpointId) { } /** - * Acknowledge the reception of messages towards GCP Pub/Sub since the last checkpoint. If a received message - * is not acknowledged before the subscription's acknowledgment timeout, GCP Pub/Sub will attempt to deliver it - * again. + * Acknowledge the reception of messages towards GCP Pub/Sub since the last checkpoint. If a + * received message is not acknowledged before the subscription's acknowledgment timeout, GCP + * Pub/Sub will attempt to deliver it again. * *

Calling this method is enqueued by the {@link PubSubSourceFetcherManager} on checkpoint. * @@ -247,9 +249,13 @@ void acknowledgeMessages(long checkpointId) throws IOException { // never acknowledged. Here, we log any remaining checkpointIds and clear them. This relies // on GCP Pub/Sub to redeliver the unacked messages. if (!messageIdsToAcknowledge.isEmpty()) { - // Loop through any remaining checkpointIds in messageIdsToAcknowledge, and then clear them. + // Loop through any remaining checkpointIds in messageIdsToAcknowledge, and then clear + // them. for (Map.Entry> entry : messageIdsToAcknowledge.entrySet()) { - LOG.warn("Checkpoint {} was not acknowledged - clearing {} unacked messages.", entry.getKey(), entry.getValue().size()); + LOG.warn( + "Checkpoint {} was not acknowledged - clearing {} unacked messages.", + entry.getKey(), + entry.getValue().size()); } messageIdsToAcknowledge.clear(); } diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java index ec341297..9f7ae631 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java @@ -82,43 +82,45 @@ private List pull(int retriesRemaining) { @Override public void acknowledge(List acknowledgementIds) { - List> splittedAckIds = splitAckIds(acknowledgementIds); - while (!splittedAckIds.isEmpty()) { - AcknowledgeRequest acknowledgeRequest = - AcknowledgeRequest.newBuilder() - .setSubscription(projectSubscriptionName) - .addAllAckIds(splittedAckIds.remove(0)) - .build(); - - acknowledgeWithRetries(acknowledgeRequest, retries); - } - } - - private void acknowledgeWithRetries(AcknowledgeRequest acknowledgeRequest, int retriesRemaining) { - try { - stub.withDeadlineAfter(timeout.toMillis(), TimeUnit.MILLISECONDS).acknowledge(acknowledgeRequest); - } catch (StatusRuntimeException e) { - if (retriesRemaining > 0) { - acknowledgeWithRetries(acknowledgeRequest, retriesRemaining - 1); - return; - } - - throw e; - } - } + List> splittedAckIds = splitAckIds(acknowledgementIds); + while (!splittedAckIds.isEmpty()) { + AcknowledgeRequest acknowledgeRequest = + AcknowledgeRequest.newBuilder() + .setSubscription(projectSubscriptionName) + .addAllAckIds(splittedAckIds.remove(0)) + .build(); + + acknowledgeWithRetries(acknowledgeRequest, retries); + } + } + + private void acknowledgeWithRetries( + AcknowledgeRequest acknowledgeRequest, int retriesRemaining) { + try { + stub.withDeadlineAfter(timeout.toMillis(), TimeUnit.MILLISECONDS) + .acknowledge(acknowledgeRequest); + } catch (StatusRuntimeException e) { + if (retriesRemaining > 0) { + acknowledgeWithRetries(acknowledgeRequest, retriesRemaining - 1); + return; + } + + throw e; + } + } /* maxPayload is the maximum number of bytes to devote to actual ids in - * acknowledgement or modifyAckDeadline requests. A serialized - * AcknowledgeRequest grpc call has a small constant overhead, plus the size of the - * subscription name, plus 3 bytes per ID (a tag byte and two size bytes). A - * ModifyAckDeadlineRequest has an additional few bytes for the deadline. We - * don't know the subscription name here, so we just assume the size exclusive - * of ids is 100 bytes. - - * With gRPC there is no way for the client to know the server's max message size (it is - * configurable on the server). We know from experience that it is 512K. - * @return First list contains no more than 512k bytes, second list contains remaining ids - */ + * acknowledgement or modifyAckDeadline requests. A serialized + * AcknowledgeRequest grpc call has a small constant overhead, plus the size of the + * subscription name, plus 3 bytes per ID (a tag byte and two size bytes). A + * ModifyAckDeadlineRequest has an additional few bytes for the deadline. We + * don't know the subscription name here, so we just assume the size exclusive + * of ids is 100 bytes. + + * With gRPC there is no way for the client to know the server's max message size (it is + * configurable on the server). We know from experience that it is 512K. + * @return First list contains no more than 512k bytes, second list contains remaining ids + */ private List> splitAckIds(List ackIds) { int queueSize = ackIds.size(); final int maxPayload = 500 * 1024; // slightly below 512k bytes to be on the safe side From b6e38a61a510e763eeb5cff113d07590f5c00456 Mon Sep 17 00:00:00 2001 From: Ryan Skraba Date: Wed, 15 Nov 2023 15:15:34 +0100 Subject: [PATCH 8/8] [FLINK-20625] Review comments and code clean-up --- .../connector/gcp/pubsub/source/PubSubSource.java | 10 +++++----- .../gcp/pubsub/source/reader/PubSubRecordEmitter.java | 3 +-- .../source/reader/PubSubSourceFetcherManager.java | 6 ++++-- .../gcp/pubsub/source/reader/PubSubSourceReader.java | 9 ++++----- .../gcp/pubsub/source/reader/PubSubSplitReader.java | 2 +- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java index 43eb5a59..48c3a2c2 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java @@ -64,7 +64,7 @@ * PubSubSourceReader} that joins. The split does not contain any split-specific information because * Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing * partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple - * readers which use the same subscription. + * readers using same subscription. * *

A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so: * @@ -229,8 +229,8 @@ public PubSubSourceBuilder setSubscriptionName(String subscriptionName) { } /** - * @param credentials an instance of {@com.google.auth.Credentials} to authenticate against - * Google Cloud + * @param credentials an instance of {@link com.google.auth.Credentials} to authenticate + * against Google Cloud */ public PubSubSourceBuilder setCredentials(Credentials credentials) { this.credentials = credentials; @@ -245,7 +245,7 @@ public PubSubSourceBuilder setPubSubSubscriberFactory( } /** - * Create a parameterized {@DefaultPubSubSubscriberFactory} and set it on the builder. + * Create a parameterized {@link DefaultPubSubSubscriberFactory} and set it on the builder. * * @param maxMessagesPerPull The maximum number of messages that should be pulled in one go. * @param perRequestTimeout The timeout per request from the subscriber @@ -289,7 +289,7 @@ public PubSubSource build() throws IOException { DEFAULT_PUBSUB_SUBSCRIBER_MAX_MESSAGES_PER_PULL); } - return new PubSubSource( + return new PubSubSource<>( deserializationSchema, pubSubSubscriberFactory, props, credentials); } } diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubRecordEmitter.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubRecordEmitter.java index 311743c5..53ae2bc0 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubRecordEmitter.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubRecordEmitter.java @@ -34,8 +34,7 @@ public class PubSubRecordEmitter implements RecordEmitter, T, @Override public void emitRecord( - Tuple2 element, SourceOutput output, PubSubSplitState splitState) - throws Exception { + Tuple2 element, SourceOutput output, PubSubSplitState splitState) { output.collect(element.f0, element.f1); } } diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java index 0b72b5e5..c41cf4fc 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceFetcherManager.java @@ -20,6 +20,7 @@ package org.apache.flink.connector.gcp.pubsub.source.reader; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; @@ -45,8 +46,9 @@ class PubSubSourceFetcherManager PubSubSourceFetcherManager( FutureCompletingBlockingQueue>> elementsQueue, - Supplier, PubSubSplit>> splitReaderSupplier) { - super(elementsQueue, splitReaderSupplier); + Supplier, PubSubSplit>> splitReaderSupplier, + Configuration config) { + super(elementsQueue, splitReaderSupplier, config); } void prepareForAcknowledgement(long checkpointId) { diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java index 76363e3b..ae92f5fb 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSourceReader.java @@ -32,7 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -51,7 +51,7 @@ public PubSubSourceReader( SourceReaderContext context) { super( elementsQueue, - new PubSubSourceFetcherManager<>(elementsQueue, splitReaderSupplier::get), + new PubSubSourceFetcherManager<>(elementsQueue, splitReaderSupplier::get, config), recordEmitter, config, context); @@ -64,7 +64,7 @@ protected void onSplitFinished(Map finishedSplitIds) { public List snapshotState(long checkpointId) { ((PubSubSourceFetcherManager) splitFetcherManager) .prepareForAcknowledgement(checkpointId); - return Arrays.asList(new PubSubSplit()); + return Collections.singletonList(new PubSubSplit()); } /** @@ -73,10 +73,9 @@ public List snapshotState(long checkpointId) { * PubSubSplitReader}. * * @param checkpointId the checkpoint ID. - * @throws Exception */ @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { + public void notifyCheckpointComplete(long checkpointId) { LOG.info("Acknowledging received GCP Pub/Sub messages for checkpoint {}", checkpointId); ((PubSubSourceFetcherManager) splitFetcherManager).acknowledgeMessages(checkpointId); } diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java index 7d404fab..c754eef8 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java @@ -73,7 +73,7 @@ public class PubSubSplitReader implements SplitReader, PubSub * @param credentials the credentials to use for creating a new subscriber */ public PubSubSplitReader( - PubSubDeserializationSchema deserializationSchema, + PubSubDeserializationSchema deserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials) {