Skip to content

Commit a3966ad

Browse files
agrawalreetikazacw7
authored andcommitted
Add test for async data cache API on node
1 parent 73595be commit a3966ad

File tree

8 files changed

+353
-19
lines changed

8 files changed

+353
-19
lines changed

presto-docs/src/main/sphinx/presto_cpp/features.rst

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,30 @@ Other HTTP endpoints include:
5353

5454
The request/response flow of Presto C++ is identical to Java workers. The tasks or new splits are registered via `TaskUpdateRequest`. Resource utilization and query progress are sent to the coordinator via task endpoints.
5555

56+
* GET: /v1/operation/server/clearCache?type=memory: It clears the memory cache on worker node. Here is an example:
57+
58+
.. sourcecode:: http
59+
60+
curl -X GET "http://localhost:7777/v1/operation/server/clearCache?type=memory"
61+
62+
Cleared memory cache
63+
64+
* GET: /v1/operation/server/clearCache?type=ssd: It clears the ssd cache on worker node. Here is an example:
65+
66+
.. sourcecode:: http
67+
68+
curl -X GET "http://localhost:7777/v1/operation/server/clearCache?type=ssd"
69+
70+
Cleared ssd cache
71+
72+
* GET: /v1/operation/server/writeSsd: It writes data from memory cache to the ssd cache on worker node. Here is an example:
73+
74+
.. sourcecode:: http
75+
76+
curl -X GET "http://localhost:7777/v1/operation/server/writeSsd"
77+
78+
Succeeded write ssd cache
79+
5680
Remote Function Execution
5781
-------------------------
5882

presto-native-execution/src/test/java/com/facebook/presto/nativeworker/HiveExternalWorkerQueryRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public static void main(String[] args)
3434
javaQueryRunner.close();
3535

3636
// Launch distributed runner.
37-
DistributedQueryRunner queryRunner = (DistributedQueryRunner) PrestoNativeQueryRunnerUtils.createQueryRunner(false, false);
37+
DistributedQueryRunner queryRunner = (DistributedQueryRunner) PrestoNativeQueryRunnerUtils.createQueryRunner(false, false, false, false);
3838
Thread.sleep(10);
3939
Logger log = Logger.get(DistributedQueryRunner.class);
4040
log.info("======== SERVER STARTED ========");
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.nativeworker;
15+
16+
import java.io.BufferedReader;
17+
import java.io.InputStreamReader;
18+
import java.net.HttpURLConnection;
19+
import java.net.URL;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import java.util.regex.Matcher;
23+
import java.util.regex.Pattern;
24+
25+
public final class NativeApiEndpointUtils
26+
{
27+
private NativeApiEndpointUtils() {}
28+
public static Map<String, Long> fetchScalarLongMetrics(String serverAndPort, String apiEndpoint, String requestMethod) throws Exception
29+
{
30+
String apiUrl = serverAndPort + apiEndpoint;
31+
URL url = new URL(apiUrl);
32+
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
33+
connection.setRequestMethod(requestMethod);
34+
connection.setConnectTimeout(5000);
35+
connection.setReadTimeout(5000);
36+
37+
Map<String, Long> metrics = new HashMap<>();
38+
Pattern metricPattern = Pattern.compile("^(\\w+)(?:\\{.*?\\})?\\s+(\\d+)$");
39+
40+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
41+
String line;
42+
while ((line = reader.readLine()) != null) {
43+
Matcher matcher = metricPattern.matcher(line);
44+
if (matcher.matches()) {
45+
String metricName = matcher.group(1);
46+
long metricValue = Long.parseLong(matcher.group(2));
47+
metrics.put(metricName, metricValue);
48+
}
49+
}
50+
}
51+
finally {
52+
connection.disconnect();
53+
}
54+
55+
return metrics;
56+
}
57+
58+
public static int sendWorkerRequest(String serverAndPort, String endPoint)
59+
{
60+
try {
61+
URL url = new URL(serverAndPort + endPoint);
62+
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
63+
connection.setRequestMethod("GET");
64+
connection.setRequestProperty("Accept", "application/json");
65+
return connection.getResponseCode();
66+
}
67+
catch (Exception e) {
68+
e.printStackTrace();
69+
return 500;
70+
}
71+
}
72+
}

presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public class PrestoNativeQueryRunnerUtils
9090

9191
private PrestoNativeQueryRunnerUtils() {}
9292

93-
public static QueryRunner createQueryRunner(boolean addStorageFormatToPath, boolean isCoordinatorSidecarEnabled)
93+
public static QueryRunner createQueryRunner(boolean addStorageFormatToPath, boolean isCoordinatorSidecarEnabled, boolean enableRuntimeMetricsCollection, boolean enableSsdCache)
9494
throws Exception
9595
{
9696
int cacheMaxSize = 4096; // 4GB size cache
@@ -102,7 +102,9 @@ public static QueryRunner createQueryRunner(boolean addStorageFormatToPath, bool
102102
cacheMaxSize,
103103
DEFAULT_STORAGE_FORMAT,
104104
addStorageFormatToPath,
105-
isCoordinatorSidecarEnabled);
105+
isCoordinatorSidecarEnabled,
106+
enableRuntimeMetricsCollection,
107+
enableSsdCache);
106108
}
107109

108110
public static QueryRunner createQueryRunner(
@@ -112,7 +114,9 @@ public static QueryRunner createQueryRunner(
112114
int cacheMaxSize,
113115
String storageFormat,
114116
boolean addStorageFormatToPath,
115-
boolean isCoordinatorSidecarEnabled)
117+
boolean isCoordinatorSidecarEnabled,
118+
boolean enableRuntimeMetricsCollection,
119+
boolean enableSsdCache)
116120
throws Exception
117121
{
118122
QueryRunner defaultQueryRunner = createJavaQueryRunner(dataDirectory, storageFormat, addStorageFormatToPath);
@@ -123,7 +127,8 @@ public static QueryRunner createQueryRunner(
123127

124128
defaultQueryRunner.close();
125129

126-
return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled, false);
130+
return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(),
131+
storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled, false, enableRuntimeMetricsCollection, enableSsdCache);
127132
}
128133

129134
public static QueryRunner createJavaQueryRunner()
@@ -310,7 +315,7 @@ public static QueryRunner createNativeIcebergQueryRunner(
310315
.setCreateTpchTables(false)
311316
.setAddJmxPlugin(false)
312317
.setNodeCount(OptionalInt.of(workerCount.orElse(4)))
313-
.setExternalWorkerLauncher(getExternalWorkerLauncher("iceberg", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, false, false))
318+
.setExternalWorkerLauncher(getExternalWorkerLauncher("iceberg", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, false, false, false, false))
314319
.setAddStorageFormatToPath(addStorageFormatToPath)
315320
.setDataDirectory(dataDirectory)
316321
.setTpcdsProperties(getNativeWorkerTpcdsProperties())
@@ -328,7 +333,9 @@ public static QueryRunner createNativeQueryRunner(
328333
boolean addStorageFormatToPath,
329334
Boolean failOnNestedLoopJoin,
330335
boolean isCoordinatorSidecarEnabled,
331-
boolean singleNodeExecutionEnabled)
336+
boolean singleNodeExecutionEnabled,
337+
boolean enableRuntimeMetricsCollection,
338+
boolean enableSsdCache)
332339
throws Exception
333340
{
334341
// The property "hive.allow-drop-table" needs to be set to true because security is always "legacy" in NativeQueryRunner.
@@ -358,7 +365,8 @@ public static QueryRunner createNativeQueryRunner(
358365
hiveProperties,
359366
workerCount,
360367
Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)),
361-
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, failOnNestedLoopJoin, isCoordinatorSidecarEnabled),
368+
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, failOnNestedLoopJoin,
369+
isCoordinatorSidecarEnabled, enableRuntimeMetricsCollection, enableSsdCache),
362370
getNativeWorkerTpcdsProperties());
363371
}
364372

@@ -402,14 +410,14 @@ public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String s
402410
hiveProperties,
403411
workerCount,
404412
Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)),
405-
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, Optional.empty(), false, false),
413+
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, Optional.empty(), false, false, false, false),
406414
getNativeWorkerTpcdsProperties());
407415
}
408416

409417
public static QueryRunner createNativeQueryRunner(String remoteFunctionServerUds)
410418
throws Exception
411419
{
412-
return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false, false, false);
420+
return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false, false, false, false, false);
413421
}
414422

