Skip to content

Commit 63b9b3c

Browse files
committed
[FLINK-2967] Make delay for local host connection strategy configurable
1 parent 81f8c05 commit 63b9b3c

File tree

9 files changed

+59
-23
lines changed

9 files changed

+59
-23
lines changed

flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,18 @@ public final class DataStreamUtils {
3636
* @return The iterator
3737
*/
3838
public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
39-
TypeSerializer serializer = stream.getType().createSerializer(stream.getExecutionEnvironment().getConfig());
40-
DataStreamIterator<OUT> it = new DataStreamIterator<OUT>(serializer);
39+
TypeSerializer<OUT> serializer = stream.getType().createSerializer(stream.getExecutionEnvironment().getConfig());
40+
DataStreamIterator<OUT> it = new DataStreamIterator<>(serializer);
4141

4242
// Find out which of our IP addresses should be given to the CollectSink, so that it is able to connect back to us
4343
StreamExecutionEnvironment env = stream.getExecutionEnvironment();
4444
InetAddress clientAddress;
4545
if(env instanceof RemoteStreamEnvironment) {
46-
String host = ((RemoteStreamEnvironment)env).getHost();
47-
int port = ((RemoteStreamEnvironment)env).getPort();
46+
RemoteStreamEnvironment rse = (RemoteStreamEnvironment) env;
47+
String host = rse.getHost();
48+
int port = rse.getPort();
4849
try {
49-
clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
50+
clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400, rse.getConfiguration());
5051
} catch (IOException e) {
5152
throw new RuntimeException("IOException while trying to connect to the master", e);
5253
}
@@ -61,7 +62,7 @@ public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
6162
DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, it.getPort(), serializer));
6263
sink.setParallelism(1); // It would not work if multiple instances would connect to the same port
6364

64-
(new CallExecute(stream)).start();
65+
(new CallExecute<>(stream)).start();
6566

6667
return it;
6768
}

flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,16 @@ public final class ConfigConstants {
160160
*/
161161
public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
162162

163+
/**
164+
* The first strategy of the TaskManager to detect its own (externally reachable) IP address
165+
* is using the address returned by InetAddress.getLocalHost().
166+
* The default timeout for checking that IP address is 50 ms. This configuration parameter allows
167+
* users to set a higher value (in milliseconds).
168+
*
169+
* See FLINK-2967 for details.
170+
*/
171+
public static final String TASK_MANAGER_ADDRESS_DETECTION_LOCAL_HOST_TIMEOUT = "taskmanager.address-detection.local-host-timeout";
172+
163173
// --------------------------- Runtime Algorithms -------------------------------
164174

165175
/**
@@ -665,7 +675,8 @@ public final class ConfigConstants {
665675
* The default path to the file containing the list of access privileged users and passwords.
666676
*/
667677
public static final String DEFAULT_WEB_ACCESS_FILE_PATH = null;
668-
678+
679+
669680
// ------------------------------ Akka Values ------------------------------
670681

671682
public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";

flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.UUID;
3030
import java.util.concurrent.TimeUnit;
3131

32+
import org.apache.flink.configuration.ConfigConstants;
33+
import org.apache.flink.configuration.Configuration;
3234
import org.apache.flink.runtime.akka.AkkaUtils;
3335
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
3436
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
@@ -93,9 +95,10 @@ public int getTimeout() {
9395
* @param maxWaitMillis The maximum time that this method tries to connect, before falling
9496
* back to the heuristics.
9597
* @param startLoggingAfter The time after which the method will log on INFO level.
98+
* @param conf Configuration object for the address detection
9699
*/
97100
public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
98-
long maxWaitMillis, long startLoggingAfter) throws IOException
101+
long maxWaitMillis, long startLoggingAfter, Configuration conf) throws IOException
99102
{
100103
if (targetAddress == null) {
101104
throw new NullPointerException("targetAddress must not be null");
@@ -119,7 +122,7 @@ public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
119122
}
120123
// go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT
121124
do {
122-
InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging);
125+
InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging, conf);
123126
if (address != null) {
124127
return address;
125128
}
@@ -170,7 +173,7 @@ public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
170173

171174
// our attempts timed out. use the heuristic fallback
172175
LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
173-
InetAddress heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
176+
InetAddress heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true, conf);
174177
if (heuristic != null) {
175178
return heuristic;
176179
}
@@ -183,13 +186,19 @@ public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
183186

