diff --git a/NEWS.md b/NEWS.md index c0bda29eb..0412dc319 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,10 +1,12 @@ ## 2024-xx-xx 5.8.0-SNAPSHOT +* [MODSOURCE-733](https://issues.folio.org/browse/MODSOURCE-733) Reduce Memory Allocation of Strings * [MODSOURCE-506](https://issues.folio.org/browse/MODSOURCE-506) Remove rawRecord field from source record * [MODSOURCE-709](https://issues.folio.org/browse/MODSOURCE-709) MARC authority record is not created when use Job profile with match profile by absent subfield/field * [MODSOURCE-677](https://issues.folio.org/browse/MODSOURCE-677) Import is completed with errors when control field that differs from 001 is used for marc-to-marc matching * [MODSOURCE-722](https://issues.folio.org/browse/MODSOURCE-722) deleteMarcIndexersOldVersions: relation "marc_records_tracking" does not exist * [MODSOURMAN-1106](https://issues.folio.org/browse/MODSOURMAN-1106) The status of Instance is '-' in the Import log after uploading file. The numbers of updated SRS and Instance are not displayed in the Summary table. * [MODSOURCE-717](https://issues.folio.org/browse/MODSOURCE-717) MARC modifications not processed when placed after Holdings Update action in a job profile +* [MODSOURCE-739](https://issues.folio.org/browse/MODSOURCE-739) Create Kafka topics instead of relying on auto create in mod-srs * [MODSOURCE-729](https://issues.folio.org/browse/MODSOURCE-729) Implement new endpoint to be used for matching ## 2023-10-13 v5.7.0 diff --git a/README.md b/README.md index 6bf94bdf9..a76c5a65a 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,18 @@ After setup, it is good to check logs in all related modules for errors. Data im * "_srs.kafka.AuthorityLinkChunkConsumer.loadLimit_": 2 * Relevant from the **Poppy** release, module versions from 5.7.0: * "_srs.linking-rules-cache.expiration.time.hours_": 12 - +* Variables for setting number of partitions of topics: + * MARC_BIB + * DI_PARSED_RECORDS_CHUNK_SAVED + * DI_SRS_MARC_BIB_INSTANCE_HRID_SET + * DI_SRS_MARC_BIB_RECORD_MODIFIED + * DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING + * DI_SRS_MARC_BIB_RECORD_MATCHED + * DI_SRS_MARC_BIB_RECORD_NOT_MATCHED + * DI_SRS_MARC_AUTHORITY_RECORD_MATCHED + * DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED + * DI_SRS_MARC_AUTHORITY_RECORD_DELETED + Default value for all partitions is 1 ## Database schemas The mod-source-record-storage module uses relational approach and Liquibase to define database schemas. diff --git a/descriptors/ModuleDescriptor-template.json b/descriptors/ModuleDescriptor-template.json index e25f71274..9b87c1569 100644 --- a/descriptors/ModuleDescriptor-template.json +++ b/descriptors/ModuleDescriptor-template.json @@ -54,7 +54,7 @@ }, { "id": "source-storage-records", - "version": "3.1", + "version": "3.3", "handlers": [ { "methods": [ diff --git a/mod-source-record-storage-server/pom.xml b/mod-source-record-storage-server/pom.xml index 2bb4333c3..676a1871e 100644 --- a/mod-source-record-storage-server/pom.xml +++ b/mod-source-record-storage-server/pom.xml @@ -188,7 +188,7 @@ org.folio folio-kafka-wrapper - 3.0.0 + 3.1.0-SNAPSHOT net.mguenther.kafka diff --git a/mod-source-record-storage-server/src/main/java/org/folio/consumers/DataImportKafkaHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/consumers/DataImportKafkaHandler.java index 57f2dd2f0..2336f5148 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/consumers/DataImportKafkaHandler.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/consumers/DataImportKafkaHandler.java @@ -4,13 +4,13 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.json.Json; +import io.vertx.core.json.jackson.DatabindCodec; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.folio.DataImportEventPayload; import org.folio.dataimport.util.OkapiConnectionParams; -import org.folio.dbschema.ObjectMapperTool; import org.folio.kafka.AsyncRecordHandler; import org.folio.processing.events.EventManager; import org.folio.processing.exceptions.EventProcessingException; @@ -29,7 +29,7 @@ @Component @Qualifier("DataImportKafkaHandler") -public class DataImportKafkaHandler implements AsyncRecordHandler { +public class DataImportKafkaHandler implements AsyncRecordHandler { private static final Logger LOGGER = LogManager.getLogger(); @@ -48,14 +48,14 @@ public DataImportKafkaHandler(Vertx vertx, JobProfileSnapshotCache profileSnapsh } @Override - public Future handle(KafkaConsumerRecord targetRecord) { + public Future handle(KafkaConsumerRecord targetRecord) { LOGGER.trace("handle:: Handling kafka record: {}", targetRecord); String recordId = extractHeaderValue(RECORD_ID_HEADER, targetRecord.headers()); String chunkId = extractHeaderValue(CHUNK_ID_HEADER, targetRecord.headers()); String userId = extractHeaderValue(USER_ID_HEADER, targetRecord.headers()); try { Promise promise = Promise.promise(); - Event event = ObjectMapperTool.getMapper().readValue(targetRecord.value(), Event.class); + Event event = DatabindCodec.mapper().readValue(targetRecord.value(), Event.class); DataImportEventPayload eventPayload = Json.decodeValue(event.getEventPayload(), DataImportEventPayload.class); LOGGER.debug("handle:: Data import event payload has been received with event type: '{}' by jobExecutionId: '{}' and recordId: '{}' and chunkId: '{}' and userId: '{}'", eventPayload.getEventType(), eventPayload.getJobExecutionId(), recordId, chunkId, userId); diff --git a/mod-source-record-storage-server/src/main/java/org/folio/consumers/ParsedRecordChunksKafkaHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/consumers/ParsedRecordChunksKafkaHandler.java index 46908a125..e9cdbe4e7 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/consumers/ParsedRecordChunksKafkaHandler.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/consumers/ParsedRecordChunksKafkaHandler.java @@ -4,6 +4,7 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.json.Json; +import io.vertx.core.json.jackson.DatabindCodec; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.producer.KafkaHeader; import io.vertx.kafka.client.producer.KafkaProducer; @@ -37,7 +38,7 @@ import static org.folio.services.util.KafkaUtil.extractHeaderValue; @Component -public class ParsedRecordChunksKafkaHandler implements AsyncRecordHandler { +public class ParsedRecordChunksKafkaHandler implements AsyncRecordHandler { private static final Logger LOGGER = LogManager.getLogger(); public static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId"; @@ -65,7 +66,7 @@ public ParsedRecordChunksKafkaHandler(@Autowired RecordService recordService, } @Override - public Future handle(KafkaConsumerRecord targetRecord) { + public Future handle(KafkaConsumerRecord targetRecord) { LOGGER.trace("handle:: Handling kafka record: {}", targetRecord); String jobExecutionId = extractHeaderValue(JOB_EXECUTION_ID_HEADER, targetRecord.headers()); String chunkId = extractHeaderValue(CHUNK_ID_HEADER, targetRecord.headers()); @@ -74,7 +75,7 @@ public Future handle(KafkaConsumerRecord targetRecord) { String key = targetRecord.key(); try { - Event event = Json.decodeValue(targetRecord.value(), Event.class); + Event event = DatabindCodec.mapper().readValue(targetRecord.value(), Event.class); RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class); List kafkaHeaders = targetRecord.headers(); @@ -93,7 +94,7 @@ public Future handle(KafkaConsumerRecord targetRecord) { } } - private Future sendBackRecordsBatchResponse(RecordsBatchResponse recordsBatchResponse, List kafkaHeaders, String tenantId, int chunkNumber, String eventType, KafkaConsumerRecord commonRecord) { + private Future sendBackRecordsBatchResponse(RecordsBatchResponse recordsBatchResponse, List kafkaHeaders, String tenantId, int chunkNumber, String eventType, KafkaConsumerRecord commonRecord) { Event event; event = new Event() .withId(UUID.randomUUID().toString()) diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/IdType.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/IdType.java index 5ba04bd17..83b631798 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/IdType.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/IdType.java @@ -7,7 +7,8 @@ public enum IdType { AUTHORITY("authorityId"), EXTERNAL("externalId"), // NOTE: not really external id but is default from dto - RECORD("matchedId"); + RECORD("matchedId"), + SRS_RECORD("id"); private final String idField; diff --git a/mod-source-record-storage-server/src/main/java/org/folio/errorhandlers/ParsedRecordChunksErrorHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/errorhandlers/ParsedRecordChunksErrorHandler.java index 93a03e0ad..adf1c9d47 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/errorhandlers/ParsedRecordChunksErrorHandler.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/errorhandlers/ParsedRecordChunksErrorHandler.java @@ -3,6 +3,7 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.json.Json; +import io.vertx.core.json.jackson.DatabindCodec; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.producer.KafkaHeader; import io.vertx.kafka.client.producer.impl.KafkaHeaderImpl; @@ -23,6 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -38,7 +40,7 @@ * with status 'Completed with errors' with showing error messge instead of hanging progress bar. */ @Component -public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler { +public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler { private static final Logger LOGGER = LogManager.getLogger(); @@ -53,12 +55,18 @@ public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler private Vertx vertx; @Override - public void handle(Throwable throwable, KafkaConsumerRecord record) { - LOGGER.trace("handle:: Handling record {}", record); - Event event = Json.decodeValue(record.value(), Event.class); - RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class); + public void handle(Throwable throwable, KafkaConsumerRecord consumerRecord) { + LOGGER.trace("handle:: Handling record {}", consumerRecord); + Event event; + try { + event = DatabindCodec.mapper().readValue(consumerRecord.value(), Event.class); + } catch (IOException e) { + LOGGER.error("Something happened when deserializing record", e); + return; + } + RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class); - List kafkaHeaders = record.headers(); + List kafkaHeaders = consumerRecord.headers(); OkapiConnectionParams okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx); String jobExecutionId = okapiConnectionParams.getHeaders().get(JOB_EXECUTION_ID_HEADER); diff --git a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/ModTenantAPI.java b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/ModTenantAPI.java index cfc7940a9..0f54f4185 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/ModTenantAPI.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/ModTenantAPI.java @@ -1,11 +1,17 @@ package org.folio.rest.impl; +import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.folio.rest.tools.utils.TenantTool.tenantId; + import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; +import java.util.Date; +import java.util.Map; +import javax.ws.rs.core.Response; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.folio.RecordStorageKafkaTopic; @@ -17,17 +23,11 @@ import org.folio.rest.jaxrs.model.Snapshot.Status; import org.folio.rest.jaxrs.model.TenantAttributes; import org.folio.rest.tools.utils.TenantTool; +import org.folio.services.SRSKafkaTopicService; import org.folio.services.SnapshotService; import org.folio.spring.SpringContextUtil; import org.springframework.beans.factory.annotation.Autowired; -import javax.ws.rs.core.Response; -import java.util.Date; -import java.util.Map; - -import static org.apache.commons.lang3.StringUtils.EMPTY; -import static org.folio.rest.tools.utils.TenantTool.tenantId; - public class ModTenantAPI extends TenantAPI { private static final Logger LOGGER = LogManager.getLogger(); @@ -42,6 +42,9 @@ public class ModTenantAPI extends TenantAPI { @Autowired private SnapshotService snapshotService; + @Autowired + private SRSKafkaTopicService srsKafkaTopicService; + private final String tenantId; public ModTenantAPI(Vertx vertx, String tenantId) { //NOSONAR @@ -55,14 +58,13 @@ Future loadData(TenantAttributes attributes, String tenantId, Map headers, Context context) { // create topics before loading data Vertx vertx = context.owner(); - return new KafkaAdminClientService(vertx) - .createKafkaTopics(RecordStorageKafkaTopic.values(), tenantId) - .compose(x -> super.loadData(attributes, tenantId, headers, context)) + + return super.loadData(attributes, tenantId, headers, context) .compose(num -> { - LiquibaseUtil.initializeSchemaForTenant(vertx, tenantId); - return setLoadSampleParameter(attributes, context) - .compose(v -> createStubSnapshot(attributes)).map(num); - }); + LiquibaseUtil.initializeSchemaForTenant(vertx, tenantId); + return setLoadSampleParameter(attributes, context) + .compose(v -> createStubSnapshot(attributes)).map(num); + }); } @Validate @@ -73,7 +75,16 @@ public void postTenant(TenantAttributes tenantAttributes, Map he Future result = tenantAttributes.getPurge() != null && tenantAttributes.getPurge() ? new KafkaAdminClientService(context.owner()).deleteKafkaTopics(RecordStorageKafkaTopic.values(), tenantId(headers)) : Future.succeededFuture(); - result.onComplete(x -> super.postTenant(tenantAttributes, headers, handler, context)); + result.onComplete(x -> super.postTenant(tenantAttributes, headers, ar -> { + if (ar.succeeded()) { + Vertx vertx = context.owner(); + var kafkaAdminClientService = new KafkaAdminClientService(vertx); + kafkaAdminClientService.createKafkaTopics(srsKafkaTopicService.createTopicObjects(), tenantId); + handler.handle(Future.succeededFuture(ar.result())); + } else { + handler.handle(Future.failedFuture(ar.cause())); + } + }, context)); } private Future setLoadSampleParameter(TenantAttributes attributes, Context context) { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageRecordsImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageRecordsImpl.java index 5efa6b464..7b68b1c02 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageRecordsImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageRecordsImpl.java @@ -15,7 +15,6 @@ import org.folio.dataimport.util.ExceptionHelper; import org.folio.rest.jaxrs.model.Record; -import org.folio.rest.jaxrs.model.Record.State; import org.folio.rest.jaxrs.model.RecordMatchingDto; import org.folio.rest.jaxrs.resource.SourceStorageRecords; import org.folio.rest.tools.utils.TenantTool; @@ -116,14 +115,11 @@ public void putSourceStorageRecordsGenerationById(String matchedId, Record entit } @Override - public void deleteSourceStorageRecordsById(String id, Map okapiHeaders, + public void deleteSourceStorageRecordsById(String id, String idType, Map okapiHeaders, Handler> asyncResultHandler, Context vertxContext) { vertxContext.runOnContext(v -> { try { - recordService.getRecordById(id, tenantId) - .map(recordOptional -> recordOptional.orElseThrow(() -> new NotFoundException(format(NOT_FOUND_MESSAGE, Record.class.getSimpleName(), id)))) - .compose(record -> record.getState().equals(State.DELETED) ? Future.succeededFuture(true) - : recordService.updateRecord(record.withState(State.DELETED), tenantId).map(r -> true)) + recordService.deleteRecordById(id, toExternalIdType(idType), tenantId).map(r -> true) .map(updated -> DeleteSourceStorageRecordsByIdResponse.respond204()).map(Response.class::cast) .otherwise(ExceptionHelper::mapExceptionToResponse).onComplete(asyncResultHandler); } catch (Exception e) { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java index 9c502590d..f97b0bef8 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java @@ -267,4 +267,5 @@ public interface RecordService { */ Future updateRecordsState(String matchedId, RecordState state, RecordType recordType, String tenantId); + Future deleteRecordById(String id, IdType idType, String tenantId); } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java index 20a79ba88..1b971d89a 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java @@ -41,8 +41,12 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.folio.dao.util.IdType; +import org.folio.dao.util.ParsedRecordDaoUtil; import org.folio.dao.util.MatchField; import org.folio.dao.util.RecordDaoUtil; +import org.folio.dao.util.RecordType; +import org.folio.dao.util.SnapshotDaoUtil; import org.folio.okapi.common.GenericCompositeFuture; import org.folio.processing.value.ListValue; import org.folio.rest.jaxrs.model.Filter; @@ -56,9 +60,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.folio.dao.RecordDao; -import org.folio.dao.util.IdType; -import org.folio.dao.util.RecordType; -import org.folio.dao.util.SnapshotDaoUtil; import org.folio.rest.jaxrs.model.FetchParsedRecordsBatchRequest; import org.folio.rest.jaxrs.model.FieldRange; import org.folio.rest.jaxrs.model.MarcBibCollection; @@ -82,16 +83,20 @@ public class RecordServiceImpl implements RecordService { private static final Logger LOG = LogManager.getLogger(); - private final RecordDao recordDao; + private static final String DUPLICATE_CONSTRAINT = "idx_records_matched_id_gen"; private static final String DUPLICATE_RECORD_MSG = "Incoming file may contain duplicates"; private static final String MULTIPLE_MATCHING_FILTERS_SPECIFIED_MSG = "Only one matching filter is allowed in the current API implementation"; private static final String MATCHED_ID_NOT_EQUAL_TO_999_FIELD = "Matched id (%s) not equal to 999ff$s (%s) field"; private static final String RECORD_WITH_GIVEN_MATCHED_ID_NOT_FOUND = "Record with given matched id (%s) not found"; + private static final String NOT_FOUND_MESSAGE = "%s with id '%s' was not found"; + private static final Character DELETED_LEADER_RECORD_STATUS = 'd'; public static final String UPDATE_RECORD_DUPLICATE_EXCEPTION = "Incoming record could be a duplicate, incoming record generation should not be the same as matched record generation and the execution of job should be started after of creating the previous record generation"; public static final char SUBFIELD_S = 's'; public static final char INDICATOR = 'f'; + private final RecordDao recordDao; + @Autowired public RecordServiceImpl(final RecordDao recordDao) { this.recordDao = recordDao; @@ -325,48 +330,17 @@ public Future getMatchedRecordsIdentifiers(RecordM true, recordMatchingDto.getOffset(), recordMatchingDto.getLimit(), tenantId); } - private MatchField prepareMatchField(RecordMatchingDto recordMatchingDto) { - // only one matching filter is expected in the current implementation for processing records matching - if (recordMatchingDto.getFilters().size() > 1) { - throw new BadRequestException(MULTIPLE_MATCHING_FILTERS_SPECIFIED_MSG); - } - - Filter filter = recordMatchingDto.getFilters().get(0); - String ind1 = filter.getIndicator1() != null ? filter.getIndicator1() : StringUtils.EMPTY; - String ind2 = filter.getIndicator2() != null ? filter.getIndicator2() : StringUtils.EMPTY; - String subfield = filter.getSubfield() != null ? filter.getSubfield() : StringUtils.EMPTY; - return new MatchField(filter.getField(), ind1, ind2, subfield, ListValue.of(filter.getValues())); - } - - private TypeConnection getTypeConnection(RecordMatchingDto.RecordType recordType) { - return switch (recordType) { - case MARC_BIB -> TypeConnection.MARC_BIB; - case MARC_HOLDING -> TypeConnection.MARC_HOLDINGS; - case MARC_AUTHORITY -> TypeConnection.MARC_AUTHORITY; - }; - } - - private Future processDefaultMatchField(MatchField matchField, TypeConnection typeConnection, - RecordMatchingDto recordMatchingDto, String tenantId) { - Condition condition = filterRecordByState(Record.State.ACTUAL.value()); - List values = ((ListValue) matchField.getValue()).getValue(); - - if (matchField.isMatchedId()) { - condition = condition.and(getExternalIdsCondition(values, IdType.RECORD)); - } else if (matchField.isExternalId()) { - condition = condition.and(getExternalIdsCondition(values, IdType.EXTERNAL)); - } else if (matchField.isExternalHrid()) { - condition = condition.and(filterRecordByExternalHridValues(values)); - } - - return recordDao.getRecords(condition, typeConnection.getDbType(), Collections.emptyList(), recordMatchingDto.getOffset(), - recordMatchingDto.getLimit(), recordMatchingDto.getReturnTotalRecordsCount(), tenantId) - .map(recordCollection -> recordCollection.getRecords().stream() - .map(sourceRecord -> new RecordIdentifiersDto() - .withRecordId(sourceRecord.getId()) - .withExternalId(RecordDaoUtil.getExternalId(sourceRecord.getExternalIdsHolder(), sourceRecord.getRecordType()))) - .collect(collectingAndThen(toList(), identifiers -> new RecordsIdentifiersCollection() - .withIdentifiers(identifiers).withTotalRecords(recordCollection.getTotalRecords())))); + @Override + public Future deleteRecordById(String id, IdType idType, String tenantId) { + return recordDao.getRecordByExternalId(id, idType, tenantId) + .map(recordOptional -> recordOptional.orElseThrow(() -> new NotFoundException(format(NOT_FOUND_MESSAGE, Record.class.getSimpleName(), id)))) + .map(record -> { + record.withState(Record.State.DELETED); + record.setAdditionalInfo(record.getAdditionalInfo().withSuppressDiscovery(true)); + ParsedRecordDaoUtil.updateLeaderStatus(record.getParsedRecord(), DELETED_LEADER_RECORD_STATUS); + return record; + }) + .compose(record -> updateRecord(record, tenantId)).map(r -> null); } private Future setMatchedIdForRecord(Record record, String tenantId) { @@ -470,4 +444,49 @@ private boolean checkFieldRange(JsonObject fields, List data) { } return false; } + + private MatchField prepareMatchField(RecordMatchingDto recordMatchingDto) { + // only one matching filter is expected in the current implementation for processing records matching + if (recordMatchingDto.getFilters().size() > 1) { + throw new BadRequestException(MULTIPLE_MATCHING_FILTERS_SPECIFIED_MSG); + } + + Filter filter = recordMatchingDto.getFilters().get(0); + String ind1 = filter.getIndicator1() != null ? filter.getIndicator1() : StringUtils.EMPTY; + String ind2 = filter.getIndicator2() != null ? filter.getIndicator2() : StringUtils.EMPTY; + String subfield = filter.getSubfield() != null ? filter.getSubfield() : StringUtils.EMPTY; + return new MatchField(filter.getField(), ind1, ind2, subfield, ListValue.of(filter.getValues())); + } + + private TypeConnection getTypeConnection(RecordMatchingDto.RecordType recordType) { + return switch (recordType) { + case MARC_BIB -> TypeConnection.MARC_BIB; + case MARC_HOLDING -> TypeConnection.MARC_HOLDINGS; + case MARC_AUTHORITY -> TypeConnection.MARC_AUTHORITY; + }; + } + + private Future processDefaultMatchField(MatchField matchField, TypeConnection typeConnection, + RecordMatchingDto recordMatchingDto, String tenantId) { + Condition condition = filterRecordByState(Record.State.ACTUAL.value()); + List values = ((ListValue) matchField.getValue()).getValue(); + + if (matchField.isMatchedId()) { + condition = condition.and(getExternalIdsCondition(values, IdType.RECORD)); + } else if (matchField.isExternalId()) { + condition = condition.and(getExternalIdsCondition(values, IdType.EXTERNAL)); + } else if (matchField.isExternalHrid()) { + condition = condition.and(filterRecordByExternalHridValues(values)); + } + + return recordDao.getRecords(condition, typeConnection.getDbType(), Collections.emptyList(), recordMatchingDto.getOffset(), + recordMatchingDto.getLimit(), recordMatchingDto.getReturnTotalRecordsCount(), tenantId) + .map(recordCollection -> recordCollection.getRecords().stream() + .map(sourceRecord -> new RecordIdentifiersDto() + .withRecordId(sourceRecord.getId()) + .withExternalId(RecordDaoUtil.getExternalId(sourceRecord.getExternalIdsHolder(), sourceRecord.getRecordType()))) + .collect(collectingAndThen(toList(), identifiers -> new RecordsIdentifiersCollection() + .withIdentifiers(identifiers).withTotalRecords(recordCollection.getTotalRecords())))); + } + } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/SRSKafkaTopicService.java b/mod-source-record-storage-server/src/main/java/org/folio/services/SRSKafkaTopicService.java new file mode 100644 index 000000000..e2521941c --- /dev/null +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/SRSKafkaTopicService.java @@ -0,0 +1,94 @@ +package org.folio.services; + +import static org.folio.kafka.KafkaTopicNameHelper.formatTopicName; +import static org.folio.kafka.services.KafkaEnvironmentProperties.environment; + +import org.folio.kafka.services.KafkaTopic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.PropertySource; +import org.springframework.stereotype.Service; + +@Service +@PropertySource(value = "kafka.properties") +public class SRSKafkaTopicService { + + @Value("${marcBib.partitions}") + private Integer marcBibPartitions; + + @Value("${di_parsed_records_chunk_saved.partitions}") + private Integer diParsedRecordsChunkSavedPartitions; + + @Value("${di_srs_marc_bib_instance_hrid_set.partitions}") + private Integer diSrsMarcBibInstanceHridSetPartitions; + + @Value("${di_srs_marc_bib_record_modified.partitions}") + private Integer diSrsMarcBibRecordModifiedPartitions; + + @Value("${di_srs_marc_bib_record_modified_ready_for_post_processing.partitions}") + private Integer diSrsMarcBibRecordModifiedReadyForPostProcessingPartitions; + + @Value("${di_marc_bib_record_matched.partitions}") + private Integer diMarcBibRecordMatchedPartitions; + + @Value("${di_marc_bib_record_not_matched.partitions}") + private Integer diMarcBibRecordNotMatchedPartitions; + + @Value("${di_marc_authority_record_matched.partitions}") + private Integer diMarcAuthorityRecordMatchedPartitions; + + @Value("${di_marc_authority_record_not_matched.partitions}") + private Integer diMarcAuthorityRecordNotMatchedPartitions; + + @Value("${di_marc_authority_record_deleted.partitions}") + private Integer diMarcAuthorityRecordDeletedPartitions; + + public KafkaTopic[] createTopicObjects() { + return new SRSKafkaTopic[] { + new SRSKafkaTopic("MARC_BIB", marcBibPartitions), + new SRSKafkaTopic("DI_PARSED_RECORDS_CHUNK_SAVED", diParsedRecordsChunkSavedPartitions), + new SRSKafkaTopic("DI_SRS_MARC_BIB_INSTANCE_HRID_SET", diSrsMarcBibInstanceHridSetPartitions), + new SRSKafkaTopic("DI_SRS_MARC_BIB_RECORD_MODIFIED", diSrsMarcBibRecordModifiedPartitions), + new SRSKafkaTopic("DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING", + diSrsMarcBibRecordModifiedReadyForPostProcessingPartitions), + new SRSKafkaTopic("DI_SRS_MARC_BIB_RECORD_MATCHED", diMarcBibRecordMatchedPartitions), + new SRSKafkaTopic("DI_SRS_MARC_BIB_RECORD_NOT_MATCHED", + diMarcBibRecordNotMatchedPartitions), + new SRSKafkaTopic("DI_SRS_MARC_AUTHORITY_RECORD_MATCHED", + diMarcAuthorityRecordMatchedPartitions), + new SRSKafkaTopic("DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED", + diMarcAuthorityRecordNotMatchedPartitions), + new SRSKafkaTopic("DI_SRS_MARC_AUTHORITY_RECORD_DELETED", + diMarcAuthorityRecordDeletedPartitions)}; + } + + public static class SRSKafkaTopic implements KafkaTopic { + + private final String topic; + private final int numPartitions; + + public SRSKafkaTopic(String topic, int numPartitions) { + this.topic = topic; + this.numPartitions = numPartitions; + } + + @Override + public String moduleName() { + return "srs"; + } + + @Override + public String topicName() { + return topic; + } + + @Override + public int numPartitions() { + return numPartitions; + } + + @Override + public String fullTopicName(String tenant) { + return formatTopicName(environment(), tenant, topicName()); + } + } +} diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AbstractConsumerVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AbstractConsumerVerticle.java index 062110016..eb2d8658e 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AbstractConsumerVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AbstractConsumerVerticle.java @@ -19,9 +19,9 @@ import org.folio.kafka.SubscriptionDefinition; import org.folio.okapi.common.GenericCompositeFuture; -public abstract class AbstractConsumerVerticle extends AbstractVerticle { +public abstract class AbstractConsumerVerticle extends AbstractVerticle { - private final List> consumers = new ArrayList<>(); + private final List> consumers = new ArrayList<>(); private final KafkaConfig kafkaConfig; @@ -31,12 +31,20 @@ protected AbstractConsumerVerticle(KafkaConfig kafkaConfig) { @Override public void start(Promise startPromise) { - eventTypes().forEach(eventType -> { + KafkaConfig config; + if (getDeserializerClass() != null) { + config = kafkaConfig.toBuilder() + .consumerValueDeserializerClass(getDeserializerClass()) + .build(); + } else { + config = kafkaConfig; + } + eventTypes().forEach(eventType -> { SubscriptionDefinition subscriptionDefinition = getSubscriptionDefinition(eventType); - consumers.add(KafkaConsumerWrapper.builder() + consumers.add(KafkaConsumerWrapper.builder() .context(context) .vertx(vertx) - .kafkaConfig(kafkaConfig) + .kafkaConfig(config) .loadLimit(loadLimit()) .globalLoadSensor(new GlobalLoadSensor()) .subscriptionDefinition(subscriptionDefinition) @@ -64,9 +72,9 @@ protected Optional namespace() { return Optional.of(getDefaultNameSpace()); } - protected abstract AsyncRecordHandler recordHandler(); + protected abstract AsyncRecordHandler recordHandler(); - protected ProcessRecordErrorHandler processRecordErrorHandler() { + protected ProcessRecordErrorHandler processRecordErrorHandler() { return null; } @@ -89,4 +97,11 @@ private SubscriptionDefinition getSubscriptionDefinition(String eventType) { private String getConsumerName() { return constructModuleName() + "_" + getClass().getSimpleName(); } + + /** + * Set a custom deserializer class for this kafka consumer + */ + public String getDeserializerClass() { + return null; + } } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityDomainConsumersVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityDomainConsumersVerticle.java index f4d7fb02f..d45f3f708 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityDomainConsumersVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityDomainConsumersVerticle.java @@ -15,7 +15,7 @@ @Component @Scope(SCOPE_PROTOTYPE) -public class AuthorityDomainConsumersVerticle extends AbstractConsumerVerticle { +public class AuthorityDomainConsumersVerticle extends AbstractConsumerVerticle { private final AuthorityDomainKafkaHandler authorityDomainKafkaHandler; diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityLinkChunkConsumersVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityLinkChunkConsumersVerticle.java index 17b995301..8094b3841 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityLinkChunkConsumersVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityLinkChunkConsumersVerticle.java @@ -15,7 +15,7 @@ @Component @Scope(SCOPE_PROTOTYPE) -public class AuthorityLinkChunkConsumersVerticle extends AbstractConsumerVerticle { +public class AuthorityLinkChunkConsumersVerticle extends AbstractConsumerVerticle { private final AuthorityLinkChunkKafkaHandler kafkaHandler; diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/DataImportConsumersVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/DataImportConsumersVerticle.java index f508bb4b8..cf09ac98e 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/DataImportConsumersVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/DataImportConsumersVerticle.java @@ -1,5 +1,6 @@ package org.folio.verticle.consumers; +import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_AUTHORITY_CREATED_READY_FOR_POST_PROCESSING; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_AUTHORITY_UPDATED_READY_FOR_POST_PROCESSING; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_HOLDINGS_CREATED_READY_FOR_POST_PROCESSING; @@ -37,7 +38,7 @@ @Component @Scope(SCOPE_PROTOTYPE) -public class DataImportConsumersVerticle extends AbstractConsumerVerticle { +public class DataImportConsumersVerticle extends AbstractConsumerVerticle { private static final List EVENTS = Arrays.asList( DI_INVENTORY_AUTHORITY_CREATED_READY_FOR_POST_PROCESSING.value(), @@ -62,10 +63,11 @@ public class DataImportConsumersVerticle extends AbstractConsumerVerticle { DI_SRS_MARC_BIB_RECORD_MATCHED.value(), DI_SRS_MARC_BIB_RECORD_MODIFIED.value(), DI_SRS_MARC_BIB_RECORD_NOT_MATCHED.value(), - DI_SRS_MARC_HOLDINGS_RECORD_MATCHED.value() + DI_SRS_MARC_HOLDINGS_RECORD_MATCHED.value(), + DI_INCOMING_MARC_BIB_RECORD_PARSED.value() ); - private final AsyncRecordHandler dataImportKafkaHandler; + private final AsyncRecordHandler dataImportKafkaHandler; @Value("${srs.kafka.DataImportConsumer.loadLimit:5}") private int loadLimit; @@ -73,7 +75,7 @@ public class DataImportConsumersVerticle extends AbstractConsumerVerticle { @Autowired public DataImportConsumersVerticle(KafkaConfig kafkaConfig, @Qualifier("DataImportKafkaHandler") - AsyncRecordHandler dataImportKafkaHandler) { + AsyncRecordHandler dataImportKafkaHandler) { super(kafkaConfig); this.dataImportKafkaHandler = dataImportKafkaHandler; } @@ -84,7 +86,7 @@ protected int loadLimit() { } @Override - protected AsyncRecordHandler recordHandler() { + protected AsyncRecordHandler recordHandler() { return dataImportKafkaHandler; } @@ -93,4 +95,9 @@ protected List eventTypes() { return EVENTS; } + @Override + public String getDeserializerClass() { + return "org.apache.kafka.common.serialization.ByteArrayDeserializer"; + } + } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticle.java index fe3f32e48..05c1dccaa 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticle.java @@ -15,11 +15,11 @@ @Component @Scope(SCOPE_PROTOTYPE) -public class ParsedRecordChunkConsumersVerticle extends AbstractConsumerVerticle { +public class ParsedRecordChunkConsumersVerticle extends AbstractConsumerVerticle { - private final AsyncRecordHandler parsedRecordChunksKafkaHandler; + private final AsyncRecordHandler parsedRecordChunksKafkaHandler; - private final ProcessRecordErrorHandler parsedRecordChunksErrorHandler; + private final ProcessRecordErrorHandler parsedRecordChunksErrorHandler; @Value("${srs.kafka.ParsedMarcChunkConsumer.loadLimit:5}") private int loadLimit; @@ -27,16 +27,16 @@ public class ParsedRecordChunkConsumersVerticle extends AbstractConsumerVerticle @Autowired protected ParsedRecordChunkConsumersVerticle(KafkaConfig kafkaConfig, @Qualifier("parsedRecordChunksKafkaHandler") - AsyncRecordHandler parsedRecordChunksKafkaHandler, + AsyncRecordHandler parsedRecordChunksKafkaHandler, @Qualifier("parsedRecordChunksErrorHandler") - ProcessRecordErrorHandler parsedRecordChunksErrorHandler) { + ProcessRecordErrorHandler parsedRecordChunksErrorHandler) { super(kafkaConfig); this.parsedRecordChunksKafkaHandler = parsedRecordChunksKafkaHandler; this.parsedRecordChunksErrorHandler = parsedRecordChunksErrorHandler; } @Override - protected ProcessRecordErrorHandler processRecordErrorHandler() { + protected ProcessRecordErrorHandler processRecordErrorHandler() { return parsedRecordChunksErrorHandler; } @@ -46,7 +46,7 @@ protected int loadLimit() { } @Override - protected AsyncRecordHandler recordHandler() { + protected AsyncRecordHandler recordHandler() { return parsedRecordChunksKafkaHandler; } @@ -55,4 +55,9 @@ protected List eventTypes() { return List.of(DI_RAW_RECORDS_CHUNK_PARSED.value()); } + @Override + public String getDeserializerClass() { + return "org.apache.kafka.common.serialization.ByteArrayDeserializer"; + } + } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/QuickMarcConsumersVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/QuickMarcConsumersVerticle.java index a4551c1c1..7e3e61764 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/QuickMarcConsumersVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/QuickMarcConsumersVerticle.java @@ -14,7 +14,7 @@ @Component @Scope(SCOPE_PROTOTYPE) -public class QuickMarcConsumersVerticle extends AbstractConsumerVerticle { +public class QuickMarcConsumersVerticle extends AbstractConsumerVerticle { private final QuickMarcKafkaHandler kafkaHandler; diff --git a/mod-source-record-storage-server/src/main/resources/kafka.properties b/mod-source-record-storage-server/src/main/resources/kafka.properties new file mode 100644 index 000000000..f314ae1d6 --- /dev/null +++ b/mod-source-record-storage-server/src/main/resources/kafka.properties @@ -0,0 +1,11 @@ + +marcBib.partitions = ${MARC_BIB_PARTITIONS:1} +di_parsed_records_chunk_saved.partitions = ${DI_PARSED_RECORDS_CHUNK_SAVED_PARTITIONS:1} +di_srs_marc_bib_instance_hrid_set.partitions = ${DI_SRS_MARC_BIB_INSTANCE_HRID_SET_PARTITIONS:1} +di_srs_marc_bib_record_modified.partitions = ${DI_SRS_MARC_BIB_RECORD_MODIFIED_PARTITIONS:1} +di_srs_marc_bib_record_modified_ready_for_post_processing.partitions = ${DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING_PARTITIONS:1} +di_marc_bib_record_matched.partitions = ${DI_SRS_MARC_BIB_RECORD_MATCHED_PARTITIONS:1} +di_marc_bib_record_not_matched.partitions = ${DI_SRS_MARC_BIB_RECORD_NOT_MATCHED_PARTITIONS:1} +di_marc_authority_record_matched.partitions = ${DI_SRS_MARC_AUTHORITY_RECORD_MATCHED_PARTITIONS:1} +di_marc_authority_record_not_matched.partitions = ${DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED_PARTITIONS:1} +di_marc_authority_record_deleted.partitions = ${DI_SRS_MARC_AUTHORITY_RECORD_DELETED_PARTITIONS:1} diff --git a/mod-source-record-storage-server/src/main/resources/vertx-default-jul-logging.properties b/mod-source-record-storage-server/src/main/resources/vertx-default-jul-logging.properties deleted file mode 100644 index b2a56574d..000000000 --- a/mod-source-record-storage-server/src/main/resources/vertx-default-jul-logging.properties +++ /dev/null @@ -1,5 +0,0 @@ -handlers = java.util.logging.ConsoleHandler -.level = ALL - -logger.cql2pgjson.level = ERROR -logger.cql2pgjson.name = org.folio.cql2pgjson.CQL2PgJSON diff --git a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/RecordApiTest.java b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/RecordApiTest.java index 3761a3ef9..89c9021c2 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/RecordApiTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/RecordApiTest.java @@ -21,6 +21,7 @@ import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import org.apache.http.HttpStatus; +import org.folio.dao.util.ParsedRecordDaoUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -760,7 +761,7 @@ public void shouldReturnNotFoundOnDeleteWhenRecordDoesNotExist() { } @Test - public void shouldDeleteExistingMarcRecordOnDelete(TestContext testContext) { + public void shouldDeleteExistingMarcRecordOnDeleteByRecordId(TestContext testContext) { postSnapshots(testContext, snapshot_2); Async async = testContext.async(); @@ -783,13 +784,19 @@ public void shouldDeleteExistingMarcRecordOnDelete(TestContext testContext) { async.complete(); async = testContext.async(); - RestAssured.given() + Response deletedResponse = RestAssured.given() .spec(spec) .when() - .get(SOURCE_STORAGE_RECORDS_PATH + "/" + parsed.getId()) - .then() - .statusCode(HttpStatus.SC_OK) - .body("deleted", is(true)); + .get(SOURCE_STORAGE_RECORDS_PATH + "/" + parsed.getId()); + Assert.assertEquals(HttpStatus.SC_OK, deletedResponse.getStatusCode()); + Record deletedRecord = deletedResponse.body().as(Record.class); + + Assert.assertEquals(true, deletedRecord.getDeleted()); + Assert.assertEquals(Record.State.DELETED, deletedRecord.getState()); + Assert.assertEquals("d", deletedRecord.getLeaderRecordStatus()); + Assert.assertEquals(true, deletedRecord.getAdditionalInfo().getSuppressDiscovery()); + Assert.assertEquals("d", ParsedRecordDaoUtil.getLeaderStatus(deletedRecord.getParsedRecord())); + async.complete(); async = testContext.async(); @@ -818,10 +825,131 @@ public void shouldDeleteExistingMarcRecordOnDelete(TestContext testContext) { .get(SOURCE_STORAGE_RECORDS_PATH + "/" + errorRecord.getId()) .then() .statusCode(HttpStatus.SC_OK) - .body("deleted", is(true)); + .body("deleted", is(true)) + .body("state", is("DELETED")) + .body("additionalInfo.suppressDiscovery", is(true)); async.complete(); } + @Test + public void shouldDeleteExistingMarcRecordOnDeleteByInstanceId(TestContext testContext) { + postSnapshots(testContext, snapshot_1); + + String srsId = UUID.randomUUID().toString(); + String instanceId = UUID.randomUUID().toString(); + + ParsedRecord parsedRecord = new ParsedRecord().withId(srsId) + .withContent(new JsonObject().put("leader", "01542ccm a2200361 4500") + .put("fields", new JsonArray().add(new JsonObject().put("999", new JsonObject() + .put("subfields", new JsonArray().add(new JsonObject().put("s", srsId)).add(new JsonObject().put("i", instanceId))))))); + + Record newRecord = new Record() + .withId(srsId) + .withSnapshotId(snapshot_1.getJobExecutionId()) + .withRecordType(Record.RecordType.MARC_BIB) + .withRawRecord(rawMarcRecord) + .withParsedRecord(parsedRecord) + .withState(Record.State.ACTUAL) + .withExternalIdsHolder(new ExternalIdsHolder() + .withInstanceId(instanceId)) + .withMatchedId(UUID.randomUUID().toString()); + + Async async = testContext.async(); + Response createParsed = RestAssured.given() + .spec(spec) + .body(newRecord) + .when() + .post(SOURCE_STORAGE_RECORDS_PATH); + assertThat(createParsed.statusCode(), is(HttpStatus.SC_CREATED)); + Record parsed = createParsed.body().as(Record.class); + async.complete(); + + async = testContext.async(); + RestAssured.given() + .spec(spec) + .when() + .delete(SOURCE_STORAGE_RECORDS_PATH + "/" + instanceId + "?idType=INSTANCE") + .then() + .statusCode(HttpStatus.SC_NO_CONTENT); + async.complete(); + + async = testContext.async(); + Response deletedResponse = RestAssured.given() + .spec(spec) + .when() + .get(SOURCE_STORAGE_RECORDS_PATH + "/" + parsed.getId()); + Assert.assertEquals(HttpStatus.SC_OK, deletedResponse.getStatusCode()); + Record deletedRecord = deletedResponse.body().as(Record.class); + + Assert.assertEquals(true, deletedRecord.getDeleted()); + Assert.assertEquals(Record.State.DELETED, deletedRecord.getState()); + Assert.assertEquals("d", deletedRecord.getLeaderRecordStatus()); + Assert.assertEquals(true, deletedRecord.getAdditionalInfo().getSuppressDiscovery()); + Assert.assertEquals("d", ParsedRecordDaoUtil.getLeaderStatus(deletedRecord.getParsedRecord())); + + async.complete(); + } + + @Test + public void shouldReturnNotFoundIfTryingToDeleteRecordWithStateNotActual(TestContext testContext) { + postSnapshots(testContext, snapshot_1); + + Record newRecord1 = new Record() + .withId(UUID.randomUUID().toString()) + .withSnapshotId(snapshot_1.getJobExecutionId()) + .withRecordType(Record.RecordType.MARC_BIB) + .withRawRecord(rawMarcRecord) + .withParsedRecord(parsedMarcRecord) + .withState(Record.State.OLD) + .withMatchedId(UUID.randomUUID().toString()); + + Async async = testContext.async(); + Response createParsed = RestAssured.given() + .spec(spec) + .body(newRecord1) + .when() + .post(SOURCE_STORAGE_RECORDS_PATH); + assertThat(createParsed.statusCode(), is(HttpStatus.SC_CREATED)); + async.complete(); + + async = testContext.async(); + RestAssured.given() + .spec(spec) + .when() + .delete(SOURCE_STORAGE_RECORDS_PATH + "/" + newRecord1.getId()) + .then() + .statusCode(HttpStatus.SC_NOT_FOUND); + async.complete(); + + Record newRecord2 = new Record() + .withId(UUID.randomUUID().toString()) + .withSnapshotId(snapshot_1.getJobExecutionId()) + .withRecordType(Record.RecordType.MARC_BIB) + .withRawRecord(rawMarcRecord) + .withParsedRecord(parsedMarcRecord) + .withState(Record.State.DELETED) + .withMatchedId(UUID.randomUUID().toString()); + + async = testContext.async(); + Response createParsed2 = RestAssured.given() + .spec(spec) + .body(newRecord2) + .when() + .post(SOURCE_STORAGE_RECORDS_PATH); + assertThat(createParsed2.statusCode(), is(HttpStatus.SC_CREATED)); + async.complete(); + + async = testContext.async(); + RestAssured.given() + .spec(spec) + .when() + .delete(SOURCE_STORAGE_RECORDS_PATH + "/" + newRecord2.getId()) + .then() + .statusCode(HttpStatus.SC_NOT_FOUND); + async.complete(); + } + + @Test public void shouldDeleteExistingEdifactRecordOnDelete(TestContext testContext) { postSnapshots(testContext, snapshot_3); diff --git a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageStreamApiTest.java b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageStreamApiTest.java index cf232129b..a5fe13b10 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageStreamApiTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageStreamApiTest.java @@ -1067,7 +1067,8 @@ public void shouldReturnIdOnSearchMarcRecordIdsWhenRecordWasDeleted(TestContext .statusCode(HttpStatus.SC_NO_CONTENT); async.complete(); MarcRecordSearchRequest searchRequest = new MarcRecordSearchRequest(); - searchRequest.setLeaderSearchExpression("p_05 = 'c' and p_06 = 'c' and p_07 = 'm'"); + searchRequest.setLeaderSearchExpression("p_05 = 'd' and p_06 = 'c' and p_07 = 'm'"); + searchRequest.setSuppressFromDiscovery(true); searchRequest.setFieldsSearchExpression("001.value = '393893' and 005.value ^= '2014110' and 035.ind1 = '#'"); searchRequest.setDeleted(true); // when diff --git a/mod-source-record-storage-server/src/test/java/org/folio/services/kafka/KafkaAdminClientServiceTest.java b/mod-source-record-storage-server/src/test/java/org/folio/services/kafka/KafkaAdminClientServiceTest.java index 9d11004b5..674dce59b 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/services/kafka/KafkaAdminClientServiceTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/services/kafka/KafkaAdminClientServiceTest.java @@ -1,23 +1,5 @@ package org.folio.services.kafka; -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.ext.unit.TestContext; -import io.vertx.ext.unit.junit.VertxUnitRunner; -import io.vertx.kafka.admin.KafkaAdminClient; -import io.vertx.kafka.admin.NewTopic; -import org.apache.kafka.common.errors.TopicExistsException; -import org.folio.RecordStorageKafkaTopic; -import org.folio.kafka.services.KafkaAdminClientService; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; - -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - import static io.vertx.core.Future.failedFuture; import static io.vertx.core.Future.succeededFuture; import static org.hamcrest.MatcherAssert.assertThat; @@ -33,18 +15,53 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.kafka.admin.KafkaAdminClient; +import io.vertx.kafka.admin.NewTopic; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.errors.TopicExistsException; +import org.folio.kafka.services.KafkaAdminClientService; +import org.folio.kafka.services.KafkaTopic; +import org.folio.services.SRSKafkaTopicService; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; + @RunWith(VertxUnitRunner.class) public class KafkaAdminClientServiceTest { - private final Set allExpectedTopics = Set.of("folio.foo-tenant.srs.marc-bib"); private final String STUB_TENANT = "foo-tenant"; private KafkaAdminClient mockClient; private Vertx vertx; + @Mock + private SRSKafkaTopicService srsKafkaTopicService; @Before public void setUp() { vertx = mock(Vertx.class); mockClient = mock(KafkaAdminClient.class); + srsKafkaTopicService = mock(SRSKafkaTopicService.class); + KafkaTopic[] topicObjects = { + new SRSKafkaTopicService.SRSKafkaTopic("MARC_BIB", 10), + new SRSKafkaTopicService.SRSKafkaTopic("DI_PARSED_RECORDS_CHUNK_SAVED", 10), + new SRSKafkaTopicService.SRSKafkaTopic("DI_SRS_MARC_BIB_INSTANCE_HRID_SET", 10), + new SRSKafkaTopicService.SRSKafkaTopic("DI_SRS_MARC_BIB_RECORD_MODIFIED", 10), + new SRSKafkaTopicService.SRSKafkaTopic("DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING", 10), + new SRSKafkaTopicService.SRSKafkaTopic("DI_SRS_MARC_BIB_RECORD_MATCHED", 10), + new SRSKafkaTopicService.SRSKafkaTopic("DI_SRS_MARC_BIB_RECORD_NOT_MATCHED", 10), + new SRSKafkaTopicService.SRSKafkaTopic("DI_SRS_MARC_AUTHORITY_RECORD_MATCHED", 10), + new SRSKafkaTopicService.SRSKafkaTopic("DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED", 10), + new SRSKafkaTopicService.SRSKafkaTopic("DI_SRS_MARC_AUTHORITY_RECORD_DELETED", 10) + }; + + when(srsKafkaTopicService.createTopicObjects()).thenReturn(topicObjects); } @Test @@ -123,7 +140,20 @@ private Future createKafkaTopicsAsync(KafkaAdminClient client) { mocked.when(() -> KafkaAdminClient.create(eq(vertx), anyMap())).thenReturn(client); return new KafkaAdminClientService(vertx) - .createKafkaTopics(RecordStorageKafkaTopic.values(), STUB_TENANT); + .createKafkaTopics(srsKafkaTopicService.createTopicObjects(), STUB_TENANT); } } + + private final Set allExpectedTopics = Set.of( + "folio.foo-tenant.MARC_BIB", + "folio.foo-tenant.DI_PARSED_RECORDS_CHUNK_SAVED", + "folio.foo-tenant.DI_SRS_MARC_BIB_INSTANCE_HRID_SET", + "folio.foo-tenant.DI_SRS_MARC_BIB_RECORD_MODIFIED", + "folio.foo-tenant.DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING", + "folio.foo-tenant.DI_SRS_MARC_BIB_RECORD_MATCHED", + "folio.foo-tenant.DI_SRS_MARC_BIB_RECORD_NOT_MATCHED", + "folio.foo-tenant.DI_SRS_MARC_AUTHORITY_RECORD_MATCHED", + "folio.foo-tenant.DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED", + "folio.foo-tenant.DI_SRS_MARC_AUTHORITY_RECORD_DELETED" + ); } diff --git a/ramls/source-record-storage-records.raml b/ramls/source-record-storage-records.raml index 271c28889..1d7a99083 100644 --- a/ramls/source-record-storage-records.raml +++ b/ramls/source-record-storage-records.raml @@ -109,6 +109,12 @@ resourceTypes: application/json: type: record delete: + queryParameters: + idType: + description: Type of Id for Record lookup + type: string + example: INSTANCE + default: SRS_RECORD responses: 204: /formatted: