Skip to content

Commit 7bbd2da

Browse files
committed
[opt-34810][HBase][core]优化脏数据打印,修复在prejob模式下,脏数据插件序列化失败问题
1 parent f61e779 commit 7bbd2da

File tree

19 files changed

+169
-85
lines changed

19 files changed

+169
-85
lines changed

core/pom.xml

+1-11
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,6 @@
108108
<artifactId>joda-time</artifactId>
109109
<version>2.5</version>
110110
</dependency>
111-
<dependency>
112-
<groupId>org.slf4j</groupId>
113-
<artifactId>slf4j-simple</artifactId>
114-
<version>1.7.30</version>
115-
</dependency>
116-
117-
<dependency>
118-
<groupId>log4j</groupId>
119-
<artifactId>log4j</artifactId>
120-
<version>1.2.17</version>
121-
</dependency>
122111
</dependencies>
123112

124113
<build>
@@ -187,6 +176,7 @@
187176
<include name="${project.artifactId}-${project.version}.jar" />
188177
</fileset>
189178
</copy>
179+
<!--suppress UnresolvedMavenProperty -->
190180
<move file="${basedir}/../sqlplugins/${project.artifactId}-${project.version}.jar"
191181
tofile="${basedir}/../sqlplugins/${project.name}-${git.branch}.jar" />
192182
</tasks>

core/src/main/java/com/dtstack/flink/sql/dirtyManager/consumer/AbstractDirtyDataConsumer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.slf4j.LoggerFactory;
2424

2525
import java.io.Serializable;
26-
import java.util.Properties;
26+
import java.util.Map;
2727
import java.util.concurrent.LinkedBlockingQueue;
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,7 +66,7 @@ public abstract class AbstractDirtyDataConsumer implements Runnable, Serializabl
6666
* @param properties 任务参数
6767
* @throws Exception throw exception
6868
*/
69-
public abstract void init(Properties properties) throws Exception;
69+
public abstract void init(Map<String, Object> properties) throws Exception;
7070

