diff --git a/NEWS.md b/NEWS.md index a64ebfa8b..2a35a64fe 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -## 21.1.0-SNAPSHOT 2024-xx-xx +## 21.1.0-SNAPSHOT 2025-xx-xx * Provide consistent handling with concurrency two or more Marc Bib Update events for the same bib record [MODINV-1100](https://folio-org.atlassian.net/browse/MODINV-1100) * Enable system user for data-import processes [MODINV-1115](https://folio-org.atlassian.net/browse/MODINV-1115) * Missing x-okapi-user-id header in communications with inventory-storage [MODINV-1134](https://folio-org.atlassian.net/browse/MODINV-1134) @@ -7,6 +7,7 @@ * Update the snapshot status from PROCESSING_FINISHED to COMMITTED in the InstanceIngressEventHandler [MODINV-1161](https://folio-org.atlassian.net/browse/MODINV-1161) * Add "deleted" field to Instance schema [MODINVSTOR-1342](https://folio-org.atlassian.net/browse/MODINVSTOR-1342) * Adjust /mark-deleted endpoint behavior to set the "deleted" flag [MODINV-1138](https://folio-org.atlassian.net/browse/MODINV-1138) +* Implement marc bib submatch [MODINV-1114](https://issues.folio.org/browse/MODINV-1114) ## 21.0.0 2024-10-29 * Existing "035" field is not retained the original position in imported record [MODINV-1049](https://folio-org.atlassian.net/browse/MODINV-1049) diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/matching/AbstractMarcMatchEventHandler.java b/src/main/java/org/folio/inventory/dataimport/handlers/matching/AbstractMarcMatchEventHandler.java index f9ee4401f..79cf2050d 100644 --- a/src/main/java/org/folio/inventory/dataimport/handlers/matching/AbstractMarcMatchEventHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/handlers/matching/AbstractMarcMatchEventHandler.java @@ -3,6 +3,7 @@ import io.vertx.core.Future; import io.vertx.core.http.HttpClient; import io.vertx.core.json.Json; +import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -30,6 +31,7 @@ import org.folio.rest.jaxrs.model.Filter; import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper; import org.folio.rest.jaxrs.model.Qualifier; +import org.folio.rest.jaxrs.model.ReactToType; import org.folio.rest.jaxrs.model.Record; import org.folio.rest.jaxrs.model.RecordIdentifiersDto; import org.folio.rest.jaxrs.model.RecordMatchingDto; @@ -45,9 +47,11 @@ import java.util.stream.Stream; import static java.lang.String.format; +import static java.util.Objects.nonNull; +import static org.apache.commons.collections.CollectionUtils.isNotEmpty; import static org.apache.http.HttpStatus.SC_OK; -import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.PAYLOAD_USER_ID; import static org.folio.processing.value.Value.ValueType.MISSING; +import static org.folio.rest.jaxrs.model.Filter.ComparisonPartType; import static org.folio.rest.jaxrs.model.MatchExpression.DataValueType.VALUE_FROM_RECORD; import static org.folio.rest.jaxrs.model.ProfileType.MATCH_PROFILE; @@ -65,6 +69,10 @@ public abstract class AbstractMarcMatchEventHandler implements EventHandler { private static final String USER_ID_HEADER = "userId"; private static final int EXPECTED_MATCH_EXPRESSION_FIELDS_NUMBER = 4; private static final String DEFAULT_RECORDS_IDENTIFIERS_LIMIT = "5000"; + private static final String FIELD_999 = "999"; + private static final String INDICATOR_F = "f"; + private static final String SUBFIELD_I = "i"; + private static final String SUBFIELD_S = "s"; protected final ConsortiumService consortiumService; private final DataImportEventTypes matchedEventType; @@ -95,9 +103,8 @@ public CompletableFuture handle(DataImportEventPayload p LOG.warn("handle:: {}", PAYLOAD_HAS_NO_DATA_MESSAGE); return CompletableFuture.failedFuture(new EventProcessingException(PAYLOAD_HAS_NO_DATA_MESSAGE)); } - String userId = context.get(USER_ID_HEADER); payload.getEventsChain().add(payload.getEventType()); - payload.setAdditionalProperty(USER_ID_HEADER, userId); + payload.setAdditionalProperty(USER_ID_HEADER, context.get(USER_ID_HEADER)); String recordAsString = context.get(getMarcType()); MatchDetail matchDetail = retrieveMatchDetail(payload); @@ -114,16 +121,11 @@ public CompletableFuture handle(DataImportEventPayload p return CompletableFuture.completedFuture(payload); } - RecordMatchingDto recordMatchingDto = buildRecordsMatchingRequest(matchDetail, value); - return retrieveMarcRecords(recordMatchingDto, payload.getTenant(), userId, payload) - .compose(localMatchedRecords -> { - if (isMatchingOnCentralTenantRequired()) { - return matchCentralTenantIfNeededAndCombineWithLocalMatchedRecords(recordMatchingDto, payload, localMatchedRecords); - } - return Future.succeededFuture(localMatchedRecords.stream().toList()); - }) - .compose(recordList -> ensureRelatedEntities(recordList, payload).map(recordList)) - .compose(recordList -> processSucceededResult(recordList, payload)) + RecordMatchingDto recordMatchingDto = buildRecordsMatchingRequest(matchDetail, value, payload); + return createRecordsMatchingContext(payload) + .compose(recordsMatchingContext -> matchRecords(recordMatchingDto, recordsMatchingContext, payload)) + .compose(recordOptional -> ensureRelatedEntities(recordOptional, payload).map(recordOptional)) + .compose(recordOptional -> processSucceededResult(recordOptional, payload)) .onFailure(e -> LOG.warn("handle:: Failed to process event for MARC record matching, jobExecutionId: '{}'", payload.getJobExecutionId(), e)) .toCompletionStage().toCompletableFuture(); } catch (Exception e) { @@ -141,8 +143,10 @@ public CompletableFuture handle(DataImportEventPayload p protected abstract String getMultiMatchResultKey(); + protected abstract boolean canSubMatchProfileProcessMultiMatchResult(MatchProfile matchProfile); + @SuppressWarnings("squid:S1172") - protected Future ensureRelatedEntities(List records, DataImportEventPayload eventPayload) { + protected Future ensureRelatedEntities(Optional recordOptional, DataImportEventPayload eventPayload) { return Future.succeededFuture(); } @@ -184,7 +188,7 @@ private boolean isValidMatchDetail(MatchDetail matchDetail) { return false; } - private RecordMatchingDto buildRecordsMatchingRequest(MatchDetail matchDetail, Value value) { + private RecordMatchingDto buildRecordsMatchingRequest(MatchDetail matchDetail, Value value, DataImportEventPayload payload) { List matchDetailFields = matchDetail.getExistingMatchExpression().getFields(); String field = matchDetailFields.get(0).getValue(); String ind1 = matchDetailFields.get(1).getValue(); @@ -199,50 +203,125 @@ private RecordMatchingDto buildRecordsMatchingRequest(MatchDetail matchDetail, V Qualifier qualifier = matchDetail.getExistingMatchExpression().getQualifier(); Filter.Qualifier qualifierFilterType = null; - Filter.ComparisonPartType comparisonPartType = null; + ComparisonPartType comparisonPartType = null; String qualifierValue = null; if (qualifier != null) { qualifierValue = qualifier.getQualifierValue(); - qualifierFilterType = - qualifier.getQualifierType() != null ? - Filter.Qualifier.valueOf(qualifier.getQualifierType().toString()) : null; - comparisonPartType = - qualifier.getComparisonPart() != null ? - Filter.ComparisonPartType.valueOf(qualifier.getComparisonPart().toString()) : null; + qualifierFilterType = qualifier.getQualifierType() != null + ? Filter.Qualifier.valueOf(qualifier.getQualifierType().toString()) : null; + comparisonPartType = qualifier.getComparisonPart() != null + ? ComparisonPartType.valueOf(qualifier.getComparisonPart().toString()) : null; } - return new RecordMatchingDto() + Filter matchCriteriaFilter = new Filter() + .withValues(values) + .withField(field) + .withIndicator1(ind1) + .withIndicator2(ind2) + .withSubfield(subfield) + .withQualifier(qualifierFilterType) + .withQualifierValue(qualifierValue) + .withComparisonPartType(comparisonPartType); + + RecordMatchingDto recordMatchingDto = new RecordMatchingDto() .withRecordType(getMatchedRecordType()) - .withFilters(List.of(new Filter() - .withValues(values) - .withField(field) - .withIndicator1(ind1) - .withIndicator2(ind2) - .withSubfield(subfield) - .withQualifier(qualifierFilterType) - .withQualifierValue(qualifierValue) - .withComparisonPartType(comparisonPartType))) .withReturnTotalRecordsCount(true); + recordMatchingDto.getFilters().add(matchCriteriaFilter); + + buildFilterBasedOnPreviousMatchResult(payload).ifPresent(filter -> { + recordMatchingDto.setLogicalOperator(RecordMatchingDto.LogicalOperator.AND); + recordMatchingDto.getFilters().add(filter); + }); + + return recordMatchingDto; } - private Future> retrieveMarcRecords(RecordMatchingDto recordMatchingDto, String tenantId, String userId, - DataImportEventPayload payload) { - SourceStorageRecordsClient sourceStorageRecordsClient = - new SourceStorageRecordsClientWrapper(payload.getOkapiUrl(), tenantId, payload.getToken(), userId, httpClient); + private Optional buildFilterBasedOnPreviousMatchResult(DataImportEventPayload payload) { + if (containsMultiMatchResult(payload)) { + return Optional.of(buildFilterForMultiMatchResult(payload)); + } else if (StringUtils.isNotEmpty(payload.getContext().get(getMatchedMarcKey()))) { + return Optional.of(buildFilterBasedOnPreviouslyMatchedRecord(payload)); + } + return Optional.empty(); + } - return getAllMatchedRecordsIdentifiers(recordMatchingDto, payload, sourceStorageRecordsClient) - .compose(recordsIdentifiersCollection -> { - if (recordsIdentifiersCollection.getIdentifiers().size() > 1) { - populatePayloadWithExternalIdentifiers(recordsIdentifiersCollection, payload); - return Future.succeededFuture(Optional.empty()); - } else if (recordsIdentifiersCollection.getIdentifiers().size() == 1) { - return getRecordById(recordsIdentifiersCollection.getIdentifiers().get(0), sourceStorageRecordsClient, payload); - } - LOG.info("retrieveMarcRecords:: {}, jobExecutionId: '{}', tenantId: '{}'", - RECORDS_NOT_FOUND_MESSAGE, payload.getJobExecutionId(), tenantId); - return Future.succeededFuture(Optional.empty()); + private Filter buildFilterForMultiMatchResult(DataImportEventPayload payload) { + List ids = new JsonArray(payload.getContext().get(getMultiMatchResultKey())) + .stream() + .map(String.class::cast) + .toList(); + + payload.getContext().remove(getMultiMatchResultKey()); + return buildFilter(ids, FIELD_999, INDICATOR_F, INDICATOR_F, SUBFIELD_I); + } + + private Filter buildFilterBasedOnPreviouslyMatchedRecord(DataImportEventPayload payload) { + Record previouslyMatchedRecord = Json.decodeValue(payload.getContext().get(getMatchedMarcKey()), Record.class); + String matchedId = previouslyMatchedRecord.getMatchedId(); + return buildFilter(List.of(matchedId), FIELD_999, INDICATOR_F, INDICATOR_F, SUBFIELD_S); + } + + private Filter buildFilter(List values, String field, String ind1, String ind2, String subfield) { + return new Filter() + .withValues(values) + .withField(field) + .withIndicator1(ind1) + .withIndicator2(ind2) + .withSubfield(subfield); + } + + private Future createRecordsMatchingContext(DataImportEventPayload payload) { + String userId = payload.getContext().get(USER_ID_HEADER); + RecordsMatchingContext recordsMatchingContext = new RecordsMatchingContext(); + recordsMatchingContext.setLocalTenantRecordsClient(new SourceStorageRecordsClientWrapper( + payload.getOkapiUrl(), payload.getTenant(), payload.getToken(), userId, httpClient)); + + if (!isMatchingOnCentralTenantRequired()) { + return Future.succeededFuture(recordsMatchingContext); + } + + Context context = EventHandlingUtil.constructContext(payload.getTenant(), payload.getToken(), + payload.getOkapiUrl(), userId); + + return consortiumService.getConsortiumConfiguration(context).map(consortiumConfigurationOptional -> { + consortiumConfigurationOptional.ifPresent(consortiumConfiguration -> { + recordsMatchingContext.setCentralTenantId(consortiumConfiguration.getCentralTenantId()); + recordsMatchingContext.setCentralTenantRecordsClient(new SourceStorageRecordsClientWrapper( + payload.getOkapiUrl(), consortiumConfiguration.getCentralTenantId(), payload.getToken(), userId, httpClient)); }); + return recordsMatchingContext; + }); + } + + private Future> matchRecords(RecordMatchingDto recordMatchingDto, + RecordsMatchingContext recordsMatchingContext, + DataImportEventPayload payload) { + SourceStorageRecordsClient localTenantStorageRecordsClient = recordsMatchingContext.getLocalTenantRecordsClient(); + + Future localMatchingFuture = + getAllMatchedRecordsIdentifiers(recordMatchingDto, payload, localTenantStorageRecordsClient); + Future centralMatchingFuture = + getMatchedRecordsIdentifiersOnCentralTenantIfNeeded(recordMatchingDto, recordsMatchingContext, payload); + + return Future.all(localMatchingFuture, centralMatchingFuture).compose(v -> { + RecordsIdentifiersCollection localMatchingRes = localMatchingFuture.result(); + RecordsIdentifiersCollection centralMatchingRes = centralMatchingFuture.result(); + + if (localMatchingRes.getTotalRecords() == 1 && centralMatchingRes.getIdentifiers().isEmpty()) { + return getRecordById(localMatchingRes.getIdentifiers().get(0), localTenantStorageRecordsClient, payload).map(Optional::of); + } else if (localMatchingRes.getIdentifiers().isEmpty() && centralMatchingRes.getTotalRecords() == 1) { + payload.getContext().put(CENTRAL_TENANT_ID_KEY, recordsMatchingContext.getCentralTenantId()); + return getRecordById(centralMatchingRes.getIdentifiers().get(0), recordsMatchingContext.getCentralTenantRecordsClient(), payload).map(Optional::of); + } else if (localMatchingRes.getIdentifiers().isEmpty() && centralMatchingRes.getIdentifiers().isEmpty()) { + LOG.info("matchRecords:: {}, jobExecutionId: '{}', tenantId: '{}'", + RECORDS_NOT_FOUND_MESSAGE, payload.getJobExecutionId(), payload.getTenant()); + return Future.succeededFuture(Optional.empty()); + } else { + populatePayloadWithExternalIdentifiers(localMatchingRes, centralMatchingRes, payload); + return Future.succeededFuture(Optional.empty()); + } + }); } private Future getAllMatchedRecordsIdentifiers(RecordMatchingDto recordMatchingDto, @@ -289,13 +368,13 @@ private Future getMatchedRecordsIdentifiers(Record }); } - private Future> getRecordById(RecordIdentifiersDto recordIdentifiersDto, SourceStorageRecordsClient sourceStorageRecordsClient, - DataImportEventPayload payload) { + private Future getRecordById(RecordIdentifiersDto recordIdentifiersDto, SourceStorageRecordsClient sourceStorageRecordsClient, + DataImportEventPayload payload) { String recordId = recordIdentifiersDto.getRecordId(); return sourceStorageRecordsClient.getSourceStorageRecordsById(recordId) .compose(response -> { if (response.statusCode() == SC_OK) { - return Future.succeededFuture(Optional.of(response.bodyAsJson(Record.class))); + return Future.succeededFuture(response.bodyAsJson(Record.class)); } String msg = format("Failed to retrieve record by id: '%s', responseStatus: '%s', body: '%s', jobExecutionId: '%s', tenant: '%s'", recordId, response.statusCode(), response.bodyAsString(), payload.getJobExecutionId(), payload.getTenant()); @@ -310,37 +389,32 @@ private Future> getRecordById(RecordIdentifiersDto recordIdenti * These external identifiers represent multiple match result returned by this handler * and can be used and deleted during further matching processing by other match handlers. * - * @param recordsIdentifiersCollection identifiers collection to extract external identifiers - * @param payload event payload to populate + * @param localRecordsIdentifiersCollection local tenant identifiers collection to extract external identifiers + * @param centralRecordsIdentifiersCollection central tenant identifiers collection to extract external identifiers + * @param payload event payload to populate */ - private void populatePayloadWithExternalIdentifiers(RecordsIdentifiersCollection recordsIdentifiersCollection, + private void populatePayloadWithExternalIdentifiers(RecordsIdentifiersCollection localRecordsIdentifiersCollection, + RecordsIdentifiersCollection centralRecordsIdentifiersCollection, DataImportEventPayload payload) { - List externalEntityIds = recordsIdentifiersCollection.getIdentifiers() - .stream() + List externalEntityIds = Stream.concat( + localRecordsIdentifiersCollection.getIdentifiers().stream(), + centralRecordsIdentifiersCollection.getIdentifiers().stream()) .map(RecordIdentifiersDto::getExternalId) .toList(); payload.getContext().put(getMultiMatchResultKey(), Json.encode(externalEntityIds)); } - private Future> matchCentralTenantIfNeededAndCombineWithLocalMatchedRecords(RecordMatchingDto recordMatchingDto, DataImportEventPayload payload, - Optional localMatchedRecord) { - Context context = EventHandlingUtil.constructContext(payload.getTenant(), payload.getToken(), payload.getOkapiUrl(), - payload.getContext().get(PAYLOAD_USER_ID)); - return consortiumService.getConsortiumConfiguration(context) - .compose(consortiumConfigurationOptional -> { - if (consortiumConfigurationOptional.isPresent() && !consortiumConfigurationOptional.get().getCentralTenantId().equals(payload.getTenant())) { - LOG.debug("matchCentralTenantIfNeededAndCombineWithLocalMatchedRecords:: Matching on centralTenant with id: {}", - consortiumConfigurationOptional.get().getCentralTenantId()); - - return retrieveMarcRecords(recordMatchingDto, consortiumConfigurationOptional.get().getCentralTenantId(), context.getUserId(), payload) - .map(centralRecordOptional -> { - centralRecordOptional.ifPresent(r -> payload.getContext().put(CENTRAL_TENANT_ID_KEY, consortiumConfigurationOptional.get().getCentralTenantId())); - return Stream.concat(localMatchedRecord.stream(), centralRecordOptional.stream()).toList(); - }); - } - return Future.succeededFuture(localMatchedRecord.stream().toList()); - }); + private Future getMatchedRecordsIdentifiersOnCentralTenantIfNeeded(RecordMatchingDto recordMatchingDto, + RecordsMatchingContext recordsMatchingContext, + DataImportEventPayload payload) { + if (isMatchingOnCentralTenantRequired() && nonNull(recordsMatchingContext.getCentralTenantId()) + && !recordsMatchingContext.getCentralTenantId().equals(payload.getTenant())) { + LOG.debug("getMatchedRecordsIdentifiersOnCentralTenantIfNeeded:: Matching on centralTenant with id: {}", + recordsMatchingContext.getCentralTenantId()); + return getAllMatchedRecordsIdentifiers(recordMatchingDto, payload, recordsMatchingContext.getCentralTenantRecordsClient()); + } + return Future.succeededFuture(new RecordsIdentifiersCollection()); } @Override @@ -362,28 +436,20 @@ private boolean isEligibleMatchProfile(MatchProfile matchProfile) { * Prepares {@link DataImportEventPayload} for the further processing * based on the number of specified records in {@code records} list * - * @param records records retrieved during matching processing + * @param recordOptional matched record retrieved during matching processing * @param payload event payload to prepare * @return Future of {@link DataImportEventPayload} with result of matching */ - private Future processSucceededResult(List records, DataImportEventPayload payload) { - if (records.size() == 1) { + private Future processSucceededResult(Optional recordOptional, DataImportEventPayload payload) { + if (recordOptional.isPresent()) { payload.setEventType(matchedEventType.toString()); - payload.getContext().put(getMatchedMarcKey(), Json.encode(records.get(0))); + payload.getContext().put(getMatchedMarcKey(), Json.encode(recordOptional.get())); LOG.debug("processSucceededResult:: Matched 1 record for jobExecutionId: '{}', tenantId: '{}'", payload.getJobExecutionId(), payload.getTenant()); return Future.succeededFuture(payload); } - if (records.size() > 1) { - LOG.warn("processSucceededResult:: Matched multiple records, jobExecutionId: '{}', tenantId: '{}'", - payload.getJobExecutionId(), payload.getTenant()); - return Future.failedFuture(new MatchingException(FOUND_MULTIPLE_RECORDS_ERROR_MESSAGE)); - } - if (payload.getContext().containsKey(getMultiMatchResultKey())) { - LOG.info("processSucceededResult:: Multiple records were found which match criteria, jobExecutionId: '{}', tenantId: '{}'", - payload.getJobExecutionId(), payload.getTenant()); - payload.setEventType(matchedEventType.toString()); - return Future.succeededFuture(payload); + if (containsMultiMatchResult(payload)) { + return handlePayloadWithMultiMatchResult(payload); } LOG.info("processSucceededResult:: {}, jobExecutionId: '{}', tenantId: '{}'", RECORDS_NOT_FOUND_MESSAGE, payload.getJobExecutionId(), payload.getTenant()); @@ -394,4 +460,63 @@ private String getMatchedMarcKey() { return format(MATCH_RESULT_KEY_PREFIX, getMarcType()); } + private boolean containsMultiMatchResult(DataImportEventPayload payload) { + return payload.getContext().containsKey(getMultiMatchResultKey()); + } + + private Future handlePayloadWithMultiMatchResult(DataImportEventPayload payload) { + if (canNextProfileProcessMultiMatchResult(payload)) { + LOG.info("handlePayloadWithMultiMatchResult:: Multiple records were found which match criteria, jobExecutionId: '{}', tenantId: '{}'", + payload.getJobExecutionId(), payload.getTenant()); + payload.setEventType(matchedEventType.toString()); + return Future.succeededFuture(payload); + } + + LOG.warn("handlePayloadWithMultiMatchResult:: Matched multiple records, jobExecutionId: '{}', tenantId: '{}'", + payload.getJobExecutionId(), payload.getTenant()); + return Future.failedFuture(new MatchingException(FOUND_MULTIPLE_RECORDS_ERROR_MESSAGE)); + } + + private boolean canNextProfileProcessMultiMatchResult(DataImportEventPayload eventPayload) { + List childProfiles = eventPayload.getCurrentNode().getChildSnapshotWrappers(); + if (isNotEmpty(childProfiles) && ReactToType.MATCH.equals(childProfiles.get(0).getReactTo()) + && MATCH_PROFILE.equals(childProfiles.get(0).getContentType())) { + MatchProfile nextMatchProfile = JsonObject.mapFrom(childProfiles.get(0).getContent()).mapTo(MatchProfile.class); + return canSubMatchProfileProcessMultiMatchResult(nextMatchProfile); + } + return false; + } + + private static class RecordsMatchingContext { + + private SourceStorageRecordsClient localTenantRecordsClient; + private SourceStorageRecordsClient centralTenantRecordsClient; + private String centralTenantId; + + public SourceStorageRecordsClient getLocalTenantRecordsClient() { + return localTenantRecordsClient; + } + + public SourceStorageRecordsClient getCentralTenantRecordsClient() { + return centralTenantRecordsClient; + } + + public String getCentralTenantId() { + return centralTenantId; + } + + public void setLocalTenantRecordsClient(SourceStorageRecordsClient sourceStorageRecordsClient) { + this.localTenantRecordsClient = sourceStorageRecordsClient; + } + + public void setCentralTenantRecordsClient(SourceStorageRecordsClient sourceStorageRecordsClient) { + this.centralTenantRecordsClient = sourceStorageRecordsClient; + } + + public void setCentralTenantId(String centralTenantId) { + this.centralTenantId = centralTenantId; + } + + } + } diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/matching/MarcBibliographicMatchEventHandler.java b/src/main/java/org/folio/inventory/dataimport/handlers/matching/MarcBibliographicMatchEventHandler.java index 07e8d4664..ec9a1f49c 100644 --- a/src/main/java/org/folio/inventory/dataimport/handlers/matching/MarcBibliographicMatchEventHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/handlers/matching/MarcBibliographicMatchEventHandler.java @@ -6,6 +6,7 @@ import io.vertx.core.json.Json; import org.folio.DataImportEventPayload; import org.folio.HoldingsRecord; +import org.folio.MatchProfile; import org.folio.inventory.common.Context; import org.folio.inventory.common.api.request.PagingParameters; import org.folio.inventory.consortium.services.ConsortiumService; @@ -15,11 +16,13 @@ import org.folio.inventory.domain.HoldingsRecordCollection; import org.folio.inventory.domain.instances.InstanceCollection; import org.folio.inventory.storage.Storage; +import org.folio.rest.jaxrs.model.EntityType; import org.folio.rest.jaxrs.model.Record; import org.folio.rest.jaxrs.model.RecordMatchingDto; import java.io.UnsupportedEncodingException; import java.util.List; +import java.util.Optional; import static java.lang.String.format; import static org.apache.commons.lang3.StringUtils.isBlank; @@ -63,9 +66,16 @@ protected String getMultiMatchResultKey() { } @Override - protected Future ensureRelatedEntities(List records, DataImportEventPayload eventPayload) { - if (records.size() == 1) { - Record matchedRecord = records.get(0); + protected boolean canSubMatchProfileProcessMultiMatchResult(MatchProfile matchProfile) { + return matchProfile.getExistingRecordType() == EntityType.MARC_BIBLIOGRAPHIC + || matchProfile.getExistingRecordType() == EntityType.INSTANCE + || matchProfile.getExistingRecordType() == EntityType.HOLDINGS; + } + + @Override + protected Future ensureRelatedEntities(Optional recordOptional, DataImportEventPayload eventPayload) { + if (recordOptional.isPresent()) { + Record matchedRecord = recordOptional.get(); String instanceId = ParsedRecordUtil.getAdditionalSubfieldValue(matchedRecord.getParsedRecord(), AdditionalSubfields.I); String matchedRecordTenantId = getTenant(eventPayload); Context context = EventHandlingUtil.constructContext(matchedRecordTenantId, eventPayload.getToken(), eventPayload.getOkapiUrl(), diff --git a/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java b/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java index 85d8bd7d9..fc3c24e53 100644 --- a/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java +++ b/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java @@ -37,6 +37,7 @@ import org.folio.inventory.storage.Storage; import org.folio.inventory.support.http.client.OkapiHttpClient; import org.folio.inventory.support.http.client.Response; +import org.folio.kafka.exception.DuplicateEventException; import org.folio.processing.mapping.MappingManager; import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters; import org.folio.processing.mapping.mapper.reader.Reader; @@ -53,6 +54,7 @@ import org.folio.rest.jaxrs.model.ParsedRecord; import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper; import org.folio.rest.jaxrs.model.Record; +import org.hamcrest.junit.internal.ThrowableCauseMatcher; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -79,6 +81,7 @@ import java.util.function.Consumer; import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static java.lang.String.format; import static java.util.concurrent.CompletableFuture.completedStage; import static org.apache.http.HttpStatus.SC_CREATED; import static org.folio.ActionProfile.FolioRecord.INSTANCE; @@ -90,16 +93,18 @@ import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.PAYLOAD_USER_ID; import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_005; import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.dateTime005Formatter; -import static org.folio.inventory.dataimport.util.DataImportConstants.UNIQUE_ID_ERROR_MESSAGE; +import static org.folio.inventory.dataimport.util.DataImportConstants.ALREADY_EXISTS_ERROR_MSG; import static org.folio.rest.jaxrs.model.ProfileType.ACTION_PROFILE; import static org.folio.rest.jaxrs.model.ProfileType.JOB_PROFILE; import static org.folio.rest.jaxrs.model.ProfileType.MAPPING_PROFILE; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; @@ -945,9 +950,8 @@ public void shouldNotProcessEventWhenRecordToInstanceFutureFails() throws Execut future.get(5, TimeUnit.SECONDS); } - - @Test(expected = Exception.class) - public void shouldNotProcessEventEvenIfDuplicatedInventoryStorageErrorExists() throws InterruptedException, ExecutionException, TimeoutException { + @Test() + public void shouldNotProcessEventEvenIfDuplicatedInventoryStorageErrorExists() { Reader fakeReader = Mockito.mock(Reader.class); String instanceTypeId = "fe19bae4-da28-472b-be90-d442e2428ead"; @@ -965,7 +969,7 @@ public void shouldNotProcessEventEvenIfDuplicatedInventoryStorageErrorExists() t when(instanceIdStorageService.store(any(), any(), any())).thenReturn(Future.succeededFuture(recordToInstance)); doAnswer(invocationOnMock -> { Consumer failureHandler = invocationOnMock.getArgument(2); - failureHandler.accept(new Failure(UNIQUE_ID_ERROR_MESSAGE, 400)); + failureHandler.accept(new Failure(format(ALREADY_EXISTS_ERROR_MSG, instanceId), 400)); return null; }).when(instanceRecordCollection).add(any(), any(), any()); @@ -989,7 +993,8 @@ public void shouldNotProcessEventEvenIfDuplicatedInventoryStorageErrorExists() t CompletableFuture future = createInstanceEventHandler.handle(dataImportEventPayload); - future.get(5, TimeUnit.SECONDS); + ExecutionException actualException = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS)); + assertThat(actualException, ThrowableCauseMatcher.hasCause(instanceOf(DuplicateEventException.class))); } @Test(expected = Exception.class) diff --git a/src/test/java/org/folio/inventory/dataimport/handlers/matching/MarcBibliographicMatchEventHandlerTest.java b/src/test/java/org/folio/inventory/dataimport/handlers/matching/MarcBibliographicMatchEventHandlerTest.java index 08fcea17f..fe0d326b2 100644 --- a/src/test/java/org/folio/inventory/dataimport/handlers/matching/MarcBibliographicMatchEventHandlerTest.java +++ b/src/test/java/org/folio/inventory/dataimport/handlers/matching/MarcBibliographicMatchEventHandlerTest.java @@ -6,6 +6,7 @@ import com.github.tomakehurst.wiremock.junit.WireMockRule; import com.github.tomakehurst.wiremock.matching.RegexPattern; import com.github.tomakehurst.wiremock.matching.UrlPathPattern; +import com.github.tomakehurst.wiremock.verification.LoggedRequest; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.json.Json; @@ -36,6 +37,7 @@ import org.folio.rest.jaxrs.model.MatchExpression; import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper; import org.folio.rest.jaxrs.model.RecordIdentifiersDto; +import org.folio.rest.jaxrs.model.RecordMatchingDto; import org.folio.rest.jaxrs.model.RecordsIdentifiersCollection; import org.hamcrest.Matchers; import org.junit.After; @@ -53,6 +55,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import java.util.stream.Stream; import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; import static com.github.tomakehurst.wiremock.client.WireMock.get; @@ -74,6 +77,7 @@ import static org.folio.rest.jaxrs.model.EntityType.MARC_BIBLIOGRAPHIC; import static org.folio.rest.jaxrs.model.MatchExpression.DataValueType.VALUE_FROM_RECORD; import static org.folio.rest.jaxrs.model.ProfileType.MATCH_PROFILE; +import static org.folio.rest.jaxrs.model.ReactToType.MATCH; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.junit.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; @@ -148,6 +152,7 @@ public class MarcBibliographicMatchEventHandlerTest { @Before public void setUp() throws UnsupportedEncodingException { WireMock.reset(); + System.clearProperty(RECORDS_IDENTIFIERS_FETCH_LIMIT_PARAM); this.closeable = MockitoAnnotations.openMocks(this); Instance existingInstance = Instance.fromJson(new JsonObject().put("id", UUID.randomUUID().toString())); @@ -229,7 +234,7 @@ public void shouldNotMatchMarcBibIfNoRecordsMatchingCriteria(TestContext context } @Test - public void shouldPopulatePayloadWithInstancesIdsOfMatchedRecordsIfMultipleRecordsMatchCriteriaOnTenant(TestContext context) { + public void shouldPopulatePayloadWithInstancesIdsOfMatchedRecordsIfMultipleRecordsMatchCriteriaAndNextProfileIsEligibleForMultiMatchResult(TestContext context) { Async async = context.async(); List instanceIds = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); List recordsIdentifiers = instanceIds.stream() @@ -246,7 +251,7 @@ public void shouldPopulatePayloadWithInstancesIdsOfMatchedRecordsIfMultipleRecor when(consortiumService.getConsortiumConfiguration(any(Context.class))) .thenReturn(Future.succeededFuture(Optional.empty())); - DataImportEventPayload eventPayload = createEventPayload(TENANT_ID); + DataImportEventPayload eventPayload = createEventPayloadWithSubmatchProfile(TENANT_ID); CompletableFuture future = matchMarcBibEventHandler.handle(eventPayload); @@ -284,7 +289,7 @@ public void shouldRequestRecordsIdentifiersMultipleTimesIfMultipleRecordsMatchCr .withTotalRecords(totalRecordsIdentifiers))))); assertThat(totalRecordsIdentifiers, Matchers.greaterThan(recordsIdentifiersLimit)); - DataImportEventPayload eventPayload = createEventPayload(TENANT_ID); + DataImportEventPayload eventPayload = createEventPayloadWithSubmatchProfile(TENANT_ID); MarcBibliographicMatchEventHandler eventHandler = new MarcBibliographicMatchEventHandler(consortiumService, vertx.createHttpClient(), mockedStorage); @@ -305,6 +310,35 @@ public void shouldRequestRecordsIdentifiersMultipleTimesIfMultipleRecordsMatchCr })); } + @Test + public void shouldReturnFailedFutureIfMultipleRecordsMatchCriteriaAndNextProfileNotEligibleForMultiMatchResult(TestContext context) { + Async async = context.async(); + List recordsIdentifiers = Stream.iterate(0, i -> i < 2, i -> ++i) + .map(v -> new RecordIdentifiersDto() + .withRecordId(UUID.randomUUID().toString()) + .withExternalId(UUID.randomUUID().toString())) + .toList(); + + WireMock.stubFor(post(RECORDS_MATCHING_PATH) + .willReturn(WireMock.ok().withBody(Json.encode(new RecordsIdentifiersCollection() + .withIdentifiers(recordsIdentifiers) + .withTotalRecords(recordsIdentifiers.size()))))); + + when(consortiumService.getConsortiumConfiguration(any(Context.class))) + .thenReturn(Future.succeededFuture(Optional.empty())); + + DataImportEventPayload eventPayload = createEventPayload(TENANT_ID); + context.assertTrue(eventPayload.getCurrentNode().getChildSnapshotWrappers().isEmpty()); + + CompletableFuture future = matchMarcBibEventHandler.handle(eventPayload); + + future.whenComplete((payload, throwable) -> { + context.assertNotNull(throwable); + context.assertEquals(DI_SRS_MARC_BIB_RECORD_NOT_MATCHED.value(), eventPayload.getEventType()); + async.complete(); + }); + } + @Test public void shouldNotMatchMarcBibIfIncomingRecordHasNoSpecifiedIncomingField(TestContext context) { Async async = context.async(); @@ -454,25 +488,42 @@ public void shouldSearchRecordAtCentralTenantOnlyOnceIfCurrentTenantIsCentralTen } @Test - public void shouldReturnFailedFutureIfMatchedRecordAtLocalTenantAndMatchedAtCentralTenant(TestContext context) { + public void shouldPopulatePayloadWithInstancesIdsOfMatchedRecordsIfRecordMatchesCriteriaInLocalAndCentralTenant(TestContext context) { Async async = context.async(); + String localTenantInstanceId = UUID.randomUUID().toString(); + String centralTenantInstanceId = UUID.randomUUID().toString(); + WireMock.stubFor(post(RECORDS_MATCHING_PATH) + .withHeader(XOkapiHeaders.TENANT.toLowerCase(), equalTo(TENANT_ID)) + .willReturn(WireMock.ok().withBody(Json.encode(new RecordsIdentifiersCollection() + .withIdentifiers(List.of( + new RecordIdentifiersDto() + .withRecordId(expectedMatchedRecord.getId()) + .withExternalId(localTenantInstanceId))) + .withTotalRecords(1))))); + + WireMock.stubFor(post(RECORDS_MATCHING_PATH) + .withHeader(XOkapiHeaders.TENANT.toLowerCase(), equalTo(CENTRAL_TENANT_ID)) .willReturn(WireMock.ok().withBody(Json.encode(new RecordsIdentifiersCollection() .withIdentifiers(List.of(new RecordIdentifiersDto() .withRecordId(expectedMatchedRecord.getId()) - .withExternalId(UUID.randomUUID().toString()))) + .withExternalId(centralTenantInstanceId))) .withTotalRecords(1))))); WireMock.stubFor(get(new UrlPathPattern(new RegexPattern(SOURCE_STORAGE_RECORDS_PATH_REGEX), true)) .willReturn(WireMock.ok().withBody(Json.encodePrettily(expectedMatchedRecord)))); - DataImportEventPayload eventPayload = createEventPayload(TENANT_ID); + DataImportEventPayload eventPayload = createEventPayloadWithSubmatchProfile(TENANT_ID); CompletableFuture future = matchMarcBibEventHandler.handle(eventPayload); - future.whenComplete((res, throwable) -> { - context.assertNotNull(throwable); - context.assertEquals(DI_SRS_MARC_BIB_RECORD_NOT_MATCHED.value(), eventPayload.getEventType()); + future.whenComplete((payload, throwable) -> { + context.assertNull(throwable); + context.assertEquals(DI_SRS_MARC_BIB_RECORD_MATCHED.value(), eventPayload.getEventType()); + context.assertNotNull(payload.getContext().get(INSTANCES_IDS_KEY)); + assertThat(new JsonArray(payload.getContext().get(INSTANCES_IDS_KEY)), + containsInAnyOrder(localTenantInstanceId, centralTenantInstanceId)); + context.assertNull(payload.getContext().get(MATCHED_MARC_BIB_KEY)); async.complete(); }); } @@ -557,6 +608,7 @@ public void shouldLoadOnlyInstanceFromCentralTenantIfMatchedRecordAtCentralTenan .withTotalRecords(1))))); WireMock.stubFor(get(new UrlPathPattern(new RegexPattern(SOURCE_STORAGE_RECORDS_PATH_REGEX), true)) + .withHeader(XOkapiHeaders.TENANT.toLowerCase(), equalTo(CENTRAL_TENANT_ID)) .willReturn(WireMock.ok().withBody(Json.encodePrettily(expectedMatchedRecord)))); DataImportEventPayload eventPayload = createEventPayload(TENANT_ID); @@ -791,6 +843,86 @@ public void shouldNotSetHoldingIfMultipleHoldingsWereFoundForMatchedRecord(TestC }); } + @Test + public void shouldProcessPayloadContainingInstanceIdsRepresentingMultiMatchResultAndMatchMarcBib(TestContext context) { + Async async = context.async(); + when(consortiumService.getConsortiumConfiguration(any(Context.class))) + .thenReturn(Future.succeededFuture(Optional.empty())); + + JsonArray instancesIds = JsonArray.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + + WireMock.stubFor(post(RECORDS_MATCHING_PATH).willReturn(WireMock.ok() + .withBody(Json.encode(new RecordsIdentifiersCollection() + .withIdentifiers(List.of(new RecordIdentifiersDto() + .withRecordId(expectedMatchedRecord.getId()) + .withExternalId(UUID.randomUUID().toString()))) + .withTotalRecords(1))))); + + WireMock.stubFor(get(new UrlPathPattern(new RegexPattern(SOURCE_STORAGE_RECORDS_PATH_REGEX), true)) + .willReturn(WireMock.ok().withBody(Json.encodePrettily(expectedMatchedRecord)))); + + DataImportEventPayload eventPayload = createEventPayload(TENANT_ID); + eventPayload.getContext().put(INSTANCES_IDS_KEY, instancesIds.encode()); + + CompletableFuture future = matchMarcBibEventHandler.handle(eventPayload); + + future.whenComplete((payload, throwable) -> context.verify(v -> { + context.assertNull(throwable); + context.assertEquals(1, eventPayload.getEventsChain().size()); + context.assertEquals(DI_SRS_MARC_BIB_RECORD_MATCHED.value(), eventPayload.getEventType()); + context.assertNull(payload.getContext().get(INSTANCES_IDS_KEY)); + context.assertNotNull(payload.getContext().get(MATCHED_MARC_BIB_KEY)); + Record actualRecord = Json.decodeValue(payload.getContext().get(MATCHED_MARC_BIB_KEY), Record.class); + context.assertEquals(expectedMatchedRecord.getId(), actualRecord.getId()); + context.assertNotNull(payload.getContext().get(INSTANCE.value())); + WireMock.verify(1, postRequestedFor(urlEqualTo(RECORDS_MATCHING_PATH))); + List requests = WireMock.findAll(postRequestedFor(urlEqualTo(RECORDS_MATCHING_PATH))); + RecordMatchingDto matchingDto = Json.decodeValue(requests.get(0).getBodyAsString(), RecordMatchingDto.class); + context.assertEquals(2, matchingDto.getFilters().size()); + assertThat(matchingDto.getFilters().get(1).getValues(), containsInAnyOrder(instancesIds.getList().toArray())); + async.complete(); + })); + } + + @Test + public void shouldMatchRecordAndTakeIntoAccountPreviouslyMatchedRecord(TestContext context) { + Async async = context.async(); + Record previouslyMatchedRecord = new Record().withMatchedId(UUID.randomUUID().toString()); + when(consortiumService.getConsortiumConfiguration(any(Context.class))) + .thenReturn(Future.succeededFuture(Optional.empty())); + + WireMock.stubFor(post(RECORDS_MATCHING_PATH).willReturn(WireMock.ok() + .withBody(Json.encode(new RecordsIdentifiersCollection() + .withIdentifiers(List.of(new RecordIdentifiersDto() + .withRecordId(expectedMatchedRecord.getId()) + .withExternalId(UUID.randomUUID().toString()))) + .withTotalRecords(1))))); + + WireMock.stubFor(get(new UrlPathPattern(new RegexPattern(SOURCE_STORAGE_RECORDS_PATH_REGEX), true)) + .willReturn(WireMock.ok().withBody(Json.encodePrettily(expectedMatchedRecord)))); + + DataImportEventPayload eventPayload = createEventPayload(TENANT_ID); + eventPayload.getContext().put(MATCHED_MARC_BIB_KEY, Json.encode(previouslyMatchedRecord)); + + CompletableFuture future = matchMarcBibEventHandler.handle(eventPayload); + + future.whenComplete((payload, throwable) -> context.verify(v -> { + context.assertNull(throwable); + context.assertEquals(1, eventPayload.getEventsChain().size()); + context.assertEquals(DI_SRS_MARC_BIB_RECORD_MATCHED.value(), eventPayload.getEventType()); + context.assertNotNull(payload.getContext().get(MATCHED_MARC_BIB_KEY)); + Record actualRecord = Json.decodeValue(payload.getContext().get(MATCHED_MARC_BIB_KEY), Record.class); + context.assertEquals(expectedMatchedRecord.getId(), actualRecord.getId()); + context.assertNotNull(payload.getContext().get(INSTANCE.value())); + WireMock.verify(1, postRequestedFor(urlEqualTo(RECORDS_MATCHING_PATH))); + List requests = WireMock.findAll(postRequestedFor(urlEqualTo(RECORDS_MATCHING_PATH))); + RecordMatchingDto matchingRequest = Json.decodeValue(requests.get(0).getBodyAsString(), RecordMatchingDto.class); + context.assertEquals(2, matchingRequest.getFilters().size()); + context.assertTrue(matchingRequest.getFilters().get(1).getValues().contains(previouslyMatchedRecord.getMatchedId())); + async.complete(); + })); + } + @Test public void shouldReturnTrueIfHandlerIsEligibleForEventPayload() { DataImportEventPayload eventPayload = new DataImportEventPayload() @@ -817,6 +949,21 @@ public void shouldReturnFalseIfHandlerIsNotEligibleForPayload() { } private DataImportEventPayload createEventPayload(String tenantId) { + return createEventPayload(tenantId, null); + } + + private DataImportEventPayload createEventPayloadWithSubmatchProfile(String tenantId) { + ProfileSnapshotWrapper subMatchProfileWrapper = new ProfileSnapshotWrapper() + .withProfileId(matchProfile.getId()) + .withContentType(MATCH_PROFILE) + .withOrder(0) + .withReactTo(MATCH) + .withContent(JsonObject.mapFrom(matchProfile).getMap()); + + return createEventPayload(tenantId, subMatchProfileWrapper); + } + + private DataImportEventPayload createEventPayload(String tenantId, ProfileSnapshotWrapper nextProfileWrapper) { Record record = new Record() .withParsedRecord(new ParsedRecord().withContent(PARSED_CONTENT)); @@ -825,6 +972,10 @@ private DataImportEventPayload createEventPayload(String tenantId) { .withContentType(MATCH_PROFILE) .withContent(JsonObject.mapFrom(matchProfile).getMap()); + if (nextProfileWrapper != null) { + matchProfileWrapper.getChildSnapshotWrappers().add(nextProfileWrapper); + } + return new DataImportEventPayload() .withEventType(DI_INCOMING_MARC_BIB_RECORD_PARSED.value()) .withJobExecutionId(UUID.randomUUID().toString())