
Commit 4c4aeb6

Merge pull request #1802 from cescoffier/virtual-threads-blog-4
Virtual threads - blog post 4 (Kafka)
2 parents d5b7fc2 + 955de40 commit 4c4aeb6

5 files changed

+134
-0
lines changed

_posts/2023-09-19-virtual-thread-1.adoc

+1
@@ -357,6 +357,7 @@ Next, we will cover:
357357

358358
- https://quarkus.io/blog/virtual-threads-2/[How to write a crud application using virtual threads]
359359
- https://quarkus.io/blog/virtual-threads-3/[How to test virtual threads applications]
360+
- https://quarkus.io/blog/virtual-threads-4/[How to process Kafka messages using virtual threads]
360361
- How to build a native executable when using virtual threads (_to be published_)
361362
- How to containerize an application using virtual threads (in JVM mode) (_to be published_)
362363
- How to containerize an application using virtual threads in native mode (_to be published_)
+133
@@ -0,0 +1,133 @@
1+
---
2+
layout: post
3+
title: 'Processing Kafka records on virtual threads'
4+
date: 2023-10-09
5+
tags: virtual-threads reactive redis kafka messaging
6+
synopsis: 'Learn about the virtual threads integration in Quarkus messaging (Kafka, AMQP, Pulsar...).'
7+
author: cescoffier
8+
---
9+
:imagesdir: /assets/images/posts/virtual-threads
10+
11+
In https://quarkus.io/blog/virtual-threads-2/[a previous blog post], we saw how to implement a CRUD application with Quarkus that uses virtual threads.
12+
The virtual threads support in Quarkus is not limited to REST and HTTP.
13+
Many other parts support virtual threads, such as gRPC, scheduled tasks, and messaging.
14+
In this post, we will see how you can process Kafka records on virtual threads, increasing the concurrency of the processing.
15+
16+
## Processing messages on virtual threads
17+
18+
The Quarkus Reactive Messaging extension supports virtual threads.
19+
Similarly to HTTP, to execute the processing on a virtual thread, you need to use the `@RunOnVirtualThread` annotation:
20+
21+
[source, java]
----
@Incoming("input-channel")
@Outgoing("output-channel")
@RunOnVirtualThread
public Fraud detect(Transaction tx) {
    // Run on a virtual thread
}
----
30+
31+
Each message is processed on its own virtual thread.
32+
So, for each message from the `input-channel`, a new virtual thread is created (as seen in https://quarkus.io/blog/virtual-thread-1/[this blog post], virtual thread creation is cheap).
33+
34+
image::virtual-thread-messaging.png[Threading model of the messaging application,400,float="right",align="center"]
35+
36+
This execution model can be used with any Quarkus reactive messaging connector, including AMQP 1.0, Apache Pulsar, and Apache Kafka.
37+
The concurrency of this processing is no longer limited by the number of worker threads, as it would be with the `@Blocking` annotation.
38+
Thus, this novel execution model simplifies the development of highly concurrent data streaming applications.
39+
40+
As we will see later, such a high level of concurrency can cause problems.
To keep it under control, Quarkus limits the number of messages processed concurrently to `1024` (this default value is https://quarkus.io/guides/messaging-virtual-threads[configurable]).
One of the main benefits of this limit is that it prevents the application from polling millions of messages, which would be very expensive in terms of memory.
Without this limit, a Kafka application would poll all the records from its assigned topic partitions and consume a large amount of memory.
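
The effect of such a limit can be sketched in plain Java with a semaphore that the polling loop must acquire before dispatching each message. This is only an illustration under stated assumptions, not how Quarkus implements it: the class `BoundedConcurrencySketch` and its method are hypothetical, the sleep stands in for blocking I/O, and a cached thread pool stands in for virtual threads (on Java 21+, `Executors.newVirtualThreadPerTaskExecutor()` could be swapped in).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration, not Quarkus code.
class BoundedConcurrencySketch {

    /** Processes `messages` tasks with at most `limit` in flight and returns the peak observed concurrency. */
    static int peakConcurrency(int messages, int limit) {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        // Stand-in executor (assumption): one thread per task, like virtual threads but heavier.
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < messages; i++) {
                // The dispatching loop blocks here once `limit` messages are being processed,
                // so it never pulls more work than the limit allows.
                permits.acquire();
                executor.execute(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // simulated blocking I/O
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                    }
                });
            }
            executor.shutdown();
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return peak.get();
    }
}
```

Calling `BoundedConcurrencySketch.peakConcurrency(200, 8)` never reports more than 8 concurrent tasks, however many messages are waiting upstream.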
44+
45+
Also, you may wonder why we do not use virtual threads by default.
46+
The reasons have been explained in https://quarkus.io/blog/virtual-thread-1/#five-things-you-need-to-know-before-using-virtual-threads-for-everything[a previous blog post].
47+
There are limitations that can make virtual threads dangerous.
48+
Make sure your use of virtual threads is safe before relying on them.
49+
We will see a few examples in this post.
50+
51+
## Processing Kafka records on virtual threads
52+
53+
To illustrate how to process Kafka records on virtual threads, let's consider a simple application.
54+
This application is a fake fraud detector.
55+
It analyzes banking transactions: if the total transaction amount for a given account over a given period of time reaches a threshold, we consider it fraud.
56+
The code is available in this https://github.com/quarkusio/virtual-threads-demos/tree/main/kafka-example[GitHub repository].
57+
Of course, you can use more complex detection algorithms, and even use AI/ML.
58+
In this case, we use the https://redis.io/docs/data-types/timeseries/[Redis time series] commands inefficiently to introduce more I/O than necessary.
59+
This is done on purpose, to exercise the ability of virtual threads to block:
60+
61+
[source, java]
----
@Incoming("tx")
@Outgoing("frauds")
@RunOnVirtualThread
public Fraud detect(Transaction tx) {
    String key = "account:transactions:" + tx.account;

    // Add sample
    long timestamp = tx.date.toInstant(ZoneOffset.UTC).toEpochMilli();
    timeseries.tsAdd(key, timestamp, tx.amount, new AddArgs()
            .onDuplicate(DuplicatePolicy.SUM));

    // Retrieve the last sum.
    var range = timeseries.tsRevRange(key, TimeSeriesRange.fromTimeSeries(),
            // 1 min for demo purpose.
            new RangeArgs().aggregation(Aggregation.SUM, Duration.ofMinutes(1))
                    .count(1));

    if (!range.isEmpty()) {
        // Analysis
        var sum = range.get(0).value;
        if (sum > 10_000) {
            Log.warnf("Fraud detected for account %s: %.2f", tx.account, sum);
            return new Fraud(tx.account, sum);
        }
    }
    return null;
}
----
91+
92+
If you run this application as-is and send it a burst of transactions, it will fail.
93+
The processing is correctly executed on virtual threads.
94+
However, the Redis connection pool has not been tuned to handle that concurrency level.
95+
Very quickly, no Redis connection is available, and the client starts enqueuing commands in a waiting queue.
When this queue is full, the client starts rejecting commands.
Fortunately, you can configure both the pool size and the maximum size of the waiting queue:
98+
99+
[source, properties]
----
# Increase the Redis pool size (and waiting queue size), as we will have a lot of concurrency.
# Number of connections in the pool
quarkus.redis.max-pool-size=100
# Maximum size of the waiting queue
quarkus.redis.max-pool-waiting=10000
----
105+
106+
While we use Redis in this application, you will face identical problems with many other clients (including HTTP clients).
107+
So, configure them properly to handle this new level of concurrency.
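
To see why a burst overwhelms an untuned client, here is a minimal model of a connection pool backed by a bounded waiting queue. It is a hypothetical sketch, not the actual Redis client implementation: the class `PoolWithWaitingQueue`, its methods, and the small pool sizes used in the usage note are assumptions chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of a pool with a bounded waiting queue (not the real client code).
class PoolWithWaitingQueue {

    enum Outcome { EXECUTING, WAITING, REJECTED }

    private final int maxPoolSize;
    private final int maxWaiting;
    private final Queue<String> waiting = new ArrayDeque<>();
    private int inUse = 0;

    PoolWithWaitingQueue(int maxPoolSize, int maxWaiting) {
        this.maxPoolSize = maxPoolSize;
        this.maxWaiting = maxWaiting;
    }

    /** Tries to run a command: uses a free connection, else queues it, else rejects it. */
    synchronized Outcome submit(String command) {
        if (inUse < maxPoolSize) {
            inUse++;
            return Outcome.EXECUTING;
        }
        if (waiting.size() < maxWaiting) {
            waiting.add(command);
            return Outcome.WAITING;
        }
        return Outcome.REJECTED;
    }

    /** Signals that a command finished; returns the queued command that takes over the connection, or null. */
    synchronized String complete() {
        String next = waiting.poll();
        if (next == null && inUse > 0) {
            inUse--; // nobody is waiting, so the connection goes back to the pool
        }
        return next;
    }

    /** Counts how many commands of a burst are rejected when none of them has completed yet. */
    static int countRejected(int burst, int maxPoolSize, int maxWaiting) {
        PoolWithWaitingQueue pool = new PoolWithWaitingQueue(maxPoolSize, maxWaiting);
        int rejected = 0;
        for (int i = 0; i < burst; i++) {
            if (pool.submit("cmd-" + i) == Outcome.REJECTED) {
                rejected++;
            }
        }
        return rejected;
    }
}
```

With a burst of 1024 commands (the default concurrency limit mentioned earlier), `countRejected(1024, 10, 50)` returns 964 in this model, whereas `countRejected(1024, 100, 10000)`, which mirrors the values configured above, returns 0.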
108+
109+
If you run the application and open the UI, you will see that the concurrency reached a maximum of 1024, as expected.
110+
111+
image::fraud-detection-screenshot.png[The application reached 1024 as top concurrency,800,float="right",align="center"]
112+
113+
## A note about pinning and monopolization
114+
115+
Our messaging connectors have been tailored to avoid pinning.
116+
It is also the case for the Quarkus Redis client.
117+
Thus, this application does not pin the carrier thread.
118+
119+
But pinning is not the only problem that can arise.
120+
While virtual threads can be appealing, you must be careful not to monopolize the carrier thread.
121+
If, for example, you implemented a complex and CPU-intensive detection algorithm instead of relying on Redis, you would likely monopolize the carrier thread, defeating the purpose of virtual threads.
122+
This forces the JVM to create new carrier threads, which ultimately increases memory usage.
The JVM limits the number of carrier threads it creates.
Once that limit is reached, your application will underperform, as tasks are enqueued until a carrier thread becomes available.
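
The queuing effect can be approximated with a deliberately simplified model: a fixed pool of platform threads plays the role of the carrier threads, and CPU-bound tasks that never block play the role of monopolizing virtual threads. This is an analogy under stated assumptions, not the JVM scheduler itself; `CarrierSaturationSketch`, `peakRunning`, and `burnCpu` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model (assumption): a fixed pool stands in for the carrier threads.
class CarrierSaturationSketch {

    /** Runs `tasks` CPU-bound tasks on `carriers` threads and returns the peak number running at once. */
    static int peakRunning(int tasks, int carriers) {
        ExecutorService pool = Executors.newFixedThreadPool(carriers);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        burnCpu(); // never blocks, so the thread is held until the task ends
                    } finally {
                        running.decrementAndGet();
                        done.countDown();
                    }
                });
            }
            if (!done.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return peak.get();
    }

    /** Pure computation with no I/O, standing in for a CPU-intensive detection algorithm. */
    static long burnCpu() {
        long acc = 0;
        for (int i = 0; i < 200_000; i++) {
            acc += i % 7;
        }
        return acc;
    }
}
```

In this model, `peakRunning(100, 4)` never exceeds 4: the remaining tasks wait in the queue until a thread is free, which is the behavior described above for CPU-intensive processing.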
125+
126+
## Summary
127+
128+
This post explains how you can execute message processing on virtual threads.
129+
While the example uses Kafka, you can use the same approach with the other messaging connectors provided by Quarkus.
130+
Do not forget that this kind of application:
131+
132+
* requires tuning connection pools, as the concurrency is much higher than before
133+
* can lead to monopolization if your processing is CPU-intensive