diff --git a/NEWS.md b/NEWS.md index 39a15736b..deb152c84 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,6 @@ +## 2025-XX-XX 5.8.8-SNAPSHOT +* [MODSOURCE-817](https://folio-org.atlassian.net/browse/MODSOURCE-817) Fix data consistency in handling and updating Marc Bib records for links.instance-authority event + ## 2024-09-03 v5.8.7 * [MODSOURMAN-1200](https://folio-org.atlassian.net/browse/MODSOURMAN-1200) (ECS) Fix update of shared Instances from a member tenant diff --git a/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityLinkChunkKafkaHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityLinkChunkKafkaHandler.java index 1238d0ccb..8cc22805a 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityLinkChunkKafkaHandler.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityLinkChunkKafkaHandler.java @@ -1,6 +1,5 @@ package org.folio.consumers; -import static java.util.Collections.emptyList; import static java.util.Objects.nonNull; import static org.apache.commons.collections4.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.EMPTY; @@ -10,7 +9,9 @@ import static org.folio.consumers.RecordMappingUtils.readParsedContentToObjectRepresentation; import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL; import static org.folio.services.util.EventHandlingUtil.createProducer; +import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders; import static org.folio.services.util.KafkaUtil.extractHeaderValue; +import static org.folio.util.AuthorityLinksUtils.getAuthorityIdSubfield; import io.vertx.core.Future; import io.vertx.core.Promise; @@ -24,7 +25,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -33,8 +33,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.folio.dao.util.IdType; -import org.folio.dao.util.RecordDaoUtil; import org.folio.dao.util.RecordType; import org.folio.kafka.AsyncRecordHandler; import org.folio.kafka.KafkaConfig; @@ -53,17 +51,16 @@ import org.folio.rest.jaxrs.model.UpdateTarget; import org.folio.services.RecordService; import org.folio.services.SnapshotService; +import org.folio.services.entities.RecordsModifierOperator; import org.folio.services.handlers.links.DeleteLinkProcessor; import org.folio.services.handlers.links.LinkProcessor; import org.folio.services.handlers.links.UpdateLinkProcessor; -import org.marc4j.marc.Subfield; import org.marc4j.marc.impl.DataFieldImpl; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class AuthorityLinkChunkKafkaHandler implements AsyncRecordHandler { - public static final char AUTHORITY_ID_SUBFIELD = '9'; private static final AtomicLong INDEXER = new AtomicLong(); private static final Logger LOGGER = LogManager.getLogger(); private final Map> producers = new HashMap<>(); @@ -87,20 +84,27 @@ public AuthorityLinkChunkKafkaHandler(RecordService recordService, KafkaConfig k @Override public Future handle(KafkaConsumerRecord consumerRecord) { LOGGER.trace("handle:: Handling kafka record: {}", consumerRecord); + LOGGER.info("handle:: Start Handling kafka record"); var userId = extractHeaderValue(XOkapiHeaders.USER_ID, 
consumerRecord.headers()); - return mapToEvent(consumerRecord) + + var result = mapToEvent(consumerRecord) .compose(this::createSnapshot) - .compose(event -> retrieveRecords(event, event.getTenant()) - .compose(recordCollection -> mapRecordFieldsChanges(event, recordCollection, userId)) - .compose(recordCollection -> recordService.saveRecords(recordCollection, event.getTenant())) - .map(recordsBatchResponse -> sendReports(recordsBatchResponse, event, consumerRecord.headers())) - .map(recordsBatchResponse -> mapRecordsToBibUpdateEvents(recordsBatchResponse, event)) - .compose(marcBibUpdates -> sendEvents(marcBibUpdates, event, consumerRecord)) - ).recover(th -> { - LOGGER.error("Failed to handle {} event", MARC_BIB.moduleTopicName(), th); - return Future.failedFuture(th); - } - ); + .compose(linksUpdate -> { + var instanceIds = getBibRecordExternalIds(linksUpdate); + var okapiHeaders = toOkapiHeaders(consumerRecord.headers(), linksUpdate.getTenant()); + RecordsModifierOperator recordsModifier = recordsCollection -> + this.mapRecordFieldsChanges(linksUpdate, recordsCollection, userId); + + return recordService.saveRecordsByExternalIds(instanceIds, RecordType.MARC_BIB, recordsModifier, okapiHeaders) + .compose(recordsBatchResponse -> { + sendReports(recordsBatchResponse, linksUpdate, consumerRecord.headers()); + var marcBibUpdateStats = mapRecordsToBibUpdateEvents(recordsBatchResponse, linksUpdate); + return sendEvents(marcBibUpdateStats, linksUpdate, consumerRecord); + }); + }); + + LOGGER.info("handle:: Finish Handling kafka record"); + return result; } private Future mapToEvent(KafkaConsumerRecord consumerRecord) { @@ -115,98 +119,104 @@ private Future mapToEvent(KafkaConsumerRecord retrieveRecords(BibAuthorityLinksUpdate bibAuthorityLinksUpdate, String tenantId) { - LOGGER.trace("Retrieving bibs for jobId {}, authorityId {}", - bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId()); - var instanceIds = bibAuthorityLinksUpdate.getUpdateTargets().stream() + private List getBibRecordExternalIds(BibAuthorityLinksUpdate linksUpdate) { + return linksUpdate.getUpdateTargets().stream() .flatMap(updateTarget -> updateTarget.getLinks().stream() .map(Link::getInstanceId)) .distinct() - .collect(Collectors.toList()); + .toList(); + } - var condition = RecordDaoUtil.getExternalIdsCondition(instanceIds, IdType.INSTANCE) - .and(RecordDaoUtil.filterRecordByDeleted(false)); - return recordService.getRecords(condition, RecordType.MARC_BIB, emptyList(), 0, instanceIds.size(), tenantId); + private Future createSnapshot(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) { + var now = new Date(); + var snapshot = new Snapshot() + .withJobExecutionId(bibAuthorityLinksUpdate.getJobId()) + .withStatus(Snapshot.Status.COMMITTED) + .withProcessingStartedDate(now) + .withMetadata(new Metadata() + .withCreatedDate(now) + .withUpdatedDate(now)); + + return snapshotService.saveSnapshot(snapshot, bibAuthorityLinksUpdate.getTenant()) + .map(result -> bibAuthorityLinksUpdate); } - private Future mapRecordFieldsChanges(BibAuthorityLinksUpdate bibAuthorityLinksUpdate, - RecordCollection recordCollection, String userId) { + + private RecordCollection mapRecordFieldsChanges(BibAuthorityLinksUpdate bibAuthorityLinksUpdate, + RecordCollection recordCollection, String userId) { LOGGER.debug("Retrieved {} bib records for jobId {}, authorityId {}", recordCollection.getTotalRecords(), bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId()); - return 
getLinkProcessorForEvent(bibAuthorityLinksUpdate).map(linkProcessor -> { - recordCollection.getRecords().forEach(bibRecord -> { - var newRecordId = UUID.randomUUID().toString(); - var instanceId = bibRecord.getExternalIdsHolder().getInstanceId(); - var parsedRecord = bibRecord.getParsedRecord(); - var parsedRecordContent = readParsedContentToObjectRepresentation(bibRecord); - var fields = new LinkedList<>(parsedRecordContent.getDataFields()); - - var updateTargetFieldCodes = extractUpdateTargetFieldCodesForInstance(bibAuthorityLinksUpdate, instanceId); - var subfieldChanges = bibAuthorityLinksUpdate.getSubfieldsChanges().stream() - .filter(subfieldsChange -> updateTargetFieldCodes.contains(subfieldsChange.getField())) - .collect(Collectors.toMap(SubfieldsChange::getField, SubfieldsChange::getSubfields)); - - fields.forEach(field -> { - if (!updateTargetFieldCodes.contains(field.getTag())) { - return; - } - - var subfields = field.getSubfields(); - if (isEmpty(subfields)) { - return; - } - - var authorityId = getAuthorityIdSubfield(subfields); - if (authorityId.isEmpty() || !bibAuthorityLinksUpdate.getAuthorityId().equals(authorityId.get().getData())) { - return; - } - - var newSubfields = linkProcessor.process(field.getTag(), subfieldChanges.get(field.getTag()), subfields); - LOGGER.trace("JobId {}, AuthorityId {}, instanceId {}, field {}, old subfields: {}, new subfields: {}", - bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId(), - instanceId, field.getTag(), subfields, newSubfields); - - var newField = new DataFieldImpl(field.getTag(), field.getIndicator1(), field.getIndicator2()); - newSubfields.forEach(newField::addSubfield); - - var dataFields = parsedRecordContent.getDataFields(); - var fieldPosition = dataFields.indexOf(field); - dataFields.remove(fieldPosition); - dataFields.add(fieldPosition, newField); - }); - - parsedRecord.setContent(mapObjectRepresentationToParsedContentJsonString(parsedRecordContent)); - parsedRecord.setFormattedContent(EMPTY); - parsedRecord.setId(newRecordId); - bibRecord.setId(newRecordId); - bibRecord.getRawRecord().setId(newRecordId); - bibRecord.setSnapshotId(bibAuthorityLinksUpdate.getJobId()); - setUpdatedBy(bibRecord, userId); + var linkProcessor = getLinkProcessorForEvent(bibAuthorityLinksUpdate); + recordCollection.getRecords().forEach(bibRecord -> { + var newRecordId = UUID.randomUUID().toString(); + var instanceId = bibRecord.getExternalIdsHolder().getInstanceId(); + var parsedRecord = bibRecord.getParsedRecord(); + var parsedRecordContent = readParsedContentToObjectRepresentation(bibRecord); + var fields = new LinkedList<>(parsedRecordContent.getDataFields()); + + var updateTargetFieldCodes = extractUpdateTargetFieldCodesForInstance(bibAuthorityLinksUpdate, instanceId); + var subfieldChanges = bibAuthorityLinksUpdate.getSubfieldsChanges().stream() + .filter(subfieldsChange -> updateTargetFieldCodes.contains(subfieldsChange.getField())) + .collect(Collectors.toMap(SubfieldsChange::getField, SubfieldsChange::getSubfields)); + + fields.forEach(field -> { + if (!updateTargetFieldCodes.contains(field.getTag())) { + return; + } + + var subfields = field.getSubfields(); + if (isEmpty(subfields)) { + return; + } + + var authorityId = getAuthorityIdSubfield(subfields); + if (authorityId.isEmpty() || !bibAuthorityLinksUpdate.getAuthorityId().equals(authorityId.get().getData())) { + return; + } + + var newSubfields = linkProcessor.process(field.getTag(), subfieldChanges.get(field.getTag()), subfields); + LOGGER.trace("JobId {}, 
AuthorityId {}, instanceId {}, field {}, old subfields: {}, new subfields: {}",
+          bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId(),
+          instanceId, field.getTag(), subfields, newSubfields);
+
+        var newField = new DataFieldImpl(field.getTag(), field.getIndicator1(), field.getIndicator2());
+        newSubfields.forEach(newField::addSubfield);
+
+        var dataFields = parsedRecordContent.getDataFields();
+        var fieldPosition = dataFields.indexOf(field);
+        dataFields.remove(fieldPosition);
+        dataFields.add(fieldPosition, newField);
       });
-      return recordCollection;
+      parsedRecord.setContent(mapObjectRepresentationToParsedContentJsonString(parsedRecordContent));
+      parsedRecord.setFormattedContent(EMPTY);
+      parsedRecord.setId(newRecordId);
+      bibRecord.setId(newRecordId);
+      bibRecord.getRawRecord().setId(newRecordId);
+      bibRecord.setSnapshotId(bibAuthorityLinksUpdate.getJobId());
+      setUpdatedBy(bibRecord, userId);
     });
+    return recordCollection;
   }

-  private Future getLinkProcessorForEvent(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
+  private LinkProcessor getLinkProcessorForEvent(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
     var eventType = bibAuthorityLinksUpdate.getType();
     switch (eventType) {
-      case DELETE: {
+      case DELETE -> {
         LOGGER.debug("Processing DELETE event for jobId {}, authorityId {}",
           bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());
-        return Future.succeededFuture(new DeleteLinkProcessor());
+        return new DeleteLinkProcessor();
       }
-      case UPDATE: {
+      case UPDATE -> {
         LOGGER.debug("Processing UPDATE event for jobId {}, authorityId {}",
           bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());
-        return Future.succeededFuture(new UpdateLinkProcessor());
+        return new UpdateLinkProcessor();
       }
-      default: {
-        return Future.failedFuture(new IllegalArgumentException(
+      default ->
+        throw new IllegalArgumentException(
           String.format("Unsupported event type: %s for jobId %s, authorityId %s",
-            eventType, bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId())));
-      }
+            eventType, bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId()));
     }
   }

@@ -219,12 +229,6 @@ private List extractUpdateTargetFieldCodesForInstance(BibAuthorityLinksU
       .collect(Collectors.toList());
   }

-  private Optional getAuthorityIdSubfield(List subfields) {
-    return subfields.stream()
-      .filter(subfield -> subfield.getCode() == AUTHORITY_ID_SUBFIELD)
-      .findFirst();
-  }
-
   private List mapRecordsToBibUpdateEvents(RecordsBatchResponse batchResponse,
                                            BibAuthorityLinksUpdate event) {
     LOGGER.debug("Updated {} bibs for jobId {}, authorityId {}",
@@ -233,7 +237,7 @@ private List mapRecordsToBibUpdateEvents(RecordsBatchResponse bat
     var errors = batchResponse.getErrorMessages();
     if (!errors.isEmpty()) {
       LOGGER.error("Unable to batch update some of linked bib records for jobId {}, authorityId {}."
- + " Total number of records: {}, successful: {}, failures: {}", + + " Total number of records: {}, successful: {}, failures: {}", event.getJobId(), event.getAuthorityId(), batchResponse.getTotalRecords(), batchResponse.getRecords().size(), errors); } @@ -282,20 +286,6 @@ private List toFailedLinkUpdateReports(List errorRecor .collect(Collectors.toList()); } - private Future createSnapshot(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) { - var now = new Date(); - var snapshot = new Snapshot() - .withJobExecutionId(bibAuthorityLinksUpdate.getJobId()) - .withStatus(Snapshot.Status.COMMITTED) - .withProcessingStartedDate(now) - .withMetadata(new Metadata() - .withCreatedDate(now) - .withUpdatedDate(now)); - - return snapshotService.saveSnapshot(snapshot, bibAuthorityLinksUpdate.getTenant()) - .map(result -> bibAuthorityLinksUpdate); - } - private void setUpdatedBy(Record changedRecord, String userId) { if (StringUtils.isNotBlank(userId)) { if (changedRecord.getMetadata() != null) { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java index 49412d6ee..ce4120b39 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java @@ -2,6 +2,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.function.Function; @@ -21,6 +22,7 @@ import org.folio.rest.jooq.enums.RecordState; import org.folio.services.RecordSearchParameters; import org.folio.dao.util.MatchField; +import org.folio.services.entities.RecordsModifierOperator; import org.folio.services.util.TypeConnection; import org.folio.services.util.parser.ParseFieldsResult; import org.folio.services.util.parser.ParseLeaderResult; @@ -203,7 +205,7 @@ Future getMatchedRecordsIdentifiers(MatchField mat Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record); /** - * Saves {@link RecordCollection} to the db + * Saves {@link RecordCollection} to the db. * * @param recordCollection Record collection to save * @param tenantId tenant id @@ -211,6 +213,20 @@ Future getMatchedRecordsIdentifiers(MatchField mat */ Future saveRecords(RecordCollection recordCollection, String tenantId); + /** + * Saves {@link RecordCollection} to the db. 
+ * + * @param externalIds external relation ids + * @param recordType record type + * @param recordsModifier records collection modifier operator + * @param okapiHeaders okapi headers + * @return future with saved {@link RecordsBatchResponse} + */ + Future saveRecordsByExternalIds(List externalIds, + RecordType recordType, + RecordsModifierOperator recordsModifier, + Map okapiHeaders); + /** * Updates {{@link Record} in the db * diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java index f13e773a4..374575639 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java @@ -13,6 +13,7 @@ import static org.folio.dao.util.RecordDaoUtil.filterRecordByType; import static org.folio.dao.util.RecordDaoUtil.getExternalHrid; import static org.folio.dao.util.RecordDaoUtil.getExternalId; +import static org.folio.dao.util.RecordDaoUtil.getExternalIdType; import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE; import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE; import static org.folio.rest.jooq.Tables.ERROR_RECORDS_LB; @@ -22,6 +23,7 @@ import static org.folio.rest.jooq.Tables.RECORDS_LB; import static org.folio.rest.jooq.Tables.SNAPSHOTS_LB; import static org.folio.rest.jooq.enums.RecordType.MARC_BIB; +import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER; import static org.folio.rest.util.QueryParamUtil.toRecordType; import static org.jooq.impl.DSL.condition; import static org.jooq.impl.DSL.countDistinct; @@ -63,6 +65,8 @@ import java.util.stream.Collectors; import javax.ws.rs.BadRequestException; import javax.ws.rs.NotFoundException; + +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.text.StrSubstitutor; import org.apache.commons.lang3.StringUtils; @@ -105,14 +109,16 @@ import org.folio.rest.jooq.tables.records.RecordsLbRecord; import org.folio.rest.jooq.tables.records.SnapshotsLbRecord; import org.folio.services.RecordSearchParameters; +import org.folio.services.entities.RecordsModifierOperator; +import org.folio.services.exceptions.RecordUpdateException; import org.folio.services.util.TypeConnection; import org.folio.services.util.parser.ParseFieldsResult; import org.folio.services.util.parser.ParseLeaderResult; import org.jooq.Condition; +import org.jooq.Configuration; import org.jooq.DSLContext; import org.jooq.Field; import org.jooq.JSONB; -import org.jooq.LoaderError; import org.jooq.Name; import org.jooq.OrderField; import org.jooq.Record1; @@ -233,32 +239,10 @@ public Future getRecords(Condition condition, RecordType recor } @Override - public Future getRecords(Condition condition, RecordType recordType, Collection> orderFields, int offset, int limit, boolean returnTotalCount, String tenantId) { - Name cte = name(CTE); - Name prt = name(recordType.getTableName()); - return getQueryExecutor(tenantId).transaction(txQE -> txQE.query(dsl -> { - ResultQuery> countQuery; - if (returnTotalCount) { - countQuery = dsl.selectCount() - .from(RECORDS_LB) - .where(condition.and(recordType.getRecordImplicitCondition())); - } else { - countQuery = select(inline(null, Integer.class).as(COUNT)); - } - - return dsl - .with(cte.as(countQuery)) - .select(getAllRecordFieldsWithCount(prt)) - .from(RECORDS_LB) - 
.leftJoin(table(prt)).on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, prt, name(ID)))) - .leftJoin(RAW_RECORDS_LB).on(RECORDS_LB.ID.eq(RAW_RECORDS_LB.ID)) - .leftJoin(ERROR_RECORDS_LB).on(RECORDS_LB.ID.eq(ERROR_RECORDS_LB.ID)) - .rightJoin(dsl.select().from(table(cte))).on(trueCondition()) - .where(condition.and(recordType.getRecordImplicitCondition())) - .orderBy(orderFields) - .offset(offset) - .limit(limit > 0 ? limit : DEFAULT_LIMIT_FOR_GET_RECORDS); - } + public Future getRecords(Condition condition, RecordType recordType, Collection> orderFields, + int offset, int limit, boolean returnTotalCount, String tenantId) { + return getQueryExecutor(tenantId).transaction(txQE -> txQE.query(dsl -> + readRecords(dsl, condition, recordType, offset, limit, returnTotalCount, orderFields) )).map(queryResult -> toRecordCollectionWithLimitCheck(queryResult, limit)); } @@ -344,7 +328,11 @@ public Future> getMatchedRecordsWithoutIndexersVersionUsage(MatchFi ) .offset(offset) .limit(limit > 0 ? limit : DEFAULT_LIMIT_FOR_GET_RECORDS) - )).map(queryResult -> queryResult.stream().map(res -> asRow(res.unwrap())).map(this::toRecord).collect(Collectors.toList())); + )).map(queryResult -> queryResult.stream() + .map(res -> asRow(res.unwrap())) + .map(this::toRecord) + .toList() + ); } private Condition getMatchedFieldCondition(MatchField matchedField, String partition) { @@ -375,7 +363,8 @@ private String getValueInSqlFormat(Value value) { } if (Value.ValueType.LIST.equals(value.getType())) { List listOfValues = ((ListValue) value).getValue().stream() - .map(v -> format(VALUE_IN_SINGLE_QUOTES, v)).collect(Collectors.toList()); + .map(v -> format(VALUE_IN_SINGLE_QUOTES, v)) + .toList(); return StringUtils.join(listOfValues, ", "); } return StringUtils.EMPTY; @@ -629,215 +618,307 @@ public Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Recor @Override public Future saveRecords(RecordCollection recordCollection, String tenantId) { logRecordCollection("saveRecords:: Saving", recordCollection, tenantId); + var firstRecord = recordCollection.getRecords().iterator().next(); + var snapshotId = firstRecord.getSnapshotId(); + var recordType = RecordType.valueOf(firstRecord.getRecordType().name()); Promise finalPromise = Promise.promise(); Context context = Vertx.currentContext(); - if(context == null) return Future.failedFuture("saveRecords must be executed by a Vertx thread"); - context.owner().executeBlocking(promise -> { - Set matchedIds = new HashSet<>(); - Set snapshotIds = new HashSet<>(); - Set recordTypes = new HashSet<>(); - - List dbRecords = new ArrayList<>(); - List dbRawRecords = new ArrayList<>(); - List> dbParsedRecords = new ArrayList<>(); - List dbErrorRecords = new ArrayList<>(); - - List errorMessages = new ArrayList<>(); - - recordCollection.getRecords() - .stream() - .map(RecordDaoUtil::ensureRecordHasId) - .map(RecordDaoUtil::ensureRecordHasMatchedId) - .map(RecordDaoUtil::ensureRecordHasSuppressDiscovery) - .map(RecordDaoUtil::ensureRecordForeignKeys) - .forEach(record -> { - // collect unique matched ids to query to determine generation - matchedIds.add(UUID.fromString(record.getMatchedId())); - - // make sure only one snapshot id - snapshotIds.add(record.getSnapshotId()); - if (snapshotIds.size() > 1) { - throw new BadRequestException("Batch record collection only supports single snapshot"); - } - if(Objects.nonNull(record.getRecordType())) { - recordTypes.add(record.getRecordType().name()); - } else { - throw new 
BadRequestException(StringUtils.defaultIfEmpty(record.getErrorRecord().getDescription(), String.format("Record with id %s has not record type", record.getId()))); - } + if(context == null) { + return Future.failedFuture("saveRecords must be executed by a Vertx thread"); + } - // make sure only one record type - if (recordTypes.size() > 1) { - throw new BadRequestException("Batch record collection only supports single record type"); - } + context.owner().executeBlocking( + () -> saveRecords(recordCollection, snapshotId, recordType, tenantId), + false, + r -> { + if (r.failed()) { + LOG.warn("saveRecords:: Error during batch record save", r.cause()); + finalPromise.fail(r.cause()); + } else { + LOG.debug("saveRecords:: batch record save was successful"); + finalPromise.complete(r.result()); + } + }); - // if record has parsed record, validate by attempting format - if (Objects.nonNull(record.getParsedRecord())) { - try { - RecordType recordType = toRecordType(record.getRecordType().name()); - recordType.formatRecord(record); - Record2 dbParsedRecord = recordType.toDatabaseRecord2(record.getParsedRecord()); - dbParsedRecords.add(dbParsedRecord); - } catch (Exception e) { - // create error record and remove from record - Object content = Objects.nonNull(record.getParsedRecord()) - ? record.getParsedRecord().getContent() - : null; - ErrorRecord errorRecord = new ErrorRecord() - .withId(record.getId()) - .withDescription(e.getMessage()) - .withContent(content); - errorMessages.add(format(INVALID_PARSED_RECORD_MESSAGE_TEMPLATE, record.getId(), e.getMessage())); - record.withErrorRecord(errorRecord) - .withParsedRecord(null) - .withLeaderRecordStatus(null); - } - } - if (Objects.nonNull(record.getRawRecord())) { - dbRawRecords.add(RawRecordDaoUtil.toDatabaseRawRecord(record.getRawRecord())); - } - if (Objects.nonNull(record.getErrorRecord())) { - dbErrorRecords.add(ErrorRecordDaoUtil.toDatabaseErrorRecord(record.getErrorRecord())); - } - dbRecords.add(RecordDaoUtil.toDatabaseRecord(record)); - }); + return finalPromise.future(); + } - UUID snapshotId = UUID.fromString(snapshotIds.stream().findFirst().orElseThrow()); + @Override + public Future saveRecordsByExternalIds(List externalIds, + RecordType recordType, + RecordsModifierOperator recordsModifier, + Map okapiHeaders) { + var condition = RecordDaoUtil.getExternalIdsCondition(externalIds, + getExternalIdType(Record.RecordType.fromValue(recordType.name()))) + .and(RecordDaoUtil.filterRecordByDeleted(false)); + + var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER); + Promise finalPromise = Promise.promise(); + Context context = Vertx.currentContext(); + if(context == null) { + return Future.failedFuture("saveRecordsByExternalIds:: operation must be executed by a Vertx thread"); + } - RecordType recordType = toRecordType(recordTypes.stream().findFirst().orElseThrow()); + context.owner().executeBlocking( + () -> { + final RecordCollection recordCollection; + try (Connection connection = getConnection(tenantId)) { + recordCollection = DSL.using(connection).transactionResult(ctx -> { + DSLContext dsl = DSL.using(ctx); + var queryResult = readRecords(dsl, condition, recordType, 0, externalIds.size(), false, emptyList()); + var records = queryResult.fetch(this::toRecord); + return new RecordCollection().withRecords(records).withTotalRecords(records.size()); + }); + } catch (SQLException | DataAccessException e) { + LOG.warn("saveRecordsByExternalIds:: Failed to read records", e); + throw e; + } - try (Connection connection = getConnection(tenantId)) { 
- DSL.using(connection).transaction(ctx -> { - DSLContext dsl = DSL.using(ctx); + if (recordCollection == null || CollectionUtils.isEmpty(recordCollection.getRecords())) { + LOG.warn("saveRecordsByExternalIds:: No records returned from the fetch query"); + return new RecordsBatchResponse().withTotalRecords(0); + } - // validate snapshot - Optional snapshot = DSL.using(ctx).selectFrom(SNAPSHOTS_LB) - .where(SNAPSHOTS_LB.ID.eq(snapshotId)) - .fetchOptional(); - if (snapshot.isPresent()) { - if (Objects.isNull(snapshot.get().getProcessingStartedDate())) { - throw new BadRequestException(format(SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE, snapshot.get().getStatus())); - } - } else { - throw new NotFoundException(format(SNAPSHOT_NOT_FOUND_TEMPLATE, snapshotId)); - } + var modifiedRecords = recordsModifier.apply(recordCollection); + var snapshotId = modifiedRecords.getRecords().iterator().next().getSnapshotId(); + return saveRecords(modifiedRecords, snapshotId, recordType, tenantId); + }, + r -> { + if (r.failed()) { + LOG.warn("saveRecordsByExternalIds:: Error during batch record save", r.cause()); + finalPromise.fail(r.cause()); + } else { + LOG.debug("saveRecordsByExternalIds:: batch record save was successful"); + finalPromise.complete(r.result()); + } + } + ); - List ids = new ArrayList<>(); - Map matchedGenerations = new HashMap<>(); - - // lookup latest generation by matched id and committed snapshot updated before current snapshot - dsl.select(RECORDS_LB.MATCHED_ID, RECORDS_LB.ID, RECORDS_LB.GENERATION) - .distinctOn(RECORDS_LB.MATCHED_ID) - .from(RECORDS_LB) - .innerJoin(SNAPSHOTS_LB).on(RECORDS_LB.SNAPSHOT_ID.eq(SNAPSHOTS_LB.ID)) - .where(RECORDS_LB.MATCHED_ID.in(matchedIds) - .and(SNAPSHOTS_LB.STATUS.in(JobExecutionStatus.COMMITTED, JobExecutionStatus.ERROR, JobExecutionStatus.CANCELLED)) - .and(SNAPSHOTS_LB.UPDATED_DATE.lessThan(dsl - .select(SNAPSHOTS_LB.PROCESSING_STARTED_DATE) - .from(SNAPSHOTS_LB) - .where(SNAPSHOTS_LB.ID.eq(snapshotId))))) - .orderBy(RECORDS_LB.MATCHED_ID.asc(), RECORDS_LB.GENERATION.desc()) - .fetchStream().forEach(r -> { - UUID id = r.get(RECORDS_LB.ID); - UUID matchedId = r.get(RECORDS_LB.MATCHED_ID); - int generation = r.get(RECORDS_LB.GENERATION); - ids.add(id); - matchedGenerations.put(matchedId, generation); - }); + return finalPromise.future(); + } + + private ResultQuery readRecords(DSLContext dsl, Condition condition, RecordType recordType, int offset, int limit, + boolean returnTotalCount, Collection> orderFields) { + Name cte = name(CTE); + Name prt = name(recordType.getTableName()); + var finalCondition = condition.and(recordType.getRecordImplicitCondition()); + + ResultQuery> countQuery; + if (returnTotalCount) { + countQuery = dsl.selectCount() + .from(RECORDS_LB) + .where(finalCondition); + } else { + countQuery = select(inline(null, Integer.class).as(COUNT)); + } + + return dsl + .with(cte.as(countQuery)) + .select(getAllRecordFieldsWithCount(prt)) + .from(RECORDS_LB) + .leftJoin(table(prt)).on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, prt, name(ID)))) + .leftJoin(RAW_RECORDS_LB).on(RECORDS_LB.ID.eq(RAW_RECORDS_LB.ID)) + .leftJoin(ERROR_RECORDS_LB).on(RECORDS_LB.ID.eq(ERROR_RECORDS_LB.ID)) + .rightJoin(dsl.select().from(table(cte))).on(trueCondition()) + .where(finalCondition) + .orderBy(orderFields) + .offset(offset) + .limit(limit > 0 ? 
limit : DEFAULT_LIMIT_FOR_GET_RECORDS); + } + + private RecordsBatchResponse saveRecords(RecordCollection recordCollection, String snapshotId, RecordType recordType, + String tenantId) throws SQLException { + Set matchedIds = new HashSet<>(); + List dbRecords = new ArrayList<>(); + List dbRawRecords = new ArrayList<>(); + List> dbParsedRecords = new ArrayList<>(); + List dbErrorRecords = new ArrayList<>(); + + List errorMessages = new ArrayList<>(); + + recordCollection.getRecords() + .stream() + .map(RecordDaoUtil::ensureRecordHasId) + .map(RecordDaoUtil::ensureRecordHasMatchedId) + .map(RecordDaoUtil::ensureRecordHasSuppressDiscovery) + .map(RecordDaoUtil::ensureRecordForeignKeys) + .forEach(record -> { + // collect unique matched ids to query to determine generation + matchedIds.add(UUID.fromString(record.getMatchedId())); + + // make sure only one snapshot id + if (!Objects.equals(snapshotId, record.getSnapshotId())) { + throw new BadRequestException("Batch record collection only supports single snapshot"); + } + validateRecordType(record, recordType); - // update matching records state - if(!ids.isEmpty()) - { - dsl.update(RECORDS_LB) - .set(RECORDS_LB.STATE, RecordState.OLD) - .where(RECORDS_LB.ID.in(ids)) - .execute(); + // if record has parsed record, validate by attempting format + if (Objects.nonNull(record.getParsedRecord())) { + try { + recordType.formatRecord(record); + Record2 dbParsedRecord = recordType.toDatabaseRecord2(record.getParsedRecord()); + dbParsedRecords.add(dbParsedRecord); + } catch (Exception e) { + // create error record and remove from record + Object content = Optional.ofNullable(record.getParsedRecord()) + .map(ParsedRecord::getContent) + .orElse(null); + var errorRecord = new ErrorRecord() + .withId(record.getId()) + .withDescription(e.getMessage()) + .withContent(content); + errorMessages.add(format(INVALID_PARSED_RECORD_MESSAGE_TEMPLATE, record.getId(), e.getMessage())); + record.withErrorRecord(errorRecord) + .withParsedRecord(null) + .withLeaderRecordStatus(null); } + } + if (Objects.nonNull(record.getRawRecord())) { + dbRawRecords.add(RawRecordDaoUtil.toDatabaseRawRecord(record.getRawRecord())); + } + if (Objects.nonNull(record.getErrorRecord())) { + dbErrorRecords.add(ErrorRecordDaoUtil.toDatabaseErrorRecord(record.getErrorRecord())); + } + dbRecords.add(RecordDaoUtil.toDatabaseRecord(record)); + }); - // batch insert records updating generation if required - List recordsLoadingErrors = dsl.loadInto(RECORDS_LB) - .batchAfter(1000) - .bulkAfter(500) - .commitAfter(1000) - .onErrorAbort() - .loadRecords(dbRecords.stream().map(record -> { - Integer generation = matchedGenerations.get(record.getMatchedId()); - if (Objects.nonNull(generation)) { - record.setGeneration(generation + 1); - } else if (Objects.isNull(record.getGeneration())) { - record.setGeneration(0); - } - return record; - }).collect(Collectors.toList())) - .fieldsCorresponding() - .execute() - .errors(); + try (Connection connection = getConnection(tenantId)) { + return DSL.using(connection).transactionResult(ctx -> { + DSLContext dsl = DSL.using(ctx); - recordsLoadingErrors.forEach(error -> { - if (error.exception().sqlState().equals(UNIQUE_VIOLATION_SQL_STATE)) { - throw new DuplicateEventException("SQL Unique constraint violation prevented repeatedly saving the record"); - } - LOG.warn("saveRecords:: Error occurred on batch execution: {}", error.exception().getCause().getMessage()); - LOG.debug("saveRecords:: Failed to execute statement from batch: {}", error.query()); + // validate 
snapshot + validateSnapshot(UUID.fromString(snapshotId), ctx); + + List ids = new ArrayList<>(); + Map matchedGenerations = new HashMap<>(); + + // lookup the latest generation by matched id and committed snapshot updated before current snapshot + dsl.select(RECORDS_LB.MATCHED_ID, RECORDS_LB.ID, RECORDS_LB.GENERATION) + .distinctOn(RECORDS_LB.MATCHED_ID) + .from(RECORDS_LB) + .innerJoin(SNAPSHOTS_LB).on(RECORDS_LB.SNAPSHOT_ID.eq(SNAPSHOTS_LB.ID)) + .where(RECORDS_LB.MATCHED_ID.in(matchedIds) + .and(SNAPSHOTS_LB.STATUS.in(JobExecutionStatus.COMMITTED, JobExecutionStatus.ERROR, JobExecutionStatus.CANCELLED)) + .and(SNAPSHOTS_LB.UPDATED_DATE.lessThan(dsl + .select(SNAPSHOTS_LB.PROCESSING_STARTED_DATE) + .from(SNAPSHOTS_LB) + .where(SNAPSHOTS_LB.ID.eq(UUID.fromString(snapshotId)))))) + .orderBy(RECORDS_LB.MATCHED_ID.asc(), RECORDS_LB.GENERATION.desc()) + .fetchStream().forEach(r -> { + UUID id = r.get(RECORDS_LB.ID); + UUID matchedId = r.get(RECORDS_LB.MATCHED_ID); + int generation = r.get(RECORDS_LB.GENERATION); + ids.add(id); + matchedGenerations.put(matchedId, generation); }); - // batch insert raw records - dsl.loadInto(RAW_RECORDS_LB) - .batchAfter(250) - .commitAfter(1000) - .onDuplicateKeyUpdate() - .onErrorAbort() - .loadRecords(dbRawRecords) - .fieldsCorresponding() + // update matching records state + if(!ids.isEmpty()) + { + dsl.update(RECORDS_LB) + .set(RECORDS_LB.STATE, RecordState.OLD) + .where(RECORDS_LB.ID.in(ids)) .execute(); + } + + // batch insert records updating generation if required + var recordsLoadingErrors = dsl.loadInto(RECORDS_LB) + .batchAfter(1000) + .bulkAfter(500) + .commitAfter(1000) + .onErrorAbort() + .loadRecords(dbRecords.stream() + .map(recordDto -> { + Integer generation = matchedGenerations.get(recordDto.getMatchedId()); + if (Objects.nonNull(generation)) { + recordDto.setGeneration(generation + 1); + } else if (Objects.isNull(recordDto.getGeneration())) { + recordDto.setGeneration(0); + } + return recordDto; + }) + .toList()) + .fieldsCorresponding() + .execute() + .errors(); + + recordsLoadingErrors.forEach(error -> { + if (error.exception().sqlState().equals(UNIQUE_VIOLATION_SQL_STATE)) { + throw new DuplicateEventException("SQL Unique constraint violation prevented repeatedly saving the record"); + } + LOG.warn("saveRecords:: Error occurred on batch execution: {}", error.exception().getCause().getMessage()); + LOG.debug("saveRecords:: Failed to execute statement from batch: {}", error.query()); + }); - // batch insert parsed records - recordType.toLoaderOptionsStep(dsl) + // batch insert raw records + dsl.loadInto(RAW_RECORDS_LB) + .batchAfter(250) + .commitAfter(1000) + .onDuplicateKeyUpdate() + .onErrorAbort() + .loadRecords(dbRawRecords) + .fieldsCorresponding() + .execute(); + + // batch insert parsed records + recordType.toLoaderOptionsStep(dsl) + .batchAfter(250) + .commitAfter(1000) + .onDuplicateKeyUpdate() + .onErrorAbort() + .loadRecords(dbParsedRecords) + .fieldsCorresponding() + .execute(); + + if (!dbErrorRecords.isEmpty()) { + // batch insert error records + dsl.loadInto(ERROR_RECORDS_LB) .batchAfter(250) .commitAfter(1000) .onDuplicateKeyUpdate() .onErrorAbort() - .loadRecords(dbParsedRecords) + .loadRecords(dbErrorRecords) .fieldsCorresponding() .execute(); + } - if (!dbErrorRecords.isEmpty()) { - // batch insert error records - dsl.loadInto(ERROR_RECORDS_LB) - .batchAfter(250) - .commitAfter(1000) - .onDuplicateKeyUpdate() - .onErrorAbort() - .loadRecords(dbErrorRecords) - .fieldsCorresponding() - .execute(); - } + return new 
RecordsBatchResponse()
+          .withRecords(recordCollection.getRecords())
+          .withTotalRecords(recordCollection.getRecords().size())
+          .withErrorMessages(errorMessages);
+      });
+    } catch (DuplicateEventException e) {
+      LOG.info("saveRecords:: Skipped saving records due to duplicate event: {}", e.getMessage());
+      throw e;
+    } catch (SQLException | DataAccessException ex) {
+      LOG.warn("saveRecords:: Failed to save records", ex);
+      Throwable throwable = ex.getCause() != null ? ex.getCause() : ex;
+      throw new RecordUpdateException(throwable);
+    }
+  }

-          promise.complete(new RecordsBatchResponse()
-            .withRecords(recordCollection.getRecords())
-            .withTotalRecords(recordCollection.getRecords().size())
-            .withErrorMessages(errorMessages));
-        });
-      } catch (DuplicateEventException e) {
-        LOG.info("saveRecords:: Skipped saving records due to duplicate event: {}", e.getMessage());
-        promise.fail(e);
-      } catch (SQLException | DataAccessException e) {
-        LOG.warn("saveRecords:: Failed to save records", e);
-        promise.fail(e.getCause());
-      }
-    },
-    false,
-    r -> {
-      if (r.failed()) {
-        LOG.warn("saveRecords:: Error during batch record save", r.cause());
-        finalPromise.fail(r.cause());
-      } else {
-        LOG.debug("saveRecords:: batch record save was successful");
-        finalPromise.complete(r.result());
-      }
-    });
+  private void validateRecordType(Record recordDto, RecordType recordType) {
+    if (recordDto.getRecordType() == null) {
+      var error = recordDto.getErrorRecord() != null ? recordDto.getErrorRecord().getDescription() : "";
+      throw new BadRequestException(
+        StringUtils.defaultIfEmpty(error, String.format("Record with id %s has no record type", recordDto.getId())));
+    }

-    return finalPromise.future();
+    if (RecordType.valueOf(recordDto.getRecordType().name()) != recordType) {
+      throw new BadRequestException("Batch record collection only supports single record type");
+    }
+  }
+
+  private void validateSnapshot(UUID snapshotId, Configuration ctx) {
+    Optional snapshot = DSL.using(ctx).selectFrom(SNAPSHOTS_LB)
+      .where(SNAPSHOTS_LB.ID.eq(snapshotId))
+      .fetchOptional();
+    if (snapshot.isPresent() && Objects.isNull(snapshot.get().getProcessingStartedDate())) {
+      throw new BadRequestException(format(SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE, snapshot.get().getStatus()));
+    } else if (snapshot.isEmpty()) {
+      throw new NotFoundException(format(SNAPSHOT_NOT_FOUND_TEMPLATE, snapshotId));
+    }
   }

   @Override
@@ -1293,7 +1374,7 @@ public Future updateMarcAuthorityRecordsStateAsDeleted(String matchedId, S
       .compose(recordCollection -> {
         List> futures = recordCollection.getRecords().stream()
           .map(recordToUpdate -> updateMarcAuthorityRecordWithDeletedState(txQE, ensureRecordForeignKeys(recordToUpdate)))
-          .collect(Collectors.toList());
+          .toList();

         Promise result = Promise.promise();
         GenericCompositeFuture.all(futures).onComplete(ar -> {
@@ -1440,11 +1521,11 @@ private Future updateExternalIdsForRecord(ReactiveClassicGenericQueryEx
     });
   }

-  private Record validateParsedRecordId(Record record) {
-    if (Objects.isNull(record.getParsedRecord()) || StringUtils.isEmpty(record.getParsedRecord().getId())) {
+  private Record validateParsedRecordId(Record recordDto) {
+    if (Objects.isNull(recordDto.getParsedRecord()) || StringUtils.isEmpty(recordDto.getParsedRecord().getId())) {
       throw new BadRequestException("Each parsed record should contain an id");
     }
-    return record;
+    return recordDto;
   }

   private Field[] getRecordFields(Name prt) {
@@ -1487,7 +1568,8 @@ private RecordCollection toRecordCollection(QueryResult result) {
     List records =
result.stream().map(res -> asRow(res.unwrap())).map(row -> { recordCollection.setTotalRecords(row.getInteger(COUNT)); return toRecord(row); - }).collect(Collectors.toList()); + }) + .toList(); if (!records.isEmpty() && Objects.nonNull(records.get(0).getId())) { recordCollection.withRecords(records); } @@ -1499,7 +1581,8 @@ private StrippedParsedRecordCollection toStrippedParsedRecordCollection(QueryRes List records = result.stream().map(res -> asRow(res.unwrap())).map(row -> { recordCollection.setTotalRecords(row.getInteger(COUNT)); return toStrippedParsedRecord(row); - }).collect(Collectors.toList()); + }) + .toList(); if (!records.isEmpty() && Objects.nonNull(records.get(0).getId())) { recordCollection.withRecords(records); } @@ -1513,8 +1596,7 @@ private RecordCollection toRecordCollectionWithLimitCheck(QueryResult result, in // Validation to ignore records insertion to the returned recordCollection when limit equals zero if (limit == 0) { return new RecordCollection().withTotalRecords(asRow(result.unwrap()).getInteger(COUNT)); - } - else { + } else { return toRecordCollection(result); } } @@ -1525,7 +1607,8 @@ private SourceRecordCollection toSourceRecordCollection(QueryResult result) { sourceRecordCollection.setTotalRecords(row.getInteger(COUNT)); return RecordDaoUtil.toSourceRecord(RecordDaoUtil.toRecord(row)) .withParsedRecord(ParsedRecordDaoUtil.toParsedRecord(row)); - }).collect(Collectors.toList()); + }) + .toList(); if (!sourceRecords.isEmpty() && Objects.nonNull(sourceRecords.get(0).getRecordId())) { sourceRecordCollection.withSourceRecords(sourceRecords); } @@ -1542,20 +1625,38 @@ private SourceRecord toSourceRecord(Row row) { } private Record toRecord(Row row) { - Record record = RecordDaoUtil.toRecord(row); + Record recordDto = RecordDaoUtil.toRecord(row); RawRecord rawRecord = RawRecordDaoUtil.toJoinedRawRecord(row); if (Objects.nonNull(rawRecord.getContent())) { - record.setRawRecord(rawRecord); + recordDto.setRawRecord(rawRecord); } ParsedRecord parsedRecord = ParsedRecordDaoUtil.toJoinedParsedRecord(row); if (Objects.nonNull(parsedRecord.getContent())) { - record.setParsedRecord(parsedRecord); + recordDto.setParsedRecord(parsedRecord); } ErrorRecord errorRecord = ErrorRecordDaoUtil.toJoinedErrorRecord(row); if (Objects.nonNull(errorRecord.getContent())) { - record.setErrorRecord(errorRecord); + recordDto.setErrorRecord(errorRecord); + } + return recordDto; + } + + private Record toRecord(org.jooq.Record dbRecord) { + Record recordDto = RecordDaoUtil.toRecord(dbRecord); + RawRecord rawRecord = RawRecordDaoUtil.toJoinedRawRecord(dbRecord); + if (Objects.nonNull(rawRecord.getContent())) { + recordDto.setRawRecord(rawRecord); + } + + ParsedRecord parsedRecord = ParsedRecordDaoUtil.toJoinedParsedRecord(dbRecord); + if (Objects.nonNull(parsedRecord.getContent())) { + recordDto.setParsedRecord(parsedRecord); + } + ErrorRecord errorRecord = ErrorRecordDaoUtil.toJoinedErrorRecord(dbRecord); + if (Objects.nonNull(errorRecord.getContent())) { + recordDto.setErrorRecord(errorRecord); } - return record; + return recordDto; } private StrippedParsedRecord toStrippedParsedRecord(Row row) { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ErrorRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ErrorRecordDaoUtil.java index ff10bc84c..6e76924ed 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ErrorRecordDaoUtil.java +++ 
b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ErrorRecordDaoUtil.java @@ -17,6 +17,7 @@ import io.vertx.core.json.JsonObject; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; +import org.jooq.Record; /** * Utility class for managing {@link ErrorRecord} @@ -32,7 +33,7 @@ private ErrorRecordDaoUtil() { } /** * Searches for {@link ErrorRecord} by id using {@link ReactiveClassicGenericQueryExecutor} - * + * * @param queryExecutor query executor * @param id id * @return future with optional ErrorRecord @@ -45,7 +46,7 @@ public static Future> findById(ReactiveClassicGenericQuery /** * Saves {@link ErrorRecord} to the db using {@link ReactiveClassicGenericQueryExecutor} - * + * * @param queryExecutor query executor * @param errorRecord error record * @return future with updated ErrorRecord @@ -62,7 +63,7 @@ public static Future save(ReactiveClassicGenericQueryExecutor query /** * Convert database query result {@link Row} to {@link ErrorRecord} - * + * * @param row query result row * @return ErrorRecord */ @@ -76,7 +77,7 @@ public static ErrorRecord toErrorRecord(Row row) { /** * Convert database query result {@link Row} to {@link ErrorRecord} - * + * * @param row query result row * @return ErrorRecord */ @@ -91,9 +92,26 @@ public static ErrorRecord toJoinedErrorRecord(Row row) { .withDescription(row.getString(DESCRIPTION)); } + /** + * Convert database query result {@link Row} to {@link ErrorRecord} + * + * @param dbRecord query result record + * @return ErrorRecord + */ + public static ErrorRecord toJoinedErrorRecord(Record dbRecord) { + ErrorRecord errorRecord = new ErrorRecord(); + UUID id = dbRecord.get(org.folio.rest.jooq.tables.ErrorRecordsLb.ERROR_RECORDS_LB.ID); + if (Objects.nonNull(id)) { + errorRecord.withId(id.toString()); + } + return errorRecord + .withContent(dbRecord.get(ERROR_RECORD_CONTENT, String.class)) + .withDescription(dbRecord.get(org.folio.rest.jooq.tables.ErrorRecordsLb.ERROR_RECORDS_LB.DESCRIPTION)); + } + /** * Convert database query result {@link Row} to {@link Optional} {@link ErrorRecord} - * + * * @param row query result row * @return optional ErrorRecord */ @@ -103,7 +121,7 @@ public static Optional toOptionalErrorRecord(Row row) { /** * Convert {@link ErrorRecord} to database record {@link ErrorRecordsLbRecord} - * + * * @param errorRecord error record * @return ErrorRecordsLbRecord */ diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java index 167160881..767c9fa7d 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java @@ -138,16 +138,23 @@ public static ParsedRecord toParsedRecord(Row row) { * @return ParsedRecord */ public static ParsedRecord toJoinedParsedRecord(Row row) { - ParsedRecord parsedRecord = new ParsedRecord(); UUID id = row.getUUID(ID); - if (Objects.nonNull(id)) { - parsedRecord.withId(id.toString()); - } Object content = row.getValue(PARSED_RECORD_CONTENT); - if (Objects.nonNull(content)) { - parsedRecord.withContent(normalize(content).getMap()); - } - return parsedRecord; + + return asParsedRecord(id, content); + } + + /** + * Convert database query result {@link org.jooq.Record} to {@link ParsedRecord} + * + * @param dbRecord query result record + * @return ParsedRecord + */ + public static ParsedRecord 
toJoinedParsedRecord(org.jooq.Record dbRecord) { + UUID id = dbRecord.get(ID_FIELD); + Object content = dbRecord.get(PARSED_RECORD_CONTENT, String.class); + + return asParsedRecord(id, content); } /** @@ -259,4 +266,14 @@ public static JsonObject normalize(Object content) { : JsonObject.mapFrom(content); } + private static ParsedRecord asParsedRecord(UUID id, Object content) { + ParsedRecord parsedRecord = new ParsedRecord(); + if (Objects.nonNull(id)) { + parsedRecord.withId(id.toString()); + } + if (Objects.nonNull(content)) { + parsedRecord.withContent(normalize(content).getMap()); + } + return parsedRecord; + } } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RawRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RawRecordDaoUtil.java index 0de3850e1..d41851fbf 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RawRecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RawRecordDaoUtil.java @@ -16,6 +16,7 @@ import io.vertx.core.Future; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; +import org.jooq.Record; /** * Utility class for managing {@link RawRecord} @@ -30,7 +31,7 @@ private RawRecordDaoUtil() { } /** * Searches for {@link RawRecord} by id using {@link ReactiveClassicGenericQueryExecutor} - * + * * @param queryExecutor query executor * @param id id * @return future with optional RawRecord @@ -43,7 +44,7 @@ public static Future> findById(ReactiveClassicGenericQueryEx /** * Saves {@link RawRecord} to the db using {@link ReactiveClassicGenericQueryExecutor} - * + * * @param queryExecutor query executor * @param rawRecord raw record * @return future with updated RawRecord @@ -60,7 +61,7 @@ public static Future save(ReactiveClassicGenericQueryExecutor queryEx /** * Convert database query result {@link Row} to {@link RawRecord} - * + * * @param row query result row * @return RawRecord */ @@ -73,7 +74,7 @@ public static RawRecord toRawRecord(Row row) { /** * Convert database query result {@link Row} to {@link RawRecord} - * + * * @param row query result row * @return RawRecord */ @@ -87,9 +88,25 @@ public static RawRecord toJoinedRawRecord(Row row) { .withContent(row.getString(RAW_RECORD_CONTENT)); } + /** + * Convert database query result {@link Record} to {@link RawRecord} + * + * @param dbRecord query result record + * @return RawRecord + */ + public static RawRecord toJoinedRawRecord(Record dbRecord) { + RawRecord rawRecord = new RawRecord(); + UUID id = dbRecord.get(org.folio.rest.jooq.tables.RawRecordsLb.RAW_RECORDS_LB.ID); + if (Objects.nonNull(id)) { + rawRecord.withId(id.toString()); + } + return rawRecord + .withContent(dbRecord.get(RAW_RECORD_CONTENT, String.class)); + } + /** * Convert database query result {@link Row} to {@link Optional} {@link RawRecord} - * + * * @param row query result row * @return optional RawRecord */ @@ -99,7 +116,7 @@ public static Optional toOptionalRawRecord(Row row) { /** * Convert {@link RawRecord} to database record {@link RawRecordsLbRecord} - * + * * @param rawRecord raw record * @return RawRecordsLbRecord */ diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordDaoUtil.java index f00d3eebc..d77685d99 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordDaoUtil.java +++ 
b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordDaoUtil.java @@ -278,31 +278,46 @@ public static SourceRecord toSourceRecord(Record record) { */ public static Record toRecord(Row row) { RecordsLb pojo = RowMappers.getRecordsLbMapper().apply(row); - Record record = new Record(); + return asRecord(pojo); + } + + /** + * Convert database query result {@link org.jooq.Record} to {@link Record} + * + * @param dbRecord query result record + * @return Record + */ + public static Record toRecord(org.jooq.Record dbRecord) { + RecordsLb pojo = RecordMappers.getDbRecordToRecordsLbMapper().apply(dbRecord); + return asRecord(pojo); + } + + private static Record asRecord(RecordsLb pojo) { + Record recordDto = new Record(); if (Objects.nonNull(pojo.getId())) { - record.withId(pojo.getId().toString()); + recordDto.withId(pojo.getId().toString()); } if (Objects.nonNull(pojo.getSnapshotId())) { - record.withSnapshotId(pojo.getSnapshotId().toString()); + recordDto.withSnapshotId(pojo.getSnapshotId().toString()); } if (Objects.nonNull(pojo.getMatchedId())) { - record.withMatchedId(pojo.getMatchedId().toString()); + recordDto.withMatchedId(pojo.getMatchedId().toString()); } if (Objects.nonNull(pojo.getRecordType())) { - record.withRecordType(Record.RecordType.valueOf(pojo.getRecordType().toString())); + recordDto.withRecordType(Record.RecordType.valueOf(pojo.getRecordType().toString())); } if (Objects.nonNull(pojo.getState())) { - record.withState(State.valueOf(pojo.getState().toString())); + recordDto.withState(State.valueOf(pojo.getState().toString())); } - record + recordDto .withOrder(pojo.getOrder()) .withGeneration(pojo.getGeneration()) .withLeaderRecordStatus(pojo.getLeaderRecordStatus()) - .withDeleted(record.getState().equals(State.DELETED) - || DELETED_LEADER_RECORD_STATUS.contains(record.getLeaderRecordStatus())); + .withDeleted(recordDto.getState().equals(State.DELETED) + || DELETED_LEADER_RECORD_STATUS.contains(recordDto.getLeaderRecordStatus())); - return record + return recordDto .withAdditionalInfo(toAdditionalInfo(pojo)) .withExternalIdsHolder(toExternalIdsHolder(pojo)) .withMetadata(toMetadata(pojo)); @@ -661,7 +676,7 @@ private static UUID toUUID(String uuid) { } private static List toUUIDs(List uuids) { - return uuids.stream().map(RecordDaoUtil::toUUID).collect(Collectors.toList()); + return uuids.stream().map(RecordDaoUtil::toUUID).toList(); } private static RecordType toRecordType(String type) { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordMappers.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordMappers.java new file mode 100644 index 000000000..0cb50bc42 --- /dev/null +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordMappers.java @@ -0,0 +1,39 @@ +package org.folio.dao.util; + +import org.folio.rest.jooq.tables.pojos.RecordsLb; +import org.jooq.Record; + +import java.util.function.Function; + +public final class RecordMappers { + + private RecordMappers() {} + + public static Function getDbRecordToRecordsLbMapper() { + return jooqRecord -> { + org.folio.rest.jooq.tables.pojos.RecordsLb pojo = new org.folio.rest.jooq.tables.pojos.RecordsLb(); + pojo.setId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.ID)); + pojo.setSnapshotId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.SNAPSHOT_ID)); + pojo.setMatchedId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.MATCHED_ID)); + 
pojo.setGeneration(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.GENERATION)); + pojo.setRecordType(java.util.Arrays.stream(org.folio.rest.jooq.enums.RecordType.values()) + .filter(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.RECORD_TYPE)::equals) + .findFirst() + .orElse(null)); + pojo.setExternalId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.EXTERNAL_ID)); + pojo.setState(java.util.Arrays.stream(org.folio.rest.jooq.enums.RecordState.values()) + .filter(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.STATE)::equals) + .findFirst() + .orElse(null)); + pojo.setLeaderRecordStatus(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.LEADER_RECORD_STATUS)); + pojo.setOrder(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.ORDER)); + pojo.setSuppressDiscovery(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.SUPPRESS_DISCOVERY)); + pojo.setCreatedByUserId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.CREATED_BY_USER_ID)); + pojo.setCreatedDate(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.CREATED_DATE)); + pojo.setUpdatedByUserId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.UPDATED_BY_USER_ID)); + pojo.setUpdatedDate(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.UPDATED_DATE)); + pojo.setExternalHrid(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.EXTERNAL_HRID)); + return pojo; + }; + } +} diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java index f97b0bef8..04e95adfd 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java @@ -1,10 +1,12 @@ package org.folio.services; +import io.reactivex.Flowable; +import io.vertx.core.Future; +import io.vertx.sqlclient.Row; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; - -import io.vertx.sqlclient.Row; import org.folio.dao.util.IdType; import org.folio.dao.util.RecordType; import org.folio.rest.jaxrs.model.FetchParsedRecordsBatchRequest; @@ -21,12 +23,10 @@ import org.folio.rest.jaxrs.model.SourceRecordCollection; import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection; import org.folio.rest.jooq.enums.RecordState; +import org.folio.services.entities.RecordsModifierOperator; import org.jooq.Condition; import org.jooq.OrderField; -import io.reactivex.Flowable; -import io.vertx.core.Future; - public interface RecordService { /** @@ -82,6 +82,19 @@ public interface RecordService { */ Future saveRecords(RecordCollection recordsCollection, String tenantId); + /** + * Saves collection of records. 
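+   * The collection to save is built by loading the existing records that match the given
+   * external ids and applying {@code recordsModifier} to them; matched ids are set on the
+   * modified records before saving.
+   * <p>A minimal usage sketch (the identifiers below are illustrative, not part of the API):</p>
+   * <pre>{@code
+   * RecordsModifierOperator modifier = RecordsModifierOperator.identity();
+   * recordService.saveRecordsByExternalIds(instanceIds, RecordType.MARC_BIB, modifier, okapiHeaders);
+   * }</pre>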
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java
index f97b0bef8..04e95adfd 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java
@@ -1,10 +1,12 @@
 package org.folio.services;

+import io.reactivex.Flowable;
+import io.vertx.core.Future;
+import io.vertx.sqlclient.Row;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
-
-import io.vertx.sqlclient.Row;
 import org.folio.dao.util.IdType;
 import org.folio.dao.util.RecordType;
 import org.folio.rest.jaxrs.model.FetchParsedRecordsBatchRequest;
@@ -21,12 +23,10 @@
 import org.folio.rest.jaxrs.model.SourceRecordCollection;
 import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection;
 import org.folio.rest.jooq.enums.RecordState;
+import org.folio.services.entities.RecordsModifierOperator;
 import org.jooq.Condition;
 import org.jooq.OrderField;

-import io.reactivex.Flowable;
-import io.vertx.core.Future;
-
 public interface RecordService {
@@ -82,6 +82,19 @@ public interface RecordService {
    */
   Future<RecordsBatchResponse> saveRecords(RecordCollection recordsCollection, String tenantId);

+  /**
+   * Saves the collection of records that is fetched by the given external IDs and transformed by the provided modifier operator.
+   *
+   * @param externalIds external relation ids
+   * @param recordType record type
+   * @param recordsModifier records collection modifier operator
+   * @param okapiHeaders okapi headers
+   * @return future with response containing list of successfully saved records and error messages for records that were not saved
+   */
+  Future<RecordsBatchResponse> saveRecordsByExternalIds(List<String> externalIds, RecordType recordType,
+                                                        RecordsModifierOperator recordsModifier,
+                                                        Map<String, String> okapiHeaders);
+
   /**
    * Updates record with given id
    *
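For orientation, a hedged sketch of a caller of the new interface method; the instance id, tenant header, and `identity()` modifier below are placeholders, not values from this patch:

```java
import io.vertx.core.Future;
import java.util.List;
import java.util.Map;
import org.folio.dao.util.RecordType;
import org.folio.rest.jaxrs.model.RecordsBatchResponse;
import org.folio.services.RecordService;
import org.folio.services.entities.RecordsModifierOperator;

public class SaveByExternalIdsSketch {

  Future<RecordsBatchResponse> resaveInstances(RecordService recordService) {
    // Hypothetical instance UUID and minimal header set; real callers derive both
    // from the incoming Kafka event (see toOkapiHeaders further down).
    List<String> instanceIds = List.of("0f0fe962-d502-4a4f-9e74-7732bec94ee8");
    Map<String, String> okapiHeaders = Map.of("x-okapi-tenant", "diku");

    // identity() loads and re-saves the matched records unchanged; a real modifier
    // would rewrite the parsed MARC content before returning the collection.
    RecordsModifierOperator modifier = RecordsModifierOperator.identity();

    return recordService.saveRecordsByExternalIds(instanceIds, RecordType.MARC_BIB, modifier, okapiHeaders);
  }
}
```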
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java
index afd175859..2b800ff82 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java
@@ -11,8 +11,10 @@ import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalHridValues;
 import static org.folio.dao.util.RecordDaoUtil.filterRecordByState;
 import static org.folio.dao.util.RecordDaoUtil.getExternalIdsCondition;
+import static org.folio.dao.util.RecordDaoUtil.getExternalIdType;
 import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE;
 import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE;
+import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
 import static org.folio.rest.util.QueryParamUtil.toRecordType;
 import static org.folio.services.util.AdditionalFieldsUtil.TAG_999;
 import static org.folio.services.util.AdditionalFieldsUtil.addFieldToMarcRecord;
@@ -23,12 +25,14 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.stream.Collectors;
+import java.util.concurrent.ExecutionException;
 import javax.ws.rs.BadRequestException;
 import javax.ws.rs.NotFoundException;
+import org.apache.commons.collections4.CollectionUtils;

 import io.reactivex.Flowable;
 import io.vertx.core.AsyncResult;
@@ -75,6 +79,8 @@
 import org.folio.rest.jaxrs.model.SourceRecordCollection;
 import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection;
 import org.folio.rest.jooq.enums.RecordState;
+import org.folio.services.entities.RecordsModifierOperator;
+import org.folio.services.exceptions.RecordUpdateException;
 import org.folio.services.util.parser.ParseFieldsResult;
 import org.folio.services.util.parser.ParseLeaderResult;
 import org.folio.services.util.parser.SearchExpressionParser;
@@ -168,6 +174,42 @@ public Future saveRecords(RecordCollection recordCollectio
       .recover(RecordServiceImpl::mapToDuplicateExceptionIfNeeded);
   }

+  @Override
+  public Future<RecordsBatchResponse> saveRecordsByExternalIds(List<String> externalIds,
+                                                               RecordType recordType,
+                                                               RecordsModifierOperator recordsModifier,
+                                                               Map<String, String> okapiHeaders) {
+    if (CollectionUtils.isEmpty(externalIds)) {
+      LOG.warn("saveRecordsByExternalIds:: Skipping the records save, no external IDs are provided");
+      return Future.succeededFuture(new RecordsBatchResponse().withTotalRecords(0));
+    }
+
+    if (recordsModifier == null) {
+      LOG.warn("saveRecordsByExternalIds:: Skipping the records save, no operator is provided to modify the existing records");
+      return Future.succeededFuture(new RecordsBatchResponse().withTotalRecords(0));
+    }
+
+    RecordsModifierOperator recordsMatchedIdsSetter = recordCollection -> {
+      try {
+        for (var sourceRecord : recordCollection.getRecords()) {
+          setMatchedIdForRecord(sourceRecord, okapiHeaders.get(OKAPI_TENANT_HEADER))
+            .toCompletionStage().toCompletableFuture().get();
+        }
+        return recordCollection;
+      } catch (InterruptedException ex) {
+        LOG.warn("saveRecordsByExternalIds:: Setting record matched id is interrupted: {}", ex.getMessage());
+        Thread.currentThread().interrupt();
+        throw new RecordUpdateException(ex.getMessage());
+      } catch (ExecutionException ex) {
+        LOG.warn("saveRecordsByExternalIds:: Failed to set record matched id: {}", ex.getMessage());
+        throw new RecordUpdateException(ex.getMessage());
+      }
+    };
+    RecordsModifierOperator recordsModifierWithMatchedIdsSetter = recordsModifier.andThen(recordsMatchedIdsSetter);
+
+    return recordDao.saveRecordsByExternalIds(externalIds, recordType, recordsModifierWithMatchedIdsSetter, okapiHeaders);
+  }
+
   @Override
   public Future<Record> updateRecord(Record record, String tenantId) {
     return recordDao.updateRecord(ensureRecordForeignKeys(record), tenantId);
@@ -352,7 +394,7 @@ private Future setMatchedIdForRecord(Record record, String tenantId) {
     }
     Promise<Record> promise = Promise.promise();
     String externalId = RecordDaoUtil.getExternalId(record.getExternalIdsHolder(), record.getRecordType());
-    IdType idType = RecordDaoUtil.getExternalIdType(record.getRecordType());
+    IdType idType = getExternalIdType(record.getRecordType());
     if (externalId != null && idType != null && record.getState() == Record.State.ACTUAL) {
       setMatchedIdFromExistingSourceRecord(record, tenantId, promise, externalId, idType);
@@ -424,7 +466,7 @@ private void filterFieldsByDataRange(AsyncResult
       .map(JsonObject.class::cast)
       .filter(field -> checkFieldRange(field, data))
       .map(JsonObject::getMap)
-      .collect(Collectors.toList());
+      .toList();
     parsedContent.put("fields", filteredFields);
     recordToFilter.getParsedRecord().setContent(parsedContent.getMap());
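The matched-id setter above has to run inside a synchronous operator, so the asynchronous `setMatchedIdForRecord` future is awaited via `toCompletionStage().toCompletableFuture().get()`. The general shape of that bridge as a standalone sketch (the unchecked wrapper stands in for `RecordUpdateException`); blocking like this is only safe off the event loop, which is presumably why the operator is invoked from the DAO's blocking context:

```java
import io.vertx.core.Future;
import java.util.concurrent.ExecutionException;

public class BlockingBridgeSketch {

  static <T> T awaitOrRethrow(Future<T> future) {
    try {
      // Vert.x Future -> CompletionStage -> CompletableFuture, then block for the result.
      return future.toCompletionStage().toCompletableFuture().get();
    } catch (InterruptedException ex) {
      // Re-assert the interrupt flag before translating to an unchecked exception,
      // so callers further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ex.getMessage(), ex);
    } catch (ExecutionException ex) {
      // The cause carries the real failure from the asynchronous computation.
      throw new IllegalStateException(ex.getCause() != null ? ex.getCause().getMessage() : ex.getMessage(), ex);
    }
  }
}
```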
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/entities/RecordsModifierOperator.java b/mod-source-record-storage-server/src/main/java/org/folio/services/entities/RecordsModifierOperator.java
new file mode 100644
index 000000000..5dcd87483
--- /dev/null
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/entities/RecordsModifierOperator.java
@@ -0,0 +1,24 @@
+package org.folio.services.entities;
+
+import org.folio.rest.jaxrs.model.RecordCollection;
+import java.util.Objects;
+import java.util.function.UnaryOperator;
+
+@FunctionalInterface
+public
+interface RecordsModifierOperator extends UnaryOperator<RecordCollection> {
+
+  static RecordsModifierOperator identity() {
+    return s -> s;
+  }
+
+  default RecordsModifierOperator andThen(RecordsModifierOperator after) {
+    Objects.requireNonNull(after);
+    return s -> after.apply(this.apply(s));
+  }
+
+  default RecordsModifierOperator compose(RecordsModifierOperator before) {
+    Objects.requireNonNull(before);
+    return s -> this.apply(before.apply(s));
+  }
+}
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/exceptions/RecordUpdateException.java b/mod-source-record-storage-server/src/main/java/org/folio/services/exceptions/RecordUpdateException.java
new file mode 100644
index 000000000..8ae129f63
--- /dev/null
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/exceptions/RecordUpdateException.java
@@ -0,0 +1,15 @@
+package org.folio.services.exceptions;
+
+/**
+ * Exception to indicate a record update error
+ */
+public class RecordUpdateException extends RuntimeException {
+
+  public RecordUpdateException(String message) {
+    super(message);
+  }
+
+  public RecordUpdateException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/links/DeleteLinkProcessor.java b/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/links/DeleteLinkProcessor.java
index 17a7d1881..9e08f137a 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/links/DeleteLinkProcessor.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/links/DeleteLinkProcessor.java
@@ -1,6 +1,6 @@
 package org.folio.services.handlers.links;

-import static org.folio.consumers.AuthorityLinkChunkKafkaHandler.AUTHORITY_ID_SUBFIELD;
+import static org.folio.util.AuthorityLinksUtils.AUTHORITY_ID_SUBFIELD;

 import java.util.Collection;
 import java.util.LinkedList;
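A minimal sketch of how the operator composition behaves; both modifiers here are invented no-ops:

```java
import org.folio.rest.jaxrs.model.RecordCollection;
import org.folio.services.entities.RecordsModifierOperator;

public class ModifierCompositionSketch {

  public static void main(String[] args) {
    // Two stand-ins; real operators would mutate the collection's records.
    RecordsModifierOperator linkUpdater = collection -> collection;      // e.g. rewrite $0/$9 subfields
    RecordsModifierOperator matchedIdSetter = collection -> collection;  // e.g. populate matched ids

    // andThen applies left-to-right: links first, then matched ids, which is the
    // ordering RecordServiceImpl uses via recordsModifier.andThen(recordsMatchedIdsSetter).
    RecordsModifierOperator combined = linkUpdater.andThen(matchedIdSetter);

    RecordCollection modified = combined.apply(new RecordCollection());
    System.out.println(modified != null); // true
  }
}
```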
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java
index e2a4bde97..932d3e20a 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java
@@ -15,11 +15,18 @@ import org.folio.processing.events.utils.PomReaderUtil;
 import org.folio.rest.jaxrs.model.Event;
 import org.folio.rest.jaxrs.model.EventMetadata;
-import org.folio.rest.tools.utils.ModuleName;

+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;

+import static java.util.Objects.nonNull;
+import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
+import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
+import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;
+import static org.folio.services.util.KafkaUtil.extractHeaderValue;
+
 public final class EventHandlingUtil {

   private static final Logger LOGGER = LogManager.getLogger();
@@ -91,8 +98,8 @@ public static KafkaProducerRecord createProducerRecord(String ev
   }

   public static String constructModuleName() {
-    return PomReaderUtil.INSTANCE.constructModuleVersionAndVersion(ModuleName.getModuleName(),
-      ModuleName.getModuleVersion());
+    return PomReaderUtil.INSTANCE.constructModuleVersionAndVersion(PomReaderUtil.INSTANCE.getModuleName(),
+      PomReaderUtil.INSTANCE.getVersion());
   }

   public static String createTopicName(String eventType, String tenantId, KafkaConfig kafkaConfig) {
@@ -108,6 +115,14 @@ public static KafkaProducer createProducer(String eventType, Kaf
     return new SimpleKafkaProducerManager(Vertx.currentContext().owner(), kafkaConfig).createShared(eventType);
   }

+  public static Map<String, String> toOkapiHeaders(List<KafkaHeader> kafkaHeaders, String eventTenantId) {
+    var okapiHeaders = new HashMap<String, String>();
+    okapiHeaders.put(OKAPI_URL_HEADER, extractHeaderValue(OKAPI_URL_HEADER, kafkaHeaders));
+    okapiHeaders.put(OKAPI_TENANT_HEADER, nonNull(eventTenantId) ? eventTenantId : extractHeaderValue(OKAPI_TENANT_HEADER, kafkaHeaders));
+    okapiHeaders.put(OKAPI_TOKEN_HEADER, extractHeaderValue(OKAPI_TOKEN_HEADER, kafkaHeaders));
+    return okapiHeaders;
+  }
+
   private static String extractRecordId(List<KafkaHeader> kafkaHeaders) {
     return kafkaHeaders.stream()
       .filter(header -> header.key().equals(RECORD_ID_HEADER))
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/util/AuthorityLinksUtils.java b/mod-source-record-storage-server/src/main/java/org/folio/util/AuthorityLinksUtils.java
new file mode 100644
index 000000000..53b170599
--- /dev/null
+++ b/mod-source-record-storage-server/src/main/java/org/folio/util/AuthorityLinksUtils.java
@@ -0,0 +1,27 @@
+package org.folio.util;
+
+import java.util.List;
+import java.util.Optional;
+import lombok.experimental.UtilityClass;
+import org.marc4j.marc.Subfield;
+
+@UtilityClass
+public class AuthorityLinksUtils {
+
+  public static final char AUTHORITY_ID_SUBFIELD = '9';
+  public static final char AUTHORITY_NATURAL_ID_SUBFIELD = '0';
+
+  public static Optional<Subfield> getAuthorityIdSubfield(List<Subfield> subfields) {
+    return getSubfield(subfields, AUTHORITY_ID_SUBFIELD);
+  }
+
+  public static Optional<Subfield> getAuthorityNaturalIdSubfield(List<Subfield> subfields) {
+    return getSubfield(subfields, AUTHORITY_NATURAL_ID_SUBFIELD);
+  }
+
+  private static Optional<Subfield> getSubfield(List<Subfield> subfields, char authorityIdSubfield) {
+    return subfields.stream()
+      .filter(subfield -> subfield.getCode() == authorityIdSubfield)
+      .findFirst();
+  }
+}
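A quick illustration of the new subfield helpers with marc4j types; the field contents are made up, and `SubfieldImpl` is marc4j's stock `Subfield` implementation:

```java
import static org.folio.util.AuthorityLinksUtils.getAuthorityIdSubfield;
import static org.folio.util.AuthorityLinksUtils.getAuthorityNaturalIdSubfield;

import java.util.List;
import org.marc4j.marc.Subfield;
import org.marc4j.marc.impl.SubfieldImpl;

public class SubfieldLookupSketch {

  public static void main(String[] args) {
    List<Subfield> subfields = List.of(
      new SubfieldImpl('a', "Some heading"),
      new SubfieldImpl('0', "n2009052404"),                           // authority natural id
      new SubfieldImpl('9', "58600684-c647-408d-bf3e-756e9055a988")); // authority uuid

    getAuthorityIdSubfield(subfields)
      .ifPresent(sf -> System.out.println("$9 = " + sf.getData()));
    getAuthorityNaturalIdSubfield(subfields)
      .ifPresent(sf -> System.out.println("$0 = " + sf.getData()));
  }
}
```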
diff --git a/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java b/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java
index 6de404c4b..bd3ee6fec 100644
--- a/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java
+++ b/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java
@@ -2,7 +2,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.reactivex.Flowable;
-import io.vertx.core.AsyncResult;
 import io.vertx.core.CompositeFuture;
 import io.vertx.core.Future;
 import io.vertx.core.json.JsonArray;
@@ -37,7 +36,6 @@
 import org.folio.rest.jaxrs.model.RecordsBatchResponse;
 import org.folio.rest.jaxrs.model.Snapshot;
 import org.folio.rest.jaxrs.model.SourceRecord;
-import org.folio.rest.jaxrs.model.SourceRecordCollection;
 import org.folio.rest.jaxrs.model.StrippedParsedRecord;
 import org.folio.rest.jooq.enums.RecordState;
 import org.jooq.Condition;
@@ -137,7 +135,7 @@ public void shouldGetMarcBibRecordsBySnapshotId(TestContext context) {
         .filter(r -> r.getRecordType().equals(Record.RecordType.MARC_BIB))
         .filter(r -> r.getSnapshotId().equals(snapshotId))
         .sorted(comparing(Record::getOrder))
-        .collect(Collectors.toList());
+        .toList();
       context.assertEquals(expected.size(), get.result().getTotalRecords());
       compareRecords(context, expected.get(1), get.result().getRecords().get(0));
       compareRecords(context, expected.get(2), get.result().getRecords().get(1));
@@ -267,7 +265,7 @@ public void shouldStreamMarcBibRecordsBySnapshotId(TestContext context) {
         .filter(r -> r.getRecordType().equals(Record.RecordType.MARC_BIB))
         .filter(r -> r.getSnapshotId().equals(snapshotId))
         .sorted(comparing(Record::getOrder))
-        .collect(Collectors.toList());
+        .toList();

       List<Record> actual = new ArrayList<>();
       flowable.doFinally(() -> {
@@ -1413,7 +1411,7 @@ private void getTotalRecordsAndRecordsDependsOnLimit(TestContext context, int li
       }
       List<Record> expected = records.stream()
         .filter(r -> r.getRecordType().equals(Record.RecordType.MARC_BIB))
-        .collect(Collectors.toList());
+        .toList();
       context.assertEquals(expected.size(), get.result().getTotalRecords());
       context.assertEquals(limit, get.result().getRecords().size());
       async.complete();
@@ -1443,7 +1441,7 @@ private void getRecordsBySnapshotId(TestContext context, String snapshotId, Reco
         .filter(r -> r.getRecordType().equals(recordType))
         .filter(r -> r.getSnapshotId().equals(snapshotId))
         .sorted(comparing(Record::getOrder))
-        .collect(Collectors.toList());
+        .toList();
       context.assertEquals(expected.size(), get.result().getTotalRecords());
       compareRecords(context, expected.get(0), get.result().getRecords().get(0));
       async.complete();
@@ -1471,8 +1469,7 @@ private void getMarcRecordsBetweenDates(TestContext context, OffsetDateTime earl
       }
       List<Record> expected = records.stream()
         .filter(r -> r.getRecordType().equals(recordType))
-        .sorted(comparing(Record::getOrder))
-        .collect(Collectors.toList());
+        .toList();
       context.assertEquals(expected.size(), get.result().getTotalRecords());
       compareRecords(context, expected, get.result().getRecords());
       async.complete();
@@ -1500,7 +1497,7 @@ private void streamRecordsBySnapshotId(TestContext context, String snapshotId, R
         .filter(r -> r.getRecordType().equals(recordType))
         .filter(r -> r.getSnapshotId().equals(snapshotId))
         .sorted(comparing(Record::getOrder))
-        .collect(Collectors.toList());
+        .toList();

       List<Record> actual = new ArrayList<>();
       flowable.doFinally(() -> {
@@ -1597,8 +1594,6 @@ private void saveMarcRecords(TestContext context, Record.RecordType marcBib) {
       }
       context.assertEquals(0, batch.result().getErrorMessages().size());
       context.assertEquals(expected.size(), batch.result().getTotalRecords());
-      expected.sort(comparing(Record::getId));
-      batch.result().getRecords().sort(comparing(Record::getId));
       compareRecords(context, expected, batch.result().getRecords());
       RecordDaoUtil.countByCondition(postgresClientFactory.getQueryExecutor(TENANT_ID), DSL.trueCondition())
         .onComplete(count -> {
@@ -1627,8 +1622,6 @@ private void saveMarcRecordsWithExpectedErrors(TestContext context, Record.Recor
       }
       context.assertEquals(0, batch.result().getErrorMessages().size());
       context.assertEquals(expected.size(), batch.result().getTotalRecords());
-      expected.sort(comparing(Record::getId));
-      batch.result().getRecords().sort(comparing(Record::getId));
       compareRecords(context, expected, batch.result().getRecords());
       checkRecordErrorRecords(context, batch.result().getRecords(), TestMocks.getErrorRecord(0).getContent().toString(),
         TestMocks.getErrorRecord(0).getDescription());
@@ -1671,9 +1664,7 @@ private void getMarcSourceRecords(TestContext context, RecordType parsedRecordTy
       List<SourceRecord> expected = records.stream()
         .filter(r -> r.getRecordType().equals(recordType))
         .map(RecordDaoUtil::toSourceRecord)
-        .sorted(comparing(SourceRecord::getRecordId))
-        .collect(Collectors.toList());
-      get.result().getSourceRecords().sort(comparing(SourceRecord::getRecordId));
+        .toList();
       context.assertEquals(expected.size(), get.result().getTotalRecords());
       compareSourceRecords(context, expected, get.result().getSourceRecords());
       async.complete();
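These hunks drop the sort-before-compare pattern because the comparison helpers further down now match expected and actual records by id, making the assertions order-independent; the old `compareRecords(context, record, record)` form also compared each expected record against itself, which asserted nothing. A simplified sketch of the id-keyed matching idea (collection types and the assertion style are simplified for illustration):

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.folio.rest.jaxrs.model.Record;

public class OrderIndependentCompareSketch {

  static void compareById(List<Record> expected, List<Record> actual) {
    // Index the actual records once, then look each expected record up by id.
    Map<String, Record> actualById = actual.stream()
      .collect(Collectors.toMap(Record::getId, Function.identity()));
    for (Record exp : expected) {
      Record act = actualById.get(exp.getId());
      if (act == null) {
        throw new AssertionError("Missing record " + exp.getId());
      }
      // field-by-field comparison would go here, as compareRecords(context, exp, act) does
    }
  }
}
```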
@@ -1700,14 +1691,10 @@ private void streamMarcSourceRecords(TestContext context, RecordType parsedRecor
       List<SourceRecord> expected = records.stream()
         .filter(r -> r.getRecordType().equals(recordType))
         .map(RecordDaoUtil::toSourceRecord)
-        .sorted(comparing(SourceRecord::getRecordId))
-        .sorted(comparing(SourceRecord::getOrder))
-        .collect(Collectors.toList());
+        .toList();

       List<SourceRecord> actual = new ArrayList<>();
       flowable.doFinally(() -> {
-
-        actual.sort(comparing(SourceRecord::getRecordId));
         context.assertEquals(expected.size(), actual.size());
         compareSourceRecords(context, expected, actual);
@@ -1741,9 +1728,7 @@ private void getMarcSourceRecordsByListOfIds(TestContext context, Record.RecordT
       List<SourceRecord> expected = records.stream()
         .filter(r -> r.getRecordType().equals(recordType))
         .map(RecordDaoUtil::toSourceRecord)
-        .sorted(comparing(SourceRecord::getRecordId))
-        .collect(Collectors.toList());
-      sortByRecordId(get);
+        .toList();
       context.assertEquals(expected.size(), get.result().getTotalRecords());
       compareSourceRecords(context, expected, get.result().getSourceRecords());
       async.complete();
@@ -1751,10 +1736,6 @@ private void getMarcSourceRecordsByListOfIds(TestContext context, Record.RecordT
     });
   }

-  private void sortByRecordId(AsyncResult<SourceRecordCollection> get) {
-    get.result().getSourceRecords().sort(comparing(SourceRecord::getRecordId));
-  }
-
   private void getMarcSourceRecordsBetweenDates(TestContext context, Record.RecordType recordType,
                                                 RecordType parsedRecordType, OffsetDateTime earliestDate, OffsetDateTime latestDate) {
@@ -1777,9 +1758,7 @@ private void getMarcSourceRecordsBetweenDates(TestContext context, Record.Record
       List<SourceRecord> expected = records.stream()
         .filter(r -> r.getRecordType().equals(recordType))
         .map(RecordDaoUtil::toSourceRecord)
-        .sorted(comparing(SourceRecord::getRecordId))
-        .collect(Collectors.toList());
-      get.result().getSourceRecords().sort(comparing(SourceRecord::getRecordId));
+        .toList();
       context.assertEquals(expected.size(), get.result().getTotalRecords());
       compareSourceRecords(context, expected, get.result().getSourceRecords());
       async.complete();
@@ -1832,9 +1811,7 @@ private void getMarcSourceRecordsByListOfIdsThatAreDeleted(TestContext context,
       List<SourceRecord> expected = records.stream()
         .filter(r -> r.getRecordType().equals(recordType))
         .map(RecordDaoUtil::toSourceRecord)
-        .sorted(comparing(SourceRecord::getRecordId))
-        .collect(Collectors.toList());
-      get.result().getSourceRecords().sort(comparing(SourceRecord::getRecordId));
+        .toList();
       context.assertEquals(expected.size(), get.result().getTotalRecords());
       compareSourceRecords(context, expected, get.result().getSourceRecords());
       async.complete();
@@ -1903,8 +1880,6 @@ private void updateParsedMarcRecords(TestContext context, Record.RecordType reco
       }
       context.assertEquals(0, update.result().getErrorMessages().size());
       context.assertEquals(expected.size(), update.result().getTotalRecords());
-      expected.sort(comparing(ParsedRecord::getId));
-      update.result().getParsedRecords().sort(comparing(ParsedRecord::getId));
       compareParsedRecords(context, expected, update.result().getParsedRecords());
       GenericCompositeFuture.all(updated.stream().map(record -> recordDao
         .getRecordByMatchedId(record.getMatchedId(), TENANT_ID)
@@ -2046,7 +2021,12 @@ private CompositeFuture saveRecords(List records) {
   private void compareRecords(TestContext context, List<Record> expected, List<Record> actual) {
     context.assertEquals(expected.size(), actual.size());
     for (Record record : expected) {
-      compareRecords(context, record, record);
+      var actualRecord = actual.stream()
+        .filter(r -> Objects.equals(r.getId(), record.getId()))
+        .findFirst();
+      if (actualRecord.isPresent()) {
+        compareRecords(context, record, actualRecord.get());
+      }
     }
   }
@@ -2111,7 +2091,12 @@ private void compareRecords(TestContext context, Record expected, StrippedParsed
   private void compareSourceRecords(TestContext context, List<SourceRecord> expected, List<SourceRecord> actual) {
     context.assertEquals(expected.size(), actual.size());
     for (SourceRecord sourceRecord : expected) {
-      compareSourceRecords(context, sourceRecord, sourceRecord);
+      var sourceRecordActual = actual.stream()
+        .filter(sr -> Objects.equals(sr.getRecordId(), sourceRecord.getRecordId()))
+        .findFirst();
+      if (sourceRecordActual.isPresent()) {
+        compareSourceRecords(context, sourceRecord, sourceRecordActual.get());
+      }
     }
   }
@@ -2144,7 +2129,10 @@ private void compareSourceRecords(TestContext context, SourceRecord expected, So
   private void compareParsedRecords(TestContext context, List<ParsedRecord> expected, List<ParsedRecord> actual) {
     context.assertEquals(expected.size(), actual.size());
     for (ParsedRecord parsedRecord : expected) {
-      compareParsedRecords(context, parsedRecord, parsedRecord);
+      var actualParsedRecord = actual.stream().filter(a -> Objects.equals(a.getId(), parsedRecord.getId())).findFirst();
+      if (actualParsedRecord.isPresent()) {
+        compareParsedRecords(context, parsedRecord, actualParsedRecord.get());
+      }
     }
   }
diff --git a/mod-source-record-storage-server/src/test/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticleTest.java b/mod-source-record-storage-server/src/test/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticleTest.java
index 010cda28c..3fa774b8c 100644
--- a/mod-source-record-storage-server/src/test/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticleTest.java
+++ b/mod-source-record-storage-server/src/test/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticleTest.java
@@ -5,6 +5,7 @@
 import io.vertx.core.json.JsonObject;
 import io.vertx.ext.unit.Async;
 import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.RunTestOnContext;
 import io.vertx.ext.unit.junit.VertxUnitRunner;
 import net.mguenther.kafka.junit.KeyValue;
 import net.mguenther.kafka.junit.ObserveKeyValues;
@@ -30,6 +31,7 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.MockitoAnnotations;
@@ -62,6 +64,9 @@ public class ParsedRecordChunkConsumersVerticleTest extends AbstractLBServiceTes
   private static String recordId = UUID.randomUUID().toString();

+  @Rule
+  public RunTestOnContext rule = new RunTestOnContext();
+
   private static RawRecord rawMarcRecord;
   private static ParsedRecord parsedMarcRecord;

@@ -166,7 +171,7 @@ private void sendEventWithSavedMarcRecordCollectionPayloadAfterProcessingParsedR
   }

   @Test
-  public void shouldSendDIErrorEventsWhenParsedRecordChunkWasNotSaved() throws InterruptedException {
+  public void shouldSendDIErrorEventsWhenParsedRecordChunkWasNotSaved(TestContext context) throws InterruptedException {
     Record validRecord = TestMocks.getRecord(0).withSnapshotId(snapshotId);
     Record additionalRecord = getAdditionalRecord(validRecord, snapshotId, validRecord.getRecordType());
     List<Record> records = List.of(validRecord, additionalRecord);
@@ -174,7 +179,8 @@ public void shouldSendDIErrorEventsWhenParsedRecordChunkWasNotSaved() throws Int
     sendRecordsToKafka(jobExecutionId, records);

-    check_DI_ERROR_eventsSent(jobExecutionId, records, "ERROR: insert or update on table \"raw_records_lb\" violates foreign key constraint \"fk_raw_records_records\"" );
+    check_DI_ERROR_eventsSent(jobExecutionId, records,
+      "ERROR: insert or update on table \"raw_records_lb\" violates foreign key constraint \"fk_raw_records_records\"");
   }

   @Test
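The new `RunTestOnContext` rule runs each test method on a Vert.x event-loop context, so `Vertx.currentContext()` is non-null in code under test; `EventHandlingUtil.createProducer` relies on exactly that, which is presumably why the rule was added here. A minimal self-contained example (class name and assertions invented for illustration):

```java
import io.vertx.core.Vertx;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
public class RunOnContextSketch {

  @Rule
  public RunTestOnContext rule = new RunTestOnContext();

  @Test
  public void runsOnEventLoop(TestContext context) {
    // The rule provisions a Vertx instance and runs this method on its context,
    // so code that calls Vertx.currentContext().owner() works inside the test.
    Vertx vertx = rule.vertx();
    context.assertNotNull(vertx);
    context.assertNotNull(Vertx.currentContext());
  }
}
```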