Skip to content

Commit

Permalink
MODLD-503: Reindex LD instance if Suppress Flags changed (#5)
Browse files Browse the repository at this point in the history
MODLD-503: Reindex LD instance if Suppress Flags changed
  • Loading branch information
AndreiBordak authored Oct 22, 2024
1 parent aa97f9c commit 4f209e0
Show file tree
Hide file tree
Showing 18 changed files with 596 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.folio.linked.data.domain.dto.InventoryInstanceEvent;
import org.folio.linked.data.domain.dto.SourceRecordDomainEvent;
import org.folio.spring.tools.kafka.FolioKafkaProperties;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
Expand Down Expand Up @@ -40,19 +41,39 @@ public FolioKafkaProperties folioKafkaProperties() {
public ConcurrentKafkaListenerContainerFactory<String, SourceRecordDomainEvent> srsEventListenerContainerFactory(
ConsumerFactory<String, SourceRecordDomainEvent> sourceRecordDomainEventConsumerFactory
) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, SourceRecordDomainEvent>();
factory.setBatchListener(true);
factory.setConsumerFactory(sourceRecordDomainEventConsumerFactory);
return factory;
return concurrentKafkaBatchListenerContainerFactory(sourceRecordDomainEventConsumerFactory);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, InventoryInstanceEvent> inventoryEventListenerContainerFactory(
ConsumerFactory<String, InventoryInstanceEvent> inventoryInstanceEventConsumerFactory
) {
return concurrentKafkaBatchListenerContainerFactory(inventoryInstanceEventConsumerFactory);
}

@Bean
public ConsumerFactory<String, SourceRecordDomainEvent> sourceRecordDomainEventConsumerFactory() {
var deserializer = new ErrorHandlingDeserializer<>(new JsonDeserializer<>(SourceRecordDomainEvent.class, mapper));
return errorHandlingConsumerFactory(SourceRecordDomainEvent.class);
}

@Bean
public ConsumerFactory<String, InventoryInstanceEvent> inventoryInstanceEventConsumerFactory() {
return errorHandlingConsumerFactory(InventoryInstanceEvent.class);
}

private <K, V> ConcurrentKafkaListenerContainerFactory<K, V> concurrentKafkaBatchListenerContainerFactory(
ConsumerFactory<K, V> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<K, V>();
factory.setBatchListener(true);
factory.setConsumerFactory(consumerFactory);
return factory;
}

public <T> ConsumerFactory<String, T> errorHandlingConsumerFactory(Class<T> clazz) {
var deserializer = new ErrorHandlingDeserializer<>(new JsonDeserializer<>(clazz, mapper));
Map<String, Object> config = new HashMap<>(kafkaProperties.buildConsumerProperties(null));
config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package org.folio.linked.data.integration.kafka.listener;

import static java.util.Optional.ofNullable;
import static org.folio.linked.data.domain.dto.ResourceIndexEventType.UPDATE;
import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;

import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.Level;
import org.folio.linked.data.domain.dto.InventoryInstanceEvent;
import org.folio.linked.data.integration.kafka.listener.handler.InventoryInstanceEventHandler;
import org.folio.linked.data.service.tenant.TenantScopedExecutionService;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.retry.RetryContext;
import org.springframework.stereotype.Component;

@Component
@Log4j2
@Profile(FOLIO_PROFILE)
@RequiredArgsConstructor
public class InventoryInstanceEventListener {

private static final String INVENTORY_INSTANCE_EVENT_LISTENER = "mod-linked-data-inventory-instance-event-listener";
private static final String INVENTORY_EVENT_LISTENER_CONTAINER_FACTORY = "inventoryEventListenerContainerFactory";
private final TenantScopedExecutionService tenantScopedExecutionService;
private final InventoryInstanceEventHandler inventoryInstanceEventHandler;

@KafkaListener(
id = INVENTORY_INSTANCE_EVENT_LISTENER,
containerFactory = INVENTORY_EVENT_LISTENER_CONTAINER_FACTORY,
groupId = "#{folioKafkaProperties.listener['inventory-instance-event'].groupId}",
concurrency = "#{folioKafkaProperties.listener['inventory-instance-event'].concurrency}",
topicPattern = "#{folioKafkaProperties.listener['inventory-instance-event'].topicPattern}")
public void handleInventoryInstanceEvent(List<ConsumerRecord<String, InventoryInstanceEvent>> consumerRecords) {
consumerRecords.forEach(this::handleRecord);
}

private void handleRecord(ConsumerRecord<String, InventoryInstanceEvent> consumerRecord) {
var event = consumerRecord.value();
if (event.getType() == UPDATE) {
tenantScopedExecutionService.executeAsyncWithRetry(
consumerRecord.headers(),
retryContext -> runRetryableJob(event, retryContext),
ex -> logFailedEvent(event, ex, false)
);
}
}

private void runRetryableJob(InventoryInstanceEvent event, RetryContext retryContext) {
ofNullable(retryContext.getLastThrowable())
.ifPresent(ex -> logFailedEvent(event, ex, true));
inventoryInstanceEventHandler.handle(event);
}

private void logFailedEvent(InventoryInstanceEvent event, Throwable ex, boolean isRetrying) {
var logLevel = isRetrying ? Level.INFO : Level.ERROR;
log.log(logLevel, "Failed to reindex inventory instance with id {}. Retrying: {}",
event.getNew().getId(), isRetrying, ex);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.folio.linked.data.integration.kafka.listener.handler;

import static org.apache.commons.lang3.ObjectUtils.anyNull;
import static org.folio.linked.data.domain.dto.ResourceIndexEventType.UPDATE;
import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;

import java.util.Objects;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.linked.data.domain.dto.InventoryInstanceEvent;
import org.folio.linked.data.integration.kafka.sender.search.WorkUpdateMessageSender;
import org.folio.linked.data.model.entity.FolioMetadata;
import org.folio.linked.data.model.entity.Resource;
import org.folio.linked.data.repo.FolioMetadataRepository;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Log4j2
@Component
@Profile(FOLIO_PROFILE)
@RequiredArgsConstructor
public class InventoryInstanceEventHandler {

public static final String INSTANCE_REINDEX_NOT_REQUIRED = "Ignoring InventoryInstanceEvent '{}',"
+ " reindexing not required.";
public static final String SENT_FOR_REINDEXING = "Instance '{}' has some fields updated, sent for reindexing.";
public static final String SUPPRESS_FLAGS_CHANGED = "InventoryInstanceEvent:{} - changes in suppress flags: {}.";

private final FolioMetadataRepository folioMetadataRepository;
private final WorkUpdateMessageSender workUpdateMessageSender;

@Transactional
public void handle(InventoryInstanceEvent event) {
getOptionalReindexResource(event)
.ifPresentOrElse(this::reindexResource,
() -> log.debug(INSTANCE_REINDEX_NOT_REQUIRED, event.getId()));
}

private Optional<Resource> getOptionalReindexResource(InventoryInstanceEvent event) {
if (reindexNotRequired(event)) {
return Optional.empty();
}
return folioMetadataRepository.findByInventoryId(event.getNew().getId())
.map(metadata -> updateMetadataAndGetResource(metadata, event));
}

private void reindexResource(Resource resource) {
workUpdateMessageSender.produce(resource);
log.info(SENT_FOR_REINDEXING, resource.getId());
}

private Resource updateMetadataAndGetResource(FolioMetadata folioMetadata, InventoryInstanceEvent event) {
folioMetadata.setStaffSuppress(event.getNew().getStaffSuppress());
folioMetadata.setSuppressFromDiscovery(event.getNew().getDiscoverySuppress());
return folioMetadataRepository.save(folioMetadata).getResource();
}

private boolean reindexNotRequired(InventoryInstanceEvent event) {
if (notEqual(event.getType(), UPDATE) || anyNull(event.getOld(), event.getNew())) {
return true;
}
return !suppressFlagsChanged(event);
}

private boolean suppressFlagsChanged(InventoryInstanceEvent event) {
var newObj = event.getNew();
var oldObj = event.getOld();
var result = notEqual(newObj.getStaffSuppress(), oldObj.getStaffSuppress())
|| notEqual(newObj.getDiscoverySuppress(), oldObj.getDiscoverySuppress());
log.debug(SUPPRESS_FLAGS_CHANGED, event.getId(), result);
return result;
}

private boolean notEqual(Object first, Object second) {
return !Objects.equals(first, second);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.joining;
import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
import static org.apache.commons.lang3.ObjectUtils.allNull;
import static org.folio.ld.dictionary.PredicateDictionary.CLASSIFICATION;
import static org.folio.ld.dictionary.PredicateDictionary.CONTRIBUTOR;
import static org.folio.ld.dictionary.PredicateDictionary.CREATOR;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.folio.linked.data.domain.dto.LinkedDataContributor;
import org.folio.linked.data.domain.dto.LinkedDataInstanceOnly;
import org.folio.linked.data.domain.dto.LinkedDataInstanceOnlyPublicationsInner;
import org.folio.linked.data.domain.dto.LinkedDataInstanceOnlySuppress;
import org.folio.linked.data.domain.dto.LinkedDataNote;
import org.folio.linked.data.domain.dto.LinkedDataTitle;
import org.folio.linked.data.domain.dto.LinkedDataWork;
Expand Down Expand Up @@ -220,6 +222,7 @@ protected List<LinkedDataInstanceOnly> extractInstances(Resource resource) {
.notes(mapNotes(ir.getDoc(), InstanceMapperUnit.SUPPORTED_NOTES))
.contributors(extractContributors(ir))
.publications(extractPublications(ir))
.suppress(extractSuppress(ir))
.editionStatements(getPropertyValues(ir.getDoc(), EDITION_STATEMENT.getValue()).toList()))
.filter(bii -> isNotEmpty(bii.getTitles()) || isNotEmpty(bii.getIdentifiers())
|| isNotEmpty(bii.getContributors()) || isNotEmpty(bii.getPublications())
Expand All @@ -228,6 +231,16 @@ protected List<LinkedDataInstanceOnly> extractInstances(Resource resource) {
.toList();
}

private LinkedDataInstanceOnlySuppress extractSuppress(Resource resource) {
var metadata = resource.getFolioMetadata();
if (isNull(metadata) || allNull(metadata.getSuppressFromDiscovery(), metadata.getStaffSuppress())) {
return null;
}
return new LinkedDataInstanceOnlySuppress()
.fromDiscovery(metadata.getSuppressFromDiscovery())
.staff(metadata.getStaffSuppress());
}

protected List<LinkedDataInstanceOnlyPublicationsInner> extractPublications(Resource resource) {
return resource.getOutgoingEdges().stream()
.filter(re -> PE_PUBLICATION.getUri().equals(re.getPredicate().getUri()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ public class FolioMetadata {
@ToString.Exclude
private Resource resource;

@Column(name = "suppress_from_discovery")
private Boolean suppressFromDiscovery;

@Column(name = "staff_suppress")
private Boolean staffSuppress;

public FolioMetadata(Resource resource) {
this.resource = resource;
this.id = resource.getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public interface FolioMetadataRepository extends JpaRepository<FolioMetadata, Lo

boolean existsBySrsId(String srsId);

Optional<FolioMetadata> findByInventoryId(String inventoryId);

interface IdOnly {
Long getId();
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/resources/application-folio.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ folio:
concurrency: ${KAFKA_SOURCE_RECORD_DOMAIN_EVENT_CONCURRENCY:1}
topic-pattern: ${KAFKA_SOURCE_RECORD_DOMAIN_EVENT_TOPIC_PATTERN:(${folio.environment}\.)(.*\.)srs.source_records}
group-id: ${folio.environment}-linked-data-source-record-domain-event-group
inventory-instance-event:
concurrency: ${KAFKA_INVENTORY_INSTANCE_EVENT_CONCURRENCY:1}
topic-pattern: ${KAFKA_INVENTORY_INSTANCE_EVENT_TOPIC_PATTERN:(${folio.environment}\.)(.*\.)inventory.instance}
group-id: ${folio.environment}-linked-data-inventory-instance-event-group
retry-interval-ms: ${KAFKA_RETRY_INTERVAL_MS:2000}
retry-delivery-attempts: ${KAFKA_RETRY_DELIVERY_ATTEMPTS:6}
topics:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ create table if not exists folio_metadata (
resource_hash bigint primary key references resources(resource_hash),
inventory_id text null unique,
srs_id text null unique,
source resource_source null
source resource_source null,
suppress_from_discovery boolean null,
staff_suppress boolean null
);

create index if not exists folio_metadata_inventory_id_idx on folio_metadata(inventory_id);
Expand All @@ -13,3 +15,5 @@ comment on column folio_metadata.resource_hash is 'The unique hash identifier fo
comment on column folio_metadata.inventory_id is 'ID of the inventory in FOLIO Inventory application';
comment on column folio_metadata.srs_id is 'ID of the source record in FOLIO SRS application';
comment on column folio_metadata.source is 'Source of the instance resource (ex. LINKED_DATA, MARC)';
comment on column folio_metadata.suppress_from_discovery is 'Suppress From Discovery value';
comment on column folio_metadata.staff_suppress is 'Staff Suppress value';
2 changes: 2 additions & 0 deletions src/main/resources/swagger.api/folio-modules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ components:
$ref: folio-modules/inventory/instanceIngressEvent.json
sourceRecordDomainEvent:
$ref: folio-modules/srs/sourceRecordDomainEvent.json
inventoryInstanceEvent:
$ref: folio-modules/inventory/inventoryInstanceEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Inventory instance event data model",
"javaType": "org.folio.rest.jaxrs.model.InventoryInstance",
"type": "object",
"additionalProperties": false,
"properties": {
"id": {
"description": "UUID",
"$ref": "../common/uuid.json"
},
"staffSuppress": {
"description": "Suppress flags - Staff",
"type": "boolean"
},
"discoverySuppress": {
"description": "Suppress flags - From Discovery",
"type": "boolean"
}
},
"required": [
"id"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Inventory instance event data model",
"javaType": "org.folio.rest.jaxrs.model.InventoryInstanceEvent",
"type": "object",
"additionalProperties": false,
"properties": {
"id": {
"description": "Id of the event",
"$ref": "../common/uuid.json"
},
"new": {
"description": "New state of the instance",
"type": "object",
"$ref": "inventoryInstance.json"
},
"old": {
"description": "Old state of the instance",
"type": "object",
"$ref": "inventoryInstance.json"
},
"type": {
"description": "Event type",
"$ref": "../search/resourceIndexEventType.json"
},
"tenant": {
"description": "Tenant id",
"type": "string"
},
"ts": {
"description": "Timestamp",
"type": "string"
}
},
"required": [
"id"
],
"excludedFromEqualsAndHashCode": [
"tenant",
"ts"
],
"x-implements": "org.folio.spring.tools.kafka.BaseKafkaMessage"
}
Loading

0 comments on commit 4f209e0

Please sign in to comment.