Skip to content

Commit

Permalink
Add cluster client, request routes configuration and support for bulk…
Browse files Browse the repository at this point in the history
… response (#59)

* Add cluster client and routes support for cluster client.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR feedback and add tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Minor javadoc update.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Minor javadoc fix

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR review comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR review comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR review comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored Jan 27, 2024
1 parent 9345634 commit f296c00
Show file tree
Hide file tree
Showing 12 changed files with 687 additions and 77 deletions.
11 changes: 5 additions & 6 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
@AllArgsConstructor
public abstract class BaseClient implements AutoCloseable {

protected ConnectionManager connectionManager;
protected CommandManager commandManager;
protected final ConnectionManager connectionManager;
protected final CommandManager commandManager;

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
Expand All @@ -22,10 +22,9 @@ public abstract class BaseClient implements AutoCloseable {
* @param response Redis protobuf message
* @return Response Object
*/
protected static Object handleObjectResponse(Response response) {
// return function to convert protobuf.Response into the response object by
// calling valueFromPointer
return (new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer)).apply(response);
protected Object handleObjectResponse(Response response) {
// convert protobuf response into Object and then Object into T
return new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response);
}

/**
Expand Down
36 changes: 11 additions & 25 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,23 @@
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import glide.managers.models.Command;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Standalone mode. Use {@link
* #CreateClient(RedisClientConfiguration)} to request a client to Redis.
* Async (non-blocking) client for Redis in Standalone mode. Use {@link #CreateClient} to request a
* client to Redis.
*/
public class RedisClient extends BaseClient implements BaseCommands {

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Request an async (non-blocking) Redis client in Standalone mode.
* Async request for an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @param config Redis client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
Expand Down Expand Up @@ -53,29 +58,10 @@ protected static CommandManager buildCommandManager(ChannelHandler channelHandle
return new CommandManager(channelHandler);
}

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as SUBSCRIBE), or that return potentially more than a single response
* (such as XREAD), or that change the client's behavior (such as entering pub/sub mode on
* RESP2 connections) shouldn't be called using this function.
* @example Returns a list of all pub/sub clients:
* <pre>
* Object result = client.customCommand(new String[]{"CLIENT","LIST","TYPE", "PUBSUB"}).get();
* </pre>
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
*/
@Override
public CompletableFuture<Object> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, BaseClient::handleObjectResponse);
return commandManager.submitNewCommand(command, Optional.empty(), this::handleObjectResponse);
}
}
67 changes: 67 additions & 0 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package glide.api;

import static glide.api.RedisClient.buildChannelHandler;
import static glide.api.RedisClient.buildCommandManager;
import static glide.api.RedisClient.buildConnectionManager;

import glide.api.commands.ClusterBaseCommands;
import glide.api.models.ClusterValue;
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.Route;
import glide.connectors.handlers.ChannelHandler;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import glide.managers.models.Command;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Cluster mode. Use {@link #CreateClient} to request a
* client to Redis.
*/
public class RedisClusterClient extends BaseClient implements ClusterBaseCommands {

protected RedisClusterClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Async request for an async (non-blocking) Redis client in Cluster mode.
*
* @param config Redis cluster client Configuration
* @return a Future to connect and return a ClusterClient
*/
public static CompletableFuture<RedisClusterClient> CreateClient(
RedisClusterClientConfiguration config) {
try {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignored -> new RedisClusterClient(connectionManager, commandManager));
} catch (InterruptedException e) {
// Something bad happened while we were establishing netty connection to UDS
var future = new CompletableFuture<RedisClusterClient>();
future.completeExceptionally(e);
return future;
}
}

@Override
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(
command, Optional.empty(), response -> ClusterValue.of(handleObjectResponse(response)));
}

