Skip to content

Commit

Permalink
Java - Add custom command interface; and BaseCommands (valkey-io#837)
Browse files Browse the repository at this point in the history
* Java - Add custom command interface; and BaseCommands (#58)

* Add base command; add custom command

---------

Signed-off-by: Andrew Carbonetto <[email protected]>

* Clean up merge conflict

Signed-off-by: Andrew Carbonetto <[email protected]>

* Move Command resolvers to manager level

Signed-off-by: Andrew Carbonetto <[email protected]>

* Remove ClusterClient.java

Signed-off-by: Andrew Carbonetto <[email protected]>

* Spotless

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update CommandManager comment

Signed-off-by: Andrew Carbonetto <[email protected]>

* Minor comments

Signed-off-by: Andrew Carbonetto <[email protected]>

* Move commands and response handlers to protected locations

Signed-off-by: Andrew Carbonetto <[email protected]>

* Clean up imports

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update custom command documentation

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update javadoc for RedisExceptionCheckedFunction

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto authored Jan 24, 2024
1 parent 2bb441d commit 9345634
Show file tree
Hide file tree
Showing 10 changed files with 487 additions and 131 deletions.
16 changes: 16 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package glide.api;

import glide.ffi.resolvers.RedisValueResolver;
import glide.managers.BaseCommandResponseResolver;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.concurrent.ExecutionException;
import lombok.AllArgsConstructor;
import response.ResponseOuterClass.Response;

/** Base Client class for Redis */
@AllArgsConstructor
Expand All @@ -12,6 +15,19 @@ public abstract class BaseClient implements AutoCloseable {
protected ConnectionManager connectionManager;
protected CommandManager commandManager;

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
* appropriate response as an Object
*
* @param response Redis protobuf message
* @return Response Object
*/
protected static Object handleObjectResponse(Response response) {
// return function to convert protobuf.Response into the response object by
// calling valueFromPointer
return (new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer)).apply(response);
}

/**
* Closes this resource, relinquishing any underlying resources. This method is invoked
* automatically on objects managed by the try-with-resources statement.
Expand Down
26 changes: 25 additions & 1 deletion java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

import static glide.ffi.resolvers.SocketListenerResolver.getSocket;

import glide.api.commands.BaseCommands;
import glide.api.models.configuration.RedisClientConfiguration;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import glide.managers.models.Command;
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Standalone mode. Use {@link
* #CreateClient(RedisClientConfiguration)} to request a client to Redis.
*/
public class RedisClient extends BaseClient {
public class RedisClient extends BaseClient implements BaseCommands {

/**
* Request an async (non-blocking) Redis client in Standalone mode.
Expand Down Expand Up @@ -54,4 +56,26 @@ protected static CommandManager buildCommandManager(ChannelHandler channelHandle
protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as SUBSCRIBE), or that return potentially more than a single response
* (such as XREAD), or that change the client's behavior (such as entering pub/sub mode on
* RESP2 connections) shouldn't be called using this function.
* @example Returns a list of all pub/sub clients:
* <pre>
* Object result = client.customCommand(new String[]{"CLIENT","LIST","TYPE", "PUBSUB"}).get();
* </pre>
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
*/
public CompletableFuture<Object> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, BaseClient::handleObjectResponse);
}
}
25 changes: 25 additions & 0 deletions java/client/src/main/java/glide/api/commands/BaseCommands.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package glide.api.commands;

import java.util.concurrent.CompletableFuture;

/** Base Commands interface to handle generic command and transaction requests. */
public interface BaseCommands {

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as SUBSCRIBE), or that return potentially more than a single response
* (such as XREAD), or that change the client's behavior (such as entering pub/sub mode on
* RESP2 connections) shouldn't be called using this function.
* @example Returns a list of all pub/sub clients:
* <pre>
* Object result = client.customCommand(new String[]{"CLIENT","LIST","TYPE", "PUBSUB"}).get();
* </pre>
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
*/
CompletableFuture<Object> customCommand(String[] args);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package glide.managers;

