Merge branch 'master' of github.com:folio-org/mod-source-record-storage into MODSOURCE-729
RuslanLavrov committed Feb 7, 2024
2 parents cfcaeeb + 18475c6 commit 4350fdb
Showing 25 changed files with 482 additions and 140 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
@@ -1,10 +1,12 @@
## 2024-xx-xx 5.8.0-SNAPSHOT
* [MODSOURCE-733](https://issues.folio.org/browse/MODSOURCE-733) Reduce Memory Allocation of Strings
* [MODSOURCE-506](https://issues.folio.org/browse/MODSOURCE-506) Remove rawRecord field from source record
* [MODSOURCE-709](https://issues.folio.org/browse/MODSOURCE-709) MARC authority record is not created when using a Job profile with a match profile on an absent subfield/field
* [MODSOURCE-677](https://issues.folio.org/browse/MODSOURCE-677) Import is completed with errors when control field that differs from 001 is used for marc-to-marc matching
* [MODSOURCE-722](https://issues.folio.org/browse/MODSOURCE-722) deleteMarcIndexersOldVersions: relation "marc_records_tracking" does not exist
* [MODSOURMAN-1106](https://issues.folio.org/browse/MODSOURMAN-1106) The status of Instance is '-' in the Import log after uploading a file. The numbers of updated SRS and Instance are not displayed in the Summary table.
* [MODSOURCE-717](https://issues.folio.org/browse/MODSOURCE-717) MARC modifications not processed when placed after Holdings Update action in a job profile
* [MODSOURCE-739](https://issues.folio.org/browse/MODSOURCE-739) Create Kafka topics instead of relying on auto create in mod-srs
* [MODSOURCE-729](https://issues.folio.org/browse/MODSOURCE-729) Implement new endpoint to be used for matching

## 2023-10-13 v5.7.0
13 changes: 12 additions & 1 deletion README.md
@@ -125,7 +125,18 @@ After setup, it is good to check logs in all related modules for errors. Data im
* "_srs.kafka.AuthorityLinkChunkConsumer.loadLimit_": 2
* Relevant from the **Poppy** release, module versions from 5.7.0:
* "_srs.linking-rules-cache.expiration.time.hours_": 12

* Variables for setting the number of partitions for the following topics:
* MARC_BIB
* DI_PARSED_RECORDS_CHUNK_SAVED
* DI_SRS_MARC_BIB_INSTANCE_HRID_SET
* DI_SRS_MARC_BIB_RECORD_MODIFIED
* DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING
* DI_SRS_MARC_BIB_RECORD_MATCHED
* DI_SRS_MARC_BIB_RECORD_NOT_MATCHED
* DI_SRS_MARC_AUTHORITY_RECORD_MATCHED
* DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED
* DI_SRS_MARC_AUTHORITY_RECORD_DELETED
The default number of partitions for each topic is 1 (a configuration sketch follows).

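A minimal, hypothetical sketch of how one of these variables could be resolved to a partition count, with the default of 1 applied when the variable is unset. The helper class below is illustrative only and is not part of the module:

```java
// Hypothetical helper: resolves a topic's partition count from an environment
// variable, falling back to the documented default of 1 when the variable is unset.
public final class TopicPartitionDefaults {

  private static final int DEFAULT_PARTITIONS = 1;

  private TopicPartitionDefaults() {
  }

  public static int partitionsFor(String variableName) {
    String value = System.getenv(variableName);
    if (value == null || value.isBlank()) {
      return DEFAULT_PARTITIONS;
    }
    return Integer.parseInt(value.trim());
  }

  public static void main(String[] args) {
    // Prints 4 if DI_PARSED_RECORDS_CHUNK_SAVED=4 is exported, otherwise the default 1.
    System.out.println(partitionsFor("DI_PARSED_RECORDS_CHUNK_SAVED"));
  }
}
```
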
## Database schemas

The mod-source-record-storage module uses a relational approach and Liquibase to define database schemas.
2 changes: 1 addition & 1 deletion descriptors/ModuleDescriptor-template.json
@@ -54,7 +54,7 @@
},
{
"id": "source-storage-records",
"version": "3.1",
"version": "3.3",
"handlers": [
{
"methods": [
2 changes: 1 addition & 1 deletion mod-source-record-storage-server/pom.xml
@@ -188,7 +188,7 @@
<dependency>
<groupId>org.folio</groupId>
<artifactId>folio-kafka-wrapper</artifactId>
<version>3.0.0</version>
<version>3.1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>net.mguenther.kafka</groupId>
DataImportKafkaHandler.java
@@ -4,13 +4,13 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
import io.vertx.core.json.jackson.DatabindCodec;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.DataImportEventPayload;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.dbschema.ObjectMapperTool;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.processing.events.EventManager;
import org.folio.processing.exceptions.EventProcessingException;
@@ -29,7 +29,7 @@

@Component
@Qualifier("DataImportKafkaHandler")
public class DataImportKafkaHandler implements AsyncRecordHandler<String, String> {
public class DataImportKafkaHandler implements AsyncRecordHandler<String, byte[]> {

private static final Logger LOGGER = LogManager.getLogger();

@@ -48,14 +48,14 @@ public DataImportKafkaHandler(Vertx vertx, JobProfileSnapshotCache profileSnapsh
}

@Override
public Future<String> handle(KafkaConsumerRecord<String, String> targetRecord) {
public Future<String> handle(KafkaConsumerRecord<String, byte[]> targetRecord) {
LOGGER.trace("handle:: Handling kafka record: {}", targetRecord);
String recordId = extractHeaderValue(RECORD_ID_HEADER, targetRecord.headers());
String chunkId = extractHeaderValue(CHUNK_ID_HEADER, targetRecord.headers());
String userId = extractHeaderValue(USER_ID_HEADER, targetRecord.headers());
try {
Promise<String> promise = Promise.promise();
Event event = ObjectMapperTool.getMapper().readValue(targetRecord.value(), Event.class);
Event event = DatabindCodec.mapper().readValue(targetRecord.value(), Event.class);
DataImportEventPayload eventPayload = Json.decodeValue(event.getEventPayload(), DataImportEventPayload.class);
LOGGER.debug("handle:: Data import event payload has been received with event type: '{}' by jobExecutionId: '{}' and recordId: '{}' and chunkId: '{}' and userId: '{}'",
eventPayload.getEventType(), eventPayload.getJobExecutionId(), recordId, chunkId, userId);
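To make the change above easier to scan outside the diff, here is a self-contained sketch of the new deserialization pattern (the Event import path is an assumption, and this class is illustrative rather than part of the commit): the handler now receives the raw byte[] value and parses it directly with Vert.x's shared Jackson mapper, avoiding an intermediate String copy of the payload.

```java
import io.vertx.core.json.jackson.DatabindCodec;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

import java.io.IOException;

import org.folio.rest.jaxrs.model.Event; // assumed location of the Event DTO

class EventPayloadDecoder {

  // DatabindCodec.mapper() exposes Vert.x's shared Jackson ObjectMapper;
  // readValue(byte[], Class) parses the payload without first materializing a String.
  Event decode(KafkaConsumerRecord<String, byte[]> consumerRecord) throws IOException {
    return DatabindCodec.mapper().readValue(consumerRecord.value(), Event.class);
  }
}
```
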
ParsedRecordChunksKafkaHandler.java
@@ -4,6 +4,7 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
import io.vertx.core.json.jackson.DatabindCodec;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
@@ -37,7 +38,7 @@
import static org.folio.services.util.KafkaUtil.extractHeaderValue;

@Component
public class ParsedRecordChunksKafkaHandler implements AsyncRecordHandler<String, String> {
public class ParsedRecordChunksKafkaHandler implements AsyncRecordHandler<String, byte[]> {
private static final Logger LOGGER = LogManager.getLogger();

public static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId";
@@ -65,7 +66,7 @@ public ParsedRecordChunksKafkaHandler(@Autowired RecordService recordService,
}

@Override
public Future<String> handle(KafkaConsumerRecord<String, String> targetRecord) {
public Future<String> handle(KafkaConsumerRecord<String, byte[]> targetRecord) {
LOGGER.trace("handle:: Handling kafka record: {}", targetRecord);
String jobExecutionId = extractHeaderValue(JOB_EXECUTION_ID_HEADER, targetRecord.headers());
String chunkId = extractHeaderValue(CHUNK_ID_HEADER, targetRecord.headers());
@@ -74,7 +75,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> targetRecord) {
String key = targetRecord.key();

try {
Event event = Json.decodeValue(targetRecord.value(), Event.class);
Event event = DatabindCodec.mapper().readValue(targetRecord.value(), Event.class);
RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class);

List<KafkaHeader> kafkaHeaders = targetRecord.headers();
@@ -93,7 +94,7 @@
}
}

private Future<String> sendBackRecordsBatchResponse(RecordsBatchResponse recordsBatchResponse, List<KafkaHeader> kafkaHeaders, String tenantId, int chunkNumber, String eventType, KafkaConsumerRecord<String, String> commonRecord) {
private Future<String> sendBackRecordsBatchResponse(RecordsBatchResponse recordsBatchResponse, List<KafkaHeader> kafkaHeaders, String tenantId, int chunkNumber, String eventType, KafkaConsumerRecord<String, byte[]> commonRecord) {
Event event;
event = new Event()
.withId(UUID.randomUUID().toString())
IdType.java
@@ -7,7 +7,8 @@ public enum IdType {
AUTHORITY("authorityId"),
EXTERNAL("externalId"),
// NOTE: not really external id but is default from dto
RECORD("matchedId");
RECORD("matchedId"),
SRS_RECORD("id");

private final String idField;

ParsedRecordChunksErrorHandler.java
@@ -3,6 +3,7 @@
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
import io.vertx.core.json.jackson.DatabindCodec;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.impl.KafkaHeaderImpl;
Expand All @@ -23,6 +24,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -38,7 +40,7 @@
 * with status 'Completed with errors', showing the error message instead of a hanging progress bar.
*/
@Component
public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler<String, String> {
public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler<String, byte[]> {

private static final Logger LOGGER = LogManager.getLogger();

@@ -53,12 +55,18 @@ public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler
private Vertx vertx;

@Override
public void handle(Throwable throwable, KafkaConsumerRecord<String, String> record) {
LOGGER.trace("handle:: Handling record {}", record);
Event event = Json.decodeValue(record.value(), Event.class);
RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class);
public void handle(Throwable throwable, KafkaConsumerRecord<String, byte[]> consumerRecord) {
LOGGER.trace("handle:: Handling record {}", consumerRecord);
Event event;
try {
event = DatabindCodec.mapper().readValue(consumerRecord.value(), Event.class);
} catch (IOException e) {
LOGGER.error("Something happened when deserializing record", e);
return;
}
RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class);

List<KafkaHeader> kafkaHeaders = record.headers();
List<KafkaHeader> kafkaHeaders = consumerRecord.headers();
OkapiConnectionParams okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);

String jobExecutionId = okapiConnectionParams.getHeaders().get(JOB_EXECUTION_ID_HEADER);
ModTenantAPI.java
@@ -1,11 +1,17 @@
package org.folio.rest.impl;

import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.folio.rest.tools.utils.TenantTool.tenantId;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.util.Date;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.RecordStorageKafkaTopic;
@@ -17,17 +23,11 @@
import org.folio.rest.jaxrs.model.Snapshot.Status;
import org.folio.rest.jaxrs.model.TenantAttributes;
import org.folio.rest.tools.utils.TenantTool;
import org.folio.services.SRSKafkaTopicService;
import org.folio.services.SnapshotService;
import org.folio.spring.SpringContextUtil;
import org.springframework.beans.factory.annotation.Autowired;

import javax.ws.rs.core.Response;
import java.util.Date;
import java.util.Map;

import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.folio.rest.tools.utils.TenantTool.tenantId;

public class ModTenantAPI extends TenantAPI {

private static final Logger LOGGER = LogManager.getLogger();
@@ -42,6 +42,9 @@ public class ModTenantAPI extends TenantAPI {
@Autowired
private SnapshotService snapshotService;

@Autowired
private SRSKafkaTopicService srsKafkaTopicService;

private final String tenantId;

public ModTenantAPI(Vertx vertx, String tenantId) { //NOSONAR
@@ -55,14 +58,13 @@ Future<Integer> loadData(TenantAttributes attributes, String tenantId,
Map<String, String> headers, Context context) {
// create topics before loading data
Vertx vertx = context.owner();
return new KafkaAdminClientService(vertx)
.createKafkaTopics(RecordStorageKafkaTopic.values(), tenantId)
.compose(x -> super.loadData(attributes, tenantId, headers, context))

return super.loadData(attributes, tenantId, headers, context)
.compose(num -> {
LiquibaseUtil.initializeSchemaForTenant(vertx, tenantId);
return setLoadSampleParameter(attributes, context)
.compose(v -> createStubSnapshot(attributes)).map(num);
});
LiquibaseUtil.initializeSchemaForTenant(vertx, tenantId);
return setLoadSampleParameter(attributes, context)
.compose(v -> createStubSnapshot(attributes)).map(num);
});
}

@Validate
@@ -73,7 +75,16 @@ public void postTenant(TenantAttributes tenantAttributes, Map<String, String> he
Future<Void> result = tenantAttributes.getPurge() != null && tenantAttributes.getPurge()
? new KafkaAdminClientService(context.owner()).deleteKafkaTopics(RecordStorageKafkaTopic.values(), tenantId(headers))
: Future.succeededFuture();
result.onComplete(x -> super.postTenant(tenantAttributes, headers, handler, context));
result.onComplete(x -> super.postTenant(tenantAttributes, headers, ar -> {
if (ar.succeeded()) {
Vertx vertx = context.owner();
var kafkaAdminClientService = new KafkaAdminClientService(vertx);
kafkaAdminClientService.createKafkaTopics(srsKafkaTopicService.createTopicObjects(), tenantId);
handler.handle(Future.succeededFuture(ar.result()));
} else {
handler.handle(Future.failedFuture(ar.cause()));
}
}, context));
}

private Future<Void> setLoadSampleParameter(TenantAttributes attributes, Context context) {
Implementation of the SourceStorageRecords REST resource
@@ -15,7 +15,6 @@

import org.folio.dataimport.util.ExceptionHelper;
import org.folio.rest.jaxrs.model.Record;
import org.folio.rest.jaxrs.model.Record.State;
import org.folio.rest.jaxrs.model.RecordMatchingDto;
import org.folio.rest.jaxrs.resource.SourceStorageRecords;
import org.folio.rest.tools.utils.TenantTool;
@@ -116,14 +115,11 @@ public void putSourceStorageRecordsGenerationById(String matchedId, Record entit
}

@Override
public void deleteSourceStorageRecordsById(String id, Map<String, String> okapiHeaders,
public void deleteSourceStorageRecordsById(String id, String idType, Map<String, String> okapiHeaders,
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
vertxContext.runOnContext(v -> {
try {
recordService.getRecordById(id, tenantId)
.map(recordOptional -> recordOptional.orElseThrow(() -> new NotFoundException(format(NOT_FOUND_MESSAGE, Record.class.getSimpleName(), id))))
.compose(record -> record.getState().equals(State.DELETED) ? Future.succeededFuture(true)
: recordService.updateRecord(record.withState(State.DELETED), tenantId).map(r -> true))
recordService.deleteRecordById(id, toExternalIdType(idType), tenantId).map(r -> true)
.map(updated -> DeleteSourceStorageRecordsByIdResponse.respond204()).map(Response.class::cast)
.otherwise(ExceptionHelper::mapExceptionToResponse).onComplete(asyncResultHandler);
} catch (Exception e) {
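The toExternalIdType(...) helper used above is not included in this diff; a hypothetical version of such a mapping (the fallback behaviour and the enum's package are assumptions) could look like:

```java
import org.folio.dao.util.IdType; // assumed location of the IdType enum shown earlier

final class IdTypeMapping {

  private IdTypeMapping() {
  }

  // Hypothetical mapping of the new idType query parameter onto the IdType enum,
  // falling back to RECORD (the matchedId-based lookup) when no value is supplied.
  static IdType toExternalIdType(String idType) {
    if (idType == null || idType.isBlank()) {
      return IdType.RECORD;
    }
    return IdType.valueOf(idType.toUpperCase());
  }
}
```
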
RecordService.java
@@ -267,4 +267,5 @@ public interface RecordService {
*/
Future<Void> updateRecordsState(String matchedId, RecordState state, RecordType recordType, String tenantId);

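/**
 * Deletes the record with the given id. The {@code idType} defines which identifier
 * field the value refers to (for example the record's own id or its matchedId).
 *
 * @param id       identifier value
 * @param idType   type of identifier the id represents
 * @param tenantId tenant id
 * @return future of Void
 */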
Future<Void> deleteRecordById(String id, IdType idType, String tenantId);
}