Skip to content

Commit

Permalink
MODINVSTOR-1245: Implement synchronization operation for service poin…
Browse files Browse the repository at this point in the history
…t events (#1088)

* MODINVSTOR-1245: Implement synchronization operation for service point event

* MODINVSTOR-1245: Fix from code review

* MODINVSTOR-1245: Comply with check style

* MODINVSTOR-1245: Comply with check style

* MODINVSTOR-1245: Add api test

* MODINVSTOR-1245: Rename service point event

* MODINVSTOR-1245: Rename tests

* MODINVSTOR-1245: Replace service point creation with service implementation

* MODINVSTOR-1245: Add validation for hold shelf expiry and pick up location

* MODINVSTOR-1245: Use system-stubs-junit4 instead of a hard-coded env variable

* MODINVSTOR-1245: Fix from code review

* MODINVSTOR-1245: Fix from code review

* MODINVSTOR-1245: Add logs
  • Loading branch information
Maksat-Galymzhan authored Sep 30, 2024
1 parent 6101067 commit f7f9680
Show file tree
Hide file tree
Showing 18 changed files with 920 additions and 16 deletions.
3 changes: 2 additions & 1 deletion descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -2919,7 +2919,8 @@
{ "name": "S3_BUCKET", "value": "marc-migrations" },
{ "name": "S3_ACCESS_KEY_ID", "value": "" },
{ "name": "S3_SECRET_ACCESS_KEY", "value": "" },
{ "name": "S3_IS_AWS", "value": "true" }
{ "name": "S3_IS_AWS", "value": "true" },
{ "name": "ECS_TLR_FEATURE_ENABLED", "value": "false"}
]
}
}
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<awaitility.version>4.2.2</awaitility.version>
<assertj.version>3.26.3</assertj.version>
<localstack.version>1.20.1</localstack.version>
<system-stubs-junit4.version>2.1.7</system-stubs-junit4.version>

