1818package org .apache .flink .cdc .common .utils ;
1919
2020import org .apache .flink .cdc .common .annotation .PublicEvolving ;
21+ import org .apache .flink .cdc .common .annotation .VisibleForTesting ;
2122import org .apache .flink .cdc .common .data .RecordData ;
2223import org .apache .flink .cdc .common .event .AddColumnEvent ;
2324import org .apache .flink .cdc .common .event .AlterColumnTypeEvent ;
2627import org .apache .flink .cdc .common .event .SchemaChangeEvent ;
2728import org .apache .flink .cdc .common .schema .Column ;
2829import org .apache .flink .cdc .common .schema .Schema ;
30+ import org .apache .flink .cdc .common .types .DataType ;
31+ import org .apache .flink .cdc .common .types .DataTypeFamily ;
32+ import org .apache .flink .cdc .common .types .DataTypeRoot ;
33+ import org .apache .flink .cdc .common .types .DataTypes ;
34+ import org .apache .flink .cdc .common .types .DecimalType ;
35+
36+ import javax .annotation .Nullable ;
2937
3038import java .util .ArrayList ;
39+ import java .util .Collections ;
3140import java .util .LinkedList ;
3241import java .util .List ;
42+ import java .util .Objects ;
3343import java .util .stream .Collectors ;
44+ import java .util .stream .IntStream ;
3445
3546/** Utils for {@link Schema} to perform the ability of evolution. */
3647@ PublicEvolving
@@ -56,6 +67,177 @@ public static List<RecordData.FieldGetter> createFieldGetters(List<Column> colum
5667 return fieldGetters ;
5768 }
5869
70+ /** Restore original data fields from RecordData structure. */
71+ public static List <Object > restoreOriginalData (
72+ @ Nullable RecordData recordData , List <RecordData .FieldGetter > fieldGetters ) {
73+ if (recordData == null ) {
74+ return Collections .emptyList ();
75+ }
76+ List <Object > actualFields = new ArrayList <>();
77+ for (RecordData .FieldGetter fieldGetter : fieldGetters ) {
78+ actualFields .add (fieldGetter .getFieldOrNull (recordData ));
79+ }
80+ return actualFields ;
81+ }
82+
83+ /** Merge compatible upstream schemas. */
84+ public static Schema mergeCompatibleSchemas (List <Schema > schemas ) {
85+ if (schemas .isEmpty ()) {
86+ return null ;
87+ } else if (schemas .size () == 1 ) {
88+ return schemas .get (0 );
89+ } else {
90+ Schema outputSchema = null ;
91+ for (Schema schema : schemas ) {
92+ outputSchema = mergeSchema (outputSchema , schema );
93+ }
94+ return outputSchema ;
95+ }
96+ }
97+
98+ /** Try to combine two schemas with potential incompatible type. */
99+ @ VisibleForTesting
100+ public static Schema mergeSchema (@ Nullable Schema lhs , Schema rhs ) {
101+ if (lhs == null ) {
102+ return rhs ;
103+ }
104+ if (lhs .getColumnCount () != rhs .getColumnCount ()) {
105+ throw new IllegalStateException (
106+ String .format (
107+ "Unable to merge schema %s and %s with different column counts." ,
108+ lhs , rhs ));
109+ }
110+ if (!lhs .primaryKeys ().equals (rhs .primaryKeys ())) {
111+ throw new IllegalStateException (
112+ String .format (
113+ "Unable to merge schema %s and %s with different primary keys." ,
114+ lhs , rhs ));
115+ }
116+ if (!lhs .partitionKeys ().equals (rhs .partitionKeys ())) {
117+ throw new IllegalStateException (
118+ String .format (
119+ "Unable to merge schema %s and %s with different partition keys." ,
120+ lhs , rhs ));
121+ }
122+ if (!lhs .options ().equals (rhs .options ())) {
123+ throw new IllegalStateException (
124+ String .format (
125+ "Unable to merge schema %s and %s with different options." , lhs , rhs ));
126+ }
127+ if (!Objects .equals (lhs .comment (), rhs .comment ())) {
128+ throw new IllegalStateException (
129+ String .format (
130+ "Unable to merge schema %s and %s with different comments." , lhs , rhs ));
131+ }
132+
133+ List <Column > leftColumns = lhs .getColumns ();
134+ List <Column > rightColumns = rhs .getColumns ();
135+
136+ List <Column > mergedColumns =
137+ IntStream .range (0 , lhs .getColumnCount ())
138+ .mapToObj (i -> mergeColumn (leftColumns .get (i ), rightColumns .get (i )))
139+ .collect (Collectors .toList ());
140+
141+ return lhs .copy (mergedColumns );
142+ }
143+
144+ /** Try to combine two columns with potential incompatible type. */
145+ @ VisibleForTesting
146+ public static Column mergeColumn (Column lhs , Column rhs ) {
147+ if (!Objects .equals (lhs .getName (), rhs .getName ())) {
148+ throw new IllegalStateException (
149+ String .format (
150+ "Unable to merge column %s and %s with different name." , lhs , rhs ));
151+ }
152+ if (!Objects .equals (lhs .getComment (), rhs .getComment ())) {
153+ throw new IllegalStateException (
154+ String .format (
155+ "Unable to merge column %s and %s with different comments." , lhs , rhs ));
156+ }
157+ return lhs .copy (mergeDataType (lhs .getType (), rhs .getType ()));
158+ }
159+
160+ /** Try to combine given data types to a compatible wider data type. */
161+ @ VisibleForTesting
162+ public static DataType mergeDataType (DataType lhs , DataType rhs ) {
163+ // Ignore nullability during data type merge
164+ boolean nullable = lhs .isNullable () || rhs .isNullable ();
165+ lhs = lhs .notNull ();
166+ rhs = rhs .notNull ();
167+
168+ DataType mergedType ;
169+ if (lhs .equals (rhs )) {
170+ // identical type
171+ mergedType = rhs ;
172+ } else if (lhs .is (DataTypeFamily .INTEGER_NUMERIC )
173+ && rhs .is (DataTypeFamily .INTEGER_NUMERIC )) {
174+ mergedType = DataTypes .BIGINT ();
175+ } else if (lhs .is (DataTypeFamily .CHARACTER_STRING )
176+ && rhs .is (DataTypeFamily .CHARACTER_STRING )) {
177+ mergedType = DataTypes .STRING ();
178+ } else if (lhs .is (DataTypeFamily .APPROXIMATE_NUMERIC )
179+ && rhs .is (DataTypeFamily .APPROXIMATE_NUMERIC )) {
180+ mergedType = DataTypes .DOUBLE ();
181+ } else if (lhs .is (DataTypeRoot .DECIMAL ) && rhs .is (DataTypeRoot .DECIMAL )) {
182+ // Merge two decimal types
183+ DecimalType lhsDecimal = (DecimalType ) lhs ;
184+ DecimalType rhsDecimal = (DecimalType ) rhs ;
185+ int resultIntDigits =
186+ Math .max (
187+ lhsDecimal .getPrecision () - lhsDecimal .getScale (),
188+ rhsDecimal .getPrecision () - rhsDecimal .getScale ());
189+ int resultScale = Math .max (lhsDecimal .getScale (), rhsDecimal .getScale ());
190+ mergedType = DataTypes .DECIMAL (resultIntDigits + resultScale , resultScale );
191+ } else if (lhs .is (DataTypeRoot .DECIMAL ) && rhs .is (DataTypeFamily .EXACT_NUMERIC )) {
192+ // Merge decimal and int
193+ DecimalType lhsDecimal = (DecimalType ) lhs ;
194+ mergedType =
195+ DataTypes .DECIMAL (
196+ Math .max (
197+ lhsDecimal .getPrecision (),
198+ lhsDecimal .getScale () + getNumericPrecision (rhs )),
199+ lhsDecimal .getScale ());
200+ } else if (rhs .is (DataTypeRoot .DECIMAL ) && lhs .is (DataTypeFamily .EXACT_NUMERIC )) {
201+ // Merge decimal and int
202+ DecimalType rhsDecimal = (DecimalType ) rhs ;
203+ mergedType =
204+ DataTypes .DECIMAL (
205+ Math .max (
206+ rhsDecimal .getPrecision (),
207+ rhsDecimal .getScale () + getNumericPrecision (lhs )),
208+ rhsDecimal .getScale ());
209+ } else {
210+ throw new IllegalStateException (
211+ String .format ("Incompatible types: \" %s\" and \" %s\" " , lhs , rhs ));
212+ }
213+
214+ if (nullable ) {
215+ return mergedType .nullable ();
216+ } else {
217+ return mergedType .notNull ();
218+ }
219+ }
220+
221+ @ VisibleForTesting
222+ public static int getNumericPrecision (DataType dataType ) {
223+ if (dataType .is (DataTypeFamily .EXACT_NUMERIC )) {
224+ if (dataType .is (DataTypeRoot .TINYINT )) {
225+ return 3 ;
226+ } else if (dataType .is (DataTypeRoot .SMALLINT )) {
227+ return 5 ;
228+ } else if (dataType .is (DataTypeRoot .INTEGER )) {
229+ return 10 ;
230+ } else if (dataType .is (DataTypeRoot .BIGINT )) {
231+ return 19 ;
232+ } else if (dataType .is (DataTypeRoot .DECIMAL )) {
233+ return ((DecimalType ) dataType ).getPrecision ();
234+ }
235+ }
236+
237+ throw new IllegalArgumentException (
238+ "Failed to get precision of non-exact decimal type " + dataType );
239+ }
240+
59241 /** apply SchemaChangeEvent to the old schema and return the schema after changing. */
60242 public static Schema applySchemaChangeEvent (Schema schema , SchemaChangeEvent event ) {
61243 if (event instanceof AddColumnEvent ) {
0 commit comments