Skip to content

Commit

Permalink
Update XCLAIM with options; remove LASTID
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Jun 30, 2024
1 parent 884eda5 commit 1ae76dc
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 67 deletions.
17 changes: 16 additions & 1 deletion glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
Some(ExpectedReturnType::ArrayOfStrings)
} else {
Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::BulkString),
key_type: &Some(ExpectedReturnType::SimpleString),
value_type: &Some(ExpectedReturnType::ArrayOfStrings),
})
}
Expand Down Expand Up @@ -1192,6 +1192,21 @@ mod tests {
assert!(converted_4.is_err());
}

#[test]
fn convert_xclaim() {
assert!(matches!(
expected_type_for_cmd(redis::cmd("XCLAIM").arg("key").arg("grou").arg("consumer").arg("0").arg("id")),
Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::SimpleString),
value_type: &Some(ExpectedReturnType::ArrayOfStrings),
})
));
assert!(matches!(
expected_type_for_cmd(redis::cmd("XCLAIM").arg("key").arg("grou").arg("consumer").arg("0").arg("id").arg("JUSTID")),
Some(ExpectedReturnType::ArrayOfStrings)
));
}

#[test]
fn convert_xrange_xrevrange() {
assert!(matches!(
Expand Down
21 changes: 15 additions & 6 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldReadOnlySubCommands;
import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldSubCommands;
import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldArgs;
import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
import static glide.ffi.resolvers.SocketListenerResolver.getSocket;
import static glide.utils.ArrayTransformUtils.cast3DArray;
import static glide.utils.ArrayTransformUtils.castArray;
Expand Down Expand Up @@ -2024,8 +2025,8 @@ public CompletableFuture<Map<String, String[]>> xclaim(
@NonNull String[] ids,
@NonNull StreamClaimOptions options) {
String[] args =
concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids);
args = concatenateArrays(args, options.toArgs(false));
concatenateArrays(
new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids, options.toArgs());
return commandManager.submitNewCommand(XClaim, args, this::handleMapResponse);
}

Expand All @@ -2036,8 +2037,13 @@ public CompletableFuture<String[]> xclaimJustId(
@NonNull String consumer,
long minIdleTime,
@NonNull String[] ids) {
return xclaimJustId(
key, group, consumer, minIdleTime, ids, StreamClaimOptions.builder().build());
String[] args =
concatenateArrays(
new String[] {key, group, consumer, Long.toString(minIdleTime)},
ids,
new String[] {JUST_ID_REDIS_API});
return commandManager.submitNewCommand(
XClaim, args, response -> castArray(handleArrayResponse(response), String.class));
}

@Override
Expand All @@ -2049,8 +2055,11 @@ public CompletableFuture<String[]> xclaimJustId(
@NonNull String[] ids,
@NonNull StreamClaimOptions options) {
String[] args =
concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids);
args = concatenateArrays(args, options.toArgs(true));
concatenateArrays(
new String[] {key, group, consumer, Long.toString(minIdleTime)},
ids,
options.toArgs(),
new String[] {JUST_ID_REDIS_API});
return commandManager.submitNewCommand(
XClaim, args, response -> castArray(handleArrayResponse(response), String.class));
}
Expand Down
26 changes: 19 additions & 7 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static glide.api.models.commands.function.FunctionListOptions.LIBRARY_NAME_REDIS_API;
import static glide.api.models.commands.function.FunctionListOptions.WITH_CODE_REDIS_API;
import static glide.api.models.commands.function.FunctionLoadOptions.REPLACE;
import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
import static glide.utils.ArrayTransformUtils.concatenateArrays;
import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray;
import static glide.utils.ArrayTransformUtils.convertMapToValueKeyStringArray;
Expand Down Expand Up @@ -3296,7 +3297,10 @@ public T xclaim(
@NonNull String consumer,
long minIdleTime,
@NonNull String[] ids) {
return xclaim(key, group, consumer, minIdleTime, ids, StreamClaimOptions.builder().build());
String[] args =
concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids);
protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
return getThis();
}

/**
Expand All @@ -3320,8 +3324,8 @@ public T xclaim(
@NonNull String[] ids,
@NonNull StreamClaimOptions options) {
String[] args =
concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids);
args = concatenateArrays(args, options.toArgs(false));
concatenateArrays(
new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids, options.toArgs());
protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
return getThis();
}
Expand All @@ -3344,8 +3348,13 @@ public T xclaimJustId(
@NonNull String consumer,
long minIdleTime,
@NonNull String[] ids) {
return xclaimJustId(
key, group, consumer, minIdleTime, ids, StreamClaimOptions.builder().build());
String[] args =
concatenateArrays(
new String[] {key, group, consumer, Long.toString(minIdleTime)},
ids,
new String[] {JUST_ID_REDIS_API});
protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
return getThis();
}

/**
Expand All @@ -3369,8 +3378,11 @@ public T xclaimJustId(
@NonNull String[] ids,
@NonNull StreamClaimOptions options) {
String[] args =
concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids);
args = concatenateArrays(args, options.toArgs(true));
concatenateArrays(
new String[] {key, group, consumer, Long.toString(minIdleTime)},
ids,
options.toArgs(),
new String[] {JUST_ID_REDIS_API});
protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
return getThis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ public class StreamClaimOptions {
/** Redis api string to designate JUSTID */
public static final String JUST_ID_REDIS_API = "JUSTID";

