Introduce quarkus-signals experimental extension#53788
Introduce quarkus-signals experimental extension#53788mkouba wants to merge 1 commit intoquarkusio:mainfrom
Conversation
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
|
🎊 PR Preview 2a778de has been successfully built and deployed to https://quarkus-pr-main-53788-preview.surge.sh/version/main/guides/
|
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
- that allows application components to interact in a loosely coupled fashion, by emitting and receiving _signals_ - see also WG Proposal - Unified Events: quarkusio#53202 - this commit includes the extension itself, the docs and integration tests
Status for workflow
|
Status for workflow
|
| Status | Name | Step | Failures | Logs | Raw logs | Build scan |
|---|---|---|---|---|---|---|
| ✔️ | JVM Tests - JDK 17 | Logs | Raw logs | 🔍 | ||
| ✔️ | JVM Tests - JDK 21 | Logs | Raw logs | 🔍 | ||
| ✔️ | JVM Tests - JDK 25 | Logs | Raw logs | 🔍 | ||
| ✔️ | JVM Tests - JDK 25 Semeru | Logs | Raw logs | 🔍 | ||
| ❌ | JVM Tests - JDK 17 Windows | Cache Develocity local cache |
Logs | Raw logs | 🚧 | |
| ✔️ | Kubernetes Tests - JDK 25 | Logs | Raw logs | 🔍 | ||
| ❌ | Kubernetes Tests - JDK 25 Semeru | Build |
Failures | Logs | Raw logs | 🔍 |
| ✔️ | Kubernetes Tests - JDK 17 Windows | Logs | Raw logs | 🔍 | ||
| ❌ | Native Tests - Misc1 | Build |
Logs | Raw logs | 🚧 |
You can consult the Develocity build scans.
Failures
⚙️ Kubernetes Tests - JDK 25 Semeru #
- Failing: integration-tests/kubernetes/maven-invoker-way
📦 integration-tests/kubernetes/maven-invoker-way
❌ Failed to execute goal org.apache.maven.plugins:maven-invoker-plugin:3.9.1:run (integration-tests) on project quarkus-integration-test-kubernetes-invoker: 1 build failed. See console output above for details.
Flaky tests - Develocity
⚙️ JVM Tests - JDK 25
📦 extensions/vertx-http/deployment
❌ io.quarkus.vertx.http.filters.GracefulShutdownFilterTest.test - History
Timeout waiting for response-java.lang.RuntimeException
Details
java.lang.RuntimeException: Timeout waiting for response
at io.quarkus.vertx.http.filters.GracefulShutdownFilterTest.testWithVertxHttpClientAndHttp2AfterShutdown(GracefulShutdownFilterTest.java:85)
at io.quarkus.vertx.http.filters.GracefulShutdownFilterTest.test(GracefulShutdownFilterTest.java:59)
⚙️ JVM Tests - JDK 25 Semeru
📦 extensions/quartz/deployment
❌ io.quarkus.quartz.test.PausedSchedulerTest.testSchedulerPauseResume - History
expected: <false> but was: <true>-org.opentest4j.AssertionFailedError
Details
org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:158)
at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:139)
at org.junit.jupiter.api.AssertFalse.failNotFalse(AssertFalse.java:69)
at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:41)
at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35)
at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:246)
at io.quarkus.quartz.test.PausedSchedulerTest.testSchedulerPauseResume(PausedSchedulerTest.java:47)
📦 extensions/vertx-http/deployment
❌ io.quarkus.vertx.http.filters.GracefulShutdownFilterTest.test - History
Timeout waiting for response-java.lang.RuntimeException
Details
java.lang.RuntimeException: Timeout waiting for response
at io.quarkus.vertx.http.filters.GracefulShutdownFilterTest.testWithVertxHttpClientAndHttp2AfterShutdown(GracefulShutdownFilterTest.java:85)
at io.quarkus.vertx.http.filters.GracefulShutdownFilterTest.test(GracefulShutdownFilterTest.java:59)
⚙️ Native Tests - Messaging1
📦 integration-tests/reactive-messaging-kafka
❌ io.quarkus.it.kafka.KafkaConnectorIT.testPets - History
Assertion condition defined as a Lambda expression in io.quarkus.it.kafka.KafkaConnectorTest expected: <6> but was: <3> within 10 seconds.-org.awaitility.core.ConditionTimeoutException
Details
org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a Lambda expression in io.quarkus.it.kafka.KafkaConnectorTest expected: <6> but was: <3> within 10 seconds.
at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:1160)
at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:790)
at io.quarkus.it.kafka.KafkaConnectorTest.testPets(KafkaConnectorTest.java:62)
at java.base/java.lang.reflect.Method.invoke(Method.java:565)
⚙️ JVM Integration Tests - JDK 17
📦 integration-tests/reactive-messaging-kafka
❌ io.quarkus.it.kafka.KafkaConnectorTest.testPets - History
Assertion condition defined as a Lambda expression in io.quarkus.it.kafka.KafkaConnectorTest expected: <4> but was: <3> within 10 seconds.-org.awaitility.core.ConditionTimeoutException
Details
org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a Lambda expression in io.quarkus.it.kafka.KafkaConnectorTest expected: <4> but was: <3> within 10 seconds.
at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:1160)
at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:790)
at io.quarkus.it.kafka.KafkaConnectorTest.testPets(KafkaConnectorTest.java:62)
Caused by: org.opentest4j.AssertionFailedError: expected: <4> but was: <3>
Assertion condition defined as a Lambda expression in io.quarkus.it.kafka.KafkaConnectorTest expected: <6> but was: <3> within 10 seconds.-org.awaitility.core.ConditionTimeoutException
Details
org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a Lambda expression in io.quarkus.it.kafka.KafkaConnectorTest expected: <6> but was: <3> within 10 seconds.
at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:1160)
at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:790)
at io.quarkus.it.kafka.KafkaConnectorTest.testPets(KafkaConnectorTest.java:62)
Caused by: org.opentest4j.AssertionFailedError: expected: <6> but was: <3>
⚙️ JVM Integration Tests - JDK 25
📦 integration-tests/reactive-messaging-kafka
❌ io.quarkus.it.kafka.KafkaConnectorTest.testDataForKeyed - History
Assertion condition defined as a Lambda expression in io.quarkus.it.kafka.KafkaConnectorTest expected: <3> but was: <4> within 10 seconds.-org.awaitility.core.ConditionTimeoutException
Details
org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a Lambda expression in io.quarkus.it.kafka.KafkaConnectorTest expected: <3> but was: <4> within 10 seconds.
at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:1160)
at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:790)
at io.quarkus.it.kafka.KafkaConnectorTest.testDataForKeyed(KafkaConnectorTest.java:96)
Caused by: org.opentest4j.AssertionFailedError: expected: <3> but was: <4>
Assertion condition defined as a Lambda expression in io.quarkus.it.kafka.KafkaConnectorTest expected: <3> but was: <5> within 10 seconds.-org.awaitility.core.ConditionTimeoutException
Details
org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a Lambda expression in io.quarkus.it.kafka.KafkaConnectorTest expected: <3> but was: <5> within 10 seconds.
at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:1160)
at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:790)
at io.quarkus.it.kafka.KafkaConnectorTest.testDataForKeyed(KafkaConnectorTest.java:96)
Caused by: java.util.concurrent.TimeoutException
⚙️ JVM Integration Tests - JDK 25 Semeru
📦 integration-tests/reactive-messaging-kafka
❌ io.quarkus.it.kafka.KafkaConnectorTest.testRequestReply - History
iterable contents differ at index [0], expected: <reply-1> but was: <{"details":"Error id 82df9a9a-1523-4e9b-9214-3b338f5eec96-1, org.jboss.resteasy.spi.UnhandledException: io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReplyTimeoutException: Timeout waiting for a reply for request with correlation ID: 0403eb21-8fd7-4190-b2b9-9f6d2c83dd29","stack":"org.jboss.resteasy.spi.UnhandledException: io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReplyTimeoutException: Timeout waiting for a reply for request with correlation ID: 0403eb21-8fd7-4190-b2b9-9f6d2c83dd29\n\tat org.jboss.resteasy.core.ExceptionHandler.handleApplicationException(ExceptionHandler.java:107)\n\tat org.jboss.resteasy.core.ExceptionHandler.handleException(ExceptionHandler.java:344)\n\tat org.jboss.resteasy.core.SynchronousDispatcher.writeException(SynchronousDispatcher.java:205)\n\tat org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:452)\n\tat org.jboss.resteasy.core.Synchron...-org.opentest4j.AssertionFailedError
Details
org.opentest4j.AssertionFailedError: iterable contents differ at index [0], expected: <reply-1> but was: <{"details":"Error id 82df9a9a-1523-4e9b-9214-3b338f5eec96-1, org.jboss.resteasy.spi.UnhandledException: io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReplyTimeoutException: Timeout waiting for a reply for request with correlation ID: 0403eb21-8fd7-4190-b2b9-9f6d2c83dd29","stack":"org.jboss.resteasy.spi.UnhandledException: io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReplyTimeoutException: Timeout waiting for a reply for request with correlation ID: 0403eb21-8fd7-4190-b2b9-9f6d2c83dd29\n\tat org.jboss.resteasy.core.ExceptionHandler.handleApplicationException(ExceptionHandler.java:107)\n\tat org.jboss.resteasy.core.ExceptionHandler.handleException(ExceptionHandler.java:344)\n\tat org.jboss.resteasy.core.SynchronousDispatcher.writeException(SynchronousDispatcher.java:205)\n\tat org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:452)\n...

Definition of Done (taken from the WG proposal)
@ConsumeEventand Vert.x event bus to the new APINote
The extension does not even require the Vert.x dependency unless the
NON_BLOCKINGexecution model is used by a receiver.Missing pieces
Context propagation
The plan is to suppot propagation of security identity and OTel tracing stuff. It should be implemented with the
io.quarkus.signals.spi.ReceiverInterceptorandio.quarkus.signals.spi.SignalMetadataEnricherSPI in relevant extensions. However, we believe that a receiver notification should always run on a new CDI request context -- a new CDI request is activated for each receiver notification. There is no plan to propagate some other contexts.Relationship between the new API, CDI events, and in-memory Reactive Messaging
So far the documentation clearly express that receiver resolution is inspired by CDI, emission modes are inspired by the Vert.x EventBus, and the execution model is following Quarkus conventions.
Migration guide from
@ConsumeEventand Vert.x event busThis is out of scope while in an experimental phase.
Unfinished parts
The extension should be feature-complete. However, there are parts where we need some feedback. More specifically, the following topics are not considered a "solved problem".
Signal emission methods
Currently, common-case methods have the short names, and the
Unisuffix marks the reactive variants. For example,void Signal#publish(T)andUni<Void> Signal#publishUni(T)(note thatAsyncsuffix makes no sense because signals are always send asynchronously). It's not perfect but we want to keep the reactive variants and we think that short names for reactive variants are not convenient for users, i.e.Uni<Void> Signal#publish(T)andvoid Signal#publishAndForget(T).Resolution - the only difference from the CDI specification
In CDI, an observer method that declares no qualifiers receives an empty set of observed qualifiers, and as a result, is notified of any event that matches the observed type, i.e.
void onAnyFoo(@Observes Foo) {}is notified even for@Inject @SomeQualified Event<Foo>. In Signals, a receiver which declares no qualifier has exactly one qualifier --@Default, as a result, you need to use something likevoid onAnyFoo(@Receives @Any Foo) {}to be notified even for@Inject @SomeQualified Signal<Foo>. My personal opinion is that the Signals approach is more aligned with CDI injection rules (@Inject Fooalso results in@Inject @Default Foo) and also reads more naturally ("notify on any Foo"). @manovotn correctly points out that it is not compatible with CDI and users might get confused. It's important to note that Signals have a completely different execution model anyway (compared to CDI). So I personally think that users will be more confused by other differences, e.g. where are transactional observers, where is ordering, why is the receiver executed on a different request context, etc. And that's fine because Signal != CDI events.API history and evolution
The original POC can be found in this repository: https://github.com/mkouba/quarkus-signals