|
70 | 70 | import glide.api.models.commands.geospatial.GeoUnit;
|
71 | 71 | import glide.api.models.commands.geospatial.GeospatialData;
|
72 | 72 | import glide.api.models.commands.stream.StreamAddOptions;
|
| 73 | +import glide.api.models.commands.stream.StreamClaimOptions; |
73 | 74 | import glide.api.models.commands.stream.StreamGroupOptions;
|
74 | 75 | import glide.api.models.commands.stream.StreamPendingOptions;
|
75 | 76 | import glide.api.models.commands.stream.StreamRange.IdBound;
|
@@ -4393,9 +4394,21 @@ public void xpending_xclaim(BaseClient client) {
|
4393 | 4394 | assertTrue((Long) pending_results_extended[4][2] >= 0L);
|
4394 | 4395 |
|
4395 | 4396 | // use claim to claim stream 3 and 4 for consumer 1
|
4396 |
| - var claim_results = |
| 4397 | + var claimResults = |
4397 | 4398 | client.xclaim(key, groupName, consumer1, 0L, new String[] {streamid_3, streamid_5}).get();
|
4398 |
| - assertDeepEquals(Map.of(streamid_3, new String[] {"field3", "value3"}, streamid_5, new String[] {"field5", "value5"}), claim_results); |
| 4399 | + assertDeepEquals( |
| 4400 | + Map.of( |
| 4401 | + streamid_3, |
| 4402 | + new String[] {"field3", "value3"}, |
| 4403 | + streamid_5, |
| 4404 | + new String[] {"field5", "value5"}), |
| 4405 | + claimResults); |
| 4406 | + |
| 4407 | + var claimResultsJustId = |
| 4408 | + client |
| 4409 | + .xclaimJustId(key, groupName, consumer1, 0L, new String[] {streamid_3, streamid_5}) |
| 4410 | + .get(); |
| 4411 | + assertArrayEquals(new String[] {streamid_3, streamid_5}, claimResultsJustId); |
4399 | 4412 |
|
4400 | 4413 | // acknowledge streams 2-4 and remove them from the xpending results
|
4401 | 4414 | assertEquals(
|
@@ -4600,21 +4613,133 @@ public void xclaim_return_failures(BaseClient client) {
|
4600 | 4613 | String groupName = "group" + UUID.randomUUID();
|
4601 | 4614 | String zeroStreamId = "0";
|
4602 | 4615 | String consumer1 = "consumer-1-" + UUID.randomUUID();
|
| 4616 | + String consumer2 = "consumer-2-" + UUID.randomUUID(); |
4603 | 4617 |
|
4604 | 4618 | // create group and consumer for the group
|
4605 | 4619 | assertEquals(
|
4606 |
| - OK, |
4607 |
| - client |
4608 |
| - .xgroupCreate( |
4609 |
| - key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) |
4610 |
| - .get()); |
| 4620 | + OK, |
| 4621 | + client |
| 4622 | + .xgroupCreate( |
| 4623 | + key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) |
| 4624 | + .get()); |
4611 | 4625 | assertTrue(client.xgroupCreateConsumer(key, groupName, consumer1).get());
|
4612 | 4626 |
|
4613 |
| - // Add two stream entries for consumer 1 |
| 4627 | + // Add two stream entries and mark as pending: |
4614 | 4628 | String streamid_1 = client.xadd(key, Map.of("field1", "value1")).get();
|
4615 | 4629 | assertNotNull(streamid_1);
|
4616 |
| - String streamid_2 = client.xadd(key, Map.of("field2", "value2")).get(); |
4617 |
| - assertNotNull(streamid_2); |
| 4630 | + assertNotNull(client.xreadgroup(Map.of(key, ">"), groupName, consumer1).get()); |
| 4631 | + |
| 4632 | + // claim with invalid stream entry IDs |
| 4633 | + ExecutionException executionException = |
| 4634 | + assertThrows( |
| 4635 | + ExecutionException.class, |
| 4636 | + () -> |
| 4637 | + client.xclaimJustId(key, groupName, consumer1, 1L, new String[] {"invalid"}).get()); |
| 4638 | + assertInstanceOf(RequestException.class, executionException.getCause()); |
| 4639 | + |
| 4640 | + // non-existent key throws a RequestError (NOGROUP) |
| 4641 | + executionException = |
| 4642 | + assertThrows( |
| 4643 | + ExecutionException.class, |
| 4644 | + () -> |
| 4645 | + client |
| 4646 | + .xclaim(stringkey, groupName, consumer1, 1L, new String[] {streamid_1}) |
| 4647 | + .get()); |
| 4648 | + assertInstanceOf(RequestException.class, executionException.getCause()); |
| 4649 | + assertTrue(executionException.getMessage().contains("NOGROUP")); |
| 4650 | + |
| 4651 | + final var claimOptions = StreamClaimOptions.builder().idle(1L).build(); |
| 4652 | + executionException = |
| 4653 | + assertThrows( |
| 4654 | + ExecutionException.class, |
| 4655 | + () -> |
| 4656 | + client |
| 4657 | + .xclaim( |
| 4658 | + stringkey, |
| 4659 | + groupName, |
| 4660 | + consumer1, |
| 4661 | + 1L, |
| 4662 | + new String[] {streamid_1}, |
| 4663 | + claimOptions) |
| 4664 | + .get()); |
| 4665 | + assertInstanceOf(RequestException.class, executionException.getCause()); |
| 4666 | + assertTrue(executionException.getMessage().contains("NOGROUP")); |
| 4667 | + |
| 4668 | + executionException = |
| 4669 | + assertThrows( |
| 4670 | + ExecutionException.class, |
| 4671 | + () -> |
| 4672 | + client |
| 4673 | + .xclaimJustId(stringkey, groupName, consumer1, 1L, new String[] {streamid_1}) |
| 4674 | + .get()); |
| 4675 | + assertInstanceOf(RequestException.class, executionException.getCause()); |
| 4676 | + assertTrue(executionException.getMessage().contains("NOGROUP")); |
| 4677 | + |
| 4678 | + executionException = |
| 4679 | + assertThrows( |
| 4680 | + ExecutionException.class, |
| 4681 | + () -> |
| 4682 | + client |
| 4683 | + .xclaimJustId( |
| 4684 | + stringkey, |
| 4685 | + groupName, |
| 4686 | + consumer1, |
| 4687 | + 1L, |
| 4688 | + new String[] {streamid_1}, |
| 4689 | + claimOptions) |
| 4690 | + .get()); |
| 4691 | + assertInstanceOf(RequestException.class, executionException.getCause()); |
| 4692 | + assertTrue(executionException.getMessage().contains("NOGROUP")); |
| 4693 | + |
| 4694 | + // Key exists, but it is not a stream |
| 4695 | + assertEquals(OK, client.set(stringkey, "bar").get()); |
| 4696 | + executionException = |
| 4697 | + assertThrows( |
| 4698 | + ExecutionException.class, |
| 4699 | + () -> |
| 4700 | + client |
| 4701 | + .xclaim(stringkey, groupName, consumer1, 1L, new String[] {streamid_1}) |
| 4702 | + .get()); |
| 4703 | + assertInstanceOf(RequestException.class, executionException.getCause()); |
| 4704 | + |
| 4705 | + executionException = |
| 4706 | + assertThrows( |
| 4707 | + ExecutionException.class, |
| 4708 | + () -> |
| 4709 | + client |
| 4710 | + .xclaim( |
| 4711 | + stringkey, |
| 4712 | + groupName, |
| 4713 | + consumer1, |
| 4714 | + 1L, |
| 4715 | + new String[] {streamid_1}, |
| 4716 | + claimOptions) |
| 4717 | + .get()); |
| 4718 | + assertInstanceOf(RequestException.class, executionException.getCause()); |
| 4719 | + |
| 4720 | + executionException = |
| 4721 | + assertThrows( |
| 4722 | + ExecutionException.class, |
| 4723 | + () -> |
| 4724 | + client |
| 4725 | + .xclaimJustId(stringkey, groupName, consumer1, 1L, new String[] {streamid_1}) |
| 4726 | + .get()); |
| 4727 | + assertInstanceOf(RequestException.class, executionException.getCause()); |
| 4728 | + |
| 4729 | + executionException = |
| 4730 | + assertThrows( |
| 4731 | + ExecutionException.class, |
| 4732 | + () -> |
| 4733 | + client |
| 4734 | + .xclaimJustId( |
| 4735 | + stringkey, |
| 4736 | + groupName, |
| 4737 | + consumer1, |
| 4738 | + 1L, |
| 4739 | + new String[] {streamid_1}, |
| 4740 | + claimOptions) |
| 4741 | + .get()); |
| 4742 | + assertInstanceOf(RequestException.class, executionException.getCause()); |
4618 | 4743 | }
|
4619 | 4744 |
|
4620 | 4745 | @SneakyThrows
|
|
0 commit comments