-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-20625][pubsub,e2e] Add PubSubSource connector using FLIP-27 #2
base: main
Are you sure you want to change the base?
[FLINK-20625][pubsub,e2e] Add PubSubSource connector using FLIP-27 #2
Conversation
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) |
* WIP * WIP * Working WIP * Clean up * Place new Pub/Sub source into existing Pub/Sub connector module * Clean up * Apply Spotless code formatting [FLINK-20625][pubsub,e2e] Attempt to support stopping the reader when stopmark is encountered [FLINK-20625][pubsub,e2e] Add checkpointing and do some refactorings * Simplify fetching from Pub/Sub in SplitReader * Allow Pub/Sub source to be only continuous unbounded * Add basic PubSubSource builder * Add configuration options for SubscriberFactory to PubSubSource, remove unused collector * Add checkpointing [FLINK-20625][pubsub,e2e] Allow multiple records inside single Pub/Sub message for deserialization [FLINK-20625][pubsub,e2e] Add Javadocs, README and clean up [FLINK-20625][pubsub,e2e] Reduce visibility of classes and their members [FLINK-20625][pubsub,e2e] Propagate Pub/Sub subscriber creation errors from SplitReader [FLINK-20625][pubsub,e2e] Use constants for default Pub/Sub subscriber parameters [FLINK-20625][pubsub,e2e] Fix dynamic Scala version in artifact example [FLINK-20625][pubsub,e2e] Rename PubSubEnumeratorCheckpoint -> PubSubEnumeratorState [FLINK-20625][pubsub,e2e] Add version checks for deserialization [FLINK-20625][pubsub,e2e] Remove unnecessary declaration of exception-throwing [FLINK-20625][pubsub,e2e] Remove disfunctional end-of-stream logic [FLINK-20625][pubsub,e2e] Avoid concurrency issues with list of Pub/Sub messages to acknowledge [FLINK-20625][pubsub,e2e] Refactor PubSubSourceBuilder [FLINK-20625][pubsub,e2e] Clarify consistency guarantee description [FLINK-20625][pubsub,e2e] Clarify Pub/Sub request timeout [FLINK-20625][pubsub,e2e] Restructure and extend readme, add basic architecture info to docstring [FLINK-20625][pubsub,e2e] Attempt to solve concurrency issues with checkpointing
7813345
to
3b33d92
Compare
@RyanSkraba Do you have any idea who could help with a review here? |
@dchristle how about you? 😉 Would you have time to look at this? |
...ctor-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java
Outdated
Show resolved
Hide resolved
...sub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/reader/PubSubSplitReader.java
Outdated
Show resolved
Hide resolved
Hi @RyanSkraba, Thank you for shepherding and improving this connector implementation over the years. I'm hoping to help review & improve it so we can finally get it merged. I just opened RyanSkraba#1 to add some improvements based on my experience running this code in production. Our internal implementation has some other improvements, but they need to be properly cleaned up & can come in a later PR, as they aren't essential. Please take a look when you get a chance. |
* Add retry mechanism to acknowledging pubsub messages * Remove synchronized locks and instead decouple reader and fetcher threads with ArrayBlockingQueue. This fixes a concurrency bug between separate threads calling fetch/acknowledgeMessages. * Handle failed checkpoints and ensure subscriber is not null in acknowledgeMessages. * Implement splitAckIds with iteration instead of recursion. Fixes bug where payload can exceed maxPayload. --------- Co-authored-by: Richard Deurwaarder <[email protected]>
Just a quick update on this PR! Thanks so much @dchristle for the review and the improvements -- I've included everything here in the PR and added you as a collaborator to this branch. Please feel free to apply any additional changes that you've learned from your use in production! That's really the most valuable source of info for a new connector. I've applied the changes you've requested, as well as some other very minor code clean-ups suggested through the IDE (some unused exceptions, another missing Java generic and using a deprecated super-constructor). |
@dchristle thanks for the review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a general question, if we are delaying acks to checkpoints, how are we going to handle if checkpoint duration is bigger than ack deadline on pubsub source, also we are having multiple pull requests ahead of acking, this causes multiple duplicates IIUC. This is not tested in IT tests added.
implements Source<OUT, PubSubSplit, PubSubEnumeratorState>, ResultTypeQueryable<OUT> { | ||
private final PubSubDeserializationSchema<OUT> deserializationSchema; | ||
private final PubSubSubscriberFactory pubSubSubscriberFactory; | ||
private final Properties props; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename props, is t client props, reader props,....?
private String subscriptionName; | ||
private PubSubSubscriberFactory pubSubSubscriberFactory; | ||
private Properties props; | ||
private Credentials credentials; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should pass credential provider instead, to natively support dynamic credentials discovery and refreshing.
new DefaultPubSubSubscriberFactory( | ||
ProjectSubscriptionName.format(projectName, subscriptionName), | ||
DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES, | ||
Duration.ofSeconds(15), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: why not expose as default like the others?
* | ||
* @param <OUT> The output type of the source. | ||
*/ | ||
public class PubSubSource<OUT> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing tag, @PublicEvolving
or @Public
* The enumerator for the {@link PubSubSource}. It does not do any work discovery as envisioned by | ||
* FLIP-27 because GCP Pub/Sub hides partitions and other implementation details. | ||
*/ | ||
public class PubSubSourceEnumerator implements SplitEnumerator<PubSubSplit, PubSubEnumeratorState> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing annotation, @Internal
FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, Long>>> elementsQueue, | ||
Supplier<SplitReader<Tuple2<T, Long>, PubSubSplit>> splitReaderSupplier, | ||
Configuration config) { | ||
super(elementsQueue, splitReaderSupplier, config); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deprecated constructor
} | ||
} | ||
|
||
private void enqueuePrepareForAcknowledgementTask( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are calling this on onCheckpointComplete
where we enqueue a task that would possibly not start immediately. What happens if we fail during acking.
If we move the acking responsiblity to Source reader, that would be much cleaner and we could make it synchronous
@@ -0,0 +1,181 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The file extension is not correct, also Maybe we rename it EmulatedPubSubSourceV2Test
* | ||
*/ | ||
|
||
package org.apache.flink.connector.gcp.pubsub; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be enough to add to e2e tests module
import static org.junit.Assert.assertTrue; | ||
|
||
/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */ | ||
public class EmulatedPubSubSourceTest extends GCloudUnitTestBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use Junit5
could I know when this PR will be merged and release in which version? thx |
What is the purpose of the change
Brief change log
master
and fixed conflictsmain
, re-homing the change to the new directory structure.Verifying this change
Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation