Skip to content

Commit

Permalink
Use remaining timeout as rpc timeout when downloading
Browse files Browse the repository at this point in the history
Make sure to use remaining timeout as rpc timeout when downloading,
otherwise client initiating download will timeout before getting
any response. Environment variable will override this if set.
  • Loading branch information
hmusum committed Jan 7, 2025
1 parent 937b47f commit 2a9353b
Showing 1 changed file with 11 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class FileReferenceDownloader {
private final Downloads downloads;
private final Duration downloadTimeout;
private final Duration backoffInitialTime;
private final Duration rpcTimeout;
private final Optional<Duration> rpcTimeout; // Only used when overridden with env variable
private final File downloadDirectory;
private final AtomicBoolean shutDown = new AtomicBoolean(false);

Expand All @@ -61,8 +61,8 @@ public class FileReferenceDownloader {
this.backoffInitialTime = backoffInitialTime;
this.downloadDirectory = downloadDirectory;
// Undocumented on purpose, might change or be removed at any time
String timeoutString = System.getenv("VESPA_FILE_DOWNLOAD_RPC_TIMEOUT");
this.rpcTimeout = Duration.ofSeconds(timeoutString == null ? 30 : Integer.parseInt(timeoutString));
var timeoutString = Optional.ofNullable(System.getenv("VESPA_FILE_DOWNLOAD_RPC_TIMEOUT"));
this.rpcTimeout = timeoutString.map(t -> Duration.ofSeconds(Integer.parseInt(t)));
}

private void waitUntilDownloadStarted(FileReferenceDownload fileReferenceDownload) {
Expand All @@ -78,7 +78,10 @@ private void waitUntilDownloadStarted(FileReferenceDownload fileReferenceDownloa
return;
if (FileDownloader.fileReferenceExists(fileReference, downloadDirectory))
return;
if (startDownloadRpc(fileReferenceDownload, retryCount, connection))
var timeout = rpcTimeout.orElse(Duration.between(Instant.now(), end));
log.log(Level.FINE, "Wait until download of " + fileReference + " has started, retryCount " + retryCount +
" timeout" + timeout + " (request from client " + fileReferenceDownload.client() + ")");
if ( ! timeout.isNegative() && startDownloadRpc(fileReferenceDownload, retryCount, connection, timeout))
return;

retryCount++;
Expand Down Expand Up @@ -113,7 +116,6 @@ CompletableFuture<Optional<File>> startDownload(FileReferenceDownload fileRefere
Optional<FileReferenceDownload> inProgress = downloads.get(fileReference);
if (inProgress.isPresent()) return inProgress.get().future();

log.log(Level.FINE, () -> "Will download " + fileReference + " with timeout " + downloadTimeout);
downloads.add(fileReferenceDownload);
downloadExecutor.submit(() -> waitUntilDownloadStarted(fileReferenceDownload));
return fileReferenceDownload.future();
Expand All @@ -129,7 +131,7 @@ void startDownloadFromSource(FileReferenceDownload fileReferenceDownload, Spec s
downloadExecutor.submit(() -> {
log.log(Level.FINE, () -> "Will download " + fileReference + " with timeout " + downloadTimeout + " from " + spec);
downloads.add(fileReferenceDownload);
startDownloadRpc(fileReferenceDownload, 1, connection);
startDownloadRpc(fileReferenceDownload, 1, connection, downloadTimeout);
downloads.remove(fileReference);
});
}
Expand All @@ -139,10 +141,9 @@ void failedDownloading(FileReference fileReference) {
downloads.remove(fileReference);
}

private boolean startDownloadRpc(FileReferenceDownload fileReferenceDownload, int retryCount, Connection connection) {
private boolean startDownloadRpc(FileReferenceDownload fileReferenceDownload, int retryCount, Connection connection, Duration timeout) {
Request request = createRequest(fileReferenceDownload);
Duration rpcTimeout = rpcTimeout(retryCount);
connection.invokeSync(request, rpcTimeout);
connection.invokeSync(request, timeout);

Level logLevel = (retryCount > 3 ? Level.INFO : Level.FINE);
FileReference fileReference = fileReferenceDownload.fileReference();
Expand All @@ -162,7 +163,7 @@ private boolean startDownloadRpc(FileReferenceDownload fileReferenceDownload, in
} else {
log.log(logLevel, "Downloading " + fileReference + " from " + address + " failed:" +
" error code " + request.errorCode() + " (" + request.errorMessage() + ")." +
" (retry " + retryCount + ", rpc timeout " + rpcTimeout + ")");
" (retry " + retryCount + ", rpc timeout " + timeout + ")");
return false;
}
}
Expand All @@ -177,10 +178,6 @@ private Request createRequest(FileReferenceDownload fileReferenceDownload) {
return request;
}

private Duration rpcTimeout(int retryCount) {
return Duration.ofSeconds(rpcTimeout.getSeconds()).plus(Duration.ofSeconds(retryCount * 5L));
}

private boolean validateResponse(Request request) {
if (request.isError()) {
return false;
Expand Down

0 comments on commit 2a9353b

Please sign in to comment.