Skip to content

Commit d9eb607

Browse files
loserwang1024zhangchaoming.zcm
authored and
zhangchaoming.zcm
committed
[FLINK-34690] Cast decimal to VARCHAR as primary key in starrocks sink (apache#3150)
1 parent 021243e commit d9eb607

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,20 @@ public StarRocksColumn.Builder visit(DoubleType doubleType) {
297297

298298
@Override
299299
public StarRocksColumn.Builder visit(DecimalType decimalType) {
300-
builder.setDataType(DECIMAL);
300+
// StarRocks does not support Decimal as primary key, so decimal should be cast to
301+
// VARCHAR.
302+
if (!isPrimaryKeys) {
303+
builder.setDataType(DECIMAL);
304+
builder.setColumnSize(decimalType.getPrecision());
305+
builder.setDecimalDigits(decimalType.getScale());
306+
} else {
307+
builder.setDataType(VARCHAR);
308+
// For a DecimalType with precision N, we may need N + 1 or N + 2 characters to store it as a
309+
// string (one for negative sign, and one for decimal point)
310+
builder.setColumnSize(Math.min(
311+
decimalType.getScale() != 0? decimalType.getPrecision() + 2:decimalType.getPrecision() + 1, MAX_VARCHAR_SIZE));
312+
}
301313
builder.setNullable(decimalType.isNullable());
302-
builder.setColumnSize(decimalType.getPrecision());
303-
builder.setDecimalDigits(decimalType.getScale());
304314
return builder;
305315
}
306316

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.connectors.starrocks.sink;
1919

2020
import org.apache.flink.cdc.common.types.CharType;
21+
import org.apache.flink.cdc.common.types.DecimalType;
2122
import org.apache.flink.cdc.common.types.VarCharType;
2223

2324
import com.starrocks.connector.flink.catalog.StarRocksColumn;
@@ -75,6 +76,50 @@ public void testCharTypeForPrimaryKey() {
7576
assertTrue(smallLengthColumn.isNullable());
7677
}
7778

79+
@Test
80+
public void testDecimalForPrimaryKey() {
81+
// Map to DECIMAL of StarRocks if column is DECIMAL type and not primary key.
82+
StarRocksColumn.Builder noPrimaryKeyBuilder =
83+
new StarRocksColumn.Builder().setColumnName("no_primary_key").setOrdinalPosition(0);
84+
new DecimalType(20, 1)
85+
.accept(new StarRocksUtils.CdcDataTypeTransformer(false, noPrimaryKeyBuilder));
86+
StarRocksColumn noPrimaryKeyColumn = noPrimaryKeyBuilder.build();
87+
assertEquals("no_primary_key", noPrimaryKeyColumn.getColumnName());
88+
assertEquals(0, noPrimaryKeyColumn.getOrdinalPosition());
89+
assertEquals(StarRocksUtils.DECIMAL, noPrimaryKeyColumn.getDataType());
90+
assertEquals(Integer.valueOf(20), noPrimaryKeyColumn.getColumnSize().orElse(null));
91+
assertEquals(Integer.valueOf(1), noPrimaryKeyColumn.getDecimalDigits().get());
92+
assertTrue(noPrimaryKeyColumn.isNullable());
93+
94+
// Map to VARCHAR of StarRocks if column is DECIMAL type and primary key.
95+
StarRocksColumn.Builder primaryKeyBuilder =
96+
new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(1);
97+
new DecimalType(20, 1)
98+
.notNull()
99+
.accept(new StarRocksUtils.CdcDataTypeTransformer(true, primaryKeyBuilder));
100+
StarRocksColumn primaryKeyColumn = primaryKeyBuilder.build();
101+
assertEquals("primary_key", primaryKeyColumn.getColumnName());
102+
assertEquals(1, primaryKeyColumn.getOrdinalPosition());
103+
assertEquals(StarRocksUtils.VARCHAR, primaryKeyColumn.getDataType());
104+
assertEquals(Integer.valueOf(22), primaryKeyColumn.getColumnSize().orElse(null));
105+
assertTrue(!primaryKeyColumn.isNullable());
106+
107+
// Map to VARCHAR of StarRocks if column is DECIMAL type and primary key
108+
// DECIMAL(20,0) is common in cdc pipeline, for example, the upstream cdc source is unsigned
109+
// BIGINT.
110+
StarRocksColumn.Builder unsignedBigIntKeyBuilder =
111+
new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(1);
112+
new DecimalType(20, 0)
113+
.notNull()
114+
.accept(new StarRocksUtils.CdcDataTypeTransformer(true, unsignedBigIntKeyBuilder));
115+
StarRocksColumn unsignedBigIntColumn = unsignedBigIntKeyBuilder.build();
116+
assertEquals("primary_key", unsignedBigIntColumn.getColumnName());
117+
assertEquals(1, unsignedBigIntColumn.getOrdinalPosition());
118+
assertEquals(StarRocksUtils.VARCHAR, unsignedBigIntColumn.getDataType());
119+
assertEquals(Integer.valueOf(21), unsignedBigIntColumn.getColumnSize().orElse(null));
120+
assertTrue(!unsignedBigIntColumn.isNullable());
121+
}
122+
78123
@Test
79124
public void testVarCharType() {
80125
// the length fo StarRocks should be 3 times as that of CDC if CDC length * 3 <=

0 commit comments

Comments
 (0)