Skip to content

Commit fc71888

Browse files
ChaomingZhangCNzhangchaoming.zcm
and
zhangchaoming.zcm
authored
[hotfix][cdc-common] Remove duplicated code to improve performance
This closes #3840. Co-authored-by: zhangchaoming.zcm <[email protected]>
1 parent ddc645b commit fc71888

File tree

2 files changed

+6
-1
lines changed

2 files changed

+6
-1
lines changed

Diff for: flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java

+6
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.flink.cdc.common.types.TimestampType;
3838
import org.apache.flink.cdc.common.types.ZonedTimestampType;
3939

40+
import javax.annotation.CheckReturnValue;
4041
import javax.annotation.Nullable;
4142

4243
import java.util.ArrayList;
@@ -56,6 +57,7 @@ public class SchemaUtils {
5657
* create a list of {@link RecordData.FieldGetter} from given {@link Schema} to get Object from
5758
* RecordData.
5859
*/
60+
@CheckReturnValue
5961
public static List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
6062
return createFieldGetters(schema.getColumns());
6163
}
@@ -64,6 +66,7 @@ public static List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
6466
* create a list of {@link RecordData.FieldGetter} from given {@link Column} to get Object from
6567
* RecordData.
6668
*/
69+
@CheckReturnValue
6770
public static List<RecordData.FieldGetter> createFieldGetters(List<Column> columns) {
6871
List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(columns.size());
6972
for (int i = 0; i < columns.size(); i++) {
@@ -73,6 +76,7 @@ public static List<RecordData.FieldGetter> createFieldGetters(List<Column> colum
7376
}
7477

7578
/** Restore original data fields from RecordData structure. */
79+
@CheckReturnValue
7680
public static List<Object> restoreOriginalData(
7781
@Nullable RecordData recordData, List<RecordData.FieldGetter> fieldGetters) {
7882
if (recordData == null) {
@@ -86,6 +90,7 @@ public static List<Object> restoreOriginalData(
8690
}
8791

8892
/** apply SchemaChangeEvent to the old schema and return the schema after changing. */
93+
@CheckReturnValue
8994
public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
9095
return SchemaChangeEventVisitor.visit(
9196
event,
@@ -210,6 +215,7 @@ private static Schema applyAlterColumnTypeEvent(AlterColumnTypeEvent event, Sche
210215
* position indicators. This is necessary since extra calculated columns might be added, and
211216
* `FIRST` / `LAST` position might differ.
212217
*/
218+
@CheckReturnValue
213219
public static Optional<SchemaChangeEvent> transformSchemaChangeEvent(
214220
boolean hasAsterisk, List<String> referencedColumns, SchemaChangeEvent event) {
215221
Optional<SchemaChangeEvent> evolvedSchemaChangeEvent = Optional.empty();

Diff for: flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java

-1
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,6 @@ public Optional<DataChangeEvent> coerceDataRecord(
311311

312312
List<RecordData.FieldGetter> upstreamSchemaReader =
313313
upstreamRecordGetterCache.getUnchecked(upstreamSchema);
314-
SchemaUtils.createFieldGetters(upstreamSchema);
315314
BinaryRecordDataGenerator evolvedSchemaWriter =
316315
evolvedRecordWriterCache.getUnchecked(evolvedSchema);
317316

0 commit comments

Comments
 (0)