7171
/**
7272
* 检验consumer是否正在执行

core/src/main/java/com/dtstack/flink/sql/dirtyManager/consumer/DirtyConsumerFactory.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@
3333
* Date 2020/12/21 星期一
3434
*/
3535
public class DirtyConsumerFactory {
36-
private static final String DEFAULT_DIRTY_TYPE = "console";
37-
private static final String DIRTY_CONSUMER_PATH = "dirtyData";
38-
private static final String CLASS_PRE_STR = "com.dtstack.flink.sql.dirty";
39-
private static final String CLASS_POST_STR = "DirtyDataConsumer";
36+
public static final String DEFAULT_DIRTY_TYPE = "console";
37+
public static final String DIRTY_CONSUMER_PATH = "dirtyData";
38+
public static final String CLASS_PRE_STR = "com.dtstack.flink.sql.dirty";
39+
public static final String CLASS_POST_STR = "DirtyDataConsumer";
4040

4141
/**
4242
* 通过动态方式去加载脏数据插件

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

+68-20
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
import org.slf4j.LoggerFactory;
2828

2929
import java.io.Serializable;
30-
import java.util.Properties;
30+
import java.util.Map;
31+
import java.util.Objects;
3132
import java.util.concurrent.LinkedBlockingQueue;
3233
import java.util.concurrent.ThreadPoolExecutor;
3334
import java.util.concurrent.TimeUnit;
@@ -50,14 +51,17 @@
5051
* Date 2020/8/27 星期四
5152
*/
5253
public class DirtyDataManager implements Serializable {
54+
private static final long serialVersionUID = 1L;
5355

5456
public final static int MAX_POOL_SIZE_LIMIT = 5;
55-
private static final long serialVersionUID = 7190970299538893497L;
5657
private static final Logger LOG = LoggerFactory.getLogger(DirtyDataManager.class);
5758
private final static int MAX_TASK_QUEUE_SIZE = 100;
58-
public static AbstractDirtyDataConsumer consumer;
5959

60-
private static ThreadPoolExecutor dirtyDataConsumer;
60+
private AbstractDirtyDataConsumer consumer;
61+
private transient ThreadPoolExecutor dirtyDataConsumer;
62+
63+
private static final DirtyDataManager INSTANCE = new DirtyDataManager();
64+
6165
/**
6266
* 统计manager收集到的脏数据条数
6367
*/
@@ -75,30 +79,51 @@ public class DirtyDataManager implements Serializable {
7579
*/
7680
private double errorLimitRate;
7781

82+
private DirtyDataManager() {
83+
84+
}
85+
7886
/**
7987
* 通过参数生成manager实例,并同时将consumer实例化
8088
*/
81-
public static DirtyDataManager newInstance(Properties properties) {
89+
public static DirtyDataManager newInstance(Map<String, Object> properties) {
8290
try {
83-
DirtyDataManager manager = new DirtyDataManager();
84-
manager.blockingInterval = Long.parseLong(String.valueOf(properties.getOrDefault(DIRTY_BLOCK_STR, DEFAULT_BLOCKING_INTERVAL)));
85-
manager.errorLimitRate = Double.parseDouble(String.valueOf(properties.getOrDefault(DIRTY_LIMIT_RATE_STR, DEFAULT_ERROR_LIMIT_RATE)));
86-
consumer = DirtyConsumerFactory.getDirtyConsumer(
87-
properties.getProperty(PLUGIN_TYPE_STR, DEFAULT_TYPE)
88-
, properties.getProperty(PLUGIN_PATH_STR)
89-
, properties.getProperty(PLUGIN_LOAD_MODE_STR)
90-
);
91-
consumer.init(properties);
92-
consumer.setQueue(new LinkedBlockingQueue<>());
93-
dirtyDataConsumer = new ThreadPoolExecutor(MAX_POOL_SIZE_LIMIT, MAX_POOL_SIZE_LIMIT, 0, TimeUnit.MILLISECONDS,
94-
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE), new DTThreadFactory("dirtyDataConsumer", true), new ThreadPoolExecutor.CallerRunsPolicy());
95-
dirtyDataConsumer.execute(consumer);
96-
return manager;
91+
INSTANCE.setBlockingInterval(Long.parseLong(
92+
String.valueOf(properties.getOrDefault(DIRTY_BLOCK_STR, DEFAULT_BLOCKING_INTERVAL))));
93+
INSTANCE.setErrorLimitRate(Double.parseDouble(
94+
String.valueOf(properties.getOrDefault(DIRTY_LIMIT_RATE_STR, DEFAULT_ERROR_LIMIT_RATE))));
95+
96+
INSTANCE.setConsumer(properties);
97+
return INSTANCE;
9798
} catch (Exception e) {
9899
throw new RuntimeException("create dirtyManager error!", e);
99100
}
100101
}
101102

103+
private void setConsumer(Map<String, Object> properties) throws Exception {
104+
consumer = DirtyConsumerFactory.getDirtyConsumer(
105+
String.valueOf(properties.getOrDefault(PLUGIN_TYPE_STR, DEFAULT_TYPE)),
106+
String.valueOf(properties.get(PLUGIN_PATH_STR)),
107+
String.valueOf(properties.get(PLUGIN_LOAD_MODE_STR))
108+
);
109+
consumer.init(properties);
110+
consumer.setQueue(new LinkedBlockingQueue<>());
111+
}
112+
113+
public void execute() {
114+
if (Objects.isNull(dirtyDataConsumer)) {
115+
dirtyDataConsumer = new ThreadPoolExecutor(
116+
MAX_POOL_SIZE_LIMIT,
117+
MAX_POOL_SIZE_LIMIT,
118+
0,
119+
TimeUnit.MILLISECONDS,
120+
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE),
121+
new DTThreadFactory("dirtyDataConsumer", true),
122+
new ThreadPoolExecutor.CallerRunsPolicy());
123+
dirtyDataConsumer.execute(consumer);
124+
}
125+
}
126+
102127
/**
103128
* 设置脏数据插件默认配置
104129
*
@@ -113,7 +138,6 @@ public static String buildDefaultDirty() {
113138

114139
/**
115140
* 脏数据收集任务停止,任务停止之前,需要将队列中所有的数据清空
116-
* TODO consumer 关闭时仍有数据没有消费到,假如有500条数据,在结束时实际消费数量可能只有493
117141
*/
118142
public void close() {
119143
if (checkConsumer()) {
@@ -150,4 +174,28 @@ public void collectDirtyData(String dataInfo, String cause) {
150174
public boolean checkConsumer() {
151175
return consumer.isRunning();
152176
}
177+
178+
public AtomicLong getCount() {
179+
return count;
180+
}
181+
182+
public AtomicLong getErrorCount() {
183+
return errorCount;
184+
}
185+
186+
public long getBlockingInterval() {
187+
return blockingInterval;
188+
}
189+
190+
public void setBlockingInterval(long blockingInterval) {
191+
this.blockingInterval = blockingInterval;
192+
}
193+
194+
public double getErrorLimitRate() {
195+
return errorLimitRate;
196+
}
197+
198+
public void setErrorLimitRate(double errorLimitRate) {
199+
this.errorLimitRate = errorLimitRate;
200+
}
153201
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyKeys.java

+3
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,7 @@ public class DirtyKeys {
3737
public final static String PLUGIN_LOAD_MODE_STR = "pluginLoadMode";
3838

3939
public final static String PRINT_LIMIT_STR = "printLimit";
40+
41+
public final static String ENV_CLASSLOADER = "envClassLoader";
42+
public final static String TABLE_NAME = "tableName";
4043
}

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

+18-9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2222
import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager;
23+
import com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys;
2324
import com.dtstack.flink.sql.enums.ClusterMode;
2425
import com.dtstack.flink.sql.enums.ECacheType;
2526
import com.dtstack.flink.sql.enums.EPluginLoadMode;
@@ -87,9 +88,10 @@
8788
import java.util.stream.Stream;
8889

8990
/**
90-
* 任务执行时的流程方法
91+
* 任务执行时的流程方法
9192
* Date: 2020/2/17
9293
* Company: www.dtstack.com
94+
*
9395
* @author maqi
9496
*/
9597
public class ExecuteProcessHelper {
@@ -104,6 +106,7 @@ public class ExecuteProcessHelper {
104106

105107
public static FlinkPlanner flinkPlanner = new FlinkPlanner();
106108

109+
@SuppressWarnings("unchecked")
107110
public static ParamsInfo parseParams(String[] args) throws Exception {
108111
LOG.info("------------program params-------------------------");
109112
Arrays.stream(args).forEach(arg -> LOG.info("{}", arg));
@@ -124,15 +127,15 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
124127
"Non-local mode or shipfile deployment mode, remoteSqlPluginPath is required");
125128
String confProp = URLDecoder.decode(options.getConfProp(), Charsets.UTF_8.toString());
126129
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
127-
Properties dirtyProperties = PluginUtil.jsonStrToObject(Objects.isNull(dirtyStr) ?
128-
DirtyDataManager.buildDefaultDirty() : dirtyStr, Properties.class);
130+
Map<String, Object> dirtyProperties = (Map<String, Object>) PluginUtil.jsonStrToObject(Objects.isNull(dirtyStr) ?
131+
DirtyDataManager.buildDefaultDirty() : dirtyStr, Map.class);
129132

130-
if (Objects.isNull(dirtyProperties.getProperty(PLUGIN_LOAD_STR))) {
133+
if (Objects.isNull(dirtyProperties.get(PLUGIN_LOAD_STR))) {
131134
dirtyProperties.put(PLUGIN_LOAD_STR, pluginLoadMode);
132135
}
133136

134-
if (!pluginLoadMode.equalsIgnoreCase(EPluginLoadMode.LOCALTEST.name()) && Objects.isNull(dirtyProperties.getProperty(PLUGIN_PATH_STR))) {
135-
dirtyProperties.setProperty(PLUGIN_PATH_STR,
137+
if (!pluginLoadMode.equalsIgnoreCase(EPluginLoadMode.LOCALTEST.name()) && Objects.isNull(dirtyProperties.get(PLUGIN_PATH_STR))) {
138+
dirtyProperties.put(PLUGIN_PATH_STR,
136139
Objects.isNull(remoteSqlPluginPath) ? localSqlPluginPath : remoteSqlPluginPath);
137140
}
138141

@@ -210,7 +213,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
210213
return env;
211214
}
212215

213-
216+
@SuppressWarnings("unchecked")
214217
public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.IOException {
215218
List<URL> jarUrlList = Lists.newArrayList();
216219
if (Strings.isNullOrEmpty(addJarListStr)) {
@@ -322,7 +325,7 @@ public static Set<URL> registerTable(
322325
, String localSqlPluginPath
323326
, String remoteSqlPluginPath
324327
, String pluginLoadMode
325-
, Properties dirtyProperties
328+
, Map<String, Object> dirtyProperties
326329
, Map<String, AbstractSideTableInfo> sideTableMap
327330
, Map<String, Table> registerTableCache
328331
) throws Exception {
@@ -389,11 +392,17 @@ public static Set<URL> registerTable(
389392
if (localSqlPluginPath == null || localSqlPluginPath.isEmpty()) {
390393
return Sets.newHashSet();
391394
}
395+
pluginClassPathSets.add(PluginUtil.buildDirtyPluginUrl(
396+
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_TYPE_STR)),
397+
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_PATH_STR)),
398+
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_LOAD_MODE_STR))
399+
));
392400
return pluginClassPathSets;
393401
}
394402

395403
/**
396-
* perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph
404+
* perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph
405+
*
397406
* @param env
398407
* @param classPathSet
399408
*/

core/src/main/java/com/dtstack/flink/sql/exec/ParamsInfo.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121

2222
import java.net.URL;
2323
import java.util.List;
24+
import java.util.Map;
2425
import java.util.Properties;
2526

2627
/**
2728
* 解析传递的参数信息
2829
* Date: 2020/2/24
2930
* Company: www.dtstack.com
31+
*
3032
* @author maqi
3133
*/
3234
public class ParamsInfo {
@@ -40,7 +42,7 @@ public class ParamsInfo {
4042
private String deployMode;
4143
private Properties confProp;
4244
private boolean getPlan = false;
43-
private Properties dirtyProperties;
45+
private Map<String, Object> dirtyProperties;
4446

4547
public ParamsInfo(
4648
String sql
@@ -50,7 +52,7 @@ public ParamsInfo(
5052
, String pluginLoadMode
5153
, String deployMode
5254
, Properties confProp
53-
, Properties dirtyProperties) {
55+
, Map<String, Object> dirtyProperties) {
5456
this.sql = sql;
5557
this.name = name;
5658
this.jarUrlList = jarUrlList;
@@ -102,7 +104,7 @@ public Properties getConfProp() {
102104
return confProp;
103105
}
104106

105-
public Properties getDirtyProperties() {
107+
public Map<String, Object> getDirtyProperties() {
106108
return dirtyProperties;
107109
}
108110

@@ -140,7 +142,7 @@ public static class Builder {
140142
private String pluginLoadMode;
141143
private String deployMode;
142144
private Properties confProp;
143-
private Properties dirtyProperties;
145+
private Map<String, Object> dirtyProperties;
144146

145147
public ParamsInfo.Builder setSql(String sql) {
146148
this.sql = sql;
@@ -183,7 +185,7 @@ public ParamsInfo.Builder setConfProp(Properties confProp) {
183185
return this;
184186
}
185187

186-
public ParamsInfo.Builder setDirtyProperties(Properties dirtyProperties) {
188+
public ParamsInfo.Builder setDirtyProperties(Map<String, Object> dirtyProperties) {
187189
this.dirtyProperties = dirtyProperties;
188190
return this;
189191
}

core/src/main/java/com/dtstack/flink/sql/format/DeserializationMetricWrapper.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class DeserializationMetricWrapper extends AbstractDeserializationSchema<
7171

7272
protected transient Meter numInBytesRate;
7373

74-
protected DirtyDataManager dirtyDataManager;
74+
protected DirtyDataManager dirtyDataManager;
7575

7676
public DeserializationMetricWrapper(
7777
TypeInformation<Row> typeInfo
@@ -110,6 +110,7 @@ public Row deserialize(byte[] message) throws IOException {
110110
return row;
111111
} catch (Exception e) {
112112
//add metric of dirty data
113+
dirtyDataManager.execute();
113114
dirtyDataManager.collectDirtyData(new String(message), e.getMessage());
114115
dirtyDataCounter.inc();
115116
return null;

0 commit comments

Comments
 (0)