From d80be3beed1085af30783bedc52829b660c3262c Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Tue, 12 Oct 2021 12:06:56 +0200 Subject: [PATCH 1/8] feature: implement support for event upcasters, fix #193 --- .../autoconfig/KafkaAutoConfiguration.java | 12 +- .../DefaultKafkaMessageConverter.java | 104 ++++++++++++++---- .../DefaultKafkaMessageConverterTest.java | 55 +++++++++ 3 files changed, 145 insertions(+), 26 deletions(-) diff --git a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java index d2fe17e6..9611382b 100644 --- a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java +++ b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java @@ -35,6 +35,7 @@ import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher; import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory; import org.axonframework.serialization.Serializer; +import org.axonframework.serialization.upcasting.event.EventUpcasterChain; import org.axonframework.spring.config.AxonConfiguration; import org.axonframework.springboot.autoconfig.AxonAutoConfiguration; import org.axonframework.springboot.autoconfig.InfraConfiguration; @@ -84,9 +85,16 @@ public KafkaAutoConfiguration(KafkaProperties properties) { @Bean @ConditionalOnMissingBean public KafkaMessageConverter kafkaMessageConverter( - @Qualifier("eventSerializer") Serializer eventSerializer + @Qualifier("eventSerializer") Serializer eventSerializer, + EventUpcasterChain eventUpcasterChain ) { - return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).build(); + return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain(eventUpcasterChain).build(); + } + + @Bean + @ConditionalOnMissingBean + public EventUpcasterChain emptyUpcasterChain() { + return new EventUpcasterChain(); } @Bean("axonKafkaProducerFactory") diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java index 16ff2d52..9db8d641 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.axonframework.common.AxonConfigurationException; import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.GenericDomainEventEntry; import org.axonframework.eventhandling.GenericDomainEventMessage; import org.axonframework.eventhandling.GenericEventMessage; import org.axonframework.eventhandling.async.SequencingPolicy; @@ -32,6 +33,8 @@ import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.Serializer; import org.axonframework.serialization.SimpleSerializedObject; +import org.axonframework.serialization.upcasting.event.EventUpcasterChain; +import org.axonframework.serialization.upcasting.event.InitialEventRepresentation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,18 +42,19 @@ import java.util.Arrays; import java.util.Optional; import java.util.function.BiFunction; +import java.util.stream.Stream; import static org.axonframework.common.BuilderUtils.assertNonNull; import static org.axonframework.extensions.kafka.eventhandling.HeaderUtils.*; import static org.axonframework.messaging.Headers.*; /** - * Converts and {@link EventMessage} to a {@link ProducerRecord} Kafka message and {from a @link ConsumerRecord} Kafka + * Converts and {@link EventMessage} to a {@link ProducerRecord} Kafka message and from a {@link ConsumerRecord} Kafka * message back to an EventMessage (if possible). *

