Skip to content

Commit 60835a9

Browse files
committed
fix binary length and reorganize test cases
1 parent cac0789 commit 60835a9

File tree

6 files changed

+172
-222
lines changed

6 files changed

+172
-222
lines changed

Diff for: flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseColumn.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,10 @@ public class OceanBaseColumn implements Serializable {
5151
*/
5252
@Nullable private final Integer columnSize;
5353

54-
/**
55-
* The number of fractional digits for numeric data. This is null for other data types.
56-
* NUMBER_SCALE in information_schema.COLUMNS.
57-
*/
54+
/** The number of fractional digits for numeric data. This is null for other data types. */
5855
@Nullable private final Integer numericScale;
5956

60-
/** The column comment. COLUMN_COMMENT in information_schema.COLUMNS. */
57+
/** The column comment. */
6158
@Nullable private final String columnComment;
6259

6360
private OceanBaseColumn(

Diff for: flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseMySQLCatalog.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public void alterColumnType(
236236
"table name cannot be null or empty.");
237237

238238
OceanBaseUtils.CdcDataTypeTransformer cdcDataTypeTransformer =
239-
new OceanBaseUtils.CdcDataTypeTransformer(false, new OceanBaseColumn.Builder());
239+
new OceanBaseUtils.CdcDataTypeTransformer(new OceanBaseColumn.Builder());
240240
OceanBaseColumn oceanBaseColumn =
241241
dataType.accept(cdcDataTypeTransformer).setColumnName(columnName).build();
242242
String alterTypeSql =
@@ -463,6 +463,7 @@ protected String getFullColumnType(
463463
return String.format("DECIMAL(%d, %s)", columnSize.get(), decimalDigits.get());
464464
case "CHAR":
465465
case "VARCHAR":
466+
case "BINARY":
466467
case "VARBINARY":
467468
Preconditions.checkArgument(
468469
columnSize.isPresent(), type + " type must have column size");

Diff for: flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseMetadataApplier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ private void applyAddColumnEvent(AddColumnEvent addColumnEvent) {
133133
.setColumnName(column.getName())
134134
.setOrdinalPosition(-1)
135135
.setColumnComment(column.getComment());
136-
OceanBaseUtils.toOceanBaseDataType(column, false, builder);
136+
OceanBaseUtils.toOceanBaseDataType(column, builder);
137137
addColumns.add(builder.build());
138138
}
139139

Diff for: flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseUtils.java

+6-9
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public static OceanBaseTable toOceanBaseTable(TableId tableId, Schema schema) {
7676
.setOrdinalPosition(i)
7777
.setDefaultValue(column.getDefaultValueExpression())
7878
.setColumnComment(column.getComment());
79-
toOceanBaseDataType(column, isPrimaryKeys, builder);
79+
toOceanBaseDataType(column, builder);
8080
oceanBaseColumns.add(builder.build());
8181
}
8282

@@ -98,10 +98,8 @@ public static OceanBaseTable toOceanBaseTable(TableId tableId, Schema schema) {
9898
}
9999

100100
/** Convert CDC data type to OceanBase data type. */
101-
public static void toOceanBaseDataType(
102-
Column cdcColumn, boolean isPrimaryKeys, OceanBaseColumn.Builder builder) {
103-
CdcDataTypeTransformer dataTypeTransformer =
104-
new CdcDataTypeTransformer(isPrimaryKeys, builder);
101+
public static void toOceanBaseDataType(Column cdcColumn, OceanBaseColumn.Builder builder) {
102+
CdcDataTypeTransformer dataTypeTransformer = new CdcDataTypeTransformer(builder);
105103
cdcColumn.getType().accept(dataTypeTransformer);
106104
}
107105

@@ -121,7 +119,6 @@ public static void toOceanBaseDataType(
121119
public static final String DECIMAL = "DECIMAL";
122120
public static final String CHAR = "CHAR";
123121
public static final String VARCHAR = "VARCHAR";
124-
public static final String STRING = "STRING";
125122
public static final String DATE = "DATE";
126123
public static final String DATETIME = "DATETIME";
127124
public static final String TIMESTAMP = "TIMESTAMP";
@@ -144,10 +141,8 @@ public static class CdcDataTypeTransformer
144141
extends DataTypeDefaultVisitor<OceanBaseColumn.Builder> {
145142

146143
private final OceanBaseColumn.Builder builder;
147-
private final boolean isPrimaryKeys;
148144

149-
public CdcDataTypeTransformer(boolean isPrimaryKeys, OceanBaseColumn.Builder builder) {
150-
this.isPrimaryKeys = isPrimaryKeys;
145+
public CdcDataTypeTransformer(OceanBaseColumn.Builder builder) {
151146
this.builder = builder;
152147
}
153148

@@ -188,7 +183,9 @@ public OceanBaseColumn.Builder visit(BigIntType bigIntType) {
188183

189184
@Override
190185
public OceanBaseColumn.Builder visit(BinaryType binaryType) {
186+
int length = binaryType.getLength();
191187
builder.setDataType(BINARY);
188+
builder.setColumnSize(length);
192189
builder.setNullable(binaryType.isNullable());
193190
return builder;
194191
}

Diff for: flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/test/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseMetadataApplierTest.java

-206
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.flink.cdc.common.schema.Schema;
2727
import org.apache.flink.cdc.common.types.BooleanType;
2828
import org.apache.flink.cdc.common.types.DataType;
29-
import org.apache.flink.cdc.common.types.DataTypes;
3029
import org.apache.flink.cdc.common.types.IntType;
3130
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
3231
import org.apache.flink.cdc.connectors.oceanbase.catalog.OceanBaseColumn;
@@ -134,211 +133,6 @@ public void testCreateTable() {
134133
assertEquals(expectTable, actualTable);
135134
}
136135

137-
@Test
138-
public void testCreateTableWithAllType() {
139-
TableId tableId = TableId.parse("test.tbl6");
140-
Schema schema =
141-
Schema.newBuilder()
142-
.physicalColumn("col1", new IntType(false))
143-
.physicalColumn("col2", DataTypes.BOOLEAN())
144-
.physicalColumn("col3", DataTypes.TIMESTAMP_LTZ())
145-
.physicalColumn("col4", DataTypes.BYTES())
146-
.physicalColumn("col5", DataTypes.TINYINT())
147-
.physicalColumn("col6", DataTypes.SMALLINT())
148-
.physicalColumn("col7", DataTypes.BIGINT())
149-
.physicalColumn("col8", DataTypes.FLOAT())
150-
.physicalColumn("col9", DataTypes.DOUBLE())
151-
.physicalColumn("col10", DataTypes.DECIMAL(6, 3))
152-
.physicalColumn("col11", DataTypes.CHAR(5))
153-
.physicalColumn("col12", DataTypes.VARCHAR(10))
154-
.physicalColumn("col13", DataTypes.STRING())
155-
.physicalColumn("col14", DataTypes.DATE())
156-
.physicalColumn("col15", DataTypes.TIME())
157-
.physicalColumn("col16", DataTypes.TIME(6))
158-
.physicalColumn("col17", DataTypes.TIMESTAMP())
159-
.physicalColumn("col18", DataTypes.TIMESTAMP(3))
160-
.physicalColumn("col19", DataTypes.TIMESTAMP_LTZ(3))
161-
.physicalColumn("col20", DataTypes.TIMESTAMP_TZ())
162-
.physicalColumn("col21", DataTypes.TIMESTAMP_TZ(3))
163-
.primaryKey("col1")
164-
.build();
165-
CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
166-
metadataApplier.applySchemaChange(createTableEvent);
167-
168-
OceanBaseTable actualTable =
169-
getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
170-
assertNotNull(actualTable);
171-
172-
List<OceanBaseColumn> columns = new ArrayList<>();
173-
columns.add(
174-
new OceanBaseColumn.Builder()
175-
.setColumnName("col1")
176-
.setOrdinalPosition(0)
177-
.setDataType("int")
178-
.setNumericScale(0)
179-
.setNullable(false)
180-
.build());
181-
columns.add(
182-
new OceanBaseColumn.Builder()
183-
.setColumnName("col2")
184-
.setOrdinalPosition(1)
185-
.setDataType("tinyint")
186-
.setNumericScale(0)
187-
.setNullable(true)
188-
.build());
189-
columns.add(
190-
new OceanBaseColumn.Builder()
191-
.setColumnName("col3")
192-
.setOrdinalPosition(2)
193-
.setDataType("timestamp")
194-
.setNullable(true)
195-
.build());
196-
197-
columns.add(
198-
new OceanBaseColumn.Builder()
199-
.setColumnName("col4")
200-
.setOrdinalPosition(3)
201-
.setDataType("longblob")
202-
.setNullable(true)
203-
.build());
204-
columns.add(
205-
new OceanBaseColumn.Builder()
206-
.setColumnName("col5")
207-
.setOrdinalPosition(4)
208-
.setDataType("tinyint")
209-
.setNumericScale(0)
210-
.setNullable(true)
211-
.build());
212-
columns.add(
213-
new OceanBaseColumn.Builder()
214-
.setColumnName("col6")
215-
.setOrdinalPosition(5)
216-
.setDataType("smallint")
217-
.setNumericScale(0)
218-
.setNullable(true)
219-
.build());
220-
columns.add(
221-
new OceanBaseColumn.Builder()
222-
.setColumnName("col7")
223-
.setOrdinalPosition(6)
224-
.setDataType("bigint")
225-
.setNumericScale(0)
226-
.setNullable(true)
227-
.build());
228-
columns.add(
229-
new OceanBaseColumn.Builder()
230-
.setColumnName("col8")
231-
.setOrdinalPosition(7)
232-
.setDataType("float")
233-
.setNullable(true)
234-
.build());
235-
columns.add(
236-
new OceanBaseColumn.Builder()
237-
.setColumnName("col9")
238-
.setOrdinalPosition(8)
239-
.setDataType("double")
240-
.setNullable(true)
241-
.build());
242-
columns.add(
243-
new OceanBaseColumn.Builder()
244-
.setColumnName("col10")
245-
.setOrdinalPosition(9)
246-
.setDataType("decimal")
247-
.setNumericScale(3)
248-
.setColumnSize(6)
249-
.setNullable(true)
250-
.build());
251-
columns.add(
252-
new OceanBaseColumn.Builder()
253-
.setColumnName("col11")
254-
.setOrdinalPosition(10)
255-
.setDataType("char")
256-
.setNullable(true)
257-
.build());
258-
columns.add(
259-
new OceanBaseColumn.Builder()
260-
.setColumnName("col12")
261-
.setOrdinalPosition(11)
262-
.setDataType("varchar")
263-
.setNullable(true)
264-
.build());
265-
266-
columns.add(
267-
new OceanBaseColumn.Builder()
268-
.setColumnName("col13")
269-
.setOrdinalPosition(12)
270-
.setDataType("text")
271-
.setNullable(true)
272-
.build());
273-
columns.add(
274-
new OceanBaseColumn.Builder()
275-
.setColumnName("col14")
276-
.setOrdinalPosition(13)
277-
.setDataType("date")
278-
.setNullable(true)
279-
.build());
280-
columns.add(
281-
new OceanBaseColumn.Builder()
282-
.setColumnName("col15")
283-
.setOrdinalPosition(14)
284-
.setDataType("time")
285-
.setNullable(true)
286-
.build());
287-
columns.add(
288-
new OceanBaseColumn.Builder()
289-
.setColumnName("col16")
290-
.setOrdinalPosition(15)
291-
.setDataType("time")
292-
.setNullable(true)
293-
.build());
294-
columns.add(
295-
new OceanBaseColumn.Builder()
296-
.setColumnName("col17")
297-
.setOrdinalPosition(16)
298-
.setDataType("datetime")
299-
.setNullable(true)
300-
.build());
301-
columns.add(
302-
new OceanBaseColumn.Builder()
303-
.setColumnName("col18")
304-
.setOrdinalPosition(17)
305-
.setDataType("datetime")
306-
.setNullable(true)
307-
.build());
308-
columns.add(
309-
new OceanBaseColumn.Builder()
310-
.setColumnName("col19")
311-
.setOrdinalPosition(18)
312-
.setDataType("timestamp")
313-
.setNullable(true)
314-
.build());
315-
columns.add(
316-
new OceanBaseColumn.Builder()
317-
.setColumnName("col20")
318-
.setOrdinalPosition(19)
319-
.setDataType("timestamp")
320-
.setNullable(true)
321-
.build());
322-
columns.add(
323-
new OceanBaseColumn.Builder()
324-
.setColumnName("col21")
325-
.setOrdinalPosition(20)
326-
.setDataType("timestamp")
327-
.setNullable(true)
328-
.build());
329-
330-
OceanBaseTable expectTable =
331-
new OceanBaseTable.Builder()
332-
.setDatabaseName(tableId.getSchemaName())
333-
.setTableName(tableId.getTableName())
334-
.setTableType(OceanBaseTable.TableType.PRIMARY_KEY)
335-
.setColumns(columns)
336-
.setTableKeys(schema.primaryKeys())
337-
.build();
338-
339-
assertEquals(expectTable, actualTable);
340-
}
341-
342136
@Test
343137
public void testDropTable() {
344138
TableId tableId = TableId.parse("test.tbl2");

0 commit comments

Comments
 (0)