Skip to content

Commit db2b77f

Browse files
committed
[Optimize-4127][gateway] The user-defined flink conf path overrides the flink conf path parameter
1 parent 31e6c2c commit db2b77f

File tree

3 files changed

+13
-18
lines changed

3 files changed

+13
-18
lines changed

Diff for: dinky-common/src/main/java/org/dinky/data/enums/Status.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -444,8 +444,8 @@ public enum Status {
444444
/**
445445
* gateway config
446446
*/
447-
GAETWAY_KUBERNETS_TEST_FAILED(180, "gateway.kubernetes.test.failed"),
448-
GAETWAY_KUBERNETS_TEST_SUCCESS(181, "gateway.kubernetes.test.success"),
447+
GATEWAY_KUBERNETES_TEST_FAILED(180, "gateway.kubernetes.test.failed"),
448+
GATEWAY_KUBERNETES_TEST_SUCCESS(181, "gateway.kubernetes.test.success"),
449449

450450
/**
451451
* process

Diff for: dinky-core/src/main/java/org/dinky/job/JobConfig.java

+3-14
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.dinky.gateway.enums.SavePointStrategy;
3030
import org.dinky.gateway.model.FlinkClusterConfig;
3131

32-
import org.apache.commons.lang3.StringUtils;
3332
import org.apache.flink.configuration.Configuration;
3433
import org.apache.flink.configuration.CoreOptions;
3534
import org.apache.flink.configuration.RestOptions;
@@ -43,14 +42,12 @@
4342
import lombok.AllArgsConstructor;
4443
import lombok.Builder;
4544
import lombok.Data;
46-
import lombok.extern.slf4j.Slf4j;
4745

4846
/**
4947
* JobConfig
5048
*
5149
* @since 2021/6/27 18:45
5250
*/
53-
@Slf4j
5451
@Data
5552
@Builder
5653
@AllArgsConstructor
@@ -261,17 +258,9 @@ public void buildGatewayConfig(FlinkClusterConfig config) {
261258
flinkConfig.getConfiguration().put(customConfig.getName(), customConfig.getValue());
262259
}
263260

264-
Map<String, String> configuration = flinkConfig.getConfiguration();
265-
266-
// In Kubernetes mode, must set jobmanager.memory.process.size.
267-
if (StringUtils.isBlank(configuration.get("jobmanager.memory.process.size"))) {
268-
log.warn("In Kubernetes mode, please configure 'jobmanager.memory.process.size', default 2048m");
269-
configuration.put("jobmanager.memory.process.size", "2048m");
270-
}
271-
272-
// Load job configuration content afterwards
273-
configuration.putAll(getConfigJson());
274-
configuration.put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism));
261+
// Load job configuration content afterwords
262+
flinkConfig.getConfiguration().putAll(getConfigJson());
263+
flinkConfig.getConfiguration().put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism));
275264
flinkConfig.setJobName(getJobName());
276265

277266
gatewayConfig = GatewayConfig.build(config);

Diff for: dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@
3131
import org.dinky.gateway.result.TestResult;
3232
import org.dinky.utils.TextUtil;
3333

34+
import org.apache.commons.lang3.StringUtils;
3435
import org.apache.flink.configuration.ConfigOption;
3536
import org.apache.flink.configuration.CoreOptions;
3637
import org.apache.flink.configuration.DeploymentOptions;
38+
import org.apache.flink.configuration.DeploymentOptionsInternal;
3739
import org.apache.flink.configuration.GlobalConfiguration;
3840
import org.apache.flink.configuration.PipelineOptions;
3941
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
@@ -86,6 +88,10 @@ public void init() {
8688

8789
protected void initConfig() {
8890
flinkConfigPath = config.getClusterConfig().getFlinkConfigPath();
91+
// The user-defined flink conf path overrides the flink conf path parameter.
92+
if (StringUtils.isNotBlank(flinkConfigPath)) {
93+
addConfigParas(DeploymentOptionsInternal.CONF_DIR, flinkConfigPath);
94+
}
8995
flinkConfig = config.getFlinkConfig();
9096
String jobName = flinkConfig.getJobName();
9197
if (TextUtil.isEmpty(jobName)) {
@@ -222,9 +228,9 @@ public TestResult test() {
222228
}
223229
return TestResult.success();
224230
} catch (Exception e) {
225-
logger.error(Status.GAETWAY_KUBERNETS_TEST_FAILED.getMessage(), e);
231+
logger.error(Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e);
226232
return TestResult.fail(
227-
StrFormatter.format("{}:{}", Status.GAETWAY_KUBERNETS_TEST_FAILED.getMessage(), e.getMessage()));
233+
StrFormatter.format("{}:{}", Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e.getMessage()));
228234
} finally {
229235
close();
230236
}

0 commit comments

Comments
 (0)