Skip to content

Commit 3303b36

Browse files
authored
Merge pull request #36061 from marqo-ai/fix/filedistribution-timeout-close
Track consecutive RPC timeouts in file distribution and close stale connections
2 parents 72991bc + 0ea1744 commit 3303b36

5 files changed

Lines changed: 168 additions & 14 deletions

File tree

config/src/main/java/com/yahoo/vespa/config/Connection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,6 @@ public interface Connection {
1717

1818
String getAddress();
1919

20+
default void closeConnection() {}
21+
2022
}

config/src/main/java/com/yahoo/vespa/config/JRTConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,15 @@ public String getAddress() {
4343
return address;
4444
}
4545

46+
@Override
47+
public synchronized void closeConnection() {
48+
if (target != null) {
49+
logger.log(Level.INFO, "Closing connection to " + address);
50+
target.close();
51+
target = null;
52+
}
53+
}
54+
4655
/**
4756
* This is synchronized to avoid multiple ConfigInstances creating new targets simultaneously, if
4857
* the existing target is null, invalid or has not yet been initialized.

filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,27 @@ public FileDownloader(ConnectionPool connectionPool,
6262
this.timeout = timeout;
6363
// Needed to receive RPC receiveFile* calls from server after starting download of file reference
6464
new FileReceiver(supervisor, downloads, downloadDirectory);
65-
this.fileReferenceDownloader = new FileReferenceDownloader(connectionPool,
66-
downloads,
67-
timeout,
68-
backoffInitialTime,
69-
downloadDirectory);
65+
this.fileReferenceDownloader = new FileReferenceDownloader(connectionPool, downloads, timeout,
66+
backoffInitialTime, downloadDirectory);
67+
if (forceDownload)
68+
log.log(Level.INFO, "Force download of file references (download even if file reference exists on disk)");
69+
}
70+
71+
public FileDownloader(ConnectionPool connectionPool,
72+
Supervisor supervisor,
73+
File downloadDirectory,
74+
Duration timeout,
75+
Duration backoffInitialTime,
76+
int maxTimeoutsBeforeClose) {
77+
this.connectionPool = connectionPool;
78+
this.supervisor = supervisor;
79+
this.downloadDirectory = downloadDirectory;
80+
this.timeout = timeout;
81+
// Needed to receive RPC receiveFile* calls from server after starting download of file reference
82+
new FileReceiver(supervisor, downloads, downloadDirectory);
83+
this.fileReferenceDownloader = new FileReferenceDownloader(connectionPool, downloads, timeout,
84+
backoffInitialTime, downloadDirectory,
85+
maxTimeoutsBeforeClose);
7086
if (forceDownload)
7187
log.log(Level.INFO, "Force download of file references (download even if file reference exists on disk)");
7288
}

filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import com.yahoo.concurrent.DaemonThreadFactory;
55
import com.yahoo.config.FileReference;
6+
import com.yahoo.jrt.ErrorCode;
67
import com.yahoo.jrt.Int32Value;
78
import com.yahoo.jrt.Request;
89
import com.yahoo.jrt.Spec;
@@ -39,6 +40,8 @@ public class FileReferenceDownloader {
3940
private static final Logger log = Logger.getLogger(FileReferenceDownloader.class.getName());
4041
private static final Set<CompressionType> defaultAcceptedCompressionTypes = Set.of(lz4, none, zstd);
4142

43+
private enum DownloadResult { SUCCESS, TIMEOUT, FAILURE }
44+
4245
private final ExecutorService downloadExecutor =
4346
Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()),
4447
new DaemonThreadFactory("filereference downloader"));
@@ -49,17 +52,31 @@ public class FileReferenceDownloader {
4952
private final Optional<Duration> rpcTimeout; // Only used when overridden with env variable
5053
private final File downloadDirectory;
5154
private final AtomicBoolean shutDown = new AtomicBoolean(false);
55+
private final int maxTimeoutsBeforeClose;
5256

5357
FileReferenceDownloader(ConnectionPool connectionPool,
5458
Downloads downloads,
5559
Duration timeout,
5660
Duration backoffInitialTime,
5761
File downloadDirectory) {
62+
this(connectionPool, downloads, timeout, backoffInitialTime, downloadDirectory,
63+
Optional.ofNullable(System.getenv("VESPA_FILE_DOWNLOAD_MAX_TIMEOUTS_BEFORE_CLOSE"))
64+
.map(Integer::parseInt)
65+
.orElse(0));
66+
}
67+
68+
FileReferenceDownloader(ConnectionPool connectionPool,
69+
Downloads downloads,
70+
Duration timeout,
71+
Duration backoffInitialTime,
72+
File downloadDirectory,
73+
int maxTimeoutsBeforeClose) {
5874
this.connectionPool = connectionPool;
5975
this.downloads = downloads;
6076
this.downloadTimeout = timeout;
6177
this.backoffInitialTime = backoffInitialTime;
6278
this.downloadDirectory = downloadDirectory;
79+
this.maxTimeoutsBeforeClose = maxTimeoutsBeforeClose;
6380
// Undocumented on purpose, might change or be removed at any time
6481
var timeoutString = Optional.ofNullable(System.getenv("VESPA_FILE_DOWNLOAD_RPC_TIMEOUT"));
6582
this.rpcTimeout = timeoutString.map(t -> Duration.ofSeconds(Integer.parseInt(t)));
@@ -69,6 +86,7 @@ private void waitUntilDownloadStarted(FileReferenceDownload fileReferenceDownloa
6986
Instant end = Instant.now().plus(downloadTimeout);
7087
FileReference fileReference = fileReferenceDownload.fileReference();
7188
int retryCount = 0;
89+
int timeoutCount = 0;
7290
Connection connection = connectionPool.getCurrent();
7391
do {
7492
if (retryCount > 0)
@@ -81,8 +99,19 @@ private void waitUntilDownloadStarted(FileReferenceDownload fileReferenceDownloa
8199
var timeout = rpcTimeout.orElse(Duration.between(Instant.now(), end));
82100
log.log(Level.FINE, "Wait until download of " + fileReference + " has started, retryCount " + retryCount +
83101
", timeout " + timeout + " (request from " + fileReferenceDownload.client() + ")");
84-
if ( ! timeout.isNegative() && startDownloadRpc(fileReferenceDownload, retryCount, connection, timeout))
85-
return;
102+
if ( ! timeout.isNegative()) {
103+
var result = startDownloadRpc(fileReferenceDownload, retryCount, connection, timeout);
104+
if (result == DownloadResult.SUCCESS) return;
105+
if (result == DownloadResult.TIMEOUT && maxTimeoutsBeforeClose > 0) {
106+
timeoutCount++;
107+
if (timeoutCount >= maxTimeoutsBeforeClose) {
108+
log.log(Level.INFO, "RPC request for " + fileReference + " timed out " + timeoutCount +
109+
" times, closing connection to " + connection.getAddress());
110+
connection.closeConnection();
111+
timeoutCount = 0;
112+
}
113+
}
114+
}
86115

87116
retryCount++;
88117
// There might not be one connection that works for all file references (each file reference might
@@ -131,10 +160,13 @@ void startDownloadFromSource(FileReferenceDownload fileReferenceDownload, Spec s
131160

132161
log.log(Level.FINE, () -> "Will download " + fileReference + " with timeout " + downloadTimeout + " from " + spec.host());
133162
downloads.add(fileReferenceDownload);
134-
var downloading = startDownloadRpc(fileReferenceDownload, 1, connection, downloadTimeout);
163+
var result = startDownloadRpc(fileReferenceDownload, 1, connection, downloadTimeout);
164+
if (result == DownloadResult.TIMEOUT && maxTimeoutsBeforeClose > 0) {
165+
connection.closeConnection();
166+
}
135167
// Need to explicitly remove from downloads if downloading has not started.
136168
// If downloading *has* started FileReceiver will take care of that when download has completed or failed
137-
if ( ! downloading)
169+
if (result != DownloadResult.SUCCESS)
138170
downloads.remove(fileReference);
139171
});
140172
}
@@ -144,7 +176,7 @@ void failedDownloading(FileReference fileReference) {
144176
downloads.remove(fileReference);
145177
}
146178

147-
private boolean startDownloadRpc(FileReferenceDownload fileReferenceDownload, int retryCount, Connection connection, Duration timeout) {
179+
private DownloadResult startDownloadRpc(FileReferenceDownload fileReferenceDownload, int retryCount, Connection connection, Duration timeout) {
148180
Request request = createRequest(fileReferenceDownload);
149181
connection.invokeSync(request, timeout);
150182

@@ -157,18 +189,18 @@ private boolean startDownloadRpc(FileReferenceDownload fileReferenceDownload, in
157189

158190
if (errorCode == 0) {
159191
log.log(Level.FINE, () -> "Found " + fileReference + " available at " + address);
160-
return true;
192+
return DownloadResult.SUCCESS;
161193
} else {
162194
var error = FileApiErrorCodes.get(errorCode);
163195
log.log(logLevel, "Downloading " + fileReference + " from " + address + " failed (" + error + ")");
164-
return false;
196+
return DownloadResult.FAILURE;
165197
}
166198
} else {
167199
log.log(logLevel, "Downloading " + fileReference + " from " + address +
168200
" (client " + fileReferenceDownload.client() + ") failed:" +
169201
" error code " + request.errorCode() + " (" + request.errorMessage() + ")." +
170202
" (retry " + retryCount + ", rpc timeout " + timeout + ")");
171-
return false;
203+
return request.errorCode() == ErrorCode.TIMEOUT ? DownloadResult.TIMEOUT : DownloadResult.FAILURE;
172204
}
173205
}
174206

filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.Future;
3333

3434
import static com.yahoo.jrt.ErrorCode.CONNECTION;
35+
import static com.yahoo.jrt.ErrorCode.TIMEOUT;
3536
import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.zstd;
3637
import static com.yahoo.vespa.filedistribution.FileReferenceData.Type;
3738
import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed;
@@ -264,6 +265,66 @@ public void receiveFile() throws IOException {
264265
assertEquals("content", IOUtils.readFile(downloadedFile));
265266
}
266267

268+
@Test
269+
public void testConnectionCloseOnTimeout() {
270+
int timesToTimeout = 2;
271+
MockConnection mockConnection = new MockConnection();
272+
MockConnection.TimeoutResponseHandler responseHandler =
273+
new MockConnection.TimeoutResponseHandler(timesToTimeout);
274+
mockConnection.setResponseHandler(responseHandler);
275+
276+
FileDownloader downloader = new FileDownloader(mockConnection, supervisor, downloadDir,
277+
Duration.ofSeconds(4), sleepBetweenRetries,
278+
1);
279+
FileReference fileReference = new FileReference("timeoutTest");
280+
// File won't be found, download will fail after retries and timeout
281+
assertFalse(downloader.getFile(new FileReferenceDownload(fileReference, "test")).isPresent());
282+
assertEquals("Expected closeConnection called for each timeout, got " + mockConnection.getCloseConnectionCount(),
283+
timesToTimeout, mockConnection.getCloseConnectionCount());
284+
downloader.close();
285+
}
286+
287+
@Test
288+
public void testConnectionCloseAfterNTimeouts() {
289+
int timesToTimeout = 6;
290+
int retriesOnTimeoutBeforeClose = 2;
291+
MockConnection mockConnection = new MockConnection();
292+
MockConnection.TimeoutResponseHandler responseHandler =
293+
new MockConnection.TimeoutResponseHandler(timesToTimeout);
294+
mockConnection.setResponseHandler(responseHandler);
295+
296+
FileDownloader downloader = new FileDownloader(mockConnection, supervisor, downloadDir,
297+
Duration.ofSeconds(4), sleepBetweenRetries,
298+
retriesOnTimeoutBeforeClose);
299+
FileReference fileReference = new FileReference("timeoutNTest");
300+
// File won't be found, download will fail after retries and timeout
301+
assertFalse(downloader.getFile(new FileReferenceDownload(fileReference, "test")).isPresent());
302+
// With retriesOnTimeoutBeforeClose=2, close happens after the 2nd timeout (count 1,2 -> close),
303+
// then counter resets, so 6 timeouts / 2 = 3 closes
304+
assertEquals("Expected 3 closeConnection calls for 6 timeouts with threshold 2, got " + mockConnection.getCloseConnectionCount(),
305+
3, mockConnection.getCloseConnectionCount());
306+
downloader.close();
307+
}
308+
309+
@Test
310+
public void testNoConnectionCloseOnTimeoutByDefault() {
311+
int timesToTimeout = 2;
312+
MockConnection mockConnection = new MockConnection();
313+
MockConnection.TimeoutResponseHandler responseHandler =
314+
new MockConnection.TimeoutResponseHandler(timesToTimeout);
315+
mockConnection.setResponseHandler(responseHandler);
316+
317+
// maxTimeoutsBeforeClose=0 means timeout-based close feature is disabled
318+
FileDownloader downloader = new FileDownloader(mockConnection, supervisor, downloadDir,
319+
Duration.ofSeconds(4), sleepBetweenRetries,
320+
0);
321+
FileReference fileReference = new FileReference("timeoutDefaultTest");
322+
assertFalse(downloader.getFile(new FileReferenceDownload(fileReference, "test")).isPresent());
323+
assertEquals("Expected no closeConnection calls when feature is disabled, got " + mockConnection.getCloseConnectionCount(),
324+
0, mockConnection.getCloseConnectionCount());
325+
downloader.close();
326+
}
327+
267328
private void writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException {
268329
File fileReferenceDir = new File(dir, fileReferenceString);
269330
fileReferenceDir.mkdir();
@@ -307,6 +368,7 @@ private FileDownloader createDownloader(MockConnection connection, Duration time
307368
private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection {
308369

309370
private ResponseHandler responseHandler;
371+
private int closeConnectionCount = 0;
310372

311373
MockConnection() {
312374
this(new FileReferenceFoundResponseHandler());
@@ -328,7 +390,16 @@ public void invokeSync(Request request, Duration jrtTimeout) {
328390

329391
@Override
330392
public String getAddress() {
331-
return null;
393+
return "localhost";
394+
}
395+
396+
@Override
397+
public void closeConnection() {
398+
closeConnectionCount++;
399+
}
400+
401+
int getCloseConnectionCount() {
402+
return closeConnectionCount;
332403
}
333404

334405
@Override
@@ -403,6 +474,30 @@ public void request(Request request) {
403474
}
404475
}
405476

477+
static class TimeoutResponseHandler implements MockConnection.ResponseHandler {
478+
479+
private final int timesToTimeout;
480+
private int timedOutTimes = 0;
481+
482+
TimeoutResponseHandler(int timesToTimeout) {
483+
super();
484+
this.timesToTimeout = timesToTimeout;
485+
}
486+
487+
@Override
488+
public void request(Request request) {
489+
if (request.methodName().equals("filedistribution.serveFile")) {
490+
if (timedOutTimes < timesToTimeout) {
491+
request.setError(TIMEOUT, "Request timed out");
492+
timedOutTimes++;
493+
} else {
494+
request.returnValues().add(new Int32Value(0));
495+
request.returnValues().add(new StringValue("OK"));
496+
}
497+
}
498+
}
499+
}
500+
406501
static class ConnectionErrorResponseHandler implements MockConnection.ResponseHandler {
407502

408503
private final int timesToFail;

0 commit comments

Comments
 (0)