Commit c656e71

[FLINK-38355] Support CreateOrAlter MATERIALIZED TABLE
1 parent 078dc82 commit c656e71

30 files changed

+1034
-392
lines changed

docs/content.zh/docs/dev/table/materialized-table/statements.md

Lines changed: 97 additions & 0 deletions
@@ -29,6 +29,7 @@ under the License.
 Flink SQL currently supports the following materialized table operations:
 - [CREATE MATERIALIZED TABLE](#create-materialized-table)
 - [ALTER MATERIALIZED TABLE](#alter-materialized-table)
+- [CREATE OR ALTER MATERIALIZED TABLE](#create-or-alter-materialized-table)
 - [DROP MATERIALIZED TABLE](#drop-materialized-table)
 
 # CREATE MATERIALIZED TABLE
@@ -389,6 +390,102 @@ GROUP BY
 - Schema evolution currently only supports appending `nullable columns` to the end of the original table's schema
 - In continuous mode, the new streaming job will not restore from the state of the original streaming job. This may cause temporary data duplication or loss.
 
+
+# CREATE OR ALTER MATERIALIZED TABLE
+
+```
+CREATE OR ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
+
+[ ([ <table_constraint> ]) ]
+
+[COMMENT table_comment]
+
+[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
+
+[WITH (key1=val1, key2=val2, ...)]
+
+FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
+
+[REFRESH_MODE = { CONTINUOUS | FULL }]
+
+AS <select_statement>
+
+<table_constraint>:
+  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
+```
+
+`CREATE OR ALTER MATERIALIZED TABLE` is a convenient statement that combines create and alter functionality:
+
+- **If the table does not exist**: Creates a new materialized table (behaves like [CREATE MATERIALIZED TABLE](#create-materialized-table))
+- **If the table exists**: Modifies the query definition (behaves like [ALTER MATERIALIZED TABLE AS](#as-select_statement-1))
+
+This is particularly useful in declarative deployment scenarios where you want to define the desired state of a materialized table without needing to check if it already exists.
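
The create-or-alter dispatch described in the added documentation can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Flink code: `CreateOrAlterSketch`, its `Outcome` enum, and the in-memory map standing in for a catalog are hypothetical names introduced only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a catalog; not a Flink API. */
final class CreateOrAlterSketch {

    enum Outcome { CREATED, ALTERED }

    // Maps a table name to its current query definition.
    private final Map<String, String> definitions = new LinkedHashMap<>();

    /** Creates the table if it is missing; otherwise replaces its query definition. */
    Outcome createOrAlter(String tableName, String query) {
        boolean existed = definitions.containsKey(tableName);
        definitions.put(tableName, query);
        return existed ? Outcome.ALTERED : Outcome.CREATED;
    }

    /** Returns the stored query definition, or null if the table is unknown. */
    String definitionOf(String tableName) {
        return definitions.get(tableName);
    }

    public static void main(String[] args) {
        CreateOrAlterSketch catalog = new CreateOrAlterSketch();
        // The same call is issued twice, as a deployment script would do.
        System.out.println(catalog.createOrAlter("my_materialized_table", "SELECT 1"));
        System.out.println(catalog.createOrAlter("my_materialized_table", "SELECT 2"));
    }
}
```

The caller never checks for existence itself. The outcome only reports which branch was taken, which is the property that makes the statement suitable for declarative deployments.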
+
+## Syntax Details
+
+The syntax is identical to `CREATE MATERIALIZED TABLE`. See the sections above for detailed explanations of:
+- [PRIMARY KEY](#primary-key)
+- [PARTITIONED BY](#partitioned-by)
+- [WITH Options](#with-options)
+- [FRESHNESS](#freshness)
+- [REFRESH_MODE](#refresh_mode)
+- [AS <select_statement>](#as-select_statement)
+
+## Behavior When Table Exists
+
+When the materialized table already exists, the operation behaves as if you ran `ALTER MATERIALIZED TABLE AS <select_statement>`:
+
+**Full mode:**
+1. Update the `schema` and `query definition` of the materialized table.
+2. The table is refreshed using the new query definition when the next refresh job is triggered.
+
+**Continuous mode:**
+1. Pause the current running refresh job.
+2. Update the `schema` and `query definition` of the materialized table.
+3. Start a new refresh job to refresh the materialized table from the beginning.
+
+See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
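
The continuous-mode steps listed above, together with the rollback path that the `MaterializedTableManager` warning in this commit mentions ("rollback to origin query"), can be sketched as follows. `ContinuousAlterSketch`, `TableState`, and the `startJob` callback are hypothetical names, not Flink classes. Whether the gateway resumes the original job after a rollback is not shown in this diff, so the sketch simply leaves the job paused in that case.

```java
import java.util.function.Consumer;

/** Hypothetical model of altering a continuous-mode materialized table. */
final class ContinuousAlterSketch {

    /** Mutable holder for the table's query and job status; not a Flink class. */
    static final class TableState {
        String query;
        boolean jobRunning;

        TableState(String query, boolean jobRunning) {
            this.query = query;
            this.jobRunning = jobRunning;
        }
    }

    /**
     * Applies the three documented steps: pause, update, start a new job.
     * If starting the new job fails, the old query is restored and the
     * exception is rethrown to the caller.
     */
    static void alter(TableState table, String newQuery, Consumer<String> startJob) {
        String oldQuery = table.query;
        table.jobRunning = false;      // 1. pause the running refresh job
        table.query = newQuery;        // 2. update the query definition
        try {
            startJob.accept(newQuery); // 3. start a new job from the beginning
            table.jobRunning = true;
        } catch (RuntimeException e) {
            table.query = oldQuery;    // roll back to the original query
            throw e;
        }
    }
}
```

The new job is started from scratch rather than from the old job's state, which matches the note that state is not restored when altering.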
+
+## Examples
+
+
+```sql
+-- First execution: creates the table
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+FRESHNESS = INTERVAL '10' SECOND
+AS
+SELECT
+    user_id,
+    COUNT(*) AS event_count,
+    SUM(amount) AS total_amount
+FROM
+    kafka_catalog.db1.events
+WHERE
+    event_type = 'purchase'
+GROUP BY
+    user_id;
+
+-- Second execution: alters the query definition (adds avg_amount column)
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+AS
+SELECT
+    user_id,
+    COUNT(*) AS event_count,
+    SUM(amount) AS total_amount,
+    AVG(amount) AS avg_amount -- Add a new nullable column at the end
+FROM
+    kafka_catalog.db1.events
+WHERE
+    event_type = 'purchase'
+GROUP BY
+    user_id;
+```
+
+<span class="label label-danger">Note</span>
+- When altering an existing table, schema evolution currently only supports adding `nullable` columns to the end of the original table's schema.
+- In continuous mode, the new refresh job will not restore from the state of the original refresh job when altering.
+- All limitations from both CREATE and ALTER operations apply.
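
The first limitation in the note above (only nullable columns may be appended at the end) can be expressed as a small compatibility check. This is a sketch under stated assumptions: `SchemaEvolutionSketch` and `Col` are hypothetical and are not Flink's schema classes, and the check compares column names only, ignoring data types.

```java
import java.util.List;

/** Hypothetical check for the "append nullable columns only" rule. */
final class SchemaEvolutionSketch {

    /** Minimal column descriptor; not Flink's Column class. */
    static final class Col {
        final String name;
        final boolean nullable;

        Col(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }
    }

    /**
     * Returns true when newCols keeps oldCols as an unchanged prefix
     * (same names, same order) and every appended column is nullable.
     */
    static boolean isSupported(List<Col> oldCols, List<Col> newCols) {
        if (newCols.size() < oldCols.size()) {
            return false; // dropping columns is not supported
        }
        for (int i = 0; i < oldCols.size(); i++) {
            if (!oldCols.get(i).name.equals(newCols.get(i).name)) {
                return false; // existing columns were renamed or reordered
            }
        }
        for (int i = oldCols.size(); i < newCols.size(); i++) {
            if (!newCols.get(i).nullable) {
                return false; // appended columns must be nullable
            }
        }
        return true;
    }
}
```

In the example above, appending `avg_amount` after `total_amount` would pass such a check as long as the new column is nullable, while inserting it before `event_count` would not.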
+
 # DROP MATERIALIZED TABLE
 
 ```text

docs/content/docs/dev/table/materialized-table/statements.md

Lines changed: 96 additions & 0 deletions
@@ -29,6 +29,7 @@ under the License.
 Flink SQL supports the following Materialized Table statements for now:
 - [CREATE MATERIALIZED TABLE](#create-materialized-table)
 - [ALTER MATERIALIZED TABLE](#alter-materialized-table)
+- [CREATE OR ALTER MATERIALIZED TABLE](#create-or-alter-materialized-table)
 - [DROP MATERIALIZED TABLE](#drop-materialized-table)
 
 # CREATE MATERIALIZED TABLE
@@ -391,6 +392,101 @@ ALTER MATERIALIZED TABLE my_materialized_table
 - Schema evolution currently only supports adding `nullable` columns to the end of the original table's schema.
 - In continuous mode, the new refresh job will not restore from the state of the original refresh job. This may result in temporary data duplication or loss.
 
+# CREATE OR ALTER MATERIALIZED TABLE
+
+```
+CREATE OR ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
+
+[ ([ <table_constraint> ]) ]
+
+[COMMENT table_comment]
+
+[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
+
+[WITH (key1=val1, key2=val2, ...)]
+
+FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
+
+[REFRESH_MODE = { CONTINUOUS | FULL }]
+
+AS <select_statement>
+
+<table_constraint>:
+  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
+```
+
+`CREATE OR ALTER MATERIALIZED TABLE` is a convenient statement that combines create and alter functionality:
+
+- **If the table does not exist**: Creates a new materialized table (behaves like [CREATE MATERIALIZED TABLE](#create-materialized-table))
+- **If the table exists**: Modifies the query definition (behaves like [ALTER MATERIALIZED TABLE AS](#as-select_statement-1))
+
+This is particularly useful in declarative deployment scenarios where you want to define the desired state of a materialized table without needing to check if it already exists.
+
+## Syntax Details
+
+The syntax is identical to `CREATE MATERIALIZED TABLE`. See the sections above for detailed explanations of:
+- [PRIMARY KEY](#primary-key)
+- [PARTITIONED BY](#partitioned-by)
+- [WITH Options](#with-options)
+- [FRESHNESS](#freshness)
+- [REFRESH_MODE](#refresh_mode)
+- [AS <select_statement>](#as-select_statement)
+
+## Behavior When Table Exists
+
+When the materialized table already exists, the operation behaves as if you ran `ALTER MATERIALIZED TABLE AS <select_statement>`:
+
+**Full mode:**
+1. Update the `schema` and `query definition` of the materialized table.
+2. The table is refreshed using the new query definition when the next refresh job is triggered.
+
+**Continuous mode:**
+1. Pause the current running refresh job.
+2. Update the `schema` and `query definition` of the materialized table.
+3. Start a new refresh job to refresh the materialized table from the beginning.
+
+See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
+
+## Examples
+
+
+```sql
+-- First execution: creates the table
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+FRESHNESS = INTERVAL '10' SECOND
+AS
+SELECT
+    user_id,
+    COUNT(*) AS event_count,
+    SUM(amount) AS total_amount
+FROM
+    kafka_catalog.db1.events
+WHERE
+    event_type = 'purchase'
+GROUP BY
+    user_id;
+
+-- Second execution: alters the query definition (adds avg_amount column)
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+AS
+SELECT
+    user_id,
+    COUNT(*) AS event_count,
+    SUM(amount) AS total_amount,
+    AVG(amount) AS avg_amount -- Add a new nullable column at the end
+FROM
+    kafka_catalog.db1.events
+WHERE
+    event_type = 'purchase'
+GROUP BY
+    user_id;
+```
+
+<span class="label label-danger">Note</span>
+- When altering an existing table, schema evolution currently only supports adding `nullable` columns to the end of the original table's schema.
+- In continuous mode, the new refresh job will not restore from the state of the original refresh job when altering.
+- All limitations from both CREATE and ALTER operations apply.
+
 # DROP MATERIALIZED TABLE
 
 ```text

flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java

Lines changed: 8 additions & 10 deletions
@@ -166,8 +166,7 @@ public void close() throws Exception {
     public ResultFetcher callMaterializedTableOperation(
             OperationExecutor operationExecutor,
             OperationHandle handle,
-            MaterializedTableOperation op,
-            String statement) {
+            MaterializedTableOperation op) {
         if (op instanceof CreateMaterializedTableOperation) {
             return callCreateMaterializedTableOperation(
                     operationExecutor, handle, (CreateMaterializedTableOperation) op);
@@ -267,7 +266,7 @@ private void createMaterializedTableInFullMode(
             CreateRefreshWorkflow createRefreshWorkflow =
                     new CreatePeriodicRefreshWorkflow(
                             materializedTableIdentifier,
-                            catalogMaterializedTable.getDefinitionQuery(),
+                            catalogMaterializedTable.getExpandedQuery(),
                             cronExpression,
                             getSessionInitializationConf(operationExecutor),
                             Collections.emptyMap(),
@@ -570,7 +569,7 @@ private void executeContinuousRefreshJob(
             String insertStatement =
                     getInsertStatement(
                             materializedTableIdentifier,
-                            catalogMaterializedTable.getDefinitionQuery(),
+                            catalogMaterializedTable.getExpandedQuery(),
                             dynamicOptions);
 
             JobExecutionResult result =
@@ -652,7 +651,7 @@ public ResultFetcher refreshMaterializedTable(
         String insertStatement =
                 getRefreshStatement(
                         materializedTableIdentifier,
-                        materializedTable.getDefinitionQuery(),
+                        materializedTable.getExpandedQuery(),
                         refreshPartitions,
                         dynamicOptions);
 
@@ -869,8 +868,8 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
                 LOG.warn(
                         "Failed to start the continuous refresh job for materialized table {} using new query {}, rollback to origin query {}.",
                         tableIdentifier,
-                        op.getCatalogMaterializedTable().getDefinitionQuery(),
-                        suspendMaterializedTable.getDefinitionQuery(),
+                        op.getCatalogMaterializedTable().getExpandedQuery(),
+                        suspendMaterializedTable.getExpandedQuery(),
                         e);
 
                 AlterMaterializedTableChangeOperation rollbackChangeOperation =
@@ -892,7 +891,7 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
                 throw new SqlExecutionException(
                         String.format(
                                 "Failed to start the continuous refresh job using new query %s when altering materialized table %s select query.",
-                                op.getCatalogMaterializedTable().getDefinitionQuery(),
+                                op.getCatalogMaterializedTable().getExpandedQuery(),
                                 tableIdentifier),
                         e);
             }
@@ -945,8 +944,7 @@ private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedT
                                 oldMaterializedTable.getSerializedRefreshHandler()));
             } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
                 rollbackChanges.add(
-                        TableChange.modifyDefinitionQuery(
-                                oldMaterializedTable.getDefinitionQuery()));
+                        TableChange.modifyDefinitionQuery(oldMaterializedTable.getExpandedQuery()));
             } else {
                 throw new ValidationException(
                         String.format(

flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java

Lines changed: 1 addition & 2 deletions
@@ -556,8 +556,7 @@ private ResultFetcher executeOperation(
             return sessionContext
                     .getSessionState()
                     .materializedTableManager
-                    .callMaterializedTableOperation(
-                            this, handle, (MaterializedTableOperation) op, statement);
+                    .callMaterializedTableOperation(this, handle, (MaterializedTableOperation) op);
         } else {
             return callOperation(tableEnv, handle, op);
         }

flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java

Lines changed: 4 additions & 4 deletions
@@ -1083,7 +1083,7 @@ void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
                 .isEqualTo(
                         Collections.singletonList(
                                 Column.physical("order_amount_sum", DataTypes.INT())));
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n"
@@ -1223,7 +1223,7 @@ void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Excep
                 .isEqualTo(
                         Collections.singletonList(
                                 Column.physical("order_amount_sum", DataTypes.INT())));
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n"
@@ -1310,7 +1310,7 @@ void testAlterMaterializedTableAsQueryInContinuousMode(@TempDir Path temporaryPa
                 .isEqualTo(oldTable.getResolvedSchema().getPrimaryKey());
         assertThat(newTable.getResolvedSchema().getWatermarkSpecs())
                 .isEqualTo(oldTable.getResolvedSchema().getWatermarkSpecs());
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT COALESCE(`tmp`.`user_id`, CAST(0 AS BIGINT)) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
@@ -1409,7 +1409,7 @@ void testAlterMaterializedTableAsQueryInContinuousModeWithSuspendStatus(
 
         assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema()))
                 .isEqualTo(Collections.singletonList(Column.physical("pv", DataTypes.INT())));
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"

flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@
     "org.apache.flink.sql.parser.ddl.SqlCreateCatalog"
     "org.apache.flink.sql.parser.ddl.SqlCreateDatabase"
     "org.apache.flink.sql.parser.ddl.SqlCreateFunction"
-    "org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable"
+    "org.apache.flink.sql.parser.ddl.SqlCreateOrAlterMaterializedTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateModel"
     "org.apache.flink.sql.parser.ddl.SqlCreateModelAs"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable"

flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl

Lines changed: 12 additions & 5 deletions
@@ -1852,9 +1852,9 @@ SqlNode SqlReplaceTable() :
 }
 
 /**
-* Parses a CREATE MATERIALIZED TABLE statement.
+* Parses a CREATE [OR ALTER] MATERIALIZED TABLE statement.
 */
-SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporary) :
+SqlCreate SqlCreateOrAlterMaterializedTable(Span s, boolean replace, boolean isTemporary) :
 {
     final SqlParserPos startPos = s.pos();
     SqlIdentifier tableName;
@@ -1866,8 +1866,14 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar
     SqlNode freshness = null;
     SqlLiteral refreshMode = null;
     SqlNode asQuery = null;
+    boolean isOrAlter = false;
 }
 {
+    [
+        <OR> <ALTER> {
+            isOrAlter = true;
+        }
+    ]
     <MATERIALIZED>
     {
         if (isTemporary) {
@@ -1934,7 +1940,7 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar
     <AS>
     asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
     {
-        return new SqlCreateMaterializedTable(
+        return new SqlCreateOrAlterMaterializedTable(
             startPos.plus(getPos()),
             tableName,
             constraint,
@@ -1944,7 +1950,8 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar
             propertyList,
             (SqlIntervalLiteral) freshness,
             refreshMode,
-            asQuery);
+            asQuery,
+            isOrAlter);
     }
 }
 
@@ -2632,7 +2639,7 @@ SqlCreate SqlCreateExtended(Span s, boolean replace) :
     (
         create = SqlCreateCatalog(s, replace)
         |
-        create = SqlCreateMaterializedTable(s, replace, isTemporary)
+        create = SqlCreateOrAlterMaterializedTable(s, replace, isTemporary)
         |
         create = SqlCreateTable(s, replace, isTemporary)
         |
