Skip to content

Commit

Permalink
[FLINK-36861][table-planner] CREATE TABLE AS with METADATA columns do…
Browse files Browse the repository at this point in the history
…esn't work
  • Loading branch information
spena authored and dawidwys committed Dec 11, 2024
1 parent 529f640 commit 5ca9eef
Show file tree
Hide file tree
Showing 5 changed files with 486 additions and 316 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Schema.UnresolvedColumn;
import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
Expand Down Expand Up @@ -386,19 +387,31 @@ private void validateImplicitCastCompatibility(
int columnPos,
UnresolvedColumn sourceColumn,
UnresolvedColumn sinkColumn) {
if (!(sinkColumn instanceof UnresolvedPhysicalColumn)) {
LogicalType sinkColumnType;

if (sinkColumn instanceof UnresolvedPhysicalColumn) {
sinkColumnType = getLogicalType(((UnresolvedPhysicalColumn) sinkColumn));
} else if ((sinkColumn instanceof UnresolvedMetadataColumn)) {
if (((UnresolvedMetadataColumn) sinkColumn).isVirtual()) {
throw new ValidationException(
String.format(
"A column named '%s' already exists in the source schema. "
+ "Virtual metadata columns cannot overwrite "
+ "columns from source.",
columnName));
}

sinkColumnType = getLogicalType(((UnresolvedMetadataColumn) sinkColumn));
} else {
throw new ValidationException(
String.format(
"A column named '%s' already exists in the source schema. "
+ "Computed and metadata columns cannot overwrite "
+ "regular columns.",
+ "Computed columns cannot overwrite columns from source.",
columnName));
}

LogicalType sourceColumnType =
getLogicalType(((UnresolvedPhysicalColumn) sourceColumn));
LogicalType sinkColumnType = getLogicalType(((UnresolvedPhysicalColumn) sinkColumn));

if (!supportsImplicitCast(sourceColumnType, sinkColumnType)) {
throw new ValidationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,14 @@ LogicalType getLogicalType(UnresolvedPhysicalColumn column) {
return dataTypeFactory.createDataType(column.getDataType()).getLogicalType();
}

/**
* Gets the column data type of {@link UnresolvedMetadataColumn} column and convert it to a
* {@link LogicalType}.
*/
LogicalType getLogicalType(UnresolvedMetadataColumn column) {
return dataTypeFactory.createDataType(column.getDataType()).getLogicalType();
}

Optional<String> getComment(SqlTableColumn column) {
return column.getComment().map(c -> ((SqlLiteral) c).getValueAs(String.class));
}
Expand Down
Loading

0 comments on commit 5ca9eef

Please sign in to comment.