Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Jdbc] Optimize source data dynamic splitting when where_condition is configured #8760

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ public class JdbcSourceTableConfig implements Serializable {
@JsonProperty("skip_analyze")
private Boolean skipAnalyze;

@JsonProperty("where_condition")
private String whereConditionClause;

@Tolerate
public JdbcSourceTableConfig() {}

Expand Down Expand Up @@ -97,6 +100,8 @@ public static List<JdbcSourceTableConfig> of(ReadonlyConfig connectorConfig) {
tableConfig.setUseSelectCount(
connectorConfig.get(JdbcSourceOptions.USE_SELECT_COUNT));
tableConfig.setSkipAnalyze(connectorConfig.get(JdbcSourceOptions.SKIP_ANALYZE));
tableConfig.setWhereConditionClause(
connectorConfig.get(JdbcSourceOptions.WHERE_CONDITION));
});

if (tableList.size() > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,30 @@ default Long approximateRowCntStatement(Connection connection, JdbcSourceTable t
return SQLUtils.countForTable(connection, tableIdentifier(table.getTablePath()));
}

/**
* Total number of entries in the lookup table with condition.
*
* @param connection The JDBC connection object used to connect to the database.
* @param table table info.
* @return row count
*/
default Long rowCntWithWhereCondition(Connection connection, JdbcSourceTable table)
throws SQLException {
String subQuerySQL;
if (StringUtils.isNotBlank(table.getQuery())) {
subQuerySQL =
String.format(
"SELECT * FROM (%s) AS T %s",
table.getQuery(), table.getWhereConditionClause());
} else {
subQuerySQL =
String.format(
"SELECT * FROM %s %s",
tableIdentifier(table.getTablePath()), table.getWhereConditionClause());
}
return SQLUtils.countForSubquery(connection, subQuerySQL);
}

/**
* Performs a sampling operation on the specified column of a table in a JDBC-connected
* database.
Expand All @@ -335,17 +359,23 @@ default Object[] sampleDataFromColumn(
int samplingRate,
int fetchSize)
throws Exception {
String whereConditionClause =
StringUtils.isNotBlank(table.getWhereConditionClause())
? table.getWhereConditionClause()
: "";
String sampleQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sampleQuery =
String.format(
"SELECT %s FROM (%s) AS T",
quoteIdentifier(columnName), table.getQuery());
"SELECT %s FROM (%s) AS T %s",
quoteIdentifier(columnName), table.getQuery(), whereConditionClause);
} else {
sampleQuery =
String.format(
"SELECT %s FROM %s",
quoteIdentifier(columnName), tableIdentifier(table.getTablePath()));
"SELECT %s FROM %s %s",
quoteIdentifier(columnName),
tableIdentifier(table.getTablePath()),
whereConditionClause);
}

try (PreparedStatement stmt = creatPreparedStatement(connection, sampleQuery, fetchSize)) {
Expand Down Expand Up @@ -390,28 +420,34 @@ default Object queryNextChunkMax(
Object includedLowerBound)
throws SQLException {
String quotedColumn = quoteIdentifier(columnName);
String whereConditionClause =
StringUtils.isNotBlank(table.getWhereConditionClause())
? table.getWhereConditionClause() + " AND"
: "WHERE";
String sqlQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sqlQuery =
String.format(
"SELECT MAX(%s) FROM ("
+ "SELECT %s FROM (%s) AS T1 WHERE %s >= ? ORDER BY %s ASC LIMIT %s"
+ "SELECT %s FROM (%s) AS T1 %s %s >= ? ORDER BY %s ASC LIMIT %s"
+ ") AS T2",
quotedColumn,
quotedColumn,
table.getQuery(),
whereConditionClause,
quotedColumn,
quotedColumn,
chunkSize);
} else {
sqlQuery =
String.format(
"SELECT MAX(%s) FROM ("
+ "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s"
+ "SELECT %s FROM %s %s %s >= ? ORDER BY %s ASC LIMIT %s"
+ ") AS T",
quotedColumn,
quotedColumn,
tableIdentifier(table.getTablePath()),
whereConditionClause,
quotedColumn,
quotedColumn,
chunkSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,22 @@ public Object[] sampleDataFromColumn(
int fetchSize)
throws Exception {
String sampleQuery;
String whereConditionClause =
StringUtils.isNotBlank(table.getWhereConditionClause())
? table.getWhereConditionClause()
: "";
if (StringUtils.isNotBlank(table.getQuery())) {
sampleQuery =
String.format(
"SELECT %s FROM (%s) AS T",
quoteIdentifier(columnName), table.getQuery());
"SELECT %s FROM (%s) AS T %s",
quoteIdentifier(columnName), table.getQuery(), whereConditionClause);
} else {
sampleQuery =
String.format(
"SELECT %s FROM %s",
quoteIdentifier(columnName), tableIdentifier(table.getTablePath()));
"SELECT %s FROM %s %s",
quoteIdentifier(columnName),
tableIdentifier(table.getTablePath()),
whereConditionClause);
}

try (Statement stmt =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,28 +258,34 @@ public Object queryNextChunkMax(
Object includedLowerBound)
throws SQLException {
String quotedColumn = quoteIdentifier(columnName);
String whereConditionClause =
StringUtils.isNotBlank(table.getWhereConditionClause())
? table.getWhereConditionClause() + " AND"
: "WHERE";
String sqlQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sqlQuery =
String.format(
"SELECT MAX(%s) FROM ("
+ "SELECT %s FROM (%s) WHERE %s >= ? ORDER BY %s ASC "
+ "SELECT %s FROM (%s) %s %s >= ? ORDER BY %s ASC "
+ ") WHERE ROWNUM <= %s",
quotedColumn,
quotedColumn,
table.getQuery(),
whereConditionClause,
quotedColumn,
quotedColumn,
chunkSize);
} else {
sqlQuery =
String.format(
"SELECT MAX(%s) FROM ("
+ "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC "
+ "SELECT %s FROM %s %s %s >= ? ORDER BY %s ASC "
+ ") WHERE ROWNUM <= %s",
quotedColumn,
quotedColumn,
tableIdentifier(table.getTablePath()),
whereConditionClause,
quotedColumn,
quotedColumn,
chunkSize);
Expand All @@ -306,16 +312,23 @@ public Object[] sampleDataFromColumn(
int samplingRate,
int fetchSize)
throws Exception {
String whereConditionClause =
StringUtils.isNotBlank(table.getWhereConditionClause())
? table.getWhereConditionClause()
: "";
String sampleQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sampleQuery =
String.format(
"SELECT %s FROM (%s) T", quoteIdentifier(columnName), table.getQuery());
"SELECT %s FROM (%s) T %s",
quoteIdentifier(columnName), table.getQuery(), whereConditionClause);
} else {
sampleQuery =
String.format(
"SELECT %s FROM %s",
quoteIdentifier(columnName), tableIdentifier(table.getTablePath()));
"SELECT %s FROM %s %s",
quoteIdentifier(columnName),
tableIdentifier(table.getTablePath()),
whereConditionClause);
}

try (PreparedStatement stmt = creatPreparedStatement(connection, sampleQuery, fetchSize)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,34 @@ public Object queryNextChunkMax(

String quotedColumn = quoteIdentifier(columnName);
quotedColumn = convertType(quotedColumn, column.getSourceType());
String whereConditionClause =
StringUtils.isNotBlank(table.getWhereConditionClause())
? table.getWhereConditionClause() + " AND"
: "WHERE";
String sqlQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sqlQuery =
String.format(
"SELECT MAX(%s) FROM ("
+ "SELECT %s FROM (%s) AS T1 WHERE %s >= ? ORDER BY %s ASC LIMIT %s"
+ "SELECT %s FROM (%s) AS T1 %s %s >= ? ORDER BY %s ASC LIMIT %s"
+ ") AS T2",
quotedColumn,
quotedColumn,
table.getQuery(),
whereConditionClause,
quotedColumn,
quotedColumn,
chunkSize);
} else {
sqlQuery =
String.format(
"SELECT MAX(%s) FROM ("
+ "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s"
+ "SELECT %s FROM %s %s %s >= ? ORDER BY %s ASC LIMIT %s"
+ ") AS T",
quotedColumn,
quotedColumn,
tableIdentifier(table.getTablePath()),
whereConditionClause,
quotedColumn,
quotedColumn,
chunkSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ protected Pair<Object, Object> queryMinMax(JdbcSourceTable table, String columnN
columnName,
jdbcDialect.tableIdentifier(table.getTablePath()));
}

if (StringUtils.isNotBlank(config.getWhereConditionClause())) {
sqlQuery = String.format("%s %s", sqlQuery, config.getWhereConditionClause());
}

try (Statement stmt = getOrEstablishConnection().createStatement()) {
log.info("Split table, query min max: {}", sqlQuery);
try (ResultSet resultSet = stmt.executeQuery(sqlQuery)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ private List<ChunkRange> evenlyColumnSplitChunks(
}

private Long queryApproximateRowCnt(JdbcSourceTable table) throws SQLException {
if (StringUtils.isNotBlank(table.getWhereConditionClause())) {
return jdbcDialect.rowCntWithWhereCondition(getOrEstablishConnection(), table);
}
return jdbcDialect.approximateRowCntStatement(getOrEstablishConnection(), table);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ public class JdbcSourceTable implements Serializable {
private final Boolean useSelectCount;
private final Boolean skipAnalyze;
private final CatalogTable catalogTable;
private final String whereConditionClause;
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public static Map<TablePath, JdbcSourceTable> getTables(
.partitionEnd(tableConfig.getPartitionEnd())
.useSelectCount(tableConfig.getUseSelectCount())
.skipAnalyze(tableConfig.getSkipAnalyze())
.whereConditionClause(tableConfig.getWhereConditionClause())
.catalogTable(catalogTable)
.build();
tables.put(tablePath, jdbcSourceTable);
Expand Down Expand Up @@ -143,6 +144,7 @@ public static Map<TablePath, JdbcSourceTable> getTables(
.partitionEnd(tableConfig.getPartitionEnd())
.useSelectCount(tableConfig.getUseSelectCount())
.skipAnalyze(tableConfig.getSkipAnalyze())
.whereConditionClause(tableConfig.getWhereConditionClause())
.catalogTable(catalogTable)
.build();

Expand Down
Loading