|
18 | 18 |
|
19 | 19 | package org.apache.seatunnel.e2e.connector.kafka;
|
20 | 20 |
|
| 21 | +import org.apache.seatunnel.api.table.catalog.CatalogTable; |
| 22 | +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; |
21 | 23 | import org.apache.seatunnel.api.table.type.BasicType;
|
22 | 24 | import org.apache.seatunnel.api.table.type.DecimalType;
|
23 | 25 | import org.apache.seatunnel.api.table.type.LocalTimeType;
|
@@ -117,111 +119,115 @@ public class KafkaFormatIT extends TestSuiteBase implements TestResource {
|
117 | 119 | private static final String PG_SINK_TABLE1 = "sink";
|
118 | 120 | private static final String PG_SINK_TABLE2 = "sink2";
|
119 | 121 |
|
120 |
| - private static final Map<String, SeaTunnelRowType> sinkTableRowTypes = new HashMap<>(); |
| 122 | + private static final Map<String, CatalogTable> sinkTables = new HashMap<>(); |
121 | 123 |
|
122 | 124 | static {
|
123 |
| - sinkTableRowTypes.put( |
| 125 | + sinkTables.put( |
124 | 126 | PG_SINK_TABLE1,
|
125 |
| - new SeaTunnelRowType( |
126 |
| - new String[] {"id", "name", "description", "weight"}, |
127 |
| - new SeaTunnelDataType[] { |
128 |
| - BasicType.INT_TYPE, |
129 |
| - BasicType.STRING_TYPE, |
130 |
| - BasicType.STRING_TYPE, |
131 |
| - BasicType.STRING_TYPE |
132 |
| - })); |
133 |
| - |
134 |
| - sinkTableRowTypes.put( |
| 127 | + CatalogTableUtil.getCatalogTable( |
| 128 | + PG_SINK_TABLE1, |
| 129 | + new SeaTunnelRowType( |
| 130 | + new String[] {"id", "name", "description", "weight"}, |
| 131 | + new SeaTunnelDataType[] { |
| 132 | + BasicType.INT_TYPE, |
| 133 | + BasicType.STRING_TYPE, |
| 134 | + BasicType.STRING_TYPE, |
| 135 | + BasicType.STRING_TYPE |
| 136 | + }))); |
| 137 | + |
| 138 | + sinkTables.put( |
135 | 139 | PG_SINK_TABLE2,
|
136 |
| - new SeaTunnelRowType( |
137 |
| - new String[] { |
138 |
| - "id", |
139 |
| - "f_binary", |
140 |
| - "f_blob", |
141 |
| - "f_long_varbinary", |
142 |
| - "f_longblob", |
143 |
| - "f_tinyblob", |
144 |
| - "f_varbinary", |
145 |
| - "f_smallint", |
146 |
| - "f_smallint_unsigned", |
147 |
| - "f_mediumint", |
148 |
| - "f_mediumint_unsigned", |
149 |
| - "f_int", |
150 |
| - "f_int_unsigned", |
151 |
| - "f_integer", |
152 |
| - "f_integer_unsigned", |
153 |
| - "f_bigint", |
154 |
| - "f_bigint_unsigned", |
155 |
| - "f_numeric", |
156 |
| - "f_decimal", |
157 |
| - "f_float", |
158 |
| - "f_double", |
159 |
| - "f_double_precision", |
160 |
| - "f_longtext", |
161 |
| - "f_mediumtext", |
162 |
| - "f_text", |
163 |
| - "f_tinytext", |
164 |
| - "f_varchar", |
165 |
| - "f_date", |
166 |
| - "f_datetime", |
167 |
| - "f_timestamp", |
168 |
| - "f_bit1", |
169 |
| - "f_bit64", |
170 |
| - "f_char", |
171 |
| - "f_enum", |
172 |
| - "f_mediumblob", |
173 |
| - "f_long_varchar", |
174 |
| - "f_real", |
175 |
| - "f_time", |
176 |
| - "f_tinyint", |
177 |
| - "f_tinyint_unsigned", |
178 |
| - "f_json", |
179 |
| - "f_year" |
180 |
| - }, |
181 |
| - new SeaTunnelDataType[] { |
182 |
| - BasicType.INT_TYPE, |
183 |
| - PrimitiveByteArrayType.INSTANCE, |
184 |
| - PrimitiveByteArrayType.INSTANCE, |
185 |
| - PrimitiveByteArrayType.INSTANCE, |
186 |
| - PrimitiveByteArrayType.INSTANCE, |
187 |
| - PrimitiveByteArrayType.INSTANCE, |
188 |
| - PrimitiveByteArrayType.INSTANCE, |
189 |
| - BasicType.SHORT_TYPE, |
190 |
| - BasicType.INT_TYPE, |
191 |
| - BasicType.INT_TYPE, |
192 |
| - BasicType.INT_TYPE, |
193 |
| - BasicType.INT_TYPE, |
194 |
| - BasicType.INT_TYPE, |
195 |
| - BasicType.INT_TYPE, |
196 |
| - BasicType.LONG_TYPE, |
197 |
| - BasicType.LONG_TYPE, |
198 |
| - new DecimalType(10, 0), |
199 |
| - new DecimalType(10, 0), |
200 |
| - new DecimalType(10, 0), |
201 |
| - BasicType.FLOAT_TYPE, |
202 |
| - BasicType.DOUBLE_TYPE, |
203 |
| - BasicType.DOUBLE_TYPE, |
204 |
| - BasicType.STRING_TYPE, |
205 |
| - BasicType.STRING_TYPE, |
206 |
| - BasicType.STRING_TYPE, |
207 |
| - BasicType.STRING_TYPE, |
208 |
| - BasicType.STRING_TYPE, |
209 |
| - LocalTimeType.LOCAL_DATE_TYPE, |
210 |
| - LocalTimeType.LOCAL_DATE_TIME_TYPE, |
211 |
| - LocalTimeType.LOCAL_DATE_TIME_TYPE, |
212 |
| - BasicType.BOOLEAN_TYPE, |
213 |
| - BasicType.BYTE_TYPE, |
214 |
| - BasicType.STRING_TYPE, |
215 |
| - BasicType.STRING_TYPE, |
216 |
| - PrimitiveByteArrayType.INSTANCE, |
217 |
| - BasicType.STRING_TYPE, |
218 |
| - BasicType.DOUBLE_TYPE, |
219 |
| - LocalTimeType.LOCAL_TIME_TYPE, |
220 |
| - BasicType.BYTE_TYPE, |
221 |
| - BasicType.INT_TYPE, |
222 |
| - BasicType.STRING_TYPE, |
223 |
| - BasicType.INT_TYPE |
224 |
| - })); |
| 140 | + CatalogTableUtil.getCatalogTable( |
| 141 | + PG_SINK_TABLE2, |
| 142 | + new SeaTunnelRowType( |
| 143 | + new String[] { |
| 144 | + "id", |
| 145 | + "f_binary", |
| 146 | + "f_blob", |
| 147 | + "f_long_varbinary", |
| 148 | + "f_longblob", |
| 149 | + "f_tinyblob", |
| 150 | + "f_varbinary", |
| 151 | + "f_smallint", |
| 152 | + "f_smallint_unsigned", |
| 153 | + "f_mediumint", |
| 154 | + "f_mediumint_unsigned", |
| 155 | + "f_int", |
| 156 | + "f_int_unsigned", |
| 157 | + "f_integer", |
| 158 | + "f_integer_unsigned", |
| 159 | + "f_bigint", |
| 160 | + "f_bigint_unsigned", |
| 161 | + "f_numeric", |
| 162 | + "f_decimal", |
| 163 | + "f_float", |
| 164 | + "f_double", |
| 165 | + "f_double_precision", |
| 166 | + "f_longtext", |
| 167 | + "f_mediumtext", |
| 168 | + "f_text", |
| 169 | + "f_tinytext", |
| 170 | + "f_varchar", |
| 171 | + "f_date", |
| 172 | + "f_datetime", |
| 173 | + "f_timestamp", |
| 174 | + "f_bit1", |
| 175 | + "f_bit64", |
| 176 | + "f_char", |
| 177 | + "f_enum", |
| 178 | + "f_mediumblob", |
| 179 | + "f_long_varchar", |
| 180 | + "f_real", |
| 181 | + "f_time", |
| 182 | + "f_tinyint", |
| 183 | + "f_tinyint_unsigned", |
| 184 | + "f_json", |
| 185 | + "f_year" |
| 186 | + }, |
| 187 | + new SeaTunnelDataType[] { |
| 188 | + BasicType.INT_TYPE, |
| 189 | + PrimitiveByteArrayType.INSTANCE, |
| 190 | + PrimitiveByteArrayType.INSTANCE, |
| 191 | + PrimitiveByteArrayType.INSTANCE, |
| 192 | + PrimitiveByteArrayType.INSTANCE, |
| 193 | + PrimitiveByteArrayType.INSTANCE, |
| 194 | + PrimitiveByteArrayType.INSTANCE, |
| 195 | + BasicType.SHORT_TYPE, |
| 196 | + BasicType.INT_TYPE, |
| 197 | + BasicType.INT_TYPE, |
| 198 | + BasicType.INT_TYPE, |
| 199 | + BasicType.INT_TYPE, |
| 200 | + BasicType.INT_TYPE, |
| 201 | + BasicType.INT_TYPE, |
| 202 | + BasicType.LONG_TYPE, |
| 203 | + BasicType.LONG_TYPE, |
| 204 | + new DecimalType(10, 0), |
| 205 | + new DecimalType(10, 0), |
| 206 | + new DecimalType(10, 0), |
| 207 | + BasicType.FLOAT_TYPE, |
| 208 | + BasicType.DOUBLE_TYPE, |
| 209 | + BasicType.DOUBLE_TYPE, |
| 210 | + BasicType.STRING_TYPE, |
| 211 | + BasicType.STRING_TYPE, |
| 212 | + BasicType.STRING_TYPE, |
| 213 | + BasicType.STRING_TYPE, |
| 214 | + BasicType.STRING_TYPE, |
| 215 | + LocalTimeType.LOCAL_DATE_TYPE, |
| 216 | + LocalTimeType.LOCAL_DATE_TIME_TYPE, |
| 217 | + LocalTimeType.LOCAL_DATE_TIME_TYPE, |
| 218 | + BasicType.BOOLEAN_TYPE, |
| 219 | + BasicType.BYTE_TYPE, |
| 220 | + BasicType.STRING_TYPE, |
| 221 | + BasicType.STRING_TYPE, |
| 222 | + PrimitiveByteArrayType.INSTANCE, |
| 223 | + BasicType.STRING_TYPE, |
| 224 | + BasicType.DOUBLE_TYPE, |
| 225 | + LocalTimeType.LOCAL_TIME_TYPE, |
| 226 | + BasicType.BYTE_TYPE, |
| 227 | + BasicType.INT_TYPE, |
| 228 | + BasicType.STRING_TYPE, |
| 229 | + BasicType.INT_TYPE |
| 230 | + }))); |
225 | 231 | }
|
226 | 232 |
|
227 | 233 | // Used to map local data paths to kafa topics that need to be written to kafka
|
@@ -839,7 +845,7 @@ private List<List<Object>> getPostgreSinkTableList(String tableName) {
|
839 | 845 | while (resultSet.next()) {
|
840 | 846 | SeaTunnelRow row =
|
841 | 847 | postgresJdbcRowConverter.toInternal(
|
842 |
| - resultSet, sinkTableRowTypes.get(tableName)); |
| 848 | + resultSet, sinkTables.get(tableName).getTableSchema()); |
843 | 849 | actual.add(Arrays.asList(row.getFields()));
|
844 | 850 | }
|
845 | 851 | }
|
|
0 commit comments