Skip to content

Commit

Permalink
clean files
Browse files Browse the repository at this point in the history
  • Loading branch information
auden-woolfson committed Feb 3, 2025
1 parent ccaa707 commit 0d7cafe
Show file tree
Hide file tree
Showing 22 changed files with 3,186 additions and 4,288 deletions.
6 changes: 6 additions & 0 deletions presto-hive-hadoop2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@
<artifactId>presto-hive</artifactId>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
10 changes: 10 additions & 0 deletions presto-hudi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@
<artifactId>presto-hive</artifactId>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
1 change: 1 addition & 0 deletions presto-password-authenticators/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>5.1.0</version>
</dependency>

<dependency>
Expand Down
16 changes: 10 additions & 6 deletions presto-router/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@
<artifactId>http-server</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>stats</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>jaxrs</artifactId>
Expand Down Expand Up @@ -169,12 +174,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tpch</artifactId>
Expand All @@ -192,5 +191,10 @@
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,18 @@
import com.facebook.presto.server.PluginManagerConfig;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.server.WebUiResource;
import com.facebook.presto.server.security.oauth2.ForOAuth2;
import com.google.inject.Binder;
import com.google.inject.Scopes;
import io.airlift.units.Duration;

import java.lang.annotation.Annotation;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.concurrent.Threads.threadsNamed;
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static com.facebook.airlift.http.client.HttpClientBinder.httpClientBinder;
import static com.facebook.airlift.http.server.HttpServerBinder.httpServerBinder;
import static com.facebook.airlift.jaxrs.JaxrsBinder.jaxrsBinder;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand All @@ -71,7 +67,7 @@ protected void setup(Binder binder)
configBinder(binder).bindConfig(RouterConfig.class);

configBinder(binder).bindConfig(RemoteStateConfig.class);
configBinder(binder).bindConfigDefaults(RemoteStateConfig.class, config -> config.setSecondsToUnhealthy(60));
configBinder(binder).bindConfigDefaults(RemoteStateConfig.class, config -> config.setSecondsToUnhealthy(30));

// resource for serving static content
jaxrsBinder(binder).bind(WebUiResource.class);
Expand All @@ -93,17 +89,13 @@ protected void setup(Binder binder)
binder.bind(PredictorManager.class).in(Scopes.SINGLETON);
binder.bind(RemoteQueryFactory.class).in(Scopes.SINGLETON);

binder.bind(RouterPluginManager.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(PluginManagerConfig.class);

bindHttpClient(binder, QUERY_PREDICTOR, ForQueryCpuPredictor.class, IDLE_TIMEOUT_SECOND, PREDICTOR_REQUEST_TIMEOUT_SECOND);
bindHttpClient(binder, QUERY_PREDICTOR, ForQueryMemoryPredictor.class, IDLE_TIMEOUT_SECOND, PREDICTOR_REQUEST_TIMEOUT_SECOND);

jaxrsBinder(binder).bind(RouterResource.class);
jaxrsBinder(binder).bind(ClusterStatusResource.class);

//binder.bind(Executor.class).annotatedWith(ForOAuth2.class).toInstance(executor);
binder.bind(ExecutorService.class).annotatedWith(ForOAuth2.class).toInstance(newCachedThreadPool(daemonThreadsNamed("oauth-executor-%s")));
}

private void bindHttpClient(Binder binder, String name, Class<? extends Annotation> annotation, int idleTimeout, int requestTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package com.facebook.presto.router;

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.router.cluster.ClusterManager;
import com.facebook.presto.router.cluster.RequestInfo;
import com.google.inject.Inject;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.annotation.security.PermitAll;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand All @@ -35,12 +37,13 @@
import static javax.ws.rs.core.MediaType.TEXT_PLAIN_TYPE;
import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;

@Path("/")
@PermitAll
@Path("/v1")
public class RouterResource
{
private static final Logger log = Logger.get(RouterResource.class);
private final ClusterManager clusterManager;
private static final CounterStat successRedirectRequests = new CounterStat();
private static final CounterStat failedRedirectRequests = new CounterStat();

@Inject
public RouterResource(ClusterManager clusterManager)
Expand All @@ -49,23 +52,39 @@ public RouterResource(ClusterManager clusterManager)
}

@POST
@Path("/v1/statement")
@Path("statement")
@Produces(APPLICATION_JSON)
public Response routeQuery(String statement, @Context HttpServletRequest servletRequest)
{
RequestInfo requestInfo = new RequestInfo(servletRequest, statement);
URI coordinatorUri = clusterManager.getDestination(requestInfo).orElseThrow(() -> badRequest(BAD_GATEWAY, "No Presto cluster available"));
URI statementUri = uriBuilderFrom(coordinatorUri).replacePath("/v1/statement").build();
successRedirectRequests.update(1);
log.info("route query to %s", statementUri);
return Response.temporaryRedirect(statementUri).build();
}

private static WebApplicationException badRequest(Response.Status status, String message)
{
failedRedirectRequests.update(1);
throw new WebApplicationException(
Response.status(status)
.type(TEXT_PLAIN_TYPE)
.entity(message)
.build());
}

@Managed
@Nested
public CounterStat getFailedRedirectRequests()
{
return failedRedirectRequests;
}

@Managed
@Nested
public CounterStat getSuccessRedirectRequests()
{
return successRedirectRequests;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.router.cluster;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.router.RouterConfig;
import com.facebook.presto.router.scheduler.Scheduler;
import com.facebook.presto.router.scheduler.SchedulerFactory;
Expand All @@ -24,8 +25,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import javax.annotation.PostConstruct;
import javax.inject.Inject;

import java.io.File;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
Expand All @@ -39,6 +42,7 @@

import static com.facebook.presto.router.RouterUtil.parseRouterConfig;
import static com.facebook.presto.router.scheduler.SchedulerType.WEIGHTED_RANDOM_CHOICE;
import static com.facebook.presto.router.scheduler.SchedulerType.WEIGHTED_ROUND_ROBIN;
import static com.facebook.presto.spi.StandardErrorCode.CONFIGURATION_INVALID;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -47,15 +51,17 @@

public class ClusterManager
{
private final Map<String, GroupSpec> groups;
private final List<SelectorRuleSpec> groupSelectors;
private final SchedulerType schedulerType;
private final Scheduler scheduler;
private final HashMap<String, HashMap<URI, Integer>> serverWeights = new HashMap<>();
private Map<String, GroupSpec> groups;
private List<SelectorRuleSpec> groupSelectors;
private SchedulerType schedulerType;
private Scheduler scheduler;
private HashMap<String, HashMap<URI, Integer>> serverWeights = new HashMap<>();
private HashMap<URI, URI> discoveryURIs = new HashMap<>();
private final RouterConfig routerConfig;
private final ScheduledExecutorService scheduledExecutorService;
private final AtomicLong lastConfigUpdate = new AtomicLong();
private final RemoteInfoFactory remoteInfoFactory;
private final Logger log = Logger.get(ClusterManager.class);

// Cluster status
private final ConcurrentHashMap<URI, RemoteClusterInfo> remoteClusterInfos = new ConcurrentHashMap<>();
Expand All @@ -68,13 +74,20 @@ public ClusterManager(RouterConfig config, @ForClusterManager ScheduledExecutorS
this.scheduledExecutorService = scheduledExecutorService;
RouterSpec routerSpec = parseRouterConfig(config)
.orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config"));

this.groups = ImmutableMap.copyOf(routerSpec.getGroups().stream().collect(toMap(GroupSpec::getName, group -> group)));
this.groupSelectors = ImmutableList.copyOf(routerSpec.getSelectors());
this.schedulerType = routerSpec.getSchedulerType();
this.scheduler = new SchedulerFactory(routerSpec.getSchedulerType()).create();
this.remoteInfoFactory = requireNonNull(remoteInfoFactory, "remoteInfoFactory is null");
this.initializeServerWeights();
this.initializeMembersDiscoveryURI();
List<URI> allClusters = getAllClusters();
allClusters.forEach(uri -> {
log.info("Attaching cluster %s to the router", uri.getHost());
remoteClusterInfos.put(uri, remoteInfoFactory.createRemoteClusterInfo(discoveryURIs.get(uri)));
remoteQueryInfos.put(uri, remoteInfoFactory.createRemoteQueryInfo(discoveryURIs.get(uri)));
log.info("Successfully attached cluster %s to the router. Queries will be routed to cluster after successful health check", uri.getHost());
});
}

@PostConstruct
Expand All @@ -92,13 +105,28 @@ public void startConfigReloadTask()
this.schedulerType = routerSpec.getSchedulerType();
this.scheduler = new SchedulerFactory(routerSpec.getSchedulerType()).create();
this.initializeServerWeights();
this.initializeMembersDiscoveryURI();
List<URI> allClusters = getAllClusters();
allClusters.forEach(uri -> {
if (!remoteClusterInfos.containsKey(uri)) {
log.info("Attaching cluster %s to the router", uri.getHost());
remoteClusterInfos.put(uri, remoteInfoFactory.createRemoteClusterInfo(discoveryURIs.get(uri)));
remoteQueryInfos.put(uri, remoteInfoFactory.createRemoteQueryInfo(discoveryURIs.get(uri)));
log.info("Successfully attached cluster %s to the router. Queries will be routed to cluster after successful health check", uri.getHost());
}
});
for (URI uri : remoteClusterInfos.keySet()) {
if (!allClusters.contains(uri)) {
log.info("Removing cluster %s from the router", uri.getHost());
remoteClusterInfos.remove(uri);
remoteQueryInfos.remove(uri);
discoveryURIs.remove(uri);
log.info("Successfully removed cluster %s from the router", uri.getHost());
}
}
lastConfigUpdate.set(newConfigUpdateTime);
}
}, 0L, (long) 30, TimeUnit.SECONDS);
getAllClusters().forEach(uri -> {
remoteClusterInfos.put(uri, remoteInfoFactory.createRemoteClusterInfo(uri));
remoteQueryInfos.put(uri, remoteInfoFactory.createRemoteQueryInfo(uri));
});
}, 0L, (long) 5, TimeUnit.SECONDS);
}

