Skip to content

Commit 86fc22a

Browse files
committed
Merge branch 'feat_1.10_dirtyDataConsumer' into hotfix_1.10_4.1.x_34810
2 parents d424dea + 289dba2 commit 86fc22a

File tree

7 files changed

+99
-51
lines changed

7 files changed

+99
-51
lines changed

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

+19-10
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,17 @@
3333
import java.util.concurrent.TimeUnit;
3434
import java.util.concurrent.atomic.AtomicLong;
3535

36+
import static com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys.DEFAULT_BLOCKING_INTERVAL;
37+
import static com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys.DEFAULT_ERROR_LIMIT_RATE;
38+
import static com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys.DEFAULT_PRINT_LIMIT;
39+
import static com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys.DEFAULT_TYPE;
40+
import static com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys.DIRTY_BLOCK_STR;
41+
import static com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys.DIRTY_LIMIT_RATE_STR;
42+
import static com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys.PLUGIN_LOAD_MODE_STR;
43+
import static com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys.PLUGIN_PATH_STR;
44+
import static com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys.PLUGIN_TYPE_STR;
45+
import static com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys.PRINT_LIMIT_STR;
46+
3647
/**
3748
* @author tiezhu
3849
* Company dtstack
@@ -43,11 +54,7 @@ public class DirtyDataManager implements Serializable {
4354
public final static int MAX_POOL_SIZE_LIMIT = 5;
4455
private static final long serialVersionUID = 7190970299538893497L;
4556
private static final Logger LOG = LoggerFactory.getLogger(DirtyDataManager.class);
46-
private static final String DIRTY_BLOCK_STR = "blockingInterval";
47-
private static final String DIRTY_LIMIT_RATE_STR = "errorLimitRate";
4857
private final static int MAX_TASK_QUEUE_SIZE = 100;
49-
private final static String DEFAULT_ERROR_LIMIT_RATE = "0.8";
50-
private final static String DEFAULT_BLOCKING_INTERVAL = "60";
5158
public static AbstractDirtyDataConsumer consumer;
5259

5360
private static ThreadPoolExecutor dirtyDataConsumer;
@@ -77,14 +84,14 @@ public static DirtyDataManager newInstance(Properties properties) {
7784
manager.blockingInterval = Long.parseLong(String.valueOf(properties.getOrDefault(DIRTY_BLOCK_STR, DEFAULT_BLOCKING_INTERVAL)));
7885
manager.errorLimitRate = Double.parseDouble(String.valueOf(properties.getOrDefault(DIRTY_LIMIT_RATE_STR, DEFAULT_ERROR_LIMIT_RATE)));
7986
consumer = DirtyConsumerFactory.getDirtyConsumer(
80-
properties.getProperty("type")
81-
, properties.getProperty("pluginPath")
82-
, properties.getProperty("pluginLoadMode")
87+
properties.getProperty(PLUGIN_TYPE_STR, DEFAULT_TYPE)
88+
, properties.getProperty(PLUGIN_PATH_STR)
89+
, properties.getProperty(PLUGIN_LOAD_MODE_STR)
8390
);
8491
consumer.init(properties);
8592
consumer.setQueue(new LinkedBlockingQueue<>());
8693
dirtyDataConsumer = new ThreadPoolExecutor(MAX_POOL_SIZE_LIMIT, MAX_POOL_SIZE_LIMIT, 0, TimeUnit.MILLISECONDS,
87-
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE), new DTThreadFactory("dirtyDataConsumer"), new ThreadPoolExecutor.CallerRunsPolicy());
94+
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE), new DTThreadFactory("dirtyDataConsumer", true), new ThreadPoolExecutor.CallerRunsPolicy());
8895
dirtyDataConsumer.execute(consumer);
8996
return manager;
9097
} catch (Exception e) {
@@ -99,8 +106,8 @@ public static DirtyDataManager newInstance(Properties properties) {
99106
*/
100107
public static String buildDefaultDirty() {
101108
JSONObject jsonObject = new JSONObject();
102-
jsonObject.put("type", "console");
103-
jsonObject.put("printLimit", "1000");
109+
jsonObject.put(PLUGIN_TYPE_STR, DEFAULT_TYPE);
110+
jsonObject.put(PRINT_LIMIT_STR, DEFAULT_PRINT_LIMIT);
104111
return jsonObject.toJSONString();
105112
}
106113

