diff --git a/mod-source-record-storage-server/src/main/java/org/folio/consumers/QuickMarcKafkaHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/consumers/QuickMarcKafkaHandler.java
index 0e423b6f4..a54182612 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/consumers/QuickMarcKafkaHandler.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/consumers/QuickMarcKafkaHandler.java
@@ -2,15 +2,10 @@
 
 import static org.folio.dao.util.QMEventTypes.QM_ERROR;
 import static org.folio.dao.util.QMEventTypes.QM_SRS_MARC_RECORD_UPDATED;
-import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap;
+import static org.folio.okapi.common.XOkapiHeaders.TENANT;
 import static org.folio.services.util.EventHandlingUtil.createProducer;
 import static org.folio.services.util.EventHandlingUtil.createProducerRecord;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
+import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;
 
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
@@ -19,19 +14,22 @@
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
 import io.vertx.kafka.client.producer.KafkaHeader;
 import io.vertx.kafka.client.producer.KafkaProducer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.folio.services.RecordService;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
 import org.folio.dao.util.QMEventTypes;
 import org.folio.kafka.AsyncRecordHandler;
 import org.folio.kafka.KafkaConfig;
 import org.folio.rest.jaxrs.model.Event;
 import org.folio.rest.jaxrs.model.ParsedRecordDto;
