diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index a5c9ee4709..73e38532a1 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -858,7 +858,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { Some(ExpectedReturnType::ArrayOfStrings) } else { Some(ExpectedReturnType::Map { - key_type: &Some(ExpectedReturnType::BulkString), + key_type: &Some(ExpectedReturnType::SimpleString), value_type: &Some(ExpectedReturnType::ArrayOfStrings), }) } @@ -1192,6 +1192,21 @@ mod tests { assert!(converted_4.is_err()); } + #[test] + fn convert_xclaim() { + assert!(matches!( + expected_type_for_cmd(redis::cmd("XCLAIM").arg("key").arg("grou").arg("consumer").arg("0").arg("id")), + Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::SimpleString), + value_type: &Some(ExpectedReturnType::ArrayOfStrings), + }) + )); + assert!(matches!( + expected_type_for_cmd(redis::cmd("XCLAIM").arg("key").arg("grou").arg("consumer").arg("0").arg("id").arg("JUSTID")), + Some(ExpectedReturnType::ArrayOfStrings) + )); + } + #[test] fn convert_xrange_xrevrange() { assert!(matches!( diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 91155c5c5c..7c7ca5e639 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -7,6 +7,7 @@ import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldReadOnlySubCommands; import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldSubCommands; import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldArgs; +import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API; import static glide.ffi.resolvers.SocketListenerResolver.getSocket; import static glide.utils.ArrayTransformUtils.cast3DArray; import static glide.utils.ArrayTransformUtils.castArray; @@ -2024,8 +2025,8 @@ public CompletableFuture> xclaim( @NonNull String[] ids, @NonNull StreamClaimOptions options) { String[] args = - concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids); - args = concatenateArrays(args, options.toArgs(false)); + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids, options.toArgs()); return commandManager.submitNewCommand(XClaim, args, this::handleMapResponse); } @@ -2036,8 +2037,13 @@ public CompletableFuture xclaimJustId( @NonNull String consumer, long minIdleTime, @NonNull String[] ids) { - return xclaimJustId( - key, group, consumer, minIdleTime, ids, StreamClaimOptions.builder().build()); + String[] args = + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, + ids, + new String[] {JUST_ID_REDIS_API}); + return commandManager.submitNewCommand( + XClaim, args, response -> castArray(handleArrayResponse(response), String.class)); } @Override @@ -2049,8 +2055,11 @@ public CompletableFuture xclaimJustId( @NonNull String[] ids, @NonNull StreamClaimOptions options) { String[] args = - concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids); - args = concatenateArrays(args, options.toArgs(true)); + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, + ids, + options.toArgs(), + new String[] {JUST_ID_REDIS_API}); return commandManager.submitNewCommand( XClaim, args, response -> castArray(handleArrayResponse(response), String.class)); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index 4f7b9d6860..ed37e7512b 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -20,6 +20,7 @@ import static glide.api.models.commands.function.FunctionListOptions.LIBRARY_NAME_REDIS_API; import static glide.api.models.commands.function.FunctionListOptions.WITH_CODE_REDIS_API; import static glide.api.models.commands.function.FunctionLoadOptions.REPLACE; +import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API; import static glide.utils.ArrayTransformUtils.concatenateArrays; import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray; import static glide.utils.ArrayTransformUtils.convertMapToValueKeyStringArray; @@ -3296,7 +3297,10 @@ public T xclaim( @NonNull String consumer, long minIdleTime, @NonNull String[] ids) { - return xclaim(key, group, consumer, minIdleTime, ids, StreamClaimOptions.builder().build()); + String[] args = + concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids); + protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args))); + return getThis(); } /** @@ -3320,8 +3324,8 @@ public T xclaim( @NonNull String[] ids, @NonNull StreamClaimOptions options) { String[] args = - concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids); - args = concatenateArrays(args, options.toArgs(false)); + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids, options.toArgs()); protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args))); return getThis(); } @@ -3344,8 +3348,13 @@ public T xclaimJustId( @NonNull String consumer, long minIdleTime, @NonNull String[] ids) { - return xclaimJustId( - key, group, consumer, minIdleTime, ids, StreamClaimOptions.builder().build()); + String[] args = + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, + ids, + new String[] {JUST_ID_REDIS_API}); + protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args))); + return getThis(); } /** @@ -3369,8 +3378,11 @@ public T xclaimJustId( @NonNull String[] ids, @NonNull StreamClaimOptions options) { String[] args = - concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids); - args = concatenateArrays(args, options.toArgs(true)); + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, + ids, + options.toArgs(), + new String[] {JUST_ID_REDIS_API}); protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args))); return getThis(); } diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamClaimOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamClaimOptions.java index 3c7268012a..fdc19cbb54 100644 --- a/java/client/src/main/java/glide/api/models/commands/stream/StreamClaimOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamClaimOptions.java @@ -30,9 +30,6 @@ public class StreamClaimOptions { /** Redis api string to designate JUSTID */ public static final String JUST_ID_REDIS_API = "JUSTID"; - /** Redis api string to designate LASTID */ - public static final String LAST_ID_REDIS_API = "LASTID"; - /** * Set the idle time (last time it was delivered) of the message. If idle is not * specified, an idle of 0 is assumed, that is, the time count is reset @@ -63,9 +60,6 @@ public class StreamClaimOptions { */ private final boolean isForce; - /** Filter up to the lastid when claiming messages */ - private final String lastId; - public static class StreamClaimOptionsBuilder { /** @@ -83,7 +77,7 @@ public StreamClaimOptionsBuilder force() { * * @return String[] */ - public String[] toArgs(boolean isJustId) { + public String[] toArgs() { List optionArgs = new ArrayList<>(); if (idle != null) { @@ -105,15 +99,6 @@ public String[] toArgs(boolean isJustId) { optionArgs.add(FORCE_REDIS_API); } - if (isJustId) { - optionArgs.add(JUST_ID_REDIS_API); - } - - if (lastId != null) { - optionArgs.add(LAST_ID_REDIS_API); - optionArgs.add(lastId); - } - return optionArgs.toArray(new String[0]); } } diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index e05d34bff8..444f562ae7 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -41,7 +41,6 @@ import static glide.api.models.commands.stream.StreamClaimOptions.FORCE_REDIS_API; import static glide.api.models.commands.stream.StreamClaimOptions.IDLE_REDIS_API; import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API; -import static glide.api.models.commands.stream.StreamClaimOptions.LAST_ID_REDIS_API; import static glide.api.models.commands.stream.StreamClaimOptions.RETRY_COUNT_REDIS_API; import static glide.api.models.commands.stream.StreamClaimOptions.TIME_REDIS_API; import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_REDIS_API; @@ -5835,13 +5834,7 @@ public void xclaim_with_options_returns_success() { Long minIdleTime = 18L; String[] ids = new String[] {"testId"}; StreamClaimOptions options = - StreamClaimOptions.builder() - .force() - .idle(11L) - .idleUnixTime(12L) - .retryCount(5L) - .lastId("2345-5") - .build(); + StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build(); String[] arguments = new String[] { key, @@ -5855,9 +5848,7 @@ public void xclaim_with_options_returns_success() { "12", RETRY_COUNT_REDIS_API, "5", - FORCE_REDIS_API, - LAST_ID_REDIS_API, - "2345-5" + FORCE_REDIS_API }; Map mockResult = Map.of("1234-0", new String[] {"message", "log"}); @@ -5917,13 +5908,7 @@ public void xclaimJustId_with_options_returns_success() { Long minIdleTime = 18L; String[] ids = new String[] {"testId"}; StreamClaimOptions options = - StreamClaimOptions.builder() - .force() - .idle(11L) - .idleUnixTime(12L) - .retryCount(5L) - .lastId("2345-5") - .build(); + StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build(); String[] arguments = new String[] { key, @@ -5938,9 +5923,7 @@ public void xclaimJustId_with_options_returns_success() { RETRY_COUNT_REDIS_API, "5", FORCE_REDIS_API, - JUST_ID_REDIS_API, - LAST_ID_REDIS_API, - "2345-5" + JUST_ID_REDIS_API }; String[] mockResult = {"message", "log"}; diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 45783ab212..2b66edf5f3 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -31,7 +31,6 @@ import static glide.api.models.commands.stream.StreamClaimOptions.FORCE_REDIS_API; import static glide.api.models.commands.stream.StreamClaimOptions.IDLE_REDIS_API; import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API; -import static glide.api.models.commands.stream.StreamClaimOptions.LAST_ID_REDIS_API; import static glide.api.models.commands.stream.StreamClaimOptions.RETRY_COUNT_REDIS_API; import static glide.api.models.commands.stream.StreamClaimOptions.TIME_REDIS_API; import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_REDIS_API; @@ -882,13 +881,7 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), results.add(Pair.of(XClaim, buildArgs("key", "group", "consumer", "99", "12345-1", "98765-4"))); StreamClaimOptions claimOptions = - StreamClaimOptions.builder() - .force() - .idle(11L) - .idleUnixTime(12L) - .retryCount(5L) - .lastId("2345-5") - .build(); + StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build(); transaction.xclaim( "key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"}, claimOptions); results.add( @@ -907,9 +900,7 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), "12", RETRY_COUNT_REDIS_API, "5", - FORCE_REDIS_API, - LAST_ID_REDIS_API, - "2345-5"))); + FORCE_REDIS_API))); transaction.xclaimJustId("key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"}); results.add( @@ -936,9 +927,7 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), RETRY_COUNT_REDIS_API, "5", FORCE_REDIS_API, - JUST_ID_REDIS_API, - LAST_ID_REDIS_API, - "2345-5"))); + JUST_ID_REDIS_API))); transaction.time(); results.add(Pair.of(Time, buildArgs())); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index 4d3ae6603d..3e36b64dc5 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -4410,9 +4410,35 @@ public void xpending_xclaim(BaseClient client) { .get(); assertArrayEquals(new String[] {streamid_3, streamid_5}, claimResultsJustId); - // acknowledge streams 2-4 and remove them from the xpending results + // add one more stream + String streamid_6 = client.xadd(key, Map.of("field6", "value6")).get(); + assertNotNull(streamid_5); + + // using force, we can xclaim the message without reading it + var claimForceResults = + client + .xclaim( + key, + groupName, + consumer2, + 0L, + new String[] {streamid_6}, + StreamClaimOptions.builder().force().retryCount(99L).build()) + .get(); + assertDeepEquals(Map.of(streamid_6, new String[] {"field6", "value6"}), claimForceResults); + + Object[][] forcePendingResults = + client.xpending(key, groupName, IdBound.of(streamid_6), IdBound.of(streamid_6), 1L).get(); + assertEquals(streamid_6, forcePendingResults[0][0]); + assertEquals(consumer2, forcePendingResults[0][1]); + assertEquals(99L, forcePendingResults[0][3]); + + // acknowledge streams 2, 3, 4, and 6 and remove them from the xpending results assertEquals( - 3L, client.xack(key, groupName, new String[] {streamid_2, streamid_3, streamid_4}).get()); + 4L, + client + .xack(key, groupName, new String[] {streamid_2, streamid_3, streamid_4, streamid_6}) + .get()); pending_results_extended = client @@ -4444,6 +4470,71 @@ public void xpending_xclaim(BaseClient client) { assertEquals(2, pending_results_extended.length); } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xclaim_options(BaseClient client) { + + String key = UUID.randomUUID().toString(); + String groupName = "group" + UUID.randomUUID(); + String zeroStreamId = "0"; + String consumer1 = "consumer-1-" + UUID.randomUUID(); + String consumer2 = "consumer-2-" + UUID.randomUUID(); + + // create group and consumer for the group + assertEquals( + OK, + client + .xgroupCreate( + key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue(client.xgroupCreateConsumer(key, groupName, consumer1).get()); + assertTrue(client.xgroupCreateConsumer(key, groupName, consumer2).get()); + + // Add two stream entries for consumer 1 + String streamid_1 = client.xadd(key, Map.of("field1", "value1")).get(); + assertNotNull(streamid_1); + String streamid_2 = client.xadd(key, Map.of("field2", "value2")).get(); + assertNotNull(streamid_2); + String streamid_4 = client.xadd(key, Map.of("field4", "value4")).get(); + assertNotNull(streamid_4); + String streamid_5 = client.xadd(key, Map.of("field5", "value5")).get(); + assertNotNull(streamid_5); + + // read the entire stream for the consumer and mark messages as pending + var result_1 = client.xreadgroup(Map.of(key, ">"), groupName, consumer1).get(); + assertDeepEquals( + Map.of( + key, + Map.of( + streamid_1, new String[][] {{"field1", "value1"}}, + streamid_2, new String[][] {{"field2", "value2"}}, + // streamid_3, new String[][] {{"field3", "value3"}}, + streamid_4, new String[][] {{"field4", "value4"}}, + streamid_5, new String[][] {{"field5", "value5"}})), + result_1); + + String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get(); + assertNotNull(streamid_3); + + StreamClaimOptions options = StreamClaimOptions.builder().retryCount(100L).force().build(); + var claim_results = + client + .xclaim( + key, + groupName, + consumer2, + 0, + new String[] {streamid_1, streamid_3, streamid_5}, + options) + .get(); + System.out.println(claim_results); + + var pending_results = + client.xpending(key, groupName, IdBound.of(streamid_1), IdBound.of(streamid_3), 10L).get(); + System.out.println(pending_results); + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients")