Skip to content

Commit ccaa707

Browse files
Auden Woolfsonauden-woolfson
authored andcommitted
Add health check to route queries to healthy cluster
1 parent a7a7f80 commit ccaa707

File tree

12 files changed

+344
-31
lines changed

12 files changed

+344
-31
lines changed

presto-router/etc/router-config.json

Lines changed: 0 additions & 14 deletions
This file was deleted.

presto-router/src/main/java/com/facebook/presto/router/RouterModule.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,39 @@
1414
package com.facebook.presto.router;
1515

1616
import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
17+
import com.facebook.presto.client.NodeVersion;
1718
import com.facebook.presto.router.cluster.ClusterManager;
1819
import com.facebook.presto.router.cluster.ClusterStatusResource;
1920
import com.facebook.presto.router.cluster.ClusterStatusTracker;
2021
import com.facebook.presto.router.cluster.ForClusterInfoTracker;
22+
import com.facebook.presto.router.cluster.ForClusterManager;
2123
import com.facebook.presto.router.cluster.ForQueryInfoTracker;
2224
import com.facebook.presto.router.cluster.RemoteInfoFactory;
25+
import com.facebook.presto.router.cluster.RemoteStateConfig;
2326
import com.facebook.presto.router.predictor.ForQueryCpuPredictor;
2427
import com.facebook.presto.router.predictor.ForQueryMemoryPredictor;
2528
import com.facebook.presto.router.predictor.PredictorManager;
2629
import com.facebook.presto.router.predictor.RemoteQueryFactory;
30+
import com.facebook.presto.server.PluginManagerConfig;
31+
import com.facebook.presto.server.ServerConfig;
32+
import com.facebook.presto.server.WebUiResource;
33+
import com.facebook.presto.server.security.oauth2.ForOAuth2;
2734
import com.google.inject.Binder;
2835
import com.google.inject.Scopes;
2936
import io.airlift.units.Duration;
3037

3138
import java.lang.annotation.Annotation;
39+
import java.util.concurrent.ExecutorService;
40+
import java.util.concurrent.ScheduledExecutorService;
3241

42+
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
43+
import static com.facebook.airlift.concurrent.Threads.threadsNamed;
3344
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
3445
import static com.facebook.airlift.http.client.HttpClientBinder.httpClientBinder;
3546
import static com.facebook.airlift.http.server.HttpServerBinder.httpServerBinder;
3647
import static com.facebook.airlift.jaxrs.JaxrsBinder.jaxrsBinder;
48+
import static java.util.concurrent.Executors.newCachedThreadPool;
49+
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
3750
import static java.util.concurrent.TimeUnit.SECONDS;
3851

