21
21
import org .apache .kafka .common .header .Headers ;
22
22
import org .apache .kafka .common .header .internals .RecordHeader ;
23
23
import org .axonframework .common .AxonConfigurationException ;
24
+ import org .axonframework .eventhandling .EventData ;
24
25
import org .axonframework .eventhandling .EventMessage ;
26
+ import org .axonframework .eventhandling .GenericDomainEventEntry ;
25
27
import org .axonframework .eventhandling .GenericDomainEventMessage ;
26
28
import org .axonframework .eventhandling .GenericEventMessage ;
27
29
import org .axonframework .eventhandling .async .SequencingPolicy ;
31
33
import org .axonframework .serialization .SerializedMessage ;
32
34
import org .axonframework .serialization .SerializedObject ;
33
35
import org .axonframework .serialization .Serializer ;
34
- import org .axonframework .serialization .SimpleSerializedObject ;
36
+ import org .axonframework .serialization .upcasting .event .EventUpcasterChain ;
37
+ import org .axonframework .serialization .upcasting .event .InitialEventRepresentation ;
35
38
import org .slf4j .Logger ;
36
39
import org .slf4j .LoggerFactory ;
37
40
38
41
import java .time .Instant ;
39
42
import java .util .Arrays ;
40
43
import java .util .Optional ;
41
44
import java .util .function .BiFunction ;
45
+ import java .util .stream .Stream ;
42
46
43
47
import static org .axonframework .common .BuilderUtils .assertNonNull ;
44
48
import static org .axonframework .extensions .kafka .eventhandling .HeaderUtils .*;
45
49
import static org .axonframework .messaging .Headers .*;
46
50
47
51
/**
48
- * Converts and {@link EventMessage} to a {@link ProducerRecord} Kafka message and { from a @link ConsumerRecord} Kafka
52
+ * Converts and {@link EventMessage} to a {@link ProducerRecord} Kafka message and from a { @link ConsumerRecord} Kafka
49
53
* message back to an EventMessage (if possible).
50
54
* <p>
51
55
* During conversion meta data entries with the {@code 'axon-metadata-'} prefix are passed to the {@link Headers}. Other
52
56
* message-specific attributes are added as metadata. The {@link EventMessage#getPayload()} is serialized using the
53
- * configured {@link Serializer} and passed as the Kafka recordd 's body.
57
+ * configured {@link Serializer} and passed as the Kafka record 's body.
54
58
* <p>
59
+ * <p>
60
+ * If an up-caster / up-caster chain is configured, this converter will pass the converted messages through it.
61
+ * Please note, that since the message converter consumes records one-by-one, the up-casting functionality is
62
+ * limited to one-to-one and one-to-many up-casters only.
63
+ * </p>
55
64
* This implementation will suffice in most cases.
56
65
*
57
66
* @author Nakul Mishra
@@ -65,6 +74,7 @@ public class DefaultKafkaMessageConverter implements KafkaMessageConverter<Strin
65
74
private final Serializer serializer ;
66
75
private final SequencingPolicy <? super EventMessage <?>> sequencingPolicy ;
67
76
private final BiFunction <String , Object , RecordHeader > headerValueMapper ;
77
+ private final EventUpcasterChain upcasterChain ;
68
78
69
79
/**
70
80
* Instantiate a {@link DefaultKafkaMessageConverter} based on the fields contained in the {@link Builder}.
@@ -80,6 +90,7 @@ protected DefaultKafkaMessageConverter(Builder builder) {
80
90
this .serializer = builder .serializer ;
81
91
this .sequencingPolicy = builder .sequencingPolicy ;
82
92
this .headerValueMapper = builder .headerValueMapper ;
93
+ this .upcasterChain = builder .upcasterChain ;
83
94
}
84
95
85
96
/**
@@ -131,43 +142,93 @@ public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]>
131
142
Headers headers = consumerRecord .headers ();
132
143
if (isAxonMessage (headers )) {
133
144
byte [] messageBody = consumerRecord .value ();
134
- SerializedMessage <?> message = extractSerializedMessage (headers , messageBody );
135
- return buildMessage (headers , message );
145
+ final EventData <?> eventData = createEventData (headers , messageBody );
146
+ return upcasterChain
147
+ .upcast (Stream .of (new InitialEventRepresentation (eventData , serializer )))
148
+ .findFirst ()
149
+ .map (upcastedEventData -> new SerializedMessage <>(
150
+ upcastedEventData .getMessageIdentifier (),
151
+ new LazyDeserializingObject <>(upcastedEventData .getData (), serializer ),
152
+ upcastedEventData .getMetaData ()
153
+ )
154
+ ).flatMap (serializedMessage -> buildMessage (headers , serializedMessage ));
136
155
}
137
156
} catch (Exception e ) {
138
157
logger .trace ("Error converting ConsumerRecord [{}] to an EventMessage" , consumerRecord , e );
139
158
}
140
-
141
159
return Optional .empty ();
142
160
}
143
161
144
- private boolean isAxonMessage (Headers headers ) {
145
- return keys (headers ).containsAll (Arrays .asList (MESSAGE_ID , MESSAGE_TYPE ));
146
- }
147
-
148
- private SerializedMessage <?> extractSerializedMessage (Headers headers , byte [] messageBody ) {
149
- SimpleSerializedObject <byte []> serializedObject = new SimpleSerializedObject <>(
150
- messageBody ,
151
- byte [].class ,
162
+ /**
163
+ * Constructs event data representation from given Kafka headers and byte array body.
164
+ * <p>
165
+ * This method <i>reuses</i> the {@link GenericDomainEventEntry} class for both types of events which can be
166
+ * transmitted via Kafka. For domain events, the fields <code>aggregateType</code>, <code>aggregateId</code> and
167
+ * <code>aggregateSeq</code> will contain the corresponding values, but for the simple event they will be
168
+ * <code>null</code>. This is ok to pass <code>null</code> to those values and <code>0L</code> to
169
+ * <code>aggregateSeq</code>, since the {@link InitialEventRepresentation} does the same in its constructor and
170
+ * is implemented in a null-tolerant way. Check {@link DefaultKafkaMessageConverter#isDomainEvent(Headers)} for more
171
+ * details.
172
+ * </p>
173
+ *
174
+ * @param headers Kafka headers.
175
+ * @param messageBody Kafka payload as a byte array.
176
+ * @return event data.
177
+ */
178
+ private EventData <?> createEventData (Headers headers , byte [] messageBody ) {
179
+ return new GenericDomainEventEntry <>(
180
+ valueAsString (headers , AGGREGATE_TYPE ),
181
+ valueAsString (headers , AGGREGATE_ID ),
182
+ valueAsLong (headers , AGGREGATE_SEQ , 0L ),
183
+ valueAsString (headers , MESSAGE_ID ),
184
+ valueAsLong (headers , MESSAGE_TIMESTAMP ),
152
185
valueAsString (headers , MESSAGE_TYPE ),
153
- valueAsString (headers , MESSAGE_REVISION , null )
186
+ valueAsString (headers , MESSAGE_REVISION , null ),
187
+ messageBody ,
188
+ extractMetadataAsBytes (headers )
154
189
);
190
+ }
155
191
156
- return new SerializedMessage <>(
157
- valueAsString (headers , MESSAGE_ID ),
158
- new LazyDeserializingObject <>(serializedObject , serializer ),
159
- new LazyDeserializingObject <>(MetaData .from (extractAxonMetadata (headers )))
160
- );
192
+ private byte [] extractMetadataAsBytes (Headers headers ) {
193
+ return serializer .serialize (MetaData .from (extractAxonMetadata (headers )), byte [].class ).getData ();
161
194
}
162
195
163
- private Optional <EventMessage <?>> buildMessage (Headers headers , SerializedMessage <?> message ) {
196
+ private static boolean isAxonMessage (Headers headers ) {
197
+ return keys (headers ).containsAll (Arrays .asList (MESSAGE_ID , MESSAGE_TYPE ));
198
+ }
199
+
200
+ /**
201
+ * Checks if the event is originated from an aggregate (domain event) or is a simple event sent over the bus.
202
+ * <p>
203
+ * The difference between a DomainEventMessage and an EventMessage, is the following three fields:
204
+ * <ul>
205
+ * <li>The type - represents the Aggregate the event originates from. It would be empty for an EventMessage and
206
+ * filled for a DomainEventMessage.</li>
207
+ * <li>The aggregateIdentifier - represents the Aggregate instance the event originates from. It would be equal
208
+ * to the eventIdentifier for an EventMessage and not equal to that identifier a DomainEventMessage.</li>
209
+ * <li>The sequenceNumber - represents the order of the events within an Aggregate instance's event stream.
210
+ * It would be 0 at all times for an EventMessage, whereas a DomainEventMessage would be 0 or greater.</li>
211
+ * </ul>
212
+ * </p>
213
+ *
214
+ * @param headers Kafka headers.
215
+ * @return <code>true</code> if the event is originated from an aggregate.
216
+ */
217
+ private static boolean isDomainEvent (Headers headers ) {
218
+ return headers .lastHeader (AGGREGATE_TYPE ) != null
219
+ && headers .lastHeader (AGGREGATE_ID ) != null
220
+ && headers .lastHeader (AGGREGATE_SEQ ) != null ;
221
+ }
222
+
223
+ private static Optional <EventMessage <?>> buildMessage (Headers headers , SerializedMessage <?> message ) {
164
224
long timestamp = valueAsLong (headers , MESSAGE_TIMESTAMP );
165
- return headers . lastHeader ( AGGREGATE_ID ) != null
166
- ? buildDomainEvent (headers , message , timestamp )
167
- : buildEvent (message , timestamp );
225
+ return isDomainEvent ( headers )
226
+ ? buildDomainEventMessage (headers , message , timestamp )
227
+ : buildEventMessage (message , timestamp );
168
228
}
169
229
170
- private Optional <EventMessage <?>> buildDomainEvent (Headers headers , SerializedMessage <?> message , long timestamp ) {
230
+ private static Optional <EventMessage <?>> buildDomainEventMessage (Headers headers , SerializedMessage <?> message ,
231
+ long timestamp ) {
171
232
return Optional .of (new GenericDomainEventMessage <>(
172
233
valueAsString (headers , AGGREGATE_TYPE ),
173
234
valueAsString (headers , AGGREGATE_ID ),
@@ -177,7 +238,7 @@ private Optional<EventMessage<?>> buildDomainEvent(Headers headers, SerializedMe
177
238
));
178
239
}
179
240
180
- private Optional <EventMessage <?>> buildEvent (SerializedMessage <?> message , long timestamp ) {
241
+ private static Optional <EventMessage <?>> buildEventMessage (SerializedMessage <?> message , long timestamp ) {
181
242
return Optional .of (new GenericEventMessage <>(message , () -> Instant .ofEpochMilli (timestamp )));
182
243
}
183
244
@@ -193,6 +254,7 @@ public static class Builder {
193
254
private Serializer serializer ;
194
255
private SequencingPolicy <? super EventMessage <?>> sequencingPolicy = SequentialPerAggregatePolicy .instance ();
195
256
private BiFunction <String , Object , RecordHeader > headerValueMapper = byteMapper ();
257
+ private EventUpcasterChain upcasterChain = new EventUpcasterChain ();
196
258
197
259
/**
198
260
* Sets the serializer to serialize the Event Message's payload with.
@@ -234,6 +296,18 @@ public Builder headerValueMapper(BiFunction<String, Object, RecordHeader> header
234
296
return this ;
235
297
}
236
298
299
+ /**
300
+ * Sets the {@code upcasterChain} to be used during the consumption of events.
301
+ *
302
+ * @param upcasterChain upcaster chain to be used on event reading.
303
+ * @return the current Builder instance, for fluent interfacing
304
+ */
305
+ public Builder upcasterChain (EventUpcasterChain upcasterChain ) {
306
+ assertNonNull (upcasterChain , "UpcasterChain must not be null" );
307
+ this .upcasterChain = upcasterChain ;
308
+ return this ;
309
+ }
310
+
237
311
/**
238
312
* Initializes a {@link DefaultKafkaMessageConverter} as specified through this Builder.
239
313
*
0 commit comments