Skip to content

Commit 450517e

Browse files
committed
[docker] Add fast client support to Docker quickstart
Add D2-based Venice fast client to the Docker quickstart, enabling direct data reads to storage servers bypassing the router. Changes: - Add D2ConfigUtils: production-ready D2 setup utility extracted from test code (setupD2Config, createD2Server, getD2Servers) - Add D2 self-announcement to RouterServer.main() for cluster discovery services, gated by router.d2.announce.enabled (default false) - Add D2 self-announcement to VeniceServer.run() for server D2 service, gated by server.d2.announce.enabled (default false) - Add FastClientQueryTool: CLI for fast-client reads using D2, with optional --insecure flag for trust-all SSL - Add shadow JAR config to venice-client build.gradle - Fix ServerReadMetadataRepository to respect server SSL config when generating instance URLs (was hardcoded to HTTPS) - Add HelixUtils.instanceIdToUrl(instanceId, https) overload - Add fast-client-fetch.sh wrapper script and Docker packaging - Update quickstart docs with fast client usage (Option B) - Add unit tests for HelixUtils, FastClientQueryTool, and ServerReadMetadataRepository SSL-aware URL generation
1 parent 90250e9 commit 450517e

File tree

18 files changed

+679
-12
lines changed

18 files changed

+679
-12
lines changed

