
Commit 88d1230

[fix-35378] optimize sonar scan result, update gitlab ci script
1 parent d688310 commit 88d1230

8 files changed: +113 additions, -77 deletions

.gitlab-ci.yml

Lines changed: 15 additions & 3 deletions
@@ -1,10 +1,22 @@
-build:
+stages:
+  - validate
+  - test
+
+test-job:
   stage: test
   script:
     - mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
+  only:
+    - v1.10.0_dev
+  tags:
+    - dt-insight-engine
+
+validate-job:
+  stage: validate
+  script:
     - mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
     - sh ci/sonar_notify.sh
   only:
-    - 1.10_release_4.0.x
+    - v1.10.0_dev
   tags:
-    - dt-insight-engine
+    - dt-insight-engine

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 5 additions & 6 deletions
@@ -300,6 +300,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
 
         SqlNode newLeftNode = joinNode.getLeft();
 
+        // case: a dual-stream JOIN whose result then JOINs a side (dimension) table
         if(newLeftNode.getKind() != AS && parentRightIsSide){
 
             String leftTbAlias = buildAs.getOperands()[1].toString();

@@ -975,12 +976,10 @@ public Map<String, String> buildTmpTableFieldRefOriField(Set<String> fieldSet, S
         } else {
             fields = StringUtils.splitByWholeSeparator(field, " as ");
         }
-        if (fields != null) {
-            fields = Stream
-                    .of(fields)
-                    .map(StringUtils::trimToNull)
-                    .toArray(String[]::new);
-        }
+        fields = Stream
+                .of(fields)
+                .map(StringUtils::trimToNull)
+                .toArray(String[]::new);
         String oldKey = field;
         String[] oldFieldInfo = StringUtils.splitByWholeSeparator(fields[0], ".");
         String oldFieldName = oldFieldInfo.length == 2 ? oldFieldInfo[1] : oldFieldInfo[0];
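
For reference, the field-normalization step above can be exercised on its own. The sketch below is illustrative only (the sample field string and class name are hypothetical, not project code); it shows how splitByWholeSeparator plus trimToNull copes with padded " as " aliases.

import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;

public class FieldSplitSketch {
    public static void main(String[] args) {
        // Hypothetical field expression of the form "table.field as alias"
        String field = "t1.name  as  userName";

        // Split on the whole separator " as ", then trim each part;
        // trimToNull turns blank parts into null instead of empty strings.
        String[] fields = StringUtils.splitByWholeSeparator(field, " as ");
        fields = Stream.of(fields)
                .map(StringUtils::trimToNull)
                .toArray(String[]::new);

        String[] oldFieldInfo = StringUtils.splitByWholeSeparator(fields[0], ".");
        String oldFieldName = oldFieldInfo.length == 2 ? oldFieldInfo[1] : oldFieldInfo[0];
        System.out.println(oldFieldName + " -> " + fields[1]); // prints: name -> userName
    }
}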

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 3 additions & 4 deletions
@@ -178,12 +178,11 @@ private void init() throws SQLException {
             if (!storeType.equalsIgnoreCase(KUDU_TYPE)) {
                 throw new IllegalArgumentException("update mode not support for non-kudu table!");
             }
-
             updateStatement = connection.prepareStatement(buildUpdateSql(schema, tableName, fieldNames, primaryKeys));
-            return;
+        } else {
+            valueFieldNames = rebuildFieldNameListAndTypeList(fieldNames, staticPartitionFields, fieldTypes, partitionFields);
         }
 
-        valueFieldNames = rebuildFieldNameListAndTypeList(fieldNames, staticPartitionFields, fieldTypes, partitionFields);
     }
 
     private void initScheduledTask(Long batchWaitInterval) {

@@ -331,7 +330,7 @@ private void putRowIntoMap(Map<String, ArrayList<String>> rowDataMap, Tuple2<Str
     }
 
     private List<String> rebuildFieldNameListAndTypeList(List<String> fieldNames, List<String> staticPartitionFields, List<String> fieldTypes, String partitionFields) {
-        if (partitionFields.isEmpty()) {
+        if (partitionFields == null || partitionFields.isEmpty()) {
            return fieldNames;
        }
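
A minimal sketch of the null-safe guard introduced above, using a hypothetical stand-in method rather than the project's code; the point is that checking isEmpty() alone throws a NullPointerException when no partition fields are configured.

import java.util.Arrays;
import java.util.List;

public class PartitionFieldsGuardSketch {
    // Hypothetical stand-in for the guard clause in rebuildFieldNameListAndTypeList.
    static List<String> rebuild(List<String> fieldNames, String partitionFields) {
        // partitionFields.isEmpty() alone would throw a NullPointerException
        // when no partition fields are configured; check for null first.
        if (partitionFields == null || partitionFields.isEmpty()) {
            return fieldNames;
        }
        // ... the real method would also strip partition columns from fieldNames
        return fieldNames;
    }

    public static void main(String[] args) {
        System.out.println(rebuild(Arrays.asList("id", "name"), null)); // no NPE: [id, name]
    }
}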

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 16 additions & 21 deletions
@@ -31,6 +31,8 @@
 import com.dtstack.flink.sql.option.Options;
 import com.dtstack.flink.sql.util.PluginUtil;
 import org.apache.commons.io.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.FileInputStream;

@@ -51,7 +53,7 @@
 
 public class LauncherMain {
 
-
+    private static final Logger LOG = LoggerFactory.getLogger(LauncherMain.class);
 
     public static JobParamsInfo parseArgs(String[] args) throws Exception {
         if (args.length == 1 && args[0].endsWith(".json")) {

@@ -97,30 +99,24 @@ public static JobParamsInfo parseArgs(String[] args) throws Exception {
     }
 
     private static String[] parseJson(String[] args) {
-        BufferedReader reader = null;
         StringBuilder lastStr = new StringBuilder();
         try {
-            FileInputStream fileInputStream = new FileInputStream(args[0]);
-            InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, StandardCharsets.UTF_8);
-            reader = new BufferedReader(inputStreamReader);
-            String tempString;
-            while ((tempString = reader.readLine()) != null) {
-                lastStr.append(tempString);
-            }
-            reader.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            if (reader != null) {
-                try {
-                    reader.close();
-                } catch (IOException e) {
-                    e.printStackTrace();
+            try (FileInputStream fileInputStream = new FileInputStream(args[0])) {
+                try (InputStreamReader inputStreamReader =
+                        new InputStreamReader(fileInputStream, StandardCharsets.UTF_8)) {
+                    try (BufferedReader reader = new BufferedReader(inputStreamReader)) {
+                        String tempString;
+                        while ((tempString = reader.readLine()) != null) {
+                            lastStr.append(tempString);
+                        }
+                    }
                 }
             }
+        } catch (IOException e) {
+            LOG.error("", e);
         }
-        Map<String, Object> map = JSON.parseObject(lastStr.toString(), new TypeReference<Map<String, Object>>() {
-        });
+
+        Map<String, Object> map = JSON.parseObject(lastStr.toString(), new TypeReference<Map<String, Object>>() {});
         List<String> list = new LinkedList<>();
 
         for (Map.Entry<String, Object> entry : map.entrySet()) {

@@ -130,7 +126,6 @@ private static String[] parseJson(String[] args) {
         return list.toArray(new String[0]);
     }
 
-
     public static void main(String[] args) throws Exception {
         JobParamsInfo jobParamsInfo = parseArgs(args);
         ClusterMode execMode = ClusterMode.valueOf(jobParamsInfo.getMode());
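
The parseJson change above swaps manual close/finally handling for try-with-resources. A self-contained sketch of the same pattern, assuming a UTF-8 text file path is passed as the first argument (the class name is a placeholder, not project code):

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ReadFileSketch {
    public static void main(String[] args) throws IOException {
        StringBuilder content = new StringBuilder();
        // Resources declared in the try header are closed automatically, in reverse
        // order, even if readLine() throws, so no explicit finally block is needed.
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(args[0]), StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                content.append(line);
            }
        }
        System.out.println(content);
    }
}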

launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/StandaloneExecutor.java

Lines changed: 19 additions & 7 deletions
@@ -19,6 +19,7 @@
 package com.dtstack.flink.sql.launcher.executor;
 
 import com.dtstack.flink.sql.enums.EPluginLoadMode;
+import com.dtstack.flink.sql.launcher.LauncherMain;
 import com.dtstack.flink.sql.launcher.entity.JobParamsInfo;
 import com.dtstack.flink.sql.launcher.factory.StandaloneClientFactory;
 import com.dtstack.flink.sql.launcher.utils.JobGraphBuildUtil;

@@ -32,13 +33,18 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Date: 2020/3/6
  * Company: www.dtstack.com
  * @author maqi
  */
 public class StandaloneExecutor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StandaloneExecutor.class);
+
     JobParamsInfo jobParamsInfo;
 
     public StandaloneExecutor(JobParamsInfo jobParamsInfo) {

@@ -60,15 +66,21 @@ public void exec() throws Exception {
         JobGraphBuildUtil.fillJobGraphClassPath(jobGraph);
 
         ClusterDescriptor clusterDescriptor = StandaloneClientFactory.INSTANCE.createClusterDescriptor("", flinkConfiguration);
-        ClusterClientProvider clusterClientProvider = clusterDescriptor.retrieve(StandaloneClusterId.getInstance());
-        ClusterClient clusterClient = clusterClientProvider.getClusterClient();
 
+        try {
+            ClusterClientProvider clusterClientProvider = clusterDescriptor.retrieve(StandaloneClusterId.getInstance());
+            ClusterClient clusterClient = clusterClientProvider.getClusterClient();
 
-        JobExecutionResult jobExecutionResult = ClientUtils.submitJob(clusterClient, jobGraph);
-        String jobId = jobExecutionResult.getJobID().toString();
-        System.out.println("jobID:" + jobId);
+            JobExecutionResult jobExecutionResult = ClientUtils.submitJob(clusterClient, jobGraph);
+            String jobId = jobExecutionResult.getJobID().toString();
+            LOG.info("jobID:" + jobId);
+        } finally {
+            try {
+                clusterDescriptor.close();
+            } catch (Exception e) {
+                LOG.info("Could not properly close the yarn cluster descriptor.", e);
+            }
+        }
 
     }
-
-
 }

launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/YarnJobClusterExecutor.java

Lines changed: 22 additions & 15 deletions
@@ -76,26 +76,33 @@ public void exec() throws Exception {
         YarnClusterDescriptor clusterDescriptor = (YarnClusterDescriptor) YarnClusterClientFactory.INSTANCE
                 .createClusterDescriptor(jobParamsInfo.getYarnConfDir(), flinkConfiguration);
 
-        List<File> shipFiles = getShipFiles(jobParamsInfo.getFlinkJarPath(), jobParamsInfo.getPluginLoadMode(), jobGraph, clusterDescriptor);
-
-        if (jobParamsInfo.getAddShipFile() != null) {
-            List<String> addShipFilesPath = parsePathFromStr(jobParamsInfo.getAddShipFile());
-            for (String path : addShipFilesPath) {
-                shipFiles.add(getAddShipFile(path));
+        try {
+            List<File> shipFiles = getShipFiles(jobParamsInfo.getFlinkJarPath(), jobParamsInfo.getPluginLoadMode(), jobGraph, clusterDescriptor);
+
+            if (jobParamsInfo.getAddShipFile() != null) {
+                List<String> addShipFilesPath = parsePathFromStr(jobParamsInfo.getAddShipFile());
+                for (String path : addShipFilesPath) {
+                    shipFiles.add(getAddShipFile(path));
+                }
             }
-        }
-
-        dumpSameKeytab(flinkConfiguration, shipFiles);
 
-        clusterDescriptor.addShipFiles(shipFiles);
+            dumpSameKeytab(flinkConfiguration, shipFiles);
+            clusterDescriptor.addShipFiles(shipFiles);
 
-        ClusterSpecification clusterSpecification = YarnClusterClientFactory.INSTANCE.getClusterSpecification(flinkConfiguration);
-        ClusterClientProvider<ApplicationId> applicationIdClusterClientProvider = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
+            ClusterSpecification clusterSpecification = YarnClusterClientFactory.INSTANCE.getClusterSpecification(flinkConfiguration);
+            ClusterClientProvider<ApplicationId> applicationIdClusterClientProvider = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
 
-        String applicationId = applicationIdClusterClientProvider.getClusterClient().getClusterId().toString();
-        String flinkJobId = jobGraph.getJobID().toString();
+            String applicationId = applicationIdClusterClientProvider.getClusterClient().getClusterId().toString();
+            String flinkJobId = jobGraph.getJobID().toString();
 
-        LOG.info(String.format("deploy per_job with appId: %s, jobId: %s", applicationId, flinkJobId));
+            LOG.info(String.format("deploy per_job with appId: %s, jobId: %s", applicationId, flinkJobId));
+        } finally {
+            try {
+                clusterDescriptor.close();
+            } catch (Exception e) {
+                LOG.info("Could not properly close the yarn cluster descriptor.", e);
+            }
+        }
     }
 
     private void dumpSameKeytab(Configuration flinkConfiguration, List<File> shipFiles) {

launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/YarnSessionClusterExecutor.java

Lines changed: 31 additions & 17 deletions
@@ -33,6 +33,8 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**

@@ -41,6 +43,9 @@
  * @author maqi
  */
 public class YarnSessionClusterExecutor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(YarnJobClusterExecutor.class);
+
     JobParamsInfo jobParamsInfo;
 
     public YarnSessionClusterExecutor(JobParamsInfo jobParamsInfo) {

@@ -52,28 +57,37 @@ public void exec() throws Exception {
         Configuration flinkConfiguration = JobGraphBuildUtil.getFlinkConfiguration(jobParamsInfo.getFlinkConfDir(), jobParamsInfo.getConfProperties());
         ClusterDescriptor clusterDescriptor = YarnClusterClientFactory.INSTANCE.createClusterDescriptor(jobParamsInfo.getYarnConfDir(), flinkConfiguration);
 
-        Object yid = jobParamsInfo.getYarnSessionConfProperties().get("yid");
-        if (null == yid) {
-            throw new RuntimeException("yarnSessionMode yid is required");
-        }
+        try {
+            Object yid = jobParamsInfo.getYarnSessionConfProperties().get("yid");
+            if (null == yid) {
+                throw new RuntimeException("yarnSessionMode yid is required");
+            }
 
-        ApplicationId applicationId = ConverterUtils.toApplicationId(yid.toString());
-        ClusterClientProvider<ApplicationId> retrieve = clusterDescriptor.retrieve(applicationId);
-        ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient();
+            ApplicationId applicationId = ConverterUtils.toApplicationId(yid.toString());
+            ClusterClientProvider<ApplicationId> retrieve = clusterDescriptor.retrieve(applicationId);
+            ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient();
 
-        if (StringUtils.equalsIgnoreCase(jobParamsInfo.getPluginLoadMode(), EPluginLoadMode.SHIPFILE.name())) {
-            jobGraph.getUserArtifacts().clear();
-        } else {
-            JobGraphBuildUtil.fillJobGraphClassPath(jobGraph);
-        }
+            if (StringUtils.equalsIgnoreCase(jobParamsInfo.getPluginLoadMode(), EPluginLoadMode.SHIPFILE.name())) {
+                jobGraph.getUserArtifacts().clear();
+            } else {
+                JobGraphBuildUtil.fillJobGraphClassPath(jobGraph);
+            }
+
+            if (!StringUtils.isEmpty(jobParamsInfo.getUdfJar())) {
+                JobGraphBuildUtil.fillUserJarForJobGraph(jobParamsInfo.getUdfJar(), jobGraph);
+            }
 
-        if (!StringUtils.isEmpty(jobParamsInfo.getUdfJar())) {
-            JobGraphBuildUtil.fillUserJarForJobGraph(jobParamsInfo.getUdfJar(), jobGraph);
+            JobExecutionResult jobExecutionResult = ClientUtils.submitJob(clusterClient, jobGraph);
+            String jobId = jobExecutionResult.getJobID().toString();
+            LOG.info("jobID:" + jobId);
+        } finally {
+            try {
+                clusterDescriptor.close();
+            } catch (Exception e) {
+                LOG.info("Could not properly close the yarn cluster descriptor.", e);
+            }
         }
 
-        JobExecutionResult jobExecutionResult = ClientUtils.submitJob(clusterClient, jobGraph);
-        String jobId = jobExecutionResult.getJobID().toString();
-        System.out.println("jobID:" + jobId);
 
     }
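
The three executor changes above follow the same shape: do the retrieval and submission inside a try block and close the cluster descriptor in finally, logging and swallowing close() failures so they cannot hide the original exception. A generic sketch of that pattern, with a hypothetical Descriptor stand-in rather than Flink's ClusterDescriptor API:

public class CloseInFinallySketch {
    // Hypothetical stand-in for a descriptor that owns external resources.
    static class Descriptor {
        String retrieveClient() { return "client"; }
        void close() { System.out.println("descriptor closed"); }
    }

    public static void main(String[] args) {
        Descriptor descriptor = new Descriptor();
        try {
            String client = descriptor.retrieveClient();
            System.out.println("job submitted via " + client);
        } finally {
            // Always release the descriptor, even if retrieval or submission throws;
            // log and swallow close() failures so they do not mask the first error.
            try {
                descriptor.close();
            } catch (Exception e) {
                System.err.println("Could not properly close the cluster descriptor: " + e);
            }
        }
    }
}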

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 2 additions & 4 deletions
@@ -407,10 +407,8 @@ private void handleQuery(SQLConnection connection, Map<String, Object> inputPara
                 // and close the connection
                 connection.close(done -> {
                     if (done.failed()) {
-                        throw new SuppressRestartsException(
-                                new Throwable(
-                                        ExceptionTrace.traceOriginalCause(done.cause())
-                                )
+                        LOG.error("sql connection close failed! " +
+                                ExceptionTrace.traceOriginalCause(done.cause())
                         );
                     }
                 });
