Skip to content

Commit 6c7b195

Browse files
authored
[FLINK-19295][yarn][tests] Exclude more meaningless akka shutdown errors (apache#13439)
1 parent 713d02e commit 6c7b195

File tree

2 files changed

+67
-19
lines changed

2 files changed

+67
-19
lines changed

flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java

Lines changed: 22 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -90,6 +90,7 @@
9090
import java.util.UUID;
9191
import java.util.concurrent.ConcurrentMap;
9292
import java.util.function.Supplier;
93+
import java.util.regex.Matcher;
9394
import java.util.regex.Pattern;
9495
import java.util.stream.Collectors;
9596

@@ -123,25 +124,26 @@ public abstract class YarnTestBase extends TestLogger {
123124
};
124125

125126
/** These strings are white-listed, overriding the prohibited strings. */
126-
protected static final String[] WHITELISTED_STRINGS = {
127-
"akka.remote.RemoteTransportExceptionNoStackTrace",
127+
// NOTE(review): after this change each entry is a regular expression that the log check applies
// with Matcher#find(), i.e. it may occur anywhere in a log line. Several entries below leave '.'
// unescaped (e.g. "java.net.ConnectException", "java.nio.channels", "akka.tcp", "flink.util",
// and the trailing "leadership."), so those positions match any character rather than a literal
// dot -- slightly more permissive than the literal text they replace. The YarnResourceManager
// constant is likewise compiled as a regex; TODO confirm it contains no regex metacharacters,
// or wrap it in Pattern.quote(...).
protected static final Pattern[] WHITELISTED_STRINGS = {
128+
Pattern.compile("akka\\.remote\\.RemoteTransportExceptionNoStackTrace"),
128129
// workaround for annoying InterruptedException logging:
129130
// https://issues.apache.org/jira/browse/YARN-1022
130-
"java.lang.InterruptedException",
131-
// very specific on purpose
132-
"Remote connection to [null] failed with java.net.ConnectException: Connection refused",
133-
"Remote connection to [null] failed with java.nio.channels.NotYetConnectedException",
134-
"java.io.IOException: Connection reset by peer",
131+
Pattern.compile("java\\.lang\\.InterruptedException"),
132+
// very specific on purpose; whitelist meaningless exceptions that occur during akka shutdown:
133+
Pattern.compile("Remote connection to \\[null\\] failed with java.net.ConnectException: Connection refused"),
134+
Pattern.compile("Remote connection to \\[null\\] failed with java.nio.channels.NotYetConnectedException"),
135+
Pattern.compile("java\\.io\\.IOException: Connection reset by peer"),
136+
Pattern.compile("Association with remote system \\[akka.tcp://flink@[^]]+\\] has failed, address is now gated for \\[50\\] ms. Reason: \\[Association failed with \\[akka.tcp://flink@[^]]+\\]\\] Caused by: \\[java.net.ConnectException: Connection refused: [^]]+\\]"),
135137

136138
// filter out expected ResourceManagerException caused by intended shutdown request
137-
YarnResourceManager.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST,
139+
Pattern.compile(YarnResourceManager.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST),
138140

139141
// this can happen in Akka 2.4 on shutdown.
140-
"java.util.concurrent.RejectedExecutionException: Worker has already been shutdown",
142+
Pattern.compile("java\\.util\\.concurrent\\.RejectedExecutionException: Worker has already been shutdown"),
141143

142-
"org.apache.flink.util.FlinkException: Stopping JobMaster",
143-
"org.apache.flink.util.FlinkException: JobManager is shutting down.",
144-
"lost the leadership."
144+
Pattern.compile("org\\.apache\\.flink.util\\.FlinkException: Stopping JobMaster"),
145+
Pattern.compile("org\\.apache\\.flink.util\\.FlinkException: JobManager is shutting down\\."),
146+
Pattern.compile("lost the leadership.")
145147
};
146148

147149
// Temp directory which is deleted after the unit test.
@@ -390,7 +392,7 @@ private static void writeHDFSSiteConfigXML(Configuration coreSite, File targetFo
390392
* So always run "mvn clean" before running the tests here.
391393
*
392394
*/
393-
public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) {
395+
public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final Pattern[] whitelisted) {
394396
File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
395397
Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists());
396398
Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory());
@@ -400,17 +402,18 @@ public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited,
400402
@Override
401403
public boolean accept(File dir, String name) {
402404
// scan each file for prohibited strings.
403-
File f = new File(dir.getAbsolutePath() + "/" + name);
405+
File logFile = new File(dir.getAbsolutePath() + "/" + name);
404406
try {
405-
BufferingScanner scanner = new BufferingScanner(new Scanner(f), 10);
407+
BufferingScanner scanner = new BufferingScanner(new Scanner(logFile), 10);
406408
while (scanner.hasNextLine()) {
407409
final String lineFromFile = scanner.nextLine();
408410
for (String aProhibited : prohibited) {
409411
if (lineFromFile.contains(aProhibited)) {
410412

411413
boolean whitelistedFound = false;
412-
for (String white : whitelisted) {
413-
if (lineFromFile.contains(white)) {
414+
for (Pattern whitelistPattern : whitelisted) {
415+
Matcher whitelistMatch = whitelistPattern.matcher(lineFromFile);
416+
if (whitelistMatch.find()) {
414417
whitelistedFound = true;
415418
break;
416419
}
@@ -419,7 +422,7 @@ public boolean accept(File dir, String name) {
419422
if (!whitelistedFound) {
420423
// logging in FATAL to see the actual message in CI tests.
421424
Marker fatal = MarkerFactory.getMarker("FATAL");
422-
LOG.error(fatal, "Prohibited String '{}' in '{}:{}'", aProhibited, f.getAbsolutePath(), lineFromFile);
425+
LOG.error(fatal, "Prohibited String '{}' in '{}:{}'", aProhibited, logFile.getAbsolutePath(), lineFromFile);
423426

424427
StringBuilder logExcerpt = new StringBuilder();
425428

@@ -456,7 +459,7 @@ public boolean accept(File dir, String name) {
456459

457460
}
458461
} catch (FileNotFoundException e) {
459-
LOG.warn("Unable to locate file: " + e.getMessage() + " file: " + f.getAbsolutePath());
462+
LOG.warn("Unable to locate file: " + e.getMessage() + " file: " + logFile.getAbsolutePath());
460463
}
461464

462465
return false;
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBaseTest.java

Lines changed: 45 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.yarn;
20+
21+
import org.junit.Assert;
22+
import org.junit.Test;
23+
24+
import java.util.regex.Pattern;
25+
26+
/**
27+
* Tests for {@link YarnTestBase}.
28+
*/
29+
public class YarnTestBaseTest {
30+
31+
/**
 * Checks that sample log lines are each matched by at least one entry of
 * {@link YarnTestBase#WHITELISTED_STRINGS}.
 */
@Test
32+
public void ensureWhitelistEntryMatches() {
33+
// sample line containing "java.lang.InterruptedException"
ensureWhitelistEntryMatch("465 java.lang.InterruptedException: sleep interrupted");
34+
// sample akka.remote.ReliableDeliverySupervisor warning of the
// "Association with remote system [...] has failed" form
ensureWhitelistEntryMatch("2020-09-19 22:06:19,458 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@e466f3e261f3:42352] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@e466f3e261f3:42352]] Caused by: [java.net.ConnectException: Connection refused: e466f3e261f3/192.168.224.2:42352]");
35+
}
36+
37+
/**
 * Fails the test unless at least one pattern in {@link YarnTestBase#WHITELISTED_STRINGS}
 * is found somewhere in the given string.
 *
 * @param probe the log line to check against the whitelist patterns
 */
private void ensureWhitelistEntryMatch(String probe) {
38+
for (Pattern pattern : YarnTestBase.WHITELISTED_STRINGS) {
39+
// find() rather than matches(): the pattern only has to occur within the line
if (pattern.matcher(probe).find()) {
40+
return;
41+
}
42+
}
43+
Assert.fail("The following string didn't match any whitelisted patterns '" + probe + "'");
44+
}
45+
}

0 commit comments

Comments
 (0)