-import org.folio.rest.util.OkapiConnectionParams;
+import org.folio.services.RecordService;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
 
 @Component
 public class QuickMarcKafkaHandler implements AsyncRecordHandler<String, String> {
@@ -67,22 +65,23 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord
     log.trace("handle:: Handling kafka consumerRecord {}", consumerRecord);
 
     var kafkaHeaders = consumerRecord.headers();
-    var params = new OkapiConnectionParams(kafkaHeadersToMap(kafkaHeaders), vertx);
+    var okapiHeaders = toOkapiHeaders(kafkaHeaders, null);
 
     return getEventPayload(consumerRecord)
       .compose(eventPayload -> {
         String snapshotId = eventPayload.getOrDefault(SNAPSHOT_ID_KEY, UUID.randomUUID().toString());
+        var tenantId = okapiHeaders.get(TENANT);
         return getRecordDto(eventPayload)
-          .compose(recordDto -> recordService.updateSourceRecord(recordDto, snapshotId, params.getTenantId()))
+          .compose(recordDto -> recordService.updateSourceRecord(recordDto, snapshotId, okapiHeaders))
           .compose(updatedRecord -> {
             eventPayload.put(updatedRecord.getRecordType().value(), Json.encode(updatedRecord));
-            return sendEvent(eventPayload, QM_SRS_MARC_RECORD_UPDATED, params.getTenantId(), kafkaHeaders)
+            return sendEvent(eventPayload, QM_SRS_MARC_RECORD_UPDATED, tenantId, kafkaHeaders)
               .map(aBoolean -> consumerRecord.key());
           })
           .recover(th -> {
             log.warn("handle:: Failed to handle QM_RECORD_UPDATED event", th);
             eventPayload.put(ERROR_KEY, th.getMessage());
-            return sendEvent(eventPayload, QM_ERROR, params.getTenantId(), kafkaHeaders)
+            return sendEvent(eventPayload, QM_ERROR, tenantId, kafkaHeaders)
              .map(aBoolean -> th.getMessage());
          });
      })
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java
index d6679aff0..d784997fc 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java
@@ -1,14 +1,17 @@
 package org.folio.dao;
 
+import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
+import io.reactivex.Flowable;
+import io.vertx.core.Future;
+import io.vertx.sqlclient.Row;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
-
-import io.vertx.sqlclient.Row;
 import net.sf.jsqlparser.JSQLParserException;
 import org.folio.dao.util.IdType;
+import org.folio.dao.util.MatchField;
 import org.folio.dao.util.RecordType;
 import org.folio.rest.jaxrs.model.MarcBibCollection;
 import org.folio.rest.jaxrs.model.ParsedRecord;
@@ -22,17 +25,12 @@
 import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection;
 import org.folio.rest.jooq.enums.RecordState;
 import org.folio.services.RecordSearchParameters;
-import org.folio.dao.util.MatchField;
 import org.folio.services.util.TypeConnection;
 import org.folio.services.util.parser.ParseFieldsResult;
 import org.folio.services.util.parser.ParseLeaderResult;
 import org.jooq.Condition;
 import org.jooq.OrderField;
 
-import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
-import io.reactivex.Flowable;
-import io.vertx.core.Future;
-
 /**
  * Data access object for {@link Record}
  */
@@ -372,9 +370,10 @@ Future getMatchedRecordsIdentifiers(MatchField mat
    * @param txQE      query execution
    * @param newRecord new Record to create
    * @param oldRecord old Record that has to be marked as "old"
+   * @param okapiHeaders okapi headers
    * @return future with new "updated" Record
    */
-  Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord);
+  Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord, Map<String, String> okapiHeaders);
 
   /**
   * Change suppress from discovery flag for record by external relation id
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java
index b7a25d8fd..a7a7fb3e5 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java
@@ -735,8 +735,7 @@ public Future<Optional<Record>> getRecordByCondition(ReactiveClassicGenericQuery
   public Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders) {
     var tenantId = okapiHeaders.get(TENANT);
     LOG.trace("saveRecord:: Saving {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId);
-    return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record, okapiHeaders))
-      .onSuccess(created -> recordDomainEventPublisher.publishRecordCreated(created, okapiHeaders));
+    return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record, okapiHeaders));
   }
 
   @Override
@@ -971,8 +970,9 @@ public Future<Record> updateRecord(Record record, Map<String, String> okapiHeade
     LOG.trace("updateRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId);
     return getQueryExecutor(tenantId).transaction(txQE -> getRecordById(txQE, record.getId())
       .compose(optionalRecord -> optionalRecord
-        .map(r -> saveRecord(txQE, record, okapiHeaders))
-        .orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_TEMPLATE, record.getId()))))));
+        .map(r -> insertOrUpdateRecord(txQE, record))
+        .orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_TEMPLATE, record.getId())))))))
+      .onSuccess(updated -> recordDomainEventPublisher.publishRecordUpdated(updated, okapiHeaders));
   }
 
   @Override
@@ -1299,9 +1299,10 @@ private MarcBibCollection toMarcBibCollection(QueryResult result) {
   }
 
   @Override
-  public Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord) {
+  public Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord, Map<String, String> okapiHeaders) {
     LOG.trace("saveUpdatedRecord:: Saving updated record {}", newRecord.getId());
-    return insertOrUpdateRecord(txQE, oldRecord).compose(r -> insertOrUpdateRecord(txQE, newRecord));
+    return insertOrUpdateRecord(txQE, oldRecord).compose(r -> insertOrUpdateRecord(txQE, newRecord))
+      .onSuccess(r -> recordDomainEventPublisher.publishRecordUpdated(r, okapiHeaders));
   }
 
   @Override
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java
index b8e666b55..c1cf7294c 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java
@@ -241,10 +241,10 @@
   *
   * @param parsedRecordDto parsed record DTO containing updates to parsed record
   * @param snapshotId      snapshot id to which new Record should be linked
-  * @param tenantId        tenant id
+  * @param okapiHeaders    okapi headers
   * @return future with updated Record
   */