@Override
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args, Route route) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(
command, Optional.of(route), response -> ClusterValue.of(handleObjectResponse(response)));
}
}
22 changes: 11 additions & 11 deletions java/client/src/main/java/glide/api/commands/BaseCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ public interface BaseCommands {

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
* subcommands, should be added as a separate value in {@code args}.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as SUBSCRIBE), or that return potentially more than a single response
* (such as XREAD), or that change the client's behavior (such as entering pub/sub mode on
* RESP2 connections) shouldn't be called using this function.
* @example Returns a list of all pub/sub clients:
* <pre>
* Object result = client.customCommand(new String[]{"CLIENT","LIST","TYPE", "PUBSUB"}).get();
* </pre>
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
* return response (such as <em>SUBSCRIBE</em>), or that return potentially more than a single
* response (such as <em>XREAD</em>), or that change the client's behavior (such as entering
* <em>pub</em>/<em>sub</em> mode on <em>RESP2</em> connections) shouldn't be called using
* this function.
* @example Returns a list of all <em>pub</em>/<em>sub</em> clients:
* <p><code>
* Object result = client.customCommand(new String[]{ "CLIENT", "LIST", "TYPE", "PUBSUB" }).get();
* </code>
* @param args Arguments for the custom command including the command name
* @return A <em>CompletableFuture</em> with response result from Redis
*/
CompletableFuture<Object> customCommand(String[] args);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package glide.api.commands;

import glide.api.models.ClusterValue;
import glide.api.models.configuration.Route;
import java.util.concurrent.CompletableFuture;

/**
* Base Commands interface to handle generic command and transaction requests with routing options.
*/
public interface ClusterBaseCommands {

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in {@code args}.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as <em>SUBSCRIBE</em>), or that return potentially more than a single
* response (such as <em>XREAD</em>), or that change the client's behavior (such as entering
* <em>pub</em>/<em>sub</em> mode on <em>RESP2</em> connections) shouldn't be called using
* this function.
* @example Returns a list of all <em>pub</em>/<em>sub</em> clients:
* <p><code>
* Object result = client.customCommand(new String[]{ "CLIENT", "LIST", "TYPE", "PUBSUB" }).get();
* </code>
* @param args Arguments for the custom command including the command name
* @return A <em>CompletableFuture</em> with response result from Redis
*/
CompletableFuture<ClusterValue<Object>> customCommand(String[] args);

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in {@code args}.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as <em>SUBSCRIBE</em>), or that return potentially more than a single
* response (such as <em>XREAD</em>), or that change the client's behavior (such as entering
* <em>pub</em>/<em>sub</em> mode on <em>RESP2</em> connections) shouldn't be called using
* this function.
* @example Returns a list of all <em>pub</em>/<em>sub</em> clients:
* <p><code>
* Object result = client.customCommand(new String[]{ "CLIENT", "LIST", "TYPE", "PUBSUB" }).get();
* </code>
* @param args Arguments for the custom command including the command name
* @param route Routing configuration for the command
* @return A <em>CompletableFuture</em> with response result from Redis
*/
CompletableFuture<ClusterValue<Object>> customCommand(String[] args, Route route);
}
58 changes: 58 additions & 0 deletions java/client/src/main/java/glide/api/models/ClusterValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package glide.api.models;

import java.util.Map;

/**
* union-like type which can store single-value or multi-value retrieved from Redis. The
* multi-value, if defined, contains the routed value as a Map<String, Object> containing a cluster
* node address to cluster node value.
*
* @param <T> The wrapped data type
*/
public class ClusterValue<T> {
private Map<String, T> multiValue = null;

private T singleValue = null;

private ClusterValue() {}

/**
* Get per-node value.<br>
* Check with {@link #hasMultiData()} prior to accessing the data.
*/
public Map<String, T> getMultiValue() {
assert hasMultiData();
return multiValue;
}

/**
* Get the single value.<br>
* Check with {@link #hasSingleData()} ()} prior to accessing the data.
*/
public T getSingleValue() {
assert hasSingleData();
return singleValue;
}

/** A constructor for the value. */
@SuppressWarnings("unchecked")
public static <T> ClusterValue<T> of(Object data) {
var res = new ClusterValue<T>();
if (data instanceof Map) {
res.multiValue = (Map<String, T>) data;
} else {
res.singleValue = (T) data;
}
return res;
}

/** Check that multi-value is stored in this object. Use it prior to accessing the data. */
public boolean hasMultiData() {
return multiValue != null;
}

/** Check that single-value is stored in this object. Use it prior to accessing the data. */
public boolean hasSingleData() {
return !hasMultiData();
}
}
Loading

0 comments on commit f296c00

Please sign in to comment.