20
20
import org .jembi .jempi .shared .serdes .JsonPojoSerializer ;
21
21
import org .jetbrains .annotations .NotNull ;
22
22
23
+ import java .time .Duration ;
23
24
import java .util .List ;
24
25
import java .util .Properties ;
25
26
import java .util .concurrent .ExecutionException ;
@@ -45,24 +46,30 @@ public static SPInteractions create(final String topic_) {
45
46
return new SPInteractions (topic_ );
46
47
}
47
48
49
+ private void linkPatientProcess (final ActorSystem <Void > system , final ActorRef <BackEnd .Request > backEnd , final String key , final InteractionEnvelop interactionEnvelop ) {
50
+ final var completableFuture = Ask .runStartEndHooks (system , backEnd , key , interactionEnvelop ).toCompletableFuture ();
51
+ try {
52
+ List <MpiGeneralError > hookErrors = completableFuture .get (65 , TimeUnit .SECONDS ).hooksResults ();
53
+ if (!hookErrors .isEmpty ()) {
54
+ LOGGER .error (hookErrors );
55
+ }
56
+ } catch (InterruptedException | ExecutionException | TimeoutException ex ) {
57
+ LOGGER .error (ex .getLocalizedMessage (), ex );
58
+ this .closingLinkingStream ();
59
+ }
60
+ }
48
61
private void linkPatient (
49
62
final ActorSystem <Void > system ,
50
63
final ActorRef <BackEnd .Request > backEnd ,
51
64
final String key ,
52
65
final InteractionEnvelop interactionEnvelop ) {
53
66
54
- if (interactionEnvelop .contentType () == InteractionEnvelop .ContentType .BATCH_START_SENTINEL
55
- || interactionEnvelop .contentType () == BATCH_END_SENTINEL ) {
56
- final var completableFuture = Ask .runStartEndHooks (system , backEnd , key , interactionEnvelop ).toCompletableFuture ();
57
- try {
58
- List <MpiGeneralError > hookErrors = completableFuture .get (65 , TimeUnit .SECONDS ).hooksResults ();
59
- if (!hookErrors .isEmpty ()) {
60
- LOGGER .error (hookErrors );
61
- }
62
- } catch (InterruptedException | ExecutionException | TimeoutException ex ) {
63
- LOGGER .error (ex .getLocalizedMessage (), ex );
64
- this .closingLinkingStream ();
65
- }
67
+ if (interactionEnvelop .contentType () == InteractionEnvelop .ContentType .BATCH_START_SENTINEL ) {
68
+ LOGGER .info (String .format ("SPInteractions Stream Processor -> Starting linking for tag '%s'" , interactionEnvelop .tag ()));
69
+ linkPatientProcess (system , backEnd , key , interactionEnvelop );
70
+ } else if (interactionEnvelop .contentType () == BATCH_END_SENTINEL ) {
71
+ LOGGER .info (String .format ("SPInteractions Stream Processor -> Ended linking for tag '%s'" , interactionEnvelop .tag ()));
72
+ linkPatientProcess (system , backEnd , key , interactionEnvelop );
66
73
}
67
74
68
75
if (interactionEnvelop .contentType () != BATCH_INTERACTION ) {
@@ -110,10 +117,11 @@ private StreamsBuilder getMatchingStream(final ActorSystem<Void> system, final A
110
117
new JsonPojoDeserializer <>(InteractionEnvelop .class ));
111
118
final StreamsBuilder streamsBuilder = new StreamsBuilder ();
112
119
final KStream <String , InteractionEnvelop > matchStream =
113
- streamsBuilder .stream (GlobalConstants .TOPIC_NOTIFICATIONS , Consumed .with (stringSerde , interactionEnvelopSerde ));
120
+ streamsBuilder .stream (GlobalConstants .TOPIC_INTERACTION_LINKER_MATCHING , Consumed .with (stringSerde , interactionEnvelopSerde ));
114
121
matchStream .foreach ((key , matchEnvelop ) -> {
115
122
matchPatient (system , backEnd , key , matchEnvelop );
116
123
if (matchEnvelop .contentType () == BATCH_END_SENTINEL ) {
124
+ LOGGER .info (String .format ("SPInteractions Stream Processor -> Ended matching for tag '%s'" , matchEnvelop .tag ()));
117
125
this .closingMatchingStream ();
118
126
}
119
127
});
@@ -130,9 +138,8 @@ private StreamsBuilder getLinkingStream(final ActorSystem<Void> system, final Ac
130
138
interactionStream .foreach ((key , interactionEnvelop ) -> {
131
139
linkPatient (system , backEnd , key , interactionEnvelop );
132
140
if (!CustomMU .SEND_INTERACTIONS_TO_EM && interactionEnvelop .contentType () == BATCH_END_SENTINEL ) {
133
- LOGGER .info ("SPInteractions Stream Processor -> Starting matching processor" );
141
+ LOGGER .info (String . format ( "SPInteractions Stream Processor -> Starting matching for tag '%s'" , interactionEnvelop . tag ()) );
134
142
matchingStream .start ();
135
- this .closingLinkingStream ();
136
143
}
137
144
});
138
145
return streamsBuilder ;
@@ -142,9 +149,8 @@ public void open(
142
149
final ActorSystem <Void > system ,
143
150
final ActorRef <BackEnd .Request > backEnd ) {
144
151
LOGGER .info ("SPInteractions Stream Processor" );
145
- final Properties props = loadConfig ();
146
- matchingEnvelopeKafkaStream = new KafkaStreams (getMatchingStream (system , backEnd ).build (), props );
147
- interactionEnvelopKafkaStreams = new KafkaStreams (getLinkingStream (system , backEnd , matchingEnvelopeKafkaStream ).build (), props );
152
+ matchingEnvelopeKafkaStream = new KafkaStreams (getMatchingStream (system , backEnd ).build (), loadConfig (topic ));
153
+ interactionEnvelopKafkaStreams = new KafkaStreams (getLinkingStream (system , backEnd , matchingEnvelopeKafkaStream ).build (), loadConfig (GlobalConstants .TOPIC_INTERACTION_LINKER_MATCHING ));
148
154
interactionEnvelopKafkaStreams .cleanUp ();
149
155
LOGGER .info ("SPInteractions Stream Processor -> Starting linking processor" );
150
156
interactionEnvelopKafkaStreams .start ();
@@ -158,13 +164,13 @@ private void closingLinkingStream() {
158
164
159
165
private void closingMatchingStream () {
160
166
LOGGER .info ("SPInteractions Stream Processor -> Closing matching processor" );
161
- interactionEnvelopKafkaStreams .close (new KafkaStreams .CloseOptions ().leaveGroup (true ));
167
+ matchingEnvelopeKafkaStream .close (new KafkaStreams .CloseOptions ().leaveGroup (true ). timeout ( Duration . ofSeconds ( 2 ) ));
162
168
}
163
169
164
- private Properties loadConfig () {
170
+ private Properties loadConfig (final String inTopic ) {
165
171
final Properties props = new Properties ();
166
172
props .put (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG , AppConfig .KAFKA_BOOTSTRAP_SERVERS );
167
- props .put (StreamsConfig .APPLICATION_ID_CONFIG , AppConfig .KAFKA_APPLICATION_ID_INTERACTIONS + topic );
173
+ props .put (StreamsConfig .APPLICATION_ID_CONFIG , AppConfig .KAFKA_APPLICATION_ID_INTERACTIONS + inTopic );
168
174
return props ;
169
175
}
170
176
0 commit comments