3952
public class RouterModule
@@ -52,25 +65,45 @@ public class RouterModule
5265
@Override
5366
protected void setup(Binder binder)
5467
{
68+
ServerConfig serverConfig = buildConfigObject(ServerConfig.class);
69+
5570
httpServerBinder(binder).bindResource(UI_PATH, ROUTER_UI).withWelcomeFile(INDEX_HTML);
5671
configBinder(binder).bindConfig(RouterConfig.class);
5772

73+
configBinder(binder).bindConfig(RemoteStateConfig.class);
74+
configBinder(binder).bindConfigDefaults(RemoteStateConfig.class, config -> config.setSecondsToUnhealthy(60));
75+
76+
// resource for serving static content
77+
jaxrsBinder(binder).bind(WebUiResource.class);
78+
79+
binder.bind(ScheduledExecutorService.class).annotatedWith(ForClusterManager.class).toInstance(newSingleThreadScheduledExecutor(threadsNamed("cluster-config")));
80+
5881
binder.bind(ClusterManager.class).in(Scopes.SINGLETON);
5982
binder.bind(RemoteInfoFactory.class).in(Scopes.SINGLETON);
6083

6184
bindHttpClient(binder, QUERY_TRACKER, ForQueryInfoTracker.class, IDLE_TIMEOUT_SECOND, REQUEST_TIMEOUT_SECOND);
6285
bindHttpClient(binder, QUERY_TRACKER, ForClusterInfoTracker.class, IDLE_TIMEOUT_SECOND, REQUEST_TIMEOUT_SECOND);
6386

87+
//Determine the NodeVersion
88+
NodeVersion nodeVersion = new NodeVersion(serverConfig.getPrestoVersion());
89+
binder.bind(NodeVersion.class).toInstance(nodeVersion);
90+
6491
binder.bind(ClusterStatusTracker.class).in(Scopes.SINGLETON);
6592

6693
binder.bind(PredictorManager.class).in(Scopes.SINGLETON);
6794
binder.bind(RemoteQueryFactory.class).in(Scopes.SINGLETON);
6895

96+
binder.bind(RouterPluginManager.class).in(Scopes.SINGLETON);
97+
configBinder(binder).bindConfig(PluginManagerConfig.class);
98+
6999
bindHttpClient(binder, QUERY_PREDICTOR, ForQueryCpuPredictor.class, IDLE_TIMEOUT_SECOND, PREDICTOR_REQUEST_TIMEOUT_SECOND);
70100
bindHttpClient(binder, QUERY_PREDICTOR, ForQueryMemoryPredictor.class, IDLE_TIMEOUT_SECOND, PREDICTOR_REQUEST_TIMEOUT_SECOND);
71101

72102
jaxrsBinder(binder).bind(RouterResource.class);
73103
jaxrsBinder(binder).bind(ClusterStatusResource.class);
104+
105+
//binder.bind(Executor.class).annotatedWith(ForOAuth2.class).toInstance(executor);
106+
binder.bind(ExecutorService.class).annotatedWith(ForOAuth2.class).toInstance(newCachedThreadPool(daemonThreadsNamed("oauth-executor-%s")));
74107
}
75108

76109
private void bindHttpClient(Binder binder, String name, Class<? extends Annotation> annotation, int idleTimeout, int requestTimeout)

