[FLINK-32609] Support Projection Pushdown #174
Conversation
This PR is being marked as stale since it has not had any activity in the last 90 days. If you are having difficulty finding a reviewer, please reach out to the … If this PR is no longer valid or desired, please feel free to close it.

bumping to keep alive
Savonitar left a comment
Hi.
Thanks for working on this feature, it is a useful optimization and the implementation looks solid. I've started reviewing the PR and posted some questions/comments.
```java
    TOP_LEVEL,

    /** The format supports projection pushdown for top-level and nested fields. */
    ALL
```
Do we validate format compatibility with projection pushdown levels, to prevent unsupported combinations?
No, we don't validate format compatibility at configuration time. This was intentional. Although I've done some of the work to figure out what level of projection is supported by some formats (avro, csv, json, avro-confluent, debezium-avro-confluent), the fact is there are more formats out there beyond those maintained by Flink or even in the open source ecosystem (we have some custom formats at my company for example). Even for the formats I've figured out, it's possible the support could potentially vary by version e.g. if the avro format projection pushdown bugs are fixed.
Something we could consider long-term is adding a method to ProjectableDecodingFormat to indicate its supported projection level. However, even that wouldn't fully solve the problem e.g. the avro format implements ProjectableDecodingFormat but doesn't actually support all projection pushdowns.
In the short-term, I've chosen to simply document the appropriate combinations in the flink-connector-kafka config docs here.
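As an illustration of such a documented combination, a table definition using the new option might look like the following DDL (the table name, topic, and broker address are made up for this sketch; the option name and the `json`/`ALL` pairing come from this PR):

```sql
CREATE TABLE orders (
  user_id INT,
  payload ROW<amount DOUBLE, note STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',                                 -- hypothetical topic
  'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical broker
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  -- json decodes fields independently, so nested (ALL) pushdown is safe
  'value.format-projection-pushdown-level' = 'ALL'
);
```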
```java
@Override
public void applyProjection(final int[][] projectedFields, final DataType producedDataType) {
    this.projectedPhysicalFields = projectedFields;
```
Should we also update this.producedDataType here?
Good question! We don't need to update producedDataType in applyProjection because the javadoc for applyReadableMetadata explicitly addresses this scenario:
```java
/**
 * ...
 *
 * <p>Note: Use the passed data type instead of {@link ResolvedSchema#toPhysicalRowDataType()}
 * for describing the final output data type when creating {@link TypeInformation}. If the
 * source implements {@link SupportsProjectionPushDown}, the projection is already considered in
 * the given output data type, use the {@code producedDataType} provided by this method instead
 * of the {@code producedDataType} provided by {@link
 * SupportsProjectionPushDown#applyProjection(int[][], DataType)}.
 *
 * ...
 */
void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);
```

So we're following the recommended approach, i.e. using the `producedDataType` from `applyReadableMetadata` rather than the one from `applyProjection`.
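To illustrate that recommendation, here is a simplified stand-in (not the real Flink interfaces; a `String` stands in for `DataType`): `applyProjection` only records the projected field indices, while the final produced type is taken from `applyReadableMetadata`:

```java
import java.util.List;

// Simplified stand-ins for illustration only: the real Flink API uses
// DataType and the SupportsProjectionPushDown / SupportsReadableMetadata
// interfaces; here a String stands in for the produced DataType.
class ProjectionAwareSource {
    int[][] projectedPhysicalFields;
    String producedDataType;

    // Mirrors SupportsProjectionPushDown#applyProjection: record which fields
    // were projected, but do NOT treat the passed data type as final.
    void applyProjection(int[][] projectedFields, String producedDataType) {
        this.projectedPhysicalFields = projectedFields;
    }

    // Mirrors SupportsReadableMetadata#applyReadableMetadata: per its javadoc,
    // this data type already accounts for any applied projection, so it is
    // the one the source should keep.
    void applyReadableMetadata(List<String> metadataKeys, String producedDataType) {
        this.producedDataType = producedDataType;
    }
}
```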
```diff
 @Override
 public boolean supportsMetadataProjection() {
-    return false;
+    throw new IllegalStateException(
```
Should this throw an exception instead of returning a boolean? Could you please elaborate on throwing an exception?
E.g. what if this method is called for logging purpose?
I've checked other flink-connectors and they either return false or use the default (return true), none throw an exception.
From the javadoc for supportsMetadataProjection:
```java
/**
 * ...
 *
 * <p>This method is only called if the source does <em>not</em> implement {@link
 * SupportsProjectionPushDown}.
 *
 * ...
 */
default boolean supportsMetadataProjection() {
```

Since `KafkaDynamicSource` now implements `SupportsProjectionPushDown`, this method should never be called by the planner. Originally, my reasoning for throwing an exception here was to catch any unexpected invocations that would indicate a bug in the planner.
That said, I take your point about consistency with other connectors and potential issues if it's called for logging/debugging purposes. I've now removed the method entirely (so it will by default return true) 👍
```java
 * org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource}.
 */
@Internal
public enum FormatProjectionPushdownLevel {
```
This enum (`NONE`, `TOP_LEVEL`, `ALL`) requires users to understand:
- whether the format supports projection pushdown
- whether the format supports nested projections
- known bugs in specific formats (e.g. the Avro issue FLINK-35324 mentioned in the PR description)

Maybe we could replace the per-format level configuration with a single boolean flag:
`'projection-pushdown.enabled' = 'true'`
and let the format internally decide which projection it can use.
Would this approach address the use cases you had in mind? Or is this added intentionally as a potential workaround for problems with formats?
> Or is this added intentionally as a potential workaround for problems with formats?
Yep, that's one of the main reasons but also because there are more formats out there that I can't account for. See my response to your first question here for more details.
I'm open to discussing this more. This is the only part of this PR I don't like but I felt was a necessary compromise in the short-term.
```java
/** A {@link ScanTableSource} for {@link DynamicKafkaSource}. */
@Internal
public class DynamicKafkaTableSource
```
Could you please clarify why DynamicKafkaTableSource does not implement SupportsProjectionPushDown?
`DynamicKafkaTableSource` is a new source that was added only a couple of weeks ago. I'm trying to keep this PR as small as possible, so I haven't added `SupportsProjectionPushDown` support there yet, but I'd be happy to implement it in the future.
Let me know if you'd prefer I include it in this PR or follow up separately.
I agree with keeping PRs small and focused. A follow-up PR is fine. I think it makes sense to create a follow-up Jira ticket after the approvals/merge so we don't lose it.
Sorry for the delay. I want to take another look and complete my review next week.
Savonitar left a comment
Thanks for the updates/replies!
```java
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
```
- Shouldn't we add `KEY_PROJECTION_PUSHDOWN_LEVEL` and `VALUE_PROJECTION_PUSHDOWN_LEVEL` to the `optionalOptions()` method? The factory reads these options at lines 190-191 (in `createDynamicTableSource`) and passes them to `KafkaDynamicSource`, but they aren't registered as optional options. As a result, when a user sets `key.format-projection-pushdown-level = TOP_LEVEL` on an `upsert-kafka` table, `FactoryHelper` will throw `ValidationException: Unsupported options found for 'upsert-kafka'` because the key isn't in the consumed set. Or am I missing something?
- Could we add a test that creates an upsert-kafka table with a non-default pushdown level to catch this?
Sorry, this was a miss, great catch!
I haven't had to use projection-pushdown + upsert yet.
Added the options and test in the latest commit.
Thanks for adding the pushdown options to `optionalOptions()`.
However, it looks like format-level projection pushdown doesn't work for upsert-kafka. Regardless of the configured pushdown level, all value fields are always fully deserialized, and projection happens only afterwards via the `Projector`.
The root cause is `DecodingFormatWrapper`: it only implements `DecodingFormat`, not `ProjectableDecodingFormat`. When `Decoder.create()` checks `decodingFormat instanceof ProjectableDecodingFormat`, the wrapper always fails the check, so the `projectInsideDeserializer` path is never taken for upsert-kafka.
I implemented a failing test to verify this hypothesis and to provide a basis for the fix: Savonitar@8e2c1aed
The test writes data with a breaking schema change on a non-projected value field (`name` changes from `INT` to `STRING`), then reads selecting only `user_id` and `payload`. With working format-level pushdown, `name` would be skipped during deserialization and the query would succeed. Instead, it fails with `JsonParseException` because all fields are deserialized.
I took into account the style of the existing tests in `UpsertKafkaTableITCase` and your `KafkaTableITCase#testProjectionPushdownWithJsonFormatAndBreakingSchemaChange` to align. I kept the commit in my fork rather than pushing to your branch, so feel free to cherry-pick it as a regression test, or reimplement it entirely if you prefer. The test should pass when the fix is implemented.
A possible fix: make `DecodingFormatWrapper` also implement `ProjectableDecodingFormat` and delegate `createRuntimeDecoder(context, producedDataType, projections)` to the inner format when it's a `ProjectableDecodingFormat`.
Could you please take a look and implement the fix if you agree with the analysis?
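The proposed delegation pattern can be sketched with simplified stand-in interfaces (these are not the real Flink `DecodingFormat`/`ProjectableDecodingFormat` signatures, which are generic and take a `DynamicTableSource.Context`; `String` stands in for the runtime decoder here):

```java
import java.util.Arrays;

// Simplified stand-in for Flink's DecodingFormat.
interface DecodingFormat {
    String createRuntimeDecoder(String producedDataType);
}

// Simplified stand-in for Flink's ProjectableDecodingFormat.
interface ProjectableDecodingFormat extends DecodingFormat {
    String createRuntimeDecoder(String producedDataType, int[][] projections);
}

// Stand-in for a format like json that supports projected deserialization.
class JsonLikeFormat implements ProjectableDecodingFormat {
    public String createRuntimeDecoder(String t) {
        return "decoder(" + t + ")";
    }
    public String createRuntimeDecoder(String t, int[][] p) {
        return "projectedDecoder(" + t + ", " + Arrays.deepToString(p) + ")";
    }
}

// Mirrors the proposed fix for upsert-kafka's DecodingFormatWrapper: the
// wrapper itself implements ProjectableDecodingFormat, so the planner-side
// instanceof check passes, and it delegates to the inner format when possible.
class DecodingFormatWrapper implements ProjectableDecodingFormat {
    private final DecodingFormat inner;

    DecodingFormatWrapper(DecodingFormat inner) {
        this.inner = inner;
    }

    public String createRuntimeDecoder(String t) {
        return inner.createRuntimeDecoder(t);
    }

    public String createRuntimeDecoder(String t, int[][] projections) {
        if (inner instanceof ProjectableDecodingFormat) {
            return ((ProjectableDecodingFormat) inner).createRuntimeDecoder(t, projections);
        }
        // Inner format cannot project: fall back to full deserialization,
        // matching the wrapper's current behaviour.
        return inner.createRuntimeDecoder(t);
    }
}
```

With this shape, a wrapper around a projectable inner format takes the projected path, while a wrapper around a plain format silently falls back to full deserialization.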
Implements `SupportsProjectionPushDown` for `KafkaDynamicSource`, allowing queries to skip deserializing columns they don't need.

**Configuration**

| Option | Default | Values |
| --- | --- | --- |
| `key.format-projection-pushdown-level` | `NONE` | `NONE`, `TOP_LEVEL`, `ALL` |
| `value.format-projection-pushdown-level` | `NONE` | `NONE`, `TOP_LEVEL`, `ALL` |

**Benefits**

- Schema evolution tolerance: queries only fail on breaking changes to fields they actually use. For example, if column `a` changes from `INT` to `STRING`, the query `SELECT b FROM kafka` still succeeds because `a` is never deserialized. (Only applies to formats that decode fields independently, e.g. `json`, `avro-confluent`, `debezium-avro-confluent`.)
- Performance: unneeded columns are filtered at the `TableSourceScan` node rather than downstream. Note: Kafka itself does not support projection pushdown, so the optimization happens during deserialization.

**Why opt-in?**

Formats have varying support levels, and incorrect configuration can cause runtime errors or wrong results. Users must explicitly enable the appropriate level for their format. Recommended configuration:

| Format | Recommended level |
| --- | --- |
| `avro` | `NONE` (has issues, see FLINK-35324) |
| `avro-confluent` | `TOP_LEVEL` |
| `debezium-avro-confluent` | `TOP_LEVEL` |
| `csv` | `TOP_LEVEL` (CSV technically can't have nested fields, so users could set `ALL`) |
| `json` | `ALL` |
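To make the schema-evolution benefit concrete, a hedged sketch (the table and column names are hypothetical):

```sql
-- The value schema evolves: column `a` changes from INT to STRING in new
-- records. With 'value.format-projection-pushdown-level' = 'ALL' on a
-- json-format table, this query keeps working, because `a` is pruned
-- before deserialization and the INT -> STRING change is never seen.
-- With the default level (NONE), every value field, including `a`, is
-- deserialized before projection, so the same query fails on the new records.
SELECT b FROM kafka_table;
```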