184187
private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
185188
InetSocketAddress targetAddress,
186-
boolean logging) throws IOException
189+
boolean logging,
190+
Configuration conf) throws IOException
187191
{
188192
// try LOCAL_HOST strategy independent of the network interfaces
189193
if(strategy == AddressDetectionState.LOCAL_HOST) {
190194
InetAddress localhostName = InetAddress.getLocalHost();
191-
192-
if(tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
195+
int timeout = strategy.getTimeout();
196+
// check for a custom timeout
197+
if(conf.getInteger(ConfigConstants.TASK_MANAGER_ADDRESS_DETECTION_LOCAL_HOST_TIMEOUT, -1) != -1 ) {
198+
timeout = conf.getInteger(ConfigConstants.TASK_MANAGER_ADDRESS_DETECTION_LOCAL_HOST_TIMEOUT, -1);
199+
LOG.info("Using the custom-configured timeout for the LOCAL_HOST address detection strategy: {}", timeout);
200+
}
201+
if(tryToConnect(localhostName, targetAddress, timeout, logging)) {
193202
LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address");
194203
return localhostName;
195204
} else {
@@ -315,6 +324,11 @@ private enum LeaderRetrievalState {
315324
private String akkaURL;
316325
private LeaderRetrievalState retrievalState = LeaderRetrievalState.NOT_RETRIEVED;
317326
private Exception exception;
327+
private Configuration configuration;
328+
329+
public LeaderConnectingAddressListener(Configuration configuration) {
330+
this.configuration = configuration;
331+
}
318332

319333
public InetAddress findConnectingAddress(
320334
FiniteDuration timeout) throws LeaderRetrievalException {
@@ -369,7 +383,7 @@ public InetAddress findConnectingAddress(
369383
}
370384

371385
do {
372-
InetAddress address = ConnectionUtils.findAddressUsingStrategy(strategy, targetAddress, logging);
386+
InetAddress address = ConnectionUtils.findAddressUsingStrategy(strategy, targetAddress, logging, configuration);
373387
if (address != null) {
374388
return address;
375389
}
@@ -418,7 +432,7 @@ public InetAddress findConnectingAddress(
418432

419433
if (targetAddress != null) {
420434
LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
421-
heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
435+
heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true, configuration);
422436
}
423437

424438
if (heuristic != null) {

flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,8 @@ public static LeaderConnectionInfo retrieveLeaderConnectionInfo(
146146

147147
public static InetAddress findConnectingAddress(
148148
LeaderRetrievalService leaderRetrievalService,
149-
FiniteDuration timeout) throws LeaderRetrievalException {
150-
ConnectionUtils.LeaderConnectingAddressListener listener = new ConnectionUtils.LeaderConnectingAddressListener();
149+
FiniteDuration timeout, Configuration configuration) throws LeaderRetrievalException {
150+
ConnectionUtils.LeaderConnectingAddressListener listener = new ConnectionUtils.LeaderConnectingAddressListener(configuration);
151151

152152
try {
153153
leaderRetrievalService.start(listener);

flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1353,7 +1353,8 @@ object TaskManager {
13531353

13541354
val taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
13551355
leaderRetrievalService,
1356-
lookupTimeout)
1356+
lookupTimeout,
1357+
configuration)
13571358

13581359
taskManagerHostname = taskManagerAddress.getHostName()
13591360
LOG.info(s"TaskManager will use hostname/address '${taskManagerHostname}' " +

flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public void testTimeoutOfFindConnectingAddress() throws Exception {
185185
FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
186186

187187
LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
188-
InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
188+
InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout, config);
189189

190190
assertEquals(InetAddress.getLocalHost(), result);
191191
}
@@ -207,7 +207,7 @@ public FindConnectingAddress(Configuration config, FiniteDuration timeout) {
207207
public void run() {
208208
try {
209209
LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
210-
result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
210+
result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout, this.config);
211211
} catch (Exception e) {
212212
exception = e;
213213
}

flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.junit.Assert.*;
2121

22+
import org.apache.flink.configuration.Configuration;
2223
import org.apache.flink.util.OperatingSystem;
2324
import org.junit.Test;
2425

@@ -47,7 +48,7 @@ public void testFindConnectableAddress() {
4748
InetSocketAddress unreachable = new InetSocketAddress("localhost", unusedPort);
4849

4950
final long start = System.currentTimeMillis();
50-
InetAddress add = ConnectionUtils.findConnectingAddress(unreachable, 2000, 400);
51+
InetAddress add = ConnectionUtils.findConnectingAddress(unreachable, 2000, 400, new Configuration());
5152

5253
// check that it did not take forever
5354
assertTrue(System.currentTimeMillis() - start < (OperatingSystem.isWindows() ? 30000 : 8000));

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,4 +250,12 @@ public String getHost() {
250250
public int getPort() {
251251
return port;
252252
}
253+
254+
/**
255+
* Returns the Flink configuration.
256+
* @return Flink configuration
257+
*/
258+
public Configuration getConfiguration() {
259+
return config;
260+
}
253261
}

flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,9 @@ public void connectToCluster() throws IOException {
147147
LOG.info("Start actor system.");
148148
// find name of own public interface, able to connect to the JM
149149
// try to find address for 2 seconds. log after 400 ms.
150-
InetAddress ownHostname = ConnectionUtils.findConnectingAddress(jobManagerAddress, 2000, 400);
150+
InetAddress ownHostname = ConnectionUtils.findConnectingAddress(jobManagerAddress, 2000, 400, this.flinkConfig);
151151
actorSystem = AkkaUtils.createActorSystem(flinkConfig,
152-
new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
152+
new Some(new Tuple2<>(ownHostname.getCanonicalHostName(), 0)));
153153

154154
// Create the leader election service
155155
flinkConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerAddress.getHostName());

0 commit comments

Comments
 (0)