-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-38355][table][FLIP-546] Support CREATE OR ALTER MATERIALIZED TABLE syntax
#27199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
CREATE OR ALTER MATERIALIZED TABLE
CREATE OR ALTER MATERIALIZED TABLECREATE OR ALTER MATERIALIZED TABLE syntax
...l-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
Outdated
Show resolved
Hide resolved
...che/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
Outdated
Show resolved
Hide resolved
| .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey()); | ||
| assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs()) | ||
| .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs()); | ||
| assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious what you think, we have removed DefinitionQuery, but we still have DefinitionFreshness. Should we rename DefinitionFreshness so it does not include the word Definition - maybe change just to freshness to simplify.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a valid point but we have deprecated the getFreshness() method in CatalogMaterializedTable interface.
...s/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
Outdated
Show resolved
Hide resolved
...-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateOrAlterMaterializedTable.java
Outdated
Show resolved
Hide resolved
...l-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
Outdated
Show resolved
Hide resolved
.../apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
Outdated
Show resolved
Hide resolved
...le-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
Outdated
Show resolved
Hide resolved
...le-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
Outdated
Show resolved
Hide resolved
b08780d to
ba9286e
Compare
ba9286e to
2cd5eae
Compare
| writer.keyword("MATERIALIZED TABLE"); | ||
| getTableName().unparse(writer, leftPrec, rightPrec); | ||
|
|
||
| if (!getColumnList().isEmpty() | ||
| || !getTableConstraints().isEmpty() | ||
| || getWatermark().isPresent()) { | ||
| SqlUnparseUtils.unparseTableSchema( | ||
| writer, | ||
| leftPrec, | ||
| rightPrec, | ||
| getColumnList(), | ||
| getTableConstraints(), | ||
| getWatermark().orElse(null)); | ||
| } | ||
|
|
||
| getComment() | ||
| .ifPresent( | ||
| comment -> { | ||
| writer.newlineAndIndent(); | ||
| writer.keyword("COMMENT"); | ||
| comment.unparse(writer, leftPrec, rightPrec); | ||
| }); | ||
|
|
||
| if (getDistribution() != null) { | ||
| writer.newlineAndIndent(); | ||
| getDistribution().unparse(writer, leftPrec, rightPrec); | ||
| } | ||
|
|
||
| if (!getPartitionKeyList().isEmpty()) { | ||
| writer.newlineAndIndent(); | ||
| writer.keyword("PARTITIONED BY"); | ||
| SqlWriter.Frame partitionedByFrame = writer.startList("(", ")"); | ||
| getPartitionKeyList().unparse(writer, leftPrec, rightPrec); | ||
| writer.endList(partitionedByFrame); | ||
| } | ||
|
|
||
| if (!getPropertyList().isEmpty()) { | ||
| writer.newlineAndIndent(); | ||
| writer.keyword("WITH"); | ||
| SqlWriter.Frame withFrame = writer.startList("(", ")"); | ||
| for (SqlNode property : getPropertyList()) { | ||
| SqlUnparseUtils.printIndent(writer); | ||
| property.unparse(writer, leftPrec, rightPrec); | ||
| } | ||
| writer.newlineAndIndent(); | ||
| writer.endList(withFrame); | ||
| } | ||
|
|
||
| if (getFreshness() != null) { | ||
| writer.newlineAndIndent(); | ||
| writer.keyword("FRESHNESS"); | ||
| writer.keyword("="); | ||
| getFreshness().unparse(writer, leftPrec, rightPrec); | ||
| } | ||
|
|
||
| if (getRefreshMode() != null) { | ||
| writer.newlineAndIndent(); | ||
| writer.keyword("REFRESH_MODE"); | ||
| writer.keyword("="); | ||
| writer.keyword(getRefreshMode().name()); | ||
| } | ||
|
|
||
| writer.newlineAndIndent(); | ||
| writer.keyword("AS"); | ||
| writer.newlineAndIndent(); | ||
| getAsQuery().unparse(writer, leftPrec, rightPrec); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it is duplicating what we have in SqlCreateMaterializedTable
I wonder if we can extract this part into a separate method and reuse for both?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have removed the code from the SqlCreateMaterializedTable and only the child class SqlCreateOrAlterMaterializedTable is implementing the logic now. Or do we need both classes to implement it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it means we need to reimplement it again if want to support something like ALTER MATERIALIZED TABLE ...
so yes, better to reuse
| "SELECT id, name FROM tbl_a", | ||
| "SELECT id, name FROM tbl_a"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have at least one test where original and expanded are different?
If not then. probably should add it
| ? handleAlter(sqlCreateOrAlterMaterializedTable, context) | ||
| : handleCreate(sqlCreateOrAlterMaterializedTable, context, identifier); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we instead just redirect call to either SqlCreateMaterializedTableConverter or SqlAlterMaterializedTableConverter?
| public static final SqlSpecialOperator CREATE_OR_ALTER_OPERATOR = | ||
| new SqlSpecialOperator("CREATE OR ALTER MATERIALIZED TABLE", SqlKind.OTHER_DDL); | ||
|
|
||
| private final boolean isOrAlter; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need this field if we have different operators?
| @Override | ||
| public SqlOperator getOperator() { | ||
| return OPERATOR; | ||
| return CREATE_OPERATOR; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess if you drop this method override it will return the operator which is passed to the constructor and this is what we want, right?
| @Override | ||
| public SqlOperator getOperator() { | ||
| return isOrAlter ? CREATE_OR_ALTER_OPERATOR : CREATE_OPERATOR; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably no need for this method if already passed operator to constructor
| @Override | ||
| public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { | ||
| writer.keyword("CREATE"); | ||
| if (isOrAlter) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use
| if (isOrAlter) { | |
| if (getOperator() == CREATE_OR_ALTER_OPERATOR) { |
?
and then no need for isOrAlter
| private static final String CREATE_COMMAND = "CREATE "; | ||
| private static final String CREATE_OR_ALTER_COMMAND = "CREATE OR ALTER "; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private static final String CREATE_COMMAND = "CREATE "; | |
| private static final String CREATE_OR_ALTER_COMMAND = "CREATE OR ALTER "; | |
| private static final String CREATE_OPERATION = "CREATE "; | |
| private static final String CREATE_OR_ALTER_OPERATION = "CREATE OR ALTER "; |
| final ObjectIdentifier identifier = | ||
| this.getIdentifier(sqlCreateOrAlterMaterializedTable, context); | ||
| final ResolvedCatalogMaterializedTable oldTable = | ||
| getExistingResolvedMaterializedTable(context, identifier); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right before calling this method we already have derived these 2 values
an we reuse them instead doing same twice?
| oldTable, sqlCreateOrAlterMaterializedTable, context); | ||
|
|
||
| List<MaterializedTableChange> tableChanges = | ||
| buildTableChanges(sqlCreateOrAlterMaterializedTable, oldTable, context); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| buildTableChanges(sqlCreateOrAlterMaterializedTable, oldTable, context); | |
| buildTableChanges(sqlCreateOrAlterMaterializedTable, oldMaterializedTable, context); |
here and in other places oldTable -> oldMaterializedTable
| final MergeContext mergeContext = | ||
| this.getMergeContext(sqlCreateOrAlterMaterializedTable, context); | ||
|
|
||
| // Extract new columns |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably no need for this comment
| .refreshMode(refreshMode) | ||
| .refreshStatus(RefreshStatus.INITIALIZING); | ||
|
|
||
| // Preserve refresh handler from old table |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // Preserve refresh handler from old table | |
| // Preserve refresh handler from old materialized table |
| private boolean tableExists(ConvertContext context, ObjectIdentifier identifier) { | ||
| return context.getCatalogManager().getTable(identifier).isPresent(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure we need this method at all given the fact that there is getExistingResolvedMaterializedTable
moreover tableExists doesn't check table type
| return context.getCatalogManager().getTable(identifier).isPresent(); | ||
| } | ||
|
|
||
| private CatalogMaterializedTable buildNewCatalogMaterializedTableFromOldTable( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ideally this method should be in alterMaterializedTable converter
may be we can move it in future when that converter becomes more mature
What is the purpose of the change
This pull request implements the
CREATE OR ALTER MATERIALIZED TABLEsyntax as proposed in FLIP-546. This new command provides an idempotent way to manage materialized tables, enabling declarative deployment patterns for CI/CD pipelines and infrastructure-as-code workflows.The command intelligently routes to either CREATE or ALTER logic:
CREATE MATERIALIZED TABLE)ALTER MATERIALIZED TABLE AS)This eliminates the need for complex DROP-IF-EXISTS patterns and makes materialized table management more robust and predictable in automated deployment scenarios.
Brief change log
CREATE OR ALTER MATERIALIZED TABLESQL syntax to the Flink SQL parserSqlCreateOrAlterMaterializedTableto handle both create and alter operations based on table existenceSqlCreateOrAlterMaterializedTableConverterfor better encapsulationgetOriginalQuery()method toCatalogMaterializedTableinterface (aligning withCatalogViewinterface)Verifying this change
This change added tests and can be verified as follows:
MaterializedTableStatementParserTestto verify SQL parsing ofCREATE OR ALTERsyntaxSqlMaterializedTableNodeToOperationConverterTest:testCreateOrAlterMaterializedTable()- verifies CREATE behavior when table doesn't existtestCreateOrAlterMaterializedTableForExistingTable()- verifies ALTER behavior when table exists with schema evolutionSqlGatewayRestEndpointMaterializedTableITCaseto validate end-to-end behavior via REST APICREATE OR ALTERstatement multiple timesDoes this pull request potentially affect one of the following parts:
@Public(Evolving): yes -CatalogMaterializedTableinterface extended withgetOriginalQuery()methodDocumentation
docs/content/docs/dev/table/materialized-table/statements.mdexplaining CREATE OR ALTER syntax, behavior, examples, and use cases