From b5e1da5d1cb6fe8a84ceee729c05a9da295774ce Mon Sep 17 00:00:00 2001 From: ChidoW Date: Tue, 12 Mar 2024 12:06:08 +0200 Subject: [PATCH 1/5] Spliting the streams into two --- .../main/java/org/jembi/jempi/linker/Ask.java | 11 +++ .../jembi/jempi/linker/SPInteractions.java | 82 +++++++++++++++--- .../jembi/jempi/linker/backend/BackEnd.java | 36 ++++++++ .../jembi/jempi/linker/backend/LinkerDWH.java | 85 ++++++++++--------- 4 files changed, 162 insertions(+), 52 deletions(-) diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java index e88dd54d0..79663ef60 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java @@ -111,6 +111,17 @@ static CompletionStage linkInteraction( actorSystem.scheduler()); } + static CompletionStage matchInteraction( + final ActorSystem actorSystem, + final ActorRef backEnd, + final String key, + final InteractionEnvelop batchInteraction) { + return AskPattern.ask(backEnd, + replyTo -> new BackEnd.AsyncMatchInteractionRequest(replyTo, key, batchInteraction), + java.time.Duration.ofSeconds(60), + actorSystem.scheduler()); + } + static CompletionStage runStartEndHooks( final ActorSystem actorSystem, final ActorRef backEnd, diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java index 899feaeda..6f044d92b 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java @@ -17,6 +17,7 @@ import org.jembi.jempi.shared.models.InteractionEnvelop; import org.jembi.jempi.shared.serdes.JsonPojoDeserializer; import org.jembi.jempi.shared.serdes.JsonPojoSerializer; +import org.jetbrains.annotations.NotNull; import java.util.List; import java.util.Properties; @@ -32,6 +33,7 @@ public final class SPInteractions { private static final Logger LOGGER = LogManager.getLogger(SPInteractions.class); private final String topic; private KafkaStreams interactionEnvelopKafkaStreams; + private KafkaStreams matchingEnvelopeKafkaStream; private SPInteractions(final String topic_) { LOGGER.info("SPInteractions constructor"); @@ -50,6 +52,7 @@ private void linkPatient( if (interactionEnvelop.contentType() == InteractionEnvelop.ContentType.BATCH_START_SENTINEL || interactionEnvelop.contentType() == BATCH_END_SENTINEL) { + forwardtoMatching final var completableFuture = Ask.runStartEndHooks(system, backEnd, key, interactionEnvelop).toCompletableFuture(); try { List hookErrors = completableFuture.get(65, TimeUnit.SECONDS).hooksResults(); @@ -58,12 +61,11 @@ private void linkPatient( } } catch (InterruptedException | ExecutionException | TimeoutException ex) { LOGGER.error(ex.getLocalizedMessage(), ex); - this.close(); + this.closingLinkingStream(); } } if (interactionEnvelop.contentType() != BATCH_INTERACTION) { - return; } final var completableFuture = Ask.linkInteraction(system, backEnd, key, interactionEnvelop).toCompletableFuture(); @@ -74,36 +76,88 @@ private void linkPatient( } } catch (InterruptedException | ExecutionException | TimeoutException ex) { LOGGER.error(ex.getLocalizedMessage(), ex); - this.close(); + this.closingLinkingStream(); } } - public void open( - final ActorSystem system, - final ActorRef backEnd) { - LOGGER.info("SPInteractions Stream Processor"); - final Properties props = loadConfig(); + private void matchPatient( + final ActorSystem system, + final ActorRef backEnd, + final String key, + final InteractionEnvelop interactionEnvelop) { + + if (interactionEnvelop.contentType() != BATCH_INTERACTION) { + return; + } + final var completableFuture = Ask.matchInteraction(system, backEnd, key, interactionEnvelop).toCompletableFuture(); + try { + final var reply = completableFuture.get(65, TimeUnit.SECONDS); + if (reply.linkInfo() == null) { + LOGGER.error("BACK END RESPONSE(ERROR)"); + } + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + LOGGER.error(ex.getLocalizedMessage(), ex); + this.closingMatchingStream(); + } + + } + + @NotNull + private StreamsBuilder getMatchingStream(final ActorSystem system, final ActorRef backEnd) { final var stringSerde = Serdes.String(); final var interactionEnvelopSerde = Serdes.serdeFrom(new JsonPojoSerializer<>(), - new JsonPojoDeserializer<>(InteractionEnvelop.class)); + new JsonPojoDeserializer<>(InteractionEnvelop.class)); + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + final KStream matchStream = + streamsBuilder.stream(topic, Consumed.with(stringSerde, interactionEnvelopSerde)); + matchStream.foreach((key, matchEnvelop) -> { + matchPatient(system, backEnd, key, matchEnvelop); + if (matchEnvelop.contentType() == BATCH_END_SENTINEL) { + this.closingMatchingStream(); + } + }); + return streamsBuilder; + } + @NotNull + private StreamsBuilder getLinkingStream(final ActorSystem system, final ActorRef backEnd, final KafkaStreams matchingStream) { + final var stringSerde = Serdes.String(); + final var interactionEnvelopSerde = Serdes.serdeFrom(new JsonPojoSerializer<>(), + new JsonPojoDeserializer<>(InteractionEnvelop.class)); final StreamsBuilder streamsBuilder = new StreamsBuilder(); final KStream interactionStream = - streamsBuilder.stream(topic, Consumed.with(stringSerde, interactionEnvelopSerde)); + streamsBuilder.stream(topic, Consumed.with(stringSerde, interactionEnvelopSerde)); interactionStream.foreach((key, interactionEnvelop) -> { linkPatient(system, backEnd, key, interactionEnvelop); if (!CustomMU.SEND_INTERACTIONS_TO_EM && interactionEnvelop.contentType() == BATCH_END_SENTINEL) { - this.close(); + LOGGER.info("SPInteractions Stream Processor -> Starting matching processor"); + matchingStream.start(); + this.closingLinkingStream(); } }); - interactionEnvelopKafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + return streamsBuilder; + } + + public void open( + final ActorSystem system, + final ActorRef backEnd) { + LOGGER.info("SPInteractions Stream Processor"); + final Properties props = loadConfig(); + matchingEnvelopeKafkaStream = new KafkaStreams(getMatchingStream(system, backEnd).build(), props); + interactionEnvelopKafkaStreams = new KafkaStreams(getLinkingStream(system, backEnd, matchingEnvelopeKafkaStream).build(), props); interactionEnvelopKafkaStreams.cleanUp(); + LOGGER.info("SPInteractions Stream Processor -> Starting linking processor"); interactionEnvelopKafkaStreams.start(); LOGGER.info("KafkaStreams started"); } - private void close() { - LOGGER.info("Stream closed"); + private void closingLinkingStream() { + LOGGER.info("SPInteractions Stream Processor -> Closing linking processor"); + interactionEnvelopKafkaStreams.close(new KafkaStreams.CloseOptions().leaveGroup(true)); + } + + private void closingMatchingStream() { + LOGGER.info("SPInteractions Stream Processor -> Closing matching processor"); interactionEnvelopKafkaStreams.close(new KafkaStreams.CloseOptions().leaveGroup(true)); } diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java index c530891b3..30d519e7b 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java @@ -106,6 +106,7 @@ private void openMPI(final boolean useDGraph) { @Override public Receive createReceive() { return newReceiveBuilder().onMessage(AsyncLinkInteractionRequest.class, this::asyncLinkInteractionHandler) + .onMessage(AsyncMatchInteractionRequest.class, this::asyncMatchInteractionHandler) .onMessage(SyncLinkInteractionRequest.class, this::syncLinkInteractionHandler) // .onMessage(SyncLinkInteractionToGidRequest.class, this::syncLinkInteractionToGidHandler) .onMessage(CalculateScoresRequest.class, this::calculateScoresHandler) @@ -207,6 +208,32 @@ private Behavior asyncLinkInteractionHandler(final AsyncLinkInteraction }); } + private Behavior asyncMatchInteractionHandler(final AsyncMatchInteractionRequest req) { + if (req.batchInteraction.contentType() != InteractionEnvelop.ContentType.BATCH_INTERACTION) { + return Behaviors.withTimers(timers -> { + timers.startSingleTimer(SINGLE_TIMER_TIMEOUT_KEY, TeaTimeRequest.INSTANCE, Duration.ofSeconds(5)); + req.replyTo.tell(new AsyncMatchInteractionResponse(null)); + return Behaviors.same(); + }); + } + // to-do: Consider if LinkInfo is nevessary + final var linkInfo = + LinkerDWH.matchInteraction(libMPI, + req.batchInteraction.interaction(), + null, + AppConfig.LINKER_MATCH_THRESHOLD, + req.batchInteraction.stan()); + if (linkInfo.isLeft()) { + req.replyTo.tell(new AsyncMatchInteractionResponse(linkInfo.getLeft())); + } else { + req.replyTo.tell(new AsyncMatchInteractionResponse(null)); + } + return Behaviors.withTimers(timers -> { + timers.startSingleTimer(SINGLE_TIMER_TIMEOUT_KEY, TeaTimeRequest.INSTANCE, Duration.ofSeconds(10)); + return Behaviors.same(); + }); + } + /* private Behavior syncLinkInteractionToGidHandler(final SyncLinkInteractionToGidRequest request) { final LinkInfo linkInfo; @@ -334,6 +361,15 @@ public record AsyncLinkInteractionRequest( public record AsyncLinkInteractionResponse(LinkInfo linkInfo) implements Response { } + public record AsyncMatchInteractionRequest( + ActorRef replyTo, + String key, + InteractionEnvelop batchInteraction) implements Request { + } + + public record AsyncMatchInteractionResponse(LinkInfo linkInfo) implements Response { + } + public record RunStartStopHooksRequest( ActorRef replyTo, String key, diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java index 8c9be2157..e5e68642d 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java @@ -118,6 +118,52 @@ static void helperUpdateInteractionsScore( }); } + public static Either> matchInteraction( + final LibMPI libMPI, + final Interaction interaction, + final ExternalLinkRange externalLinkRange, + final float matchThreshold_, + final String envelopStan) { + libMPI.startTransaction(); + if (CustomLinkerDeterministic.DETERMINISTIC_DO_MATCHING || CustomLinkerProbabilistic.PROBABILISTIC_DO_MATCHING) { + final var candidates = libMPI.findMatchCandidates(interaction.demographicData()); + LOGGER.debug("Match Candidates {} ", candidates.size()); + if (candidates.isEmpty()) { + try { + final var i = OBJECT_MAPPER.writeValueAsString(interaction.demographicData()); + final var f = """ + MATCH NOTIFICATION NO CANDIDATE + {}"""; + LOGGER.info(f, i); + } catch (JsonProcessingException e) { + LOGGER.error(e.getLocalizedMessage(), e); + } + } else { + final var workCandidate = candidates.parallelStream() + .unordered() + .map(candidate -> new WorkCandidate(candidate, + LinkerUtils.calcNormalizedScore(candidate.demographicData(), + interaction.demographicData()))) + .sorted((o1, o2) -> Float.compare(o2.score(), o1.score())) + .collect(Collectors.toCollection(ArrayList::new)) + .getFirst(); + try { + final var i = OBJECT_MAPPER.writeValueAsString(interaction.demographicData()); + final var g = OBJECT_MAPPER.writeValueAsString(workCandidate.goldenRecord().demographicData()); + final var f = """ + MATCH NOTIFICATION + {} + {}"""; + LOGGER.info(f, i, g); + } catch (JsonProcessingException e) { + LOGGER.error(e.getLocalizedMessage(), e); + } + } + } + libMPI.closeTransaction(); + return Either.right(List.of()); + } + // + public static Either> linkInteraction( final LibMPI libMPI, @@ -142,44 +188,7 @@ public static Either> linkInteraction( } if (!CustomLinkerDeterministic.canApplyLinking(interaction.demographicData())) { - libMPI.startTransaction(); - if (CustomLinkerDeterministic.DETERMINISTIC_DO_MATCHING || CustomLinkerProbabilistic.PROBABILISTIC_DO_MATCHING) { - final var candidates = libMPI.findMatchCandidates(interaction.demographicData()); - LOGGER.debug("Match Candidates {} ", candidates.size()); - if (candidates.isEmpty()) { - try { - final var i = OBJECT_MAPPER.writeValueAsString(interaction.demographicData()); - final var f = """ - MATCH NOTIFICATION NO CANDIDATE - {}"""; - LOGGER.info(f, i); - } catch (JsonProcessingException e) { - LOGGER.error(e.getLocalizedMessage(), e); - } - } else { - final var workCandidate = candidates.parallelStream() - .unordered() - .map(candidate -> new WorkCandidate(candidate, - LinkerUtils.calcNormalizedScore(candidate.demographicData(), - interaction.demographicData()))) - .sorted((o1, o2) -> Float.compare(o2.score(), o1.score())) - .collect(Collectors.toCollection(ArrayList::new)) - .getFirst(); - try { - final var i = OBJECT_MAPPER.writeValueAsString(interaction.demographicData()); - final var g = OBJECT_MAPPER.writeValueAsString(workCandidate.goldenRecord().demographicData()); - final var f = """ - MATCH NOTIFICATION - {} - {}"""; - LOGGER.info(f, i, g); - } catch (JsonProcessingException e) { - LOGGER.error(e.getLocalizedMessage(), e); - } - } - } - libMPI.closeTransaction(); - return Either.right(List.of()); + sendKafkaTopic(); } else { LinkInfo linkInfo = null; final List externalLinkCandidateList = new ArrayList<>(); From 0091579ddef1d64806c823470d43d548d8792dc8 Mon Sep 17 00:00:00 2001 From: ChidoW Date: Thu, 14 Mar 2024 00:18:15 +0200 Subject: [PATCH 2/5] Sending matching topic to kafka --- .../jempi/shared/models/GlobalConstants.java | 1 + .../jembi/jempi/linker/SPInteractions.java | 4 +-- .../jembi/jempi/linker/backend/BackEnd.java | 30 ++++++++++++++++--- .../jembi/jempi/linker/backend/LinkerDWH.java | 11 +++++-- 4 files changed, 38 insertions(+), 8 deletions(-) diff --git a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java index e8903c317..eab098e69 100644 --- a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java +++ b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java @@ -7,6 +7,7 @@ public final class GlobalConstants { public static final String TOPIC_INTERACTION_PROCESSOR_CONTROLLER = "JeMPI-interaction-processor-controller"; public static final String TOPIC_INTERACTION_EM = "JeMPI-interaction-em"; public static final String TOPIC_INTERACTION_LINKER = "JeMPI-interaction-linker"; + public static final String TOPIC_INTERACTION_LINKER_MATCHING = "JeMPI-interaction-linker-matching"; public static final String TOPIC_MU_CONTROLLER = "JeMPI-mu-controller"; public static final String TOPIC_MU_LINKER = "JeMPI-mu-linker"; public static final String TOPIC_AUDIT_TRAIL = "JeMPI-audit-trail"; diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java index 6f044d92b..55e368c42 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java @@ -14,6 +14,7 @@ import org.jembi.jempi.libmpi.MpiGeneralError; import org.jembi.jempi.linker.backend.BackEnd; import org.jembi.jempi.shared.models.CustomMU; +import org.jembi.jempi.shared.models.GlobalConstants; import org.jembi.jempi.shared.models.InteractionEnvelop; import org.jembi.jempi.shared.serdes.JsonPojoDeserializer; import org.jembi.jempi.shared.serdes.JsonPojoSerializer; @@ -52,7 +53,6 @@ private void linkPatient( if (interactionEnvelop.contentType() == InteractionEnvelop.ContentType.BATCH_START_SENTINEL || interactionEnvelop.contentType() == BATCH_END_SENTINEL) { - forwardtoMatching final var completableFuture = Ask.runStartEndHooks(system, backEnd, key, interactionEnvelop).toCompletableFuture(); try { List hookErrors = completableFuture.get(65, TimeUnit.SECONDS).hooksResults(); @@ -110,7 +110,7 @@ private StreamsBuilder getMatchingStream(final ActorSystem system, final A new JsonPojoDeserializer<>(InteractionEnvelop.class)); final StreamsBuilder streamsBuilder = new StreamsBuilder(); final KStream matchStream = - streamsBuilder.stream(topic, Consumed.with(stringSerde, interactionEnvelopSerde)); + streamsBuilder.stream(GlobalConstants.TOPIC_NOTIFICATIONS, Consumed.with(stringSerde, interactionEnvelopSerde)); matchStream.foreach((key, matchEnvelop) -> { matchPatient(system, backEnd, key, matchEnvelop); if (matchEnvelop.contentType() == BATCH_END_SENTINEL) { diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java index 30d519e7b..8a2c38091 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.stream.Collectors; @@ -37,6 +38,8 @@ public final class BackEnd extends AbstractBehavior { private static final Logger LOGGER = LogManager.getLogger(BackEnd.class); private static final String SINGLE_TIMER_TIMEOUT_KEY = "SingleTimerTimeOutKey"; static MyKafkaProducer topicNotifications; + + static MyKafkaProducer matchingInteractions; private final Executor ec; private LibMPI libMPI = null; @@ -52,6 +55,12 @@ private BackEnd(final ActorContext context) { new StringSerializer(), new JsonPojoSerializer<>(), AppConfig.KAFKA_CLIENT_ID_NOTIFICATIONS); + + matchingInteractions = new MyKafkaProducer<>(AppConfig.KAFKA_BOOTSTRAP_SERVERS, + GlobalConstants.TOPIC_INTERACTION_LINKER_MATCHING, + new StringSerializer(), + new JsonPojoSerializer<>(), + AppConfig.KAFKA_CLIENT_ID_NOTIFICATIONS); } private BackEnd( @@ -144,9 +153,11 @@ private Behavior crUpdateField(final CrUpdateFieldRequest req) { return Behaviors.same(); } - private Behavior runStartStopHooks(final RunStartStopHooksRequest req) { + private Behavior runStartStopHooks(final RunStartStopHooksRequest req) throws ExecutionException, InterruptedException { List hookRunErrors = List.of(); + matchingInteractions.produceSync(req.key, req.batchInteraction); + if (req.batchInteraction.contentType() == BATCH_START_SENTINEL) { hookRunErrors = libMPI.beforeLinkingHook(); } else if (req.batchInteraction.contentType() == BATCH_END_SENTINEL) { @@ -172,7 +183,8 @@ private Behavior syncLinkInteractionHandler(final SyncLinkInteractionRe request.link.matchThreshold() == null ? AppConfig.LINKER_MATCH_THRESHOLD : request.link.matchThreshold(), - request.link.stan()); + request.link.stan(), + null); request.replyTo.tell(new SyncLinkInteractionResponse(request.link.stan(), listLinkInfo.isLeft() ? listLinkInfo.getLeft() @@ -183,7 +195,7 @@ private Behavior syncLinkInteractionHandler(final SyncLinkInteractionRe return Behaviors.same(); } - private Behavior asyncLinkInteractionHandler(final AsyncLinkInteractionRequest req) { + private Behavior asyncLinkInteractionHandler(final AsyncLinkInteractionRequest req) throws ExecutionException, InterruptedException { if (req.batchInteraction.contentType() != InteractionEnvelop.ContentType.BATCH_INTERACTION) { return Behaviors.withTimers(timers -> { timers.startSingleTimer(SINGLE_TIMER_TIMEOUT_KEY, TeaTimeRequest.INSTANCE, Duration.ofSeconds(5)); @@ -191,12 +203,22 @@ private Behavior asyncLinkInteractionHandler(final AsyncLinkInteraction return Behaviors.same(); }); } + final var linkInfo = LinkerDWH.linkInteraction(libMPI, req.batchInteraction.interaction(), null, AppConfig.LINKER_MATCH_THRESHOLD, - req.batchInteraction.stan()); + req.batchInteraction.stan(), + (final Interaction interaction) -> { + try { + BackEnd.matchingInteractions.produceSync(req.key, req.batchInteraction); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); if (linkInfo.isLeft()) { req.replyTo.tell(new AsyncLinkInteractionResponse(linkInfo.getLeft())); } else { diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java index e5e68642d..a0733f60c 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java @@ -17,6 +17,7 @@ import java.util.*; import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -170,7 +171,8 @@ public static Either> linkInteraction( final Interaction interaction, final ExternalLinkRange externalLinkRange, final float matchThreshold_, - final String envelopStan) { + final String envelopStan, + final Consumer forwarder) { // if (LOGGER.isTraceEnabled()) { // LOGGER.trace("{}", envelopStan); @@ -188,7 +190,12 @@ public static Either> linkInteraction( } if (!CustomLinkerDeterministic.canApplyLinking(interaction.demographicData())) { - sendKafkaTopic(); + if (forwarder != null) { + forwarder.accept(interaction); + return Either.right(List.of()); + } else { + return matchInteraction(libMPI, interaction, externalLinkRange, matchThreshold_, envelopStan); + } } else { LinkInfo linkInfo = null; final List externalLinkCandidateList = new ArrayList<>(); From 48e825628d6759e48bdbcee499e4177503a825da Mon Sep 17 00:00:00 2001 From: ChidoW Date: Thu, 14 Mar 2024 02:17:35 +0200 Subject: [PATCH 3/5] Small updates to the inteactions --- .../jembi/jempi/linker/SPInteractions.java | 48 +++++++++++-------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java index 55e368c42..7d2daeb7c 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java @@ -20,6 +20,7 @@ import org.jembi.jempi.shared.serdes.JsonPojoSerializer; import org.jetbrains.annotations.NotNull; +import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -45,24 +46,30 @@ public static SPInteractions create(final String topic_) { return new SPInteractions(topic_); } + private void linkPatientProcess(final ActorSystem system, final ActorRef backEnd, final String key, final InteractionEnvelop interactionEnvelop) { + final var completableFuture = Ask.runStartEndHooks(system, backEnd, key, interactionEnvelop).toCompletableFuture(); + try { + List hookErrors = completableFuture.get(65, TimeUnit.SECONDS).hooksResults(); + if (!hookErrors.isEmpty()) { + LOGGER.error(hookErrors); + } + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + LOGGER.error(ex.getLocalizedMessage(), ex); + this.closingLinkingStream(); + } + } private void linkPatient( final ActorSystem system, final ActorRef backEnd, final String key, final InteractionEnvelop interactionEnvelop) { - if (interactionEnvelop.contentType() == InteractionEnvelop.ContentType.BATCH_START_SENTINEL - || interactionEnvelop.contentType() == BATCH_END_SENTINEL) { - final var completableFuture = Ask.runStartEndHooks(system, backEnd, key, interactionEnvelop).toCompletableFuture(); - try { - List hookErrors = completableFuture.get(65, TimeUnit.SECONDS).hooksResults(); - if (!hookErrors.isEmpty()) { - LOGGER.error(hookErrors); - } - } catch (InterruptedException | ExecutionException | TimeoutException ex) { - LOGGER.error(ex.getLocalizedMessage(), ex); - this.closingLinkingStream(); - } + if (interactionEnvelop.contentType() == InteractionEnvelop.ContentType.BATCH_START_SENTINEL) { + LOGGER.info(String.format("SPInteractions Stream Processor -> Starting linking for tag '%s'", interactionEnvelop.tag())); + linkPatientProcess(system, backEnd, key, interactionEnvelop); + } else if (interactionEnvelop.contentType() == BATCH_END_SENTINEL) { + LOGGER.info(String.format("SPInteractions Stream Processor -> Ended linking for tag '%s'", interactionEnvelop.tag())); + linkPatientProcess(system, backEnd, key, interactionEnvelop); } if (interactionEnvelop.contentType() != BATCH_INTERACTION) { @@ -110,10 +117,11 @@ private StreamsBuilder getMatchingStream(final ActorSystem system, final A new JsonPojoDeserializer<>(InteractionEnvelop.class)); final StreamsBuilder streamsBuilder = new StreamsBuilder(); final KStream matchStream = - streamsBuilder.stream(GlobalConstants.TOPIC_NOTIFICATIONS, Consumed.with(stringSerde, interactionEnvelopSerde)); + streamsBuilder.stream(GlobalConstants.TOPIC_INTERACTION_LINKER_MATCHING, Consumed.with(stringSerde, interactionEnvelopSerde)); matchStream.foreach((key, matchEnvelop) -> { matchPatient(system, backEnd, key, matchEnvelop); if (matchEnvelop.contentType() == BATCH_END_SENTINEL) { + LOGGER.info(String.format("SPInteractions Stream Processor -> Ended matching for tag '%s'", matchEnvelop.tag())); this.closingMatchingStream(); } }); @@ -130,9 +138,8 @@ private StreamsBuilder getLinkingStream(final ActorSystem system, final Ac interactionStream.foreach((key, interactionEnvelop) -> { linkPatient(system, backEnd, key, interactionEnvelop); if (!CustomMU.SEND_INTERACTIONS_TO_EM && interactionEnvelop.contentType() == BATCH_END_SENTINEL) { - LOGGER.info("SPInteractions Stream Processor -> Starting matching processor"); + LOGGER.info(String.format("SPInteractions Stream Processor -> Starting matching for tag '%s'", interactionEnvelop.tag())); matchingStream.start(); - this.closingLinkingStream(); } }); return streamsBuilder; @@ -142,9 +149,8 @@ public void open( final ActorSystem system, final ActorRef backEnd) { LOGGER.info("SPInteractions Stream Processor"); - final Properties props = loadConfig(); - matchingEnvelopeKafkaStream = new KafkaStreams(getMatchingStream(system, backEnd).build(), props); - interactionEnvelopKafkaStreams = new KafkaStreams(getLinkingStream(system, backEnd, matchingEnvelopeKafkaStream).build(), props); + matchingEnvelopeKafkaStream = new KafkaStreams(getMatchingStream(system, backEnd).build(), loadConfig(topic)); + interactionEnvelopKafkaStreams = new KafkaStreams(getLinkingStream(system, backEnd, matchingEnvelopeKafkaStream).build(), loadConfig(GlobalConstants.TOPIC_INTERACTION_LINKER_MATCHING)); interactionEnvelopKafkaStreams.cleanUp(); LOGGER.info("SPInteractions Stream Processor -> Starting linking processor"); interactionEnvelopKafkaStreams.start(); @@ -158,13 +164,13 @@ private void closingLinkingStream() { private void closingMatchingStream() { LOGGER.info("SPInteractions Stream Processor -> Closing matching processor"); - interactionEnvelopKafkaStreams.close(new KafkaStreams.CloseOptions().leaveGroup(true)); + matchingEnvelopeKafkaStream.close(new KafkaStreams.CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(2))); } - private Properties loadConfig() { + private Properties loadConfig(final String inTopic) { final Properties props = new Properties(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.KAFKA_BOOTSTRAP_SERVERS); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfig.KAFKA_APPLICATION_ID_INTERACTIONS + topic); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfig.KAFKA_APPLICATION_ID_INTERACTIONS + inTopic); return props; } From 555c8bda58e1829f975d31dca7a6d6c3fc31ba50 Mon Sep 17 00:00:00 2001 From: ChidoW Date: Thu, 14 Mar 2024 03:04:08 +0200 Subject: [PATCH 4/5] Adding the bootstarpper config --- .../main/resources/data/kafka/kafkaBootStrapConfig.json | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/JeMPI_Apps/JeMPI_Bootstrapper/src/main/resources/data/kafka/kafkaBootStrapConfig.json b/JeMPI_Apps/JeMPI_Bootstrapper/src/main/resources/data/kafka/kafkaBootStrapConfig.json index b89f997b8..a742662e1 100644 --- a/JeMPI_Apps/JeMPI_Bootstrapper/src/main/resources/data/kafka/kafkaBootStrapConfig.json +++ b/JeMPI_Apps/JeMPI_Bootstrapper/src/main/resources/data/kafka/kafkaBootStrapConfig.json @@ -35,6 +35,13 @@ "retention_ms": 86400000, "segments_bytes": 4194304 }, + "JeMPI-interaction-linker-matching": { + "topicName": "JeMPI-interaction-linker-matching", + "partition": 1, + "replications": 1, + "retention_ms": 86400000, + "segments_bytes": 4194304 + }, "JeMPI-mu-controller": { "topicName": "JeMPI-mu-controller", "partition": 1, From ceb06038006b381429d838a6f769b3fbe6fd9f15 Mon Sep 17 00:00:00 2001 From: ChidoW Date: Thu, 14 Mar 2024 03:27:14 +0200 Subject: [PATCH 5/5] Getting if matched correctly --- .../main/java/org/jembi/jempi/linker/SPInteractions.java | 3 ++- .../java/org/jembi/jempi/linker/backend/BackEnd.java | 9 ++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java index 7d2daeb7c..d1445ad22 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java @@ -97,10 +97,11 @@ private void matchPatient( if (interactionEnvelop.contentType() != BATCH_INTERACTION) { return; } + final var completableFuture = Ask.matchInteraction(system, backEnd, key, interactionEnvelop).toCompletableFuture(); try { final var reply = completableFuture.get(65, TimeUnit.SECONDS); - if (reply.linkInfo() == null) { + if (!reply.matched()) { LOGGER.error("BACK END RESPONSE(ERROR)"); } } catch (InterruptedException | ExecutionException | TimeoutException ex) { diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java index 8a2c38091..afff03600 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java @@ -238,17 +238,16 @@ private Behavior asyncMatchInteractionHandler(final AsyncMatchInteracti return Behaviors.same(); }); } - // to-do: Consider if LinkInfo is nevessary final var linkInfo = LinkerDWH.matchInteraction(libMPI, req.batchInteraction.interaction(), null, AppConfig.LINKER_MATCH_THRESHOLD, req.batchInteraction.stan()); - if (linkInfo.isLeft()) { - req.replyTo.tell(new AsyncMatchInteractionResponse(linkInfo.getLeft())); + if (linkInfo.isRight()) { + req.replyTo.tell(new AsyncMatchInteractionResponse(true)); } else { - req.replyTo.tell(new AsyncMatchInteractionResponse(null)); + req.replyTo.tell(new AsyncMatchInteractionResponse(false)); } return Behaviors.withTimers(timers -> { timers.startSingleTimer(SINGLE_TIMER_TIMEOUT_KEY, TeaTimeRequest.INSTANCE, Duration.ofSeconds(10)); @@ -389,7 +388,7 @@ public record AsyncMatchInteractionRequest( InteractionEnvelop batchInteraction) implements Request { } - public record AsyncMatchInteractionResponse(LinkInfo linkInfo) implements Response { + public record AsyncMatchInteractionResponse(Boolean matched) implements Response { } public record RunStartStopHooksRequest(