Skip to content

Commit e4d71f0

Browse files
committed
Merge remote-tracking branch 'origin/1.10_release_4.1.x' into hotfix_1.10_4.1.x_35378
# Conflicts: # launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java
2 parents 88d1230 + bd722f1 commit e4d71f0

File tree

47 files changed

+1530
-319
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1530
-319
lines changed

.gitignore

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ bin/nohup.out
1616
.DS_Store
1717
bin/sideSql.txt
1818
*.keytab
19-
krb5.conf
19+
krb5.conf
20+
.gradle
21+
gradle

core/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@
176176
<include name="${project.artifactId}-${project.version}.jar" />
177177
</fileset>
178178
</copy>
179+
<!--suppress UnresolvedMavenProperty -->
179180
<move file="${basedir}/../sqlplugins/${project.artifactId}-${project.version}.jar"
180181
tofile="${basedir}/../sqlplugins/${project.name}-${git.branch}.jar" />
181182
</tasks>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.dirtyManager.consumer;
20+
21+
import com.dtstack.flink.sql.dirtyManager.entity.DirtyDataEntity;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.io.Serializable;
26+
import java.util.Map;
27+
import java.util.concurrent.LinkedBlockingQueue;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicLong;
31+
32+
/**
33+
* @author tiezhu
34+
* Company dtstack
35+
* Date 2020/8/27 星期四
36+
*/
37+
public abstract class AbstractDirtyDataConsumer implements Runnable, Serializable {
38+
protected static final long serialVersionUID = -6058598201315176687L;
39+
40+
protected static final Logger LOG = LoggerFactory.getLogger(AbstractDirtyDataConsumer.class);
41+
42+
protected Long errorLimit = 1000L;
43+
protected AtomicLong errorCount = new AtomicLong(0L);
44+
45+
protected AtomicLong count = new AtomicLong(0L);
46+
47+
public AtomicBoolean isRunning = new AtomicBoolean(true);
48+
49+
protected LinkedBlockingQueue<DirtyDataEntity> queue;
50+
51+
/**
52+
* 消费队列数据
53+
*
54+
* @throws Exception throw exception
55+
*/
56+
public abstract void consume() throws Exception;
57+
58+
/**
59+
* 关闭消费者,需要释放资源
60+
*/
61+
public abstract void close();
62+
63+
/**
64+
* 初始化消费者,初始化定时任务
65+
*
66+
* @param properties 任务参数
67+
* @throws Exception throw exception
68+
*/
69+
public abstract void init(Map<String, Object> properties) throws Exception;
70+
71+
/**
72+
* 检验consumer是否正在执行
73+
*/
74+
public boolean isRunning() {
75+
return isRunning.get();
76+
}
77+
78+
@Override
79+
public void run() {
80+
try {
81+
while (isRunning.get()) {
82+
consume();
83+
}
84+
} catch (Exception e) {
85+
LOG.error("consume dirtyData error", e);
86+
if (errorCount.getAndIncrement() > errorLimit) {
87+
throw new RuntimeException("The task failed due to the number of dirty data consume failed reached the limit " + errorLimit);
88+
}
89+
}
90+
}
91+
92+
public AbstractDirtyDataConsumer setQueue(LinkedBlockingQueue<DirtyDataEntity> queue) {
93+
this.queue = queue;
94+
return this;
95+
}
96+
97+
public void collectDirtyData(DirtyDataEntity dataEntity, long blockingInterval) throws InterruptedException {
98+
queue.offer(dataEntity, blockingInterval, TimeUnit.MILLISECONDS);
99+
}
100+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.dirtyManager.consumer;
20+
21+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
22+
import com.dtstack.flink.sql.util.PluginUtil;
23+
24+
import java.io.File;
25+
import java.lang.reflect.Constructor;
26+
import java.util.Objects;
27+
28+
import static com.dtstack.flink.sql.util.PluginUtil.upperCaseFirstChar;
29+
30+
/**
31+
* @author tiezhu
32+
* Company dtstack
33+
* Date 2020/12/21 星期一
34+
*/
35+
public class DirtyConsumerFactory {
36+
public static final String DEFAULT_DIRTY_TYPE = "console";
37+
public static final String DIRTY_CONSUMER_PATH = "dirtyData";
38+
public static final String CLASS_PRE_STR = "com.dtstack.flink.sql.dirty";
39+
public static final String CLASS_POST_STR = "DirtyDataConsumer";
40+
41+
/**
42+
* 通过动态方式去加载脏数据插件
43+
*
44+
* @param dirtyType 脏数据插件类型
45+
* @param pluginPath 脏数据插件直地址
46+
* @param pluginLoadMode 插件加载方式
47+
* @return 脏数据消费者
48+
* @throws Exception exception
49+
*/
50+
public static AbstractDirtyDataConsumer getDirtyConsumer(
51+
String dirtyType
52+
, String pluginPath
53+
, String pluginLoadMode) throws Exception {
54+
if (Objects.isNull(dirtyType)) {
55+
dirtyType = DEFAULT_DIRTY_TYPE;
56+
}
57+
String consumerType = DIRTY_CONSUMER_PATH + File.separator + dirtyType;
58+
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, pluginPath, pluginLoadMode);
59+
String className = CLASS_PRE_STR + "." + dirtyType.toLowerCase() + "." + upperCaseFirstChar(dirtyType + CLASS_POST_STR);
60+
return ClassLoaderManager.newInstance(consumerJar, cl -> {
61+
Class<?> clazz = cl.loadClass(className);
62+
Constructor<?> constructor = clazz.getConstructor();
63+
return (AbstractDirtyDataConsumer) constructor.newInstance();
64+
});
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.dirtyManager.entity;
20+
21+
import java.sql.Date;
22+
import java.text.SimpleDateFormat;
23+
24+
/**
25+
* @author tiezhu
26+
* Company dtstack
27+
* Date 2020/8/27 星期四
28+
*/
29+
public class DirtyDataEntity {
30+
private final SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
31+
/**
32+
* 脏数据信息内容
33+
*/
34+
private String dirtyData;
35+
36+
/**
37+
* 脏数据处理时间
38+
*/
39+
private String processDate;
40+
41+
/**
42+
* 产生脏数据的原因
43+
*/
44+
private String cause;
45+
46+
/**
47+
* 产生异常的字段
48+
*/
49+
private String field;
50+
51+
public String getDirtyData() {
52+
return dirtyData;
53+
}
54+
55+
public void setDirtyData(String dirtyData) {
56+
this.dirtyData = dirtyData;
57+
}
58+
59+
public String getProcessDate() {
60+
return processDate;
61+
}
62+
63+
public void setProcessDate(String processDate) {
64+
this.processDate = processDate;
65+
}
66+
67+
public String getCause() {
68+
return cause;
69+
}
70+
71+
public void setCause(String cause) {
72+
this.cause = cause;
73+
}
74+
75+
public String getField() {
76+
return field;
77+
}
78+
79+
public void setField(String field) {
80+
this.field = field;
81+
}
82+
83+
public DirtyDataEntity(String dirtyData, Long processDate, String cause, String field) {
84+
this.dirtyData = dirtyData;
85+
this.processDate = timeFormat.format(processDate);
86+
this.cause = cause;
87+
this.field = field;
88+
}
89+
90+
public DirtyDataEntity(String dirtyData, Long processDate, String cause) {
91+
this.dirtyData = dirtyData;
92+
this.processDate = timeFormat.format(processDate);
93+
this.cause = cause;
94+
}
95+
96+
@Override
97+
public String toString() {
98+
return "DirtyDataEntity{" +
99+
"dirtyData='" + dirtyData + '\'' +
100+
", processDate=" + processDate +
101+
", cause='" + cause + '\'' +
102+
'}';
103+
}
104+
105+
/**
106+
* 获取脏数据信息,返回字符数组
107+
* @return 脏数据信息字符数组
108+
*/
109+
public String[] get() {
110+
return new String[] {dirtyData, String.valueOf(processDate), cause};
111+
}
112+
}

0 commit comments

Comments
 (0)