Skip to content

Commit ff9a47a

Browse files
committed
[FLINK-34690] Cast decimal to VARCHAR/INT/BIGINT/LARGEINT as primary key in starrocks sink
1 parent 88c23b5 commit ff9a47a

File tree

2 files changed

+86
-4
lines changed

2 files changed

+86
-4
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java

+23-4
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public static void toStarRocksDataType(
129129
* @param fieldType the element type of the RecordData
130130
* @param fieldPos the element position of the RecordData
131131
* @param zoneId the time zone used when converting from <code>TIMESTAMP WITH LOCAL TIME ZONE
132-
* </code>
132+
* </code>
133133
*/
134134
public static RecordData.FieldGetter createFieldGetter(
135135
DataType fieldType, int fieldPos, ZoneId zoneId) {
@@ -297,10 +297,29 @@ public StarRocksColumn.Builder visit(DoubleType doubleType) {
297297

298298
@Override
299299
public StarRocksColumn.Builder visit(DecimalType decimalType) {
300-
builder.setDataType(DECIMAL);
300+
// StarRocks is not support Decimal as primary key, so decimal should be cast to INT,
301+
// BIGINT, LARGEINT or VARHCAR.
302+
if (!isPrimaryKeys) {
303+
builder.setDataType(DECIMAL);
304+
builder.setColumnSize(decimalType.getPrecision());
305+
builder.setDecimalDigits(decimalType.getScale());
306+
} else if (decimalType.getPrecision() < 10 && decimalType.getScale() == 0) {
307+
// Int is range from [-2,147,483,648, 2,147,483,647],
308+
// some data with precision of 10 is out of range.
309+
builder.setDataType(INT);
310+
} else if (decimalType.getPrecision() < 19 && decimalType.getScale() == 0) {
311+
// BigInt is range from [-9,223.372,036,854,775,808~9,223.372,036,854,775,807],
312+
// some data with precision of 19 is out of range.
313+
builder.setDataType(BIGINT);
314+
} else if (decimalType.getPrecision() < 38 && decimalType.getScale() == 0) {
315+
// LargeInt is range from [-1.701411835E38 ~ 1.701411835E38],
316+
// some data with precision of 38 is out of range.
317+
builder.setDataType(LARGEINT);
318+
} else {
319+
builder.setDataType(VARCHAR);
320+
builder.setColumnSize(Math.min(decimalType.getPrecision(), MAX_VARCHAR_SIZE));
321+
}
301322
builder.setNullable(decimalType.isNullable());
302-
builder.setColumnSize(decimalType.getPrecision());
303-
builder.setDecimalDigits(decimalType.getScale());
304323
return builder;
305324
}
306325

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java

+63
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.connectors.starrocks.sink;
1919

2020
import org.apache.flink.cdc.common.types.CharType;
21+
import org.apache.flink.cdc.common.types.DecimalType;
2122
import org.apache.flink.cdc.common.types.VarCharType;
2223

2324
import com.starrocks.connector.flink.catalog.StarRocksColumn;
@@ -75,6 +76,68 @@ public void testCharTypeForPrimaryKey() {
7576
assertTrue(smallLengthColumn.isNullable());
7677
}
7778

79+
@Test
80+
public void testDecimalForPrimaryKey() {
81+
// map to Int of StarRocks if primary key column with precision < 10 and scale = 0
82+
StarRocksColumn.Builder intLengthBuilder =
83+
new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
84+
new DecimalType(9, 0)
85+
.accept(new StarRocksUtils.CdcDataTypeTransformer(true, intLengthBuilder));
86+
StarRocksColumn intLengthColumn = intLengthBuilder.build();
87+
assertEquals("primary_key", intLengthColumn.getColumnName());
88+
assertEquals(0, intLengthColumn.getOrdinalPosition());
89+
assertEquals(StarRocksUtils.INT, intLengthColumn.getDataType());
90+
assertTrue(intLengthColumn.isNullable());
91+
92+
// map to BigInt of StarRocks if primary key column with precision < 19 and scale = 0
93+
StarRocksColumn.Builder bigIntLengthBuilder =
94+
new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
95+
new DecimalType(10, 0)
96+
.accept(new StarRocksUtils.CdcDataTypeTransformer(true, bigIntLengthBuilder));
97+
StarRocksColumn bigIntLengthColumn = bigIntLengthBuilder.build();
98+
assertEquals("primary_key", bigIntLengthColumn.getColumnName());
99+
assertEquals(0, bigIntLengthColumn.getOrdinalPosition());
100+
assertEquals(StarRocksUtils.BIGINT, bigIntLengthColumn.getDataType());
101+
assertTrue(bigIntLengthColumn.isNullable());
102+
103+
// map to LARGEINT of StarRocks if primary key column with precision < 18 and scale = 0
104+
StarRocksColumn.Builder unsignedBigIntLengthBuilder =
105+
new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
106+
new DecimalType(20, 0)
107+
.accept(
108+
new StarRocksUtils.CdcDataTypeTransformer(
109+
true, unsignedBigIntLengthBuilder));
110+
StarRocksColumn unsignedBigIntLengthColumn = unsignedBigIntLengthBuilder.build();
111+
assertEquals("primary_key", unsignedBigIntLengthColumn.getColumnName());
112+
assertEquals(0, unsignedBigIntLengthColumn.getOrdinalPosition());
113+
assertEquals(StarRocksUtils.LARGEINT, unsignedBigIntLengthColumn.getDataType());
114+
assertTrue(unsignedBigIntLengthColumn.isNullable());
115+
116+
// map to Varchar of StarRocks if primary key column with precision >= 38 and scale = 0
117+
StarRocksColumn.Builder outOfBoundLengthBuilder =
118+
new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
119+
new DecimalType(38, 0)
120+
.accept(new StarRocksUtils.CdcDataTypeTransformer(true, outOfBoundLengthBuilder));
121+
StarRocksColumn outOfBoundColumn = outOfBoundLengthBuilder.build();
122+
assertEquals("primary_key", outOfBoundColumn.getColumnName());
123+
assertEquals(0, outOfBoundColumn.getOrdinalPosition());
124+
assertEquals(StarRocksUtils.VARCHAR, outOfBoundColumn.getDataType());
125+
assertEquals(Integer.valueOf(38), outOfBoundColumn.getColumnSize().orElse(null));
126+
assertTrue(unsignedBigIntLengthColumn.isNullable());
127+
128+
// map to Varchar of StarRocks if primary key column with scale > 0
129+
StarRocksColumn.Builder scaleBuilder =
130+
new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
131+
new DecimalType(12, 1)
132+
.accept(new StarRocksUtils.CdcDataTypeTransformer(true, scaleBuilder));
133+
StarRocksColumn scaleBoundColumn = scaleBuilder.build();
134+
assertEquals("primary_key", scaleBoundColumn.getColumnName());
135+
assertEquals(0, scaleBoundColumn.getOrdinalPosition());
136+
assertEquals(StarRocksUtils.VARCHAR, scaleBoundColumn.getDataType());
137+
assertEquals(Integer.valueOf(12), scaleBoundColumn.getColumnSize().orElse(null));
138+
assertTrue(unsignedBigIntLengthColumn.isNullable());
139+
}
140+
78141
@Test
79142
public void testVarCharType() {
80143
// the length fo StarRocks should be 3 times as that of CDC if CDC length * 3 <=

0 commit comments

Comments
 (0)