-  Future<Record> updateSourceRecord(ParsedRecordDto parsedRecordDto, String snapshotId, String tenantId);
+  Future<Record> updateSourceRecord(ParsedRecordDto parsedRecordDto, String snapshotId, Map<String, String> okapiHeaders);
 
  /**
   * Find marc bib ids by incoming arrays from SRM and exclude all valid marc bib and return only marc bib ids,
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java
index 06c455cd6..f8b8f39d2 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java
@@ -149,7 +149,8 @@ public Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders
       if (generation > 0) {
         return recordDao.getRecordByMatchedId(txQE, record.getMatchedId())
           .compose(optionalMatchedRecord -> optionalMatchedRecord
-            .map(matchedRecord -> recordDao.saveUpdatedRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), matchedRecord.withState(Record.State.OLD)))
+            .map(matchedRecord -> recordDao.saveUpdatedRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)),
+              matchedRecord.withState(Record.State.OLD), okapiHeaders))
            .orElseGet(() -> recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), okapiHeaders)));
      } else {
        return recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), okapiHeaders);
@@ -293,7 +294,7 @@ public Future deleteRecordsByExternalId(String externalId, String tenantId
   }
 
   @Override
-  public Future<Record> updateSourceRecord(ParsedRecordDto parsedRecordDto, String snapshotId, String tenantId) {
+  public Future<Record> updateSourceRecord(ParsedRecordDto parsedRecordDto, String snapshotId, Map<String, String> okapiHeaders) {
     String newRecordId = UUID.randomUUID().toString();
     return recordDao.executeInTransaction(txQE -> recordDao.getRecordByMatchedId(txQE, parsedRecordDto.getId())
       .compose(optionalRecord -> optionalRecord
@@ -313,9 +314,9 @@ public Future<Record> updateSourceRecord(ParsedRecordDto parsedRecordDto, String
             .withParsedRecord(new ParsedRecord().withId(newRecordId).withContent(parsedRecordDto.getParsedRecord().getContent()))
             .withExternalIdsHolder(parsedRecordDto.getExternalIdsHolder())
             .withAdditionalInfo(parsedRecordDto.getAdditionalInfo())
-            .withMetadata(parsedRecordDto.getMetadata()), existingRecord.withState(Record.State.OLD))))
+            .withMetadata(parsedRecordDto.getMetadata()), existingRecord.withState(Record.State.OLD), okapiHeaders)))
          .orElse(Future.failedFuture(new NotFoundException(
-            format(RECORD_NOT_FOUND_TEMPLATE, parsedRecordDto.getId()))))), tenantId);
+            format(RECORD_NOT_FOUND_TEMPLATE, parsedRecordDto.getId()))))), okapiHeaders.get(TENANT));
   }
 
   @Override
diff --git a/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java b/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java
index c5c116c8f..f56caef20 100644
--- a/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java
+++ b/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java
@@ -965,8 +965,8 @@ public void shouldUpdateRecordState(TestContext context) {
     var okapiHeaders = Map.of(TENANT, TENANT_ID);
 
     recordDao.saveRecord(original, okapiHeaders)
-      .compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, TENANT_ID))
-      .compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, TENANT_ID))
+      .compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, okapiHeaders))
+      .compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, okapiHeaders))
       .compose(ar -> recordService.updateRecordsState(original.getMatchedId(), RecordState.DRAFT, RecordType.MARC_BIB, TENANT_ID))
       .onComplete(update -> {
         if (update.failed()) {
@@ -1002,7 +1002,7 @@ public void shouldUpdateMarcAuthorityRecordStateToDeleted(TestContext context) {
     var okapiHeaders = Map.of(TENANT, TENANT_ID);
 
     recordDao.saveRecord(original, okapiHeaders)
-      .compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, TENANT_ID))
+      .compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, okapiHeaders))
       .compose(ar -> recordService.updateRecordsState(original.getMatchedId(), RecordState.DELETED, RecordType.MARC_AUTHORITY, TENANT_ID))
       .onComplete(update -> {
         if (update.failed()) {
@@ -1191,7 +1191,7 @@ public void shouldGetMarcBibSourceRecordByMatchedIdNotEqualToId(TestContext cont
     var okapiHeaders = Map.of(TENANT, TENANT_ID);
 
     recordDao.saveRecord(expected, okapiHeaders)
-      .compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, TENANT_ID))
+      .compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, okapiHeaders))
       .onComplete(update -> {
        if (update.failed()) {
          context.fail(update.cause());
@@ -1361,7 +1361,7 @@ public void shouldUpdateSourceRecord(TestContext context) {
       .withAdditionalInfo(expected.getAdditionalInfo())
       .withExternalIdsHolder(expected.getExternalIdsHolder())
       .withMetadata(expected.getMetadata());
-    recordService.updateSourceRecord(parsedRecordDto, snapshotId, TENANT_ID).onComplete(update -> {
+    recordService.updateSourceRecord(parsedRecordDto, snapshotId, okapiHeaders).onComplete(update -> {
      if (update.failed()) {
        context.fail(update.cause());
      }