/** Redis api string to designate LASTID */
public static final String LAST_ID_REDIS_API = "LASTID";

/**
* Set the idle time (last time it was delivered) of the message. If <code>idle</code> is not
* specified, an <code>idle</code> of <code>0</code> is assumed, that is, the time count is reset
Expand Down Expand Up @@ -63,9 +60,6 @@ public class StreamClaimOptions {
*/
private final boolean isForce;

/** Filter up to the <code>lastid</code> when claiming messages */
private final String lastId;

public static class StreamClaimOptionsBuilder {

/**
Expand All @@ -83,7 +77,7 @@ public StreamClaimOptionsBuilder force() {
*
* @return String[]
*/
public String[] toArgs(boolean isJustId) {
public String[] toArgs() {
List<String> optionArgs = new ArrayList<>();

if (idle != null) {
Expand All @@ -105,15 +99,6 @@ public String[] toArgs(boolean isJustId) {
optionArgs.add(FORCE_REDIS_API);
}

if (isJustId) {
optionArgs.add(JUST_ID_REDIS_API);
}

if (lastId != null) {
optionArgs.add(LAST_ID_REDIS_API);
optionArgs.add(lastId);
}

return optionArgs.toArray(new String[0]);
}
}
25 changes: 4 additions & 21 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import static glide.api.models.commands.stream.StreamClaimOptions.FORCE_REDIS_API;
import static glide.api.models.commands.stream.StreamClaimOptions.IDLE_REDIS_API;
import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
import static glide.api.models.commands.stream.StreamClaimOptions.LAST_ID_REDIS_API;
import static glide.api.models.commands.stream.StreamClaimOptions.RETRY_COUNT_REDIS_API;
import static glide.api.models.commands.stream.StreamClaimOptions.TIME_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_REDIS_API;
Expand Down Expand Up @@ -5835,13 +5834,7 @@ public void xclaim_with_options_returns_success() {
Long minIdleTime = 18L;
String[] ids = new String[] {"testId"};
StreamClaimOptions options =
StreamClaimOptions.builder()
.force()
.idle(11L)
.idleUnixTime(12L)
.retryCount(5L)
.lastId("2345-5")
.build();
StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build();
String[] arguments =
new String[] {
key,
Expand All @@ -5855,9 +5848,7 @@ public void xclaim_with_options_returns_success() {
"12",
RETRY_COUNT_REDIS_API,
"5",
FORCE_REDIS_API,
LAST_ID_REDIS_API,
"2345-5"
FORCE_REDIS_API
};
Map<String, String[]> mockResult = Map.of("1234-0", new String[] {"message", "log"});

Expand Down Expand Up @@ -5917,13 +5908,7 @@ public void xclaimJustId_with_options_returns_success() {
Long minIdleTime = 18L;
String[] ids = new String[] {"testId"};
StreamClaimOptions options =
StreamClaimOptions.builder()
.force()
.idle(11L)
.idleUnixTime(12L)
.retryCount(5L)
.lastId("2345-5")
.build();
StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build();
String[] arguments =
new String[] {
key,
Expand All @@ -5938,9 +5923,7 @@ public void xclaimJustId_with_options_returns_success() {
RETRY_COUNT_REDIS_API,
"5",
FORCE_REDIS_API,
JUST_ID_REDIS_API,
LAST_ID_REDIS_API,
"2345-5"
JUST_ID_REDIS_API
};
String[] mockResult = {"message", "log"};

Expand Down
17 changes: 3 additions & 14 deletions java/client/src/test/java/glide/api/models/TransactionTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import static glide.api.models.commands.stream.StreamClaimOptions.FORCE_REDIS_API;
import static glide.api.models.commands.stream.StreamClaimOptions.IDLE_REDIS_API;
import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
import static glide.api.models.commands.stream.StreamClaimOptions.LAST_ID_REDIS_API;
import static glide.api.models.commands.stream.StreamClaimOptions.RETRY_COUNT_REDIS_API;
import static glide.api.models.commands.stream.StreamClaimOptions.TIME_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_REDIS_API;
Expand Down Expand Up @@ -882,13 +881,7 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
results.add(Pair.of(XClaim, buildArgs("key", "group", "consumer", "99", "12345-1", "98765-4")));

StreamClaimOptions claimOptions =
StreamClaimOptions.builder()
.force()
.idle(11L)
.idleUnixTime(12L)
.retryCount(5L)
.lastId("2345-5")
.build();
StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build();
transaction.xclaim(
"key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"}, claimOptions);
results.add(
Expand All @@ -907,9 +900,7 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
"12",
RETRY_COUNT_REDIS_API,
"5",
FORCE_REDIS_API,
LAST_ID_REDIS_API,
"2345-5")));
FORCE_REDIS_API)));

transaction.xclaimJustId("key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"});
results.add(
Expand All @@ -936,9 +927,7 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
RETRY_COUNT_REDIS_API,
"5",
FORCE_REDIS_API,
JUST_ID_REDIS_API,
LAST_ID_REDIS_API,
"2345-5")));
JUST_ID_REDIS_API)));

transaction.time();
results.add(Pair.of(Time, buildArgs()));
Expand Down
Loading

0 comments on commit 1ae76dc

Please sign in to comment.