import glide.api.models.exceptions.ClosingException;
import glide.api.models.exceptions.ConnectionException;
import glide.api.models.exceptions.ExecAbortException;
import glide.api.models.exceptions.RedisException;
import glide.api.models.exceptions.RequestException;
import glide.api.models.exceptions.TimeoutException;
import lombok.AllArgsConstructor;
import response.ResponseOuterClass.RequestError;
import response.ResponseOuterClass.Response;

/**
* Response resolver responsible for evaluating the Redis response object with a success or failure.
*/
@AllArgsConstructor
public class BaseCommandResponseResolver
implements RedisExceptionCheckedFunction<Response, Object> {

private RedisExceptionCheckedFunction<Long, Object> respPointerResolver;

/**
* Extracts value from the RESP pointer. <br>
* Throws errors when the response is unsuccessful.
*
* @return A generic Object with the Response | null if the response is empty
*/
public Object apply(Response response) throws RedisException {
if (response.hasRequestError()) {
RequestError error = response.getRequestError();
String msg = error.getMessage();
switch (error.getType()) {
case Unspecified:
// Unspecified error on Redis service-side
throw new RequestException(msg);
case ExecAbort:
// Transactional error on Redis service-side
throw new ExecAbortException(msg);
case Timeout:
// Timeout from Glide to Redis service
throw new TimeoutException(msg);
case Disconnect:
// Connection problem between Glide and Redis
throw new ConnectionException(msg);
default:
// Request or command error from Redis
throw new RequestException(msg);
}
}
if (response.hasClosingError()) {
// A closing error is thrown when Rust-core is not connected to Redis
// We want to close shop and throw a ClosingException
// TODO: close the channel on a closing error
// channel.close();
throw new ClosingException(response.getClosingError());
}
if (response.hasConstantResponse()) {
// Return "OK"
return response.getConstantResponse().toString();
}
if (response.hasRespPointer()) {
// Return the shared value - which may be a null value
return respPointerResolver.apply(response.getRespPointer());
}
// if no response payload is provided, assume null
return null;
}
}
118 changes: 42 additions & 76 deletions java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
package glide.managers;

import glide.api.models.exceptions.ClosingException;
import glide.api.models.exceptions.ConnectionException;
import glide.api.models.exceptions.ExecAbortException;
import glide.api.models.exceptions.RedisException;
import glide.api.models.exceptions.RequestException;
import glide.api.models.exceptions.TimeoutException;
import glide.connectors.handlers.ChannelHandler;
import glide.ffi.resolvers.RedisValueResolver;
import glide.models.RequestBuilder;
import java.util.List;
import glide.managers.models.Command;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import redis_request.RedisRequestOuterClass;
import redis_request.RedisRequestOuterClass.RequestType;
import response.ResponseOuterClass.RequestError;
import response.ResponseOuterClass.Response;

