Skip to content

Commit a9ec9d5

Browse files
[Feature][Core] Support record metrics in flink engine (#6035)
1 parent 456cd17 commit a9ec9d5

File tree

19 files changed

+1092
-20
lines changed

19 files changed

+1092
-20
lines changed

seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java

+16
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.seatunnel.common.utils;
1919

20+
import java.time.Instant;
2021
import java.time.LocalDateTime;
22+
import java.time.ZoneId;
2123
import java.time.format.DateTimeFormatter;
2224
import java.util.HashMap;
2325
import java.util.Map;
@@ -49,10 +51,24 @@ public static LocalDateTime parse(String dateTime, Formatter formatter) {
4951
return LocalDateTime.parse(dateTime, FORMATTER_MAP.get(formatter));
5052
}
5153

54+
public static LocalDateTime parse(long timestamp) {
55+
return parse(timestamp, ZoneId.systemDefault());
56+
}
57+
58+
public static LocalDateTime parse(long timestamp, ZoneId zoneId) {
59+
Instant instant = Instant.ofEpochMilli(timestamp);
60+
return LocalDateTime.ofInstant(instant, zoneId);
61+
}
62+
5263
public static String toString(LocalDateTime dateTime, Formatter formatter) {
5364
return dateTime.format(FORMATTER_MAP.get(formatter));
5465
}
5566

67+
public static String toString(long timestamp, Formatter formatter) {
68+
Instant instant = Instant.ofEpochMilli(timestamp);
69+
return toString(LocalDateTime.ofInstant(instant, ZoneId.systemDefault()), formatter);
70+
}
71+
5672
public enum Formatter {
5773
YYYY_MM_DD_HH_MM_SS("yyyy-MM-dd HH:mm:ss"),
5874
YYYY_MM_DD_HH_MM_SS_SPOT("yyyy.MM.dd HH:mm:ss"),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.common.utils;
19+
20+
import org.apache.seatunnel.common.utils.DateTimeUtils.Formatter;
21+
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.Test;
24+
25+
import java.time.LocalDateTime;
26+
import java.time.ZoneId;
27+
28+
public class DateTimeUtilsTest {
29+
30+
@Test
31+
public void testParseDateString() {
32+
final String datetime = "2023-12-22 00:00:00";
33+
LocalDateTime parse = DateTimeUtils.parse(datetime, Formatter.YYYY_MM_DD_HH_MM_SS);
34+
Assertions.assertEquals(0, parse.getMinute());
35+
Assertions.assertEquals(0, parse.getHour());
36+
Assertions.assertEquals(0, parse.getSecond());
37+
Assertions.assertEquals(22, parse.getDayOfMonth());
38+
Assertions.assertEquals(12, parse.getMonth().getValue());
39+
Assertions.assertEquals(2023, parse.getYear());
40+
Assertions.assertEquals(22, parse.getDayOfMonth());
41+
}
42+
43+
@Test
44+
public void testParseTimestamp() {
45+
// 2023-12-22 12:55:20
46+
final long timestamp = 1703220920013L;
47+
LocalDateTime parse = DateTimeUtils.parse(timestamp, ZoneId.of("Asia/Shanghai"));
48+
49+
Assertions.assertEquals(55, parse.getMinute());
50+
Assertions.assertEquals(12, parse.getHour());
51+
Assertions.assertEquals(20, parse.getSecond());
52+
Assertions.assertEquals(22, parse.getDayOfMonth());
53+
Assertions.assertEquals(12, parse.getMonth().getValue());
54+
Assertions.assertEquals(2023, parse.getYear());
55+
Assertions.assertEquals(22, parse.getDayOfMonth());
56+
}
57+
}

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java

+27-10
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@
3333
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
3434
import org.apache.seatunnel.core.starter.execution.TaskExecution;
3535
import org.apache.seatunnel.core.starter.flink.FlinkStarter;
36+
import org.apache.seatunnel.translation.flink.metric.FlinkJobMetricsSummary;
3637

38+
import org.apache.flink.api.common.JobExecutionResult;
3739
import org.apache.flink.api.common.RuntimeExecutionMode;
3840

39-
import lombok.extern.slf4j.Slf4j;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
4043

4144
import java.io.File;
4245
import java.net.MalformedURLException;
@@ -51,8 +54,10 @@
5154
import java.util.stream.Stream;
5255

5356
/** Used to execute a SeaTunnelTask. */
54-
@Slf4j
5557
public class FlinkExecution implements TaskExecution {
58+
59+
private static final Logger LOGGER = LoggerFactory.getLogger(FlinkExecution.class);
60+
5661
private final FlinkRuntimeEnvironment flinkRuntimeEnvironment;
5762
private final PluginExecuteProcessor<DataStreamTableInfo, FlinkRuntimeEnvironment>
5863
sourcePluginExecuteProcessor;
@@ -109,20 +114,32 @@ public void execute() throws TaskExecuteException {
109114
dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
110115
dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
111116
sinkPluginExecuteProcessor.execute(dataStreams);
112-
log.info(
117+
LOGGER.info(
113118
"Flink Execution Plan: {}",
114119
flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
115-
log.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
120+
LOGGER.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
116121
if (!flinkRuntimeEnvironment.isStreaming()) {
117122
flinkRuntimeEnvironment
118123
.getStreamExecutionEnvironment()
119124
.setRuntimeMode(RuntimeExecutionMode.BATCH);
120-
log.info("Flink job Mode: {}", JobMode.BATCH);
125+
LOGGER.info("Flink job Mode: {}", JobMode.BATCH);
121126
}
122127
try {
123-
flinkRuntimeEnvironment
124-
.getStreamExecutionEnvironment()
125-
.execute(flinkRuntimeEnvironment.getJobName());
128+
final long jobStartTime = System.currentTimeMillis();
129+
JobExecutionResult jobResult =
130+
flinkRuntimeEnvironment
131+
.getStreamExecutionEnvironment()
132+
.execute(flinkRuntimeEnvironment.getJobName());
133+
final long jobEndTime = System.currentTimeMillis();
134+
135+
final FlinkJobMetricsSummary jobMetricsSummary =
136+
FlinkJobMetricsSummary.builder()
137+
.jobExecutionResult(jobResult)
138+
.jobStartTime(jobStartTime)
139+
.jobEndTime(jobEndTime)
140+
.build();
141+
142+
LOGGER.info("Job finished, execution result: \n{}", jobMetricsSummary);
126143
} catch (Exception e) {
127144
throw new TaskExecuteException("Execute Flink job error", e);
128145
}
@@ -170,9 +187,9 @@ private Config injectJarsToConfig(Config config, String path, List<URL> jars) {
170187
for (URL jarUrl : jars) {
171188
if (new File(jarUrl.getFile()).exists()) {
172189
validJars.add(jarUrl);
173-
log.info("Inject jar to config: {}", jarUrl);
190+
LOGGER.info("Inject jar to config: {}", jarUrl);
174191
} else {
175-
log.warn("Remove invalid jar when inject jars into config: {}", jarUrl);
192+
LOGGER.warn("Remove invalid jar when inject jars into config: {}", jarUrl);
176193
}
177194
}
178195

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.e2e.connector.fake;
19+
20+
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
21+
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
22+
23+
import org.apache.seatunnel.api.common.metrics.MetricNames;
24+
import org.apache.seatunnel.common.utils.JsonUtils;
25+
import org.apache.seatunnel.e2e.common.TestSuiteBase;
26+
import org.apache.seatunnel.e2e.common.container.EngineType;
27+
import org.apache.seatunnel.e2e.common.container.TestContainer;
28+
import org.apache.seatunnel.e2e.common.container.flink.Flink13Container;
29+
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
30+
31+
import org.apache.http.client.methods.CloseableHttpResponse;
32+
import org.apache.http.client.methods.HttpGet;
33+
import org.apache.http.impl.client.CloseableHttpClient;
34+
import org.apache.http.impl.client.HttpClients;
35+
import org.apache.http.util.EntityUtils;
36+
37+
import org.awaitility.Awaitility;
38+
import org.junit.jupiter.api.Assertions;
39+
import org.junit.jupiter.api.TestTemplate;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
import org.testcontainers.containers.Container;
43+
44+
import java.io.IOException;
45+
import java.net.URI;
46+
import java.util.HashMap;
47+
import java.util.Map;
48+
import java.util.concurrent.TimeUnit;
49+
50+
@DisabledOnContainer(
51+
value = {},
52+
type = {EngineType.SPARK, EngineType.SEATUNNEL})
53+
public class FlinkMetricsIT extends TestSuiteBase {
54+
55+
private static final Logger LOGGER = LoggerFactory.getLogger(FlinkMetricsIT.class);
56+
57+
@TestTemplate
58+
public void testFlinkMetrics(TestContainer container) throws IOException, InterruptedException {
59+
Container.ExecResult executeResult =
60+
container.executeJob("/fake_to_assert_verify_flink_metrics.conf");
61+
Assertions.assertEquals(0, executeResult.getExitCode());
62+
final String jobListUrl = "http://%s:8081/jobs/overview";
63+
final String jobDetailsUrl = "http://%s:8081/jobs/%s";
64+
final String jobAccumulatorUrl = "http://%s:8081/jobs/%s/vertices/%s/accumulators";
65+
final String jobManagerHost;
66+
String dockerHost = System.getenv("DOCKER_HOST");
67+
if (dockerHost == null) {
68+
jobManagerHost = "localhost";
69+
} else {
70+
URI uri = URI.create(dockerHost);
71+
jobManagerHost = uri.getHost();
72+
}
73+
// create http client
74+
CloseableHttpClient httpClient = HttpClients.createDefault();
75+
76+
// get job id
77+
HttpGet httpGet = new HttpGet(String.format(jobListUrl, jobManagerHost));
78+
CloseableHttpResponse response = httpClient.execute(httpGet);
79+
Assertions.assertEquals(response.getStatusLine().getStatusCode(), 200);
80+
String responseContent = EntityUtils.toString(response.getEntity());
81+
ObjectNode jsonNode = JsonUtils.parseObject(responseContent);
82+
String jobId = jsonNode.get("jobs").get(0).get("jid").asText();
83+
Assertions.assertNotNull(jobId);
84+
85+
// get job vertices
86+
httpGet = new HttpGet(String.format(jobDetailsUrl, jobManagerHost, jobId));
87+
response = httpClient.execute(httpGet);
88+
Assertions.assertEquals(response.getStatusLine().getStatusCode(), 200);
89+
90+
responseContent = EntityUtils.toString(response.getEntity());
91+
jsonNode = JsonUtils.parseObject(responseContent);
92+
String verticeId = jsonNode.get("vertices").get(0).get("id").asText();
93+
94+
Awaitility.given()
95+
.ignoreExceptions()
96+
.await()
97+
.atMost(10L, TimeUnit.SECONDS)
98+
.untilAsserted(
99+
() -> {
100+
HttpGet httpGetTemp =
101+
new HttpGet(
102+
String.format(
103+
jobAccumulatorUrl,
104+
jobManagerHost,
105+
jobId,
106+
verticeId));
107+
CloseableHttpResponse responseTemp = httpClient.execute(httpGetTemp);
108+
String responseContentTemp =
109+
EntityUtils.toString(responseTemp.getEntity());
110+
JsonNode jsonNodeTemp = JsonUtils.parseObject(responseContentTemp);
111+
JsonNode metrics = jsonNodeTemp.get("user-accumulators");
112+
int size = metrics.size();
113+
if (size <= 0) {
114+
throw new IllegalStateException(
115+
"Flink metrics not synchronized yet, next round");
116+
}
117+
});
118+
119+
// get metrics
120+
httpGet = new HttpGet(String.format(jobAccumulatorUrl, jobManagerHost, jobId, verticeId));
121+
response = httpClient.execute(httpGet);
122+
responseContent = EntityUtils.toString(response.getEntity());
123+
jsonNode = JsonUtils.parseObject(responseContent);
124+
JsonNode metrics = jsonNode.get("user-accumulators");
125+
126+
int size = metrics.size();
127+
128+
Assertions.assertTrue(size > 0);
129+
130+
Map<String, String> metricsMap = new HashMap<>();
131+
132+
for (JsonNode metric : metrics) {
133+
String name = metric.get("name").asText();
134+
String value = metric.get("value").asText();
135+
metricsMap.put(name, value);
136+
}
137+
138+
String sourceReceivedCount = metricsMap.get(MetricNames.SOURCE_RECEIVED_COUNT);
139+
String sourceReceivedBytes = metricsMap.get(MetricNames.SOURCE_RECEIVED_BYTES);
140+
141+
Assertions.assertEquals(5, Integer.valueOf(sourceReceivedCount));
142+
Assertions.assertEquals(2160, Integer.valueOf(sourceReceivedBytes));
143+
144+
// Due to limitations in Flink 13 version and code, the metrics on the writer side cannot be
145+
// aggregated into the global accumulator and can only be viewed in the operator based on
146+
// parallelism dimensions
147+
if (!(container instanceof Flink13Container)) {
148+
String sinkWriteCount = metricsMap.get(MetricNames.SINK_WRITE_COUNT);
149+
String sinkWriteBytes = metricsMap.get(MetricNames.SINK_WRITE_BYTES);
150+
Assertions.assertEquals(5, Integer.valueOf(sinkWriteCount));
151+
Assertions.assertEquals(2160, Integer.valueOf(sinkWriteBytes));
152+
}
153+
154+
httpClient.close();
155+
}
156+
}

0 commit comments

Comments
 (0)