@@ -128,6 +135,8 @@ public void collectDirtyData(String dataInfo, String cause) {
128135
LOG.warn("dirty Data insert error ... Failed number: " + errorCount.incrementAndGet());
129136
LOG.warn("error dirty data:" + dirtyDataEntity.toString());
130137
if (errorCount.get() > Math.ceil(count.longValue() * errorLimitRate)) {
138+
// close consumer and manager
139+
close();
131140
throw new RuntimeException(String.format("The number of failed number 【%s】 reaches the limit, manager fails", errorCount.get()));
132141
}
133142
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.dirtyManager.manager;
20+
21+
/**
 * Property keys and default values used to configure the dirty data manager
 * and the dirty data consumer plugin it loads.
 *
 * <p>This class only holds constants; it is final and cannot be instantiated.
 *
 * <p>Date: 2021/1/6
 *
 * @author tiezhu
 * Company dtstack
 */
public final class DirtyKeys {
    /** Consumer plugin type used when {@link #PLUGIN_TYPE_STR} is not configured. */
    public static final String DEFAULT_TYPE = "console";
    /** Value used when {@link #DIRTY_BLOCK_STR} is not configured; parsed as a long by the manager. */
    public static final String DEFAULT_BLOCKING_INTERVAL = "60";
    /** Value used when {@link #DIRTY_LIMIT_RATE_STR} is not configured; parsed as a double by the manager. */
    public static final String DEFAULT_ERROR_LIMIT_RATE = "0.8";
    /** Value stored under {@link #PRINT_LIMIT_STR} in the default dirty configuration. */
    public static final String DEFAULT_PRINT_LIMIT = "1000";

    /** Property key for the manager's blocking interval. */
    public static final String DIRTY_BLOCK_STR = "blockingInterval";
    /** Property key for the manager's error limit rate. */
    public static final String DIRTY_LIMIT_RATE_STR = "errorLimitRate";
    /** Property key naming the dirty data consumer plugin type. */
    public static final String PLUGIN_TYPE_STR = "type";
    /** Property key for the plugin path handed to the consumer factory. */
    public static final String PLUGIN_PATH_STR = "pluginPath";
    /** Property key for the plugin load mode handed to the consumer factory. */
    public static final String PLUGIN_LOAD_MODE_STR = "pluginLoadMode";

    /** Property key for the print limit of the default (console) configuration. */
    public static final String PRINT_LIMIT_STR = "printLimit";

    private DirtyKeys() {
        // Constant holder: prevent instantiation.
    }
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/TestMain.java

-34
This file was deleted.

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,13 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
196196
// cache classPathSets
197197
ExecuteProcessHelper.registerPluginUrlToCachedFile(env, classPathSets);
198198

199-
ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), paramsInfo.getPluginLoadMode(), tableEnv, sqlTree, sideTableMap, registerTableCache);
199+
ExecuteProcessHelper.sqlTranslation(
200+
paramsInfo.getLocalSqlPluginPath(),
201+
paramsInfo.getPluginLoadMode(),
202+
tableEnv,
203+
sqlTree,
204+
sideTableMap,
205+
registerTableCache);
200206

201207
if (env instanceof MyLocalStreamEnvironment) {
202208
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());

core/src/main/java/com/dtstack/flink/sql/factory/DTThreadFactory.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.factory;
2221

@@ -33,6 +32,7 @@ public class DTThreadFactory implements ThreadFactory {
3332
private final static AtomicInteger THREAD_NUMBER = new AtomicInteger(1);
3433
private final ThreadGroup group;
3534
private final String namePrefix;
35+
private Boolean isDaemon = false;
3636

3737
public DTThreadFactory(String factoryName) {
3838
SecurityManager s = System.getSecurityManager();
@@ -43,14 +43,29 @@ public DTThreadFactory(String factoryName) {
4343
"-thread-";
4444
}
4545

46+
/**
 * Creates a thread factory whose threads are named with the prefix
 * {@code <factoryName>-pool-<poolNumber>-thread-} and may be marked as daemon threads.
 *
 * @param factoryName prefix used when building the names of created threads
 * @param isDaemon    whether created threads should be daemon threads;
 *                    {@code null} is treated as {@code false}
 */
public DTThreadFactory(String factoryName, Boolean isDaemon) {
    SecurityManager s = System.getSecurityManager();
    group = (s != null) ? s.getThreadGroup() :
            Thread.currentThread().getThreadGroup();
    namePrefix = factoryName + "-pool-" +
            POOL_NUMBER.getAndIncrement() +
            "-thread-";
    // newThread unboxes this field in an if-condition, so storing a null here
    // would surface later as a NullPointerException when a thread is created.
    this.isDaemon = Boolean.TRUE.equals(isDaemon);
}
55+
4656
@Override
4757
public Thread newThread(Runnable r) {
4858
Thread t = new Thread(group, r,
4959
namePrefix + THREAD_NUMBER.getAndIncrement(),
5060
0);
51-
if (t.isDaemon()) {
52-
t.setDaemon(false);
61+
if (this.isDaemon) {
62+
t.setDaemon(true);
63+
} else {
64+
if (t.isDaemon()) {
65+
t.setDaemon(false);
66+
}
5367
}
68+
5469
if (t.getPriority() != Thread.NORM_PRIORITY) {
5570
t.setPriority(Thread.NORM_PRIORITY);
5671
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -264,16 +264,16 @@ protected void dealInsert(Row record) {
264264

265265
try {
266266
table.put(put);
267+
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
268+
LOG.info(record.toString());
269+
}
267270
} catch (Exception e) {
268271
dirtyDataManager.collectDirtyData(
269272
record.toString()
270273
, e.getMessage());
271274
outDirtyRecords.inc();
272275
}
273276

274-
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
275-
LOG.info(record.toString());
276-
}
277277
outRecords.inc();
278278
}
279279

localTest/pom.xml

+12
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,18 @@
6969
<!-- <version>1.0-SNAPSHOT</version>-->
7070
<!-- </dependency>-->
7171

72+
<dependency>
73+
<groupId>com.dtstack.flink</groupId>
74+
<artifactId>dirtyConsumer.console</artifactId>
75+
<version>1.0-SNAPSHOT</version>
76+
</dependency>
77+
78+
<dependency>
79+
<groupId>com.dtstack.flink</groupId>
80+
<artifactId>dirtyConsumer.mysql</artifactId>
81+
<version>1.0-SNAPSHOT</version>
82+
</dependency>
83+
7284
<dependency>
7385
<groupId>com.dtstack.flink</groupId>
7486
<artifactId>sql.source.kafka11</artifactId>

0 commit comments

Comments
 (0)