Skip to content

Commit

Permalink
[Improve][Connector-V2] Improve schema evolution on column insert aft…
Browse files Browse the repository at this point in the history
…er for mysql-jdbc (#8017)
  • Loading branch information
jw-itq authored Nov 16, 2024
1 parent a0eeeb9 commit 3fb05da
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -529,14 +529,16 @@ default void applySchemaChange(
throws SQLException {
String tableIdentifierWithQuoted = tableIdentifier(tablePath);
Column addColumn = event.getColumn();
String afterColumn = event.getAfterColumn();
String addColumnSQL =
buildAlterTableSql(
event.getSourceDialectName(),
addColumn.getSourceType(),
AlterType.ADD.name(),
addColumn,
tableIdentifierWithQuoted,
StringUtils.EMPTY);
StringUtils.EMPTY,
afterColumn);
try (Statement statement = connection.createStatement()) {
log.info("Executing add column SQL: " + addColumnSQL);
statement.execute(addColumnSQL);
Expand All @@ -549,6 +551,7 @@ default void applySchemaChange(
String tableIdentifierWithQuoted = tableIdentifier(tablePath);
Column changeColumn = event.getColumn();
String oldColumnName = event.getOldColumn();
String afterColumn = event.getAfterColumn();
String changeColumnSQL =
buildAlterTableSql(
event.getSourceDialectName(),
Expand All @@ -558,7 +561,8 @@ default void applySchemaChange(
: AlterType.CHANGE.name(),
changeColumn,
tableIdentifierWithQuoted,
oldColumnName);
oldColumnName,
afterColumn);

try (Statement statement = connection.createStatement()) {
log.info("Executing change column SQL: " + changeColumnSQL);
Expand All @@ -571,14 +575,16 @@ default void applySchemaChange(
throws SQLException {
String tableIdentifierWithQuoted = tableIdentifier(tablePath);
Column modifyColumn = event.getColumn();
String afterColumn = event.getAfterColumn();
String modifyColumnSQL =
buildAlterTableSql(
event.getSourceDialectName(),
modifyColumn.getSourceType(),
AlterType.MODIFY.name(),
modifyColumn,
tableIdentifierWithQuoted,
StringUtils.EMPTY);
StringUtils.EMPTY,
afterColumn);

try (Statement statement = connection.createStatement()) {
log.info("Executing modify column SQL: " + modifyColumnSQL);
Expand All @@ -598,7 +604,8 @@ default void applySchemaChange(
AlterType.DROP.name(),
null,
tableIdentifierWithQuoted,
dropColumn);
dropColumn,
null);
try (Statement statement = connection.createStatement()) {
log.info("Executing drop column SQL: " + dropColumnSQL);
statement.execute(dropColumnSQL);
Expand All @@ -614,6 +621,7 @@ default void applySchemaChange(
* @param newColumn new column after ddl
* @param tableName table name of sink table
* @param oldColumnName old column name before ddl
* @param afterColumn column before the new column
* @return alter table sql for sink table after schema change
*/
default String buildAlterTableSql(
Expand All @@ -622,7 +630,8 @@ default String buildAlterTableSql(
String alterOperation,
Column newColumn,
String tableName,
String oldColumnName) {
String oldColumnName,
String afterColumn) {
if (StringUtils.equals(alterOperation, AlterType.DROP.name())) {
return String.format(
"ALTER TABLE %s drop column %s", tableName, quoteIdentifier(oldColumnName));
Expand Down Expand Up @@ -654,6 +663,7 @@ default String buildAlterTableSql(
}
basicSql = decorateWithNullable(basicSql, typeBasicTypeDefine, sourceDialectName);
basicSql = decorateWithComment(tableName, basicSql, typeBasicTypeDefine);
basicSql = decorateWithAfterColumn(basicSql, afterColumn);
return dialectName().equals(DatabaseIdentifier.ORACLE) ? basicSql : basicSql + ";";
}

Expand Down Expand Up @@ -779,6 +789,21 @@ default String decorateWithComment(
return sql.toString();
}

/**
* decorate with after
*
* @param basicSql alter table sql for sink table
* @param afterColumn column before the new column
* @return alter table sql with after for sink table
*/
default String decorateWithAfterColumn(String basicSql, String afterColumn) {
StringBuilder sql = new StringBuilder(basicSql);
if (StringUtils.isNotBlank(afterColumn)) {
sql.append("AFTER ").append(afterColumn).append(StringUtils.SPACE);
}
return sql.toString();
}

/**
* whether quotes with default value
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

@Slf4j
public abstract class AbstractJdbcSinkWriter<ResourceT>
Expand Down Expand Up @@ -90,8 +91,27 @@ protected void processSchemaChangeEvent(AlterTableColumnEvent event) throws IOEx
List<Column> columns = new ArrayList<>(tableSchema.getColumns());
switch (event.getEventType()) {
case SCHEMA_CHANGE_ADD_COLUMN:
Column addColumn = ((AlterTableAddColumnEvent) event).getColumn();
columns.add(addColumn);
AlterTableAddColumnEvent alterTableAddColumnEvent =
(AlterTableAddColumnEvent) event;
Column addColumn = alterTableAddColumnEvent.getColumn();
String afterColumn = alterTableAddColumnEvent.getAfterColumn();
if (StringUtils.isNotBlank(afterColumn)) {
Optional<Column> columnOptional =
columns.stream()
.filter(column -> afterColumn.equals(column.getName()))
.findFirst();
if (!columnOptional.isPresent()) {
columns.add(addColumn);
break;
}
columnOptional.ifPresent(
column -> {
int index = columns.indexOf(column);
columns.add(index + 1, addColumn);
});
} else {
columns.add(addColumn);
}
break;
case SCHEMA_CHANGE_DROP_COLUMN:
String dropColumn = ((AlterTableDropColumnEvent) event).getColumn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ values (128,"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1,'2023-02-02 09:09:
(136,"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02 09:09:09');
update products set name = 'dailai' where id = 135;

alter table products ADD COLUMN add_column6 varchar(64) not null default 'ff' after id;
delete from products where id = 115;
insert into products
values (173,'tt',"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1,'2023-02-02 09:09:09'),
(174,'tt',"car battery","12V car battery",8.1,'xx',2,1.2,'2023-02-02 09:09:09'),
(175,'tt',"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3,1.3,'2023-02-02 09:09:09'),
(176,'tt',"hammer","12oz carpenter's hammer",0.75,'xx',4,1.4,'2023-02-02 09:09:09'),
(177,'tt',"hammer","14oz carpenter's hammer",0.875,'xx',5,1.5,'2023-02-02 09:09:09'),
(178,'tt',"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6,'2023-02-02 09:09:09'),
(179,'tt',"rocks","box of assorted rocks",5.3,'xx',7,1.7,'2023-02-02 09:09:09'),
(180,'tt',"jacket","water resistent black wind breaker",0.1,'xx',8,1.8,'2023-02-02 09:09:09'),
(181,'tt',"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02 09:09:09');

-- add column for irrelevant table
ALTER TABLE products_on_hand ADD COLUMN add_column5 varchar(64) not null default 'yy';

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
CREATE DATABASE IF NOT EXISTS `shop`;
use shop;

alter table products drop column add_column4;
alter table products drop column add_column4,drop column add_column6;
insert into products
values (137,"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1),
(138,"car battery","12V car battery",8.1,'xx',2,1.2),
Expand Down

0 comments on commit 3fb05da

Please sign in to comment.