From f7f96808772a176725c133cb7fbe1dba5539530a Mon Sep 17 00:00:00 2001 From: Maksat <144414992+Maksat-Galymzhan@users.noreply.github.com> Date: Mon, 30 Sep 2024 21:52:41 +0500 Subject: [PATCH] MODINVSTOR-1245: Implement synchronization operation for service point events (#1088) * MODINVSTOR-1245: Implement synchronization operation for service point event * MODINVSTOR-1245: Fix from code review * MODINVSTOR-1245: Comply with check style * MODINVSTOR-1245: Comply with check style * MODINVSTOR-1245: Add api test * MODINVSTOR-1245: Rename service point event * MODINVSTOR-1245: Rename tests * MODINVSTOR-1245: Replace service point creation with service implementation * MODINVSTOR-1245: Add validation for hold shelf expiry and pick up location * MODINVSTOR-1245: Replace system-stubs-junit4 instead of hard-coded env variable * MODINVSTOR-1245: Fix from code review * MODINVSTOR-1245: Fix from code review * MODINVSTOR-1245: Add logs --- descriptors/ModuleDescriptor-template.json | 3 +- pom.xml | 7 + .../java/org/folio/rest/impl/InitApiImpl.java | 19 ++ .../org/folio/rest/impl/ServicePointApi.java | 22 +- .../ServicePointSynchronizationVerticle.java | 111 ++++++ ...vicePointSynchronizationCreateHandler.java | 26 ++ ...vicePointSynchronizationDeleteHandler.java | 26 ++ .../ServicePointSynchronizationHandler.java | 73 ++++ ...vicePointSynchronizationUpdateHandler.java | 26 ++ ...ntSynchronizationCreateEventProcessor.java | 59 ++++ ...ntSynchronizationDeleteEventProcessor.java | 42 +++ ...icePointSynchronizationEventProcessor.java | 80 +++++ ...ntSynchronizationUpdateEventProcessor.java | 66 ++++ .../ServicePointDomainEventPublisher.java | 4 + .../domainevent/ServicePointEventType.java | 31 ++ .../servicepoint/ServicePointService.java | 12 + .../java/org/folio/utils/Environment.java | 8 + ...rvicePointSynchronizationVerticleTest.java | 321 ++++++++++++++++++ 18 files changed, 920 insertions(+), 16 deletions(-) create mode 100644 src/main/java/org/folio/services/consortium/ServicePointSynchronizationVerticle.java create mode 100644 src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationCreateHandler.java create mode 100644 src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationDeleteHandler.java create mode 100644 src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationHandler.java create mode 100644 src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationUpdateHandler.java create mode 100644 src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationCreateEventProcessor.java create mode 100644 src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationDeleteEventProcessor.java create mode 100644 src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationEventProcessor.java create mode 100644 src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationUpdateEventProcessor.java create mode 100644 src/main/java/org/folio/services/domainevent/ServicePointEventType.java create mode 100644 src/main/java/org/folio/utils/Environment.java create mode 100644 src/test/java/org/folio/rest/api/ServicePointSynchronizationVerticleTest.java diff --git a/descriptors/ModuleDescriptor-template.json b/descriptors/ModuleDescriptor-template.json index 2ddfa81e9..0a986b04e 100755 --- a/descriptors/ModuleDescriptor-template.json +++ b/descriptors/ModuleDescriptor-template.json @@ -2919,7 +2919,8 @@ { "name": "S3_BUCKET", "value": "marc-migrations" }, { "name": "S3_ACCESS_KEY_ID", "value": "" }, { "name": "S3_SECRET_ACCESS_KEY", "value": "" }, - { "name": "S3_IS_AWS", "value": "true" } + { "name": "S3_IS_AWS", "value": "true" }, + { "name": "ECS_TLR_FEATURE_ENABLED", "value": "false"} ] } } diff --git a/pom.xml b/pom.xml index 80379cba1..760dd49d9 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,7 @@ 4.2.2 3.26.3 1.20.1 + 2.1.7 3.13.0 3.6.0 @@ -268,6 +269,12 @@ log4j-slf4j2-impl test + + uk.org.webcompere + system-stubs-junit4 + ${system-stubs-junit4.version} + test + diff --git a/src/main/java/org/folio/rest/impl/InitApiImpl.java b/src/main/java/org/folio/rest/impl/InitApiImpl.java index 134fec4e9..bfa05628d 100644 --- a/src/main/java/org/folio/rest/impl/InitApiImpl.java +++ b/src/main/java/org/folio/rest/impl/InitApiImpl.java @@ -6,11 +6,13 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; +import io.vertx.core.ThreadingModel; import io.vertx.core.Vertx; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.folio.rest.resource.interfaces.InitAPI; import org.folio.services.caches.ConsortiumDataCache; +import org.folio.services.consortium.ServicePointSynchronizationVerticle; import org.folio.services.consortium.ShadowInstanceSynchronizationVerticle; import org.folio.services.consortium.SynchronizationVerticle; import org.folio.services.migration.async.AsyncMigrationConsumerVerticle; @@ -25,6 +27,7 @@ public void init(Vertx vertx, Context context, Handler> han initAsyncMigrationVerticle(vertx) .compose(v -> initShadowInstanceSynchronizationVerticle(vertx, getConsortiumDataCache(context))) .compose(v -> initSynchronizationVerticle(vertx, getConsortiumDataCache(context))) + .compose(v -> initServicePointSynchronizationVerticle(vertx, getConsortiumDataCache(context))) .map(true) .onComplete(handler); } @@ -76,6 +79,22 @@ private Future initSynchronizationVerticle(Vertx vertx, ConsortiumDataCa .mapEmpty(); } + private Future initServicePointSynchronizationVerticle(Vertx vertx, + ConsortiumDataCache consortiumDataCache) { + + DeploymentOptions options = new DeploymentOptions() + .setThreadingModel(ThreadingModel.WORKER) + .setInstances(1); + + return vertx.deployVerticle(() -> new ServicePointSynchronizationVerticle(consortiumDataCache), + options) + .onSuccess(v -> log.info("initServicePointSynchronizationVerticle:: " + + "ServicePointSynchronizationVerticle verticle was successfully started")) + .onFailure(e -> log.error("initServicePointSynchronizationVerticle:: " + + "ServicePointSynchronizationVerticle verticle was not successfully started", e)) + .mapEmpty(); + } + private void initConsortiumDataCache(Vertx vertx, Context context) { ConsortiumDataCache consortiumDataCache = new ConsortiumDataCache(vertx, vertx.createHttpClient()); context.put(ConsortiumDataCache.class.getName(), consortiumDataCache); diff --git a/src/main/java/org/folio/rest/impl/ServicePointApi.java b/src/main/java/org/folio/rest/impl/ServicePointApi.java index 60d5cacf6..fd4ca47d1 100644 --- a/src/main/java/org/folio/rest/impl/ServicePointApi.java +++ b/src/main/java/org/folio/rest/impl/ServicePointApi.java @@ -71,11 +71,11 @@ public void postServicePoints(Servicepoint entity, id = UUID.randomUUID().toString(); entity.setId(id); } - String tenantId = getTenant(okapiHeaders); - PostgresClient pgClient = getPgClient(vertxContext, tenantId); - pgClient.save(SERVICE_POINT_TABLE, id, entity, saveReply -> { - if (saveReply.failed()) { - String message = logAndSaveError(saveReply.cause()); + new ServicePointService(vertxContext, okapiHeaders) + .createServicePoint(id, entity) + .onSuccess(response -> asyncResultHandler.handle(succeededFuture(response))) + .onFailure(throwable -> { + String message = logAndSaveError(throwable); if (isDuplicate(message)) { asyncResultHandler.handle(Future.succeededFuture( PostServicePointsResponse.respond422WithApplicationJson( @@ -86,15 +86,7 @@ public void postServicePoints(Servicepoint entity, PostServicePointsResponse.respond500WithTextPlain( getErrorResponse(message)))); } - } else { - String ret = saveReply.result(); - entity.setId(ret); - asyncResultHandler.handle(Future.succeededFuture( - PostServicePointsResponse - .respond201WithApplicationJson(entity, - PostServicePointsResponse.headersFor201().withLocation(LOCATION_PREFIX + ret)))); - } - }); + }); } catch (Exception e) { String message = logAndSaveError(e); asyncResultHandler.handle(Future.succeededFuture( @@ -248,7 +240,7 @@ private boolean isDuplicate(String errorMessage) { "duplicate key value violates unique constraint"); } - private String validateServicePoint(Servicepoint svcpt) { + public static String validateServicePoint(Servicepoint svcpt) { HoldShelfExpiryPeriod holdShelfExpiryPeriod = svcpt.getHoldShelfExpiryPeriod(); boolean pickupLocation = svcpt.getPickupLocation() == null ? Boolean.FALSE : svcpt.getPickupLocation(); diff --git a/src/main/java/org/folio/services/consortium/ServicePointSynchronizationVerticle.java b/src/main/java/org/folio/services/consortium/ServicePointSynchronizationVerticle.java new file mode 100644 index 000000000..7753ce861 --- /dev/null +++ b/src/main/java/org/folio/services/consortium/ServicePointSynchronizationVerticle.java @@ -0,0 +1,111 @@ +package org.folio.services.consortium; + +import static org.folio.rest.tools.utils.ModuleName.getModuleName; +import static org.folio.rest.tools.utils.ModuleName.getModuleVersion; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_CREATED; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_DELETED; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_UPDATED; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.http.HttpClient; +import java.util.ArrayList; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.kafka.AsyncRecordHandler; +import org.folio.kafka.GlobalLoadSensor; +import org.folio.kafka.KafkaConfig; +import org.folio.kafka.KafkaConsumerWrapper; +import org.folio.kafka.SubscriptionDefinition; +import org.folio.kafka.services.KafkaEnvironmentProperties; +import org.folio.kafka.services.KafkaTopic; +import org.folio.services.caches.ConsortiumDataCache; +import org.folio.services.consortium.handler.ServicePointSynchronizationCreateHandler; +import org.folio.services.consortium.handler.ServicePointSynchronizationDeleteHandler; +import org.folio.services.consortium.handler.ServicePointSynchronizationUpdateHandler; +import org.folio.services.domainevent.ServicePointEventType; + +public class ServicePointSynchronizationVerticle extends AbstractVerticle { + + private static final Logger log = LogManager.getLogger(ServicePointSynchronizationVerticle.class); + private static final String TENANT_PATTERN = "\\w{1,}"; + private static final String MODULE_ID = getModuleId(); + private static final int DEFAULT_LOAD_LIMIT = 5; + private final ConsortiumDataCache consortiumDataCache; + + private final List> consumers = new ArrayList<>(); + + public ServicePointSynchronizationVerticle(final ConsortiumDataCache consortiumDataCache) { + this.consortiumDataCache = consortiumDataCache; + } + + @Override + public void start(Promise startPromise) throws Exception { + var httpClient = vertx.createHttpClient(); + + createConsumers(httpClient) + .onSuccess(v -> log.info("start:: verticle started")) + .onFailure(t -> log.error("start:: verticle start failed", t)) + .onComplete(startPromise); + } + + private Future createConsumers(HttpClient httpClient) { + final var config = getKafkaConfig(); + + return createEventConsumer(SERVICE_POINT_CREATED, config, + new ServicePointSynchronizationCreateHandler(consortiumDataCache, httpClient, vertx)) + .compose(r -> createEventConsumer(SERVICE_POINT_UPDATED, config, + new ServicePointSynchronizationUpdateHandler(consortiumDataCache, httpClient, vertx))) + .compose(r -> createEventConsumer(SERVICE_POINT_DELETED, config, + new ServicePointSynchronizationDeleteHandler(consortiumDataCache, httpClient, vertx))) + .mapEmpty(); + } + + private Future> createEventConsumer( + ServicePointEventType eventType, KafkaConfig kafkaConfig, + AsyncRecordHandler handler) { + + var subscriptionDefinition = SubscriptionDefinition.builder() + .eventType(eventType.name()) + .subscriptionPattern(buildSubscriptionPattern(eventType.getKafkaTopic(), kafkaConfig)) + .build(); + + return createConsumer(kafkaConfig, subscriptionDefinition, handler); + } + + private Future> createConsumer(KafkaConfig kafkaConfig, + SubscriptionDefinition subscriptionDefinition, + AsyncRecordHandler recordHandler) { + + var consumer = KafkaConsumerWrapper.builder() + .context(context) + .vertx(vertx) + .kafkaConfig(kafkaConfig) + .loadLimit(DEFAULT_LOAD_LIMIT) + .globalLoadSensor(new GlobalLoadSensor()) + .subscriptionDefinition(subscriptionDefinition) + .build(); + + return consumer.start(recordHandler, MODULE_ID) + .onSuccess(v -> consumers.add(consumer)) + .map(consumer); + } + + private static String buildSubscriptionPattern(KafkaTopic kafkaTopic, KafkaConfig kafkaConfig) { + return kafkaTopic.fullTopicName(kafkaConfig, TENANT_PATTERN); + } + + private static String getModuleId() { + return getModuleName().replace("_", "-") + "-" + getModuleVersion(); + } + + private KafkaConfig getKafkaConfig() { + return KafkaConfig.builder() + .envId(KafkaEnvironmentProperties.environment()) + .kafkaHost(KafkaEnvironmentProperties.host()) + .kafkaPort(KafkaEnvironmentProperties.port()) + .build(); + } +} diff --git a/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationCreateHandler.java b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationCreateHandler.java new file mode 100644 index 000000000..f38b118fd --- /dev/null +++ b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationCreateHandler.java @@ -0,0 +1,26 @@ +package org.folio.services.consortium.handler; + +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.services.caches.ConsortiumDataCache; +import org.folio.services.consortium.processor.ServicePointSynchronizationCreateEventProcessor; +import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor; +import org.folio.services.domainevent.DomainEvent; + +public class ServicePointSynchronizationCreateHandler extends ServicePointSynchronizationHandler { + + public ServicePointSynchronizationCreateHandler(ConsortiumDataCache consortiumDataCache, + HttpClient httpClient, Vertx vertx) { + + super(consortiumDataCache, httpClient, vertx); + } + + @Override + protected ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor( + DomainEvent domainEvent) { + + return new ServicePointSynchronizationCreateEventProcessor(domainEvent); + } + +} diff --git a/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationDeleteHandler.java b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationDeleteHandler.java new file mode 100644 index 000000000..6493526a6 --- /dev/null +++ b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationDeleteHandler.java @@ -0,0 +1,26 @@ +package org.folio.services.consortium.handler; + +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.services.caches.ConsortiumDataCache; +import org.folio.services.consortium.processor.ServicePointSynchronizationDeleteEventProcessor; +import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor; +import org.folio.services.domainevent.DomainEvent; + +public class ServicePointSynchronizationDeleteHandler extends ServicePointSynchronizationHandler { + + public ServicePointSynchronizationDeleteHandler(ConsortiumDataCache consortiumDataCache, + HttpClient httpClient, Vertx vertx) { + + super(consortiumDataCache, httpClient, vertx); + } + + @Override + protected ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor( + DomainEvent domainEvent) { + + return new ServicePointSynchronizationDeleteEventProcessor(domainEvent); + } + +} diff --git a/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationHandler.java b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationHandler.java new file mode 100644 index 000000000..6d2246e79 --- /dev/null +++ b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationHandler.java @@ -0,0 +1,73 @@ +package org.folio.services.consortium.handler; + +import static io.vertx.core.Future.succeededFuture; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import io.vertx.core.json.Json; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import java.util.Optional; +import org.apache.commons.collections4.map.CaseInsensitiveMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.kafka.AsyncRecordHandler; +import org.folio.kafka.KafkaHeaderUtils; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.services.caches.ConsortiumData; +import org.folio.services.caches.ConsortiumDataCache; +import org.folio.services.consortium.SynchronizationContext; +import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor; +import org.folio.services.domainevent.DomainEvent; + +public abstract class ServicePointSynchronizationHandler + implements AsyncRecordHandler { + + private static final Logger log = LogManager.getLogger( + ServicePointSynchronizationHandler.class); + + private final ConsortiumDataCache consortiumDataCache; + private final HttpClient httpClient; + private final Vertx vertx; + + protected ServicePointSynchronizationHandler(ConsortiumDataCache consortiumDataCache, + HttpClient httpClient, Vertx vertx) { + + this.consortiumDataCache = consortiumDataCache; + this.httpClient = httpClient; + this.vertx = vertx; + } + + @Override + public Future handle(KafkaConsumerRecord kafkaConsumerRecord) { + log.info("handle:: Processing event {}", kafkaConsumerRecord.topic()); + var headers = new CaseInsensitiveMap<>(KafkaHeaderUtils.kafkaHeadersToMap( + kafkaConsumerRecord.headers())); + return consortiumDataCache.getConsortiumData(headers) + .compose(consortiumData -> processConsortiumData(kafkaConsumerRecord, consortiumData, + headers)); + } + + private Future processConsortiumData( + KafkaConsumerRecord kafkaConsumerRecord, + Optional consortiumData, CaseInsensitiveMap headers) { + + log.info("processConsortiumData:: {}", consortiumData); + return consortiumData.map(data -> processConsortiumDataByEvent(data, kafkaConsumerRecord, + headers)).orElseGet(() -> succeededFuture(kafkaConsumerRecord.key())); + } + + private Future processConsortiumDataByEvent(ConsortiumData data, + KafkaConsumerRecord kafkaConsumerRecord, + CaseInsensitiveMap headers) { + + var event = Json.decodeValue(kafkaConsumerRecord.value(), DomainEvent.class); + var servicePointSynchronizationProcessor = getServicePointSynchronizationProcessor(event); + return servicePointSynchronizationProcessor.process(kafkaConsumerRecord.key(), + new SynchronizationContext(data, headers, vertx, httpClient)); + } + + protected abstract ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor( + DomainEvent domainEvent); + +} diff --git a/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationUpdateHandler.java b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationUpdateHandler.java new file mode 100644 index 000000000..ce24dd4c4 --- /dev/null +++ b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationUpdateHandler.java @@ -0,0 +1,26 @@ +package org.folio.services.consortium.handler; + +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.services.caches.ConsortiumDataCache; +import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor; +import org.folio.services.consortium.processor.ServicePointSynchronizationUpdateEventProcessor; +import org.folio.services.domainevent.DomainEvent; + +public class ServicePointSynchronizationUpdateHandler extends ServicePointSynchronizationHandler { + + public ServicePointSynchronizationUpdateHandler(ConsortiumDataCache consortiumDataCache, + HttpClient httpClient, Vertx vertx) { + + super(consortiumDataCache, httpClient, vertx); + } + + @Override + protected ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor( + DomainEvent domainEvent) { + + return new ServicePointSynchronizationUpdateEventProcessor(domainEvent); + } + +} diff --git a/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationCreateEventProcessor.java b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationCreateEventProcessor.java new file mode 100644 index 000000000..679756890 --- /dev/null +++ b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationCreateEventProcessor.java @@ -0,0 +1,59 @@ +package org.folio.services.consortium.processor; + +import static io.vertx.core.Future.failedFuture; + +import io.vertx.core.Future; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.rest.impl.ServicePointApi; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.rest.persist.PostgresClient; +import org.folio.services.domainevent.DomainEvent; +import org.folio.services.domainevent.ServicePointEventType; +import org.folio.services.servicepoint.ServicePointService; + +public class ServicePointSynchronizationCreateEventProcessor + extends ServicePointSynchronizationEventProcessor { + + private static final Logger log = LogManager.getLogger( + ServicePointSynchronizationCreateEventProcessor.class); + + public ServicePointSynchronizationCreateEventProcessor(DomainEvent domainEvent) { + super(ServicePointEventType.SERVICE_POINT_CREATED, domainEvent); + } + + @Override + protected Future processEvent(ServicePointService servicePointService, String servicePointId) { + try { + Servicepoint servicePoint = PostgresClient.pojo2JsonObject(domainEvent.getNewEntity()) + .mapTo(Servicepoint.class); + + return servicePointService.createServicePoint(servicePointId, servicePoint) + .map(servicePointId); + } catch (Exception e) { + log.error("processEvent:: failed due to {}", e.getMessage(), e); + return failedFuture(e); + } + } + + @Override + protected boolean validateEventEntity() { + try { + Servicepoint servicePoint = PostgresClient.pojo2JsonObject(domainEvent.getNewEntity()) + .mapTo(Servicepoint.class); + if (servicePoint == null) { + log.warn("validateEventEntity:: failed to find new service point entity"); + return false; + } + String validationMessage = ServicePointApi.validateServicePoint(servicePoint); + if (validationMessage != null) { + log.warn("validateEventEntity:: {}", validationMessage); + return false; + } + return true; + } catch (Exception e) { + log.error("validateEventEntity:: failed due to {}", e.getMessage(), e); + } + return false; + } +} diff --git a/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationDeleteEventProcessor.java b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationDeleteEventProcessor.java new file mode 100644 index 000000000..dbebbe36f --- /dev/null +++ b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationDeleteEventProcessor.java @@ -0,0 +1,42 @@ +package org.folio.services.consortium.processor; + +import io.vertx.core.Future; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.rest.persist.PostgresClient; +import org.folio.services.domainevent.DomainEvent; +import org.folio.services.domainevent.ServicePointEventType; +import org.folio.services.servicepoint.ServicePointService; + +public class ServicePointSynchronizationDeleteEventProcessor + extends ServicePointSynchronizationEventProcessor { + + private static final Logger log = LogManager.getLogger( + ServicePointSynchronizationDeleteEventProcessor.class); + + public ServicePointSynchronizationDeleteEventProcessor(DomainEvent domainEvent) { + super(ServicePointEventType.SERVICE_POINT_DELETED, domainEvent); + } + + @Override + protected Future processEvent(ServicePointService servicePointService, String servicePointId) { + return servicePointService.deleteServicePoint(servicePointId).map(servicePointId); + } + + @Override + protected boolean validateEventEntity() { + try { + var servicePoint = PostgresClient.pojo2JsonObject(domainEvent.getOldEntity()) + .mapTo(Servicepoint.class); + if (servicePoint == null) { + log.warn("validateEventEntity:: service point is null"); + return false; + } + return true; + } catch (Exception e) { + log.error("validateEventEntity:: failed to {}", e.getMessage(), e); + } + return false; + } +} diff --git a/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationEventProcessor.java b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationEventProcessor.java new file mode 100644 index 000000000..2dff9e92f --- /dev/null +++ b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationEventProcessor.java @@ -0,0 +1,80 @@ +package org.folio.services.consortium.processor; + +import static io.vertx.core.Future.succeededFuture; +import static java.lang.Boolean.FALSE; +import static org.folio.okapi.common.XOkapiHeaders.TENANT; +import static org.folio.utils.ConsortiumUtils.isCentralTenant; + +import io.vertx.core.Future; +import java.util.HashMap; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.services.consortium.SynchronizationContext; +import org.folio.services.domainevent.DomainEvent; +import org.folio.services.domainevent.ServicePointEventType; +import org.folio.services.servicepoint.ServicePointService; +import org.folio.utils.Environment; + +public abstract class ServicePointSynchronizationEventProcessor { + + private static final Logger log = LogManager.getLogger( + ServicePointSynchronizationEventProcessor.class); + private static final String ECS_TLR_FEATURE_ENABLED = "ECS_TLR_FEATURE_ENABLED"; + protected final DomainEvent domainEvent; + private final ServicePointEventType servicePointEventType; + + protected ServicePointSynchronizationEventProcessor(ServicePointEventType eventType, + DomainEvent domainEvent) { + this.servicePointEventType = eventType; + this.domainEvent = domainEvent; + } + + public Future process(String eventKey, SynchronizationContext context) { + var future = succeededFuture(eventKey); + if (!isCentralTenant(domainEvent.getTenant(), context.consortiaData()) + || !isEcsTlrFeatureEnabled() + || servicePointEventType.getPayloadType() != domainEvent.getType()) { + + log.info("process:: tenant: {}, event type: {}, ECS_TLR_FEATURE_ENABLED: {}", + domainEvent.getTenant(), domainEvent.getType(), isEcsTlrFeatureEnabled()); + return future; + } + if (!validateEventEntity()) { + log.warn("process:: validation event entity failed"); + return future; + } + var vertxContext = context.vertx().getOrCreateContext(); + var headers = context.headers(); + for (String memberTenant : context.consortiaData().memberTenants()) { + log.info("process:: tenant {} servicePointId {}", memberTenant, eventKey); + future = future.eventually(() -> prepareHeaders(headers, memberTenant) + .compose(lendingTenantHeader -> { + var servicePointService = new ServicePointService(vertxContext, lendingTenantHeader); + return processEvent(servicePointService, eventKey); + }) + .onFailure(e -> + log.warn("process:: tenant {} servicePointId {} failed", memberTenant, eventKey, e))); + } + return future; + } + + protected abstract Future processEvent(ServicePointService servicePointService, + String servicePointId); + + protected abstract boolean validateEventEntity(); + + private boolean isEcsTlrFeatureEnabled() { + return Boolean.parseBoolean(Environment.getEnvVar(ECS_TLR_FEATURE_ENABLED, FALSE.toString())); + } + + private Future> prepareHeaders(Map headers, + String memberTenant) { + + var map = new HashMap<>(headers); + map.put(TENANT, memberTenant); + return succeededFuture(map); + } + +} diff --git a/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationUpdateEventProcessor.java b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationUpdateEventProcessor.java new file mode 100644 index 000000000..920b96301 --- /dev/null +++ b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationUpdateEventProcessor.java @@ -0,0 +1,66 @@ +package org.folio.services.consortium.processor; + +import static io.vertx.core.Future.failedFuture; + +import io.vertx.core.Future; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.rest.impl.ServicePointApi; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.rest.persist.PostgresClient; +import org.folio.services.domainevent.DomainEvent; +import org.folio.services.domainevent.ServicePointEventType; +import org.folio.services.servicepoint.ServicePointService; + +public class ServicePointSynchronizationUpdateEventProcessor + extends ServicePointSynchronizationEventProcessor { + + private static final Logger log = LogManager.getLogger( + ServicePointSynchronizationUpdateEventProcessor.class); + + public ServicePointSynchronizationUpdateEventProcessor(DomainEvent domainEvent) { + super(ServicePointEventType.SERVICE_POINT_UPDATED, domainEvent); + } + + @Override + protected Future processEvent(ServicePointService servicePointService, String servicePointId) { + try { + Servicepoint servicepoint = PostgresClient.pojo2JsonObject(domainEvent.getNewEntity()) + .mapTo(Servicepoint.class); + return servicePointService.updateServicePoint(servicePointId, servicepoint) + .map(servicePointId); + } catch (Exception e) { + log.warn("processEvent:: failed due to {}", e.getMessage(), e); + return failedFuture(e); + } + } + + @Override + protected boolean validateEventEntity() { + try { + var oldServicePoint = PostgresClient.pojo2JsonObject(domainEvent.getOldEntity()) + .mapTo(Servicepoint.class); + Servicepoint newServicePoint = PostgresClient.pojo2JsonObject(domainEvent.getNewEntity()) + .mapTo(Servicepoint.class); + + if (oldServicePoint == null || newServicePoint == null) { + log.warn("validateEventEntity:: failed due to oldServicePoint {} newServicePoint {}", + oldServicePoint, newServicePoint); + return false; + } + if (newServicePoint.equals(oldServicePoint)) { + log.warn("validateEventEntity:: old/new service points are identical"); + return false; + } + String validationMessage = ServicePointApi.validateServicePoint(newServicePoint); + if (validationMessage != null) { + log.warn("validateEventEntity:: {}", validationMessage); + return false; + } + return true; + } catch (Exception e) { + log.error("validateEventEntity:: failed to {}", e.getMessage(), e); + } + return false; + } +} diff --git a/src/main/java/org/folio/services/domainevent/ServicePointDomainEventPublisher.java b/src/main/java/org/folio/services/domainevent/ServicePointDomainEventPublisher.java index 88201e6bc..3b0a2f46f 100644 --- a/src/main/java/org/folio/services/domainevent/ServicePointDomainEventPublisher.java +++ b/src/main/java/org/folio/services/domainevent/ServicePointDomainEventPublisher.java @@ -21,4 +21,8 @@ public Future publishUpdated(Servicepoint servicePoint, Servicepoint updat public Future publishDeleted(Servicepoint servicePoint) { return publishRecordRemoved(servicePoint.getId(), servicePoint); } + + public Future publishCreated(Servicepoint servicePoint) { + return publishRecordCreated(servicePoint.getId(), servicePoint); + } } diff --git a/src/main/java/org/folio/services/domainevent/ServicePointEventType.java b/src/main/java/org/folio/services/domainevent/ServicePointEventType.java new file mode 100644 index 000000000..9a97122e9 --- /dev/null +++ b/src/main/java/org/folio/services/domainevent/ServicePointEventType.java @@ -0,0 +1,31 @@ +package org.folio.services.domainevent; + +import static org.folio.InventoryKafkaTopic.SERVICE_POINT; +import static org.folio.services.domainevent.DomainEventType.CREATE; +import static org.folio.services.domainevent.DomainEventType.DELETE; +import static org.folio.services.domainevent.DomainEventType.UPDATE; + +import org.folio.kafka.services.KafkaTopic; + +public enum ServicePointEventType { + + SERVICE_POINT_CREATED(SERVICE_POINT, CREATE), + SERVICE_POINT_UPDATED(SERVICE_POINT, UPDATE), + SERVICE_POINT_DELETED(SERVICE_POINT, DELETE); + + private final KafkaTopic kafkaTopic; + private final DomainEventType payloadType; + + ServicePointEventType(KafkaTopic kafkaTopic, DomainEventType payloadType) { + this.kafkaTopic = kafkaTopic; + this.payloadType = payloadType; + } + + public KafkaTopic getKafkaTopic() { + return kafkaTopic; + } + + public DomainEventType getPayloadType() { + return payloadType; + } +} diff --git a/src/main/java/org/folio/services/servicepoint/ServicePointService.java b/src/main/java/org/folio/services/servicepoint/ServicePointService.java index d6b5a1c80..4c7cc6808 100644 --- a/src/main/java/org/folio/services/servicepoint/ServicePointService.java +++ b/src/main/java/org/folio/services/servicepoint/ServicePointService.java @@ -1,6 +1,9 @@ package org.folio.services.servicepoint; import static io.vertx.core.Future.succeededFuture; +import static org.folio.rest.impl.ServicePointApi.LOCATION_PREFIX; +import static org.folio.rest.jaxrs.resource.ServicePoints.PostServicePointsResponse.headersFor201; +import static org.folio.rest.jaxrs.resource.ServicePoints.PostServicePointsResponse.respond201WithApplicationJson; import io.vertx.core.Context; import io.vertx.core.Future; @@ -45,6 +48,15 @@ public Future updateServicePoint(String servicePointId, Servicepoint e .map(x -> ItemStorage.PutItemStorageItemsByItemIdResponse.respond204()); } + public Future createServicePoint(String servicePointId, Servicepoint servicePoint) { + servicePoint.setId(servicePointId); + return servicePointRepository.save(servicePointId, servicePoint) + .compose(notUsed -> + servicePointDomainEventPublisher.publishCreated(servicePoint) + .map(resp -> respond201WithApplicationJson(servicePoint, headersFor201() + .withLocation(LOCATION_PREFIX + servicePointId)))); + } + public Future deleteServicePoint(String servicePointId) { log.debug("deleteServicePoint:: parameters servicePointId: {}", servicePointId); diff --git a/src/main/java/org/folio/utils/Environment.java b/src/main/java/org/folio/utils/Environment.java new file mode 100644 index 000000000..ecd34e7a9 --- /dev/null +++ b/src/main/java/org/folio/utils/Environment.java @@ -0,0 +1,8 @@ +package org.folio.utils; + +public record Environment() { + + public static String getEnvVar(String key, String defaultVal) { + return System.getenv().getOrDefault(key, defaultVal); + } +} diff --git a/src/test/java/org/folio/rest/api/ServicePointSynchronizationVerticleTest.java b/src/test/java/org/folio/rest/api/ServicePointSynchronizationVerticleTest.java new file mode 100644 index 000000000..47eac1a43 --- /dev/null +++ b/src/test/java/org/folio/rest/api/ServicePointSynchronizationVerticleTest.java @@ -0,0 +1,321 @@ +package org.folio.rest.api; + +import static com.github.tomakehurst.wiremock.client.WireMock.equalToIgnoreCase; +import static java.lang.String.format; +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_OK; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.awaitility.Awaitility.waitAtMost; +import static org.folio.kafka.services.KafkaEnvironmentProperties.environment; +import static org.folio.kafka.services.KafkaEnvironmentProperties.host; +import static org.folio.kafka.services.KafkaEnvironmentProperties.port; +import static org.folio.rest.api.ServicePointTest.createHoldShelfExpiryPeriod; +import static org.folio.rest.support.http.InterfaceUrls.servicePointsUrl; +import static org.folio.rest.tools.utils.ModuleName.getModuleName; +import static org.folio.rest.tools.utils.ModuleName.getModuleVersion; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_CREATED; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_DELETED; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_UPDATED; +import static org.folio.utility.LocationUtility.createServicePoint; +import static org.folio.utility.ModuleUtility.getClient; +import static org.folio.utility.ModuleUtility.getVertx; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.github.tomakehurst.wiremock.client.WireMock; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpResponseHead; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.kafka.admin.KafkaAdminClient; +import io.vertx.kafka.client.common.TopicPartition; +import io.vertx.kafka.client.consumer.OffsetAndMetadata; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.SneakyThrows; +import org.folio.okapi.common.XOkapiHeaders; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.utility.ModuleUtility; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import uk.org.webcompere.systemstubs.rules.EnvironmentVariablesRule; + +@RunWith(VertxUnitRunner.class) +public class ServicePointSynchronizationVerticleTest extends TestBaseWithInventoryUtil { + + private static final String CENTRAL_TENANT_ID = "consortium"; + private static final String COLLEGE_TENANT_ID = "college"; + private static final String SERVICE_POINT_TOPIC = format( + "%s.%s.inventory.service-point", environment(), CENTRAL_TENANT_ID); + private static final String KAFKA_SERVER_URL = format("%s:%s", host(), port()); + private static final String SERVICE_POINT_ID = UUID.randomUUID().toString(); + private static final String CONSORTIUM_ID = UUID.randomUUID().toString(); + private static final String CONSORTIUM_TENANTS_PATH = "/consortia/%s/tenants".formatted( + CONSORTIUM_ID); + private static final String ECS_TLR_FEATURE_ENABLED = "ECS_TLR_FEATURE_ENABLED"; + private static KafkaProducer producer; + private static KafkaAdminClient adminClient; + @Rule + public EnvironmentVariablesRule environmentVariablesRule = + new EnvironmentVariablesRule(ECS_TLR_FEATURE_ENABLED, "true"); + + @BeforeClass + public static void setUpClass() throws Exception { + ModuleUtility.prepareTenant(CENTRAL_TENANT_ID, false); + ModuleUtility.prepareTenant(COLLEGE_TENANT_ID, false); + + producer = createProducer(); + adminClient = createAdminClient(); + } + + @Before + public void setUp() { + clearData(CENTRAL_TENANT_ID); + clearData(COLLEGE_TENANT_ID); + mockUserTenantsForConsortiumMember(); + mockConsortiumTenants(); + mockUserTenantsForNonConsortiumMember(); + assertTrue(Boolean.parseBoolean(System.getenv().getOrDefault(ECS_TLR_FEATURE_ENABLED, + "false"))); + } + + @AfterClass + public static void tearDownClass() throws ExecutionException, InterruptedException, + TimeoutException { + + ModuleUtility.removeTenant(CENTRAL_TENANT_ID); + ModuleUtility.removeTenant(COLLEGE_TENANT_ID); + waitFor(producer.close().compose(v -> adminClient.close()) + ); + } + + @Test + public void shouldPropagateCreationOfServicePointOnLendingTenant(TestContext context) { + var servicePointFromCentralTenant = createServicePointAgainstTenant(CENTRAL_TENANT_ID, false); + + int initialOffset = getOffsetForServicePointCreateEvents(); + publishServicePointCreateEvent(servicePointFromCentralTenant); + waitUntilValueIsIncreased(initialOffset, + ServicePointSynchronizationVerticleTest::getOffsetForServicePointCreateEvents); + getServicePointById(COLLEGE_TENANT_ID) + .onComplete(context.asyncAssertSuccess(collegeServicePoint -> + context.assertEquals(servicePointFromCentralTenant.getId(), collegeServicePoint.getId()))); + } + + @Test + public void shouldPropagateUpdateOfServicePointOnLendingTenant(TestContext context) { + var servicePointFromCentralTenant = createServicePointAgainstTenant(CENTRAL_TENANT_ID, + true); + var servicePointFromDataTenant = createServicePointAgainstTenant(COLLEGE_TENANT_ID, + false); + + int initialOffset = getOffsetForServicePointUpdateEvents(); + publishServicePointUpdateEvent(servicePointFromDataTenant, servicePointFromCentralTenant); + waitUntilValueIsIncreased(initialOffset, + ServicePointSynchronizationVerticleTest::getOffsetForServicePointUpdateEvents); + getServicePointById(COLLEGE_TENANT_ID) + .onComplete(context.asyncAssertSuccess(collegeServicePoint -> + context.assertEquals(servicePointFromCentralTenant.getDiscoveryDisplayName(), + collegeServicePoint.getDiscoveryDisplayName()))); + } + + @Test + public void shouldPropagateDeleteOfServicePointOnLendingTenant(TestContext context) { + var servicePointFromCentralTenant = createServicePointAgainstTenant(CENTRAL_TENANT_ID, false); + var servicePointFromDataTenant = createServicePointAgainstTenant(COLLEGE_TENANT_ID, false); + + getServicePointById(COLLEGE_TENANT_ID) + .onComplete(context.asyncAssertSuccess(servicePoint -> + context.assertEquals(servicePointFromCentralTenant.getId(), + servicePointFromDataTenant.getId()))); + + int initialOffset = getOffsetForServicePointDeleteEvents(); + publishServicePointDeleteEvent(servicePointFromDataTenant); + waitUntilValueIsIncreased(initialOffset, + ServicePointSynchronizationVerticleTest::getOffsetForServicePointDeleteEvents); + getStatusCodeOfServicePointById(COLLEGE_TENANT_ID) + .onComplete(context.asyncAssertSuccess(statusCode -> + context.assertEquals(HTTP_NOT_FOUND, statusCode))); + } + + @SneakyThrows + public static T waitFor(Future future, int timeoutSeconds) { + return future.toCompletionStage() + .toCompletableFuture() + .get(timeoutSeconds, TimeUnit.SECONDS); + } + + public static T waitFor(Future future) { + return waitFor(future, 10); + } + + private Future getServicePointById(String tenantId) { + Promise> promise = Promise.promise(); + getClient().get(servicePointsUrl("/" + SERVICE_POINT_ID), tenantId, promise::complete); + return promise.future().map(resp -> { + MatcherAssert.assertThat(resp.statusCode(), CoreMatchers.is(HTTP_OK)); + return resp.bodyAsJson(Servicepoint.class); + }); + } + + private Future getStatusCodeOfServicePointById(String tenantId) { + Promise> promise = Promise.promise(); + getClient().get(servicePointsUrl("/" + SERVICE_POINT_ID), tenantId, promise::complete); + return promise.future().map(HttpResponseHead::statusCode); + } + + @SneakyThrows(Exception.class) + private static Servicepoint createServicePointAgainstTenant(String tenantId, boolean updated) { + String discoveryDisplayName = "Circulation Desk -- Basement" + (updated ? "(updated)" : ""); + return createServicePoint(UUID.fromString(SERVICE_POINT_ID), "Circ Desk 2522", "cd2522", + discoveryDisplayName, null, 20, + true, createHoldShelfExpiryPeriod(), tenantId) + .getJson().mapTo(Servicepoint.class); + } + + private static int waitUntilValueIsIncreased(int previousValue, Callable valueSupplier) { + return waitAtMost(60, SECONDS) + .until(valueSupplier, newValue -> newValue > previousValue); + } + + private static JsonObject buildCreateEvent(Servicepoint newVersion) { + return new JsonObject() + .put("tenant", CENTRAL_TENANT_ID) + .put("type", "CREATE") + .put("new", newVersion); + } + + private static JsonObject buildUpdateEvent(Servicepoint oldVersion, Servicepoint newVersion) { + return new JsonObject() + .put("tenant", CENTRAL_TENANT_ID) + .put("type", "UPDATE") + .put("old", oldVersion) + .put("new", newVersion); + } + + private static JsonObject buildDeleteEvent(Servicepoint object) { + return new JsonObject() + .put("tenant", CENTRAL_TENANT_ID) + .put("type", "DELETE") + .put("old", object); + } + + private void publishServicePointCreateEvent( + Servicepoint newServicePoint) { + + publishEvent(buildCreateEvent(newServicePoint)); + } + + private void publishServicePointUpdateEvent(Servicepoint oldServicePoint, + Servicepoint newServicePoint) { + + publishEvent(buildUpdateEvent(oldServicePoint, newServicePoint)); + } + + private void publishServicePointDeleteEvent(Servicepoint servicePoint) { + publishEvent(buildDeleteEvent(servicePoint)); + } + + private static Integer getOffsetForServicePointCreateEvents() { + return getOffset(buildConsumerGroupId(SERVICE_POINT_CREATED.name())); + } + + private static Integer getOffsetForServicePointUpdateEvents() { + return getOffset(buildConsumerGroupId(SERVICE_POINT_UPDATED.name())); + } + + private static Integer getOffsetForServicePointDeleteEvents() { + return getOffset(buildConsumerGroupId(SERVICE_POINT_DELETED.name())); + } + + private void publishEvent(JsonObject eventPayload) { + var kafkaRecord = KafkaProducerRecord.create(SERVICE_POINT_TOPIC, SERVICE_POINT_ID, + eventPayload); + kafkaRecord.addHeader("X-Okapi-Tenant".toLowerCase(Locale.ROOT), CENTRAL_TENANT_ID); + kafkaRecord.addHeader("X-Okapi-Token".toLowerCase(Locale.ROOT), + "test-token".toLowerCase(Locale.ROOT)); + kafkaRecord.addHeader("X-Okapi-Url", mockServer.baseUrl().toLowerCase(Locale.ROOT)); + waitFor(producer.write(kafkaRecord)); + } + + private static KafkaProducer createProducer() { + Properties config = new Properties(); + config.put(BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL); + config.put(ACKS_CONFIG, "1"); + + return KafkaProducer.create(getVertx(), config, String.class, JsonObject.class); + } + + private static KafkaAdminClient createAdminClient() { + Map config = Map.of(BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL); + return KafkaAdminClient.create(getVertx(), config); + } + + private static String buildConsumerGroupId(String eventType) { + return format("%s.%s-%s", eventType, getModuleName().replace("_", "-"), getModuleVersion()); + } + + private static int getOffset(String consumerGroupId) { + return waitFor( + adminClient.listConsumerGroupOffsets(consumerGroupId) + .map(partitions -> Optional.ofNullable(partitions.get(new TopicPartition(SERVICE_POINT_TOPIC, 0))) + .map(OffsetAndMetadata::getOffset) + .map(Long::intValue) + .orElse(0)) // if topic does not exist yet + ); + } + + private void mockUserTenantsForConsortiumMember() { + JsonObject userTenantsCollection = new JsonObject() + .put("userTenants", new JsonArray() + .add(new JsonObject() + .put("centralTenantId", CENTRAL_TENANT_ID) + .put("consortiumId", CONSORTIUM_ID))); + WireMock.stubFor(WireMock.get(USER_TENANTS_PATH) + .withHeader("X-Okapi-Tenant", equalToIgnoreCase(CENTRAL_TENANT_ID)) + .willReturn(WireMock.ok().withBody(userTenantsCollection.encodePrettily()))); + } + + private static void mockConsortiumTenants() { + JsonObject tenantsCollection = new JsonObject() + .put("tenants", new JsonArray() + .add(new JsonObject() + .put("id", CENTRAL_TENANT_ID) + .put("isCentral", true)) + .add(new JsonObject() + .put("id", COLLEGE_TENANT_ID) + .put("isCentral", false))); + WireMock.stubFor(WireMock.get(CONSORTIUM_TENANTS_PATH) + .willReturn(WireMock.ok().withBody(tenantsCollection.encodePrettily()))); + } + + public static void mockUserTenantsForNonConsortiumMember() { + JsonObject emptyUserTenantsCollection = new JsonObject() + .put("userTenants", JsonArray.of()); + WireMock.stubFor(WireMock.get(USER_TENANTS_PATH) + .withHeader(XOkapiHeaders.TENANT, equalToIgnoreCase(COLLEGE_TENANT_ID)) + .willReturn(WireMock.ok().withBody(emptyUserTenantsCollection.encodePrettily()))); + } + +}