Skip to content

Commit ac0371e

Browse files
committed
[fix-35187][rdb]修复连接失败任务不停止
1 parent cd0be00 commit ac0371e

File tree

14 files changed

+226
-226
lines changed

14 files changed

+226
-226
lines changed

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,16 @@
1919

2020
package com.dtstack.flink.sql.side.clickhouse;
2121

22-
import com.dtstack.flink.sql.factory.DTThreadFactory;
22+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2323
import com.dtstack.flink.sql.side.FieldInfo;
2424
import com.dtstack.flink.sql.side.JoinInfo;
25-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2625
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
2726
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
28-
import io.vertx.core.Vertx;
29-
import io.vertx.core.VertxOptions;
3027
import io.vertx.core.json.JsonObject;
31-
import io.vertx.ext.jdbc.JDBCClient;
3228
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3329
import org.apache.flink.configuration.Configuration;
3430

3531
import java.util.List;
36-
import java.util.concurrent.LinkedBlockingQueue;
37-
import java.util.concurrent.ThreadPoolExecutor;
38-
import java.util.concurrent.TimeUnit;
3932

4033

4134
public class ClickhouseAsyncReqRow extends RdbAsyncReqRow {
@@ -48,6 +41,10 @@ public ClickhouseAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Fi
4841
@Override
4942
public void open(Configuration parameters) throws Exception {
5043
super.open(parameters);
44+
}
45+
46+
@Override
47+
public JsonObject buildJdbcConfig() {
5148
JsonObject clickhouseClientConfig = new JsonObject();
5249
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
5350
clickhouseClientConfig.put("url", rdbSideTableInfo.getUrl())
@@ -60,13 +57,6 @@ public void open(Configuration parameters) throws Exception {
6057
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
6158
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
6259

63-
System.setProperty("vertx.disableFileCPResolving", "true");
64-
VertxOptions vo = new VertxOptions();
65-
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
66-
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
67-
vo.setFileResolverCachingEnabled(false);
68-
Vertx vertx = Vertx.vertx(vo);
69-
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
60+
return clickhouseClientConfig;
7061
}
71-
7262
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.dtstack.flink.sql.exception;
2+
3+
import java.util.Objects;
4+
5+
/**
6+
* @author tiezhu
7+
* @date 2021/2/2 星期二
8+
* Company dtstack
9+
*/
10+
public class ExceptionTrace {
11+
// 追溯当前异常的最原始异常信息
12+
public static String traceOriginalCause(Throwable e) {
13+
String errorMsg;
14+
if (Objects.nonNull(e.getCause())) {
15+
errorMsg = traceOriginalCause(e.getCause());
16+
} else {
17+
errorMsg = e.getMessage();
18+
}
19+
return errorMsg;
20+
}
21+
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.flink.api.common.functions.RuntimeContext;
3535
import org.apache.flink.configuration.Configuration;
3636
import org.apache.flink.metrics.Counter;
37+
import org.apache.flink.runtime.execution.SuppressRestartsException;
3738
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3839
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
3940
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -275,7 +276,7 @@ protected void dealFillDataError(BaseRow input, ResultFuture<BaseRow> resultFutu
275276
parseErrorRecords.inc();
276277
if (parseErrorRecords.getCount() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)) {
277278
LOG.info("dealFillDataError", e);
278-
resultFuture.completeExceptionally(e);
279+
resultFuture.completeExceptionally(new SuppressRestartsException(e));
279280
} else {
280281
dealMissKey(input, resultFuture);
281282
}

db2/db2-side/db2-async-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AsyncReqRow.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,16 @@
1818

1919
package com.dtstack.flink.sql.side.db2;
2020

21-
import com.dtstack.flink.sql.factory.DTThreadFactory;
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2222
import com.dtstack.flink.sql.side.FieldInfo;
2323
import com.dtstack.flink.sql.side.JoinInfo;
24-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2524
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
2625
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
27-
import io.vertx.core.Vertx;
28-
import io.vertx.core.VertxOptions;
2926
import io.vertx.core.json.JsonObject;
30-
import io.vertx.ext.jdbc.JDBCClient;
3127
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3228
import org.apache.flink.configuration.Configuration;
3329

3430
import java.util.List;
35-
import java.util.concurrent.LinkedBlockingQueue;
36-
import java.util.concurrent.ThreadPoolExecutor;
37-
import java.util.concurrent.TimeUnit;
3831

3932
/**
4033
* Reason:
@@ -56,9 +49,13 @@ public Db2AsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
5649
@Override
5750
public void open(Configuration parameters) throws Exception {
5851
super.open(parameters);
59-
JsonObject db2lientConfig = new JsonObject();
52+
}
53+
54+
@Override
55+
public JsonObject buildJdbcConfig() {
56+
JsonObject db2ClientConfig = new JsonObject();
6057
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
61-
db2lientConfig.put("url", rdbSideTableInfo.getUrl())
58+
db2ClientConfig.put("url", rdbSideTableInfo.getUrl())
6259
.put("driver_class", DB2_DRIVER)
6360
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
6461
.put("user", rdbSideTableInfo.getUserName())
@@ -67,15 +64,6 @@ public void open(Configuration parameters) throws Exception {
6764
.put("preferred_test_query", DB2_PREFERRED_TEST_QUERY_SQL)
6865
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
6966
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
70-
71-
System.setProperty("vertx.disableFileCPResolving", "true");
72-
73-
VertxOptions vo = new VertxOptions();
74-
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
75-
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
76-
vo.setFileResolverCachingEnabled(false);
77-
Vertx vertx = Vertx.vertx(vo);
78-
setRdbSqlClient(JDBCClient.createNonShared(vertx, db2lientConfig));
67+
return db2ClientConfig;
7968
}
80-
8169
}

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,11 @@
2424
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
2525
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
2626
import com.dtstack.flink.sql.util.KrbUtils;
27-
import io.vertx.core.Vertx;
28-
import io.vertx.core.VertxOptions;
2927
import io.vertx.core.json.JsonObject;
30-
import io.vertx.ext.jdbc.JDBCClient;
3128
import io.vertx.ext.sql.SQLClient;
3229
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3330
import org.apache.flink.configuration.Configuration;
31+
import org.apache.flink.runtime.execution.SuppressRestartsException;
3432
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3533
import org.apache.flink.table.dataformat.BaseRow;
3634
import org.apache.hadoop.security.UserGroupInformation;
@@ -65,41 +63,38 @@ public ImpalaAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldI
6563

6664
@Override
6765
public void open(Configuration parameters) throws Exception {
68-
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideInfo.getSideTableInfo();
69-
if (impalaSideTableInfo.getAuthMech() == 1) {
70-
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
71-
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
72-
String principal = impalaSideTableInfo.getPrincipal();
73-
ugi = KrbUtils.loginAndReturnUgi(principal, keyTabFilePath, krb5FilePath);
74-
openJdbc(parameters);
75-
} else {
76-
openJdbc(parameters);
77-
}
66+
super.open(parameters);
7867
}
7968

80-
public void openJdbc(Configuration parameters) throws Exception {
81-
super.open(parameters);
69+
@Override
70+
public JsonObject buildJdbcConfig() {
8271
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideInfo.getSideTableInfo();
72+
73+
try {
74+
if (impalaSideTableInfo.getAuthMech() == 1) {
75+
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
76+
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
77+
String principal = impalaSideTableInfo.getPrincipal();
78+
ugi = KrbUtils.loginAndReturnUgi(principal, keyTabFilePath, krb5FilePath);
79+
}
80+
} catch (Exception e) {
81+
throw new SuppressRestartsException(
82+
new Throwable("impala login with kerberos error. cause by: " + e.getMessage()));
83+
}
84+
8385
JsonObject impalaClientConfig = new JsonObject();
8486
impalaClientConfig.put("url", getUrl())
85-
.put("driver_class", IMPALA_DRIVER)
86-
.put("max_pool_size", impalaSideTableInfo.getAsyncPoolSize())
87-
.put("provider_class", DT_PROVIDER_CLASS)
88-
.put("idle_connection_test_period", 300)
89-
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN)
90-
.put("max_idle_time", 600)
91-
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
92-
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
93-
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
94-
95-
System.setProperty("vertx.disableFileCPResolving", "true");
96-
97-
VertxOptions vo = new VertxOptions();
98-
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
99-
vo.setWorkerPoolSize(impalaSideTableInfo.getAsyncPoolSize());
100-
vo.setFileResolverCachingEnabled(false);
101-
Vertx vertx = Vertx.vertx(vo);
102-
setRdbSqlClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
87+
.put("driver_class", IMPALA_DRIVER)
88+
.put("max_pool_size", impalaSideTableInfo.getAsyncPoolSize())
89+
.put("provider_class", DT_PROVIDER_CLASS)
90+
.put("idle_connection_test_period", 300)
91+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN)
92+
.put("max_idle_time", 600)
93+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
94+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
95+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
96+
97+
return impalaClientConfig;
10398
}
10499

105100
public String getUrl() {

kingbase/kingbase-side/kingbase-async-side/src/main/java/com/dtstack/flink/sql/side/kingbase/KingbaseAsyncReqRow.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@
2323
import com.dtstack.flink.sql.side.JoinInfo;
2424
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
2525
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26-
import io.vertx.core.Vertx;
27-
import io.vertx.core.VertxOptions;
2826
import io.vertx.core.json.JsonObject;
29-
import io.vertx.ext.jdbc.JDBCClient;
3027
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3128
import org.apache.flink.configuration.Configuration;
3229

@@ -54,6 +51,10 @@ public KingbaseAsyncReqRow(RowTypeInfo rowTypeInfo,
5451
@Override
5552
public void open(Configuration parameters) throws Exception {
5653
super.open(parameters);
54+
}
55+
56+
@Override
57+
public JsonObject buildJdbcConfig() {
5758
JsonObject kingbaseClient = new JsonObject();
5859
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
5960
kingbaseClient.put("url", rdbSideTableInfo.getUrl())
@@ -65,10 +66,6 @@ public void open(Configuration parameters) throws Exception {
6566
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
6667
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
6768

68-
VertxOptions vo = new VertxOptions();
69-
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
70-
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
71-
Vertx vertx = Vertx.vertx(vo);
72-
setRdbSqlClient(JDBCClient.createNonShared(vertx, kingbaseClient));
69+
return kingbaseClient;
7370
}
7471
}

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,16 @@
1919

2020
package com.dtstack.flink.sql.side.mysql;
2121

22-
import com.dtstack.flink.sql.factory.DTThreadFactory;
22+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2323
import com.dtstack.flink.sql.side.FieldInfo;
2424
import com.dtstack.flink.sql.side.JoinInfo;
25-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2625
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
2726
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
28-
import io.vertx.core.Vertx;
29-
import io.vertx.core.VertxOptions;
3027
import io.vertx.core.json.JsonObject;
31-
import io.vertx.ext.jdbc.JDBCClient;
3228
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3329
import org.apache.flink.configuration.Configuration;
3430

3531
import java.util.List;
36-
import java.util.concurrent.LinkedBlockingQueue;
37-
import java.util.concurrent.ThreadPoolExecutor;
38-
import java.util.concurrent.TimeUnit;
3932

4033
/**
4134
* Mysql dim table
@@ -56,6 +49,10 @@ public MysqlAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldIn
5649
@Override
5750
public void open(Configuration parameters) throws Exception {
5851
super.open(parameters);
52+
}
53+
54+
@Override
55+
public JsonObject buildJdbcConfig() {
5956
JsonObject mysqlClientConfig = new JsonObject();
6057
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
6158
mysqlClientConfig.put("url", rdbSideTableInfo.getUrl())
@@ -68,14 +65,6 @@ public void open(Configuration parameters) throws Exception {
6865
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
6966
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
7067

71-
System.setProperty("vertx.disableFileCPResolving", "true");
72-
73-
VertxOptions vo = new VertxOptions();
74-
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
75-
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
76-
vo.setFileResolverCachingEnabled(false);
77-
Vertx vertx = Vertx.vertx(vo);
78-
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
68+
return mysqlClientConfig;
7969
}
80-
8170
}

oceanbase/oceanbase-side/oceanbase-async-side/src/main/java/com/dtstack/flink/sql/side/oceanbase/OceanbaseAsyncReqRow.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
2424
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
25-
import io.vertx.core.Vertx;
26-
import io.vertx.core.VertxOptions;
2725
import io.vertx.core.json.JsonObject;
28-
import io.vertx.ext.jdbc.JDBCClient;
2926
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3027
import org.apache.flink.configuration.Configuration;
3128

@@ -49,6 +46,10 @@ public OceanbaseAsyncReqRow(RowTypeInfo rowTypeInfo,
4946
@Override
5047
public void open(Configuration parameters) throws Exception {
5148
super.open(parameters);
49+
}
50+
51+
@Override
52+
public JsonObject buildJdbcConfig() {
5253
JsonObject oceanbaseClientConfig = new JsonObject();
5354
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
5455
oceanbaseClientConfig.put("url", rdbSideTableInfo.getUrl())
@@ -61,13 +62,6 @@ public void open(Configuration parameters) throws Exception {
6162
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
6263
.put("test_conncetion_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
6364

64-
System.setProperty("vertx.disableFileCPResolving", "true");
65-
66-
VertxOptions vo = new VertxOptions();
67-
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
68-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
69-
vo.setFileResolverCachingEnabled(false);
70-
Vertx vertx = Vertx.vertx(vo);
71-
setRdbSqlClient(JDBCClient.createNonShared(vertx, oceanbaseClientConfig));
65+
return oceanbaseClientConfig;
7266
}
7367
}

0 commit comments

Comments
 (0)