Skip to content

Commit 258f931

Browse files
xxsc0529Hisoka-X
andauthored
[Fix][Connector-V2]Oceanbase vector database is added as the source server (#7832)
Co-authored-by: Jia Fan <[email protected]>
1 parent 6b5f74e commit 258f931

File tree

10 files changed

+232
-27
lines changed

10 files changed

+232
-27
lines changed

docs/en/connector-v2/sink/Jdbc.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ there are some reference value for params above.
245245
| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc&#58;snowflake://<account_name>.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc |
246246
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |
247247
| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | / | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar |
248-
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar |
248+
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar |
249249
| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | / | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar |
250250
| InterSystems IRIS | com.intersystems.jdbc.IRISDriver | jdbc:IRIS://localhost:1972/%SYS | / | https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar |
251251
| opengauss | org.opengauss.Driver | jdbc:opengauss://localhost:5432/postgres | / | https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar |

docs/en/connector-v2/source/Jdbc.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ there are some reference value for params above.
133133
| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000 | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |
134134
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |
135135
| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar |
136-
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar |
136+
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar |
137137
| Hive | org.apache.hive.jdbc.HiveDriver | jdbc:hive2://localhost:10000 | https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar |
138138
| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar |
139139
| InterSystems IRIS | com.intersystems.jdbc.IRISDriver | jdbc:IRIS://localhost:1972/%SYS | https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar |

docs/zh/connector-v2/sink/Jdbc.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
235235
| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc&#58;snowflake://<account_name>.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc |
236236
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |
237237
| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | / | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar |
238-
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar |
238+
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar |
239239
| opengauss | org.opengauss.Driver | jdbc:opengauss://localhost:5432/postgres | / | https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar |
240240

241241
## 示例

seatunnel-connectors-v2/connector-jdbc/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
<postgis.jdbc.version>2.5.1</postgis.jdbc.version>
5050
<kingbase8.version>8.6.0</kingbase8.version>
5151
<hive.jdbc.version>3.1.3</hive.jdbc.version>
52-
<oceanbase.jdbc.version>2.4.11</oceanbase.jdbc.version>
52+
<oceanbase.jdbc.version>2.4.12</oceanbase.jdbc.version>
5353
<xugu.jdbc.version>12.2.0</xugu.jdbc.version>
5454
<iris.jdbc.version>3.0.0</iris.jdbc.version>
5555
<tikv.version>3.2.0</tikv.version>

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,32 @@
2020
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2121
import org.apache.seatunnel.api.table.catalog.Column;
2222
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
23+
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
2324
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2425
import org.apache.seatunnel.api.table.catalog.TablePath;
26+
import org.apache.seatunnel.api.table.catalog.TableSchema;
2527
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
2628
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
2729
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
2830
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
29-
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
3031
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeConverter;
31-
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeMapper;
3232
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMysqlType;
3333

34+
import org.apache.commons.lang.StringUtils;
35+
3436
import com.google.common.base.Preconditions;
3537
import lombok.extern.slf4j.Slf4j;
3638

3739
import java.sql.Connection;
3840
import java.sql.DatabaseMetaData;
41+
import java.sql.PreparedStatement;
3942
import java.sql.ResultSet;
4043
import java.sql.ResultSetMetaData;
4144
import java.sql.SQLException;
4245
import java.sql.Statement;
46+
import java.util.ArrayList;
47+
import java.util.Collections;
48+
import java.util.HashMap;
4349
import java.util.Iterator;
4450
import java.util.List;
4551
import java.util.Locale;
@@ -197,12 +203,54 @@ protected String getDropDatabaseSql(String databaseName) {
197203

198204
@Override
199205
public CatalogTable getTable(String sqlQuery) throws SQLException {
200-
Connection defaultConnection = getConnection(defaultUrl);
201-
try (Statement statement = defaultConnection.createStatement();
202-
ResultSet resultSet = statement.executeQuery(sqlQuery)) {
203-
ResultSetMetaData metaData = resultSet.getMetaData();
204-
return CatalogUtils.getCatalogTable(
205-
metaData, new OceanBaseMySqlTypeMapper(typeConverter), sqlQuery);
206+
try (Connection connection = getConnection(defaultUrl)) {
207+
String tableName = null;
208+
String databaseName = null;
209+
String schemaName = null;
210+
String catalogName = "jdbc_catalog";
211+
TableSchema.Builder schemaBuilder = TableSchema.builder();
212+
213+
try (Statement statement = connection.createStatement();
214+
ResultSet resultSet = statement.executeQuery(sqlQuery)) {
215+
ResultSetMetaData metaData = resultSet.getMetaData();
216+
tableName = metaData.getTableName(1);
217+
databaseName = metaData.getCatalogName(1);
218+
schemaName = metaData.getSchemaName(1);
219+
catalogName = metaData.getCatalogName(1);
220+
}
221+
databaseName = StringUtils.defaultIfBlank(databaseName, null);
222+
schemaName = StringUtils.defaultIfBlank(schemaName, null);
223+
224+
TablePath tablePath =
225+
StringUtils.isBlank(tableName)
226+
? TablePath.DEFAULT
227+
: TablePath.of(databaseName, schemaName, tableName);
228+
229+
try (PreparedStatement ps =
230+
connection.prepareStatement(getSelectColumnsSql(tablePath));
231+
ResultSet columnResultSet = ps.executeQuery();
232+
ResultSet primaryKeys =
233+
connection
234+
.getMetaData()
235+
.getPrimaryKeys(catalogName, schemaName, tableName)) {
236+
while (primaryKeys.next()) {
237+
String primaryKeyColumnName = primaryKeys.getString("COLUMN_NAME");
238+
schemaBuilder.primaryKey(
239+
PrimaryKey.of(
240+
primaryKeyColumnName,
241+
Collections.singletonList(primaryKeyColumnName)));
242+
}
243+
while (columnResultSet.next()) {
244+
schemaBuilder.column(buildColumn(columnResultSet));
245+
}
246+
}
247+
return CatalogTable.of(
248+
TableIdentifier.of(catalogName, tablePath),
249+
schemaBuilder.build(),
250+
new HashMap<>(),
251+
new ArrayList<>(),
252+
"",
253+
catalogName);
206254
}
207255
}
208256

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.seatunnel.api.table.type.DecimalType;
2626
import org.apache.seatunnel.api.table.type.LocalTimeType;
2727
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
28+
import org.apache.seatunnel.api.table.type.VectorType;
2829
import org.apache.seatunnel.common.exception.CommonError;
2930
import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
3031
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
@@ -100,6 +101,9 @@ public class OceanBaseMySqlTypeConverter
100101
public static final long POWER_2_32 = (long) Math.pow(2, 32);
101102
public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
102103

104+
private static final String VECTOR_TYPE_NAME = "";
105+
private static final String VECTOR_NAME = "VECTOR";
106+
103107
@Override
104108
public String identifier() {
105109
return DatabaseIdentifier.OCENABASE;
@@ -289,6 +293,17 @@ public Column convert(BasicTypeDefine typeDefine) {
289293
builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
290294
builder.scale(typeDefine.getScale());
291295
break;
296+
case VECTOR_TYPE_NAME:
297+
String columnType = typeDefine.getColumnType();
298+
if (columnType.startsWith("vector(") && columnType.endsWith(")")) {
299+
Integer number =
300+
Integer.parseInt(
301+
columnType.substring(
302+
columnType.indexOf("(") + 1, columnType.indexOf(")")));
303+
builder.dataType(VectorType.VECTOR_FLOAT_TYPE);
304+
builder.scale(number);
305+
}
306+
break;
292307
default:
293308
throw CommonError.convertToSeaTunnelTypeError(
294309
DatabaseIdentifier.OCENABASE, mysqlDataType, typeDefine.getName());
@@ -501,6 +516,11 @@ public BasicTypeDefine<OceanBaseMysqlType> reconvert(Column column) {
501516
builder.columnType(MYSQL_DATETIME);
502517
}
503518
break;
519+
case FLOAT_VECTOR:
520+
builder.nativeType(VECTOR_NAME);
521+
builder.columnType(String.format("%s(%s)", VECTOR_NAME, column.getScale()));
522+
builder.dataType(VECTOR_NAME);
523+
break;
504524
default:
505525
throw CommonError.convertToConnectorTypeError(
506526
DatabaseIdentifier.OCENABASE,

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
3333
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
3434

35+
import org.apache.commons.lang3.StringUtils;
36+
3537
import java.math.BigDecimal;
3638
import java.nio.ByteBuffer;
3739
import java.sql.Date;
@@ -89,12 +91,16 @@ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQL
8991
fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs, resultSetIndex);
9092
break;
9193
case FLOAT_VECTOR:
92-
Object[] objects = (Object[]) rs.getObject(fieldIndex);
93-
Float[] arrays = new Float[objects.length];
94-
for (int i = 0; i < objects.length; i++) {
95-
arrays[i] = Float.parseFloat(objects[i].toString());
94+
String result = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
95+
if (StringUtils.isNotBlank(result)) {
96+
result = result.replace("[", "").replace("]", "");
97+
String[] stringArray = result.split(",");
98+
Float[] arrays = new Float[stringArray.length];
99+
for (int i = 0; i < stringArray.length; i++) {
100+
arrays[i] = Float.parseFloat(stringArray[i]);
101+
}
102+
fields[fieldIndex] = BufferUtils.toByteBuffer(arrays);
96103
}
97-
fields[fieldIndex] = BufferUtils.toByteBuffer(arrays);
98104
break;
99105
case DOUBLE:
100106
fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs, resultSetIndex);

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,6 @@ void checkResult(String executeKey, TestContainer container, Container.ExecResul
8787

8888
@Override
8989
String driverUrl() {
90-
return "https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar";
90+
return "https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar";
9191
}
9292
}

0 commit comments

Comments
 (0)