/**
Expand All @@ -27,82 +19,56 @@ public class CommandManager {
private final ChannelHandler channel;

/**
* Async (non-blocking) get.<br>
* See <a href="https://redis.io/commands/get/">REDIS docs for GET</a>.
* Build a command and send.
*
* @param key The key name
* @param command
* @param responseHandler - to handle the response object
* @return A result promise of type T
*/
public CompletableFuture<String> get(String key) {
return submitNewRequest(RequestType.GetString, List.of(key));
public <T> CompletableFuture<T> submitNewCommand(
Command command, RedisExceptionCheckedFunction<Response, T> responseHandler) {
// write command request to channel
// when complete, convert the response to our expected type T using the given responseHandler
return channel
.write(prepareRedisRequest(command.getRequestType(), command.getArguments()), true)
.thenApplyAsync(response -> responseHandler.apply(response));
}

/**
* Async (non-blocking) set.<br>
* See <a href="https://redis.io/commands/set/">REDIS docs for SET</a>.
* Build a protobuf command/transaction request object.<br>
* Used by {@link CommandManager}.
*
* @param key The key name
* @param value The value to set
* @param command - Redis command
* @param args - Redis command arguments as string array
* @return An uncompleted request. CallbackDispatcher is responsible to complete it by adding a
* callback id.
*/
public CompletableFuture<String> set(String key, String value) {
return submitNewRequest(RequestType.SetString, List.of(key, value));
}
private RedisRequestOuterClass.RedisRequest.Builder prepareRedisRequest(
Command.RequestType command, String[] args) {
RedisRequestOuterClass.Command.ArgsArray.Builder commandArgs =
RedisRequestOuterClass.Command.ArgsArray.newBuilder();
for (var arg : args) {
commandArgs.addArgs(arg);
}

/**
* Build a command and submit it Netty to send.
*
* @param command Command type
* @param args Command arguments
* @return A result promise
*/
private CompletableFuture<String> submitNewRequest(RequestType command, List<String> args) {
return channel
.write(RequestBuilder.prepareRedisRequest(command, args), true)
.thenApplyAsync(this::extractValueFromGlideRsResponse);
// TODO: set route properly when no RouteOptions given
return RedisRequestOuterClass.RedisRequest.newBuilder()
.setSingleCommand(
RedisRequestOuterClass.Command.newBuilder()
.setRequestType(mapRequestTypes(command))
.setArgsArray(commandArgs.build())
.build())
.setRoute(
RedisRequestOuterClass.Routes.newBuilder()
.setSimpleRoutes(RedisRequestOuterClass.SimpleRoutes.AllNodes)
.build());
}

/**
* Check response and extract data from it.
*
* @param response A response received from rust core lib
* @return A String from the Redis response, or Ok. Otherwise, returns null
*/
private String extractValueFromGlideRsResponse(Response response) {
if (response.hasRequestError()) {
RequestError error = response.getRequestError();
String msg = error.getMessage();
switch (error.getType()) {
case Unspecified:
// Unspecified error on Redis service-side
throw new RequestException(msg);
case ExecAbort:
// Transactional error on Redis service-side
throw new ExecAbortException(msg);
case Timeout:
// Timeout from Glide to Redis service
throw new TimeoutException(msg);
case Disconnect:
// Connection problem between Glide and Redis
throw new ConnectionException(msg);
default:
// Request or command error from Redis
throw new RedisException(msg);
}
}
if (response.hasClosingError()) {
// A closing error is thrown when Rust-core is not connected to Redis
// We want to close shop and throw a ClosingException
channel.close();
throw new ClosingException(response.getClosingError());
}
if (response.hasConstantResponse()) {
// Return "OK"
return response.getConstantResponse().toString();
}
if (response.hasRespPointer()) {
// Return the shared value - which may be a null value
return RedisValueResolver.valueFromPointer(response.getRespPointer()).toString();
private RequestType mapRequestTypes(Command.RequestType inType) {
switch (inType) {
case CUSTOM_COMMAND:
return RequestType.CustomCommand;
}
// if no response payload is provided, assume null
return null;
throw new RuntimeException("Unsupported request type");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package glide.managers;

import glide.api.models.exceptions.RedisException;

/**
* Functional Interface to convert values and throw RedisException when encountering an error state.
*
* @param <R> type to evaluate
* @param <T> payload type
*/
@FunctionalInterface
public interface RedisExceptionCheckedFunction<R, T> {

/**
* Functional response handler that takes a value of type R and returns a payload of type T.
* Throws RedisException when encountering an invalid or error state.
*
* @param value - received value type
* @return T - returning payload type
* @throws RedisException
*/
T apply(R value) throws RedisException;
}
24 changes: 24 additions & 0 deletions java/client/src/main/java/glide/managers/models/Command.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package glide.managers.models;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;

/** Base Command class to send a single request to Redis. */
@Builder
@Getter
@EqualsAndHashCode
public class Command {

/** Redis command request type */
@NonNull final RequestType requestType;

/** List of Arguments for the Redis command request */
@Builder.Default final String[] arguments = new String[] {};

public enum RequestType {
/** Call a custom command with list of string arguments */
CUSTOM_COMMAND,
}
}
Loading

0 comments on commit 9345634

Please sign in to comment.