From 5ca9eef5250c257d466d98dd13a8d0dc14fe8ab4 Mon Sep 17 00:00:00 2001
From: Sergio Pena
Date: Fri, 6 Dec 2024 14:27:54 -0600
Subject: [PATCH] [FLINK-36861][table-planner] CREATE TABLE AS with METADATA columns doesn't work
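
Merging the explicit column list of a CTAS statement used to reject
every non-physical sink column that shadowed a column of the source
schema. With this change, persisted METADATA columns may overwrite
same-named source columns, subject to the same implicit-cast check as
physical columns, while virtual metadata columns and computed columns
are still rejected, now with dedicated error messages.

As an illustration only (the table and column names below are made up
and not part of this patch), a statement of the shape exercised by the
new tests is now accepted when the source already exposes a column
named "ts":

    CREATE TABLE sink_t (ts TIMESTAMP(3) METADATA)
        AS SELECT * FROM source_t;

Previously this failed with "Computed and metadata columns cannot
overwrite regular columns."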
" - + "Computed and metadata columns cannot overwrite " - + "regular columns.", + + "Computed columns cannot overwrite columns from source.", columnName)); } LogicalType sourceColumnType = getLogicalType(((UnresolvedPhysicalColumn) sourceColumn)); - LogicalType sinkColumnType = getLogicalType(((UnresolvedPhysicalColumn) sinkColumn)); - if (!supportsImplicitCast(sourceColumnType, sinkColumnType)) { throw new ValidationException( String.format( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SchemaBuilderUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SchemaBuilderUtil.java index ddadb0786736b..12c24689f24bb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SchemaBuilderUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SchemaBuilderUtil.java @@ -267,6 +267,14 @@ LogicalType getLogicalType(UnresolvedPhysicalColumn column) { return dataTypeFactory.createDataType(column.getDataType()).getLogicalType(); } + /** + * Gets the column data type of {@link UnresolvedMetadataColumn} column and convert it to a + * {@link LogicalType}. + */ + LogicalType getLogicalType(UnresolvedMetadataColumn column) { + return dataTypeFactory.createDataType(column.getDataType()).getLogicalType(); + } + Optional getComment(SqlTableColumn column) { return column.getComment().map(c -> ((SqlLiteral) c).getValueAs(String.class)); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java new file mode 100644 index 0000000000000..e7720058ea910 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.operations.CreateTableASOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.parse.CalciteParser;
+
+import org.apache.calcite.sql.SqlNode;
+import org.assertj.core.api.HamcrestCondition;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.planner.utils.OperationMatchers.isCreateTableOperation;
+import static org.apache.flink.table.planner.utils.OperationMatchers.partitionedBy;
+import static org.apache.flink.table.planner.utils.OperationMatchers.withDistribution;
+import static org.apache.flink.table.planner.utils.OperationMatchers.withNoDistribution;
+import static org.apache.flink.table.planner.utils.OperationMatchers.withSchema;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for converting CREATE TABLE AS statements to operations. */
+public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTestBase {
+    @Test
+    public void testCreateTableAsWithNotFoundColumnIdentifiers() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.INT())
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql = "create table tbl1 (f1, f2) AS SELECT * FROM src1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Column 'f2' not found in the source schema.");
+    }
+
+    @Test
+    public void testCreateTableAsWithMismatchIdentifiersLength() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.INT())
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql = "create table tbl1 (f1) AS SELECT * FROM src1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The number of columns in the column list "
+                                + "must match the number of columns in the source schema.");
+    }
+
+    @Test
+    public void testCreateTableAsWithColumns() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.TIMESTAMP(3))
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql =
+                "create table tbl1 (c0 int, c1 double metadata, c2 as c0 * f0, c3 timestamp(3), c4 int metadata virtual, "
+                        + "watermark FOR c3 AS c3 - interval '3' second) "
+                        + "AS SELECT * FROM src1";
+
+        Operation ctas = parseAndConvert(sql);
+
+        Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
+        assertThat(operation)
+                .is(
+                        new HamcrestCondition<>(
+                                isCreateTableOperation(
+                                        withNoDistribution(),
+                                        withSchema(
+                                                Schema.newBuilder()
+                                                        .column("c0", DataTypes.INT())
+                                                        .columnByMetadata("c1", DataTypes.DOUBLE())
+                                                        .columnByExpression("c2", "`c0` * `f0`")
+                                                        .column("c3", DataTypes.TIMESTAMP(3))
+                                                        .columnByMetadata(
+                                                                "c4", DataTypes.INT(), true)
+                                                        .column("f0", DataTypes.INT().notNull())
+                                                        .column("f1", DataTypes.TIMESTAMP(3))
+                                                        .watermark(
+                                                                "c3", "`c3` - INTERVAL '3' SECOND")
+                                                        .build()))));
+    }
+
+    @Test
+    public void testCreateTableAsWithColumnsOverridden() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.INT())
+                                        .column("f2", DataTypes.TIMESTAMP(3))
+                                        .column("f3", DataTypes.STRING())
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql =
+                "create table tbl1 (c0 int, f0 bigint not null, a1 double, f2 timestamp(3) metadata, a3 string metadata) "
+                        + "AS SELECT f0, f1 as `a1`, f2, f3 as `a3` FROM src1";
+
+        Operation ctas = parseAndConvert(sql);
+        Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
+        assertThat(operation)
+                .is(
+                        new HamcrestCondition<>(
+                                isCreateTableOperation(
+                                        withNoDistribution(),
+                                        withSchema(
+                                                Schema.newBuilder()
+                                                        .column("c0", DataTypes.INT())
+                                                        .column("f0", DataTypes.BIGINT().notNull())
+                                                        .column("a1", DataTypes.DOUBLE())
+                                                        .columnByMetadata(
+                                                                "f2", DataTypes.TIMESTAMP(3))
+                                                        .columnByMetadata("a3", DataTypes.STRING())
+                                                        .build()))));
+    }
+
+    @Test
+    public void testCreateTableAsWithOverriddenVirtualMetadataColumnsNotAllowed() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.BIGINT())
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql =
+                "create table tbl1 (f1 bigint metadata virtual) " + "AS SELECT * FROM src1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "A column named 'f1' already exists in the source schema. "
+                                + "Virtual metadata columns cannot overwrite columns from "
+                                + "source.");
+    }
+
+    @Test
+    public void testCreateTableAsWithOverriddenComputedColumnsNotAllowed() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.BIGINT())
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql = "create table tbl1 (f1 as 'f0 * 2') " + "AS SELECT * FROM src1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "A column named 'f1' already exists in the source schema. "
" + + "Computed columns cannot overwrite columns from source."); + } + + @Test + public void testCreateTableAsWithPrimaryAndPartitionKey() { + CatalogTable catalogTable = + CatalogTable.newBuilder() + .schema( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .build()) + .build(); + + catalogManager.createTable( + catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false); + + final String sql = + "create table tbl1 (PRIMARY KEY (f0) NOT ENFORCED) " + + "PARTITIONED BY (f0) AS SELECT * FROM src1"; + + Operation ctas = parseAndConvert(sql); + Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation(); + assertThat(operation) + .is( + new HamcrestCondition<>( + isCreateTableOperation( + withNoDistribution(), + partitionedBy("f0"), + withSchema( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build())))); + } + + @Test + public void testCreateTableAsWithWatermark() { + CatalogTable catalogTable = + CatalogTable.newBuilder() + .schema( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .build()) + .build(); + + catalogManager.createTable( + catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false); + + final String sql = + "create table tbl1 (WATERMARK FOR f1 AS f1 - INTERVAL '3' SECOND) " + + "AS SELECT * FROM src1"; + + Operation ctas = parseAndConvert(sql); + Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation(); + assertThat(operation) + .is( + new HamcrestCondition<>( + isCreateTableOperation( + withNoDistribution(), + withSchema( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .watermark( + "f1", "`f1` - INTERVAL '3' SECOND") + .build())))); + } + + @Test + public void testCreateTableAsWithNotNullColumnsAreNotAllowed() { + CatalogTable catalogTable = + CatalogTable.newBuilder() + .schema( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.INT()) + .build()) + .build(); + + catalogManager.createTable( + catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false); + + final String sql = "create table tbl1 (c0 int not null) " + "AS SELECT * FROM src1"; + + assertThatThrownBy(() -> parseAndConvert(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Column 'c0' has no default value and does not allow NULLs."); + } + + @Test + public void testCreateTableAsWithIncompatibleImplicitCastTypes() { + CatalogTable catalogTable = + CatalogTable.newBuilder() + .schema( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .build()) + .build(); + + catalogManager.createTable( + catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false); + + final String sql = "create table tbl1 (f0 boolean) AS SELECT * FROM src1"; + + assertThatThrownBy(() -> parseAndConvert(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Incompatible types for sink column 'f0' at position 0. 
" + + "The source column has type 'INT NOT NULL', while the target " + + "column has type 'BOOLEAN'."); + } + + @Test + public void testMergingCreateTableAsWithDistribution() { + CatalogTable catalogTable = + CatalogTable.newBuilder() + .schema( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .columnByExpression("f2", "`f0` + 12345") + .watermark("f1", "`f1` - interval '1' second") + .build()) + .distribution(TableDistribution.ofHash(Collections.singletonList("f0"), 3)) + .partitionKeys(Arrays.asList("f0", "f1")) + .build(); + + catalogManager.createTable( + catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); + + final String sql = + "create table derivedTable DISTRIBUTED BY HASH(f0) INTO 2 BUCKETS " + + "AS SELECT * FROM sourceTable"; + + Operation ctas = parseAndConvert(sql); + Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation(); + assertThat(operation) + .is( + new HamcrestCondition<>( + isCreateTableOperation( + withDistribution( + TableDistribution.ofHash( + Collections.singletonList("f0"), 2)), + withSchema( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .column("f2", DataTypes.INT().notNull()) + .build())))); + } + + @Test + public void testMergingCreateTableAsWitEmptyDistribution() { + Map sourceProperties = new HashMap<>(); + sourceProperties.put("format.type", "json"); + CatalogTable catalogTable = + CatalogTable.newBuilder() + .schema( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .columnByExpression("f2", "`f0` + 12345") + .watermark("f1", "`f1` - interval '1' second") + .build()) + .distribution(TableDistribution.ofHash(Collections.singletonList("f0"), 3)) + .partitionKeys(Arrays.asList("f0", "f1")) + .options(sourceProperties) + .build(); + + catalogManager.createTable( + catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); + + final String sql = "create table derivedTable AS SELECT * FROM sourceTable"; + Operation ctas = parseAndConvert(sql); + Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation(); + assertThat(operation) + .is( + new HamcrestCondition<>( + isCreateTableOperation( + withNoDistribution(), + withSchema( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .column("f2", DataTypes.INT().notNull()) + .build())))); + } + + private Operation parseAndConvert(String sql) { + final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); + final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); + + SqlNode node = parser.parse(sql); + return SqlNodeToOperationConversion.convert(planner, catalogManager, node).get(); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index 6904763dbf635..05075538fb4dc 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -688,311 +688,6 @@ public void tesCreateTableAsWithOrderingColumns() { .build())))); } - @Test - public void 
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.INT())
-                                        .build())
-                        .build();
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
-
-        final String sql = "create table tbl1 (f1, f2) AS SELECT * FROM src1";
-
-        assertThatThrownBy(() -> parseAndConvert(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("Column 'f2' not found in the source schema.");
-    }
-
-    @Test
-    public void testCreateTableAsWithMismatchIdentifiersLength() {
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.INT())
-                                        .build())
-                        .build();
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
-
-        final String sql = "create table tbl1 (f1) AS SELECT * FROM src1";
-
-        assertThatThrownBy(() -> parseAndConvert(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "The number of columns in the column list "
-                                + "must match the number of columns in the source schema.");
-    }
-
-    @Test
-    public void testCreateTableAsWithColumns() {
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .build())
-                        .build();
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
-
-        final String sql =
-                "create table tbl1 (c0 int, c1 double metadata, c2 as c0 * f0, c3 timestamp(3), "
-                        + "watermark FOR c3 AS c3 - interval '3' second) "
-                        + "AS SELECT * FROM src1";
-
-        Operation ctas = parseAndConvert(sql);
-        Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
-        assertThat(operation)
-                .is(
-                        new HamcrestCondition<>(
-                                isCreateTableOperation(
-                                        withNoDistribution(),
-                                        withSchema(
-                                                Schema.newBuilder()
-                                                        .column("c0", DataTypes.INT())
-                                                        .columnByMetadata("c1", DataTypes.DOUBLE())
-                                                        .columnByExpression("c2", "`c0` * `f0`")
-                                                        .column("c3", DataTypes.TIMESTAMP(3))
-                                                        .column("f0", DataTypes.INT().notNull())
-                                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                                        .watermark(
-                                                                "c3", "`c3` - INTERVAL '3' SECOND")
-                                                        .build()))));
-    }
-
-    @Test
-    public void testCreateTableAsWithColumnsOverridden() {
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.INT())
-                                        .build())
-                        .build();
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
-
-        final String sql =
-                "create table tbl1 (c0 int, f0 bigint not null, f1 double) "
-                        + "AS SELECT * FROM src1";
-
-        Operation ctas = parseAndConvert(sql);
-        Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
-        assertThat(operation)
-                .is(
-                        new HamcrestCondition<>(
-                                isCreateTableOperation(
-                                        withNoDistribution(),
-                                        withSchema(
-                                                Schema.newBuilder()
-                                                        .column("c0", DataTypes.INT())
-                                                        .column("f0", DataTypes.BIGINT().notNull())
-                                                        .column("f1", DataTypes.DOUBLE())
-                                                        .build()))));
-    }
-
-    @Test
-    public void testCreateTableAsWithPrimaryAndPartitionKey() {
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .build())
-                        .build();
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
-
-        final String sql =
-                "create table tbl1 (PRIMARY KEY (f0) NOT ENFORCED) "
-                        + "PARTITIONED BY (f0) AS SELECT * FROM src1";
-
-        Operation ctas = parseAndConvert(sql);
-        Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
-        assertThat(operation)
-                .is(
-                        new HamcrestCondition<>(
-                                isCreateTableOperation(
-                                        withNoDistribution(),
-                                        partitionedBy("f0"),
-                                        withSchema(
-                                                Schema.newBuilder()
-                                                        .column("f0", DataTypes.INT().notNull())
-                                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                                        .primaryKey("f0")
-                                                        .build()))));
-    }
-
-    @Test
-    public void testCreateTableAsWithWatermark() {
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .build())
-                        .build();
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
-
-        final String sql =
-                "create table tbl1 (WATERMARK FOR f1 AS f1 - INTERVAL '3' SECOND) "
-                        + "AS SELECT * FROM src1";
-
-        Operation ctas = parseAndConvert(sql);
-        Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
-        assertThat(operation)
-                .is(
-                        new HamcrestCondition<>(
-                                isCreateTableOperation(
-                                        withNoDistribution(),
-                                        withSchema(
-                                                Schema.newBuilder()
-                                                        .column("f0", DataTypes.INT().notNull())
-                                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                                        .watermark(
-                                                                "f1", "`f1` - INTERVAL '3' SECOND")
-                                                        .build()))));
-    }
-
-    @Test
-    public void testCreateTableAsWithNotNullColumnsAreNotAllowed() {
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.INT())
-                                        .build())
-                        .build();
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
-
-        final String sql = "create table tbl1 (c0 int not null) " + "AS SELECT * FROM src1";
-
-        assertThatThrownBy(() -> parseAndConvert(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("Column 'c0' has no default value and does not allow NULLs.");
-    }
-
-    @Test
-    public void testCreateTableAsWithIncompatibleImplicitCastTypes() {
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .build())
-                        .build();
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
-
-        final String sql = "create table tbl1 (f0 boolean) AS SELECT * FROM src1";
-
-        assertThatThrownBy(() -> parseAndConvert(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Incompatible types for sink column 'f0' at position 0. "
" - + "The source column has type 'INT NOT NULL', while the target " - + "column has type 'BOOLEAN'."); - } - - @Test - public void testMergingCreateTableAsWithDistribution() { - CatalogTable catalogTable = - CatalogTable.newBuilder() - .schema( - Schema.newBuilder() - .column("f0", DataTypes.INT().notNull()) - .column("f1", DataTypes.TIMESTAMP(3)) - .columnByExpression("f2", "`f0` + 12345") - .watermark("f1", "`f1` - interval '1' second") - .build()) - .distribution(TableDistribution.ofHash(Collections.singletonList("f0"), 3)) - .partitionKeys(Arrays.asList("f0", "f1")) - .build(); - - catalogManager.createTable( - catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); - - final String sql = - "create table derivedTable DISTRIBUTED BY HASH(f0) INTO 2 BUCKETS " - + "AS SELECT * FROM sourceTable"; - - Operation ctas = parseAndConvert(sql); - Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation(); - assertThat(operation) - .is( - new HamcrestCondition<>( - isCreateTableOperation( - withDistribution( - TableDistribution.ofHash( - Collections.singletonList("f0"), 2)), - withSchema( - Schema.newBuilder() - .column("f0", DataTypes.INT().notNull()) - .column("f1", DataTypes.TIMESTAMP(3)) - .column("f2", DataTypes.INT().notNull()) - .build())))); - } - - @Test - public void testMergingCreateTableAsWitEmptyDistribution() { - Map sourceProperties = new HashMap<>(); - sourceProperties.put("format.type", "json"); - CatalogTable catalogTable = - CatalogTable.newBuilder() - .schema( - Schema.newBuilder() - .column("f0", DataTypes.INT().notNull()) - .column("f1", DataTypes.TIMESTAMP(3)) - .columnByExpression("f2", "`f0` + 12345") - .watermark("f1", "`f1` - interval '1' second") - .build()) - .distribution(TableDistribution.ofHash(Collections.singletonList("f0"), 3)) - .partitionKeys(Arrays.asList("f0", "f1")) - .options(sourceProperties) - .build(); - - catalogManager.createTable( - catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); - - final String sql = "create table derivedTable AS SELECT * FROM sourceTable"; - Operation ctas = parseAndConvert(sql); - Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation(); - assertThat(operation) - .is( - new HamcrestCondition<>( - isCreateTableOperation( - withNoDistribution(), - withSchema( - Schema.newBuilder() - .column("f0", DataTypes.INT().notNull()) - .column("f1", DataTypes.TIMESTAMP(3)) - .column("f2", DataTypes.INT().notNull()) - .build())))); - } - @Test public void testCreateTableInvalidPartition() { final String sql = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java index 78ed39bf9802b..40114de579ff5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java @@ -121,13 +121,14 @@ public void testCreateOrReplaceTableAsWithColumns() { String sql = "CREATE OR REPLACE TABLE " + tableName - + "(c0 int, c1 double metadata, c2 as c0 * a) " + + "(c0 int, c1 double metadata, c2 as c0 * a, c3 int metadata virtual) " + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1"; Schema tableSchema = Schema.newBuilder() .column("c0", 
                         .columnByMetadata("c1", DataTypes.DOUBLE())
                         .columnByExpression("c2", "`c0` * `a`")
+                        .columnByMetadata("c3", DataTypes.INT(), true)
                         .fromSchema(getDefaultTableSchema())
                         .build();
 
@@ -140,15 +141,16 @@ public void testCreateOrReplaceTableAsWithColumnsOverridden() {
         String sql =
                 "CREATE OR REPLACE TABLE "
                         + tableName
-                        + "(c0 int, a double, c int) "
-                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+                        + "(c0 int, a double, bb string, c int metadata, dd string metadata) "
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') "
+                        + "as SELECT a, b as `bb`, c, d as `dd` FROM t1";
 
         Schema tableSchema =
                 Schema.newBuilder()
                         .column("c0", DataTypes.INT())
                         .column("a", DataTypes.DOUBLE())
-                        .column("b", DataTypes.STRING())
-                        .column("c", DataTypes.INT())
-                        .column("d", DataTypes.STRING())
+                        .column("bb", DataTypes.STRING())
+                        .columnByMetadata("c", DataTypes.INT())
+                        .columnByMetadata("dd", DataTypes.STRING())
                         .build();
 
         testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, Collections.emptyList());
@@ -168,6 +170,39 @@ public void testCreateOrReplaceTableAsWithNotNullColumnsAreNotAllowed() {
                 .hasMessageContaining("Column 'c0' has no default value and does not allow NULLs.");
     }
 
+    @Test
+    public void testCreateOrReplaceTableAsWithOverriddenVirtualMetadataColumnsNotAllowed() {
+        String tableName = "create_or_replace_table";
+        String sql =
+                "CREATE OR REPLACE TABLE "
+                        + tableName
+                        + "(c int metadata virtual) "
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "A column named 'c' already exists in the source schema. "
+                                + "Virtual metadata columns cannot overwrite columns from "
+                                + "source.");
+    }
+
+    @Test
+    public void testCreateOrReplaceTableAsWithOverriddenComputedColumnsNotAllowed() {
+        String tableName = "create_or_replace_table";
+        String sql =
+                "CREATE OR REPLACE TABLE "
+                        + tableName
+                        + "(c as 'f0 * 2') "
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "A column named 'c' already exists in the source schema. "
+                                + "Computed columns cannot overwrite columns from source.");
+    }
+
     @Test
     public void testCreateOrReplaceTableAsWithIncompatibleImplicitCastTypes() {
         String tableName = "create_or_replace_table";