<maven-compiler-plugin.version>3.13.0</maven-compiler-plugin.version>
<build-helper-maven-plugin.version>3.6.0</build-helper-maven-plugin.version>
Expand Down Expand Up @@ -268,6 +269,12 @@
<artifactId>log4j-slf4j2-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency> <!-- for mocking env variable -->
<groupId>uk.org.webcompere</groupId>
<artifactId>system-stubs-junit4</artifactId>
<version>${system-stubs-junit4.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/org/folio/rest/impl/InitApiImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.ThreadingModel;
import io.vertx.core.Vertx;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.rest.resource.interfaces.InitAPI;
import org.folio.services.caches.ConsortiumDataCache;
import org.folio.services.consortium.ServicePointSynchronizationVerticle;
import org.folio.services.consortium.ShadowInstanceSynchronizationVerticle;
import org.folio.services.consortium.SynchronizationVerticle;
import org.folio.services.migration.async.AsyncMigrationConsumerVerticle;
Expand All @@ -25,6 +27,7 @@ public void init(Vertx vertx, Context context, Handler<AsyncResult<Boolean>> han
initAsyncMigrationVerticle(vertx)
.compose(v -> initShadowInstanceSynchronizationVerticle(vertx, getConsortiumDataCache(context)))
.compose(v -> initSynchronizationVerticle(vertx, getConsortiumDataCache(context)))
.compose(v -> initServicePointSynchronizationVerticle(vertx, getConsortiumDataCache(context)))
.map(true)
.onComplete(handler);
}
Expand Down Expand Up @@ -76,6 +79,22 @@ private Future<Object> initSynchronizationVerticle(Vertx vertx, ConsortiumDataCa
.mapEmpty();
}

/**
 * Deploys a single worker-threaded {@link ServicePointSynchronizationVerticle}
 * that consumes service point domain events for cross-tenant synchronization.
 *
 * @param vertx the Vert.x instance to deploy into
 * @param consortiumDataCache shared cache of consortium membership data
 * @return a future that completes when deployment finishes (success or failure is logged)
 */
private Future<Object> initServicePointSynchronizationVerticle(Vertx vertx,
    ConsortiumDataCache consortiumDataCache) {

  // Worker threading model: the verticle performs blocking-style Kafka work.
  var deploymentOptions = new DeploymentOptions()
      .setInstances(1)
      .setThreadingModel(ThreadingModel.WORKER);

  return vertx
      .deployVerticle(() -> new ServicePointSynchronizationVerticle(consortiumDataCache), deploymentOptions)
      .onSuccess(id -> log.info("initServicePointSynchronizationVerticle:: "
          + "ServicePointSynchronizationVerticle verticle was successfully started"))
      .onFailure(e -> log.error("initServicePointSynchronizationVerticle:: "
          + "ServicePointSynchronizationVerticle verticle was not successfully started", e))
      .mapEmpty();
}

private void initConsortiumDataCache(Vertx vertx, Context context) {
ConsortiumDataCache consortiumDataCache = new ConsortiumDataCache(vertx, vertx.createHttpClient());
context.put(ConsortiumDataCache.class.getName(), consortiumDataCache);
Expand Down
22 changes: 7 additions & 15 deletions src/main/java/org/folio/rest/impl/ServicePointApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ public void postServicePoints(Servicepoint entity,
id = UUID.randomUUID().toString();
entity.setId(id);
}
String tenantId = getTenant(okapiHeaders);
PostgresClient pgClient = getPgClient(vertxContext, tenantId);
pgClient.save(SERVICE_POINT_TABLE, id, entity, saveReply -> {
if (saveReply.failed()) {
String message = logAndSaveError(saveReply.cause());
new ServicePointService(vertxContext, okapiHeaders)
.createServicePoint(id, entity)
.onSuccess(response -> asyncResultHandler.handle(succeededFuture(response)))
.onFailure(throwable -> {
String message = logAndSaveError(throwable);
if (isDuplicate(message)) {
asyncResultHandler.handle(Future.succeededFuture(
PostServicePointsResponse.respond422WithApplicationJson(
Expand All @@ -86,15 +86,7 @@ public void postServicePoints(Servicepoint entity,
PostServicePointsResponse.respond500WithTextPlain(
getErrorResponse(message))));
}
} else {
String ret = saveReply.result();
entity.setId(ret);
asyncResultHandler.handle(Future.succeededFuture(
PostServicePointsResponse
.respond201WithApplicationJson(entity,
PostServicePointsResponse.headersFor201().withLocation(LOCATION_PREFIX + ret))));
}
});
});
} catch (Exception e) {
String message = logAndSaveError(e);
asyncResultHandler.handle(Future.succeededFuture(
Expand Down Expand Up @@ -248,7 +240,7 @@ private boolean isDuplicate(String errorMessage) {
"duplicate key value violates unique constraint");
}

private String validateServicePoint(Servicepoint svcpt) {
public static String validateServicePoint(Servicepoint svcpt) {

HoldShelfExpiryPeriod holdShelfExpiryPeriod = svcpt.getHoldShelfExpiryPeriod();
boolean pickupLocation = svcpt.getPickupLocation() == null ? Boolean.FALSE : svcpt.getPickupLocation();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.folio.services.consortium;

import static org.folio.rest.tools.utils.ModuleName.getModuleName;
import static org.folio.rest.tools.utils.ModuleName.getModuleVersion;
import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_CREATED;
import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_DELETED;
import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_UPDATED;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.kafka.services.KafkaEnvironmentProperties;
import org.folio.kafka.services.KafkaTopic;
import org.folio.services.caches.ConsortiumDataCache;
import org.folio.services.consortium.handler.ServicePointSynchronizationCreateHandler;
import org.folio.services.consortium.handler.ServicePointSynchronizationDeleteHandler;
import org.folio.services.consortium.handler.ServicePointSynchronizationUpdateHandler;
import org.folio.services.domainevent.ServicePointEventType;

/**
 * Verticle that starts Kafka consumers for service point domain events
 * (created / updated / deleted) so the changes can be synchronized across
 * consortium member tenants.
 */
public class ServicePointSynchronizationVerticle extends AbstractVerticle {

  private static final Logger log = LogManager.getLogger(ServicePointSynchronizationVerticle.class);
  // Subscribes to the event topic of every tenant (tenant id is one or more word chars).
  private static final String TENANT_PATTERN = "\\w{1,}";
  private static final String MODULE_ID = getModuleId();
  private static final int DEFAULT_LOAD_LIMIT = 5;

  private final ConsortiumDataCache consortiumDataCache;

  // Holds started consumers so they stay referenced for the verticle's lifetime.
  private final List<KafkaConsumerWrapper<String, String>> consumers = new ArrayList<>();

  public ServicePointSynchronizationVerticle(final ConsortiumDataCache consortiumDataCache) {
    this.consortiumDataCache = consortiumDataCache;
  }

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    var httpClient = vertx.createHttpClient();

    createConsumers(httpClient)
        .onSuccess(v -> log.info("start:: verticle started"))
        .onFailure(t -> log.error("start:: verticle start failed", t))
        .onComplete(startPromise);
  }

  /**
   * Starts one consumer per supported event type, sequentially: each consumer
   * is only created after the previous one has started successfully.
   */
  private Future<Void> createConsumers(HttpClient httpClient) {
    final var kafkaConfig = getKafkaConfig();

    Future<?> pipeline = Future.succeededFuture();
    for (var eventType : List.of(SERVICE_POINT_CREATED, SERVICE_POINT_UPDATED, SERVICE_POINT_DELETED)) {
      pipeline = pipeline.compose(ignored ->
          createEventConsumer(eventType, kafkaConfig, buildHandler(eventType, httpClient)));
    }
    return pipeline.mapEmpty();
  }

  // Maps an event type to the handler that implements its synchronization logic.
  private AsyncRecordHandler<String, String> buildHandler(ServicePointEventType eventType,
      HttpClient httpClient) {

    if (eventType == SERVICE_POINT_CREATED) {
      return new ServicePointSynchronizationCreateHandler(consortiumDataCache, httpClient, vertx);
    }
    if (eventType == SERVICE_POINT_UPDATED) {
      return new ServicePointSynchronizationUpdateHandler(consortiumDataCache, httpClient, vertx);
    }
    if (eventType == SERVICE_POINT_DELETED) {
      return new ServicePointSynchronizationDeleteHandler(consortiumDataCache, httpClient, vertx);
    }
    // Unreachable for the three types registered in createConsumers.
    throw new IllegalArgumentException("Unsupported event type: " + eventType);
  }

  private Future<KafkaConsumerWrapper<String, String>> createEventConsumer(
      ServicePointEventType eventType, KafkaConfig kafkaConfig,
      AsyncRecordHandler<String, String> handler) {

    var subscription = SubscriptionDefinition.builder()
        .eventType(eventType.name())
        .subscriptionPattern(buildSubscriptionPattern(eventType.getKafkaTopic(), kafkaConfig))
        .build();

    return createConsumer(kafkaConfig, subscription, handler);
  }

  private Future<KafkaConsumerWrapper<String, String>> createConsumer(KafkaConfig kafkaConfig,
      SubscriptionDefinition subscription,
      AsyncRecordHandler<String, String> recordHandler) {

    var wrapper = KafkaConsumerWrapper.<String, String>builder()
        .context(context)
        .vertx(vertx)
        .kafkaConfig(kafkaConfig)
        .loadLimit(DEFAULT_LOAD_LIMIT)
        .globalLoadSensor(new GlobalLoadSensor())
        .subscriptionDefinition(subscription)
        .build();

    return wrapper.start(recordHandler, MODULE_ID)
        .onSuccess(v -> consumers.add(wrapper))
        .map(wrapper);
  }

  private static String buildSubscriptionPattern(KafkaTopic kafkaTopic, KafkaConfig kafkaConfig) {
    return kafkaTopic.fullTopicName(kafkaConfig, TENANT_PATTERN);
  }

  // Consumer group id: module name (underscores normalized to dashes) plus version.
  private static String getModuleId() {
    var normalizedName = getModuleName().replace("_", "-");
    return normalizedName + "-" + getModuleVersion();
  }

  private KafkaConfig getKafkaConfig() {
    return KafkaConfig.builder()
        .kafkaHost(KafkaEnvironmentProperties.host())
        .kafkaPort(KafkaEnvironmentProperties.port())
        .envId(KafkaEnvironmentProperties.environment())
        .build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.folio.services.consortium.handler;

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import org.folio.rest.jaxrs.model.Servicepoint;
import org.folio.services.caches.ConsortiumDataCache;
import org.folio.services.consortium.processor.ServicePointSynchronizationCreateEventProcessor;
import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor;
import org.folio.services.domainevent.DomainEvent;

/**
 * Kafka record handler for service point CREATE events.
 *
 * <p>All consuming/decoding logic lives in the parent class; this subclass only
 * supplies the processor that applies a newly created service point to the
 * other consortium member tenants.
 */
public class ServicePointSynchronizationCreateHandler extends ServicePointSynchronizationHandler {

  public ServicePointSynchronizationCreateHandler(ConsortiumDataCache consortiumDataCache,
      HttpClient httpClient, Vertx vertx) {

    super(consortiumDataCache, httpClient, vertx);
  }

  /**
   * Returns the processor for a create event.
   *
   * @param domainEvent decoded service point domain event
   * @return processor that performs the create-synchronization
   */
  @Override
  protected ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor(
      DomainEvent<Servicepoint> domainEvent) {

    return new ServicePointSynchronizationCreateEventProcessor(domainEvent);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.folio.services.consortium.handler;

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import org.folio.rest.jaxrs.model.Servicepoint;
import org.folio.services.caches.ConsortiumDataCache;
import org.folio.services.consortium.processor.ServicePointSynchronizationDeleteEventProcessor;
import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor;
import org.folio.services.domainevent.DomainEvent;

/**
 * Kafka record handler for service point DELETE events.
 *
 * <p>All consuming/decoding logic lives in the parent class; this subclass only
 * supplies the processor that propagates a service point deletion to the
 * other consortium member tenants.
 */
public class ServicePointSynchronizationDeleteHandler extends ServicePointSynchronizationHandler {

  public ServicePointSynchronizationDeleteHandler(ConsortiumDataCache consortiumDataCache,
      HttpClient httpClient, Vertx vertx) {

    super(consortiumDataCache, httpClient, vertx);
  }

  /**
   * Returns the processor for a delete event.
   *
   * @param domainEvent decoded service point domain event
   * @return processor that performs the delete-synchronization
   */
  @Override
  protected ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor(
      DomainEvent<Servicepoint> domainEvent) {

    return new ServicePointSynchronizationDeleteEventProcessor(domainEvent);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.folio.services.consortium.handler;

import static io.vertx.core.Future.succeededFuture;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Optional;
import org.apache.commons.collections4.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaHeaderUtils;
import org.folio.rest.jaxrs.model.Servicepoint;
import org.folio.services.caches.ConsortiumData;
import org.folio.services.caches.ConsortiumDataCache;
import org.folio.services.consortium.SynchronizationContext;
import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor;
import org.folio.services.domainevent.DomainEvent;

/**
 * Base Kafka record handler for service point synchronization events.
 *
 * <p>Decodes the incoming record, looks up consortium membership from the
 * cache, and — only when the tenant belongs to a consortium — delegates to the
 * event-type-specific processor supplied by the subclass. Records from
 * non-consortium tenants are acknowledged without further processing.
 */
public abstract class ServicePointSynchronizationHandler
    implements AsyncRecordHandler<String, String> {

  private static final Logger log = LogManager.getLogger(
      ServicePointSynchronizationHandler.class);

  private final ConsortiumDataCache consortiumDataCache;
  private final HttpClient httpClient;
  private final Vertx vertx;

  protected ServicePointSynchronizationHandler(ConsortiumDataCache consortiumDataCache,
      HttpClient httpClient, Vertx vertx) {

    this.consortiumDataCache = consortiumDataCache;
    this.httpClient = httpClient;
    this.vertx = vertx;
  }

  @Override
  public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
    log.info("handle:: Processing event {}", consumerRecord.topic());
    // Kafka headers double as Okapi headers for the consortium lookup.
    var okapiHeaders = new CaseInsensitiveMap<>(
        KafkaHeaderUtils.kafkaHeadersToMap(consumerRecord.headers()));
    return consortiumDataCache.getConsortiumData(okapiHeaders)
        .compose(maybeData -> processConsortiumData(consumerRecord, maybeData, okapiHeaders));
  }

  private Future<String> processConsortiumData(
      KafkaConsumerRecord<String, String> consumerRecord,
      Optional<ConsortiumData> consortiumData, CaseInsensitiveMap<String, String> headers) {

    log.info("processConsortiumData:: {}", consortiumData);
    if (consortiumData.isEmpty()) {
      // Tenant is not part of a consortium: nothing to synchronize.
      return succeededFuture(consumerRecord.key());
    }
    return processConsortiumDataByEvent(consortiumData.get(), consumerRecord, headers);
  }

  private Future<String> processConsortiumDataByEvent(ConsortiumData data,
      KafkaConsumerRecord<String, String> consumerRecord,
      CaseInsensitiveMap<String, String> headers) {

    var domainEvent = Json.decodeValue(consumerRecord.value(), DomainEvent.class);
    var processor = getServicePointSynchronizationProcessor(domainEvent);
    return processor.process(consumerRecord.key(),
        new SynchronizationContext(data, headers, vertx, httpClient));
  }

  /**
   * Supplies the processor implementing the synchronization logic for the
   * concrete event type (create/update/delete).
   *
   * @param domainEvent decoded service point domain event
   * @return processor to run for this event
   */
  protected abstract ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor(
      DomainEvent<Servicepoint> domainEvent);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.folio.services.consortium.handler;

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import org.folio.rest.jaxrs.model.Servicepoint;
import org.folio.services.caches.ConsortiumDataCache;
import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor;
import org.folio.services.consortium.processor.ServicePointSynchronizationUpdateEventProcessor;
import org.folio.services.domainevent.DomainEvent;

/**
 * Kafka record handler for service point UPDATE events.
 *
 * <p>All consuming/decoding logic lives in the parent class; this subclass only
 * supplies the processor that applies a service point update to the other
 * consortium member tenants.
 */
public class ServicePointSynchronizationUpdateHandler extends ServicePointSynchronizationHandler {

  public ServicePointSynchronizationUpdateHandler(ConsortiumDataCache consortiumDataCache,
      HttpClient httpClient, Vertx vertx) {

    super(consortiumDataCache, httpClient, vertx);
  }

  /**
   * Returns the processor for an update event.
   *
   * @param domainEvent decoded service point domain event
   * @return processor that performs the update-synchronization
   */
  @Override
  protected ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor(
      DomainEvent<Servicepoint> domainEvent) {

    return new ServicePointSynchronizationUpdateEventProcessor(domainEvent);
  }

}
Loading

0 comments on commit f7f9680

Please sign in to comment.