TaskExtractor

Note

From Decaton 9.0.0, you can use ProcessorsBuilder#consuming(String topic, Deserializer<T> deserializer) to consume arbitrary topics, regardless of whether the topic is produced by DecatonClient.

You need to read through this guide only in the specific situations described below.

Decaton provides two ways to consume topics in ProcessorsBuilder:

  • ProcessorsBuilder#consuming(String topic, Deserializer<T> deserializer)

  • ProcessorsBuilder#consuming(String topic, TaskExtractor<T> taskExtractor)

As you may have learned in Getting Started, the former is the most common and convenient way to consume topics: you just pass a value deserializer.

However, sometimes you may need to apply custom logic to extract a task from raw consumed messages:

  • You need to extract custom task metadata on consumption (e.g. set scheduledTimeMillis for delayed processing).

  • You need to access additional information (e.g. record headers) for deserialization.

This is where the latter approach, using TaskExtractor, comes in.
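To make the first situation more concrete, here is a minimal, library-independent sketch of the kind of custom logic an extractor might run: deriving a scheduled processing time from a record header. The header name `x-delay-millis`, the `DelayHeaderSketch` class, and the plain `Map` of headers are all hypothetical stand-ins for illustration and are not part of Decaton. It also assumes that scheduledTimeMillis is an absolute epoch timestamp.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.OptionalLong;

// Illustration only: the header name and this helper are hypothetical, not part of Decaton.
final class DelayHeaderSketch {
    static final String DELAY_HEADER = "x-delay-millis"; // hypothetical header name

    // Returns the absolute time (epoch millis) at which the task should run,
    // or empty if the record carries no delay header.
    static OptionalLong scheduledTimeMillis(Map<String, byte[]> headers, long eventTimestampMillis) {
        byte[] raw = headers.get(DELAY_HEADER);
        if (raw == null) {
            return OptionalLong.empty();
        }
        // Assumption: the header value is a decimal number of milliseconds encoded as UTF-8 text.
        long delayMillis = Long.parseLong(new String(raw, StandardCharsets.UTF_8));
        return OptionalLong.of(eventTimestampMillis + delayMillis);
    }
}
```

In a real extractor, the headers would come from the consumed record, and a present result would presumably be set on the task metadata as scheduledTimeMillis.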

This guide will show you how to implement TaskExtractor and use it.

Throughout this guide, we assume the topic contains JSON-serialized messages and use jackson-databind for deserialization.

TaskExtractor

First, implement your own TaskExtractor to extract a task from raw consumed messages.

JSONUserEventExtractor.java
public class JSONUserEventExtractor implements TaskExtractor<UserEvent> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public DecatonTask<UserEvent> extract(ConsumedRecord record) {
        try {
            UserEvent event = MAPPER.readValue(record.value(), UserEvent.class);
            TaskMetadata metadata = TaskMetadata.builder()
                                                // Filling timestampMillis is not mandatory, but it is useful
                                                // for monitoring delivery latency between event production time and event processing time.
                                                // It is also used for scheduling tasks when scheduledTimeMillis is set.
                                                .timestampMillis(event.getEventTimestampMillis())
                                                // This field is not mandatory either, but filling it lets you track which application produced the task.
                                                .sourceApplicationId("event-tracker")
                                                // You can set other TaskMetadata fields as needed.
                                                .build();

            return new DecatonTask<>(metadata, event, record.value());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
Caution

If TaskExtractor#extract throws an exception, an error is logged and the task being processed is discarded. However, the processor continues to process subsequent tasks.
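The "log, discard, continue" behavior described in the caution can be pictured with a small standalone simulation. This is a simplified sketch and not Decaton's internal code: `SkipOnFailureSketch`, `extractAll`, and the integer-parsing extractor are hypothetical names chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified illustration of "log, discard, continue"; this is NOT Decaton's internal code.
final class SkipOnFailureSketch {
    // Applies a (hypothetical) extractor to each raw record, skipping records it rejects.
    static <T> List<T> extractAll(List<byte[]> rawRecords, Function<byte[], T> extractor) {
        List<T> tasks = new ArrayList<>();
        for (byte[] raw : rawRecords) {
            try {
                tasks.add(extractor.apply(raw));
            } catch (RuntimeException e) {
                // The failing record is reported and dropped; the loop moves on to the next one.
                System.err.println("Failed to extract task, discarding record: " + e);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<byte[]> raw = List.of(
                "1".getBytes(StandardCharsets.UTF_8),
                "not-a-number".getBytes(StandardCharsets.UTF_8),
                "3".getBytes(StandardCharsets.UTF_8));
        // The second record cannot be parsed, so only the first and third survive.
        List<Integer> tasks = extractAll(
                raw, bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));
        System.out.println(tasks); // prints [1, 3]
    }
}
```

The practical consequence is that a malformed message is lost rather than retried, so an extractor that must not drop data should handle recoverable problems itself instead of throwing.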

DecatonProcessor

Second, let’s implement a DecatonProcessor to process a UserEvent task.

UserEventProcessor.java
public class UserEventProcessor implements DecatonProcessor<UserEvent> {
    @Override
    public void process(ProcessingContext<UserEvent> context, UserEvent task) throws InterruptedException {
        System.out.printf("Noticed %s is %d years old\n",
                          task.getName(),
                          task.getAge());
    }
}

As you can see, the DecatonProcessor implementation is no different from the one you would write when using a Deserializer.

Lastly, instantiate a ProcessorSubscription as follows.

UserEventProcessorMain.java
...

ProcessorsBuilder.consuming(
        "my-decaton-json-topic",
        // This line is the only difference from a regular Decaton processor.
        new JSONUserEventExtractor())
        .thenProcess(new UserEventProcessor())

...

Run Example

Now we are ready to process a JSON topic with custom task extraction logic.

Before trying it out, download and extract the Kafka binary distribution from https://kafka.apache.org/downloads so that you can use kafka-console-producer.sh.

After that, create the my-decaton-json-topic topic, then run the example processor and produce a JSON message.

$ ./gradlew shadowJar

$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.UserEventProcessorMain &

$ /path/to/kafka_dir/bin/kafka-console-producer.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --topic my-decaton-json-topic

> {"eventTimestampMillis":1571368115079,"name": "daisuke","age": 52}

Noticed daisuke is 52 years old
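The UserEvent class used by the extractor and the processor is not shown in this guide. A minimal sketch follows, assuming its properties simply mirror the JSON message above (eventTimestampMillis, name, age). The field types and the plain JavaBean shape are assumptions, and the real example class may differ.

```java
// Minimal sketch of the UserEvent task class; the actual class in the example project may differ.
// A no-arg constructor with getters and setters is the plain JavaBean shape
// that jackson-databind can bind JSON properties to by default.
class UserEvent {
    private long eventTimestampMillis;
    private String name;
    private int age;

    public UserEvent() {
    }

    public long getEventTimestampMillis() {
        return eventTimestampMillis;
    }

    public void setEventTimestampMillis(long eventTimestampMillis) {
        this.eventTimestampMillis = eventTimestampMillis;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
```

The getter names match the calls made in JSONUserEventExtractor (getEventTimestampMillis) and UserEventProcessor (getName, getAge).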