Skip to content

Commit 3e4665b

Browse files
Merge pull request #105 from AxonIQ/feature/add-total-number-for-pagination-support
Total number for pagination support
2 parents 7929c67 + 29b0416 commit 3e4665b

File tree

8 files changed

+85
-18
lines changed

8 files changed

+85
-18
lines changed

console-framework-client-api/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.axoniq.console</groupId>
2323
<artifactId>console-framework-client-parent</artifactId>
24-
<version>1.10.1-SNAPSHOT</version>
24+
<version>1.10.2-SNAPSHOT</version>
2525
</parent>
2626
<modelVersion>4.0.0</modelVersion>
2727

console-framework-client-api/src/main/java/io/axoniq/console/framework/api/Routes.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ object Routes {
5757
const val SEQUENCE_SIZE = "dlq-query-dead-letter-sequence-size"
5858

5959
const val DELETE_SEQUENCE = "dlq-command-delete-sequence"
60+
const val DELETE_ALL_SEQUENCES = "dlq-command-delete-all-sequences"
6061
const val DELETE_LETTER = "dlq-command-delete-letter"
6162
const val PROCESS = "dlq-command-process"
63+
const val PROCESS_ALL_SEQUENCES = "dlq-command-process-all-sequences"
6264
}
6365
}
6466

console-framework-client-api/src/main/java/io/axoniq/console/framework/api/deadLetterApi.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ data class DeadLetter(
3131
)
3232

3333
data class DeadLetterResponse(
34-
val sequences: List<List<DeadLetter>>
34+
val sequences: List<List<DeadLetter>>,
35+
val totalCount: Long = -1
3536
)
3637

3738
data class DeadLetterRequest(
@@ -57,6 +58,15 @@ data class DeadLetterSingleDeleteRequest(
5758
val messageIdentifier: String,
5859
)
5960

61+
data class ProcessAllDeadLetterSequencesRequest(
62+
val processingGroup: String,
63+
val maxMessages: Int = 10
64+
)
65+
66+
data class DeleteAllDeadLetterSequencesRequest(
67+
val processingGroup: String
68+
)
69+
6070
data class DeadLetterProcessRequest(
6171
val processingGroup: String,
6272
val messageIdentifier: String

console-framework-client-spring-boot-starter/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.axoniq.console</groupId>
2323
<artifactId>console-framework-client-parent</artifactId>
24-
<version>1.10.1-SNAPSHOT</version>
24+
<version>1.10.2-SNAPSHOT</version>
2525
</parent>
2626
<modelVersion>4.0.0</modelVersion>
2727

console-framework-client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.axoniq.console</groupId>
2323
<artifactId>console-framework-client-parent</artifactId>
24-
<version>1.10.1-SNAPSHOT</version>
24+
<version>1.10.2-SNAPSHOT</version>
2525
</parent>
2626
<modelVersion>4.0.0</modelVersion>
2727

console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/DeadLetterManager.kt

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import java.util.concurrent.Callable
2828
import java.util.concurrent.ExecutorService
2929
import java.util.concurrent.TimeUnit
3030
import io.axoniq.console.framework.api.DeadLetter as ApiDeadLetter
31+
import io.axoniq.console.framework.api.DeadLetterResponse
3132

3233
private const val LETTER_PAYLOAD_SIZE_LIMIT = 1024
3334
private const val MASKED = "<MASKED>"
@@ -46,20 +47,21 @@ class DeadLetterManager(
4647
offset: Int = 0,
4748
size: Int = 25,
4849
maxSequenceLetters: Int = 10
49-
): List<List<ApiDeadLetter>> {
50+
): DeadLetterResponse {
5051
if (dlqMode == AxoniqConsoleDlqMode.NONE) {
51-
return emptyList()
52+
return DeadLetterResponse(emptyList(), 0)
5253
}
53-
return dlqFor(processingGroup)
54-
.deadLetters()
55-
.drop(offset)
56-
.take(size)
57-
.map { sequence ->
58-
sequence
59-
.asIterable()
60-
.take(maxSequenceLetters)
61-
.map { toDeadLetter(it, processingGroup) }
62-
}
54+
val queue = dlqFor(processingGroup)
55+
val sequences = queue
56+
.deadLetters()
57+
.drop(offset)
58+
.take(size)
59+
.map { sequence ->
60+
sequence.asIterable()
61+
.take(maxSequenceLetters)
62+
.map { toDeadLetter(it, processingGroup) }
63+
}
64+
return DeadLetterResponse(sequences, queue.amountOfSequences())
6365
}
6466

6567
private fun toDeadLetter(letter: DeadLetter<out EventMessage<*>>, processingGroup: String) =
@@ -169,6 +171,38 @@ class DeadLetterManager(
169171
}).get(60, TimeUnit.SECONDS)
170172
}
171173

