Skip to content

Commit 2ad8006

Browse files
authored
[FLINK-36864][cdc-runtime] Fix unable to use numeric literals that goes beyond Int32 range (#3785)
Signed-off-by: yuxiqian <[email protected]>
1 parent 6d21941 commit 2ad8006

File tree

4 files changed

+217
-0
lines changed

4 files changed

+217
-0
lines changed

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1779,6 +1779,104 @@ void testNumericCastingsWithTruncation() throws Exception {
17791779
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");
17801780
}
17811781

1782+
@Test
1783+
void testTransformWithLargeLiterals() throws Exception {
1784+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
1785+
1786+
// Setup value source
1787+
Configuration sourceConfig = new Configuration();
1788+
1789+
sourceConfig.set(
1790+
ValuesDataSourceOptions.EVENT_SET_ID,
1791+
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
1792+
1793+
TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
1794+
List<Event> events = generateSchemaEvolutionEvents(tableId);
1795+
1796+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
1797+
1798+
SourceDef sourceDef =
1799+
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
1800+
1801+
// Setup value sink
1802+
Configuration sinkConfig = new Configuration();
1803+
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
1804+
1805+
// Setup pipeline
1806+
Configuration pipelineConfig = new Configuration();
1807+
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
1808+
pipelineConfig.set(
1809+
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
1810+
PipelineDef pipelineDef =
1811+
new PipelineDef(
1812+
sourceDef,
1813+
sinkDef,
1814+
Collections.emptyList(),
1815+
Collections.singletonList(
1816+
new TransformDef(
1817+
"\\.*.\\.*.\\.*",
1818+
"*, 2147483647 AS int_max, "
1819+
+ "2147483648 AS greater_than_int_max, "
1820+
+ "-2147483648 AS int_min, "
1821+
+ "-2147483649 AS less_than_int_min, "
1822+
+ "CAST(1234567890123456789 AS DECIMAL(20, 0)) AS really_big_decimal",
1823+
"CAST(id AS BIGINT) + 2147483648 > 2147483649", // equivalent to id > 1
1824+
null,
1825+
null,
1826+
null,
1827+
null)),
1828+
Collections.emptyList(),
1829+
pipelineConfig);
1830+
1831+
// Execute the pipeline
1832+
PipelineExecution execution = composer.compose(pipelineDef);
1833+
execution.execute();
1834+
1835+
// Check the order and content of all received events
1836+
String[] outputEvents = outCaptor.toString().trim().split("\n");
1837+
1838+
assertThat(outputEvents)
1839+
.containsExactly(
1840+
// Initial stage
1841+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`int_max` INT,`greater_than_int_max` BIGINT,`int_min` INT,`less_than_int_min` BIGINT,`really_big_decimal` DECIMAL(19, 0)}, primaryKeys=id, options=()}",
1842+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1843+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1844+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[3, Colin, 24, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
1845+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",
1846+
1847+
// Add Column
1848+
"AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
1849+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1850+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1851+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[2nd, 5, Eva, 20, 2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
1852+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1853+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",
1854+
1855+
// Alter column type
1856+
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
1857+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1858+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1859+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[5th, 8, Harry, 18.0, -3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
1860+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1861+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",
1862+
1863+
// Rename column
1864+
"RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
1865+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1866+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1867+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[8th, 11, Kella, 18.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
1868+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1869+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",
1870+
1871+
// Drop column
1872+
"DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
1873+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1874+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1875+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[11th, 14, Nein, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
1876+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
1877+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}");
1878+
}
1879+
17821880
private List<Event> generateSchemaEvolutionEvents(TableId tableId) {
17831881
List<Event> events = new ArrayList<>();
17841882

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.calcite.sql.SqlLiteral;
3232
import org.apache.calcite.sql.SqlNode;
3333
import org.apache.calcite.sql.SqlNodeList;
34+
import org.apache.calcite.sql.SqlNumericLiteral;
3435
import org.apache.calcite.sql.fun.SqlCase;
3536
import org.apache.calcite.sql.type.SqlTypeName;
3637
import org.codehaus.commons.compiler.CompileException;
@@ -138,6 +139,13 @@ private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) {
138139
if (sqlLiteral instanceof SqlCharStringLiteral) {
139140
// Double quotation marks represent strings in Janino.
140141
value = "\"" + value.substring(1, value.length() - 1) + "\"";
142+
} else if (sqlLiteral instanceof SqlNumericLiteral) {
143+
if (((SqlNumericLiteral) sqlLiteral).isInteger()) {
144+
long longValue = sqlLiteral.longValue(true);
145+
if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE) {
146+
value += "L";
147+
}
148+
}
141149
}
142150
if (SQL_TYPE_NAME_IGNORE.contains(sqlLiteral.getTypeName())) {
143151
value = "\"" + value + "\"";

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.flink.cdc.runtime.parser;
1919

20+
import org.apache.flink.api.java.tuple.Tuple2;
21+
2022
import org.codehaus.commons.compiler.CompileException;
2123
import org.codehaus.commons.compiler.Location;
2224
import org.codehaus.janino.ExpressionEvaluator;
@@ -34,6 +36,9 @@
3436
import java.util.ArrayList;
3537
import java.util.Arrays;
3638
import java.util.List;
39+
import java.util.stream.Stream;
40+
41+
import static org.assertj.core.api.Assertions.assertThat;
3742

3843
/** Unit tests for the {@link JaninoCompiler}. */
3944
public class JaninoCompilerTest {
@@ -120,4 +125,81 @@ public void testBuildInFunction() throws InvocationTargetException {
120125
Object evaluate = expressionEvaluator.evaluate(params.toArray());
121126
Assert.assertEquals(3.0, evaluate);
122127
}
128+
129+
@Test
130+
public void testLargeNumericLiterals() {
131+
// Test parsing integer literals
132+
Stream.of(
133+
Tuple2.of("0", 0),
134+
Tuple2.of("1", 1),
135+
Tuple2.of("1", 1),
136+
Tuple2.of("2147483647", 2147483647),
137+
Tuple2.of("-2147483648", -2147483648))
138+
.forEach(
139+
t -> {
140+
String expression = t.f0;
141+
List<String> columnNames = new ArrayList<>();
142+
List<Class<?>> paramTypes = new ArrayList<>();
143+
ExpressionEvaluator expressionEvaluator =
144+
JaninoCompiler.compileExpression(
145+
JaninoCompiler.loadSystemFunction(expression),
146+
columnNames,
147+
paramTypes,
148+
Integer.class);
149+
try {
150+
assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1);
151+
} catch (InvocationTargetException e) {
152+
throw new RuntimeException(e);
153+
}
154+
});
155+
156+
// Test parsing double literals
157+
Stream.of(
158+
Tuple2.of("3.1415926", 3.1415926),
159+
Tuple2.of("0.0", 0.0),
160+
Tuple2.of("17.0", 17.0),
161+
Tuple2.of("123456789.123456789", 123456789.123456789),
162+
Tuple2.of("-987654321.987654321", -987654321.987654321))
163+
.forEach(
164+
t -> {
165+
String expression = t.f0;
166+
List<String> columnNames = new ArrayList<>();
167+
List<Class<?>> paramTypes = new ArrayList<>();
168+
ExpressionEvaluator expressionEvaluator =
169+
JaninoCompiler.compileExpression(
170+
JaninoCompiler.loadSystemFunction(expression),
171+
columnNames,
172+
paramTypes,
173+
Double.class);
174+
try {
175+
assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1);
176+
} catch (InvocationTargetException e) {
177+
throw new RuntimeException(e);
178+
}
179+
});
180+
181+
// Test parsing long literals
182+
Stream.of(
183+
Tuple2.of("2147483648L", 2147483648L),
184+
Tuple2.of("-2147483649L", -2147483649L),
185+
Tuple2.of("9223372036854775807L", 9223372036854775807L),
186+
Tuple2.of("-9223372036854775808L", -9223372036854775808L))
187+
.forEach(
188+
t -> {
189+
String expression = t.f0;
190+
List<String> columnNames = new ArrayList<>();
191+
List<Class<?>> paramTypes = new ArrayList<>();
192+
ExpressionEvaluator expressionEvaluator =
193+
JaninoCompiler.compileExpression(
194+
JaninoCompiler.loadSystemFunction(expression),
195+
columnNames,
196+
paramTypes,
197+
Long.class);
198+
try {
199+
assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1);
200+
} catch (InvocationTargetException e) {
201+
throw new RuntimeException(e);
202+
}
203+
});
204+
}
123205
}

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.calcite.rel.RelRoot;
3737
import org.apache.calcite.rel.type.RelDataTypeSystem;
3838
import org.apache.calcite.rex.RexBuilder;
39+
import org.apache.calcite.runtime.CalciteContextException;
3940
import org.apache.calcite.sql.SqlNode;
4041
import org.apache.calcite.sql.SqlSelect;
4142
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
@@ -461,6 +462,34 @@ public void testTranslateUdfFilterToJaninoExpression() {
461462
"__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)) > 4 || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")");
462463
}
463464

465+
@Test
466+
void testLargeNumericalLiterals() {
467+
// For literals within [-2147483648, 2147483647] range, plain Integers are OK
468+
testFilterExpression("id > 2147483647", "id > 2147483647");
469+
testFilterExpression("id < -2147483648", "id < -2147483648");
470+
471+
// For out-of-range literals, an extra `L` suffix is required
472+
testFilterExpression("id > 2147483648", "id > 2147483648L");
473+
testFilterExpression("id > -2147483649", "id > -2147483649L");
474+
testFilterExpression("id < 9223372036854775807", "id < 9223372036854775807L");
475+
testFilterExpression("id > -9223372036854775808", "id > -9223372036854775808L");
476+
477+
// But there's still a limit
478+
Assertions.assertThatThrownBy(
479+
() ->
480+
TransformParser.translateFilterExpressionToJaninoExpression(
481+
"id > 9223372036854775808", Collections.emptyList()))
482+
.isExactlyInstanceOf(CalciteContextException.class)
483+
.hasMessageContaining("Numeric literal '9223372036854775808' out of range");
484+
485+
Assertions.assertThatThrownBy(
486+
() ->
487+
TransformParser.translateFilterExpressionToJaninoExpression(
488+
"id < -9223372036854775809", Collections.emptyList()))
489+
.isExactlyInstanceOf(CalciteContextException.class)
490+
.hasMessageContaining("Numeric literal '-9223372036854775809' out of range");
491+
}
492+
464493
private void testFilterExpression(String expression, String expressionExpect) {
465494
String janinoExpression =
466495
TransformParser.translateFilterExpressionToJaninoExpression(

0 commit comments

Comments
 (0)