Skip to content

Commit

Permalink
Add Transactions on top of cluster mode
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Jan 29, 2024
1 parent ec3a76a commit 6d308d2
Show file tree
Hide file tree
Showing 17 changed files with 681 additions and 224 deletions.
62 changes: 35 additions & 27 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package glide.api;

import glide.api.commands.BaseCommands;
import glide.api.commands.Transaction;
import glide.api.commands.ConnectionCommands;
import glide.api.commands.StringCommands;
import glide.api.models.commands.SetOptions;
import glide.api.models.exceptions.RedisException;
import glide.ffi.resolvers.RedisValueResolver;
import glide.managers.BaseCommandResponseResolver;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import glide.managers.models.Command;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import lombok.AllArgsConstructor;
import response.ResponseOuterClass.Response;

/** Base Client class for Redis */
@AllArgsConstructor
public abstract class BaseClient implements AutoCloseable, BaseCommands {
public abstract class BaseClient implements AutoCloseable, StringCommands, ConnectionCommands {

protected final ConnectionManager connectionManager;
protected final CommandManager commandManager;
Expand Down Expand Up @@ -45,7 +47,7 @@ public void close() throws ExecutionException {
* @param response Redis protobuf message
* @return Response Object
*/
protected static Object handleObjectResponse(Response response) {
protected Object handleObjectResponse(Response response) {
// convert protobuf response into Object and then Object into T
return new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response);
}
Expand All @@ -56,7 +58,7 @@ protected static Object handleObjectResponse(Response response) {
*
* @return null if the response is empty
*/
protected static Void handleVoidResponse(Response response) {
protected Void handleVoidResponse(Response response) {
Object value = handleObjectResponse(response);
if (value == null) {
return null;
Expand All @@ -74,7 +76,7 @@ protected static Void handleVoidResponse(Response response) {
* @param response Redis protobuf message
* @return Response as a String
*/
protected static String handleStringResponse(Response response) {
protected String handleStringResponse(Response response) {
Object value = handleObjectResponse(response);
if (value instanceof String) {
return (String) value;
Expand All @@ -92,7 +94,7 @@ protected static String handleStringResponse(Response response) {
* @param response Redis protobuf message
* @return Response as an Object[]
*/
protected static Object[] handleArrayResponse(Response response) {
protected Object[] handleArrayResponse(Response response) {
Object value = handleObjectResponse(response);
if (value instanceof Object[]) {
return (Object[]) value;
Expand All @@ -110,7 +112,7 @@ protected static Object[] handleArrayResponse(Response response) {
* @param response Redis protobuf message
* @return Response as a String
*/
protected static HashMap<String, Object> handleMapResponse(Response response) {
protected HashMap<String, Object> handleMapResponse(Response response) {
Object value = handleObjectResponse(response);
if (value instanceof HashMap) {
return (HashMap<String, Object>) value;
Expand All @@ -122,26 +124,32 @@ protected static HashMap<String, Object> handleMapResponse(Response response) {
}

@Override
public CompletableFuture<Object> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, BaseClient::handleObjectResponse);
public CompletableFuture<String> ping() {
return commandManager.submitNewCommand(
Command.ping(), Optional.empty(), r -> handleStringResponse(r));
}

/**
* Execute a transaction by processing the queued commands.
*
* @see <a href="https://redis.io/topics/Transactions/">redis.io</a> for details on Redis
* Transactions.
* @param transaction - A {@link Transaction} object containing a list of commands to be executed.
* @return A list of results corresponding to the execution of each command in the transaction.
* <ul>
* <li>If a command returns a value, it will be included in the list. If a command doesn't
* return a value, the list entry will be null.
* <li>If the transaction failed due to a WATCH command, `exec` will return `null`.
* </ul>
*/
public CompletableFuture<Object[]> exec(Transaction transaction) {
return commandManager.submitNewTransaction(transaction, BaseClient::handleArrayResponse);
@Override
public CompletableFuture<String> ping(String msg) {
return commandManager.submitNewCommand(
Command.ping(msg), Optional.empty(), r -> handleStringResponse(r));
}

@Override
public CompletableFuture<String> get(String key) {
return commandManager.submitNewCommand(
Command.get(key), Optional.empty(), r -> handleStringResponse(r));
}

@Override
public CompletableFuture<Void> set(String key, String value) {
return commandManager.submitNewCommand(
Command.set(key, value), Optional.empty(), r -> handleVoidResponse(r));
}

@Override
public CompletableFuture<String> set(String key, String value, SetOptions options) {
return commandManager.submitNewCommand(
Command.set(key, value, options.toArgs()), Optional.empty(), r -> handleStringResponse(r));
}
}
58 changes: 0 additions & 58 deletions java/client/src/main/java/glide/api/ClusterClient.java

This file was deleted.

83 changes: 11 additions & 72 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

import static glide.ffi.resolvers.SocketListenerResolver.getSocket;

import glide.api.commands.BaseCommands;
import glide.api.commands.ConnectionCommands;
import glide.api.commands.GenericCommands;
import glide.api.commands.ServerCommands;
import glide.api.commands.StringCommands;
import glide.api.commands.Transaction;
import glide.api.models.commands.InfoOptions;
import glide.api.models.commands.SetOptions;
import glide.api.models.configuration.RedisClientConfiguration;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
Expand All @@ -23,11 +22,7 @@
* client to Redis.
*/
public class RedisClient extends BaseClient
implements ConnectionCommands, GenericCommands, ServerCommands, StringCommands {

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}
implements BaseCommands, ConnectionCommands, ServerCommands {

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
Expand Down Expand Up @@ -71,81 +66,25 @@ protected static CommandManager buildCommandManager(ChannelHandler channelHandle

@Override
public CompletableFuture<Object> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, Optional.empty(), this::handleObjectResponse);
}

/**
* Ping the Redis server.
*
* @see <a href="https://redis.io/commands/ping/">redis.io</a> for details.
* @return the String "PONG"
*/
@Override
public CompletableFuture<String> ping() {
return commandManager.submitNewCommand(Command.ping(), BaseClient::handleStringResponse);
return commandManager.submitNewCommand(
Command.customCommand(args), Optional.empty(), response -> handleObjectResponse(response));
}

/**
* Ping the Redis server.
*
* @see <a href="https://redis.io/commands/ping/">redis.io</a> for details.
* @param msg - the ping argument that will be returned.
* @return return a copy of the argument.
*/
@Override
public CompletableFuture<String> ping(String msg) {
return commandManager.submitNewCommand(Command.ping(msg), BaseClient::handleStringResponse);
public CompletableFuture<Object[]> exec(Transaction transaction) {
return commandManager.submitNewTransaction(
transaction, Optional.empty(), r -> handleArrayResponse(r));
}

/**
* Get information and statistics about the Redis server. DEFAULT option is assumed
*
* @see <a href="https://redis.io/commands/info/">redis.io</a> for details.
* @return CompletableFuture with the response
*/
@Override
public CompletableFuture<Map> info() {
return commandManager.submitNewCommand(Command.info(), BaseClient::handleMapResponse);
return commandManager.submitNewCommand(
Command.info(), Optional.empty(), r -> handleMapResponse(r));
}

/**
* Get information and statistics about the Redis server.
*
* @see <a href="https://redis.io/commands/info/">redis.io</a> for details.
* @param options - A list of InfoSection values specifying which sections of information to
* retrieve. When no parameter is provided, the default option is assumed.
* @return CompletableFuture with the response
*/
@Override
public CompletableFuture<Map> info(InfoOptions options) {
return commandManager.submitNewCommand(
Command.info(options.toInfoOptions()), BaseClient::handleMapResponse);
}

/**
* Get the value associated with the given key, or null if no such value exists.
*
* @see <a href="https://redis.io/commands/get/">redis.io</a> for details.
* @param key - The key to retrieve from the database.
* @return If `key` exists, returns the value of `key` as a string. Otherwise, return null
*/
@Override
public CompletableFuture<String> get(String key) {
return commandManager.submitNewCommand(Command.get(key), BaseClient::handleStringResponse);
}

/**
* Set the given key with the given value.
*
* @see <a href="https://redis.io/commands/set/">redis.io</a> for details.
* @param key - The key to store.
* @param value - The value to store with the given key.
* @return null
*/
@Override
public CompletableFuture<Void> set(String key, String value) {
return commandManager.submitNewCommand(Command.set(key, value), BaseClient::handleVoidResponse);
Command.info(options.toInfoOptions()), Optional.empty(), r -> handleMapResponse(r));
}
}
Loading

0 comments on commit 6d308d2

Please sign in to comment.