Skip to content

Commit 46567f3

Browse files
committed
partial clean-link-match-streams && add cr-link-update
1 parent 40a72c5 commit 46567f3

33 files changed

+408
-1405
lines changed

JeMPI_Apps/JeMPI_LibAPI/src/main/java/org/jembi/jempi/libapi/BackEnd.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.jembi.jempi.shared.models.*;
1616
import org.jembi.jempi.shared.models.dashboard.NotificationStats;
1717
import org.jembi.jempi.shared.models.dashboard.SQLDashboardData;
18-
import org.jembi.jempi.shared.utils.AppUtils;
1918

2019
import java.io.File;
2120
import java.io.IOException;
@@ -25,7 +24,10 @@
2524
import java.sql.SQLException;
2625
import java.sql.Timestamp;
2726
import java.time.LocalDateTime;
28-
import java.util.*;
27+
import java.util.ArrayList;
28+
import java.util.Collections;
29+
import java.util.HashMap;
30+
import java.util.List;
2931

3032
public final class BackEnd extends AbstractBehavior<BackEnd.Event> {
3133

@@ -107,15 +109,7 @@ private void openMPI(
107109
final String kafkaBootstrapServers,
108110
final String kafkaClientId,
109111
final Level debugLevel) {
110-
if (!AppUtils.isNullOrEmpty(Arrays.stream(dgraphHosts).toList())) {
111-
libMPI = new LibMPI(debugLevel, dgraphHosts, dgraphPorts, kafkaBootstrapServers, kafkaClientId);
112-
} else {
113-
libMPI = new LibMPI(String.format(Locale.ROOT, "jdbc:postgresql://%s:%d/%s", pgIP, pgPort, pgAuditDb),
114-
pgUser,
115-
pgPassword,
116-
kafkaBootstrapServers,
117-
kafkaClientId);
118-
}
112+
libMPI = new LibMPI(debugLevel, dgraphHosts, dgraphPorts, kafkaBootstrapServers, kafkaClientId);
119113
}
120114

121115
@Override
@@ -241,8 +235,11 @@ private Behavior<Event> postFilterGidsWithInteractionCountHandler(final PostFilt
241235
}
242236

243237
private Behavior<Event> getNotificationsHandler(final GetNotificationsRequest request) {
244-
MatchesForReviewResult result = psqlNotifications.getMatchesForReview(request.limit(), request.offset(),
245-
request.startDate(), request.endDate(), request.states());
238+
MatchesForReviewResult result = psqlNotifications.getMatchesForReview(request.limit(),
239+
request.offset(),
240+
request.startDate(),
241+
request.endDate(),
242+
request.states());
246243
request.replyTo.tell(new GetNotificationsResponse(result.getCount(),
247244
result.getSkippedRecords(),
248245
result.getNotifications()));
@@ -300,10 +297,11 @@ private Behavior<Event> getExpandedGoldenRecordHandler(final GetExpandedGoldenRe
300297
libMPI.startTransaction();
301298
expandedGoldenRecord = libMPI.findExpandedGoldenRecord(request.goldenId);
302299
libMPI.closeTransaction();
303-
} catch (Exception exception) {
300+
} catch (Exception e) {
301+
LOGGER.error(e.getLocalizedMessage(), e);
304302
LOGGER.error("libMPI.findExpandedGoldenRecord failed for goldenId: {} with error: {}",
305303
request.goldenId,
306-
exception.getMessage());
304+
e.getMessage());
307305
}
308306

309307
if (expandedGoldenRecord == null) {
@@ -447,7 +445,10 @@ private Behavior<Event> postUploadCsvFileHandler(final PostUploadCsvFileRequest
447445
File file = request.file();
448446
try {
449447
String userCSVPath = System.getenv("UPLOAD_CSV_PATH");
450-
Files.copy(file.toPath(), Paths.get((userCSVPath != null ? userCSVPath : "/app/csv") + "/" + file.getName()));
448+
Files.copy(file.toPath(),
449+
Paths.get((userCSVPath != null
450+
? userCSVPath
451+
: "/app/csv") + "/" + file.getName()));
451452
Files.delete(file.toPath());
452453
} catch (NoSuchFileException e) {
453454
LOGGER.error("No such file");
@@ -461,7 +462,8 @@ private Behavior<Event> postUploadCsvFileHandler(final PostUploadCsvFileRequest
461462
private Behavior<Event> getSqlDashboardDataHandler(final SQLDashboardDataRequest request) {
462463
int openNotifications = psqlNotifications.getNotificationCount("OPEN");
463464
int closedNotifications = psqlNotifications.getNotificationCount("CLOSED");
464-
request.replyTo.tell(new SQLDashboardDataResponse(new SQLDashboardData(new NotificationStats(openNotifications, closedNotifications))));
465+
request.replyTo.tell(new SQLDashboardDataResponse(new SQLDashboardData(new NotificationStats(openNotifications,
466+
closedNotifications))));
465467
return Behaviors.same();
466468
}
467469

@@ -513,10 +515,11 @@ public record GetInteractionAuditTrailRequest(
513515
String uid) implements Event {
514516
}
515517

516-
public record SQLDashboardDataResponse(SQLDashboardData dashboardData) { }
518+
public record SQLDashboardDataResponse(SQLDashboardData dashboardData) {
519+
}
517520

518521
public record SQLDashboardDataRequest(
519-
ActorRef<SQLDashboardDataResponse> replyTo) implements Event {
522+
ActorRef<SQLDashboardDataResponse> replyTo) implements Event {
520523
}
521524

522525
public record GetInteractionAuditTrailResponse(List<ApiModels.ApiAuditTrail.LinkingAuditEntry> auditTrail) {

JeMPI_Apps/JeMPI_LibAPI/src/main/java/org/jembi/jempi/libapi/Routes.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,29 @@ private static CompletionStage<HttpResponse> postCrRegisterProxy(
716716
return stage.thenApply(response -> response);
717717
}
718718

719+
private static CompletionStage<HttpResponse> postCrLinkUpdateProxy(
720+
final String linkerIP,
721+
final Integer linkerPort,
722+
final Http http,
723+
final ApiModels.ApiCrLinkUpdateRequest body) throws JsonProcessingException {
724+
final HttpRequest request;
725+
final byte[] json;
726+
try {
727+
json = OBJECT_MAPPER.writeValueAsBytes(body);
728+
} catch (JsonProcessingException e) {
729+
LOGGER.error(e.getLocalizedMessage(), e);
730+
throw e;
731+
}
732+
request = HttpRequest.create(String.format(Locale.ROOT,
733+
"http://%s:%d/JeMPI/%s",
734+
linkerIP,
735+
linkerPort,
736+
GlobalConstants.SEGMENT_PROXY_POST_CR_LINK_UPDATE))
737+
.withMethod(HttpMethods.POST)
738+
.withEntity(ContentTypes.APPLICATION_JSON, json);
739+
return http.singleRequest(request).thenApply(response -> response);
740+
}
741+
719742
private static CompletionStage<HttpResponse> postLinkInteractionProxy(
720743
final String linkerIP,
721744
final Integer linkerPort,
@@ -904,6 +927,28 @@ private static Route postCrRegister(
904927
});
905928
}
906929

930+
private static Route postCrLinkUpdate(
931+
final String linkerIP,
932+
final Integer linkerPort,
933+
final Http http) {
934+
return entity(Jackson.unmarshaller(OBJECT_MAPPER, ApiModels.ApiCrLinkUpdateRequest.class),
935+
obj -> {
936+
try {
937+
return onComplete(postCrLinkUpdateProxy(linkerIP, linkerPort, http, obj),
938+
response -> {
939+
if (!response.isSuccess()) {
940+
LOGGER.warn(IM_A_TEA_POT_LOG);
941+
return complete(ApiModels.getHttpErrorResponse(GlobalConstants.IM_A_TEA_POT));
942+
}
943+
return complete(response.get());
944+
});
945+
} catch (JsonProcessingException e) {
946+
LOGGER.error(e.getLocalizedMessage(), e);
947+
return complete(ApiModels.getHttpErrorResponse(StatusCodes.UNPROCESSABLE_ENTITY));
948+
}
949+
});
950+
}
951+
907952
private static Route postLinkInteraction(
908953
final String linkerIP,
909954
final Integer linkerPort,
@@ -985,6 +1030,8 @@ public static Route createCoreAPIRoutes(
9851030
() -> Routes.postLinkInteractionToGid(linkerIP, linkerPort, http)),
9861031
path(GlobalConstants.SEGMENT_PROXY_POST_CR_REGISTER,
9871032
() -> Routes.postCrRegister(linkerIP, linkerPort, http)),
1033+
path(GlobalConstants.SEGMENT_PROXY_POST_CR_LINK_UPDATE,
1034+
() -> Routes.postCrLinkUpdate(linkerIP, linkerPort, http)),
9881035
path(GlobalConstants.SEGMENT_PROXY_POST_CR_FIND,
9891036
() -> Routes.postCrFind(linkerIP, linkerPort, http)),
9901037
path(GlobalConstants.SEGMENT_PROXY_POST_CR_CANDIDATES,
@@ -1031,8 +1078,6 @@ public static Route createCoreAPIRoutes(
10311078
() -> Routes.getInteractionAuditTrail(actorSystem, backEnd)),
10321079
path(GlobalConstants.SEGMENT_GET_NOTIFICATIONS,
10331080
() -> Routes.getNotifications(actorSystem, backEnd)),
1034-
path(segment(GlobalConstants.SEGMENT_GET_INTERACTION).slash(segment(Pattern.compile(
1035-
"^[A-z0-9]+$"))), iid -> Routes.getInteraction(actorSystem, backEnd, iid)),
10361081
path(segment(GlobalConstants.SEGMENT_GET_EXPANDED_GOLDEN_RECORD).slash(segment(Pattern.compile(
10371082
"^[A-z0-9]+$"))), gid -> Routes.getExpandedGoldenRecord(actorSystem, backEnd, gid)),
10381083
path(GlobalConstants.SEGMENT_GET_FIELDS_CONFIG, () -> complete(StatusCodes.OK, jsonFields)),

JeMPI_Apps/JeMPI_LibMPI/src/main/java/org/jembi/jempi/libmpi/LibMPI.java

Lines changed: 56 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
import org.apache.logging.log4j.LogManager;
88
import org.apache.logging.log4j.Logger;
99
import org.jembi.jempi.libmpi.dgraph.LibDgraph;
10-
import org.jembi.jempi.libmpi.postgresql.LibPostgresql;
1110
import org.jembi.jempi.shared.kafka.MyKafkaProducer;
1211
import org.jembi.jempi.shared.models.*;
1312
import org.jembi.jempi.shared.serdes.JsonPojoSerializer;
1413
import org.jembi.jempi.shared.utils.AuditTrailBridge;
14+
1515
import java.time.LocalDateTime;
1616
import java.util.List;
1717
import java.util.Locale;
@@ -39,28 +39,12 @@ public LibMPI(
3939
auditTrailUtil = new AuditTrailBridge(topicAuditEvents);
4040
}
4141

42-
public LibMPI(
43-
final String URL,
44-
final String USR,
45-
final String PSW,
46-
final String kafkaBootstrapServers,
47-
final String kafkaClientId) {
48-
LOGGER.info("{}", "LibMPI Constructor");
49-
topicAuditEvents = new MyKafkaProducer<>(kafkaBootstrapServers,
50-
GlobalConstants.TOPIC_AUDIT_TRAIL,
51-
new StringSerializer(),
52-
new JsonPojoSerializer<>(),
53-
kafkaClientId);
54-
client = new LibPostgresql(URL, USR, PSW);
55-
auditTrailUtil = new AuditTrailBridge(topicAuditEvents);
56-
}
57-
5842
private void sendAuditEvent(
59-
final String interactionID,
60-
final String goldenID,
61-
final String message,
62-
final float score,
63-
final LinkingRule linkingRule) {
43+
final String interactionID,
44+
final String goldenID,
45+
final String message,
46+
final float score,
47+
final LinkingRule linkingRule) {
6448

6549
LinkingAuditEventData linkingEvent = new LinkingAuditEventData(message, interactionID, goldenID, score, linkingRule);
6650
auditTrailUtil.sendAuditEvent(GlobalConstants.AuditEventType.LINKING_EVENT, linkingEvent);
@@ -123,18 +107,26 @@ public List<ExpandedInteraction> findExpandedInteractions(final List<String> int
123107
return client.findExpandedInteractions(interactionIDs);
124108
}
125109

126-
public GoldenRecord findGoldenRecord(final String goldenId) {
127-
return client.findGoldenRecord(goldenId);
110+
public Either<MpiGeneralError, GoldenRecord> findGoldenRecord(final String goldenId) {
111+
final var records = client.findGoldenRecords(List.of(goldenId));
112+
if (records.isRight()) {
113+
if (!records.get().isEmpty()) {
114+
return Either.right(records.get().getFirst());
115+
} else {
116+
return Either.left(new MpiServiceError.CRGidDoesNotExistError(goldenId));
117+
}
118+
}
119+
return Either.left(records.getLeft());
128120
}
129121

130-
public List<GoldenRecord> findGoldenRecords(final List<String> goldenIds) {
122+
public Either<MpiGeneralError, List<GoldenRecord>> findGoldenRecords(final List<String> goldenIds) {
131123
return client.findGoldenRecords(goldenIds);
132124
}
133125

134126
public ExpandedGoldenRecord findExpandedGoldenRecord(final String goldenId) {
135127
final var records = client.findExpandedGoldenRecords(List.of(goldenId));
136128
if (!records.isEmpty()) {
137-
return records.get(0);
129+
return records.getFirst();
138130
}
139131
return null;
140132
}
@@ -161,8 +153,8 @@ public List<GoldenRecord> findMatchCandidates(final CustomDemographicData demogr
161153
return client.findMatchCandidates(demographicData);
162154
}
163155

164-
public Either<List<GoldenRecord>, MpiGeneralError> findGoldenRecords(final ApiModels.ApiCrFindRequest request) {
165-
return client.findGoldenRecords(request);
156+
public Either<MpiGeneralError, List<GoldenRecord>> apiCrFindGoldenRecords(final ApiModels.ApiCrFindRequest request) {
157+
return client.apiCrFindGoldenRecords(request);
166158
}
167159

168160
public LibMPIPaginatedResultSet<ExpandedGoldenRecord> simpleSearchGoldenRecords(
@@ -230,9 +222,17 @@ public boolean setScore(
230222
final float newScore) {
231223
final var result = client.setScore(interactionID, goldenID, newScore);
232224
if (result) {
233-
sendAuditEvent(interactionID, goldenID, String.format(Locale.ROOT, "score: %.5f -> %.5f", oldScore, newScore), newScore, LinkingRule.UNMATCHED);
225+
sendAuditEvent(interactionID,
226+
goldenID,
227+
String.format(Locale.ROOT, "score: %.5f -> %.5f", oldScore, newScore),
228+
newScore,
229+
LinkingRule.UNMATCHED);
234230
} else {
235-
sendAuditEvent(interactionID, goldenID, String.format(Locale.ROOT, "set score error: %.5f -> %.5f", oldScore, newScore), newScore, LinkingRule.UNMATCHED);
231+
sendAuditEvent(interactionID,
232+
goldenID,
233+
String.format(Locale.ROOT, "set score error: %.5f -> %.5f", oldScore, newScore),
234+
newScore,
235+
LinkingRule.UNMATCHED);
236236

237237
}
238238
return result;
@@ -255,20 +255,20 @@ public boolean updateGoldenRecordField(
255255
final var result = client.updateGoldenRecordField(goldenId, fieldName, newValue);
256256
if (result) {
257257
sendAuditEvent(interactionId, goldenId, String.format(Locale.ROOT,
258-
"%s: '%s' -> '%s'",
259-
fieldName,
260-
oldValue,
261-
newValue),
262-
-1.0F,
263-
LinkingRule.UNMATCHED);
258+
"%s: '%s' -> '%s'",
259+
fieldName,
260+
oldValue,
261+
newValue),
262+
-1.0F,
263+
LinkingRule.UNMATCHED);
264264
} else {
265265
sendAuditEvent(interactionId, goldenId, String.format(Locale.ROOT,
266-
"%s: error updating '%s' -> '%s'",
267-
fieldName,
268-
oldValue,
269-
newValue),
270-
-1.0F,
271-
LinkingRule.UNMATCHED);
266+
"%s: error updating '%s' -> '%s'",
267+
fieldName,
268+
oldValue,
269+
newValue),
270+
-1.0F,
271+
LinkingRule.UNMATCHED);
272272
}
273273
return result;
274274
}
@@ -288,12 +288,12 @@ public Either<MpiGeneralError, LinkInfo> linkToNewGoldenRecord(
288288
score), score, LinkingRule.UNMATCHED);
289289
} else {
290290
sendAuditEvent(interactionId,
291-
currentGoldenId,
292-
String.format(Locale.ROOT,
293-
"Interaction -> update GoldenID error: old(%s) [%f]",
294-
currentGoldenId, score),
295-
score,
296-
LinkingRule.UNMATCHED);
291+
currentGoldenId,
292+
String.format(Locale.ROOT,
293+
"Interaction -> update GoldenID error: old(%s) [%f]",
294+
currentGoldenId, score),
295+
score,
296+
LinkingRule.UNMATCHED);
297297
}
298298
return result;
299299
}
@@ -357,16 +357,16 @@ public LinkInfo createInteractionAndLinkToClonedGoldenRecord(
357357
final var result = client.createInteractionAndLinkToClonedGoldenRecord(interaction, score);
358358
if (result != null) {
359359
sendAuditEvent(result.interactionUID(),
360-
result.goldenUID(),
361-
String.format(Locale.ROOT,
362-
"Interaction -> New GoldenRecord (%f)", score),
363-
score, LinkingRule.UNMATCHED);
360+
result.goldenUID(),
361+
String.format(Locale.ROOT,
362+
"Interaction -> New GoldenRecord (%f)", score),
363+
score, LinkingRule.UNMATCHED);
364364
} else {
365365
sendAuditEvent(interaction.interactionId(),
366-
null,
367-
String.format(Locale.ROOT,
368-
"Interaction -> error linking to new GoldenRecord (%f)", score),
369-
score, LinkingRule.UNMATCHED);
366+
null,
367+
String.format(Locale.ROOT,
368+
"Interaction -> error linking to new GoldenRecord (%f)", score),
369+
score, LinkingRule.UNMATCHED);
370370
}
371371
return result;
372372
}

JeMPI_Apps/JeMPI_LibMPI/src/main/java/org/jembi/jempi/libmpi/LibMPIClientInterface.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,7 @@ public interface LibMPIClientInterface {
4444

4545
List<ExpandedInteraction> findExpandedInteractions(List<String> interactionIDs);
4646

47-
GoldenRecord findGoldenRecord(String goldenId);
48-
49-
List<GoldenRecord> findGoldenRecords(List<String> goldenIds);
47+
Either<MpiGeneralError, List<GoldenRecord>> findGoldenRecords(List<String> goldenIds);
5048

5149
List<ExpandedGoldenRecord> findExpandedGoldenRecords(List<String> goldenIds);
5250

@@ -98,7 +96,7 @@ PaginatedGIDsWithInteractionCount filterGidsWithInteractionCount(
9896
LocalDateTime createdAt,
9997
PaginationOptions paginationOptions);
10098

101-
Either<List<GoldenRecord>, MpiGeneralError> findGoldenRecords(ApiModels.ApiCrFindRequest request);
99+
Either<MpiGeneralError, List<GoldenRecord>> apiCrFindGoldenRecords(ApiModels.ApiCrFindRequest request);
102100

103101
/*
104102
* *****************************************************************************

JeMPI_Apps/JeMPI_LibMPI/src/main/java/org/jembi/jempi/libmpi/MpiServiceError.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,15 @@ record CRClientExistsError(
4040
CustomDemographicData request) implements MpiServiceError {
4141
}
4242

43+
record CRGidDoesNotExistError(
44+
String gid) implements MpiServiceError {
45+
}
46+
47+
record CRLinkUpdateError(
48+
CustomDemographicData interaction) implements MpiServiceError {
49+
}
50+
51+
4352
record InvalidFunctionError(
4453
String error
4554
) implements MpiServiceError {

0 commit comments

Comments
 (0)