presto-router/src/main/java/com/facebook/presto/router/RouterResource.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
public class RouterResource
4141
{
4242
private static final Logger log = Logger.get(RouterResource.class);
43-
4443
private final ClusterManager clusterManager;
4544

4645
@Inject

presto-router/src/main/java/com/facebook/presto/router/cluster/ClusterManager.java

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,18 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.Optional;
34+
import java.util.concurrent.ConcurrentHashMap;
35+
import java.util.concurrent.ScheduledExecutorService;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicLong;
38+
import java.util.stream.Collectors;
3439

3540
import static com.facebook.presto.router.RouterUtil.parseRouterConfig;
3641
import static com.facebook.presto.router.scheduler.SchedulerType.WEIGHTED_RANDOM_CHOICE;
3742
import static com.facebook.presto.spi.StandardErrorCode.CONFIGURATION_INVALID;
3843
import static com.google.common.base.Preconditions.checkArgument;
3944
import static com.google.common.collect.ImmutableList.toImmutableList;
45+
import static java.util.Objects.requireNonNull;
4046
import static java.util.stream.Collectors.toMap;
4147

4248
public class ClusterManager
@@ -46,21 +52,55 @@ public class ClusterManager
4652
private final SchedulerType schedulerType;
4753
private final Scheduler scheduler;
4854
private final HashMap<String, HashMap<URI, Integer>> serverWeights = new HashMap<>();
55+
private final RouterConfig routerConfig;
56+
private final ScheduledExecutorService scheduledExecutorService;
57+
private final AtomicLong lastConfigUpdate = new AtomicLong();
58+
private final RemoteInfoFactory remoteInfoFactory;
59+
60+
// Cluster status
61+
private final ConcurrentHashMap<URI, RemoteClusterInfo> remoteClusterInfos = new ConcurrentHashMap<>();
62+
private final ConcurrentHashMap<URI, RemoteQueryInfo> remoteQueryInfos = new ConcurrentHashMap<>();
4963

5064
@Inject
51-
public ClusterManager(RouterConfig config)
65+
public ClusterManager(RouterConfig config, @ForClusterManager ScheduledExecutorService scheduledExecutorService, RemoteInfoFactory remoteInfoFactory)
5266
{
67+
this.routerConfig = config;
68+
this.scheduledExecutorService = scheduledExecutorService;
5369
RouterSpec routerSpec = parseRouterConfig(config)
5470
.orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config"));
5571

5672
this.groups = ImmutableMap.copyOf(routerSpec.getGroups().stream().collect(toMap(GroupSpec::getName, group -> group)));
5773
this.groupSelectors = ImmutableList.copyOf(routerSpec.getSelectors());
5874
this.schedulerType = routerSpec.getSchedulerType();
5975
this.scheduler = new SchedulerFactory(routerSpec.getSchedulerType()).create();
60-
76+
this.remoteInfoFactory = requireNonNull(remoteInfoFactory, "remoteInfoFactory is null");
6177
this.initializeServerWeights();
6278
}
6379

80+
@PostConstruct
81+
public void startConfigReloadTask()
82+
{
83+
File routerConfigFile = new File(routerConfig.getConfigFile());
84+
//ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
85+
scheduledExecutorService.scheduleAtFixedRate(() -> {
86+
long newConfigUpdateTime = routerConfigFile.lastModified();
87+
if (lastConfigUpdate.get() != newConfigUpdateTime) {
88+
RouterSpec routerSpec = parseRouterConfig(routerConfig)
89+
.orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config"));
90+
this.groups = ImmutableMap.copyOf(routerSpec.getGroups().stream().collect(toMap(GroupSpec::getName, group -> group)));
91+
this.groupSelectors = ImmutableList.copyOf(routerSpec.getSelectors());
92+
this.schedulerType = routerSpec.getSchedulerType();
93+
this.scheduler = new SchedulerFactory(routerSpec.getSchedulerType()).create();
94+
this.initializeServerWeights();
95+
lastConfigUpdate.set(newConfigUpdateTime);
96+
}
97+
}, 0L, (long) 30, TimeUnit.SECONDS);
98+
getAllClusters().forEach(uri -> {
99+
remoteClusterInfos.put(uri, remoteInfoFactory.createRemoteClusterInfo(uri));
100+
remoteQueryInfos.put(uri, remoteInfoFactory.createRemoteQueryInfo(uri));
101+
});
102+
}
103+
64104
public List<URI> getAllClusters()
65105
{
66106
return groups.values().stream()
@@ -77,7 +117,16 @@ public Optional<URI> getDestination(RequestInfo requestInfo)
77117

78118
checkArgument(groups.containsKey(target.get()));
79119
GroupSpec groupSpec = groups.get(target.get());
80-
scheduler.setCandidates(groupSpec.getMembers());
120+
121+
List<URI> healthyClusterURIs = groupSpec.getMembers().stream()
122+
.filter(entry -> remoteClusterInfos.get(entry).isHealthy())
123+
.collect(Collectors.toList());
124+
125+
if (healthyClusterURIs.isEmpty()) {
126+
return Optional.empty();
127+
}
128+
129+
scheduler.setCandidates(healthyClusterURIs);
81130
if (schedulerType == WEIGHTED_RANDOM_CHOICE) {
82131
scheduler.setWeights(serverWeights.get(groupSpec.getName()));
83132
}
@@ -105,4 +154,14 @@ private void initializeServerWeights()
105154
}
106155
});
107156
}
157+
158+
public ConcurrentHashMap<URI, RemoteClusterInfo> getRemoteClusterInfos()
159+
{
160+
return remoteClusterInfos;
161+
}
162+
163+
public ConcurrentHashMap<URI, RemoteQueryInfo> getRemoteQueryInfos()
164+
{
165+
return remoteQueryInfos;
166+
}
108167
}

