* During conversion meta data entries with the {@code 'axon-metadata-'} prefix are passed to the {@link Headers}. Other
* message-specific attributes are added as metadata. The {@link EventMessage#getPayload()} is serialized using the
- * configured {@link Serializer} and passed as the Kafka recordd's body.
+ * configured {@link Serializer} and passed as the Kafka record's body.
*
+ *
+ * If an up-caster / up-caster chain is configured, this converter will pass the converted messages through it.
+ * Please note, that since the message converter consumes records one-by-one, the up-casting functionality is
+ * limited to one-to-one and one-to-many up-casters only.
+ *
* This implementation will suffice in most cases.
*
* @author Nakul Mishra
@@ -65,6 +74,7 @@ public class DefaultKafkaMessageConverter implements KafkaMessageConverter> sequencingPolicy;
private final BiFunction headerValueMapper;
+ private final EventUpcasterChain upcasterChain;
/**
* Instantiate a {@link DefaultKafkaMessageConverter} based on the fields contained in the {@link Builder}.
@@ -80,6 +90,7 @@ protected DefaultKafkaMessageConverter(Builder builder) {
this.serializer = builder.serializer;
this.sequencingPolicy = builder.sequencingPolicy;
this.headerValueMapper = builder.headerValueMapper;
+ this.upcasterChain = builder.upcasterChain;
}
/**
@@ -131,43 +142,93 @@ public Optional> readKafkaMessage(ConsumerRecord
Headers headers = consumerRecord.headers();
if (isAxonMessage(headers)) {
byte[] messageBody = consumerRecord.value();
- SerializedMessage> message = extractSerializedMessage(headers, messageBody);
- return buildMessage(headers, message);
+ final EventData> eventData = createEventData(headers, messageBody);
+ return upcasterChain
+ .upcast(Stream.of(new InitialEventRepresentation(eventData, serializer)))
+ .findFirst()
+ .map(upcastedEventData -> new SerializedMessage<>(
+ upcastedEventData.getMessageIdentifier(),
+ new LazyDeserializingObject<>(upcastedEventData.getData(), serializer),
+ upcastedEventData.getMetaData()
+ )
+ ).flatMap(serializedMessage -> buildMessage(headers, serializedMessage));
}
} catch (Exception e) {
logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e);
}
-
return Optional.empty();
}
- private boolean isAxonMessage(Headers headers) {
- return keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE));
- }
-
- private SerializedMessage> extractSerializedMessage(Headers headers, byte[] messageBody) {
- SimpleSerializedObject serializedObject = new SimpleSerializedObject<>(
- messageBody,
- byte[].class,
+ /**
+ * Constructs event data representation from given Kafka headers and byte array body.
+ *
+ * This method reuses the {@link GenericDomainEventEntry} class for both types of events which can be
+ * transmitted via Kafka. For domain events, the fields aggregateType, aggregateId and
+ * aggregateSeq will contain the corresponding values, but for the simple event they will be
+ * null. This is ok to pass null to those values and 0L to
+ * aggregateSeq, since the {@link InitialEventRepresentation} does the same in its constructor and
+ * is implemented in a null-tolerant way. Check {@link DefaultKafkaMessageConverter#isDomainEvent(Headers)} for more
+ * details.
+ *
+ *
+ * @param headers Kafka headers.
+ * @param messageBody Kafka payload as a byte array.
+ * @return event data.
+ */
+ private EventData> createEventData(Headers headers, byte[] messageBody) {
+ return new GenericDomainEventEntry<>(
+ valueAsString(headers, AGGREGATE_TYPE),
+ valueAsString(headers, AGGREGATE_ID),
+ valueAsLong(headers, AGGREGATE_SEQ, 0L),
+ valueAsString(headers, MESSAGE_ID),
+ valueAsLong(headers, MESSAGE_TIMESTAMP),
valueAsString(headers, MESSAGE_TYPE),
- valueAsString(headers, MESSAGE_REVISION, null)
+ valueAsString(headers, MESSAGE_REVISION, null),
+ messageBody,
+ extractMetadataAsBytes(headers)
);
+ }
- return new SerializedMessage<>(
- valueAsString(headers, MESSAGE_ID),
- new LazyDeserializingObject<>(serializedObject, serializer),
- new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers)))
- );
+ private byte[] extractMetadataAsBytes(Headers headers) {
+ return serializer.serialize(MetaData.from(extractAxonMetadata(headers)), byte[].class).getData();
}
- private Optional> buildMessage(Headers headers, SerializedMessage> message) {
+ private static boolean isAxonMessage(Headers headers) {
+ return keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE));
+ }
+
+ /**
+ * Checks if the event is originated from an aggregate (domain event) or is a simple event sent over the bus.
+ *
+ * The difference between a DomainEventMessage and an EventMessage, is the following three fields:
+ *
+ *
The type - represents the Aggregate the event originates from. It would be empty for an EventMessage and
+ * filled for a DomainEventMessage.
+ *
The aggregateIdentifier - represents the Aggregate instance the event originates from. It would be equal
+ * to the eventIdentifier for an EventMessage and not equal to that identifier a DomainEventMessage.
+ *
The sequenceNumber - represents the order of the events within an Aggregate instance's event stream.
+ * It would be 0 at all times for an EventMessage, whereas a DomainEventMessage would be 0 or greater.
+ *
+ *
+ *
+ * @param headers Kafka headers.
+ * @return true if the event is originated from an aggregate.
+ */
+ private static boolean isDomainEvent(Headers headers) {
+ return headers.lastHeader(AGGREGATE_TYPE) != null
+ && headers.lastHeader(AGGREGATE_ID) != null
+ && headers.lastHeader(AGGREGATE_SEQ) != null;
+ }
+
+ private static Optional> buildMessage(Headers headers, SerializedMessage> message) {
long timestamp = valueAsLong(headers, MESSAGE_TIMESTAMP);
- return headers.lastHeader(AGGREGATE_ID) != null
- ? buildDomainEvent(headers, message, timestamp)
- : buildEvent(message, timestamp);
+ return isDomainEvent(headers)
+ ? buildDomainEventMessage(headers, message, timestamp)
+ : buildEventMessage(message, timestamp);
}
- private Optional> buildDomainEvent(Headers headers, SerializedMessage> message, long timestamp) {
+ private static Optional> buildDomainEventMessage(Headers headers, SerializedMessage> message,
+ long timestamp) {
return Optional.of(new GenericDomainEventMessage<>(
valueAsString(headers, AGGREGATE_TYPE),
valueAsString(headers, AGGREGATE_ID),
@@ -177,7 +238,7 @@ private Optional> buildDomainEvent(Headers headers, SerializedMe
));
}
- private Optional> buildEvent(SerializedMessage> message, long timestamp) {
+ private static Optional> buildEventMessage(SerializedMessage> message, long timestamp) {
return Optional.of(new GenericEventMessage<>(message, () -> Instant.ofEpochMilli(timestamp)));
}
@@ -193,6 +254,7 @@ public static class Builder {
private Serializer serializer;
private SequencingPolicy super EventMessage>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
private BiFunction headerValueMapper = byteMapper();
+ private EventUpcasterChain upcasterChain = new EventUpcasterChain();
/**
* Sets the serializer to serialize the Event Message's payload with.
@@ -234,6 +296,18 @@ public Builder headerValueMapper(BiFunction header
return this;
}
+ /**
+ * Sets the {@code upcasterChain} to be used during the consumption of events.
+ *
+ * @param upcasterChain upcaster chain to be used on event reading.
+ * @return the current Builder instance, for fluent interfacing
+ */
+ public Builder upcasterChain(EventUpcasterChain upcasterChain) {
+ assertNonNull(upcasterChain, "UpcasterChain must not be null");
+ this.upcasterChain = upcasterChain;
+ return this;
+ }
+
/**
* Initializes a {@link DefaultKafkaMessageConverter} as specified through this Builder.
*
diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java
index 8372c527..e234f779 100644
--- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java
+++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java
@@ -77,6 +77,20 @@ public static Long valueAsLong(Headers headers, String key) {
return asLong(value(headers, key));
}
+ /**
+ * Return a {@link Long} representation of the {@code value} stored under a given {@code key} inside the {@link
+ * Headers}. In case of a missing entry {@code defaultValue} is returned.
+ *
+ * @param headers the Kafka {@code headers} to pull the {@link Long} value from
+ * @param key the key corresponding to the expected {@link Long} value
+ * @param defaultValue the default value to return when {@code key} does not exist in the given {@code headers}
+ * @return the value as a {@link Long} corresponding to the given {@code key} in the {@code headers}
+ */
+ public static Long valueAsLong(Headers headers, String key, Long defaultValue) {
+ Long value = asLong(value(headers, key));
+ return value != null ? value : defaultValue;
+ }
+
/**
* Converts bytes to {@link String}.
*
diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java
index 8c8db17b..58e44969 100644
--- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java
+++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java
@@ -16,6 +16,8 @@
package org.axonframework.extensions.kafka.eventhandling;
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.security.AnyTypePermission;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
@@ -27,8 +29,12 @@
import org.axonframework.serialization.FixedValueRevisionResolver;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.SimpleSerializedType;
+import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.xml.XStreamSerializer;
-import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.clients.consumer.ConsumerRecord.NULL_SIZE;
import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
@@ -39,7 +45,8 @@
import static org.axonframework.extensions.kafka.eventhandling.util.HeaderAssertUtil.assertEventHeaders;
import static org.axonframework.messaging.Headers.*;
import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Tests for {@link DefaultKafkaMessageConverter}.
@@ -72,13 +79,13 @@ private static EventMessage