174+
fun processAll(
175+
processingGroup: String,
176+
maxMessages: Int? = null,
177+
timeoutSeconds: Long = 600
178+
): Int {
179+
return executor.submit(Callable {
180+
val processor = letterProcessorFor(processingGroup)
181+
var processedCount = 0
182+
183+
// Process all messages or up to maxMessages limit
184+
while ((maxMessages == null || processedCount < maxMessages)) {
185+
val processed = processor.process { true }
186+
if (!processed) break // No more messages to process
187+
processedCount++
188+
}
189+
190+
processedCount
191+
}).get(timeoutSeconds, TimeUnit.SECONDS)
192+
}
193+
194+
fun deleteAll(
195+
processingGroup: String,
196+
timeoutSeconds: Long = 600
197+
): Int {
198+
return executor.submit(Callable {
199+
val dlq = dlqFor(processingGroup)
200+
val totalCount = dlq.size().toInt()
201+
dlq.clear()
202+
totalCount
203+
}).get(timeoutSeconds, TimeUnit.SECONDS)
204+
}
205+
172206
private fun String.hashIfNeeded(): String {
173207
return if (dlqMode == AxoniqConsoleDlqMode.MASKED) {
174208
DigestUtils.sha256Hex(this)

console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/RSocketDlqResponder.kt

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,21 @@ open class RSocketDlqResponder(
5858
DeadLetterProcessRequest::class.java,
5959
this::handleProcessCommand
6060
)
61+
registrar.registerHandlerWithPayload(
62+
Routes.ProcessingGroup.DeadLetter.PROCESS_ALL_SEQUENCES,
63+
ProcessAllDeadLetterSequencesRequest::class.java,
64+
this::handleProcessAllSequencesCommand
65+
)
66+
registrar.registerHandlerWithPayload(
67+
Routes.ProcessingGroup.DeadLetter.DELETE_ALL_SEQUENCES,
68+
DeleteAllDeadLetterSequencesRequest::class.java,
69+
this::handleDeleteAllSequencesCommand
70+
)
6171
}
6272

6373
private fun handleDeadLetterQuery(request: DeadLetterRequest): DeadLetterResponse {
6474
logger.debug("Handling AxonIQ Console DEAD_LETTERS query for request [{}]", request)
65-
return DeadLetterResponse(deadLetterManager.deadLetters(request.processingGroup, request.offset, request.size))
75+
return deadLetterManager.deadLetters(request.processingGroup, request.offset, request.size)
6676
}
6777

6878
private fun handleSequenceSizeQuery(request: DeadLetterSequenceSize): Long {
@@ -93,4 +103,15 @@ open class RSocketDlqResponder(
93103
logger.debug("Handling AxonIQ Console DEAD LETTERS query for processing group [{}]", request.processingGroup)
94104
return deadLetterManager.process(request.processingGroup, request.messageIdentifier)
95105
}
106+
107+
private fun handleProcessAllSequencesCommand(request: ProcessAllDeadLetterSequencesRequest): Int {
108+
logger.debug("Handling AxonIQ Console PROCESS_ALL_DEAD_LETTER_SEQUENCES commands for processing group [{}]", request.processingGroup)
109+
return deadLetterManager.processAll(request.processingGroup, request.maxMessages)
110+
}
111+
112+
private fun handleDeleteAllSequencesCommand(request: DeleteAllDeadLetterSequencesRequest): Int {
113+
logger.debug("Handling AxonIQ Console DELETE_ALL_DEAD_LETTER_SEQUENCES commands for processing group [{}]", request.processingGroup)
114+
return deadLetterManager.deleteAll(request.processingGroup)
115+
}
116+
96117
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
<groupId>io.axoniq.console</groupId>
2424
<artifactId>console-framework-client-parent</artifactId>
25-
<version>1.10.1-SNAPSHOT</version>
25+
<version>1.10.2-SNAPSHOT</version>
2626

2727
<modules>
2828
<module>console-framework-client-api</module>

0 commit comments

Comments
 (0)