Changes from all commits
5 changes: 5 additions & 0 deletions .editorconfig
@@ -11,3 +11,8 @@ indent_style = space
max_line_length = 120
insert_final_newline = true
trim_trailing_whitespace = true

[*.java]
ij_java_class_count_to_use_import_on_demand = 9999
ij_java_names_count_to_use_import_on_demand = 9999
ij_java_imports_layout = $**,|,**
1 change: 1 addition & 0 deletions NEWS.md
@@ -72,6 +72,7 @@
- Index new Hubs in mod-search [MODLD-894](https://folio-org.atlassian.net/browse/MODLD-894)
- Reinitialize edges after setting resource's ID. [MODLD-896](https://folio-org.atlassian.net/browse/MODLD-896)
- Update HubReferenceMapperUnit to support RELATED_TO predicate. [MODLD-893](https://folio-org.atlassian.net/browse/MODLD-893)
- Introduce LdImportOutputEvent listener and handler. [MODLD-873](https://folio-org.atlassian.net/browse/MODLD-873)

## 1.0.4 (04-24-2025)
- Work Edit form - Instance read-only section: "Notes about the instance" data is not shown [MODLD-716](https://folio-org.atlassian.net/browse/MODLD-716)
6 changes: 6 additions & 0 deletions pom.xml
@@ -502,6 +502,12 @@
<interfaceOnly>true</interfaceOnly>
<useSpringBoot3>true</useSpringBoot3>
</configOptions>
<importMappings>
<importMapping>Resource=org.folio.ld.dictionary.model.Resource</importMapping>
</importMappings>
<typeMappings>
<typeMapping>object+Resource=Resource</typeMapping>
</typeMappings>
</configuration>
</execution>
<execution>
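A note on the generator mappings above (commentary, not part of the diff): the importMapping points generated code at the dictionary model class, and the typeMapping `object+Resource=Resource` appears to tell openapi-generator to render properties declared as `type: object` with `format: Resource` as that class. A minimal sketch of the DTO this presumably produces, assuming the importOutputEvent schema declares `ts` and `resources` fields (the real class is generated at build time; field names are only inferred from the getters used later in this PR):

```java
// Hypothetical shape of the generated DTO -- illustration only, not the generated source.
package org.folio.linked.data.domain.dto;

import java.util.ArrayList;
import java.util.List;
import org.folio.ld.dictionary.model.Resource; // substituted via the importMapping above

public class ImportOutputEvent {

  private String ts;
  private List<Resource> resources = new ArrayList<>();

  public String getTs() {
    return ts;
  }

  public void setTs(String ts) {
    this.ts = ts;
  }

  public List<Resource> getResources() {
    return resources;
  }

  public void setResources(List<Resource> resources) {
    this.resources = resources;
  }
}
```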
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.folio.linked.data.domain.dto.ImportOutputEvent;
import org.folio.linked.data.domain.dto.InventoryInstanceEvent;
import org.folio.linked.data.domain.dto.SourceRecordDomainEvent;
import org.folio.spring.tools.kafka.FolioKafkaProperties;
@@ -51,6 +52,13 @@ public ConcurrentKafkaListenerContainerFactory<String, InventoryInstanceEvent> i
return concurrentKafkaBatchListenerContainerFactory(inventoryInstanceEventConsumerFactory);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ImportOutputEvent> importOutputEventListenerContainerFactory(
ConsumerFactory<String, ImportOutputEvent> ldImportOutputEventConsumerFactory
) {
return concurrentKafkaBatchListenerContainerFactory(ldImportOutputEventConsumerFactory);
}

@Bean
public ConsumerFactory<String, SourceRecordDomainEvent> sourceRecordDomainEventConsumerFactory() {
return errorHandlingConsumerFactory(SourceRecordDomainEvent.class);
@@ -61,6 +69,11 @@ public ConsumerFactory<String, InventoryInstanceEvent> inventoryInstanceEventCon
return errorHandlingConsumerFactory(InventoryInstanceEvent.class);
}

@Bean
public ConsumerFactory<String, ImportOutputEvent> ldImportOutputEventConsumerFactory() {
return errorHandlingConsumerFactory(ImportOutputEvent.class);
}

private <K, V> ConcurrentKafkaListenerContainerFactory<K, V> concurrentKafkaBatchListenerContainerFactory(
ConsumerFactory<K, V> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<K, V>();
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.Level;
import org.folio.linked.data.domain.dto.InventoryInstanceEvent;
import org.folio.linked.data.integration.kafka.listener.handler.InventoryInstanceEventHandler;
import org.folio.linked.data.integration.kafka.listener.handler.ExternalEventHandler;
import org.folio.linked.data.service.tenant.LinkedDataTenantService;
import org.folio.linked.data.service.tenant.TenantScopedExecutionService;
import org.springframework.context.annotation.Profile;
@@ -27,7 +27,7 @@ public class InventoryInstanceEventListener implements LinkedDataListener<Invent
private static final String INVENTORY_INSTANCE_EVENT_LISTENER = "mod-linked-data-inventory-instance-event-listener";
private static final String INVENTORY_EVENT_LISTENER_CONTAINER_FACTORY = "inventoryEventListenerContainerFactory";
private final TenantScopedExecutionService tenantScopedExecutionService;
private final InventoryInstanceEventHandler inventoryInstanceEventHandler;
private final ExternalEventHandler<InventoryInstanceEvent> inventoryInstanceEventHandler;
private final LinkedDataTenantService linkedDataTenantService;

@KafkaListener(
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.folio.linked.data.integration.kafka.listener;

import static java.util.Optional.ofNullable;
import static org.folio.linked.data.util.Constants.STANDALONE_PROFILE;

import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.Level;
import org.folio.linked.data.domain.dto.ImportOutputEvent;
import org.folio.linked.data.integration.kafka.listener.handler.ExternalEventHandler;
import org.folio.linked.data.service.tenant.LinkedDataTenantService;
import org.folio.linked.data.service.tenant.TenantScopedExecutionService;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.retry.RetryContext;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@RequiredArgsConstructor
@Profile("!" + STANDALONE_PROFILE)
public class LdImportOutputEventListener implements LinkedDataListener<ImportOutputEvent> {

  private static final String LISTENER_ID = "mod-linked-data-ld-import-event-listener";
  private static final String CONTAINER_FACTORY = "importOutputEventListenerContainerFactory";
  private final ExternalEventHandler<ImportOutputEvent> ldImportOutputEventHandler;
  private final TenantScopedExecutionService tenantScopedExecutionService;
  private final LinkedDataTenantService linkedDataTenantService;

  @KafkaListener(
    id = LISTENER_ID,
    containerFactory = CONTAINER_FACTORY,
    groupId = "#{folioKafkaProperties.listener['ld-import-output-event'].groupId}",
    concurrency = "#{folioKafkaProperties.listener['ld-import-output-event'].concurrency}",
    topicPattern = "#{folioKafkaProperties.listener['ld-import-output-event'].topicPattern}")
  public void handleImportOutputEvent(List<ConsumerRecord<String, ImportOutputEvent>> consumerRecords) {
    handle(consumerRecords, this::handleRecord, linkedDataTenantService, log);
  }

  @Override
  public String getEventId(ImportOutputEvent event) {
    return event.getTs();
  }

  private void handleRecord(ConsumerRecord<String, ImportOutputEvent> consumerRecord) {
    var event = consumerRecord.value();
    tenantScopedExecutionService.executeAsyncWithRetry(
      consumerRecord.headers(),
      retryContext -> runRetryableJob(event, retryContext),
      ex -> logFailedEvent(event, ex, false)
    );
  }

  private void runRetryableJob(ImportOutputEvent event, RetryContext retryContext) {
    ofNullable(retryContext.getLastThrowable())
      .ifPresent(ex -> logFailedEvent(event, ex, true));
    ldImportOutputEventHandler.handle(event);
  }

  private void logFailedEvent(ImportOutputEvent event, Throwable ex, boolean isRetrying) {
    var logLevel = isRetrying ? Level.INFO : Level.ERROR;
    log.log(logLevel, "Failed to handle LD-Import output event with id {}. Retrying: {}",
      event.getTs(), isRetrying, ex);
  }
}
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
import org.apache.logging.log4j.Level;
import org.folio.linked.data.domain.dto.SourceRecordDomainEvent;
import org.folio.linked.data.domain.dto.SourceRecordType;
import org.folio.linked.data.integration.kafka.listener.handler.SourceRecordDomainEventHandler;
import org.folio.linked.data.integration.kafka.listener.handler.srs.SourceRecordDomainEventHandler;
import org.folio.linked.data.service.tenant.LinkedDataTenantService;
import org.folio.linked.data.service.tenant.TenantScopedExecutionService;
import org.springframework.context.annotation.Profile;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.folio.linked.data.integration.kafka.listener.handler;

public interface ExternalEventHandler<T> {

  void handle(T event);
}
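A likely rationale for this interface (not stated in the diff): listeners can now depend on `ExternalEventHandler<SomeEvent>` instead of a concrete handler class, and Spring resolves such constructor injection points by the full parameterized type, so each listener still receives its matching handler bean without a qualifier. A small self-contained sketch of that resolution rule, with every name below invented for illustration:

```java
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ResolvableType;

public class GenericInjectionDemo {

  interface Handler<T> {
    void handle(T event);
  }

  record EventA(String id) { }

  record EventB(String id) { }

  @Configuration
  static class Config {

    @Bean
    Handler<EventA> handlerA() {
      return event -> System.out.println("A: " + event.id());
    }

    @Bean
    Handler<EventB> handlerB() {
      return event -> System.out.println("B: " + event.id());
    }
  }

  public static void main(String[] args) {
    try (var context = new AnnotationConfigApplicationContext(Config.class)) {
      // Beans are matched by the full generic type, not just the raw Handler interface --
      // the same mechanism that lets a field of type ExternalEventHandler<ImportOutputEvent>
      // resolve to LdImportOutputEventHandler without an explicit @Qualifier.
      var handlerAType = ResolvableType.forClassWithGenerics(Handler.class, EventA.class);
      System.out.println(String.join(", ", context.getBeanNamesForType(handlerAType))); // handlerA
    }
  }
}
```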
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@
@Component
@RequiredArgsConstructor
@Profile("!" + STANDALONE_PROFILE)
public class InventoryInstanceEventHandler {
public class InventoryInstanceEventHandler implements ExternalEventHandler<InventoryInstanceEvent> {

private static final String INSTANCE_REINDEX_NOT_REQUIRED = "Ignoring InventoryInstanceEvent '{}',"
+ " reindexing not required.";
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.folio.linked.data.integration.kafka.listener.handler;

import static org.folio.linked.data.util.Constants.STANDALONE_PROFILE;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.linked.data.domain.dto.ImportOutputEvent;
import org.folio.linked.data.mapper.ResourceModelMapper;
import org.folio.linked.data.service.resource.events.ResourceEventsPublisher;
import org.folio.linked.data.service.resource.graph.ResourceGraphService;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@RequiredArgsConstructor
@Profile("!" + STANDALONE_PROFILE)
public class LdImportOutputEventHandler implements ExternalEventHandler<ImportOutputEvent> {
  private final ResourceModelMapper resourceModelMapper;
  private final ResourceGraphService resourceGraphService;
  private final ResourceEventsPublisher resourceEventsPublisher;

  public void handle(ImportOutputEvent event) {
    event.getResources().forEach(resource -> {
      try {
        var entity = resourceModelMapper.toEntity(resource);
        var saveGraphResult = resourceGraphService.saveMergingGraph(entity);
        resourceEventsPublisher.emitEventsForCreateAndUpdate(saveGraphResult, null);
      } catch (Exception e) {
        log.error("Failed to save LD Import output resource with id = {}", resource.getId(), e);
      }
    });
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.folio.linked.data.integration.kafka.listener.handler.srs;

import org.folio.linked.data.domain.dto.SourceRecordDomainEvent;
import org.folio.linked.data.domain.dto.SourceRecordType;

public interface SourceRecordDomainEventHandler {

  void handle(SourceRecordDomainEvent event, SourceRecordType recordType);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.folio.linked.data.integration.kafka.listener.handler;
package org.folio.linked.data.integration.kafka.listener.handler.srs;

import static java.util.Objects.isNull;
import static org.apache.commons.lang3.ObjectUtils.isEmpty;
@@ -27,7 +27,7 @@
@Component
@RequiredArgsConstructor
@Profile("!" + STANDALONE_PROFILE)
public class SourceRecordDomainEventHandler {
public class SourceRecordDomainEventHandlerImpl implements SourceRecordDomainEventHandler {

private static final String EVENT_SAVED = "SourceRecordDomainEvent [id {}] was saved as {} resource [id {}]";
private static final String EMPTY_RESOURCE_MAPPED = "Empty resource(s) mapped from SourceRecordDomainEvent [id {}]";
@@ -43,6 +43,7 @@ public class SourceRecordDomainEventHandler {
private final MarcBib2ldMapper marcBib2ldMapper;

@SuppressWarnings("java:S125")
@Override
public void handle(SourceRecordDomainEvent event, SourceRecordType recordType) {
if (notProcessableEvent(event, recordType)) {
return;
Original file line number Diff line number Diff line change
@@ -61,7 +61,7 @@ private ImportFileResponseDto save(Set<org.folio.ld.dictionary.model.Resource> r
.map(resource -> {
metadataService.ensure(resource);
var saveGraphResult = resourceGraphService.saveMergingGraph(resource);
resourceEventsPublisher.emitEventsForCreate(saveGraphResult);
resourceEventsPublisher.emitEventsForCreateAndUpdate(saveGraphResult, null);
report.addImport(
new ImportUtils.ImportedResource(
saveGraphResult.rootResource().getId(),
Original file line number Diff line number Diff line change
@@ -154,7 +154,7 @@ private Resource createResourceAndPublishEvents(Resource resourceToSave, Integer
metadataService.ensure(resourceToSave);
var saveResult = resourceGraphService.saveMergingGraph(resourceToSave);
resourceProfileService.linkResourceToProfile(saveResult.rootResource(), profileId);
eventsPublisher.emitEventsForCreate(saveResult);
eventsPublisher.emitEventsForCreateAndUpdate(saveResult, null);
return saveResult.rootResource();
}

@@ -170,7 +170,7 @@ private Resource updateResourceAndPublishEvents(Resource resourceToSave, Resourc
var savedResource = saveResult.rootResource();
rawMarcService.saveRawMarc(savedResource, unmappedMarc);
resourceProfileService.linkResourceToProfile(savedResource, profileId);
eventsPublisher.emitEventsForUpdate(old, saveResult);
eventsPublisher.emitEventsForCreateAndUpdate(saveResult, old);
return savedResource;
}

Original file line number Diff line number Diff line change
@@ -4,11 +4,8 @@
import org.folio.linked.data.service.resource.graph.SaveGraphResult;

public interface ResourceEventsPublisher {
void emitEventsForCreate(SaveGraphResult saveGraphResult);

void emitEventsForUpdate(SaveGraphResult saveGraphResult);

void emitEventsForUpdate(Resource oldResource, SaveGraphResult saveGraphResult);
void emitEventsForCreateAndUpdate(SaveGraphResult saveGraphResult, Resource oldResource);

void emitEventForDelete(Resource deletedResource);
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package org.folio.linked.data.service.resource.events;

import static java.util.Objects.isNull;
import static java.util.stream.Collectors.toSet;

import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.folio.linked.data.model.entity.Resource;
import org.folio.linked.data.model.entity.event.ResourceCreatedEvent;
import org.folio.linked.data.model.entity.event.ResourceDeletedEvent;
import org.folio.linked.data.model.entity.event.ResourceEvent;
import org.folio.linked.data.model.entity.event.ResourceReplacedEvent;
import org.folio.linked.data.model.entity.event.ResourceUpdatedEvent;
import org.folio.linked.data.service.resource.graph.SaveGraphResult;
@@ -20,44 +21,34 @@ public class ResourceEventsPublisherImpl implements ResourceEventsPublisher {
private final ApplicationEventPublisher applicationEventPublisher;

@Override
public void emitEventsForCreate(SaveGraphResult saveGraphResult) {
emitResourceCreatedEvents(saveGraphResult.newResources());
}

@Override
public void emitEventsForUpdate(SaveGraphResult saveGraphResult) {
emitEventsForUpdate(null, saveGraphResult);
}

@Override
public void emitEventsForUpdate(Resource oldResource, SaveGraphResult saveGraphResult) {
public void emitEventsForCreateAndUpdate(SaveGraphResult saveGraphResult, Resource oldResource) {
var rootResource = saveGraphResult.rootResource();
var rootResourceEvent = (oldResource == null || Objects.equals(oldResource.getId(), rootResource.getId()))
? new ResourceUpdatedEvent(rootResource)
: new ResourceReplacedEvent(oldResource, rootResource.getId());

emitEvent(rootResourceEvent);
emitResourceCreatedEvents(
if (isNull(oldResource) && saveGraphResult.newResources().contains(rootResource)) {
applicationEventPublisher.publishEvent(new ResourceCreatedEvent(rootResource));
} else if (isNull(oldResource) || Objects.equals(oldResource.getId(), rootResource.getId())) {
applicationEventPublisher.publishEvent(new ResourceUpdatedEvent(rootResource));
} else {
applicationEventPublisher.publishEvent(new ResourceReplacedEvent(oldResource, rootResource.getId()));
}

emitNewLinkedResourceEvents(
saveGraphResult.newResources()
.stream()
.filter(r -> !Objects.equals(r.getId(), rootResource.getId()))
.collect(Collectors.toSet())
.collect(toSet())
);
}

@Override
public void emitEventForDelete(Resource deletedResource) {
emitEvent(new ResourceDeletedEvent(deletedResource));
applicationEventPublisher.publishEvent(new ResourceDeletedEvent(deletedResource));
}

private void emitResourceCreatedEvents(Set<Resource> createdResources) {
private void emitNewLinkedResourceEvents(Set<Resource> createdResources) {
createdResources
.stream()
.map(ResourceCreatedEvent::new)
.forEach(this::emitEvent);
.forEach(applicationEventPublisher::publishEvent);
}

private void emitEvent(ResourceEvent event) {
applicationEventPublisher.publishEvent(event);
}
}
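To spell out the consolidated contract (a reading of the implementation above, not new behavior): the event published for the root resource now depends on whether an old resource was supplied and whether the saved root is among the newly persisted resources. Roughly:

```java
// Root-resource event emitted by emitEventsForCreateAndUpdate(saveGraphResult, oldResource):
//
//   oldResource == null and the root is in saveGraphResult.newResources() -> ResourceCreatedEvent
//   oldResource == null and the root already existed (merged)             -> ResourceUpdatedEvent
//   oldResource != null with the same id as the root                      -> ResourceUpdatedEvent
//   oldResource != null with a different id                               -> ResourceReplacedEvent(oldResource, rootId)
//
// In every case, each remaining entry of newResources() (excluding the root)
// is still published as its own ResourceCreatedEvent.
```

In particular, callers that previously used emitEventsForCreate (the import-file and create flows changed elsewhere in this diff) now get an update event when the saved root merges into an already existing resource, which looks like the intended behavior change.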
Original file line number Diff line number Diff line change
@@ -194,7 +194,7 @@ private Resource saveAndPublishEvents(org.folio.ld.dictionary.model.Resource mod
var newResource = saveGraphResult.rootResource();
refreshWork(newResource);
saveUnmappedMarc(modelResource, newResource);
eventsPublisher.emitEventsForUpdate(saveGraphResult);
eventsPublisher.emitEventsForCreateAndUpdate(saveGraphResult, null);
return newResource;
}

4 changes: 4 additions & 0 deletions src/main/resources/application.yml
@@ -65,6 +65,10 @@ folio:
concurrency: ${KAFKA_INVENTORY_INSTANCE_EVENT_CONCURRENCY:1}
topic-pattern: ${KAFKA_INVENTORY_INSTANCE_EVENT_TOPIC_PATTERN:(${folio.environment}\.)(.*\.)inventory.instance}
group-id: ${folio.environment}-linked-data-inventory-instance-event-group
ld-import-output-event:
concurrency: ${KAFKA_LD_IMPORT_OUTPUT_EVENT_CONCURRENCY:1}
topic-pattern: ${KAFKA_LD_IMPORT_OUTPUT_EVENT_TOPIC_PATTERN:(${folio.environment}\.)(.*\.)linked_data_import.output}
group-id: ${folio.environment}-linked-data-import-output-event-group
retry-interval-ms: ${KAFKA_RETRY_INTERVAL_MS:2000}
retry-delivery-attempts: ${KAFKA_RETRY_DELIVERY_ATTEMPTS:6}
topics:
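For orientation (the values below are illustrative, not taken from the diff): with `folio.environment` set to, say, `folio`, the new topic pattern expands to `(folio\.)(.*\.)linked_data_import.output`, so a topic such as `folio.diku.linked_data_import.output` for tenant `diku` would be picked up by the listener. A quick sanity check of that expansion:

```java
import java.util.regex.Pattern;

public class TopicPatternCheck {

  public static void main(String[] args) {
    // "folio" (environment) and "diku" (tenant) are assumed example values
    var pattern = Pattern.compile("(folio\\.)(.*\\.)linked_data_import.output");
    System.out.println(pattern.matcher("folio.diku.linked_data_import.output").matches()); // true
  }
}
```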
2 changes: 2 additions & 0 deletions src/main/resources/swagger.api/folio-modules.yaml
@@ -32,3 +32,5 @@ components:
$ref: folio-modules/inventory/inventoryInstanceEvent.json
configurationsDto:
$ref: folio-modules/configuration/configurations.json
importOutputEvent:
$ref: folio-modules/ld-import/importOutputEvent.json