build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,8 @@ ext.createDiffFile = { ->
760760

761761
// venice-client
762762
':!clients/venice-client/src/main/java/com/linkedin/venice/fastclient/factory/ClientFactory.java',
763+
// CLI tool requiring real D2/ZK infrastructure to test
764+
':!clients/venice-client/src/main/java/com/linkedin/venice/fastclient/FastClientQueryTool.java',
763765
// unit test for gRPC Transport Client is not straightforward, adding to exclusion list for now
764766
':!clients/venice-client/src/main/java/com/linkedin/venice/fastclient/transport/GrpcTransportClient.java',
765767
// unit test for deprecated DispatchingVsonStoreClient is not meaningful since most logic is in its parent class
@@ -771,6 +773,8 @@ ext.createDiffFile = { ->
771773

772774
// venice-common
773775
':!internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java',
776+
// D2 utility requiring real ZooKeeper to test
777+
':!internal/venice-common/src/main/java/com/linkedin/venice/d2/D2ConfigUtils.java',
774778
':!internal/venice-common/src/main/java/com/linkedin/venice/acl/handler/StoreAclHandler.java',
775779

776780
// venice-client-common

clients/venice-client/build.gradle

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
plugins {
2+
id 'com.github.johnrengelman.shadow'
3+
}
4+
15
dependencies {
26
// For helix-based metadata impl, and this will be removed before onboarding any customers.
37
implementation project(':internal:venice-common')
@@ -35,6 +39,24 @@ dependencies {
3539
testImplementation libraries.openTelemetryTestSdk
3640
}
3741

42+
shadowJar {
43+
mergeServiceFiles()
44+
}
45+
46+
artifacts {
47+
archives shadowJar
48+
}
49+
50+
jar {
51+
manifest {
52+
attributes = [
53+
'Implementation-Title': 'Venice Fast Client',
54+
'Implementation-Version': project.version,
55+
'Main-Class': 'com.linkedin.venice.fastclient.FastClientQueryTool'
56+
]
57+
}
58+
}
59+
3860
ext {
3961
jacocoCoverageThreshold = 0.53
4062
}
@@ -44,4 +66,4 @@ checkerFramework {
4466
checkers = ['org.checkerframework.checker.nullness.NullnessChecker']
4567
skipCheckerFramework = true
4668
excludeTests = true
47-
}
69+
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package com.linkedin.venice.fastclient;
2+
3+
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
4+
import com.linkedin.d2.balancer.D2Client;
5+
import com.linkedin.d2.balancer.D2ClientBuilder;
6+
import com.linkedin.r2.transport.common.Client;
7+
import com.linkedin.r2.transport.common.TransportClientFactory;
8+
import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
9+
import com.linkedin.r2.transport.http.client.HttpClientFactory;
10+
import com.linkedin.venice.D2.D2ClientUtils;
11+
import com.linkedin.venice.client.store.AvroGenericStoreClient;
12+
import com.linkedin.venice.exceptions.VeniceException;
13+
import com.linkedin.venice.fastclient.factory.ClientFactory;
14+
import com.linkedin.venice.fastclient.meta.StoreMetadataFetchMode;
15+
import java.io.ByteArrayInputStream;
16+
import java.io.IOException;
17+
import java.nio.charset.StandardCharsets;
18+
import java.security.SecureRandom;
19+
import java.security.cert.X509Certificate;
20+
import java.util.Collections;
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
import java.util.concurrent.TimeUnit;
24+
import javax.net.ssl.SSLContext;
25+
import javax.net.ssl.SSLParameters;
26+
import javax.net.ssl.TrustManager;
27+
import javax.net.ssl.X509TrustManager;
28+
import org.apache.avro.Schema;
29+
import org.apache.avro.generic.GenericDatumReader;
30+
31+
32+
/**
33+
* A CLI tool to query values from a Venice store using the fast client with D2 service discovery.
34+
*
35+
* Usage: java -jar venice-client-all.jar {@literal <store_name>} {@literal <key>} {@literal <zk_address>} [--insecure]
36+
*
37+
* The optional --insecure flag enables a trust-all SSL context for environments where HTTPS
38+
* is used with self-signed or untrusted certificates (e.g., Docker quickstart).
39+
* Without this flag, the tool uses plain HTTP transport only.
40+
*/
41+
public class FastClientQueryTool {
42+
public static void main(String[] args) throws Exception {
43+
if (args.length < 3) {
44+
System.out.println("Usage: java -jar venice-client-all.jar <store_name> <key> <zk_address> [--insecure]");
45+
System.exit(1);
46+
}
47+
48+
String storeName = args[0];
49+
String keyString = args[1];
50+
String zkAddress = args[2];
51+
boolean insecure = args.length > 3 && "--insecure".equals(args[3]);
52+
53+
TransportClientFactory httpTransport = new HttpClientFactory.Builder().setUsePipelineV2(true).build();
54+
D2ClientBuilder d2ClientBuilder = new D2ClientBuilder().setZkHosts(zkAddress)
55+
.setZkSessionTimeout(5000, TimeUnit.MILLISECONDS)
56+
.setZkStartupTimeout(5000, TimeUnit.MILLISECONDS)
57+
.setLbWaitTimeout(5000, TimeUnit.MILLISECONDS)
58+
.setBasePath("/d2");
59+
60+
Map<String, Object> r2Properties;
61+
62+
if (insecure) {
63+
// Trust-all SSLContext for environments with self-signed/untrusted certs (non-production use)
64+
TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() {
65+
public X509Certificate[] getAcceptedIssuers() {
66+
return new X509Certificate[0];
67+
}
68+
69+
public void checkClientTrusted(X509Certificate[] certs, String authType) {
70+
}
71+
72+
public void checkServerTrusted(X509Certificate[] certs, String authType) {
73+
}
74+
} };
75+
SSLContext sslContext = SSLContext.getInstance("TLS");
76+
sslContext.init(null, trustAllCerts, new SecureRandom());
77+
SSLParameters sslParameters = sslContext.getDefaultSSLParameters();
78+
79+
Map<String, TransportClientFactory> transportClients = new HashMap<>();
80+
transportClients.put("http", httpTransport);
81+
transportClients.put("https", httpTransport);
82+
83+
d2ClientBuilder.setSSLContext(sslContext)
84+
.setSSLParameters(sslParameters)
85+
.setIsSSLEnabled(true)
86+
.setClientFactories(transportClients);
87+
88+
r2Properties = new HashMap<>();
89+
r2Properties.put(HttpClientFactory.HTTP_SSL_CONTEXT, sslContext);
90+
r2Properties.put(HttpClientFactory.HTTP_SSL_PARAMS, sslParameters);
91+
} else {
92+
r2Properties = Collections.emptyMap();
93+
}
94+
95+
D2Client d2Client = d2ClientBuilder.build();
96+
D2ClientUtils.startClient(d2Client);
97+
98+
Client r2Client = new TransportClientAdapter(httpTransport.getClient(r2Properties));
99+
100+
// Build fast client config
101+
ClientConfig clientConfig = new ClientConfig.ClientConfigBuilder<>().setStoreName(storeName)
102+
.setR2Client(r2Client)
103+
.setD2Client(d2Client)
104+
.setClusterDiscoveryD2Service("venice-discovery")
105+
.setStoreMetadataFetchMode(StoreMetadataFetchMode.SERVER_BASED_METADATA)
106+
.setMetadataRefreshIntervalInSeconds(1)
107+
.build();
108+
109+
try (AvroGenericStoreClient<Object, Object> client = ClientFactory.getAndStartGenericStoreClient(clientConfig)) {
110+
// Poll until metadata (key schema) is available
111+
Schema keySchema = null;
112+
Exception lastException = null;
113+
long deadline = System.currentTimeMillis() + 30_000;
114+
while (keySchema == null) {
115+
if (System.currentTimeMillis() > deadline) {
116+
String message = "Timed out waiting for metadata to be fetched for store: " + storeName;
117+
if (lastException != null) {
118+
throw new VeniceException(message + ". Last error: " + lastException.getMessage(), lastException);
119+
}
120+
throw new VeniceException(message);
121+
}
122+
try {
123+
keySchema = client.getKeySchema();
124+
} catch (Exception e) {
125+
// Metadata not yet available, record and retry until timeout
126+
lastException = e;
127+
keySchema = null;
128+
}
129+
if (keySchema == null) {
130+
Thread.sleep(200);
131+
}
132+
}
133+
134+
Object key = convertKey(keyString, keySchema);
135+
136+
Object value = client.get(key).get(15, TimeUnit.SECONDS);
137+
138+
System.out.println("key-class=" + key.getClass().getCanonicalName());
139+
System.out.println("value-class=" + (value == null ? "null" : value.getClass().getCanonicalName()));
140+
System.out.println("key=" + keyString);
141+
System.out.println("value=" + (value == null ? "null" : value.toString()));
142+
} finally {
143+
D2ClientUtils.shutdownClient(d2Client);
144+
}
145+
}
146+
147+
static Object convertKey(String keyString, Schema keySchema) {
148+
Object key;
149+
switch (keySchema.getType()) {
150+
case INT:
151+
key = Integer.parseInt(keyString);
152+
break;
153+
case LONG:
154+
key = Long.parseLong(keyString);
155+
break;
156+
case FLOAT:
157+
key = Float.parseFloat(keyString);
158+
break;
159+
case DOUBLE:
160+
key = Double.parseDouble(keyString);
161+
break;
162+
case BOOLEAN:
163+
key = Boolean.parseBoolean(keyString);
164+
break;
165+
case STRING:
166+
key = keyString;
167+
break;
168+
default:
169+
try {
170+
key = new GenericDatumReader<>(keySchema, keySchema).read(
171+
null,
172+
AvroCompatibilityHelper
173+
.newJsonDecoder(keySchema, new ByteArrayInputStream(keyString.getBytes(StandardCharsets.UTF_8))));
174+
} catch (IOException e) {
175+
throw new VeniceException("Invalid input key: " + keyString, e);
176+
}
177+
break;
178+
}
179+
return key;
180+
}
181+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package com.linkedin.venice.fastclient;
2+
3+
import static org.testng.Assert.assertEquals;
4+
import static org.testng.Assert.assertTrue;
5+
6+
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
7+
import com.linkedin.venice.exceptions.VeniceException;
8+
import org.apache.avro.Schema;
9+
import org.testng.annotations.Test;
10+
11+
12+
public class FastClientQueryToolTest {
13+
@Test
14+
public void testConvertKeyString() {
15+
Schema schema = Schema.create(Schema.Type.STRING);
16+
Object key = FastClientQueryTool.convertKey("hello", schema);
17+
assertEquals(key, "hello");
18+
}
19+
20+
@Test
21+
public void testConvertKeyInt() {
22+
Schema schema = Schema.create(Schema.Type.INT);
23+
Object key = FastClientQueryTool.convertKey("42", schema);
24+
assertEquals(key, 42);
25+
}
26+
27+
@Test
28+
public void testConvertKeyLong() {
29+
Schema schema = Schema.create(Schema.Type.LONG);
30+
Object key = FastClientQueryTool.convertKey("123456789", schema);
31+
assertEquals(key, 123456789L);
32+
}
33+
34+
@Test
35+
public void testConvertKeyFloat() {
36+
Schema schema = Schema.create(Schema.Type.FLOAT);
37+
Object key = FastClientQueryTool.convertKey("1.5", schema);
38+
assertEquals(key, 1.5f);
39+
}
40+
41+
@Test
42+
public void testConvertKeyDouble() {
43+
Schema schema = Schema.create(Schema.Type.DOUBLE);
44+
Object key = FastClientQueryTool.convertKey("1.5", schema);
45+
assertEquals(key, 1.5);
46+
}
47+
48+
@Test
49+
public void testConvertKeyBoolean() {
50+
Schema schema = Schema.create(Schema.Type.BOOLEAN);
51+
Object key = FastClientQueryTool.convertKey("true", schema);
52+
assertEquals(key, true);
53+
54+
key = FastClientQueryTool.convertKey("false", schema);
55+
assertEquals(key, false);
56+
}
57+
58+
@Test(expectedExceptions = NumberFormatException.class)
59+
public void testConvertKeyIntWithInvalidInput() {
60+
Schema schema = Schema.create(Schema.Type.INT);
61+
FastClientQueryTool.convertKey("not_a_number", schema);
62+
}
63+
64+
@Test(expectedExceptions = VeniceException.class)
65+
public void testConvertKeyComplexSchemaWithInvalidJson() {
66+
Schema schema = Schema.createRecord("TestRecord", null, "test", false);
67+
schema.setFields(
68+
java.util.Collections.singletonList(
69+
AvroCompatibilityHelper.createSchemaField("field1", Schema.create(Schema.Type.STRING), null, null)));
70+
FastClientQueryTool.convertKey("not_valid_json", schema);
71+
}
72+
73+
@Test
74+
public void testConvertKeyComplexSchemaWithValidJson() {
75+
Schema schema = Schema.createRecord("TestRecord", null, "test", false);
76+
schema.setFields(
77+
java.util.Collections.singletonList(
78+
AvroCompatibilityHelper.createSchemaField("field1", Schema.create(Schema.Type.STRING), null, null)));
79+
Object key = FastClientQueryTool.convertKey("{\"field1\": \"value1\"}", schema);
80+
assertTrue(key.toString().contains("value1"));
81+
}
82+
}

docker/build-venice-docker-images.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ version=$oss_release
1616
cp *py venice-client/
1717
cp ../clients/venice-push-job/build/libs/venice-push-job-all.jar venice-client/
1818
cp ../clients/venice-thin-client/build/libs/venice-thin-client-all.jar venice-client/
19+
cp ../clients/venice-client/build/libs/venice-client-all.jar venice-client/
1920
cp ../clients/venice-admin-tool/build/libs/venice-admin-tool-all.jar venice-client/
2021
cp *py venice-client-jupyter/
2122
cp ../clients/venice-push-job/build/libs/venice-push-job-all.jar venice-client-jupyter/
@@ -51,6 +52,7 @@ done
5152

5253
rm -f venice-client/venice-push-job-all.jar
5354
rm -f venice-client/venice-thin-client-all.jar
55+
rm -f venice-client/venice-client-all.jar
5456
rm -f venice-client/venice-admin-tool-all.jar
5557
rm -f venice-client-jupyter/venice-push-job-all.jar
5658
rm -f venice-client-jupyter/venice-thin-client-all.jar

docker/venice-client/Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ WORKDIR ${VENICE_DIR}
1515

1616
COPY venice-push-job-all.jar bin/venice-push-job-all.jar
1717
COPY venice-thin-client-all.jar bin/venice-thin-client-all.jar
18+
COPY venice-client-all.jar bin/venice-client-all.jar
1819
COPY venice-admin-tool-all.jar bin/venice-admin-tool-all.jar
1920
COPY sample-data sample-data
2021
COPY run-vpj.sh .
2122
COPY fetch.sh .
23+
COPY fast-client-fetch.sh .
2224
COPY create-store.sh .
2325
COPY avro-to-json.sh .
2426
RUN chmod +x *.sh
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#!/bin/bash
2+
3+
if [ "$#" -lt 2 ]; then
4+
echo "Usage: $0 <storeName> <key>" >&2
5+
exit 1
6+
fi
7+
8+
storeName=$1
9+
key=$2
10+
java -jar /opt/venice/bin/venice-client-all.jar "$storeName" "$key" zookeeper:2181 2>/dev/null

docker/venice-controller/single-dc-configs/controller.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ ssl.to.kakfa=false
1010
controller.parent.mode=false
1111
controller.system.schema.cluster.name=venice-cluster0
1212
cluster.to.d2=venice-cluster0:venice-discovery
13+
cluster.to.server.d2=venice-cluster0:venice-server-d2
1314
delay.to.rebalance.ms=0
1415
offline.job.start.timeout.ms=60000
1516
topic.cleanup.sleep.interval.between.topic.list.fetch.ms=30000

docker/venice-router/single-dc-configs/router.properties

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,17 @@ router.connection.limit=20
66
system.schema.cluster.name=venice-cluster0
77
zookeeper.address=zookeeper:2181
88
sslToStorageNodes=false
9+
router.enable.ssl=false
910
max.read.capacity=20000000
1011
router.max.outgoing.connection=10
1112
router.httpasyncclient.connection.warming.low.water.mark=1
1213
kafka.zk.address=zookeeper:2181
1314
router.max.outgoing.connection.per.route=2
1415
router.http.client.pool.size=2
1516
cluster.to.d2=venice-cluster0:venice-discovery
17+
cluster.to.server.d2=venice-cluster0:venice-server-d2
1618
kafka.bootstrap.servers=kafka:9092
1719
router.storage.node.client.type=APACHE_HTTP_ASYNC_CLIENT
18-
router.io.worker.count=4
20+
router.io.worker.count=4
21+
router.d2.announce.enabled=true
22+
router.d2.announce.host=venice-router

0 commit comments

Comments
 (0)