2
2
3
3
import org .exploit .signalix .annotation .KafkaEvent ;
4
4
import org .exploit .signalix .annotation .KafkaEventListener ;
5
+ import org .exploit .signalix .core .KafkaEventManager ;
5
6
import org .exploit .signalix .event .listener .ConsumerEventListener ;
6
7
import org .exploit .signalix .event .listener .EventListener ;
7
8
import org .exploit .signalix .event .listener .ReflectedEventListener ;
8
- import org .exploit .signalix .core .KafkaEventManager ;
9
- import org .exploit .signalix .model .KafkaEventMeta ;
10
- import org .exploit .signalix .utils .EventObjectMapper ;
9
+ import org .exploit .signalix .exception .EventExecutionException ;
11
10
import org .exploit .signalix .manager .EventScope ;
12
11
import org .exploit .signalix .marker .Event ;
13
12
import org .exploit .signalix .marker .Listener ;
13
+ import org .exploit .signalix .model .KafkaEventMeta ;
14
+ import org .exploit .signalix .model .KafkaWiredEvent ;
14
15
import org .exploit .signalix .utils .EventConsumer ;
16
+ import org .exploit .signalix .utils .EventObjectMapper ;
15
17
import org .jetbrains .annotations .NotNull ;
16
18
17
19
import java .util .List ;
@@ -32,16 +34,6 @@ public KafkaEventScope(Map<String, String> properties) {
32
34
this (convertToProperties (properties ));
33
35
}
34
36
35
- @ Override
36
- public <T extends Event > void call (T event ) {
37
- var eventClass = event .getClass ();
38
-
39
- extractAnnotation (eventClass , KafkaEvent .class ).ifPresentOrElse (
40
- meta -> kafkaEventManager .call (event , new KafkaEventMeta (meta )),
41
- () -> innerCall (event )
42
- );
43
- }
44
-
45
37
public <T extends Event > void call (String topic , T event ) {
46
38
extractAnnotation (event .getClass (), KafkaEvent .class ).ifPresentOrElse (
47
39
annotation -> {
@@ -52,8 +44,19 @@ public <T extends Event> void call(String topic, T event) {
52
44
);
53
45
}
54
46
47
+ @ Override
48
+ public void call (Event event ) {
49
+ extractAnnotation (event .getClass (), KafkaEvent .class ).ifPresentOrElse (
50
+ annotation -> {
51
+ var meta = new KafkaEventMeta (annotation .topic (), annotation .partition (), List .of (annotation .headers ()));
52
+ call (meta , event );
53
+ },
54
+ () -> super .call (event )
55
+ );
56
+ }
57
+
55
58
public <T extends Event > void call (KafkaEventMeta meta , T event ) {
56
- kafkaEventManager . call ( event , meta );
59
+ eventLoop . callEvent ( new KafkaWiredEvent ( event , meta ) );
57
60
}
58
61
59
62
public <T extends Event > void innerCall (T event ) {
@@ -104,6 +107,19 @@ public void registerEventListener(EventListener listener) {
104
107
kafkaEventManager .getEventObjectMapper ().registerEvents (listener );
105
108
}
106
109
110
+ @ Override
111
+ protected void dispatchEvent (Event event ) {
112
+ if (event instanceof KafkaWiredEvent kafkaEvent ) {
113
+ try {
114
+ kafkaEventManager .call (kafkaEvent );
115
+ } catch (Exception e ) {
116
+ call (new EventExecutionException (e , kafkaEvent .event ()));
117
+ }
118
+ } else {
119
+ super .dispatchEvent (event );
120
+ }
121
+ }
122
+
107
123
@ Override
108
124
public void close () {
109
125
super .close ();
0 commit comments