public List<URI> getAllClusters()
Expand All @@ -123,14 +151,14 @@ public Optional<URI> getDestination(RequestInfo requestInfo)
.collect(Collectors.toList());

if (healthyClusterURIs.isEmpty()) {
log.info("Healthy cluster not found!");
return Optional.empty();
}

scheduler.setCandidates(healthyClusterURIs);
if (schedulerType == WEIGHTED_RANDOM_CHOICE) {
if (schedulerType == WEIGHTED_RANDOM_CHOICE || schedulerType == WEIGHTED_ROUND_ROBIN) {
scheduler.setWeights(serverWeights.get(groupSpec.getName()));
}

return scheduler.getDestination(requestInfo.getUser());
}

Expand All @@ -155,6 +183,17 @@ private void initializeServerWeights()
});
}

private void initializeMembersDiscoveryURI()
{
groups.forEach((name, groupSpec) -> {
List<URI> members = groupSpec.getMembers();
List<URI> membersDiscoveryURI = groupSpec.getMembersDiscoveryURI();
for (int i = 0; i < members.size(); i++) {
discoveryURIs.put(members.get(i), membersDiscoveryURI.get(i));
}
});
}

public ConcurrentHashMap<URI, RemoteClusterInfo> getRemoteClusterInfos()
{
return remoteClusterInfos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import static java.util.Objects.requireNonNull;
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

@Path("/")
@Path("/v1")
public class ClusterStatusResource
{
private final String environment;
Expand All @@ -51,15 +51,15 @@ public ClusterStatusResource(NodeInfo nodeInfo, ClusterStatusTracker clusterStat

// The web UI depend on the following service endpoints.
@GET
@Path("/v1/info")
@Path("info")
@Produces(APPLICATION_JSON)
public ServerInfo getInfo()
{
return new ServerInfo(UNKNOWN, environment, true, false, Optional.empty());
}

@GET
@Path("/v1/cluster")
@Path("cluster")
@Produces(APPLICATION_JSON)
public ClusterStats getClusterStats()
{
Expand All @@ -73,14 +73,14 @@ public ClusterStats getClusterStats()
}

@GET
@Path("/v1/query")
@Path("query")
public List<JsonNode> getAllQueryInfo(@QueryParam("state") String stateFilter)
{
return clusterStatusTracker.getAllQueryInfos();
}

@GET
@Path("/v1/all")
@Path("all")
@Produces(APPLICATION_JSON)
public List<URI> getAllClusters()
{
Expand Down
Loading

0 comments on commit 0d7cafe

Please sign in to comment.