Skip to content

Commit 0091579

Browse files
committed
Sending matching topic to kafka
1 parent b5e1da5 commit 0091579

File tree

4 files changed

+38
-8
lines changed

4 files changed

+38
-8
lines changed

JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ public final class GlobalConstants {
77
public static final String TOPIC_INTERACTION_PROCESSOR_CONTROLLER = "JeMPI-interaction-processor-controller";
88
public static final String TOPIC_INTERACTION_EM = "JeMPI-interaction-em";
99
public static final String TOPIC_INTERACTION_LINKER = "JeMPI-interaction-linker";
10+
public static final String TOPIC_INTERACTION_LINKER_MATCHING = "JeMPI-interaction-linker-matching";
1011
public static final String TOPIC_MU_CONTROLLER = "JeMPI-mu-controller";
1112
public static final String TOPIC_MU_LINKER = "JeMPI-mu-linker";
1213
public static final String TOPIC_AUDIT_TRAIL = "JeMPI-audit-trail";

JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.jembi.jempi.libmpi.MpiGeneralError;
1515
import org.jembi.jempi.linker.backend.BackEnd;
1616
import org.jembi.jempi.shared.models.CustomMU;
17+
import org.jembi.jempi.shared.models.GlobalConstants;
1718
import org.jembi.jempi.shared.models.InteractionEnvelop;
1819
import org.jembi.jempi.shared.serdes.JsonPojoDeserializer;
1920
import org.jembi.jempi.shared.serdes.JsonPojoSerializer;
@@ -52,7 +53,6 @@ private void linkPatient(
5253

5354
if (interactionEnvelop.contentType() == InteractionEnvelop.ContentType.BATCH_START_SENTINEL
5455
|| interactionEnvelop.contentType() == BATCH_END_SENTINEL) {
55-
forwardtoMatching
5656
final var completableFuture = Ask.runStartEndHooks(system, backEnd, key, interactionEnvelop).toCompletableFuture();
5757
try {
5858
List<MpiGeneralError> hookErrors = completableFuture.get(65, TimeUnit.SECONDS).hooksResults();
@@ -110,7 +110,7 @@ private StreamsBuilder getMatchingStream(final ActorSystem<Void> system, final A
110110
new JsonPojoDeserializer<>(InteractionEnvelop.class));
111111
final StreamsBuilder streamsBuilder = new StreamsBuilder();
112112
final KStream<String, InteractionEnvelop> matchStream =
113-
streamsBuilder.stream(topic, Consumed.with(stringSerde, interactionEnvelopSerde));
113+
streamsBuilder.stream(GlobalConstants.TOPIC_NOTIFICATIONS, Consumed.with(stringSerde, interactionEnvelopSerde));
114114
matchStream.foreach((key, matchEnvelop) -> {
115115
matchPatient(system, backEnd, key, matchEnvelop);
116116
if (matchEnvelop.contentType() == BATCH_END_SENTINEL) {

JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.List;
2626
import java.util.UUID;
2727
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.ExecutionException;
2829
import java.util.concurrent.Executor;
2930
import java.util.stream.Collectors;
3031

@@ -37,6 +38,8 @@ public final class BackEnd extends AbstractBehavior<BackEnd.Request> {
3738
private static final Logger LOGGER = LogManager.getLogger(BackEnd.class);
3839
private static final String SINGLE_TIMER_TIMEOUT_KEY = "SingleTimerTimeOutKey";
3940
static MyKafkaProducer<String, Notification> topicNotifications;
41+
42+
static MyKafkaProducer<String, InteractionEnvelop> matchingInteractions;
4043
private final Executor ec;
4144
private LibMPI libMPI = null;
4245

@@ -52,6 +55,12 @@ private BackEnd(final ActorContext<Request> context) {
5255
new StringSerializer(),
5356
new JsonPojoSerializer<>(),
5457
AppConfig.KAFKA_CLIENT_ID_NOTIFICATIONS);
58+
59+
matchingInteractions = new MyKafkaProducer<>(AppConfig.KAFKA_BOOTSTRAP_SERVERS,
60+
GlobalConstants.TOPIC_INTERACTION_LINKER_MATCHING,
61+
new StringSerializer(),
62+
new JsonPojoSerializer<>(),
63+
AppConfig.KAFKA_CLIENT_ID_NOTIFICATIONS);
5564
}
5665

5766
private BackEnd(
@@ -144,9 +153,11 @@ private Behavior<Request> crUpdateField(final CrUpdateFieldRequest req) {
144153
return Behaviors.same();
145154
}
146155

147-
private Behavior<Request> runStartStopHooks(final RunStartStopHooksRequest req) {
156+
private Behavior<Request> runStartStopHooks(final RunStartStopHooksRequest req) throws ExecutionException, InterruptedException {
148157
List<MpiGeneralError> hookRunErrors = List.of();
149158

159+
matchingInteractions.produceSync(req.key, req.batchInteraction);
160+
150161
if (req.batchInteraction.contentType() == BATCH_START_SENTINEL) {
151162
hookRunErrors = libMPI.beforeLinkingHook();
152163
} else if (req.batchInteraction.contentType() == BATCH_END_SENTINEL) {
@@ -172,7 +183,8 @@ private Behavior<Request> syncLinkInteractionHandler(final SyncLinkInteractionRe
172183
request.link.matchThreshold() == null
173184
? AppConfig.LINKER_MATCH_THRESHOLD
174185
: request.link.matchThreshold(),
175-
request.link.stan());
186+
request.link.stan(),
187+
null);
176188
request.replyTo.tell(new SyncLinkInteractionResponse(request.link.stan(),
177189
listLinkInfo.isLeft()
178190
? listLinkInfo.getLeft()
@@ -183,20 +195,30 @@ private Behavior<Request> syncLinkInteractionHandler(final SyncLinkInteractionRe
183195
return Behaviors.same();
184196
}
185197

186-
private Behavior<Request> asyncLinkInteractionHandler(final AsyncLinkInteractionRequest req) {
198+
private Behavior<Request> asyncLinkInteractionHandler(final AsyncLinkInteractionRequest req) throws ExecutionException, InterruptedException {
187199
if (req.batchInteraction.contentType() != InteractionEnvelop.ContentType.BATCH_INTERACTION) {
188200
return Behaviors.withTimers(timers -> {
189201
timers.startSingleTimer(SINGLE_TIMER_TIMEOUT_KEY, TeaTimeRequest.INSTANCE, Duration.ofSeconds(5));
190202
req.replyTo.tell(new AsyncLinkInteractionResponse(null));
191203
return Behaviors.same();
192204
});
193205
}
206+
194207
final var linkInfo =
195208
LinkerDWH.linkInteraction(libMPI,
196209
req.batchInteraction.interaction(),
197210
null,
198211
AppConfig.LINKER_MATCH_THRESHOLD,
199-
req.batchInteraction.stan());
212+
req.batchInteraction.stan(),
213+
(final Interaction interaction) -> {
214+
try {
215+
BackEnd.matchingInteractions.produceSync(req.key, req.batchInteraction);
216+
} catch (ExecutionException e) {
217+
throw new RuntimeException(e);
218+
} catch (InterruptedException e) {
219+
throw new RuntimeException(e);
220+
}
221+
});
200222
if (linkInfo.isLeft()) {
201223
req.replyTo.tell(new AsyncLinkInteractionResponse(linkInfo.getLeft()));
202224
} else {

JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.*;
1919
import java.util.concurrent.ExecutionException;
20+
import java.util.function.Consumer;
2021
import java.util.function.Function;
2122
import java.util.stream.Collectors;
2223
import java.util.stream.IntStream;
@@ -170,7 +171,8 @@ public static Either<LinkInfo, List<ExternalLinkCandidate>> linkInteraction(
170171
final Interaction interaction,
171172
final ExternalLinkRange externalLinkRange,
172173
final float matchThreshold_,
173-
final String envelopStan) {
174+
final String envelopStan,
175+
final Consumer<Interaction> forwarder) {
174176

175177
// if (LOGGER.isTraceEnabled()) {
176178
// LOGGER.trace("{}", envelopStan);
@@ -188,7 +190,12 @@ public static Either<LinkInfo, List<ExternalLinkCandidate>> linkInteraction(
188190
}
189191

190192
if (!CustomLinkerDeterministic.canApplyLinking(interaction.demographicData())) {
191-
sendKafkaTopic();
193+
if (forwarder != null) {
194+
forwarder.accept(interaction);
195+
return Either.right(List.of());
196+
} else {
197+
return matchInteraction(libMPI, interaction, externalLinkRange, matchThreshold_, envelopStan);
198+
}
192199
} else {
193200
LinkInfo linkInfo = null;
194201
final List<ExternalLinkCandidate> externalLinkCandidateList = new ArrayList<>();

0 commit comments

Comments
 (0)