presto-router/src/main/java/com/facebook/presto/router/cluster/ClusterStatusTracker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public ClusterStatusTracker(
6060
public void startPollingQueryInfo()
6161
{
6262
clusterManager.getAllClusters().forEach(uri -> {
63-
remoteClusterInfos.put(uri, remoteInfoFactory.createRemoteClusterInfo(uri));
64-
remoteQueryInfos.put(uri, remoteInfoFactory.createRemoteQueryInfo(uri));
63+
remoteClusterInfos.put(uri, clusterManager.getRemoteClusterInfos().get(uri));
64+
remoteQueryInfos.put(uri, clusterManager.getRemoteQueryInfos().get(uri));
6565
});
6666

6767
queryInfoUpdateExecutor.scheduleWithFixedDelay(() -> {

presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteClusterInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ public class RemoteClusterInfo
3939
private final AtomicLong activeWorkers = new AtomicLong();
4040
private final AtomicLong runningDrivers = new AtomicLong();
4141

42-
public RemoteClusterInfo(HttpClient httpClient, URI remoteUri)
42+
public RemoteClusterInfo(HttpClient httpClient, URI remoteUri, RemoteStateConfig remoteStateConfig)
4343
{
44-
super(httpClient, remoteUri);
44+
super(httpClient, remoteUri, remoteStateConfig);
4545
}
4646

4747
@Override

presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteInfoFactory.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,26 @@ public class RemoteInfoFactory
2929

3030
private final HttpClient clusterInfoHttpClient;
3131
private final HttpClient queryInfoHttpClient;
32+
private final RemoteStateConfig remoteStateConfig;
3233

3334
@Inject
3435
public RemoteInfoFactory(
3536
@ForClusterInfoTracker HttpClient clusterInfoHttpClient,
36-
@ForQueryInfoTracker HttpClient queryInfoHttpClient)
37+
@ForQueryInfoTracker HttpClient queryInfoHttpClient,
38+
RemoteStateConfig remoteStateConfig)
3739
{
3840
this.clusterInfoHttpClient = requireNonNull(clusterInfoHttpClient, "Http client for cluster info is null");
3941
this.queryInfoHttpClient = requireNonNull(queryInfoHttpClient, "Http client for cluster info is null");
42+
this.remoteStateConfig = requireNonNull(remoteStateConfig, "remoteStateConfig is null");
4043
}
4144

4245
public RemoteQueryInfo createRemoteQueryInfo(URI uri)
4346
{
44-
return new RemoteQueryInfo(clusterInfoHttpClient, uriBuilderFrom(uri).appendPath(QUERY_INFO).build());
47+
return new RemoteQueryInfo(clusterInfoHttpClient, uriBuilderFrom(uri).appendPath(QUERY_INFO).build(), remoteStateConfig);
4548
}
4649

4750
public RemoteClusterInfo createRemoteClusterInfo(URI uri)
4851
{
49-
return new RemoteClusterInfo(queryInfoHttpClient, uriBuilderFrom(uri).appendPath(CLUSTER_INFO).build());
52+
return new RemoteClusterInfo(queryInfoHttpClient, uriBuilderFrom(uri).appendPath(CLUSTER_INFO).build(), remoteStateConfig);
5053
}
5154
}

presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteQueryInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ public class RemoteQueryInfo
3737

3838
private final AtomicReference<Optional<List<JsonNode>>> queryList = new AtomicReference<>(Optional.empty());
3939

40-
public RemoteQueryInfo(HttpClient httpClient, URI remoteUri)
40+
public RemoteQueryInfo(HttpClient httpClient, URI remoteUri, RemoteStateConfig remoteStateConfig)
4141
{
42-
super(httpClient, remoteUri);
42+
super(httpClient, remoteUri, remoteStateConfig);
4343
}
4444

4545
public Optional<List<JsonNode>> getQueryList()

presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteState.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import javax.annotation.Nullable;
2727
import javax.annotation.concurrent.ThreadSafe;
28+
import javax.inject.Inject;
2829

2930
import java.net.URI;
3031
import java.util.concurrent.Future;
@@ -51,11 +52,17 @@ public abstract class RemoteState
5152
private final AtomicReference<Future<?>> future = new AtomicReference<>();
5253
private final AtomicLong lastUpdateNanos = new AtomicLong();
5354
private final AtomicLong lastWarningLogged = new AtomicLong();
55+
private final long secondsToUnhealthy;
5456

55-
public RemoteState(HttpClient httpClient, URI remoteUri)
57+
private Boolean isHealthy = false;
58+
private long lastHealthyResponseTime;
59+
60+
@Inject
61+
public RemoteState(HttpClient httpClient, URI remoteUri, RemoteStateConfig remoteStateConfig)
5662
{
5763
this.httpClient = requireNonNull(httpClient, "httpClient is null");
5864
this.remoteUri = requireNonNull(remoteUri, "remoteUri is null");
65+
this.secondsToUnhealthy = remoteStateConfig.getSecondsToUnhealthy();
5966
}
6067

6168
public void handleResponse(JsonNode response) {}
@@ -71,9 +78,15 @@ public synchronized void asyncRefresh()
7178
lastWarningLogged.set(System.nanoTime());
7279
}
7380

81+
if (nanosSince(lastHealthyResponseTime).toMillis() >= (secondsToUnhealthy * 1_000) && isHealthy) {
82+
isHealthy = false;
83+
log.warn("%s marked as unhealthy", remoteUri);
84+
}
85+
7486
if (sinceUpdate.toMillis() > 1_000 && future.get() == null) {
7587
Request request = prepareGet()
7688
.setUri(remoteUri)
89+
.addHeader("Authorization", "Basic " + System.getenv("ROUTER_USER_CREDENTIALS"))
7790
.build();
7891

7992
HttpClient.HttpResponseFuture<FullJsonResponseHandler.JsonResponse<JsonNode>> responseFuture = httpClient.executeAsync(request, createFullJsonResponseHandler(JSON_CODEC));
@@ -91,8 +104,14 @@ public void onSuccess(@Nullable FullJsonResponseHandler.JsonResponse<JsonNode> r
91104
handleResponse(result.getValue());
92105
}
93106
if (result.getStatusCode() != OK.code()) {
94-
log.warn("Error fetching node state from %s returned status %d", remoteUri, result.getStatusCode());
95-
return;
107+
log.warn("Error fetching node state from %s returned status %d: %s", remoteUri, result.getStatusCode(), result.getStatusMessage());
108+
}
109+
else {
110+
if (!isHealthy) {
111+
log.info("%s marked as healthy", remoteUri);
112+
isHealthy = true;
113+
}
114+
lastHealthyResponseTime = System.nanoTime();
96115
}
97116
}
98117
}
@@ -107,4 +126,9 @@ public void onFailure(Throwable t)
107126
}, directExecutor());
108127
}
109128
}
129+
130+
public Boolean isHealthy()
131+
{
132+
return isHealthy;
133+
}
110134
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.router.cluster;
15+
16+
import com.facebook.airlift.configuration.Config;
17+
import com.facebook.airlift.configuration.ConfigDescription;
18+
19+
import javax.validation.constraints.NotNull;
20+
21+
public class RemoteStateConfig
22+
{
23+
private long secondsToUnhealthy;
24+
25+
@Config("router.remote-state.cluster-unhealthy-timeout")
26+
@ConfigDescription("The amount of time in seconds that a cluster must remain unresponsive to health checks in order to be deemed \"unhealthy\"")
27+
public RemoteStateConfig setSecondsToUnhealthy(long secondsToUnhealthy)
28+
{
29+
this.secondsToUnhealthy = secondsToUnhealthy;
30+
return this;
31+
}
32+
33+
@NotNull
34+
public long getSecondsToUnhealthy()
35+
{
36+
return this.secondsToUnhealthy;
37+
}
38+
}

0 commit comments

Comments
 (0)