Commit a604acb

MODSOURCE-817: Fix data consistency in handling and updating Marc Bib records for links.instance-authority event (#652)

- make the handling of instance-authority link events concurrency-safe so that no Marc Bib modifications are lost due to the concurrent nature of event handling
- make sure the first event in a sequence of updates sent by the producer and received by the consumer is handled, and the changes to the related bibs persisted, before events produced/consumed later

Closes: MODSOURCE-817
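The core of the fix is the new saveRecordsByExternalIds operation (see the RecordDao/handler changes below), which accepts a RecordsModifierOperator so that retrieving the linked bib records, applying the subfield changes, and persisting the result happen as one operation rather than separate getRecords/saveRecords calls. The following is a simplified, self-contained sketch of that read-modify-save pattern, not the module's actual implementation: AtomicRecordsUpdateSketch, BibRecord, and the in-memory ConcurrentHashMap store are hypothetical stand-ins for the database-backed record store and its transactional locking.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Simplified sketch of the "read-modify-save as one atomic operation" idea.
// BibRecord and the in-memory store are stand-ins for the real record store.
public class AtomicRecordsUpdateSketch {

  // Stand-in for a persisted MARC bib record; only the parsed content matters here.
  record BibRecord(String instanceId, String content) { }

  private final Map<String, BibRecord> store = new ConcurrentHashMap<>();

  // Analogue of saveRecordsByExternalIds(ids, recordType, recordsModifier, okapiHeaders):
  // the modifier runs inside the atomic compute, so a concurrent writer cannot
  // change the same record between the read and the save.
  public void saveByExternalIds(List<String> instanceIds, UnaryOperator<BibRecord> modifier) {
    for (String id : instanceIds) {
      store.computeIfPresent(id, (key, current) -> modifier.apply(current));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    var sketch = new AtomicRecordsUpdateSketch();
    sketch.store.put("inst-1", new BibRecord("inst-1", "100 $a Old heading"));

    // Two link-update events arrive concurrently and both modify the same bib.
    Runnable updateHeading = () -> sketch.saveByExternalIds(List.of("inst-1"),
        r -> new BibRecord(r.instanceId(), r.content().replace("Old heading", "New heading")));
    Runnable addAuthoritySubfield = () -> sketch.saveByExternalIds(List.of("inst-1"),
        r -> new BibRecord(r.instanceId(), r.content() + " $9 authority-id"));

    var first = new Thread(updateHeading);
    var second = new Thread(addAuthoritySubfield);
    first.start();
    second.start();
    first.join();
    second.join();

    // Both modifications survive because each read-modify-write was atomic.
    System.out.println(sketch.store.get("inst-1").content());
  }
}

In the module itself, the intended effect is the same: the lookup by external id, the modifier, and the batch save run within one database interaction, so the first event's changes for a record are persisted before later events for that record are applied.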
mukhiddin-yusuf committed Dec 27, 2024
1 parent 8fd13a5 commit a604acb
Showing 18 changed files with 764 additions and 418 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
@@ -1,3 +1,6 @@
## 2025-XX-XX 5.8.8-SNAPSHOT
* [MODSOURCE-817](https://folio-org.atlassian.net/browse/MODSOURCE-817) Fix data consistency in handling and updating Marc Bib records for links.instance-authority event

## 2024-09-03 v5.8.7
* [MODSOURMAN-1200](https://folio-org.atlassian.net/browse/MODSOURMAN-1200) (ECS) Fix update of shared Instances from a member tenant

AuthorityLinkChunkKafkaHandler.java
@@ -1,6 +1,5 @@
package org.folio.consumers;

import static java.util.Collections.emptyList;
import static java.util.Objects.nonNull;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.EMPTY;
@@ -10,7 +9,9 @@
import static org.folio.consumers.RecordMappingUtils.readParsedContentToObjectRepresentation;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL;
import static org.folio.services.util.EventHandlingUtil.createProducer;
import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;
import static org.folio.services.util.KafkaUtil.extractHeaderValue;
import static org.folio.util.AuthorityLinksUtils.getAuthorityIdSubfield;

import io.vertx.core.Future;
import io.vertx.core.Promise;
@@ -24,7 +25,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -33,8 +33,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.util.IdType;
import org.folio.dao.util.RecordDaoUtil;
import org.folio.dao.util.RecordType;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaConfig;
@@ -53,17 +51,16 @@
import org.folio.rest.jaxrs.model.UpdateTarget;
import org.folio.services.RecordService;
import org.folio.services.SnapshotService;
import org.folio.services.entities.RecordsModifierOperator;
import org.folio.services.handlers.links.DeleteLinkProcessor;
import org.folio.services.handlers.links.LinkProcessor;
import org.folio.services.handlers.links.UpdateLinkProcessor;
import org.marc4j.marc.Subfield;
import org.marc4j.marc.impl.DataFieldImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class AuthorityLinkChunkKafkaHandler implements AsyncRecordHandler<String, String> {
public static final char AUTHORITY_ID_SUBFIELD = '9';
private static final AtomicLong INDEXER = new AtomicLong();
private static final Logger LOGGER = LogManager.getLogger();
private final Map<KafkaTopic, KafkaProducer<String, String>> producers = new HashMap<>();
@@ -87,20 +84,27 @@ public AuthorityLinkChunkKafkaHandler(RecordService recordService, KafkaConfig k
@Override
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
LOGGER.trace("handle:: Handling kafka record: {}", consumerRecord);
LOGGER.info("handle:: Start Handling kafka record");
var userId = extractHeaderValue(XOkapiHeaders.USER_ID, consumerRecord.headers());
return mapToEvent(consumerRecord)

var result = mapToEvent(consumerRecord)
.compose(this::createSnapshot)
.compose(event -> retrieveRecords(event, event.getTenant())
.compose(recordCollection -> mapRecordFieldsChanges(event, recordCollection, userId))
.compose(recordCollection -> recordService.saveRecords(recordCollection, event.getTenant()))
.map(recordsBatchResponse -> sendReports(recordsBatchResponse, event, consumerRecord.headers()))
.map(recordsBatchResponse -> mapRecordsToBibUpdateEvents(recordsBatchResponse, event))
.compose(marcBibUpdates -> sendEvents(marcBibUpdates, event, consumerRecord))
).recover(th -> {
LOGGER.error("Failed to handle {} event", MARC_BIB.moduleTopicName(), th);
return Future.failedFuture(th);
}
);
.compose(linksUpdate -> {
var instanceIds = getBibRecordExternalIds(linksUpdate);
var okapiHeaders = toOkapiHeaders(consumerRecord.headers(), linksUpdate.getTenant());
RecordsModifierOperator recordsModifier = recordsCollection ->
this.mapRecordFieldsChanges(linksUpdate, recordsCollection, userId);

return recordService.saveRecordsByExternalIds(instanceIds, RecordType.MARC_BIB, recordsModifier, okapiHeaders)
.compose(recordsBatchResponse -> {
sendReports(recordsBatchResponse, linksUpdate, consumerRecord.headers());
var marcBibUpdateStats = mapRecordsToBibUpdateEvents(recordsBatchResponse, linksUpdate);
return sendEvents(marcBibUpdateStats, linksUpdate, consumerRecord);
});
});

LOGGER.info("handle:: Finish Handling kafka record");
return result;
}

private Future<BibAuthorityLinksUpdate> mapToEvent(KafkaConsumerRecord<String, String> consumerRecord) {
@@ -115,98 +119,104 @@ private Future<BibAuthorityLinksUpdate> mapToEvent(KafkaConsumerRecord<String, S
}
}

private Future<RecordCollection> retrieveRecords(BibAuthorityLinksUpdate bibAuthorityLinksUpdate, String tenantId) {
LOGGER.trace("Retrieving bibs for jobId {}, authorityId {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());
var instanceIds = bibAuthorityLinksUpdate.getUpdateTargets().stream()
private List<String> getBibRecordExternalIds(BibAuthorityLinksUpdate linksUpdate) {
return linksUpdate.getUpdateTargets().stream()
.flatMap(updateTarget -> updateTarget.getLinks().stream()
.map(Link::getInstanceId))
.distinct()
.collect(Collectors.toList());
.toList();
}

var condition = RecordDaoUtil.getExternalIdsCondition(instanceIds, IdType.INSTANCE)
.and(RecordDaoUtil.filterRecordByDeleted(false));
return recordService.getRecords(condition, RecordType.MARC_BIB, emptyList(), 0, instanceIds.size(), tenantId);
private Future<BibAuthorityLinksUpdate> createSnapshot(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
var now = new Date();
var snapshot = new Snapshot()
.withJobExecutionId(bibAuthorityLinksUpdate.getJobId())
.withStatus(Snapshot.Status.COMMITTED)
.withProcessingStartedDate(now)
.withMetadata(new Metadata()
.withCreatedDate(now)
.withUpdatedDate(now));

return snapshotService.saveSnapshot(snapshot, bibAuthorityLinksUpdate.getTenant())
.map(result -> bibAuthorityLinksUpdate);
}

private Future<RecordCollection> mapRecordFieldsChanges(BibAuthorityLinksUpdate bibAuthorityLinksUpdate,
RecordCollection recordCollection, String userId) {

private RecordCollection mapRecordFieldsChanges(BibAuthorityLinksUpdate bibAuthorityLinksUpdate,
RecordCollection recordCollection, String userId) {
LOGGER.debug("Retrieved {} bib records for jobId {}, authorityId {}",
recordCollection.getTotalRecords(), bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());

return getLinkProcessorForEvent(bibAuthorityLinksUpdate).map(linkProcessor -> {
recordCollection.getRecords().forEach(bibRecord -> {
var newRecordId = UUID.randomUUID().toString();
var instanceId = bibRecord.getExternalIdsHolder().getInstanceId();
var parsedRecord = bibRecord.getParsedRecord();
var parsedRecordContent = readParsedContentToObjectRepresentation(bibRecord);
var fields = new LinkedList<>(parsedRecordContent.getDataFields());

var updateTargetFieldCodes = extractUpdateTargetFieldCodesForInstance(bibAuthorityLinksUpdate, instanceId);
var subfieldChanges = bibAuthorityLinksUpdate.getSubfieldsChanges().stream()
.filter(subfieldsChange -> updateTargetFieldCodes.contains(subfieldsChange.getField()))
.collect(Collectors.toMap(SubfieldsChange::getField, SubfieldsChange::getSubfields));

fields.forEach(field -> {
if (!updateTargetFieldCodes.contains(field.getTag())) {
return;
}

var subfields = field.getSubfields();
if (isEmpty(subfields)) {
return;
}

var authorityId = getAuthorityIdSubfield(subfields);
if (authorityId.isEmpty() || !bibAuthorityLinksUpdate.getAuthorityId().equals(authorityId.get().getData())) {
return;
}

var newSubfields = linkProcessor.process(field.getTag(), subfieldChanges.get(field.getTag()), subfields);
LOGGER.trace("JobId {}, AuthorityId {}, instanceId {}, field {}, old subfields: {}, new subfields: {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId(),
instanceId, field.getTag(), subfields, newSubfields);

var newField = new DataFieldImpl(field.getTag(), field.getIndicator1(), field.getIndicator2());
newSubfields.forEach(newField::addSubfield);

var dataFields = parsedRecordContent.getDataFields();
var fieldPosition = dataFields.indexOf(field);
dataFields.remove(fieldPosition);
dataFields.add(fieldPosition, newField);
});

parsedRecord.setContent(mapObjectRepresentationToParsedContentJsonString(parsedRecordContent));
parsedRecord.setFormattedContent(EMPTY);
parsedRecord.setId(newRecordId);
bibRecord.setId(newRecordId);
bibRecord.getRawRecord().setId(newRecordId);
bibRecord.setSnapshotId(bibAuthorityLinksUpdate.getJobId());
setUpdatedBy(bibRecord, userId);
var linkProcessor = getLinkProcessorForEvent(bibAuthorityLinksUpdate);
recordCollection.getRecords().forEach(bibRecord -> {
var newRecordId = UUID.randomUUID().toString();
var instanceId = bibRecord.getExternalIdsHolder().getInstanceId();
var parsedRecord = bibRecord.getParsedRecord();
var parsedRecordContent = readParsedContentToObjectRepresentation(bibRecord);
var fields = new LinkedList<>(parsedRecordContent.getDataFields());

var updateTargetFieldCodes = extractUpdateTargetFieldCodesForInstance(bibAuthorityLinksUpdate, instanceId);
var subfieldChanges = bibAuthorityLinksUpdate.getSubfieldsChanges().stream()
.filter(subfieldsChange -> updateTargetFieldCodes.contains(subfieldsChange.getField()))
.collect(Collectors.toMap(SubfieldsChange::getField, SubfieldsChange::getSubfields));

fields.forEach(field -> {
if (!updateTargetFieldCodes.contains(field.getTag())) {
return;
}

var subfields = field.getSubfields();
if (isEmpty(subfields)) {
return;
}

var authorityId = getAuthorityIdSubfield(subfields);
if (authorityId.isEmpty() || !bibAuthorityLinksUpdate.getAuthorityId().equals(authorityId.get().getData())) {
return;
}

var newSubfields = linkProcessor.process(field.getTag(), subfieldChanges.get(field.getTag()), subfields);
LOGGER.trace("JobId {}, AuthorityId {}, instanceId {}, field {}, old subfields: {}, new subfields: {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId(),
instanceId, field.getTag(), subfields, newSubfields);

var newField = new DataFieldImpl(field.getTag(), field.getIndicator1(), field.getIndicator2());
newSubfields.forEach(newField::addSubfield);

var dataFields = parsedRecordContent.getDataFields();
var fieldPosition = dataFields.indexOf(field);
dataFields.remove(fieldPosition);
dataFields.add(fieldPosition, newField);
});

return recordCollection;
parsedRecord.setContent(mapObjectRepresentationToParsedContentJsonString(parsedRecordContent));
parsedRecord.setFormattedContent(EMPTY);
parsedRecord.setId(newRecordId);
bibRecord.setId(newRecordId);
bibRecord.getRawRecord().setId(newRecordId);
bibRecord.setSnapshotId(bibAuthorityLinksUpdate.getJobId());
setUpdatedBy(bibRecord, userId);
});
return recordCollection;
}

private Future<LinkProcessor> getLinkProcessorForEvent(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
private LinkProcessor getLinkProcessorForEvent(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
var eventType = bibAuthorityLinksUpdate.getType();
switch (eventType) {
case DELETE: {
case DELETE -> {
LOGGER.debug("Precessing DELETE event for jobId {}, authorityId {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());
return Future.succeededFuture(new DeleteLinkProcessor());
return new DeleteLinkProcessor();
}
case UPDATE: {
case UPDATE -> {
LOGGER.debug("Precessing UPDATE event for jobId {}, authorityId {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());
return Future.succeededFuture(new UpdateLinkProcessor());
return new UpdateLinkProcessor();
}
default: {
return Future.failedFuture(new IllegalArgumentException(
default ->
throw new IllegalArgumentException(
String.format("Unsupported event type: %s for jobId %s, authorityId %s",
eventType, bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId())));
}
eventType, bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId()));
}
}

@@ -219,12 +229,6 @@ private List<String> extractUpdateTargetFieldCodesForInstance(BibAuthorityLinksU
.collect(Collectors.toList());
}

private Optional<Subfield> getAuthorityIdSubfield(List<Subfield> subfields) {
return subfields.stream()
.filter(subfield -> subfield.getCode() == AUTHORITY_ID_SUBFIELD)
.findFirst();
}

private List<MarcBibUpdate> mapRecordsToBibUpdateEvents(RecordsBatchResponse batchResponse,
BibAuthorityLinksUpdate event) {
LOGGER.debug("Updated {} bibs for jobId {}, authorityId {}",
Expand All @@ -233,7 +237,7 @@ private List<MarcBibUpdate> mapRecordsToBibUpdateEvents(RecordsBatchResponse bat
var errors = batchResponse.getErrorMessages();
if (!errors.isEmpty()) {
LOGGER.error("Unable to batch update some of linked bib records for jobId {}, authorityId {}."
+ " Total number of records: {}, successful: {}, failures: {}",
+ " Total number of records: {}, successful: {}, failures: {}",
event.getJobId(), event.getAuthorityId(),
batchResponse.getTotalRecords(), batchResponse.getRecords().size(), errors);
}
@@ -282,20 +286,6 @@ private List<LinkUpdateReport> toFailedLinkUpdateReports(List<Record> errorRecor
.collect(Collectors.toList());
}

private Future<BibAuthorityLinksUpdate> createSnapshot(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
var now = new Date();
var snapshot = new Snapshot()
.withJobExecutionId(bibAuthorityLinksUpdate.getJobId())
.withStatus(Snapshot.Status.COMMITTED)
.withProcessingStartedDate(now)
.withMetadata(new Metadata()
.withCreatedDate(now)
.withUpdatedDate(now));

return snapshotService.saveSnapshot(snapshot, bibAuthorityLinksUpdate.getTenant())
.map(result -> bibAuthorityLinksUpdate);
}

private void setUpdatedBy(Record changedRecord, String userId) {
if (StringUtils.isNotBlank(userId)) {
if (changedRecord.getMetadata() != null) {
RecordDao.java
@@ -2,6 +2,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

@@ -21,6 +22,7 @@
import org.folio.rest.jooq.enums.RecordState;
import org.folio.services.RecordSearchParameters;
import org.folio.dao.util.MatchField;
import org.folio.services.entities.RecordsModifierOperator;
import org.folio.services.util.TypeConnection;
import org.folio.services.util.parser.ParseFieldsResult;
import org.folio.services.util.parser.ParseLeaderResult;
@@ -203,14 +205,28 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record);

/**
* Saves {@link RecordCollection} to the db
* Saves {@link RecordCollection} to the db.
*
* @param recordCollection Record collection to save
* @param tenantId tenant id
* @return future with saved {@link RecordsBatchResponse}
*/
Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollection, String tenantId);

/**
* Saves {@link RecordCollection} to the db.
*
* @param externalIds external relation ids
* @param recordType record type
* @param recordsModifier records collection modifier operator
* @param okapiHeaders okapi headers
* @return future with saved {@link RecordsBatchResponse}
*/
Future<RecordsBatchResponse> saveRecordsByExternalIds(List<String> externalIds,
RecordType recordType,
RecordsModifierOperator recordsModifier,
Map<String, String> okapiHeaders);

/**
* Updates {{@link Record} in the db
*
