Skip to content

Commit

Permalink
HOTFIX: MODINVSTOR-1299 update rollback mechanism for instances when …
Browse files Browse the repository at this point in the history
…linking/unlinking with subjects is failed. (#1127)

* MODINVSTOR-1299 update rollback mechanism for instances when linking/unlinking with subjects is failed.

* FIX CHECKSTYLE

* handle hrid errors gracefully
increase timeout for some test cases

* fix test

* fix checkstyle

* fix test

* fix test

* remove redundant catch block
  • Loading branch information
JavokhirAbdullayev authored Dec 23, 2024
1 parent e4830e6 commit 3c2cf47
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 54 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Requires `API_NAME vX.Y`

### Features
* update rollback mechanism for instances when linking/unlinking with subjects is failed. ([MODINVSTOR-1299](https://folio-org.atlassian.net/browse/MODINVSTOR-1299))
* Unable to delete local Subject types/sources when they are linked to an Instance ([MODINVSTOR-1284](https://folio-org.atlassian.net/browse/MODINVSTOR-1284))
* Modify endpoint for bulk instances upsert with publish events flag ([MODINVSTOR-1283](https://folio-org.atlassian.net/browse/MODINVSTOR-1283))
* Change Kafka event publishing keys for holdings and items ([MODINVSTOR-1281](https://folio-org.atlassian.net/browse/MODINVSTOR-1281))
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/folio/persist/InstanceRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static org.folio.rest.impl.BoundWithPartApi.BOUND_WITH_TABLE;
import static org.folio.rest.impl.HoldingsStorageApi.HOLDINGS_RECORD_TABLE;
import static org.folio.rest.impl.ItemStorageApi.ITEM_TABLE;
import static org.folio.rest.jaxrs.resource.InstanceStorage.PostInstanceStorageInstancesResponse.headersFor201;
import static org.folio.rest.jaxrs.resource.InstanceStorage.PostInstanceStorageInstancesResponse.respond201WithApplicationJson;
import static org.folio.rest.persist.PgUtil.postgresClient;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -88,6 +90,11 @@ public Future<RowSet<Row>> batchLinkSubjectSource(Conn conn, List<Pair<String, S
}
}

public Future<Response> createInstance(Conn conn, Instance instance) {
return conn.save(INSTANCE_TABLE, instance.getId(), instance)
.map(id -> respond201WithApplicationJson(instance.withId(id), headersFor201()));
}

public Future<RowSet<Row>> batchLinkSubjectType(Conn conn, List<Pair<String, String>> typePairs) {
try {
String sql = """
Expand Down
56 changes: 51 additions & 5 deletions src/main/java/org/folio/services/ResponseHandlerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

public final class ResponseHandlerUtil {
private static final String HRID_ERROR_MESSAGE = "lower(f_unaccent(jsonb ->> 'hrid'::text))";
private static final String MATCH_KEY_ERROR_MESSAGE = "lower(f_unaccent(jsonb ->> 'matchKey'::text))";
private static final String HRID = "HRID";
private static final String TABLE_NAME = "instance";

private ResponseHandlerUtil() {
}
Expand All @@ -25,13 +27,10 @@ public static Response handleHridError(Response response) {
}

private static String getErrorMessage(Object responseEntity) {
var errorMessage = responseEntity.toString();
if (responseEntity instanceof Errors errors) {
errorMessage = errors.getErrors().get(0).getMessage();
} else if (responseEntity instanceof String message) {
errorMessage = message;
return errors.getErrors().get(0).getMessage();
}
return errorMessage;
return responseEntity.toString();
}

private static Response createResponse(Response response) {
Expand All @@ -43,6 +42,13 @@ private static Response createResponse(Response response) {
}
}

private static Response createResponse(Response response, String errorMessage) {
var transformedMessage = transformHridErrorMessage(errorMessage);
return Response.fromResponse(response)
.entity(transformedMessage)
.build();
}

public static Response failedValidationResponse(Response response) {
var entity = (Errors) response.getEntity();
var errors = entity.getErrors();
Expand All @@ -52,4 +58,44 @@ public static Response failedValidationResponse(Response response) {
.entity(entity)
.build();
}

// todo: use helper methods from PgUtil class when they become public
public static Response handleHridErrorInInstance(Response response) {
var statusCode = response.getStatus();

if (statusCode == 201) {
return response;
}

var errorMessage = getErrorMessage(response.getEntity());
if (errorMessage.contains(HRID_ERROR_MESSAGE)) {
return createResponse(response, errorMessage);
} else if (errorMessage.contains(MATCH_KEY_ERROR_MESSAGE)) {
return createMatchKeyResponse(response);
}
return response;
}

private static Response createMatchKeyResponse(Response response) {
var entity = response.getEntity().toString();
var matchKeyValue = extractValue(entity);
var remappedMessage = String.format("%s value already exists in table instance: %s",
MATCH_KEY_ERROR_MESSAGE, matchKeyValue);
return Response.fromResponse(response)
.entity(remappedMessage)
.build();
}

private static String transformHridErrorMessage(String errorMessage) {
var hridValue = extractValue(errorMessage);
return hridValue != null
? String.format("%s value already exists in table %s: %s", HRID, TABLE_NAME, hridValue)
: errorMessage;
}

private static String extractValue(String errorMessage) {
var startIndex = errorMessage.indexOf("=(") + 2;
var endIndex = errorMessage.indexOf(")", startIndex);
return (startIndex > 1 && endIndex > startIndex) ? errorMessage.substring(startIndex, endIndex) : null;
}
}
76 changes: 36 additions & 40 deletions src/main/java/org/folio/services/instance/InstanceService.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package org.folio.services.instance;

import static io.vertx.core.Promise.promise;
import static javax.ws.rs.core.Response.noContent;
import static org.folio.okapi.common.XOkapiHeaders.TENANT;
import static org.folio.persist.InstanceRepository.INSTANCE_TABLE;
import static org.folio.rest.impl.StorageHelper.MAX_ENTITIES;
import static org.folio.rest.jaxrs.resource.InstanceStorage.DeleteInstanceStorageInstancesByInstanceIdResponse;
import static org.folio.rest.jaxrs.resource.InstanceStorage.DeleteInstanceStorageInstancesResponse;
import static org.folio.rest.jaxrs.resource.InstanceStorage.GetInstanceStorageInstancesByInstanceIdResponse;
import static org.folio.rest.jaxrs.resource.InstanceStorage.PostInstanceStorageInstancesResponse.respond400WithTextPlain;
import static org.folio.rest.jaxrs.resource.InstanceStorageBatchSynchronous.PostInstanceStorageBatchSynchronousResponse;
import static org.folio.rest.persist.PgUtil.post;
import static org.folio.rest.persist.PgUtil.postSync;
import static org.folio.rest.persist.PgUtil.postgresClient;
import static org.folio.rest.persist.PgUtil.put;
Expand All @@ -21,6 +22,7 @@
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.ext.web.RoutingContext;
import io.vertx.pgclient.PgException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -114,27 +116,38 @@ public Future<Response> createInstance(Instance entity) {
return hridManager.populateHrid(entity)
.compose(NotesValidators::refuseLongNotes)
.compose(instance -> {
final Promise<Response> postResponse = promise(); // promise for the entire process

// execute post and linking logic within same transaction
return postgresClient.withTrans(conn -> {
final Promise<Response> postResponse = promise();

Promise<Response> postPromise = postInstance(instance);
postgresClient.withTrans(conn ->
instanceRepository.createInstance(conn, instance)
.compose(response -> {
if (response.getEntity() instanceof Instance instanceResp) {
return batchLinkSubjects(conn, instanceResp.getId(), instance.getSubjects())
.map(v -> response);
} else {
return Future.succeededFuture(respond400WithTextPlain(response.getEntity()));
}
})
.onSuccess(postResponse::complete)
.onFailure(throwable -> {
if (throwable instanceof PgException pgException) {
postResponse.complete(respond400WithTextPlain(pgException.getDetail()));
} else {
postResponse.complete(respond400WithTextPlain(throwable.getMessage()));
}
})
);

// Chain the batchLinkSubjects operation after postPromise completes
return postPromise.future()
.compose(response -> batchLinkSubjects(conn, instance.getId(), instance.getSubjects())
.map(v -> response)
);
}).onComplete(transactionResult -> {
if (transactionResult.succeeded()) {
postResponse.complete(transactionResult.result());
} else {
postResponse.fail(transactionResult.cause());
}
}).onSuccess(domainEventPublisher.publishCreated());
return postResponse.future()
// Return the response without waiting for a domain event publish
// to complete. Units of work performed by this service is the same
// but the ordering of the units of work provides a benefit to the
// api client invoking this endpoint. The response is returned
// a little earlier so the api client can continue its processing
// while the domain event publish is satisfied.
.onSuccess(domainEventPublisher.publishCreated());
})
.map(ResponseHandlerUtil::handleHridError);
.map(ResponseHandlerUtil::handleHridErrorInInstance);
}

public Future<Response> createInstances(List<Instance> instances, boolean upsert, boolean optimisticLocking,
Expand All @@ -149,6 +162,7 @@ public Future<Response> createInstances(List<Instance> instances, boolean upsert
.compose(batchOperation -> {
final Promise<Response> postResponse = promise();

// todo: use the connection for creating batches and linking subjects
postgresClient.withTrans(conn -> {

Promise<Response> postPromise = postSyncInstance(instances, upsert, optimisticLocking);
Expand Down Expand Up @@ -198,24 +212,6 @@ public Future<Response> updateInstance(String id, Instance newInstance) {
});
}

/**
* creates instance with separate promise, to use withing single transaction
* alongside with linking subject sources/types.
* */
private Promise<Response> postInstance(Instance instance) {
Promise<Response> promise = Promise.promise(); // Promise for the `post` operation
post(INSTANCE_TABLE, instance, okapiHeaders, vertxContext,
InstanceStorage.PostInstanceStorageInstancesResponse.class,
reply -> {
if (reply.succeeded()) {
promise.complete(reply.result()); // Mark `postPromise` as successful
} else {
promise.fail(reply.cause()); // Mark `postPromise` as failed
}
});
return promise;
}

private Promise<Response> postSyncInstance(List<Instance> instances, boolean upsert, boolean optimisticLocking) {
Promise<Response> promise = Promise.promise();
postSync(INSTANCE_TABLE, instances, MAX_ENTITIES, upsert, optimisticLocking, okapiHeaders,
Expand Down Expand Up @@ -339,7 +335,7 @@ public Future<Response> deleteAllInstances() {
.compose(notUsed -> relationshipRepository.deleteAll())
.compose(notUsed -> instanceRepository.deleteAll())
.onSuccess(notUsed -> domainEventPublisher.publishAllRemoved())
.map(Response.noContent().build());
.map(noContent().build());
}

/**
Expand All @@ -356,7 +352,7 @@ public Future<Response> deleteInstance(String id) {
rowSet.iterator().forEachRemaining(row ->
domainEventPublisher.publishRemoved(row.getString(0), row.getString(1))
);
return Response.noContent().build();
return noContent().build();
});
}

Expand All @@ -380,7 +376,7 @@ public Future<Response> deleteInstances(String cql) {
domainEventPublisher.publishRemoved(row.getString(0), row.getString(1))
)
))
.map(Response.noContent().build());
.map(noContent().build());
}

public Future<Void> publishReindexInstanceRecords(String rangeId, String fromId, String toId) {
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/org/folio/rest/api/AsyncMigrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void canMigrateItems() {

@Test
public void canMigrateInstanceSubjectsAndSeries() {
var numberOfRecords = 3;
var numberOfRecords = 10;

IntStream.range(0, numberOfRecords).parallel().forEach(v ->
instancesClient.create(new JsonObject()
Expand Down Expand Up @@ -167,7 +167,7 @@ public void canMigrateInstanceSubjectsAndSeries() {

@Test
public void canMigrateInstancePublicationPeriod() {
var numberOfRecords = 4;
var numberOfRecords = 10;

IntStream.range(0, numberOfRecords).parallel().forEach(v ->
instancesClient.create(new JsonObject()
Expand All @@ -182,7 +182,7 @@ public void canMigrateInstancePublicationPeriod() {
// check jsonb contains 'publicationPeriod' data
RowSet<Row> selectResult = runSql(String.format(SELECT_JSONB, TENANT_ID));

assertEquals(4, selectResult.rowCount());
assertEquals(10, selectResult.rowCount());
JsonObject jsonbData = selectResult.iterator().next().toJson().getJsonObject("jsonb");
assertNull(jsonbData.getJsonObject("dates"));
assertNotNull(jsonbData.getJsonObject("publicationPeriod"));
Expand All @@ -209,7 +209,7 @@ public void canMigrateInstancePublicationPeriod() {
var selectQuery = String.format(SELECT_JSONB, TENANT_ID);
RowSet<Row> result = runSql(selectQuery);

assertEquals(4, result.rowCount());
assertEquals(10, result.rowCount());
JsonObject entry = result.iterator().next().toJson();
JsonObject jsonb = entry.getJsonObject("jsonb");
JsonObject dates = jsonb.getJsonObject("dates");
Expand Down
9 changes: 4 additions & 5 deletions src/test/java/org/folio/rest/api/InstanceStorageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void afterEach(TestContext context) {

// This calls get() to ensure blocking until all futures are complete.
final Async async = context.async();
List<CompletableFuture<Response>> cfs = new ArrayList<CompletableFuture<Response>>();
List<CompletableFuture<Response>> cfs = new ArrayList<>();
natureOfContentIdsToRemoveAfterTest.forEach(id -> cfs.add(getClient()
.delete(natureOfContentTermsUrl("/" + id), TENANT_ID)));
CompletableFuture.allOf(cfs.toArray(new CompletableFuture[cfs.size()]))
Expand Down Expand Up @@ -1945,9 +1945,9 @@ public void shouldChangeStatusUpdatedDateOnSubsequentStatusChanges() throws Exce
assertThat(updatedInstanceWithOthStatus.getString(STATUS_UPDATED_DATE_PROPERTY), hasIsoFormat());

assertThat(updatedInstanceWithCatStatus
.getInstant(STATUS_UPDATED_DATE_PROPERTY), withinSecondsBeforeNow(seconds(2)));
.getInstant(STATUS_UPDATED_DATE_PROPERTY), withinSecondsBeforeNow(seconds(3)));
assertThat(updatedInstanceWithOthStatus
.getInstant(STATUS_UPDATED_DATE_PROPERTY), withinSecondsBeforeNow(seconds(1)));
.getInstant(STATUS_UPDATED_DATE_PROPERTY), withinSecondsBeforeNow(seconds(2)));
}

/**
Expand Down Expand Up @@ -2341,7 +2341,6 @@ public void cannotCreateAnInstanceWhenAlreadyAllocatedHridIsAllocated() {
final Response response = createCompleted.get(TIMEOUT, TimeUnit.SECONDS);

assertThat(response.getStatusCode(), is(400));

assertThat(response.getBody(), is(
"HRID value already exists in table instance: in00000001000"));
}
Expand Down Expand Up @@ -3095,7 +3094,7 @@ private IndividualResource createInstance(JsonObject instanceToCreate)
getClient().post(instancesStorageUrl(""), instanceToCreate,
TENANT_ID, json(createCompleted));

Response response = createCompleted.get(2, SECONDS);
Response response = createCompleted.get(5, SECONDS);

assertThat(format("Create instance failed: %s", response.getBody()),
response.getStatusCode(), is(201));
Expand Down

0 comments on commit 3c2cf47

Please sign in to comment.