Skip to content

Commit 519d35d

Browse files
authored
Move reconnect logic to main thread (#51)
* Move reconnect logic to the main thread
* Retry on most exceptions
* Add recoverable and unrecoverable exceptions
* Add missing license headers
* Improve logs on unknown exceptions
1 parent 5eb394f commit 519d35d

File tree

4 files changed

+114
-60
lines changed

4 files changed

+114
-60
lines changed

src/main/java/io/seqera/tower/agent/Agent.java

Lines changed: 50 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import io.micronaut.rxjava2.http.client.websockets.RxWebSocketClient;
2121
import io.micronaut.scheduling.TaskScheduler;
2222
import io.micronaut.websocket.exceptions.WebSocketClientException;
23+
import io.seqera.tower.agent.exceptions.RecoverableException;
24+
import io.seqera.tower.agent.exceptions.UnrecoverableException;
2325
import io.seqera.tower.agent.exchange.CommandRequest;
2426
import io.seqera.tower.agent.exchange.CommandResponse;
2527
import io.seqera.tower.agent.exchange.HeartbeatMessage;
@@ -38,6 +40,7 @@
3840
import java.lang.module.ModuleDescriptor;
3941
import java.net.URI;
4042
import java.net.URISyntaxException;
43+
import java.net.UnknownHostException;
4144
import java.nio.file.Files;
4245
import java.nio.file.InvalidPathException;
4346
import java.nio.file.Path;
@@ -98,19 +101,30 @@ public static void main(String[] args) throws Exception {
98101
public void run() {
99102
try {
100103
validateParameters();
101-
checkTower();
102-
connectTower();
103104
sendPeriodicHeartbeat();
104-
} catch (Exception e) {
105+
infiniteLoop();
106+
} catch (UnrecoverableException e) {
105107
logger.error(e.getMessage());
106108
System.exit(1);
109+
} catch (Throwable e) {
110+
logger.error(e.getMessage(), e);
111+
System.exit(1);
107112
}
108113
}
109114

110-
private void connectTowerDelay() {
111-
TaskScheduler scheduler = ctx.getBean(TaskScheduler.class);
112-
Duration delay = Duration.ofSeconds(2);
113-
scheduler.schedule(delay, this::connectTower);
115+
private void infiniteLoop() throws InterruptedException, IOException {
116+
while (true) {
117+
try {
118+
if (agentClient == null || !agentClient.isOpen()) {
119+
checkTower();
120+
connectTower();
121+
}
122+
} catch (RecoverableException e) {
123+
logger.error(e.getMessage());
124+
}
125+
126+
Thread.sleep(2000);
127+
}
114128
}
115129

116130
/**
@@ -121,32 +135,28 @@ private void connectTower() {
121135
try {
122136
final URI uri = new URI(url + "/agent/" + agentKey + "/connect");
123137
if (!uri.getScheme().equals("https")) {
124-
logger.error("You are trying to connect to an insecure server: {}", url);
125-
System.exit(1);
138+
throw new UnrecoverableException(String.format("You are trying to connect to an insecure server: %s", url));
126139
}
127140

128141
final MutableHttpRequest<?> req = HttpRequest.GET(uri).bearerAuth(token);
129142
final RxWebSocketClient webSocketClient = ctx.getBean(RxWebSocketClient.class);
130143
agentClient = webSocketClient.connect(AgentClientSocket.class, req)
131144
.timeout(5, TimeUnit.SECONDS)
132145
.blockingFirst();
133-
agentClient.setConnectCallback(this::connectTowerDelay);
134146
agentClient.setCommandRequestCallback(this::execCommand);
135147
sendInfoMessage();
136148
} catch (URISyntaxException e) {
137-
logger.error("Invalid URI: {}/agent/{}/connect - {}", url, agentKey, e.getMessage());
138-
System.exit(1);
149+
throw new UnrecoverableException(String.format("Invalid URI: %s/agent/%s/connect - %s", url, agentKey, e.getMessage()));
139150
} catch (WebSocketClientException e) {
140-
logger.error("Connection error - {}", e.getMessage());
141-
System.exit(1);
151+
throw new RecoverableException(String.format("Connection error - %s", e.getMessage()));
152+
} catch (UnknownHostException e) {
153+
throw new RecoverableException("Unknown host exception - Check that it's a valid DNS domain.");
142154
} catch (Exception e) {
143155
if (e.getCause() instanceof TimeoutException) {
144-
logger.error("Connection timeout [trying to reconnect in {} seconds]", heartbeatDelay);
145-
} else {
146-
logger.error("Unknown problem");
147-
e.printStackTrace();
156+
throw new RecoverableException(String.format("Connection timeout -- %s", e.getCause().getMessage()));
148157
}
149-
System.exit(1);
158+
159+
throw new RecoverableException(String.format("Unknown problem - %s", e.getMessage()), e);
150160
}
151161
}
152162

@@ -159,6 +169,7 @@ private void execCommand(CommandRequest message) {
159169
CommandResponse response;
160170

161171
try {
172+
logger.trace("REQUEST: {}", message.getCommand());
162173
Process process = new ProcessBuilder()
163174
.command("sh", "-c", message.getCommand())
164175
.redirectErrorStream(true)
@@ -199,12 +210,10 @@ private void execCommand(CommandRequest message) {
199210
private void sendPeriodicHeartbeat() {
200211
TaskScheduler scheduler = ctx.getBean(TaskScheduler.class);
201212
scheduler.scheduleWithFixedDelay(heartbeatDelay, heartbeatDelay, () -> {
202-
if (agentClient.isOpen()) {
213+
if (agentClient != null && agentClient.isOpen()) {
203214
logger.info("Sending heartbeat");
215+
logger.trace("websocket session '{}'", agentClient.getId());
204216
agentClient.send(new HeartbeatMessage());
205-
} else {
206-
logger.info("Trying to reconnect");
207-
connectTower();
208217
}
209218
});
210219
}
@@ -226,8 +235,7 @@ private void validateParameters() throws IOException {
226235
// Fetch username
227236
validatedUserName = System.getenv().getOrDefault("USER", System.getProperty("user.name"));
228237
if (validatedUserName == null || validatedUserName.isEmpty() || validatedUserName.isBlank() || validatedUserName.equals("?")) {
229-
logger.error("Impossible to detect current Unix username. Try setting USER environment variable.");
230-
System.exit(1);
238+
throw new UnrecoverableException("Impossible to detect current Unix username. Try setting USER environment variable.");
231239
}
232240

233241
// Set default workDir
@@ -237,15 +245,13 @@ private void validateParameters() throws IOException {
237245
try {
238246
workDir = Paths.get(defaultPath);
239247
} catch (InvalidPathException e) {
240-
logger.error("Impossible to define a default work directory. Please provide one using '--work-dir'.");
241-
System.exit(1);
248+
throw new UnrecoverableException("Impossible to define a default work directory. Please provide one using '--work-dir'.");
242249
}
243250
}
244251

245252
// Validate workDir exists
246253
if (!Files.exists(workDir)) {
247-
logger.error("The work directory '{}' do not exists. Create it or provide a different one using '--work-dir'.", workDir);
248-
System.exit(1);
254+
throw new UnrecoverableException(String.format("The work directory '%s' do not exists. Create it or provide a different one using '--work-dir'.", workDir));
249255
}
250256
validatedWorkDir = workDir.toAbsolutePath().normalize().toString();
251257

@@ -261,38 +267,35 @@ private void validateParameters() throws IOException {
261267
* Do some health checks to the Tower API endpoint to verify that it is available and
262268
* compatible with this Agent.
263269
*/
264-
private void checkTower() {
270+
private void checkTower() throws IOException {
265271
final RxHttpClient httpClient = ctx.getBean(RxHttpClient.class);
272+
ServiceInfoResponse infoResponse = null;
266273
try {
267274
final URI uri = new URI(url + "/service-info");
268275
final MutableHttpRequest<?> req = HttpRequest.GET(uri).bearerAuth(token);
269-
270-
ServiceInfoResponse infoResponse = httpClient.retrieve(req, ServiceInfoResponse.class).blockingFirst();
271-
if (infoResponse.getServiceInfo() != null && infoResponse.getServiceInfo().getApiVersion() != null) {
272-
final ModuleDescriptor.Version systemApiVersion = ModuleDescriptor.Version.parse(infoResponse.getServiceInfo().getApiVersion());
273-
final ModuleDescriptor.Version requiredApiVersion = ModuleDescriptor.Version.parse(getVersionApi());
274-
275-
if (systemApiVersion.compareTo(requiredApiVersion) < 0) {
276-
logger.error("Tower at '{}' is running API version {} and the agent needs a minimum of {}", url, systemApiVersion, requiredApiVersion);
277-
System.exit(1);
278-
}
279-
}
276+
infoResponse = httpClient.retrieve(req, ServiceInfoResponse.class).blockingFirst();
280277
} catch (Exception e) {
281278
if (url.contains("/api")) {
282-
logger.error("Tower API endpoint '{}' it is not available", url);
283-
} else {
284-
logger.error("Tower API endpoint '{}' it is not available (did you mean '{}/api'?)", url, url);
279+
throw new RecoverableException(String.format("Tower API endpoint '%s' it is not available", url));
280+
}
281+
throw new RecoverableException(String.format("Tower API endpoint '%s' it is not available (did you mean '%s/api'?)", url, url));
282+
}
283+
284+
if (infoResponse != null && infoResponse.getServiceInfo() != null && infoResponse.getServiceInfo().getApiVersion() != null) {
285+
final ModuleDescriptor.Version systemApiVersion = ModuleDescriptor.Version.parse(infoResponse.getServiceInfo().getApiVersion());
286+
final ModuleDescriptor.Version requiredApiVersion = ModuleDescriptor.Version.parse(getVersionApi());
287+
288+
if (systemApiVersion.compareTo(requiredApiVersion) < 0) {
289+
throw new UnrecoverableException(String.format("Tower at '%s' is running API version %s and the agent needs a minimum of %s", url, systemApiVersion, requiredApiVersion));
285290
}
286-
System.exit(1);
287291
}
288292

289293
try {
290294
final URI uri = new URI(url + "/user");
291295
final MutableHttpRequest<?> req = HttpRequest.GET(uri).bearerAuth(token);
292296
httpClient.retrieve(req).blockingFirst();
293297
} catch (Exception e) {
294-
logger.error("Invalid TOWER_ACCESS_TOKEN, check that the given token has access at '{}'.", url);
295-
System.exit(1);
298+
throw new UnrecoverableException(String.format("Invalid TOWER_ACCESS_TOKEN, check that the given token has access at '%s'.", url));
296299
}
297300
}
298301

src/main/java/io/seqera/tower/agent/AgentClientSocket.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,6 @@ abstract class AgentClientSocket implements AutoCloseable {
3939
private WebSocketSession session;
4040
private Instant openingTime;
4141

42-
// Callback to reconnect the agent
43-
private Runnable connectCallback;
44-
4542
// Callback to manage a command request
4643
private Consumer<CommandRequest> commandRequestCallback;
4744

@@ -79,7 +76,6 @@ void onClose(CloseReason reason) {
7976

8077
if (reason.getCode() == 4001) {
8178
logger.info("Closing to reauthenticate the session");
82-
return;
8379
} else {
8480
logger.info("Closed for unknown reason after");
8581
if (openingTime != null) {
@@ -88,11 +84,6 @@ void onClose(CloseReason reason) {
8884
logger.info("Session duration {}", duration);
8985
}
9086
}
91-
92-
if (connectCallback != null) {
93-
logger.info("Reconnecting in 2 seconds");
94-
connectCallback.run();
95-
}
9687
}
9788

9889
abstract void send(AgentMessage message);
@@ -103,13 +94,13 @@ public boolean isOpen() {
10394
return session.isOpen();
10495
}
10596

106-
public void setConnectCallback(Runnable connectCallback) {
107-
this.connectCallback = connectCallback;
108-
}
109-
11097
public void setCommandRequestCallback(Consumer<CommandRequest> callback) {
11198
this.commandRequestCallback = callback;
11299
}
113100

101+
public String getId() {
102+
return session.getId();
103+
}
104+
114105

115106
}
src/main/java/io/seqera/tower/agent/exceptions/RecoverableException.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2021, Seqera Labs.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*
8+
* This Source Code Form is "Incompatible With Secondary Licenses", as
9+
* defined by the Mozilla Public License, v. 2.0.
10+
*/
11+
12+
package io.seqera.tower.agent.exceptions;
13+
14+
/**
15+
* A recoverable exception is an exception that Tower Agent will log as
16+
* an error, but it will keep running and retrying to connect.
17+
*/
18+
public class RecoverableException extends RuntimeException {
19+
20+
public RecoverableException() {
21+
}
22+
23+
public RecoverableException(String message) {
24+
super(message);
25+
}
26+
27+
public RecoverableException(String message, Throwable cause) {
28+
super(message, cause);
29+
}
30+
}
src/main/java/io/seqera/tower/agent/exceptions/UnrecoverableException.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2021, Seqera Labs.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*
8+
* This Source Code Form is "Incompatible With Secondary Licenses", as
9+
* defined by the Mozilla Public License, v. 2.0.
10+
*/
11+
12+
package io.seqera.tower.agent.exceptions;
13+
14+
/**
15+
* An unrecoverable exception is an exception that Tower Agent will log as
16+
* an error and cause it to exit with an exit code error.
17+
*/
18+
public class UnrecoverableException extends RuntimeException {
19+
20+
public UnrecoverableException() {
21+
}
22+
23+
public UnrecoverableException(String message) {
24+
super(message);
25+
}
26+
27+
public UnrecoverableException(String message, Throwable cause) {
28+
super(message, cause);
29+
}
30+
}

0 commit comments

Comments
 (0)