415423
public static QueryRunner createNativeQueryRunner(boolean useThrift)
@@ -421,13 +429,13 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift)
421429
public static QueryRunner createNativeQueryRunner(boolean useThrift, boolean failOnNestedLoopJoin)
422430
throws Exception
423431
{
424-
return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin, false, false);
432+
return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin, false, false, false, false);
425433
}
426434

427435
public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat)
428436
throws Exception
429437
{
430-
return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false, false, false);
438+
return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false, false, false, false, false);
431439
}
432440

433441
public static QueryRunner createNativeQueryRunner(
@@ -436,7 +444,9 @@ public static QueryRunner createNativeQueryRunner(
436444
Optional<String> remoteFunctionServerUds,
437445
Boolean failOnNestedLoopJoin,
438446
boolean isCoordinatorSidecarEnabled,
439-
boolean singleNodeExecutionEnabled)
447+
boolean singleNodeExecutionEnabled,
448+
boolean enableRuntimeMetricsCollection,
449+
boolean enableSSDCache)
440450
throws Exception
441451
{
442452
int cacheMaxSize = 0;
@@ -452,7 +462,9 @@ public static QueryRunner createNativeQueryRunner(
452462
true,
453463
failOnNestedLoopJoin,
454464
isCoordinatorSidecarEnabled,
455-
singleNodeExecutionEnabled);
465+
singleNodeExecutionEnabled,
466+
enableRuntimeMetricsCollection,
467+
enableSSDCache);
456468
}
457469

458470
// Start the remote function server. Return the UDS path used to communicate with it.
@@ -499,7 +511,15 @@ public static NativeQueryRunnerParameters getNativeQueryRunnerParameters()
499511
return new NativeQueryRunnerParameters(prestoServerPath, dataDirectory, workerCount);
500512
}
501513

502-
public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLauncher(String catalogName, String prestoServerPath, int cacheMaxSize, Optional<String> remoteFunctionServerUds, Boolean failOnNestedLoopJoin, boolean isCoordinatorSidecarEnabled)
514+
public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLauncher(
515+
String catalogName,
516+
String prestoServerPath,
517+
int cacheMaxSize,
518+
Optional<String> remoteFunctionServerUds,
519+
Boolean failOnNestedLoopJoin,
520+
boolean isCoordinatorSidecarEnabled,
521+
boolean enableRuntimeMetricsCollection,
522+
boolean enableSsdCache)
503523
{
504524
return
505525
Optional.of((workerIndex, discoveryUri) -> {
@@ -521,6 +541,19 @@ public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLaunc
521541
"presto.default-namespace=native.default%n", configProperties);
522542
}
523543

544+
if (enableRuntimeMetricsCollection) {
545+
configProperties = format("%s%n" +
546+
"runtime-metrics-collection-enabled=true%n", configProperties);
547+
}
548+
549+
if (enableSsdCache) {
550+
Path ssdCacheDir = Paths.get(tempDirectoryPath + "/velox-ssd-cache");
551+
Files.createDirectories(ssdCacheDir);
552+
configProperties = format("%s%n" +
553+
"async-cache-ssd-gb=1%n" +
554+
"async-cache-ssd-path=%s/%n", configProperties, ssdCacheDir);
555+
}
556+
524557
if (remoteFunctionServerUds.isPresent()) {
525558
String jsonSignaturesPath = Resources.getResource(REMOTE_FUNCTION_JSON_SIGNATURES).getFile();
526559
configProperties = format("%s%n" +
@@ -564,7 +597,7 @@ public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLaunc
564597
format("connector.name=tpch%n").getBytes());
565598

566599
// Disable stack trace capturing as some queries (using TRY) generate a lot of exceptions.
567-
return new ProcessBuilder(prestoServerPath, "--logtostderr=1", "--v=1")
600+
return new ProcessBuilder(prestoServerPath, "--logtostderr=1", "--v=1", "--velox_ssd_odirect=false")
568601
.directory(tempDirectoryPath.toFile())
569602
.redirectErrorStream(true)
570603
.redirectOutput(ProcessBuilder.Redirect.to(tempDirectoryPath.resolve("worker." + workerIndex + ".out").toFile()))

0 commit comments

Comments
 (0)