* During conversion meta data entries with the {@code 'axon-metadata-'} prefix are passed to the {@link Headers}. Other * message-specific attributes are added as metadata. The {@link EventMessage#getPayload()} is serialized using the - * configured {@link Serializer} and passed as the Kafka recordd's body. + * configured {@link Serializer} and passed as the Kafka record's body. *

* This implementation will suffice in most cases. * @@ -65,6 +69,7 @@ public class DefaultKafkaMessageConverter implements KafkaMessageConverter> sequencingPolicy; private final BiFunction headerValueMapper; + private final EventUpcasterChain upcasterChain; /** * Instantiate a {@link DefaultKafkaMessageConverter} based on the fields contained in the {@link Builder}. @@ -80,6 +85,7 @@ protected DefaultKafkaMessageConverter(Builder builder) { this.serializer = builder.serializer; this.sequencingPolicy = builder.sequencingPolicy; this.headerValueMapper = builder.headerValueMapper; + this.upcasterChain = builder.upcasterChain; } /** @@ -114,9 +120,9 @@ public static Builder builder() { public ProducerRecord createKafkaMessage(EventMessage eventMessage, String topic) { SerializedObject serializedObject = eventMessage.serializePayload(serializer, byte[].class); return new ProducerRecord<>( - topic, null, null, recordKey(eventMessage), - serializedObject.getData(), - toHeaders(eventMessage, serializedObject, headerValueMapper) + topic, null, null, recordKey(eventMessage), + serializedObject.getData(), + toHeaders(eventMessage, serializedObject, headerValueMapper) ); } @@ -130,9 +136,16 @@ public Optional> readKafkaMessage(ConsumerRecord try { Headers headers = consumerRecord.headers(); if (isAxonMessage(headers)) { + byte[] messageBody = consumerRecord.value(); - SerializedMessage message = extractSerializedMessage(headers, messageBody); - return buildMessage(headers, message); + final Optional> message; + // domain events may be upcasted + if (isDomainEvent(headers)) { + message = createDomainEventAndUpcast(headers, messageBody); + } else { + message = createEvent(headers, messageBody); + } + return message.flatMap(serializedMessage -> buildMessage(headers, serializedMessage)); } } catch (Exception e) { logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e); @@ -145,35 +158,61 @@ private boolean isAxonMessage(Headers headers) { return keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE)); } - private SerializedMessage extractSerializedMessage(Headers headers, byte[] messageBody) { + private Optional> createEvent(Headers headers, byte[] messageBody) { SimpleSerializedObject serializedObject = new SimpleSerializedObject<>( - messageBody, - byte[].class, - valueAsString(headers, MESSAGE_TYPE), - valueAsString(headers, MESSAGE_REVISION, null) + messageBody, + byte[].class, + valueAsString(headers, MESSAGE_TYPE), + valueAsString(headers, MESSAGE_REVISION, null) ); - return new SerializedMessage<>( - valueAsString(headers, MESSAGE_ID), - new LazyDeserializingObject<>(serializedObject, serializer), - new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers))) + return Optional.of(new SerializedMessage<>( + valueAsString(headers, MESSAGE_ID), + new LazyDeserializingObject<>(serializedObject, serializer), + new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers))) + )); + } + + private Optional> createDomainEventAndUpcast(Headers headers, byte[] messageBody) { + GenericDomainEventEntry domainEventEntry = new GenericDomainEventEntry<>( + valueAsString(headers, AGGREGATE_TYPE), + valueAsString(headers, AGGREGATE_ID), + valueAsLong(headers, AGGREGATE_SEQ), + valueAsString(headers, MESSAGE_ID), + valueAsLong(headers, MESSAGE_TIMESTAMP), + valueAsString(headers, MESSAGE_TYPE), + valueAsString(headers, MESSAGE_REVISION, null), + messageBody, + serializer.serialize(MetaData.from(extractAxonMetadata(headers)), byte[].class).getData() ); + + return upcasterChain.upcast( + Stream.of(new InitialEventRepresentation(domainEventEntry, serializer)) + ).findFirst().map(upcastedEventData -> new SerializedMessage<>( + valueAsString(headers, MESSAGE_ID), + new LazyDeserializingObject<>(upcastedEventData.getData(), serializer), + upcastedEventData.getMetaData() + )); + } + + private boolean isDomainEvent(Headers headers) { + return headers.lastHeader(AGGREGATE_ID) != null; } private Optional> buildMessage(Headers headers, SerializedMessage message) { long timestamp = valueAsLong(headers, MESSAGE_TIMESTAMP); - return headers.lastHeader(AGGREGATE_ID) != null - ? buildDomainEvent(headers, message, timestamp) - : buildEvent(message, timestamp); + return isDomainEvent(headers) + ? buildDomainEvent(headers, message, timestamp) + : buildEvent(message, timestamp); } private Optional> buildDomainEvent(Headers headers, SerializedMessage message, long timestamp) { return Optional.of(new GenericDomainEventMessage<>( - valueAsString(headers, AGGREGATE_TYPE), - valueAsString(headers, AGGREGATE_ID), - valueAsLong(headers, AGGREGATE_SEQ), - message, - () -> Instant.ofEpochMilli(timestamp) + valueAsString(headers, AGGREGATE_TYPE), + valueAsString(headers, AGGREGATE_ID), + valueAsLong(headers, AGGREGATE_SEQ), + message, + () -> Instant.ofEpochMilli(timestamp) )); } @@ -193,11 +232,13 @@ public static class Builder { private Serializer serializer; private SequencingPolicy> sequencingPolicy = SequentialPerAggregatePolicy.instance(); private BiFunction headerValueMapper = byteMapper(); + private EventUpcasterChain upcasterChain = new EventUpcasterChain(); /** * Sets the serializer to serialize the Event Message's payload with. * * @param serializer The serializer to serialize the Event Message's payload with + * * @return the current Builder instance, for fluent interfacing */ public Builder serializer(Serializer serializer) { @@ -211,6 +252,7 @@ public Builder serializer(Serializer serializer) { * the key for the {@link ProducerRecord}. Defaults to a {@link SequentialPerAggregatePolicy} instance. * * @param sequencingPolicy a {@link SequencingPolicy} used to generate the key for the {@link ProducerRecord} + * * @return the current Builder instance, for fluent interfacing */ public Builder sequencingPolicy(SequencingPolicy> sequencingPolicy) { @@ -226,6 +268,7 @@ public Builder sequencingPolicy(SequencingPolicy> sequen * * @param headerValueMapper a {@link BiFunction} of {@link String}, {@link Object} and {@link RecordHeader}, * used for mapping values to Kafka headers + * * @return the current Builder instance, for fluent interfacing */ public Builder headerValueMapper(BiFunction headerValueMapper) { @@ -234,6 +277,19 @@ public Builder headerValueMapper(BiFunction header return this; } + /** + * Sets the {@code upcasterChain} to be used during the consumption of events. + * + * @param upcasterChain upcaster chain to be used on event reading. + * + * @return the current Builder instance, for fluent interfacing + */ + public Builder upcasterChain(EventUpcasterChain upcasterChain) { + assertNonNull(upcasterChain, "UpcasterChain must not be null"); + this.upcasterChain = upcasterChain; + return this; + } + /** * Initializes a {@link DefaultKafkaMessageConverter} as specified through this Builder. * diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java index 8c8db17b..67a38935 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java @@ -27,9 +27,16 @@ import org.axonframework.serialization.FixedValueRevisionResolver; import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.SimpleSerializedType; +import org.axonframework.serialization.upcasting.Upcaster; +import org.axonframework.serialization.upcasting.event.EventUpcaster; +import org.axonframework.serialization.upcasting.event.EventUpcasterChain; +import org.axonframework.serialization.upcasting.event.IntermediateEventRepresentation; import org.axonframework.serialization.xml.XStreamSerializer; import org.junit.jupiter.api.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + import static org.apache.kafka.clients.consumer.ConsumerRecord.NULL_SIZE; import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP; import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE; @@ -177,11 +184,26 @@ void testReadingMessagePayloadDifferentThanByteShouldReturnEmptyMessage() { @Test void testWritingEventMessageShouldBeReadAsEventMessage() { + AtomicInteger upcasterCalled = new AtomicInteger(0); + + EventUpcasterChain chain = new EventUpcasterChain(new EventUpcaster() { + @Override + public Stream upcast( + Stream intermediateRepresentations) { + upcasterCalled.addAndGet(1); + return intermediateRepresentations; + } + }); + + testSubject = DefaultKafkaMessageConverter.builder().serializer(serializer).upcasterChain(chain).build(); + EventMessage expected = eventMessage(); ProducerRecord senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC); EventMessage actual = receiverMessage(senderMessage); assertEventMessage(actual, expected); + // upcasting should not happen on event messages, but on domain event messages only. + assertEquals(0, upcasterCalled.get()); } @Test @@ -205,6 +227,31 @@ void testWritingDomainEventMessageShouldBeReadAsDomainMessage() { assertDomainMessage((DomainEventMessage) actual, expected); } + @Test + void testWritingDomainEventMessageShouldBeReadAsDomainMessageAndPassUpcaster() { + + AtomicInteger upcasterCalled = new AtomicInteger(0); + + EventUpcasterChain chain = new EventUpcasterChain(new EventUpcaster() { + @Override + public Stream upcast( + Stream intermediateRepresentations) { + upcasterCalled.addAndGet(1); + return intermediateRepresentations; + } + }); + testSubject = DefaultKafkaMessageConverter.builder().serializer(serializer).upcasterChain(chain).build(); + + DomainEventMessage expected = domainMessage(); + ProducerRecord senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC); + EventMessage actual = receiverMessage(senderMessage); + + assertEventMessage(actual, expected); + assertDomainMessage((DomainEventMessage) actual, expected); + assertEquals(1, upcasterCalled.get()); + } + + @Test void testBuildWithoutSerializerThrowsAxonConfigurationException() { DefaultKafkaMessageConverter.Builder testSubject = DefaultKafkaMessageConverter.builder(); @@ -233,6 +280,14 @@ void testBuildWithNullHeaderValueMapperThrowsAxonConfigurationException() { assertThrows(AxonConfigurationException.class, () -> testSubject.headerValueMapper(null)); } + @Test + void testBuildWithNullUpcasterChainThrowsAxonConfigurationException() { + DefaultKafkaMessageConverter.Builder testSubject = DefaultKafkaMessageConverter.builder(); + + assertThrows(AxonConfigurationException.class, () -> testSubject.upcasterChain(null)); + } + + private void assertDomainMessage(DomainEventMessage actual, DomainEventMessage expected) { assertEquals(expected.getAggregateIdentifier(), actual.getAggregateIdentifier()); assertEquals(expected.getSequenceNumber(), actual.getSequenceNumber()); From c7a648968e79b82362054684f378188b682e9dd1 Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Fri, 15 Oct 2021 20:28:21 +0200 Subject: [PATCH 2/8] feature: implement upcasting of non-domain events --- .../autoconfig/KafkaAutoConfiguration.java | 60 ++++----- .../DefaultKafkaMessageConverter.java | 126 +++++++++++++----- .../DefaultKafkaMessageConverterTest.java | 50 +++---- pom.xml | 2 +- 4 files changed, 144 insertions(+), 94 deletions(-) diff --git a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java index 9611382b..72acbd38 100644 --- a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java +++ b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java @@ -85,16 +85,10 @@ public KafkaAutoConfiguration(KafkaProperties properties) { @Bean @ConditionalOnMissingBean public KafkaMessageConverter kafkaMessageConverter( - @Qualifier("eventSerializer") Serializer eventSerializer, - EventUpcasterChain eventUpcasterChain + @Qualifier("eventSerializer") Serializer eventSerializer, + AxonConfiguration config ) { - return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain(eventUpcasterChain).build(); - } - - @Bean - @ConditionalOnMissingBean - public EventUpcasterChain emptyUpcasterChain() { - return new EventUpcasterChain(); + return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain(config.upcasterChain() != null ? config.upcasterChain() : new EventUpcasterChain()).build(); } @Bean("axonKafkaProducerFactory") @@ -104,18 +98,18 @@ public ProducerFactory kafkaProducerFactory() { String transactionIdPrefix = properties.getProducer().getTransactionIdPrefix(); DefaultProducerFactory.Builder builder = - DefaultProducerFactory.builder() - .configuration(properties.buildProducerProperties()) - .confirmationMode(confirmationMode); + DefaultProducerFactory.builder() + .configuration(properties.buildProducerProperties()) + .confirmationMode(confirmationMode); if (isNonEmptyString(transactionIdPrefix)) { builder.transactionalIdPrefix(transactionIdPrefix) .confirmationMode(ConfirmationMode.TRANSACTIONAL); if (!confirmationMode.isTransactional()) { logger.warn( - "The confirmation mode is set to [{}], whilst a transactional id prefix is present. " - + "The transactional id prefix overwrites the confirmation mode choice to TRANSACTIONAL", - confirmationMode + "The confirmation mode is set to [{}], whilst a transactional id prefix is present. " + + "The transactional id prefix overwrites the confirmation mode choice to TRANSACTIONAL", + confirmationMode ); } } @@ -130,10 +124,11 @@ private boolean isNonEmptyString(String s) { @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @ConditionalOnMissingBean @Bean(destroyMethod = "shutDown") - @ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class}) - public KafkaPublisher kafkaPublisher(ProducerFactory axonKafkaProducerFactory, - KafkaMessageConverter kafkaMessageConverter, - AxonConfiguration configuration) { + @ConditionalOnBean({ ProducerFactory.class, KafkaMessageConverter.class }) + public KafkaPublisher kafkaPublisher( + ProducerFactory axonKafkaProducerFactory, + KafkaMessageConverter kafkaMessageConverter, + AxonConfiguration configuration) { return KafkaPublisher.builder() .producerFactory(axonKafkaProducerFactory) .messageConverter(kafkaMessageConverter) @@ -145,12 +140,13 @@ public KafkaPublisher kafkaPublisher(ProducerFactory kafkaEventPublisher(KafkaPublisher kafkaPublisher, - KafkaProperties kafkaProperties, - EventProcessingConfigurer eventProcessingConfigurer) { + @ConditionalOnBean({ KafkaPublisher.class }) + public KafkaEventPublisher kafkaEventPublisher( + KafkaPublisher kafkaPublisher, + KafkaProperties kafkaProperties, + EventProcessingConfigurer eventProcessingConfigurer) { KafkaEventPublisher kafkaEventPublisher = - KafkaEventPublisher.builder().kafkaPublisher(kafkaPublisher).build(); + KafkaEventPublisher.builder().kafkaPublisher(kafkaPublisher).build(); /* * Register an invocation error handler which re-throws any exception. @@ -160,11 +156,11 @@ public KafkaEventPublisher kafkaEventPublisher(KafkaPublisher kafkaEventPublisher) .registerListenerInvocationErrorHandler( - DEFAULT_PROCESSING_GROUP, configuration -> PropagatingErrorHandler.instance() + DEFAULT_PROCESSING_GROUP, configuration -> PropagatingErrorHandler.instance() ) .assignHandlerTypesMatching( - DEFAULT_PROCESSING_GROUP, - clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class) + DEFAULT_PROCESSING_GROUP, + clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class) ); KafkaProperties.EventProcessorMode processorMode = kafkaProperties.getProducer().getEventProcessorMode(); @@ -197,12 +193,12 @@ public ConsumerFactory kafkaConsumerFactory() { @Bean @ConditionalOnMissingBean - @ConditionalOnBean({ConsumerFactory.class, KafkaMessageConverter.class, Fetcher.class}) + @ConditionalOnBean({ ConsumerFactory.class, KafkaMessageConverter.class, Fetcher.class }) @Conditional(StreamingProcessorModeCondition.class) public StreamableKafkaMessageSource streamableKafkaMessageSource( - ConsumerFactory kafkaConsumerFactory, - Fetcher kafkaFetcher, - KafkaMessageConverter kafkaMessageConverter + ConsumerFactory kafkaConsumerFactory, + Fetcher kafkaFetcher, + KafkaMessageConverter kafkaMessageConverter ) { return StreamableKafkaMessageSource.builder() .topics(Collections.singletonList(properties.getDefaultTopic())) @@ -210,7 +206,7 @@ public StreamableKafkaMessageSource streamableKafkaMessageSource .fetcher(kafkaFetcher) .messageConverter(kafkaMessageConverter) .bufferFactory(() -> new SortedKafkaMessageBuffer<>( - properties.getFetcher().getBufferSize() + properties.getFetcher().getBufferSize() )) .build(); } diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java index 9db8d641..feaa6951 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.axonframework.common.AxonConfigurationException; +import org.axonframework.eventhandling.EventData; import org.axonframework.eventhandling.EventMessage; import org.axonframework.eventhandling.GenericDomainEventEntry; import org.axonframework.eventhandling.GenericDomainEventMessage; @@ -30,6 +31,7 @@ import org.axonframework.messaging.MetaData; import org.axonframework.serialization.LazyDeserializingObject; import org.axonframework.serialization.SerializedMessage; +import org.axonframework.serialization.SerializedMetaData; import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.Serializer; import org.axonframework.serialization.SimpleSerializedObject; @@ -56,6 +58,11 @@ * message-specific attributes are added as metadata. The {@link EventMessage#getPayload()} is serialized using the * configured {@link Serializer} and passed as the Kafka record's body. *

+ *

+ * If an up-caster / up-caster chain is configured, this converter will pass the converted messages through the it. + * Please note, that the since the message converter consumes records one-by-one, the up-casting functionality is limited + * to one-to-one and one-to-many up-casters only. + *

* This implementation will suffice in most cases. * * @author Nakul Mishra @@ -136,21 +143,26 @@ public Optional> readKafkaMessage(ConsumerRecord try { Headers headers = consumerRecord.headers(); if (isAxonMessage(headers)) { - byte[] messageBody = consumerRecord.value(); - final Optional> message; - // domain events may be upcasted + final EventData eventData; if (isDomainEvent(headers)) { - message = createDomainEventAndUpcast(headers, messageBody); + eventData = createDomainEventEntry(headers, messageBody); } else { - message = createEvent(headers, messageBody); + eventData = createEventData(headers, messageBody); } - return message.flatMap(serializedMessage -> buildMessage(headers, serializedMessage)); + return upcasterChain + .upcast(Stream.of(new InitialEventRepresentation(eventData, serializer))) + .findFirst() + .map(upcastedEventData -> new SerializedMessage<>( + upcastedEventData.getMessageIdentifier(), + new LazyDeserializingObject<>(upcastedEventData.getData(), serializer), + upcastedEventData.getMetaData() + ) + ).flatMap(serializedMessage -> buildMessage(headers, serializedMessage)); } } catch (Exception e) { logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e); } - return Optional.empty(); } @@ -158,23 +170,20 @@ private boolean isAxonMessage(Headers headers) { return keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE)); } - private Optional> createEvent(Headers headers, byte[] messageBody) { - SimpleSerializedObject serializedObject = new SimpleSerializedObject<>( - messageBody, - byte[].class, + private EventData createEventData(Headers headers, byte[] messageBody) { + return new GenericMessageEventData<>( + valueAsString(headers, MESSAGE_ID), + valueAsLong(headers, MESSAGE_TIMESTAMP), valueAsString(headers, MESSAGE_TYPE), - valueAsString(headers, MESSAGE_REVISION, null) + valueAsString(headers, MESSAGE_REVISION, null), + messageBody, + extractMetadataAsBytes(headers), + byte[].class ); - - return Optional.of(new SerializedMessage<>( - valueAsString(headers, MESSAGE_ID), - new LazyDeserializingObject<>(serializedObject, serializer), - new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers))) - )); } - private Optional> createDomainEventAndUpcast(Headers headers, byte[] messageBody) { - GenericDomainEventEntry domainEventEntry = new GenericDomainEventEntry<>( + private GenericDomainEventEntry createDomainEventEntry(Headers headers, byte[] messageBody) { + return new GenericDomainEventEntry<>( valueAsString(headers, AGGREGATE_TYPE), valueAsString(headers, AGGREGATE_ID), valueAsLong(headers, AGGREGATE_SEQ), @@ -183,30 +192,26 @@ private Optional> createDomainEventAndUpcast(Headers header valueAsString(headers, MESSAGE_TYPE), valueAsString(headers, MESSAGE_REVISION, null), messageBody, - serializer.serialize(MetaData.from(extractAxonMetadata(headers)), byte[].class).getData() + extractMetadataAsBytes(headers) ); - - return upcasterChain.upcast( - Stream.of(new InitialEventRepresentation(domainEventEntry, serializer)) - ).findFirst().map(upcastedEventData -> new SerializedMessage<>( - valueAsString(headers, MESSAGE_ID), - new LazyDeserializingObject<>(upcastedEventData.getData(), serializer), - upcastedEventData.getMetaData() - )); } private boolean isDomainEvent(Headers headers) { return headers.lastHeader(AGGREGATE_ID) != null; } + private byte[] extractMetadataAsBytes(Headers headers) { + return serializer.serialize(MetaData.from(extractAxonMetadata(headers)), byte[].class).getData(); + } + private Optional> buildMessage(Headers headers, SerializedMessage message) { long timestamp = valueAsLong(headers, MESSAGE_TIMESTAMP); return isDomainEvent(headers) - ? buildDomainEvent(headers, message, timestamp) - : buildEvent(message, timestamp); + ? buildDomainEventMessage(headers, message, timestamp) + : buildEventMessage(message, timestamp); } - private Optional> buildDomainEvent(Headers headers, SerializedMessage message, long timestamp) { + private Optional> buildDomainEventMessage(Headers headers, SerializedMessage message, long timestamp) { return Optional.of(new GenericDomainEventMessage<>( valueAsString(headers, AGGREGATE_TYPE), valueAsString(headers, AGGREGATE_ID), @@ -216,10 +221,65 @@ private Optional> buildDomainEvent(Headers headers, SerializedMe )); } - private Optional> buildEvent(SerializedMessage message, long timestamp) { + private Optional> buildEventMessage(SerializedMessage message, long timestamp) { return Optional.of(new GenericEventMessage<>(message, () -> Instant.ofEpochMilli(timestamp))); } + /** + * Generic Event data used for passing throw up-caster chain. + */ + private static class GenericMessageEventData implements EventData { + + private final Instant timestamp; + private final String messageType; + private final String messageRevision; + private final T messageBody; + private final T metadata; + private final Class contentType; + private final String eventIdentifier; + + /** + * Constructs the event data. + * + * @param eventIdentifier event identifier. + * @param timestamp event timestamp as milliseconds since epoch. + * @param messageType message type. + * @param messageRevision revision or null, if no revision is provided. + * @param messageBody bytes of the event message. + * @param metadata metadata pf the message. + * @param contentType class of the Java representation of the content of the event body and metadata. + */ + GenericMessageEventData(String eventIdentifier, Long timestamp, String messageType, String messageRevision, T messageBody, T metadata, Class contentType) { + this.eventIdentifier = eventIdentifier; + this.timestamp = Instant.ofEpochMilli(timestamp); + this.messageType = messageType; + this.messageRevision = messageRevision; + this.messageBody = messageBody; + this.metadata = metadata; + this.contentType = contentType; + } + + @Override + public String getEventIdentifier() { + return this.eventIdentifier; + } + + @Override + public Instant getTimestamp() { + return timestamp; + } + + @Override + public SerializedObject getMetaData() { + return new SerializedMetaData<>(metadata, contentType); + } + + @Override + public SerializedObject getPayload() { + return new SimpleSerializedObject<>(messageBody, contentType, messageType, messageRevision); + } + } + /** * Builder class to instantiate a {@link DefaultKafkaMessageConverter}. *

diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java index 67a38935..58e44969 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java @@ -16,6 +16,8 @@ package org.axonframework.extensions.kafka.eventhandling; +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.security.AnyTypePermission; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Headers; @@ -27,15 +29,12 @@ import org.axonframework.serialization.FixedValueRevisionResolver; import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.SimpleSerializedType; -import org.axonframework.serialization.upcasting.Upcaster; -import org.axonframework.serialization.upcasting.event.EventUpcaster; import org.axonframework.serialization.upcasting.event.EventUpcasterChain; -import org.axonframework.serialization.upcasting.event.IntermediateEventRepresentation; import org.axonframework.serialization.xml.XStreamSerializer; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Stream; import static org.apache.kafka.clients.consumer.ConsumerRecord.NULL_SIZE; import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP; @@ -46,7 +45,8 @@ import static org.axonframework.extensions.kafka.eventhandling.util.HeaderAssertUtil.assertEventHeaders; import static org.axonframework.messaging.Headers.*; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Tests for {@link DefaultKafkaMessageConverter}. @@ -79,13 +79,13 @@ private static EventMessage eventMessage() { private static GenericDomainEventMessage domainMessage() { return new GenericDomainEventMessage<>( - "Stub", SOME_AGGREGATE_IDENTIFIER, 1L, "Payload", MetaData.with("key", "value") + "Stub", SOME_AGGREGATE_IDENTIFIER, 1L, "Payload", MetaData.with("key", "value") ); } private static ConsumerRecord toReceiverRecord(ProducerRecord message) { ConsumerRecord receiverRecord = new ConsumerRecord<>( - SOME_TOPIC, SOME_PARTITION, SOME_OFFSET, message.key(), message.value() + SOME_TOPIC, SOME_PARTITION, SOME_OFFSET, message.key(), message.value() ); message.headers().forEach(header -> receiverRecord.headers().add(header)); return receiverRecord; @@ -93,8 +93,11 @@ private static ConsumerRecord toReceiverRecord(ProducerRecord( - "foo", 0, 0, NO_TIMESTAMP, NO_TIMESTAMP_TYPE, - -1L, NULL_SIZE, NULL_SIZE, "123", "some-wrong-input", headers + "foo", 0, 0, NO_TIMESTAMP, NO_TIMESTAMP_TYPE, + -1L, NULL_SIZE, NULL_SIZE, "123", "some-wrong-input", headers ); //noinspection unchecked @@ -183,16 +186,12 @@ void testReadingMessagePayloadDifferentThanByteShouldReturnEmptyMessage() { } @Test - void testWritingEventMessageShouldBeReadAsEventMessage() { + void testWritingEventMessageShouldBeReadAsEventMessageAndPassUpcaster() { AtomicInteger upcasterCalled = new AtomicInteger(0); - EventUpcasterChain chain = new EventUpcasterChain(new EventUpcaster() { - @Override - public Stream upcast( - Stream intermediateRepresentations) { - upcasterCalled.addAndGet(1); - return intermediateRepresentations; - } + EventUpcasterChain chain = new EventUpcasterChain(intermediateRepresentations -> { + upcasterCalled.addAndGet(1); + return intermediateRepresentations; }); testSubject = DefaultKafkaMessageConverter.builder().serializer(serializer).upcasterChain(chain).build(); @@ -202,8 +201,7 @@ public Stream upcast( EventMessage actual = receiverMessage(senderMessage); assertEventMessage(actual, expected); - // upcasting should not happen on event messages, but on domain event messages only. - assertEquals(0, upcasterCalled.get()); + assertEquals(1, upcasterCalled.get()); } @Test @@ -232,13 +230,9 @@ void testWritingDomainEventMessageShouldBeReadAsDomainMessageAndPassUpcaster() { AtomicInteger upcasterCalled = new AtomicInteger(0); - EventUpcasterChain chain = new EventUpcasterChain(new EventUpcaster() { - @Override - public Stream upcast( - Stream intermediateRepresentations) { - upcasterCalled.addAndGet(1); - return intermediateRepresentations; - } + EventUpcasterChain chain = new EventUpcasterChain(intermediateRepresentations -> { + upcasterCalled.addAndGet(1); + return intermediateRepresentations; }); testSubject = DefaultKafkaMessageConverter.builder().serializer(serializer).upcasterChain(chain).build(); @@ -296,7 +290,7 @@ private void assertDomainMessage(DomainEventMessage actual, DomainEventMessag private EventMessage receiverMessage(ProducerRecord senderMessage) { return testSubject.readKafkaMessage( - toReceiverRecord(senderMessage)).orElseThrow(() -> new AssertionError("Expected valid message") + toReceiverRecord(senderMessage)).orElseThrow(() -> new AssertionError("Expected valid message") ); } } diff --git a/pom.xml b/pom.xml index 6bf180ed..b3f77fce 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ Apache 2.0 - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 From 6a4c986c133097443d3bb8eaa8d917bd21bb7fb9 Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Tue, 19 Oct 2021 11:11:51 +0200 Subject: [PATCH 3/8] feature: implemented review comments --- .../autoconfig/KafkaAutoConfiguration.java | 53 ++--- .../DefaultKafkaMessageConverter.java | 194 +++++++----------- .../kafka/eventhandling/HeaderUtils.java | 14 ++ 3 files changed, 117 insertions(+), 144 deletions(-) diff --git a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java index 72acbd38..80cf04a1 100644 --- a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java +++ b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java @@ -85,10 +85,11 @@ public KafkaAutoConfiguration(KafkaProperties properties) { @Bean @ConditionalOnMissingBean public KafkaMessageConverter kafkaMessageConverter( - @Qualifier("eventSerializer") Serializer eventSerializer, - AxonConfiguration config + @Qualifier("eventSerializer") Serializer eventSerializer, + AxonConfiguration config ) { - return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain(config.upcasterChain() != null ? config.upcasterChain() : new EventUpcasterChain()).build(); + return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain( + config.upcasterChain() != null ? config.upcasterChain() : new EventUpcasterChain()).build(); } @Bean("axonKafkaProducerFactory") @@ -98,18 +99,18 @@ public ProducerFactory kafkaProducerFactory() { String transactionIdPrefix = properties.getProducer().getTransactionIdPrefix(); DefaultProducerFactory.Builder builder = - DefaultProducerFactory.builder() - .configuration(properties.buildProducerProperties()) - .confirmationMode(confirmationMode); + DefaultProducerFactory.builder() + .configuration(properties.buildProducerProperties()) + .confirmationMode(confirmationMode); if (isNonEmptyString(transactionIdPrefix)) { builder.transactionalIdPrefix(transactionIdPrefix) .confirmationMode(ConfirmationMode.TRANSACTIONAL); if (!confirmationMode.isTransactional()) { logger.warn( - "The confirmation mode is set to [{}], whilst a transactional id prefix is present. " - + "The transactional id prefix overwrites the confirmation mode choice to TRANSACTIONAL", - confirmationMode + "The confirmation mode is set to [{}], whilst a transactional id prefix is present. " + + "The transactional id prefix overwrites the confirmation mode choice to TRANSACTIONAL", + confirmationMode ); } } @@ -124,11 +125,11 @@ private boolean isNonEmptyString(String s) { @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @ConditionalOnMissingBean @Bean(destroyMethod = "shutDown") - @ConditionalOnBean({ ProducerFactory.class, KafkaMessageConverter.class }) + @ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class}) public KafkaPublisher kafkaPublisher( - ProducerFactory axonKafkaProducerFactory, - KafkaMessageConverter kafkaMessageConverter, - AxonConfiguration configuration) { + ProducerFactory axonKafkaProducerFactory, + KafkaMessageConverter kafkaMessageConverter, + AxonConfiguration configuration) { return KafkaPublisher.builder() .producerFactory(axonKafkaProducerFactory) .messageConverter(kafkaMessageConverter) @@ -140,13 +141,13 @@ public KafkaPublisher kafkaPublisher( @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Bean @ConditionalOnMissingBean - @ConditionalOnBean({ KafkaPublisher.class }) + @ConditionalOnBean({KafkaPublisher.class}) public KafkaEventPublisher kafkaEventPublisher( - KafkaPublisher kafkaPublisher, - KafkaProperties kafkaProperties, - EventProcessingConfigurer eventProcessingConfigurer) { + KafkaPublisher kafkaPublisher, + KafkaProperties kafkaProperties, + EventProcessingConfigurer eventProcessingConfigurer) { KafkaEventPublisher kafkaEventPublisher = - KafkaEventPublisher.builder().kafkaPublisher(kafkaPublisher).build(); + KafkaEventPublisher.builder().kafkaPublisher(kafkaPublisher).build(); /* * Register an invocation error handler which re-throws any exception. @@ -156,11 +157,11 @@ public KafkaEventPublisher kafkaEventPublisher( */ eventProcessingConfigurer.registerEventHandler(configuration -> kafkaEventPublisher) .registerListenerInvocationErrorHandler( - DEFAULT_PROCESSING_GROUP, configuration -> PropagatingErrorHandler.instance() + DEFAULT_PROCESSING_GROUP, configuration -> PropagatingErrorHandler.instance() ) .assignHandlerTypesMatching( - DEFAULT_PROCESSING_GROUP, - clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class) + DEFAULT_PROCESSING_GROUP, + clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class) ); KafkaProperties.EventProcessorMode processorMode = kafkaProperties.getProducer().getEventProcessorMode(); @@ -193,12 +194,12 @@ public ConsumerFactory kafkaConsumerFactory() { @Bean @ConditionalOnMissingBean - @ConditionalOnBean({ ConsumerFactory.class, KafkaMessageConverter.class, Fetcher.class }) + @ConditionalOnBean({ConsumerFactory.class, KafkaMessageConverter.class, Fetcher.class}) @Conditional(StreamingProcessorModeCondition.class) public StreamableKafkaMessageSource streamableKafkaMessageSource( - ConsumerFactory kafkaConsumerFactory, - Fetcher kafkaFetcher, - KafkaMessageConverter kafkaMessageConverter + ConsumerFactory kafkaConsumerFactory, + Fetcher kafkaFetcher, + KafkaMessageConverter kafkaMessageConverter ) { return StreamableKafkaMessageSource.builder() .topics(Collections.singletonList(properties.getDefaultTopic())) @@ -206,7 +207,7 @@ public StreamableKafkaMessageSource streamableKafkaMessageSource .fetcher(kafkaFetcher) .messageConverter(kafkaMessageConverter) .bufferFactory(() -> new SortedKafkaMessageBuffer<>( - properties.getFetcher().getBufferSize() + properties.getFetcher().getBufferSize() )) .build(); } diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java index feaa6951..325e36a9 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java @@ -31,10 +31,8 @@ import org.axonframework.messaging.MetaData; import org.axonframework.serialization.LazyDeserializingObject; import org.axonframework.serialization.SerializedMessage; -import org.axonframework.serialization.SerializedMetaData; import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.Serializer; -import org.axonframework.serialization.SimpleSerializedObject; import org.axonframework.serialization.upcasting.event.EventUpcasterChain; import org.axonframework.serialization.upcasting.event.InitialEventRepresentation; import org.slf4j.Logger; @@ -60,8 +58,8 @@ *

*

* If an up-caster / up-caster chain is configured, this converter will pass the converted messages through the it. - * Please note, that the since the message converter consumes records one-by-one, the up-casting functionality is limited - * to one-to-one and one-to-many up-casters only. + * Please note, that the since the message converter consumes records one-by-one, the up-casting functionality is + * limited to one-to-one and one-to-many up-casters only. *

* This implementation will suffice in most cases. * @@ -127,9 +125,9 @@ public static Builder builder() { public ProducerRecord createKafkaMessage(EventMessage eventMessage, String topic) { SerializedObject serializedObject = eventMessage.serializePayload(serializer, byte[].class); return new ProducerRecord<>( - topic, null, null, recordKey(eventMessage), - serializedObject.getData(), - toHeaders(eventMessage, serializedObject, headerValueMapper) + topic, null, null, recordKey(eventMessage), + serializedObject.getData(), + toHeaders(eventMessage, serializedObject, headerValueMapper) ); } @@ -144,21 +142,16 @@ public Optional> readKafkaMessage(ConsumerRecord Headers headers = consumerRecord.headers(); if (isAxonMessage(headers)) { byte[] messageBody = consumerRecord.value(); - final EventData eventData; - if (isDomainEvent(headers)) { - eventData = createDomainEventEntry(headers, messageBody); - } else { - eventData = createEventData(headers, messageBody); - } + final EventData eventData = createEventData(headers, messageBody); return upcasterChain - .upcast(Stream.of(new InitialEventRepresentation(eventData, serializer))) - .findFirst() - .map(upcastedEventData -> new SerializedMessage<>( - upcastedEventData.getMessageIdentifier(), - new LazyDeserializingObject<>(upcastedEventData.getData(), serializer), - upcastedEventData.getMetaData() - ) - ).flatMap(serializedMessage -> buildMessage(headers, serializedMessage)); + .upcast(Stream.of(new InitialEventRepresentation(eventData, serializer))) + .findFirst() + .map(upcastedEventData -> new SerializedMessage<>( + upcastedEventData.getMessageIdentifier(), + new LazyDeserializingObject<>(upcastedEventData.getData(), serializer), + upcastedEventData.getMetaData() + ) + ).flatMap(serializedMessage -> buildMessage(headers, serializedMessage)); } } catch (Exception e) { logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e); @@ -166,120 +159,89 @@ public Optional> readKafkaMessage(ConsumerRecord return Optional.empty(); } - private boolean isAxonMessage(Headers headers) { - return keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE)); - } - + /** + * Constructs event data representation from given Kafka headers and byte array body. + *

+ * This method reuses the {@link GenericDomainEventEntry} class for both types of events which can be + * transmitted via Kafka. For domain events, the fields aggregateType, aggregateId and + * aggregateSeq will contain the corresponding values, but for the simple event they will be + * null. This is ok to pass null to those values and 0L to + * aggregateSeq, since the {@link InitialEventRepresentation} does the same in its constructor and + * is implemented in a null-tolerant way. Check {@link DefaultKafkaMessageConverter#isDomainEvent(Headers)} for more + * details. + *

+ * + * @param headers Kafka headers. + * @param messageBody Kafka payload as a byte array. + * @return event data. + */ private EventData createEventData(Headers headers, byte[] messageBody) { - return new GenericMessageEventData<>( - valueAsString(headers, MESSAGE_ID), - valueAsLong(headers, MESSAGE_TIMESTAMP), - valueAsString(headers, MESSAGE_TYPE), - valueAsString(headers, MESSAGE_REVISION, null), - messageBody, - extractMetadataAsBytes(headers), - byte[].class + return new GenericDomainEventEntry<>( + valueAsString(headers, AGGREGATE_TYPE), + valueAsString(headers, AGGREGATE_ID), + valueAsLong(headers, AGGREGATE_SEQ, 0L), + valueAsString(headers, MESSAGE_ID), + valueAsLong(headers, MESSAGE_TIMESTAMP), + valueAsString(headers, MESSAGE_TYPE), + valueAsString(headers, MESSAGE_REVISION, null), + messageBody, + extractMetadataAsBytes(headers) ); } - private GenericDomainEventEntry createDomainEventEntry(Headers headers, byte[] messageBody) { - return new GenericDomainEventEntry<>( - valueAsString(headers, AGGREGATE_TYPE), - valueAsString(headers, AGGREGATE_ID), - valueAsLong(headers, AGGREGATE_SEQ), - valueAsString(headers, MESSAGE_ID), - valueAsLong(headers, MESSAGE_TIMESTAMP), - valueAsString(headers, MESSAGE_TYPE), - valueAsString(headers, MESSAGE_REVISION, null), - messageBody, - extractMetadataAsBytes(headers) - ); + private byte[] extractMetadataAsBytes(Headers headers) { + return serializer.serialize(MetaData.from(extractAxonMetadata(headers)), byte[].class).getData(); } - private boolean isDomainEvent(Headers headers) { - return headers.lastHeader(AGGREGATE_ID) != null; + private static boolean isAxonMessage(Headers headers) { + return keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE)); } - private byte[] extractMetadataAsBytes(Headers headers) { - return serializer.serialize(MetaData.from(extractAxonMetadata(headers)), byte[].class).getData(); + /** + * Checks if the event is originated from an aggregate (domain event) or is a simple event sent over the bus. + *

+ * The difference between a DomainEventMessage and an EventMessage, is the following three fields: + *

    + *
  • The type - represents the Aggregate the event originates from. It would be empty for an EventMessage and + * filled for a DomainEventMessage.
  • + *
  • The aggregateIdentifier - represents the Aggregate instance the event originates from. It would be equal + * to the eventIdentifier for an EventMessage and not equal to that identifier a DomainEventMessage.
  • + *
  • The sequenceNumber - represents the order of the events within an Aggregate instance's event stream. + * It would be 0 at all times for an EventMessage, whereas a DomainEventMessage would be 0 or greater.
  • + *
+ *

+ * + * @param headers Kafka headers. + * @return true if the event is originated from an aggregate. + */ + private static boolean isDomainEvent(Headers headers) { + return headers.lastHeader(AGGREGATE_TYPE) != null + && headers.lastHeader(AGGREGATE_ID) != null + && headers.lastHeader(AGGREGATE_SEQ) != null; } - private Optional> buildMessage(Headers headers, SerializedMessage message) { + private static Optional> buildMessage(Headers headers, SerializedMessage message) { long timestamp = valueAsLong(headers, MESSAGE_TIMESTAMP); return isDomainEvent(headers) - ? buildDomainEventMessage(headers, message, timestamp) - : buildEventMessage(message, timestamp); + ? buildDomainEventMessage(headers, message, timestamp) + : buildEventMessage(message, timestamp); } - private Optional> buildDomainEventMessage(Headers headers, SerializedMessage message, long timestamp) { + private static Optional> buildDomainEventMessage(Headers headers, SerializedMessage message, + long timestamp) { return Optional.of(new GenericDomainEventMessage<>( - valueAsString(headers, AGGREGATE_TYPE), - valueAsString(headers, AGGREGATE_ID), - valueAsLong(headers, AGGREGATE_SEQ), - message, - () -> Instant.ofEpochMilli(timestamp) + valueAsString(headers, AGGREGATE_TYPE), + valueAsString(headers, AGGREGATE_ID), + valueAsLong(headers, AGGREGATE_SEQ), + message, + () -> Instant.ofEpochMilli(timestamp) )); } - private Optional> buildEventMessage(SerializedMessage message, long timestamp) { + private static Optional> buildEventMessage(SerializedMessage message, long timestamp) { return Optional.of(new GenericEventMessage<>(message, () -> Instant.ofEpochMilli(timestamp))); } - /** - * Generic Event data used for passing throw up-caster chain. - */ - private static class GenericMessageEventData implements EventData { - - private final Instant timestamp; - private final String messageType; - private final String messageRevision; - private final T messageBody; - private final T metadata; - private final Class contentType; - private final String eventIdentifier; - - /** - * Constructs the event data. - * - * @param eventIdentifier event identifier. - * @param timestamp event timestamp as milliseconds since epoch. - * @param messageType message type. - * @param messageRevision revision or null, if no revision is provided. - * @param messageBody bytes of the event message. - * @param metadata metadata pf the message. - * @param contentType class of the Java representation of the content of the event body and metadata. - */ - GenericMessageEventData(String eventIdentifier, Long timestamp, String messageType, String messageRevision, T messageBody, T metadata, Class contentType) { - this.eventIdentifier = eventIdentifier; - this.timestamp = Instant.ofEpochMilli(timestamp); - this.messageType = messageType; - this.messageRevision = messageRevision; - this.messageBody = messageBody; - this.metadata = metadata; - this.contentType = contentType; - } - - @Override - public String getEventIdentifier() { - return this.eventIdentifier; - } - - @Override - public Instant getTimestamp() { - return timestamp; - } - - @Override - public SerializedObject getMetaData() { - return new SerializedMetaData<>(metadata, contentType); - } - - @Override - public SerializedObject getPayload() { - return new SimpleSerializedObject<>(messageBody, contentType, messageType, messageRevision); - } - } - /** * Builder class to instantiate a {@link DefaultKafkaMessageConverter}. *

@@ -298,7 +260,6 @@ public static class Builder { * Sets the serializer to serialize the Event Message's payload with. * * @param serializer The serializer to serialize the Event Message's payload with - * * @return the current Builder instance, for fluent interfacing */ public Builder serializer(Serializer serializer) { @@ -312,7 +273,6 @@ public Builder serializer(Serializer serializer) { * the key for the {@link ProducerRecord}. Defaults to a {@link SequentialPerAggregatePolicy} instance. * * @param sequencingPolicy a {@link SequencingPolicy} used to generate the key for the {@link ProducerRecord} - * * @return the current Builder instance, for fluent interfacing */ public Builder sequencingPolicy(SequencingPolicy> sequencingPolicy) { @@ -328,7 +288,6 @@ public Builder sequencingPolicy(SequencingPolicy> sequen * * @param headerValueMapper a {@link BiFunction} of {@link String}, {@link Object} and {@link RecordHeader}, * used for mapping values to Kafka headers - * * @return the current Builder instance, for fluent interfacing */ public Builder headerValueMapper(BiFunction headerValueMapper) { @@ -341,7 +300,6 @@ public Builder headerValueMapper(BiFunction header * Sets the {@code upcasterChain} to be used during the consumption of events. * * @param upcasterChain upcaster chain to be used on event reading. - * * @return the current Builder instance, for fluent interfacing */ public Builder upcasterChain(EventUpcasterChain upcasterChain) { diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java index 8372c527..42665609 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java @@ -77,6 +77,20 @@ public static Long valueAsLong(Headers headers, String key) { return asLong(value(headers, key)); } + /** + * Return a {@link Long} representation of the {@code value} stored under a given {@code key} inside the {@link + * Headers}. In case of a missing entry {@code null} is returned. + * + * @param headers the Kafka {@code headers} to pull the {@link Long} value from + * @param key the key corresponding to the expected {@link Long} value + * @param defaultValue the default value to return when {@code key} does not exist in the given {@code headers} + * @return the value as a {@link Long} corresponding to the given {@code key} in the {@code headers} + */ + public static Long valueAsLong(Headers headers, String key, Long defaultValue) { + Long value = asLong(value(headers, key)); + return value != null ? value : defaultValue; + } + /** * Converts bytes to {@link String}. * From 1eb66b5bfe44ee89fd2019575ec6d53e5cbf5f0d Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Thu, 21 Oct 2021 12:41:40 +0200 Subject: [PATCH 4/8] Update kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java Co-authored-by: Lucas Campos --- .../kafka/eventhandling/DefaultKafkaMessageConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java index 325e36a9..c69f28d0 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java @@ -57,7 +57,7 @@ * configured {@link Serializer} and passed as the Kafka record's body. *

*

- * If an up-caster / up-caster chain is configured, this converter will pass the converted messages through the it. + * If an up-caster / up-caster chain is configured, this converter will pass the converted messages through it. * Please note, that the since the message converter consumes records one-by-one, the up-casting functionality is * limited to one-to-one and one-to-many up-casters only. *

From c3515919e2ff7b84ec26f844a764efb327db26a5 Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Thu, 21 Oct 2021 12:41:47 +0200 Subject: [PATCH 5/8] Update kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java Co-authored-by: Lucas Campos --- .../kafka/eventhandling/DefaultKafkaMessageConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java index c69f28d0..ebe5082c 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java @@ -58,7 +58,7 @@ *

*

* If an up-caster / up-caster chain is configured, this converter will pass the converted messages through it. - * Please note, that the since the message converter consumes records one-by-one, the up-casting functionality is + * Please note, that since the message converter consumes records one-by-one, the up-casting functionality is * limited to one-to-one and one-to-many up-casters only. *

* This implementation will suffice in most cases. From e2abba5a7a9a28a48c7fe1698fc648c6a024ac0b Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Thu, 21 Oct 2021 12:42:14 +0200 Subject: [PATCH 6/8] Update kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java Co-authored-by: Lucas Campos --- .../extensions/kafka/eventhandling/HeaderUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java index 42665609..e234f779 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtils.java @@ -79,7 +79,7 @@ public static Long valueAsLong(Headers headers, String key) { /** * Return a {@link Long} representation of the {@code value} stored under a given {@code key} inside the {@link - * Headers}. In case of a missing entry {@code null} is returned. + * Headers}. In case of a missing entry {@code defaultValue} is returned. * * @param headers the Kafka {@code headers} to pull the {@link Long} value from * @param key the key corresponding to the expected {@link Long} value From c3683ed34055bdd32ae1c74bfdb4a59f41004cf6 Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Thu, 21 Oct 2021 12:45:14 +0200 Subject: [PATCH 7/8] chore: improved formatting for better reading --- .../kafka/autoconfig/KafkaAutoConfiguration.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java index 80cf04a1..881bbcb2 100644 --- a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java +++ b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java @@ -88,8 +88,11 @@ public KafkaMessageConverter kafkaMessageConverter( @Qualifier("eventSerializer") Serializer eventSerializer, AxonConfiguration config ) { - return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain( - config.upcasterChain() != null ? config.upcasterChain() : new EventUpcasterChain()).build(); + return DefaultKafkaMessageConverter + .builder() + .serializer(eventSerializer) + .upcasterChain(config.upcasterChain() != null ? config.upcasterChain() : new EventUpcasterChain()) + .build(); } @Bean("axonKafkaProducerFactory") From 51cfdb5b3f6663e8f07174b99aa5be68b58dcdff Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Wed, 27 Oct 2021 22:43:09 +0200 Subject: [PATCH 8/8] test: add missing test --- .../kafka/eventhandling/HeaderUtilsTest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtilsTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtilsTest.java index 68e90911..b225518d 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtilsTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/HeaderUtilsTest.java @@ -127,8 +127,13 @@ void testReadingValuesAsLongExistingKeyShouldReturnLong() { addHeader(headers, "negative", -4_8912_00_921_388_62621L); assertEquals(4_891_00_921_388_62621L, valueAsLong(headers, "positive")); + assertEquals(4_891_00_921_388_62621L, valueAsLong(headers, "positive", 42L)); assertEquals(0, valueAsLong(headers, "zero")); + assertEquals(0, valueAsLong(headers, "zero", 42L)); assertEquals(-4_8912_00_921_388_62621L, valueAsLong(headers, "negative")); + + assertEquals(-4_8912_00_921_388_62621L, valueAsLong(headers, "negative", 42L)); + } @Test @@ -136,6 +141,11 @@ void testReadingValueAsLongNonExistingKeyShouldReturnNull() { assertNull(valueAsLong(new RecordHeaders(), "some-invalid-key")); } + @Test + void testReadingValueAsLongNonExistingKeyShouldReturnDefaultValue() { + assertEquals(42L, valueAsLong(new RecordHeaders(), "some-invalid-key", 42L)); + } + @Test void testWritingTimestampShouldBeWrittenAsLong() { RecordHeaders target = new RecordHeaders();