39
39
import org .springframework .test .context .ContextConfiguration ;
40
40
import org .testcontainers .junit .jupiter .Container ;
41
41
import org .testcontainers .junit .jupiter .Testcontainers ;
42
- import org .testcontainers .redpanda . RedpandaContainer ;
42
+ import org .testcontainers .kafka . KafkaContainer ;
43
43
44
44
import java .net .URI ;
45
45
import java .time .Duration ;
56
56
class TokenReplayIntegrationTest {
57
57
58
58
@ Container
59
- private static final RedpandaContainer REDPANDA_CONTAINER = new RedpandaContainer (
60
- "docker.redpanda.com/vectorized/redpanda:v22.2.1" );
61
- private ApplicationContextRunner testApplicationContext ;
59
+ private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer ("apache/kafka-native" );
62
60
61
+ private ApplicationContextRunner testApplicationContext ;
63
62
64
63
@ BeforeEach
65
64
void setUp () {
@@ -69,27 +68,27 @@ void setUp() {
69
68
.withPropertyValues ("axon.kafka.publisher.enabled=false" )
70
69
.withPropertyValues ("axon.kafka.message-converter-mode=cloud_event" )
71
70
.withPropertyValues ("axon.kafka.consumer.event-processor-mode=tracking" )
72
- .withPropertyValues ("axon.kafka.consumer.bootstrap-servers=" + REDPANDA_CONTAINER .getBootstrapServers ())
71
+ .withPropertyValues ("axon.kafka.consumer.bootstrap-servers=" + KAFKA_CONTAINER .getBootstrapServers ())
73
72
.withUserConfiguration (DefaultContext .class );
74
73
}
75
74
76
75
@ Test
77
76
void afterResetShouldOnlyProcessTenEventsIfTimeSetMidway () {
78
77
testApplicationContext
79
- .withPropertyValues ("axon.kafka.default-topic=counterfeed -1" )
78
+ .withPropertyValues ("axon.kafka.default-topic=counter-feed -1" )
80
79
.run (context -> {
81
80
Counter counter = context .getBean (Counter .class );
82
81
assertNotNull (counter );
83
82
assertEquals (0 , counter .getCount ());
84
- Instant between = addRecords ("counterfeed -1" );
83
+ Instant between = addRecords ("counter-feed -1" );
85
84
await ().atMost (Duration .ofSeconds (5L )).untilAsserted (
86
85
() -> assertEquals (20 , counter .getCount ())
87
86
);
88
87
EventProcessingConfiguration processingConfiguration = context .getBean (EventProcessingConfiguration .class );
89
88
assertNotNull (processingConfiguration );
90
89
processingConfiguration
91
90
.eventProcessorByProcessingGroup (
92
- "counterfeedprocessor " ,
91
+ "counter-feed-processor " ,
93
92
TrackingEventProcessor .class
94
93
)
95
94
.ifPresent (tep -> {
@@ -107,20 +106,20 @@ void afterResetShouldOnlyProcessTenEventsIfTimeSetMidway() {
107
106
@ Test
108
107
void afterResetShouldOnlyProcessNewMessages () {
109
108
testApplicationContext
110
- .withPropertyValues ("axon.kafka.default-topic=counterfeed -2" )
109
+ .withPropertyValues ("axon.kafka.default-topic=counter-feed -2" )
111
110
.run (context -> {
112
111
Counter counter = context .getBean (Counter .class );
113
112
assertNotNull (counter );
114
113
assertEquals (0 , counter .getCount ());
115
- addRecords ("counterfeed -2" );
114
+ addRecords ("counter-feed -2" );
116
115
await ().atMost (Duration .ofSeconds (5L )).untilAsserted (
117
116
() -> assertEquals (20 , counter .getCount ())
118
117
);
119
118
EventProcessingConfiguration processingConfiguration = context .getBean (EventProcessingConfiguration .class );
120
119
assertNotNull (processingConfiguration );
121
120
processingConfiguration
122
121
.eventProcessorByProcessingGroup (
123
- "counterfeedprocessor " ,
122
+ "counter-feed-processor " ,
124
123
TrackingEventProcessor .class
125
124
)
126
125
.ifPresent (tep -> {
@@ -129,15 +128,15 @@ void afterResetShouldOnlyProcessNewMessages() {
129
128
assertEquals (0 , counter .getCount ());
130
129
tep .start ();
131
130
});
132
- addRecords ("counterfeed -2" );
131
+ addRecords ("counter-feed -2" );
133
132
await ().atMost (Duration .ofSeconds (5L )).untilAsserted (
134
133
() -> assertEquals (20 , counter .getCount ())
135
134
);
136
135
});
137
136
}
138
137
139
138
private Instant addRecords (String topic ) {
140
- Producer <String , CloudEvent > producer = newProducer (REDPANDA_CONTAINER .getBootstrapServers ());
139
+ Producer <String , CloudEvent > producer = newProducer (KAFKA_CONTAINER .getBootstrapServers ());
141
140
sendTenMessages (producer , topic );
142
141
Instant now = Instant .now ();
143
142
sendTenMessages (producer , topic );
@@ -146,12 +145,11 @@ private Instant addRecords(String topic) {
146
145
}
147
146
148
147
private void sendMessage (Producer <String , CloudEvent > producer , String topic ) {
149
- CloudEvent event = new CloudEventBuilder ()
150
- .withId (UUID .randomUUID ().toString ())
151
- .withSource (URI .create ("source" ))
152
- .withData ("Payload" .getBytes ())
153
- .withType ("java.util.String" )
154
- .build ();
148
+ CloudEvent event = new CloudEventBuilder ().withId (UUID .randomUUID ().toString ())
149
+ .withSource (URI .create ("source" ))
150
+ .withData ("Payload" .getBytes ())
151
+ .withType ("java.util.String" )
152
+ .build ();
155
153
ProducerRecord <String , CloudEvent > record = new ProducerRecord <>(topic , 0 , null , null , event );
156
154
producer .send (record );
157
155
}
@@ -182,7 +180,7 @@ public void registerProcessor(
182
180
StreamableKafkaMessageSource <?, ?> streamableKafkaMessageSource
183
181
) {
184
182
configurer .eventProcessing ()
185
- .registerTrackingEventProcessor ("counterfeedprocessor " , c -> streamableKafkaMessageSource );
183
+ .registerTrackingEventProcessor ("counter-feed-processor " , c -> streamableKafkaMessageSource );
186
184
}
187
185
}
188
186
@@ -205,7 +203,7 @@ void reset() {
205
203
206
204
@ SuppressWarnings ("unused" )
207
205
@ Component
208
- @ ProcessingGroup ("counterfeedprocessor " )
206
+ @ ProcessingGroup ("counter-feed-processor " )
209
207
private static class KafkaEventHandler {
210
208
211
209
private final Counter counter ;
0 commit comments