39
39
import org .springframework .test .context .ContextConfiguration ;
40
40
import org .testcontainers .junit .jupiter .Container ;
41
41
import org .testcontainers .junit .jupiter .Testcontainers ;
42
- import org .testcontainers .redpanda . RedpandaContainer ;
42
+ import org .testcontainers .kafka . KafkaContainer ;
43
43
44
44
import java .net .URI ;
45
45
import java .time .Duration ;
56
56
class TokenReplayIntegrationTest {
57
57
58
58
@ Container
59
- private static final RedpandaContainer REDPANDA_CONTAINER = new RedpandaContainer (
60
- "docker.redpanda.com/vectorized/redpanda:v22.2.1" );
61
- private ApplicationContextRunner testApplicationContext ;
59
+ private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer ("apache/kafka-native" )
60
+ .withEnv ("KAFKA_LISTENERS" , "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094" );
62
61
62
+ private ApplicationContextRunner testApplicationContext ;
63
63
64
64
@ BeforeEach
65
65
void setUp () {
@@ -69,27 +69,27 @@ void setUp() {
69
69
.withPropertyValues ("axon.kafka.publisher.enabled=false" )
70
70
.withPropertyValues ("axon.kafka.message-converter-mode=cloud_event" )
71
71
.withPropertyValues ("axon.kafka.consumer.event-processor-mode=tracking" )
72
- .withPropertyValues ("axon.kafka.consumer.bootstrap-servers=" + REDPANDA_CONTAINER .getBootstrapServers ())
72
+ .withPropertyValues ("axon.kafka.consumer.bootstrap-servers=" + KAFKA_CONTAINER .getBootstrapServers ())
73
73
.withUserConfiguration (DefaultContext .class );
74
74
}
75
75
76
76
@ Test
77
77
void afterResetShouldOnlyProcessTenEventsIfTimeSetMidway () {
78
78
testApplicationContext
79
- .withPropertyValues ("axon.kafka.default-topic=counterfeed -1" )
79
+ .withPropertyValues ("axon.kafka.default-topic=counter-feed -1" )
80
80
.run (context -> {
81
81
Counter counter = context .getBean (Counter .class );
82
82
assertNotNull (counter );
83
83
assertEquals (0 , counter .getCount ());
84
- Instant between = addRecords ("counterfeed -1" );
84
+ Instant between = addRecords ("counter-feed -1" );
85
85
await ().atMost (Duration .ofSeconds (5L )).untilAsserted (
86
86
() -> assertEquals (20 , counter .getCount ())
87
87
);
88
88
EventProcessingConfiguration processingConfiguration = context .getBean (EventProcessingConfiguration .class );
89
89
assertNotNull (processingConfiguration );
90
90
processingConfiguration
91
91
.eventProcessorByProcessingGroup (
92
- "counterfeedprocessor " ,
92
+ "counter-feed-processor " ,
93
93
TrackingEventProcessor .class
94
94
)
95
95
.ifPresent (tep -> {
@@ -107,20 +107,20 @@ void afterResetShouldOnlyProcessTenEventsIfTimeSetMidway() {
107
107
@ Test
108
108
void afterResetShouldOnlyProcessNewMessages () {
109
109
testApplicationContext
110
- .withPropertyValues ("axon.kafka.default-topic=counterfeed -2" )
110
+ .withPropertyValues ("axon.kafka.default-topic=counter-feed -2" )
111
111
.run (context -> {
112
112
Counter counter = context .getBean (Counter .class );
113
113
assertNotNull (counter );
114
114
assertEquals (0 , counter .getCount ());
115
- addRecords ("counterfeed -2" );
115
+ addRecords ("counter-feed -2" );
116
116
await ().atMost (Duration .ofSeconds (5L )).untilAsserted (
117
117
() -> assertEquals (20 , counter .getCount ())
118
118
);
119
119
EventProcessingConfiguration processingConfiguration = context .getBean (EventProcessingConfiguration .class );
120
120
assertNotNull (processingConfiguration );
121
121
processingConfiguration
122
122
.eventProcessorByProcessingGroup (
123
- "counterfeedprocessor " ,
123
+ "counter-feed-processor " ,
124
124
TrackingEventProcessor .class
125
125
)
126
126
.ifPresent (tep -> {
@@ -129,15 +129,15 @@ void afterResetShouldOnlyProcessNewMessages() {
129
129
assertEquals (0 , counter .getCount ());
130
130
tep .start ();
131
131
});
132
- addRecords ("counterfeed -2" );
132
+ addRecords ("counter-feed -2" );
133
133
await ().atMost (Duration .ofSeconds (5L )).untilAsserted (
134
134
() -> assertEquals (20 , counter .getCount ())
135
135
);
136
136
});
137
137
}
138
138
139
139
private Instant addRecords (String topic ) {
140
- Producer <String , CloudEvent > producer = newProducer (REDPANDA_CONTAINER .getBootstrapServers ());
140
+ Producer <String , CloudEvent > producer = newProducer (KAFKA_CONTAINER .getBootstrapServers ());
141
141
sendTenMessages (producer , topic );
142
142
Instant now = Instant .now ();
143
143
sendTenMessages (producer , topic );
@@ -146,12 +146,11 @@ private Instant addRecords(String topic) {
146
146
}
147
147
148
148
private void sendMessage (Producer <String , CloudEvent > producer , String topic ) {
149
- CloudEvent event = new CloudEventBuilder ()
150
- .withId (UUID .randomUUID ().toString ())
151
- .withSource (URI .create ("source" ))
152
- .withData ("Payload" .getBytes ())
153
- .withType ("java.util.String" )
154
- .build ();
149
+ CloudEvent event = new CloudEventBuilder ().withId (UUID .randomUUID ().toString ())
150
+ .withSource (URI .create ("source" ))
151
+ .withData ("Payload" .getBytes ())
152
+ .withType ("java.util.String" )
153
+ .build ();
155
154
ProducerRecord <String , CloudEvent > record = new ProducerRecord <>(topic , 0 , null , null , event );
156
155
producer .send (record );
157
156
}
@@ -182,7 +181,7 @@ public void registerProcessor(
182
181
StreamableKafkaMessageSource <?, ?> streamableKafkaMessageSource
183
182
) {
184
183
configurer .eventProcessing ()
185
- .registerTrackingEventProcessor ("counterfeedprocessor " , c -> streamableKafkaMessageSource );
184
+ .registerTrackingEventProcessor ("counter-feed-processor " , c -> streamableKafkaMessageSource );
186
185
}
187
186
}
188
187
@@ -205,7 +204,7 @@ void reset() {
205
204
206
205
@ SuppressWarnings ("unused" )
207
206
@ Component
208
- @ ProcessingGroup ("counterfeedprocessor " )
207
+ @ ProcessingGroup ("counter-feed-processor " )
209
208
private static class KafkaEventHandler {
210
209
211
210
private final Counter counter ;
0 commit comments