diff --git a/JeMPI_Apps/JeMPI_Bootstrapper/src/main/resources/data/kafka/kafkaBootStrapConfig.json b/JeMPI_Apps/JeMPI_Bootstrapper/src/main/resources/data/kafka/kafkaBootStrapConfig.json index b89f997b8..a742662e1 100644 --- a/JeMPI_Apps/JeMPI_Bootstrapper/src/main/resources/data/kafka/kafkaBootStrapConfig.json +++ b/JeMPI_Apps/JeMPI_Bootstrapper/src/main/resources/data/kafka/kafkaBootStrapConfig.json @@ -35,6 +35,13 @@ "retention_ms": 86400000, "segments_bytes": 4194304 }, + "JeMPI-interaction-linker-matching": { + "topicName": "JeMPI-interaction-linker-matching", + "partition": 1, + "replications": 1, + "retention_ms": 86400000, + "segments_bytes": 4194304 + }, "JeMPI-mu-controller": { "topicName": "JeMPI-mu-controller", "partition": 1, diff --git a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java index fae967877..cf2e9afa3 100644 --- a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java +++ b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java @@ -10,6 +10,7 @@ public final class GlobalConstants { public static final String TOPIC_INTERACTION_PROCESSOR_CONTROLLER = "JeMPI-interaction-processor-controller"; public static final String TOPIC_INTERACTION_EM = "JeMPI-interaction-em"; public static final String TOPIC_INTERACTION_LINKER = "JeMPI-interaction-linker"; + public static final String TOPIC_INTERACTION_LINKER_MATCHING = "JeMPI-interaction-linker-matching"; public static final String TOPIC_MU_CONTROLLER = "JeMPI-mu-controller"; public static final String TOPIC_MU_LINKER = "JeMPI-mu-linker"; public static final String TOPIC_AUDIT_TRAIL = "JeMPI-audit-trail"; diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java index e88dd54d0..79663ef60 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java @@ -111,6 +111,17 @@ static CompletionStage linkInteraction( actorSystem.scheduler()); } + static CompletionStage matchInteraction( + final ActorSystem actorSystem, + final ActorRef backEnd, + final String key, + final InteractionEnvelop batchInteraction) { + return AskPattern.ask(backEnd, + replyTo -> new BackEnd.AsyncMatchInteractionRequest(replyTo, key, batchInteraction), + java.time.Duration.ofSeconds(60), + actorSystem.scheduler()); + } + static CompletionStage runStartEndHooks( final ActorSystem actorSystem, final ActorRef backEnd, diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java index 899feaeda..d1445ad22 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/SPInteractions.java @@ -14,10 +14,13 @@ import org.jembi.jempi.libmpi.MpiGeneralError; import org.jembi.jempi.linker.backend.BackEnd; import org.jembi.jempi.shared.models.CustomMU; +import org.jembi.jempi.shared.models.GlobalConstants; import org.jembi.jempi.shared.models.InteractionEnvelop; import org.jembi.jempi.shared.serdes.JsonPojoDeserializer; import org.jembi.jempi.shared.serdes.JsonPojoSerializer; +import org.jetbrains.annotations.NotNull; +import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -32,6 +35,7 @@ public final class SPInteractions { private static final Logger LOGGER = LogManager.getLogger(SPInteractions.class); private final String topic; private KafkaStreams interactionEnvelopKafkaStreams; + private KafkaStreams matchingEnvelopeKafkaStream; private SPInteractions(final String topic_) { LOGGER.info("SPInteractions constructor"); @@ -42,28 +46,33 @@ public static SPInteractions create(final String topic_) { return new SPInteractions(topic_); } + private void linkPatientProcess(final ActorSystem system, final ActorRef backEnd, final String key, final InteractionEnvelop interactionEnvelop) { + final var completableFuture = Ask.runStartEndHooks(system, backEnd, key, interactionEnvelop).toCompletableFuture(); + try { + List hookErrors = completableFuture.get(65, TimeUnit.SECONDS).hooksResults(); + if (!hookErrors.isEmpty()) { + LOGGER.error(hookErrors); + } + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + LOGGER.error(ex.getLocalizedMessage(), ex); + this.closingLinkingStream(); + } + } private void linkPatient( final ActorSystem system, final ActorRef backEnd, final String key, final InteractionEnvelop interactionEnvelop) { - if (interactionEnvelop.contentType() == InteractionEnvelop.ContentType.BATCH_START_SENTINEL - || interactionEnvelop.contentType() == BATCH_END_SENTINEL) { - final var completableFuture = Ask.runStartEndHooks(system, backEnd, key, interactionEnvelop).toCompletableFuture(); - try { - List hookErrors = completableFuture.get(65, TimeUnit.SECONDS).hooksResults(); - if (!hookErrors.isEmpty()) { - LOGGER.error(hookErrors); - } - } catch (InterruptedException | ExecutionException | TimeoutException ex) { - LOGGER.error(ex.getLocalizedMessage(), ex); - this.close(); - } + if (interactionEnvelop.contentType() == InteractionEnvelop.ContentType.BATCH_START_SENTINEL) { + LOGGER.info(String.format("SPInteractions Stream Processor -> Starting linking for tag '%s'", interactionEnvelop.tag())); + linkPatientProcess(system, backEnd, key, interactionEnvelop); + } else if (interactionEnvelop.contentType() == BATCH_END_SENTINEL) { + LOGGER.info(String.format("SPInteractions Stream Processor -> Ended linking for tag '%s'", interactionEnvelop.tag())); + linkPatientProcess(system, backEnd, key, interactionEnvelop); } if (interactionEnvelop.contentType() != BATCH_INTERACTION) { - return; } final var completableFuture = Ask.linkInteraction(system, backEnd, key, interactionEnvelop).toCompletableFuture(); @@ -74,43 +83,95 @@ private void linkPatient( } } catch (InterruptedException | ExecutionException | TimeoutException ex) { LOGGER.error(ex.getLocalizedMessage(), ex); - this.close(); + this.closingLinkingStream(); } } - public void open( - final ActorSystem system, - final ActorRef backEnd) { - LOGGER.info("SPInteractions Stream Processor"); - final Properties props = loadConfig(); + private void matchPatient( + final ActorSystem system, + final ActorRef backEnd, + final String key, + final InteractionEnvelop interactionEnvelop) { + + if (interactionEnvelop.contentType() != BATCH_INTERACTION) { + return; + } + + final var completableFuture = Ask.matchInteraction(system, backEnd, key, interactionEnvelop).toCompletableFuture(); + try { + final var reply = completableFuture.get(65, TimeUnit.SECONDS); + if (!reply.matched()) { + LOGGER.error("BACK END RESPONSE(ERROR)"); + } + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + LOGGER.error(ex.getLocalizedMessage(), ex); + this.closingMatchingStream(); + } + + } + + @NotNull + private StreamsBuilder getMatchingStream(final ActorSystem system, final ActorRef backEnd) { + final var stringSerde = Serdes.String(); + final var interactionEnvelopSerde = Serdes.serdeFrom(new JsonPojoSerializer<>(), + new JsonPojoDeserializer<>(InteractionEnvelop.class)); + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + final KStream matchStream = + streamsBuilder.stream(GlobalConstants.TOPIC_INTERACTION_LINKER_MATCHING, Consumed.with(stringSerde, interactionEnvelopSerde)); + matchStream.foreach((key, matchEnvelop) -> { + matchPatient(system, backEnd, key, matchEnvelop); + if (matchEnvelop.contentType() == BATCH_END_SENTINEL) { + LOGGER.info(String.format("SPInteractions Stream Processor -> Ended matching for tag '%s'", matchEnvelop.tag())); + this.closingMatchingStream(); + } + }); + return streamsBuilder; + } + @NotNull + private StreamsBuilder getLinkingStream(final ActorSystem system, final ActorRef backEnd, final KafkaStreams matchingStream) { final var stringSerde = Serdes.String(); final var interactionEnvelopSerde = Serdes.serdeFrom(new JsonPojoSerializer<>(), - new JsonPojoDeserializer<>(InteractionEnvelop.class)); + new JsonPojoDeserializer<>(InteractionEnvelop.class)); final StreamsBuilder streamsBuilder = new StreamsBuilder(); final KStream interactionStream = - streamsBuilder.stream(topic, Consumed.with(stringSerde, interactionEnvelopSerde)); + streamsBuilder.stream(topic, Consumed.with(stringSerde, interactionEnvelopSerde)); interactionStream.foreach((key, interactionEnvelop) -> { linkPatient(system, backEnd, key, interactionEnvelop); if (!CustomMU.SEND_INTERACTIONS_TO_EM && interactionEnvelop.contentType() == BATCH_END_SENTINEL) { - this.close(); + LOGGER.info(String.format("SPInteractions Stream Processor -> Starting matching for tag '%s'", interactionEnvelop.tag())); + matchingStream.start(); } }); - interactionEnvelopKafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + return streamsBuilder; + } + + public void open( + final ActorSystem system, + final ActorRef backEnd) { + LOGGER.info("SPInteractions Stream Processor"); + matchingEnvelopeKafkaStream = new KafkaStreams(getMatchingStream(system, backEnd).build(), loadConfig(topic)); + interactionEnvelopKafkaStreams = new KafkaStreams(getLinkingStream(system, backEnd, matchingEnvelopeKafkaStream).build(), loadConfig(GlobalConstants.TOPIC_INTERACTION_LINKER_MATCHING)); interactionEnvelopKafkaStreams.cleanUp(); + LOGGER.info("SPInteractions Stream Processor -> Starting linking processor"); interactionEnvelopKafkaStreams.start(); LOGGER.info("KafkaStreams started"); } - private void close() { - LOGGER.info("Stream closed"); + private void closingLinkingStream() { + LOGGER.info("SPInteractions Stream Processor -> Closing linking processor"); interactionEnvelopKafkaStreams.close(new KafkaStreams.CloseOptions().leaveGroup(true)); } - private Properties loadConfig() { + private void closingMatchingStream() { + LOGGER.info("SPInteractions Stream Processor -> Closing matching processor"); + matchingEnvelopeKafkaStream.close(new KafkaStreams.CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(2))); + } + + private Properties loadConfig(final String inTopic) { final Properties props = new Properties(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.KAFKA_BOOTSTRAP_SERVERS); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfig.KAFKA_APPLICATION_ID_INTERACTIONS + topic); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfig.KAFKA_APPLICATION_ID_INTERACTIONS + inTopic); return props; } diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java index c530891b3..afff03600 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.stream.Collectors; @@ -37,6 +38,8 @@ public final class BackEnd extends AbstractBehavior { private static final Logger LOGGER = LogManager.getLogger(BackEnd.class); private static final String SINGLE_TIMER_TIMEOUT_KEY = "SingleTimerTimeOutKey"; static MyKafkaProducer topicNotifications; + + static MyKafkaProducer matchingInteractions; private final Executor ec; private LibMPI libMPI = null; @@ -52,6 +55,12 @@ private BackEnd(final ActorContext context) { new StringSerializer(), new JsonPojoSerializer<>(), AppConfig.KAFKA_CLIENT_ID_NOTIFICATIONS); + + matchingInteractions = new MyKafkaProducer<>(AppConfig.KAFKA_BOOTSTRAP_SERVERS, + GlobalConstants.TOPIC_INTERACTION_LINKER_MATCHING, + new StringSerializer(), + new JsonPojoSerializer<>(), + AppConfig.KAFKA_CLIENT_ID_NOTIFICATIONS); } private BackEnd( @@ -106,6 +115,7 @@ private void openMPI(final boolean useDGraph) { @Override public Receive createReceive() { return newReceiveBuilder().onMessage(AsyncLinkInteractionRequest.class, this::asyncLinkInteractionHandler) + .onMessage(AsyncMatchInteractionRequest.class, this::asyncMatchInteractionHandler) .onMessage(SyncLinkInteractionRequest.class, this::syncLinkInteractionHandler) // .onMessage(SyncLinkInteractionToGidRequest.class, this::syncLinkInteractionToGidHandler) .onMessage(CalculateScoresRequest.class, this::calculateScoresHandler) @@ -143,9 +153,11 @@ private Behavior crUpdateField(final CrUpdateFieldRequest req) { return Behaviors.same(); } - private Behavior runStartStopHooks(final RunStartStopHooksRequest req) { + private Behavior runStartStopHooks(final RunStartStopHooksRequest req) throws ExecutionException, InterruptedException { List hookRunErrors = List.of(); + matchingInteractions.produceSync(req.key, req.batchInteraction); + if (req.batchInteraction.contentType() == BATCH_START_SENTINEL) { hookRunErrors = libMPI.beforeLinkingHook(); } else if (req.batchInteraction.contentType() == BATCH_END_SENTINEL) { @@ -171,7 +183,8 @@ private Behavior syncLinkInteractionHandler(final SyncLinkInteractionRe request.link.matchThreshold() == null ? AppConfig.LINKER_MATCH_THRESHOLD : request.link.matchThreshold(), - request.link.stan()); + request.link.stan(), + null); request.replyTo.tell(new SyncLinkInteractionResponse(request.link.stan(), listLinkInfo.isLeft() ? listLinkInfo.getLeft() @@ -182,7 +195,7 @@ private Behavior syncLinkInteractionHandler(final SyncLinkInteractionRe return Behaviors.same(); } - private Behavior asyncLinkInteractionHandler(final AsyncLinkInteractionRequest req) { + private Behavior asyncLinkInteractionHandler(final AsyncLinkInteractionRequest req) throws ExecutionException, InterruptedException { if (req.batchInteraction.contentType() != InteractionEnvelop.ContentType.BATCH_INTERACTION) { return Behaviors.withTimers(timers -> { timers.startSingleTimer(SINGLE_TIMER_TIMEOUT_KEY, TeaTimeRequest.INSTANCE, Duration.ofSeconds(5)); @@ -190,12 +203,22 @@ private Behavior asyncLinkInteractionHandler(final AsyncLinkInteraction return Behaviors.same(); }); } + final var linkInfo = LinkerDWH.linkInteraction(libMPI, req.batchInteraction.interaction(), null, AppConfig.LINKER_MATCH_THRESHOLD, - req.batchInteraction.stan()); + req.batchInteraction.stan(), + (final Interaction interaction) -> { + try { + BackEnd.matchingInteractions.produceSync(req.key, req.batchInteraction); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); if (linkInfo.isLeft()) { req.replyTo.tell(new AsyncLinkInteractionResponse(linkInfo.getLeft())); } else { @@ -207,6 +230,31 @@ private Behavior asyncLinkInteractionHandler(final AsyncLinkInteraction }); } + private Behavior asyncMatchInteractionHandler(final AsyncMatchInteractionRequest req) { + if (req.batchInteraction.contentType() != InteractionEnvelop.ContentType.BATCH_INTERACTION) { + return Behaviors.withTimers(timers -> { + timers.startSingleTimer(SINGLE_TIMER_TIMEOUT_KEY, TeaTimeRequest.INSTANCE, Duration.ofSeconds(5)); + req.replyTo.tell(new AsyncMatchInteractionResponse(null)); + return Behaviors.same(); + }); + } + final var linkInfo = + LinkerDWH.matchInteraction(libMPI, + req.batchInteraction.interaction(), + null, + AppConfig.LINKER_MATCH_THRESHOLD, + req.batchInteraction.stan()); + if (linkInfo.isRight()) { + req.replyTo.tell(new AsyncMatchInteractionResponse(true)); + } else { + req.replyTo.tell(new AsyncMatchInteractionResponse(false)); + } + return Behaviors.withTimers(timers -> { + timers.startSingleTimer(SINGLE_TIMER_TIMEOUT_KEY, TeaTimeRequest.INSTANCE, Duration.ofSeconds(10)); + return Behaviors.same(); + }); + } + /* private Behavior syncLinkInteractionToGidHandler(final SyncLinkInteractionToGidRequest request) { final LinkInfo linkInfo; @@ -334,6 +382,15 @@ public record AsyncLinkInteractionRequest( public record AsyncLinkInteractionResponse(LinkInfo linkInfo) implements Response { } + public record AsyncMatchInteractionRequest( + ActorRef replyTo, + String key, + InteractionEnvelop batchInteraction) implements Request { + } + + public record AsyncMatchInteractionResponse(Boolean matched) implements Response { + } + public record RunStartStopHooksRequest( ActorRef replyTo, String key, diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java index 8c9be2157..a0733f60c 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java @@ -17,6 +17,7 @@ import java.util.*; import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -118,13 +119,60 @@ static void helperUpdateInteractionsScore( }); } + public static Either> matchInteraction( + final LibMPI libMPI, + final Interaction interaction, + final ExternalLinkRange externalLinkRange, + final float matchThreshold_, + final String envelopStan) { + libMPI.startTransaction(); + if (CustomLinkerDeterministic.DETERMINISTIC_DO_MATCHING || CustomLinkerProbabilistic.PROBABILISTIC_DO_MATCHING) { + final var candidates = libMPI.findMatchCandidates(interaction.demographicData()); + LOGGER.debug("Match Candidates {} ", candidates.size()); + if (candidates.isEmpty()) { + try { + final var i = OBJECT_MAPPER.writeValueAsString(interaction.demographicData()); + final var f = """ + MATCH NOTIFICATION NO CANDIDATE + {}"""; + LOGGER.info(f, i); + } catch (JsonProcessingException e) { + LOGGER.error(e.getLocalizedMessage(), e); + } + } else { + final var workCandidate = candidates.parallelStream() + .unordered() + .map(candidate -> new WorkCandidate(candidate, + LinkerUtils.calcNormalizedScore(candidate.demographicData(), + interaction.demographicData()))) + .sorted((o1, o2) -> Float.compare(o2.score(), o1.score())) + .collect(Collectors.toCollection(ArrayList::new)) + .getFirst(); + try { + final var i = OBJECT_MAPPER.writeValueAsString(interaction.demographicData()); + final var g = OBJECT_MAPPER.writeValueAsString(workCandidate.goldenRecord().demographicData()); + final var f = """ + MATCH NOTIFICATION + {} + {}"""; + LOGGER.info(f, i, g); + } catch (JsonProcessingException e) { + LOGGER.error(e.getLocalizedMessage(), e); + } + } + } + libMPI.closeTransaction(); + return Either.right(List.of()); + } + // + public static Either> linkInteraction( final LibMPI libMPI, final Interaction interaction, final ExternalLinkRange externalLinkRange, final float matchThreshold_, - final String envelopStan) { + final String envelopStan, + final Consumer forwarder) { // if (LOGGER.isTraceEnabled()) { // LOGGER.trace("{}", envelopStan); @@ -142,44 +190,12 @@ public static Either> linkInteraction( } if (!CustomLinkerDeterministic.canApplyLinking(interaction.demographicData())) { - libMPI.startTransaction(); - if (CustomLinkerDeterministic.DETERMINISTIC_DO_MATCHING || CustomLinkerProbabilistic.PROBABILISTIC_DO_MATCHING) { - final var candidates = libMPI.findMatchCandidates(interaction.demographicData()); - LOGGER.debug("Match Candidates {} ", candidates.size()); - if (candidates.isEmpty()) { - try { - final var i = OBJECT_MAPPER.writeValueAsString(interaction.demographicData()); - final var f = """ - MATCH NOTIFICATION NO CANDIDATE - {}"""; - LOGGER.info(f, i); - } catch (JsonProcessingException e) { - LOGGER.error(e.getLocalizedMessage(), e); - } - } else { - final var workCandidate = candidates.parallelStream() - .unordered() - .map(candidate -> new WorkCandidate(candidate, - LinkerUtils.calcNormalizedScore(candidate.demographicData(), - interaction.demographicData()))) - .sorted((o1, o2) -> Float.compare(o2.score(), o1.score())) - .collect(Collectors.toCollection(ArrayList::new)) - .getFirst(); - try { - final var i = OBJECT_MAPPER.writeValueAsString(interaction.demographicData()); - final var g = OBJECT_MAPPER.writeValueAsString(workCandidate.goldenRecord().demographicData()); - final var f = """ - MATCH NOTIFICATION - {} - {}"""; - LOGGER.info(f, i, g); - } catch (JsonProcessingException e) { - LOGGER.error(e.getLocalizedMessage(), e); - } - } + if (forwarder != null) { + forwarder.accept(interaction); + return Either.right(List.of()); + } else { + return matchInteraction(libMPI, interaction, externalLinkRange, matchThreshold_, envelopStan); } - libMPI.closeTransaction(); - return Either.right(List.of()); } else { LinkInfo linkInfo = null; final List externalLinkCandidateList = new ArrayList<>();