Skip to content

Commit 8263ef4

Browse files
Adding resiliency in RedisClusterClient
(handle tcp keepidle in failover scenario, auth with ssl & handle cluster details endpoint returning IP in ssl scenario. Signed-off-by: Andrija Perovic <[email protected]>
1 parent 5f2192a commit 8263ef4

File tree

4 files changed

+93
-6
lines changed

4 files changed

+93
-6
lines changed

Diff for: Makefile

+2-2
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,10 @@ push-serving-docker:
7171
docker push $(REGISTRY)/feast-serving:$(VERSION)
7272

7373
build-core-docker:
74-
docker build --build-arg VERSION=$(VERSION) -t $(REGISTRY)/feast-core:$(VERSION) -f infra/docker/core/Dockerfile .
74+
docker build --no-cache --build-arg VERSION=$(VERSION) -t $(REGISTRY)/feast-core:$(VERSION) -f infra/docker/core/Dockerfile .
7575

7676
build-serving-docker:
77-
docker build --build-arg VERSION=$(VERSION) -t $(REGISTRY)/feast-serving:$(VERSION) -f infra/docker/serving/Dockerfile .
77+
docker build --no-cache --build-arg VERSION=$(VERSION) -t $(REGISTRY)/feast-serving:$(VERSION) -f infra/docker/serving/Dockerfile .
7878

7979
# Versions
8080

Diff for: serving/src/main/java/feast/serving/config/FeastProperties.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,9 @@ public RedisClusterStoreConfig getRedisClusterConfig() {
322322
return new RedisClusterStoreConfig(
323323
this.config.get("connection_string"),
324324
ReadFrom.valueOf(this.config.get("read_from")),
325-
Duration.parse(this.config.get("timeout")));
325+
Duration.parse(this.config.get("timeout")),
326+
Boolean.valueOf(this.config.getOrDefault("ssl", "false")),
327+
this.config.getOrDefault("password", ""));
326328
}
327329

328330
public RedisStoreConfig getRedisConfig() {

Diff for: storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java

+74-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,17 @@
2222
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
2323
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
2424
import io.lettuce.core.codec.ByteArrayCodec;
25+
import io.lettuce.core.resource.ClientResources;
26+
import io.lettuce.core.resource.DnsResolvers;
27+
import io.lettuce.core.resource.MappingSocketAddressResolver;
28+
import io.lettuce.core.resource.NettyCustomizer;
29+
import io.netty.bootstrap.Bootstrap;
30+
import io.netty.channel.epoll.EpollChannelOption;
31+
import java.net.InetAddress;
32+
import java.net.UnknownHostException;
2533
import java.util.Arrays;
2634
import java.util.List;
35+
import java.util.Map;
2736
import java.util.stream.Collectors;
2837

2938
public class RedisClusterClient implements RedisClientAdapter {
@@ -62,18 +71,81 @@ private RedisClusterClient(Builder builder) {
6271
this.asyncCommands.setAutoFlushCommands(false);
6372
}
6473

74+
public static String getAddressString(String host) {
75+
try {
76+
return InetAddress.getByName(host).getHostAddress();
77+
} catch (UnknownHostException e) {
78+
throw new RuntimeException(String.format("getAllByName() failed: %s", e.getMessage()));
79+
}
80+
}
81+
82+
public static MappingSocketAddressResolver customSocketAddressResolver(
83+
RedisClusterStoreConfig config) {
84+
85+
List<String> configuredHosts =
86+
Arrays.stream(config.getConnectionString().split(","))
87+
.map(
88+
hostPort -> {
89+
return hostPort.trim().split(":")[0];
90+
})
91+
.collect(Collectors.toList());
92+
93+
Map<String, String> mapAddressHost =
94+
configuredHosts.stream()
95+
.collect(
96+
Collectors.toMap(host -> ((String) getAddressString(host)), host -> (String) host));
97+
98+
return MappingSocketAddressResolver.create(
99+
DnsResolvers.UNRESOLVED,
100+
hostAndPort ->
101+
mapAddressHost.keySet().stream().anyMatch(i -> i.equals(hostAndPort.getHostText()))
102+
? hostAndPort.of(
103+
mapAddressHost.get(hostAndPort.getHostText()), hostAndPort.getPort())
104+
: hostAndPort);
105+
}
106+
107+
public static ClientResources customClientResources(RedisClusterStoreConfig config) {
108+
ClientResources clientResources =
109+
ClientResources.builder()
110+
.nettyCustomizer(
111+
new NettyCustomizer() {
112+
@Override
113+
public void afterBootstrapInitialized(Bootstrap bootstrap) {
114+
bootstrap.option(EpollChannelOption.TCP_KEEPIDLE, 15);
115+
bootstrap.option(EpollChannelOption.TCP_KEEPINTVL, 5);
116+
bootstrap.option(EpollChannelOption.TCP_KEEPCNT, 3);
117+
// Socket Timeout (milliseconds)
118+
bootstrap.option(EpollChannelOption.TCP_USER_TIMEOUT, 60000);
119+
}
120+
})
121+
.socketAddressResolver(customSocketAddressResolver(config))
122+
.build();
123+
return clientResources;
124+
}
125+
65126
public static RedisClientAdapter create(RedisClusterStoreConfig config) {
127+
66128
List<RedisURI> redisURIList =
67129
Arrays.stream(config.getConnectionString().split(","))
68130
.map(
69131
hostPort -> {
70132
String[] hostPortSplit = hostPort.trim().split(":");
71-
return RedisURI.create(hostPortSplit[0], Integer.parseInt(hostPortSplit[1]));
133+
RedisURI redisURI =
134+
RedisURI.create(hostPortSplit[0], Integer.parseInt(hostPortSplit[1]));
135+
if (!config.getPassword().isEmpty()) {
136+
redisURI.setPassword(config.getPassword());
137+
}
138+
if (config.getSsl()) {
139+
redisURI.setSsl(true);
140+
}
141+
return redisURI;
72142
})
73143
.collect(Collectors.toList());
74144

75145
io.lettuce.core.cluster.RedisClusterClient client =
76-
io.lettuce.core.cluster.RedisClusterClient.create(redisURIList);
146+
io.lettuce.core.cluster.RedisClusterClient.create(
147+
customClientResources(config), redisURIList);
148+
77149
client.setOptions(
78150
ClusterClientOptions.builder()
79151
.socketOptions(SocketOptions.builder().keepAlive(true).tcpNoDelay(true).build())

Diff for: storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,16 @@ public class RedisClusterStoreConfig {
2323
private final String connectionString;
2424
private final ReadFrom readFrom;
2525
private final Duration timeout;
26+
private final String password;
27+
private final Boolean ssl;
2628

27-
public RedisClusterStoreConfig(String connectionString, ReadFrom readFrom, Duration timeout) {
29+
public RedisClusterStoreConfig(
30+
String connectionString, ReadFrom readFrom, Duration timeout, Boolean ssl, String password) {
2831
this.connectionString = connectionString;
2932
this.readFrom = readFrom;
3033
this.timeout = timeout;
34+
this.password = password;
35+
this.ssl = ssl;
3136
}
3237

3338
public String getConnectionString() {
@@ -41,4 +46,12 @@ public ReadFrom getReadFrom() {
4146
public Duration getTimeout() {
4247
return this.timeout;
4348
}
49+
50+
public String getPassword() {
51+
return this.password;
52+
}
53+
54+
public Boolean getSsl() {
55+
return this.ssl;
56+
}
4457
}

0 commit comments

Comments
 (0)