The Events generated by the application need to be dispatched to the components that update the query databases, search engines or any other resources that need them: the Event Handlers. It is the responsibility of the Event Bus to dispatch Event Messages to all components interested. On the receiving end, Event Processors are responsible for handling those events, which includes invocation of the appropriate Event Handlers.
In the vast majority of cases, the Aggregates will publish events by applying them. However, occasionally, it is necessary to publish an event (possibly from within another component) directly to the Event Bus. To publish an event, simply wrap the payload describing the event in an EventMessage. The GenericEventMessage.asEventMessage(Object) method allows you to wrap any object into an EventMessage. If the passed object is already an EventMessage, it is simply returned.
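The wrap-or-return rule can be sketched without Axon on the classpath. SketchEventMessage and its asEventMessage helper below are hypothetical stand-ins, not Axon's classes; they only illustrate that wrapping an object that is already a message is a no-op.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for an event message: a payload plus a generated identifier. */
final class SketchEventMessage {
    private final String identifier;
    private final Object payload;

    private SketchEventMessage(Object payload) {
        this.payload = Objects.requireNonNull(payload, "payload");
        this.identifier = UUID.randomUUID().toString();
    }

    /** Wraps a payload, or returns the argument unchanged if it is already a message. */
    static SketchEventMessage asEventMessage(Object event) {
        if (event instanceof SketchEventMessage) {
            return (SketchEventMessage) event;
        }
        return new SketchEventMessage(event);
    }

    String getIdentifier() {
        return identifier;
    }

    Object getPayload() {
        return payload;
    }
}
```

Because the helper is idempotent, publishing code can call it unconditionally, whether it receives a raw payload or a message that was wrapped earlier.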
The EventBus is the mechanism that dispatches events to the subscribed event handlers. Axon provides two implementations of the Event Bus: SimpleEventBus and EmbeddedEventStore. While both implementations support subscribing and tracking processors (see Event Processors), the EmbeddedEventStore persists events, which allows you to replay them at a later stage. The SimpleEventBus has volatile storage and 'forgets' events as soon as they have been published to subscribed components.
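The difference between a volatile bus and a storing bus can be illustrated with plain Java. The two classes below are hypothetical sketches, not the SimpleEventBus or EmbeddedEventStore implementations; the only point they make is that a replay requires the events to have been kept somewhere.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical volatile bus: delivers to current subscribers and keeps nothing. */
class SketchVolatileBus {
    private final List<Consumer<Object>> subscribers = new ArrayList<>();

    void subscribe(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(Object event) {
        for (Consumer<Object> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}

/** Hypothetical storing bus: appends each event to a log before delivering it. */
class SketchStoringBus extends SketchVolatileBus {
    private final List<Object> log = new ArrayList<>();

    @Override
    void publish(Object event) {
        log.add(event);
        super.publish(event);
    }

    /** Feeds every stored event, in publication order, to a component that joined late. */
    void replay(Consumer<Object> lateSubscriber) {
        for (Object event : log) {
            lateSubscriber.accept(event);
        }
    }
}
```

A component that subscribes to the volatile variant after an event was published never sees that event, while the storing variant can still hand it over through replay.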
When using the Configuration API, the SimpleEventBus is used by default. To configure the EmbeddedEventStore instead, you need to supply an implementation of a StorageEngine, which does the actual storage of Events.
Configurer configurer = DefaultConfigurer.defaultConfiguration();
configurer.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine());
Event Handlers define the business logic to be performed when an Event is received. Event Processors are the components that take care of the technical aspects of that processing. They start a Unit of Work and possibly a transaction, but also ensure that correlation data can be correctly attached to all messages created during Event processing.
Event Processors come in roughly two forms: Subscribing and Tracking. Subscribing Event Processors subscribe themselves to a source of Events and are invoked by the thread managed by the publishing mechanism. Tracking Event Processors, on the other hand, pull their messages from a source using threads that they manage themselves.
All processors have a name, which identifies a processor instance across JVM instances. Two processors with the same name can be considered two instances of the same processor.
By default, all Event Handlers are attached to a Processor whose name is the package name of the Event Handler's class.
For example, the following classes:
- org.axonframework.example.eventhandling.MyHandler,
- org.axonframework.example.eventhandling.MyOtherHandler, and
- org.axonframework.example.eventhandling.module.MyHandler
will trigger the creation of two Processors:
- org.axonframework.example.eventhandling with 2 handlers, and
- org.axonframework.example.eventhandling.module with a single handler
The Configuration API allows you to configure other strategies for assigning classes to processors, or even assign specific instances to specific processors.
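The package-based assignment rule can be reproduced in a few lines of plain Java. ProcessorNaming is a hypothetical helper written for this illustration; it works on class names as strings and is not part of the Configuration API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ProcessorNaming {

    /** Derives the processor name from a fully qualified class name: everything before the last dot. */
    static String processorNameFor(String handlerClassName) {
        int lastDot = handlerClassName.lastIndexOf('.');
        return lastDot < 0 ? "" : handlerClassName.substring(0, lastDot);
    }

    /** Groups handler class names by the processor they would be assigned to. */
    static Map<String, List<String>> groupByProcessor(List<String> handlerClassNames) {
        Map<String, List<String>> processors = new TreeMap<>();
        for (String className : handlerClassNames) {
            processors.computeIfAbsent(processorNameFor(className), k -> new ArrayList<>())
                      .add(className);
        }
        return processors;
    }
}
```

Applied to the three example classes, the helper yields two groups: one with two handlers and one with a single handler.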
Event Handlers within an Event Processor are ordered by the order in which they are registered (as described in the Registering Event Handlers section). In other words, an Event Processor calls its Event Handlers in the order in which they were inserted through the Configuration API.
If Spring is selected as the mechanism to wire everything, the ordering of the Event Handlers can be specified by adding the @Order annotation. This annotation should be placed at class level on your Event Handler class, with an integer value that specifies the ordering.
Do note that it is not possible to order Event Handlers which are not a part of the same Event Processor.
Processors take care of the technical aspects of handling an event, regardless of the business logic triggered by each event. However, the way "regular" (singleton, stateless) event handlers are configured is slightly different from Sagas, as different aspects are important for both types of handlers.
By default, Axon will use Subscribing Event Processors. It is possible to change how Handlers are assigned and how processors are configured using the EventHandlingConfiguration class of the Configuration API.
The EventHandlingConfiguration class defines a number of methods that can be used to define how processors need to be configured.
- registerEventProcessorFactory allows you to define a default factory method that creates Event Processors for which no explicit factories have been defined.
- registerEventProcessor(String name, EventProcessorBuilder builder) defines the factory method to use to create a Processor with the given name. Note that such a Processor is only created if name is chosen as the processor for any of the available Event Handler beans.
- registerTrackingProcessor(String name) defines that a processor with the given name should be configured as a Tracking Event Processor, using default settings. It is configured with a TransactionManager and a TokenStore, both taken from the main configuration by default.
- registerTrackingProcessor(String name, Function<Configuration, TrackingEventProcessorConfiguration> processorConfiguration, Function<Configuration, SequencingPolicy<? super EventMessage<?>>> sequencingPolicy) defines that a processor with the given name should be configured as a Tracking Processor, and use the given TrackingEventProcessorConfiguration to read the configuration settings for multi-threading. The SequencingPolicy defines which expectations the processor has on sequential processing of events. See Parallel Processing for more details.
- usingTrackingProcessors() sets the default to Tracking Processors instead of Subscribing ones.
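As a configuration sketch, the fragment below switches a single processor to tracking mode. It assumes Axon 3.x on the classpath. The package com.example.projections, the ProcessorSetup class and the OrderProjection handler are hypothetical, and the use of registerEventHandler and registerModule to wire things together is an assumption about the surrounding setup rather than something this section prescribes.

```java
package com.example.projections; // hypothetical package; by default it doubles as the processor name

import org.axonframework.config.Configurer;
import org.axonframework.config.DefaultConfigurer;
import org.axonframework.config.EventHandlingConfiguration;
import org.axonframework.eventhandling.EventHandler;

public class ProcessorSetup {

    /** Hypothetical handler; by default it is assigned to the processor named after its package. */
    public static class OrderProjection {
        @EventHandler
        public void on(String event) {
            // update the query model here
        }
    }

    public static Configurer configure() {
        EventHandlingConfiguration ehConfig = new EventHandlingConfiguration()
                // assumption: the handler is registered through the Configuration API
                .registerEventHandler(c -> new OrderProjection())
                // run only this processor as a Tracking Processor, with default settings
                .registerTrackingProcessor("com.example.projections");

        // assumption: the event handling configuration is registered as a module
        return DefaultConfigurer.defaultConfiguration()
                .registerModule(ehConfig);
    }
}
```

Calling usingTrackingProcessors() instead of registerTrackingProcessor(...) would apply the same choice to every processor that has no explicit configuration.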
Sagas are configured using the SagaConfiguration class. It provides static methods to initialize an instance either for Tracking Processing, or Subscribing.
To configure a Saga to run in subscribing mode, simply do:
SagaConfiguration<MySaga> sagaConfig = SagaConfiguration.subscribingSagaManager(MySaga.class);
If you don't want to use the default EventBus / Store as source for this Saga to get its messages from, you can define another source of messages as well:
SagaConfiguration.subscribingSagaManager(MySaga.class, c -> /* define source here */);
Another variant of the subscribingSagaManager() method allows you to pass a (builder for an) EventProcessingStrategy. By default, Sagas are invoked synchronously. This can be made asynchronous using this method. However, using Tracking Processors is the preferred way for asynchronous invocation.
To configure a Saga to use a Tracking Processor, simply do:
SagaConfiguration.trackingSagaManager(MySaga.class);
This will set the default properties, meaning a single Thread is used to process events. To change this:
SagaConfiguration.trackingSagaManager(MySaga.class)
        // configure 4 threads
        .configureTrackingProcessor(c -> TrackingEventProcessorConfiguration.forParallelProcessing(4));
The TrackingEventProcessorConfiguration has a few methods allowing you to specify how many segments will be created and which ThreadFactory should be used to create Processor threads. See Parallel Processing for more details.
Check out the API documentation (JavaDoc) of the SagaConfiguration class for full details on how to configure event handling for a Saga.
Tracking Processors, unlike Subscribing ones, need a Token Store to store their progress in. Each message a Tracking Processor receives through its Event Stream is accompanied by a Token. This Token allows the processor to reopen the Stream at any later point, picking up where it left off with the last Event.
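The role of the token can be sketched with an index into a list. SketchTokenStore and SketchTrackingProcessor are hypothetical classes for illustration only; a real token is opaque to the processor and a real store is usually backed by a database.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical token store: remembers, per processor name, the index of the next event to read. */
final class SketchTokenStore {
    private final Map<String, Integer> tokens = new HashMap<>();

    int fetch(String processorName) {
        return tokens.getOrDefault(processorName, 0);
    }

    void store(String processorName, int nextIndex) {
        tokens.put(processorName, nextIndex);
    }
}

/** Hypothetical tracking processor that pulls events from a list acting as the event stream. */
final class SketchTrackingProcessor {
    private final String name;
    private final SketchTokenStore tokenStore;
    final List<String> handled = new ArrayList<>();

    SketchTrackingProcessor(String name, SketchTokenStore tokenStore) {
        this.name = name;
        this.tokenStore = tokenStore;
    }

    /** Reads at most maxEvents events, starting at the stored token, and advances the token after each one. */
    void processBatch(List<String> eventStream, int maxEvents) {
        int position = tokenStore.fetch(name);
        int end = Math.min(eventStream.size(), position + maxEvents);
        while (position < end) {
            handled.add(eventStream.get(position));
            position++;
            tokenStore.store(name, position);
        }
    }
}
```

A second processor instance created with the same name and the same store continues after the last stored position instead of starting from the beginning of the stream.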
The Configuration API takes the Token Store, as well as most other components Processors need, from the Global Configuration instance. If no TokenStore is explicitly defined, an InMemoryTokenStore is used, which is not recommended in production.
To configure a different Token Store, use Configurer.registerComponent(TokenStore.class, conf -> ... create token store ...)
Note that you can override the TokenStore to use with Tracking Processors in the respective EventHandlingConfiguration or SagaConfiguration that defines that Processor. Where possible, it is recommended to use a Token Store that stores tokens in the same database as where the Event Handlers update the view models. This way, changes to the view model can be stored atomically with the changed tokens, guaranteeing exactly-once processing semantics.
As of Axon Framework 3.1, Tracking Processors can use multiple threads to process an Event Stream. They do so by claiming a so-called segment, identified by a number. Normally, a single thread will process a single Segment.
The number of Segments used can be defined. When a Processor starts for the first time, it can initialize a number of segments. This number defines the maximum number of threads that can process events simultaneously. Each node running a TrackingProcessor will attempt to start its configured number of Threads to start processing these.
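One way to picture segments is as a fixed partitioning of the stream. The sketch below assigns each event to a segment by hashing an identifier; SegmentAssignment is hypothetical and the modulo rule is a simplification chosen for illustration, not necessarily the matching rule Axon applies internally.

```java
import java.util.ArrayList;
import java.util.List;

final class SegmentAssignment {

    /** Maps an identifier to one of segmentCount segments (simplified modulo rule). */
    static int segmentFor(Object identifier, int segmentCount) {
        return Math.floorMod(identifier.hashCode(), segmentCount);
    }

    /** Splits events, represented here by their aggregate identifier, into per-segment lists, preserving order. */
    static List<List<String>> partition(List<String> aggregateIds, int segmentCount) {
        List<List<String>> segments = new ArrayList<>();
        for (int i = 0; i < segmentCount; i++) {
            segments.add(new ArrayList<>());
        }
        for (String id : aggregateIds) {
            segments.get(segmentFor(id, segmentCount)).add(id);
        }
        return segments;
    }
}
```

With two segments, at most two threads can work in parallel, and all events carrying the same identifier end up in the same segment, so their relative order is preserved.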
Event Handlers may have specific expectations on the ordering of events. If this is the case, the processor must ensure these events are sent to these Handlers in that specific order. Axon uses the SequencingPolicy for this. The SequencingPolicy is essentially a function that returns a value for any given message. If the return value of the SequencingPolicy function is equal for two distinct event messages, it means that those messages must be processed sequentially. By default, Axon components will use the SequentialPerAggregatePolicy, which makes it so that Events published by the same Aggregate instance will be handled sequentially.
A Saga instance is never invoked concurrently by multiple threads. Therefore, a Sequencing Policy for a Saga is irrelevant. Axon will ensure each Saga instance receives the Events it needs to process in the order they have been published on the Event Bus.
Note
Note that Subscribing Processors don't manage their own threads. Therefore, it is not possible to configure how they should receive their events. Effectively, they will always work on a sequential-per-aggregate basis, as that is generally the level of concurrency in the Command Handling component.
For tracking processors, it doesn't matter whether the Threads handling the events are all running on the same node, or on different nodes hosting the same (logical) TrackingProcessor. When two instances of TrackingProcessor, having the same name, are active on different machines, they are considered two instances of the same logical processor. They will 'compete' for segments of the Event Stream. Each instance will 'claim' a segment, preventing events assigned to that segment from being processed on the other nodes.
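The competition for segments can be sketched as an atomic claim on a shared map. SegmentClaims is a hypothetical class; in a real deployment the claims would live in the shared Token Store rather than in memory.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical claim registry shared by all nodes running the same logical processor. */
final class SegmentClaims {
    private final ConcurrentMap<Integer, String> owners = new ConcurrentHashMap<>();

    /** Returns true if nodeId owns the segment after the call (newly claimed or already held by that node). */
    boolean claim(int segment, String nodeId) {
        String current = owners.putIfAbsent(segment, nodeId);
        return current == null || current.equals(nodeId);
    }

    /** Releases the segment, but only if it is currently held by nodeId. */
    void release(int segment, String nodeId) {
        owners.remove(segment, nodeId);
    }
}
```

While one node holds a claim, other nodes skip that segment; once the claim is released, for example when the node shuts down, another node can take it over.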
The TokenStore instance will use the JVM's name (usually a combination of the host name and process ID) as the default nodeId. This can be overridden in TokenStore implementations that support multi-node processing.
In some cases, it is necessary to publish events to an external system, such as a message broker.
Axon provides out-of-the-box support to transfer Events to and from an AMQP message broker, such as RabbitMQ.
The SpringAMQPPublisher forwards events to an AMQP Exchange. It is initialized with a SubscribableMessageSource, which is generally the EventBus or EventStore. Theoretically, this could be any source of Events that the publisher can subscribe to.
To configure the SpringAMQPPublisher, simply define an instance as a Spring Bean. There are a number of setter methods that allow you to specify the behavior you expect, such as Transaction support, publisher acknowledgements (if supported by the broker), and the exchange name.
The default exchange name is 'Axon.EventBus'.
Note
Note that exchanges are not automatically created. You must still declare the Queues, Exchanges and Bindings you wish to use. Check the Spring documentation for more information.
Spring has extensive support for reading messages from an AMQP Queue. However, this needs to be 'bridged' to Axon, so that these messages can be handled from Axon as if they are regular Event Messages.
The SpringAMQPMessageSource allows Event Processors to read messages from a Queue, instead of the Event Store or Event Bus. It acts as an adapter between Spring AMQP and the SubscribableMessageSource needed by these processors.
The easiest way to configure the SpringAMQPMessageSource is by defining a bean which overrides the default onMessage method and annotates it with @RabbitListener, as follows:
@Bean
public SpringAMQPMessageSource myMessageSource(Serializer serializer) {
    return new SpringAMQPMessageSource(serializer) {
        @RabbitListener(queues = "myQueue")
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            super.onMessage(message, channel);
        }
    };
}
Spring's @RabbitListener annotation tells Spring that this method needs to be invoked for each message on the given Queue ('myQueue' in the example). This method simply invokes the super.onMessage() method, which performs the actual publication of the Event to all the processors that have been subscribed to it.
To subscribe Processors to this MessageSource, pass the correct SpringAMQPMessageSource instance to the constructor of the Subscribing Processor:
// in an @Configuration file:
@Autowired
public void configure(EventHandlingConfiguration ehConfig, SpringAMQPMessageSource myMessageSource) {
    ehConfig.registerSubscribingEventProcessor("myProcessor", c -> myMessageSource);
}
Note that Tracking Processors are not compatible with the SpringAMQPMessageSource.
The recommended approach to handle Events asynchronously is by using a Tracking Event Processor. This implementation can guarantee processing of all events, even in case of a system failure (assuming the Events have been persisted).
However, it is also possible to handle Events asynchronously in a SubscribingProcessor. To achieve this, the SubscribingProcessor must be configured with an EventProcessingStrategy. This strategy can be used to change how invocations of the Event Listeners should be managed.
The default strategy (DirectEventProcessingStrategy) invokes these handlers in the thread that delivers the Events. This allows processors to use existing transactions.
The other Axon-provided strategy is the AsynchronousEventProcessingStrategy. It uses an Executor to asynchronously invoke the Event Listeners.
Even though the AsynchronousEventProcessingStrategy executes asynchronously, it is still desirable that certain events are processed sequentially. The SequencingPolicy defines whether events must be handled sequentially, in parallel or a combination of both. Policies return a sequence identifier of a given event. If the policy returns an equal identifier for two events, this means that they must be handled sequentially by the event handler. A null sequence identifier means the event may be processed in parallel with any other event.
Axon provides a number of common policies you can use:
- The FullConcurrencyPolicy will tell Axon that this event handler may handle all events concurrently. This means that there is no relationship between the events that require them to be processed in a particular order.
- The SequentialPolicy tells Axon that all events must be processed sequentially. Handling of an event will start when the handling of a previous event is finished.
- The SequentialPerAggregatePolicy will force domain events that were raised from the same aggregate to be handled sequentially. However, events from different aggregates may be handled concurrently. This is typically a suitable policy to use for event listeners that update details from aggregates in database tables.
Besides these provided policies, you can define your own. All policies must implement the SequencingPolicy interface. This interface defines a single method, getSequenceIdentifierFor, that returns the sequence identifier for a given event. Events for which an equal sequence identifier is returned must be processed sequentially. Events that produce a different sequence identifier may be processed concurrently. For performance reasons, policy implementations should return null if the event may be processed in parallel to any other event. This is faster, because Axon does not have to check for any restrictions on event processing.
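The contract of a sequencing policy can be sketched with a one-method interface. SketchSequencingPolicy mirrors the shape of getSequenceIdentifierFor but is a hypothetical type, as are SketchDomainEvent and SketchPolicies; the three constants imitate the behaviour of the provided policies as described above and are not their actual implementations.

```java
/** Hypothetical event carrying the identifier of the aggregate that raised it. */
final class SketchDomainEvent {
    final String aggregateId;
    final String payload;

    SketchDomainEvent(String aggregateId, String payload) {
        this.aggregateId = aggregateId;
        this.payload = payload;
    }
}

/** Hypothetical mirror of a sequencing policy: one method returning a sequence identifier. */
interface SketchSequencingPolicy {
    Object getSequenceIdentifierFor(SketchDomainEvent event);
}

final class SketchPolicies {
    /** Null identifier: the event may run in parallel with any other event. */
    static final SketchSequencingPolicy FULL_CONCURRENCY = event -> null;

    /** One shared identifier: all events are handled one after another. */
    static final SketchSequencingPolicy SEQUENTIAL = event -> "ALL";

    /** Aggregate identifier: only events of the same aggregate are serialized. */
    static final SketchSequencingPolicy PER_AGGREGATE = event -> event.aggregateId;

    /** Two events must be handled sequentially when their identifiers are non-null and equal. */
    static boolean mustBeSequential(SketchSequencingPolicy policy, SketchDomainEvent a, SketchDomainEvent b) {
        Object first = policy.getSequenceIdentifierFor(a);
        Object second = policy.getSequenceIdentifierFor(b);
        return first != null && first.equals(second);
    }
}
```

A custom policy follows the same pattern: return whatever value groups the events that depend on each other, for instance a customer identifier, and return null for events without ordering constraints.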
It is recommended to explicitly define an ErrorHandler when using the AsynchronousEventProcessingStrategy. The default ErrorHandler propagates exceptions, but in an asynchronous execution, there is nothing to propagate to, other than the Executor. This may result in Events not being processed. Instead, it is recommended to use an ErrorHandler that reports errors and allows processing to continue. The ErrorHandler is configured on the constructor of the SubscribingEventProcessor, where the EventProcessingStrategy is also provided.
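The "report and continue" approach can be illustrated with a plain Executor. ReportingAsyncInvoker is a hypothetical class, not Axon's ErrorHandler interface; it only shows why catching and recording the exception inside the asynchronous task keeps later events flowing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical asynchronous invoker that reports handler failures instead of propagating them. */
final class ReportingAsyncInvoker {
    final List<String> reportedErrors = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Schedules the handler; an exception is recorded so that it is not lost inside the Executor. */
    void handleAsync(String event, Consumer<String> handler) {
        executor.execute(() -> {
            try {
                handler.accept(event);
            } catch (RuntimeException e) {
                reportedErrors.add(event + ": " + e.getMessage());
            }
        });
    }

    /** Stops accepting work and waits for scheduled tasks; returns false on timeout or interruption. */
    boolean shutdownAndWait(long timeoutSeconds) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a real application the catch block would typically log the failure or forward it to a monitoring system, which is the kind of behaviour an explicitly configured ErrorHandler is meant to provide.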