diff --git a/NEWS.md b/NEWS.md
index 288e630a7..ad47eaa5b 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -8,6 +8,7 @@
 * [MODINV-1049](https://folio-org.atlassian.net/browse/MODINV-1049) Existing "035" field is not retained the original position in imported record
 * [MODSOURCE-785](https://folio-org.atlassian.net/browse/MODSOURCE-785) Update 005 field when set MARC for deletion
 * [MODSOURMAN-783](https://folio-org.atlassian.net/browse/MODSOURCE-783) Extend MARC-MARC search query to account for qualifiers
+* [MODSOURCE-752](https://folio-org.atlassian.net/browse/MODSOURCE-752) Emit Domain Events For Source Records
 
 ## 2024-03-20 5.8.0
 * [MODSOURCE-733](https://issues.folio.org/browse/MODSOURCE-733) Reduce Memory Allocation of Strings
diff --git a/README.md b/README.md
index fbe73b6ad..7e4e1f224 100644
--- a/README.md
+++ b/README.md
@@ -141,6 +141,7 @@ After setup, it is good to check logs in all related modules for errors. Data im
  * DI_SRS_MARC_HOLDINGS_RECORD_MATCHED
  * DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED
  * DI_SRS_MARC_AUTHORITY_RECORD_UPDATED
+ * SRS_SOURCE_RECORDS_PARTITIONS
 
 Default value for all partitions is 1
 ## Database schemas
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/SRSKafkaTopicService.java b/mod-source-record-storage-server/src/main/java/org/folio/services/SRSKafkaTopicService.java
index a1354062a..ff862a031 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/services/SRSKafkaTopicService.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/SRSKafkaTopicService.java
@@ -19,6 +19,7 @@
 import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED;
 import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_UPDATED;
 import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
+import static org.folio.services.domainevent.RecordDomainEventPublisher.RECORD_DOMAIN_EVENT_TOPIC;
 
 import org.folio.kafka.services.KafkaTopic;
 import org.springframework.beans.factory.annotation.Value;
@@ -74,6 +75,9 @@ public class SRSKafkaTopicService {
   @Value("${di_marc_authority_record_updated.partitions}")
   private Integer diMarcAuthorityRecordUpdatedPartitions;
 
+  @Value("${source_records.partitions}")
+  private Integer sourceRecordsPartitions;
+
   public KafkaTopic[] createTopicObjects() {
     return new KafkaTopic[] {
       MARC_BIB,
@@ -91,7 +95,8 @@ public KafkaTopic[] createTopicObjects() {
       new SRSKafkaTopic(DI_LOG_SRS_MARC_AUTHORITY_RECORD_UPDATED.value(), diLogSrsMarcAuthorityRecordUpdatedPartitions),
       new SRSKafkaTopic(DI_SRS_MARC_HOLDINGS_RECORD_MATCHED.value(), diMarcHoldingsMatchedPartitions),
       new SRSKafkaTopic(DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED.value(), diMarcHoldingsNotMatchedPartitions),
-      new SRSKafkaTopic(DI_SRS_MARC_AUTHORITY_RECORD_UPDATED.value(), diMarcAuthorityRecordUpdatedPartitions)
+      new SRSKafkaTopic(DI_SRS_MARC_AUTHORITY_RECORD_UPDATED.value(), diMarcAuthorityRecordUpdatedPartitions),
+      new SRSKafkaTopic(RECORD_DOMAIN_EVENT_TOPIC, sourceRecordsPartitions, false)
     };
   }
 
@@ -99,10 +104,18 @@ public static class SRSKafkaTopic implements KafkaTopic {
 
     private final String topic;
     private final int numPartitions;
+    private final boolean includeNamespace;
 
     public SRSKafkaTopic(String topic, int numPartitions) {
       this.topic = topic;
       this.numPartitions = numPartitions;
+      this.includeNamespace = true;
+    }
+
+    public SRSKafkaTopic(String topic, int numPartitions, boolean includeNamespace) {
+      this.topic = topic;
+      this.numPartitions = numPartitions;
+      this.includeNamespace = includeNamespace;
     }
 
     @Override
@@ -122,7 +135,11 @@ public int numPartitions() {
 
     @Override
     public String fullTopicName(String tenant) {
-      return formatTopicName(environment(), getDefaultNameSpace(), tenant, topicName());
+      if (includeNamespace) {
+        return formatTopicName(environment(), getDefaultNameSpace(), tenant, topicName());
+      } else {
+        return formatTopicName(environment(), tenant, topicName());
+      }
     }
   }
 }
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java b/mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java
index 3303f9bff..15d5289c7 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java
@@ -1,22 +1,19 @@
 package org.folio.services.domainevent;
 
+import static java.util.Objects.isNull;
 import static org.folio.okapi.common.XOkapiHeaders.TENANT;
 import static org.folio.okapi.common.XOkapiHeaders.TOKEN;
 import static org.folio.okapi.common.XOkapiHeaders.URL;
 import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_CREATED;
 import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_UPDATED;
-import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;
 
-import io.vertx.core.Future;
-import io.vertx.core.Vertx;
 import io.vertx.core.json.Json;
 import io.vertx.kafka.client.producer.KafkaHeader;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.folio.kafka.KafkaConfig;
+import org.folio.services.kafka.KafkaSender;
 import org.folio.rest.jaxrs.model.Record;
 import org.folio.rest.jaxrs.model.SourceRecordDomainEvent;
 import org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType;
@@ -27,14 +24,13 @@
 @Component
 public class RecordDomainEventPublisher {
 
-  public static final String RECORD_DOMAIN_TOPIC = "srs.source_records";
+  public static final String RECORD_DOMAIN_EVENT_TOPIC = "srs.source_records";
   private static final String RECORD_TYPE = "folio.srs.recordType";
   private static final Logger LOG = LogManager.getLogger();
-  @Value("${ENABLE_DOMAIN_EVENTS:true}")
-  private boolean enableDomainEvents;
-
+  @Value("${DOMAIN_EVENTS_ENABLED:true}")
+  private boolean domainEventsEnabled;
   @Autowired
-  private KafkaConfig kafkaConfig;
+  private KafkaSender kafkaSender;
 
   public void publishRecordCreated(Record created, Map<String, String> okapiHeaders) {
     publishRecord(created, okapiHeaders, SOURCE_RECORD_CREATED);
@@ -45,25 +41,42 @@ public void publishRecordUpdated(Record updated, Map okapiHeader
   }
 
   private void publishRecord(Record aRecord, Map<String, String> okapiHeaders, EventType eventType) {
-    Vertx.vertx().executeBlocking(() -> {
-      try {
-        var kafkaHeaders = getKafkaHeaders(okapiHeaders, aRecord.getRecordType());
-        var key = aRecord.getId();
-        return sendEventToKafka(okapiHeaders.get(TENANT), getEvent(aRecord, eventType),
-          eventType.value(), kafkaHeaders, kafkaConfig, key);
-      } catch (Exception e) {
-        LOG.error("Exception during Record domain event sending", e);
-        return Future.failedFuture(e);
-      }
-    });
+    if (!domainEventsEnabled || notValidForPublishing(aRecord)) {
+      return;
+    }
+    try {
+      var kafkaHeaders = getKafkaHeaders(okapiHeaders, aRecord.getRecordType());
+      var key = aRecord.getId();
+      kafkaSender.sendEventToKafka(okapiHeaders.get(TENANT), getEvent(aRecord, eventType), eventType.value(),
+        kafkaHeaders, key);
+    } catch (Exception e) {
+      LOG.error("Exception during Record domain event sending", e);
+    }
+  }
+
+  private boolean notValidForPublishing(Record aRecord) {
+    if (isNull(aRecord.getRecordType())) {
+      LOG.error("Record [with id {}] contains no type information and won't be sent as domain event", aRecord.getId());
+      return true;
+    }
+    if (isNull(aRecord.getRawRecord())) {
+      LOG.error("Record [with id {}] contains no raw record and won't be sent as domain event", aRecord.getId());
+      return true;
+    }
+    if (isNull(aRecord.getRawRecord().getContent())) {
+      LOG.error("Record [with id {}] contains no raw record content and won't be sent as domain event",
+        aRecord.getId());
+      return true;
+    }
+    return false;
   }
 
   private List<KafkaHeader> getKafkaHeaders(Map<String, String> okapiHeaders, Record.RecordType recordType) {
-    return new ArrayList<>(List.of(
+    return List.of(
       KafkaHeader.header(URL, okapiHeaders.get(URL)),
       KafkaHeader.header(TENANT, okapiHeaders.get(TENANT)),
       KafkaHeader.header(TOKEN, okapiHeaders.get(TOKEN)),
-      KafkaHeader.header(RECORD_TYPE, recordType.value()))
+      KafkaHeader.header(RECORD_TYPE, recordType.value())
     );
   }
 
@@ -71,7 +84,7 @@ private String getEvent(Record eventRecord, EventType type) {
     var event = new SourceRecordDomainEvent()
       .withId(eventRecord.getId())
       .withEventType(type)
-      .withEventPayload((String) eventRecord.getParsedRecord().getContent());
+      .withEventPayload(eventRecord.getRawRecord().getContent());
     return Json.encode(event);
   }
 
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/kafka/KafkaSender.java b/mod-source-record-storage-server/src/main/java/org/folio/services/kafka/KafkaSender.java
new file mode 100644
index 000000000..a8700f59c
--- /dev/null
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/kafka/KafkaSender.java
@@ -0,0 +1,21 @@
+package org.folio.services.kafka;
+
+import io.vertx.core.Future;
+import io.vertx.kafka.client.producer.KafkaHeader;
+import java.util.List;
+import org.folio.kafka.KafkaConfig;
+import org.folio.services.util.EventHandlingUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class KafkaSender {
+
+  @Autowired
+  private KafkaConfig kafkaConfig;
+
+  public Future<Boolean> sendEventToKafka(String tenantId, String eventPayload, String eventType,
+                                          List<KafkaHeader> kafkaHeaders, String key) {
+    return EventHandlingUtil.sendEventToKafka(tenantId, eventPayload, eventType, kafkaHeaders, kafkaConfig, key);
+  }
+}
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java
index 3008bf1eb..5f74b924b 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java
@@ -6,7 +6,7 @@
 import static org.folio.okapi.common.XOkapiHeaders.TOKEN;
 import static org.folio.okapi.common.XOkapiHeaders.URL;
 import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType;
-import static org.folio.services.domainevent.RecordDomainEventPublisher.RECORD_DOMAIN_TOPIC;
+import static org.folio.services.domainevent.RecordDomainEventPublisher.RECORD_DOMAIN_EVENT_TOPIC;
 import static org.folio.services.util.KafkaUtil.extractHeaderValue;
 
 import io.vertx.core.Future;
@@ -108,7 +108,7 @@ public static String constructModuleName() {
 
   public static String createTopicName(String eventType, String tenantId, KafkaConfig kafkaConfig) {
     if (stream(EventType.values()).anyMatch(et -> et.value().equals(eventType))) {
-      return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), tenantId, RECORD_DOMAIN_TOPIC);
+      return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), tenantId, RECORD_DOMAIN_EVENT_TOPIC);
     }
     return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), KafkaTopicNameHelper.getDefaultNameSpace(),
       tenantId, eventType);
diff --git a/mod-source-record-storage-server/src/main/resources/kafka.properties b/mod-source-record-storage-server/src/main/resources/kafka.properties
index 8bfed4f60..f63d82f14 100644
--- a/mod-source-record-storage-server/src/main/resources/kafka.properties
+++ b/mod-source-record-storage-server/src/main/resources/kafka.properties
@@ -14,3 +14,4 @@ di_logs_srs_marc_authority_record_updated.partitions = ${DI_LOG_SRS_MARC_AUTHORI
 di_marc_holdings_matched.partitions = ${DI_SRS_MARC_HOLDINGS_RECORD_MATCHED:1}
 di_marc_holdings_not_matched.partitions = ${DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED:1}
 di_marc_authority_record_updated.partitions = ${DI_SRS_MARC_AUTHORITY_RECORD_UPDATED:1}
+source_records.partitions = ${SRS_SOURCE_RECORDS_PARTITIONS:1}
diff --git a/mod-source-record-storage-server/src/test/java/org/folio/services/domainevent/RecordDomainEventPublisherUnitTest.java b/mod-source-record-storage-server/src/test/java/org/folio/services/domainevent/RecordDomainEventPublisherUnitTest.java
new file mode 100644
index 000000000..6810ccf96
--- /dev/null
+++ b/mod-source-record-storage-server/src/test/java/org/folio/services/domainevent/RecordDomainEventPublisherUnitTest.java
@@ -0,0 +1,183 @@
+package org.folio.services.domainevent;
+
+import static org.folio.okapi.common.XOkapiHeaders.TENANT;
+import static org.folio.okapi.common.XOkapiHeaders.TOKEN;
+import static org.folio.okapi.common.XOkapiHeaders.URL;
+import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_CREATED;
+import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_UPDATED;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import io.vertx.kafka.client.producer.KafkaHeader;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.folio.services.kafka.KafkaSender;
+import org.folio.rest.jaxrs.model.RawRecord;
+import org.folio.rest.jaxrs.model.Record;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.test.util.ReflectionTestUtils;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RecordDomainEventPublisherUnitTest {
+
+  @InjectMocks
+  private RecordDomainEventPublisher publisher;
+  @Mock
+  private KafkaSender kafkaSender;
+
+  @Test
+  public void publishRecordCreated_shouldSendNoEvents_ifDomainEventsAreNotEnabled() {
+    // given
+    ReflectionTestUtils.setField(publisher, "domainEventsEnabled", false);
+    var aRecord = new Record();
+    var headers = Map.of(TENANT, "TENANT", URL, "OKAPI_URL", TOKEN, "TOKEN");
+
+    // when
+    publisher.publishRecordCreated(aRecord, headers);
+
+    // then
+    verifyNoInteractions(kafkaSender);
+  }
+
+  @Test
+  public void publishRecordUpdated_shouldSendNoEvents_ifDomainEventsAreNotEnabled() {
+    // given
+    ReflectionTestUtils.setField(publisher, "domainEventsEnabled", false);
+    var aRecord = new Record();
+    var headers = Map.of(TENANT, "TENANT", URL, "OKAPI_URL", TOKEN, "TOKEN");
+
+    // when
+    publisher.publishRecordUpdated(aRecord, headers);
+
+    // then
+    verifyNoInteractions(kafkaSender);
+  }
+
+  @Test
+  public void publishRecordCreated_shouldSendNoEvents_ifRecordHasNoType() {
+    // given
+    ReflectionTestUtils.setField(publisher, "domainEventsEnabled", true);
+    var aRecord = new Record();
+    var headers = Map.of(TENANT, "TENANT", URL, "OKAPI_URL", TOKEN, "TOKEN");
+
+    // when
+    publisher.publishRecordCreated(aRecord, headers);
+
+    // then
+    verifyNoInteractions(kafkaSender);
+  }
+
+  @Test
+  public void publishRecordUpdated_shouldSendNoEvents_ifRecordHasNoType() {
+    // given
+    ReflectionTestUtils.setField(publisher, "domainEventsEnabled", true);
+    var aRecord = new Record();
+    var headers = Map.of(TENANT, "TENANT", URL, "OKAPI_URL", TOKEN, "TOKEN");
+
+    // when
+    publisher.publishRecordUpdated(aRecord, headers);
+
+    // then
+    verifyNoInteractions(kafkaSender);
+  }
+
+  @Test
+  public void publishRecordCreated_shouldSendNoEvents_ifRecordContainsNoRawRecord() {
+    // given
+    ReflectionTestUtils.setField(publisher, "domainEventsEnabled", true);
+    var aRecord = new Record().withRecordType(Record.RecordType.MARC_BIB);
+    var headers = Map.of(TENANT, "TENANT", URL, "OKAPI_URL", TOKEN, "TOKEN");
+
+    // when
+    publisher.publishRecordCreated(aRecord, headers);
+
+    // then
+    verifyNoInteractions(kafkaSender);
+  }
+
+  @Test
+  public void publishRecordUpdated_shouldSendNoEvents_ifRecordContainsNoRawRecord() {
+    // given
+    ReflectionTestUtils.setField(publisher, "domainEventsEnabled", true);
+    var aRecord = new Record().withRecordType(Record.RecordType.MARC_BIB);
+    var headers = Map.of(TENANT, "TENANT", URL, "OKAPI_URL", TOKEN, "TOKEN");
+
+    // when
+    publisher.publishRecordUpdated(aRecord, headers);
+
+    // then
+    verifyNoInteractions(kafkaSender);
+  }
+
+  @Test
+  public void publishRecordCreated_shouldSendEvent_ifRecordIsValid() {
+    // given
+    ReflectionTestUtils.setField(publisher, "domainEventsEnabled", true);
+    var rawContent = "rawContent";
+    var aRecord = new Record()
+      .withId(UUID.randomUUID().toString())
+      .withRecordType(Record.RecordType.MARC_BIB)
+      .withRawRecord(new RawRecord().withContent(rawContent));
+    var tenantId = "TENANT";
+    var okapiUrl = "OKAPI_URL";
+    var token = "TOKEN";
+    var givenHeaders = Map.of(TENANT, tenantId, URL, okapiUrl, TOKEN, token);
+    var expectedHeaders = getKafkaHeaders(okapiUrl, tenantId, token, aRecord);
+    var eventType = SOURCE_RECORD_CREATED.value();
+    var expectedPayload = "{"
+      + "\"id\":\"" + aRecord.getId() + "\""
+      + ",\"eventType\":\"" + eventType + "\""
+      + ",\"eventPayload\":\"" + rawContent + "\""
+      + "}";
+
+    // when
+    publisher.publishRecordCreated(aRecord, givenHeaders);
+
+    // then
+    verify(kafkaSender).sendEventToKafka(tenantId, expectedPayload, eventType, expectedHeaders,
+      aRecord.getId());
+  }
+
+  @Test
+  public void publishRecordUpdated_shouldSendEvent_ifRecordIsValid() {
+    // given
+    ReflectionTestUtils.setField(publisher, "domainEventsEnabled", true);
+    var rawContent = "rawContent";
+    var aRecord = new Record()
+      .withId(UUID.randomUUID().toString())
+      .withRecordType(Record.RecordType.MARC_BIB)
+      .withRawRecord(new RawRecord().withContent(rawContent));
+    var tenantId = "TENANT";
+    var okapiUrl = "OKAPI_URL";
+    var token = "TOKEN";
= "TOKEN"; + var givenHeaders = Map.of(TENANT, tenantId, URL, okapiUrl, TOKEN, token); + var expectedHeaders = getKafkaHeaders(okapiUrl, tenantId, token, aRecord); + var eventType = SOURCE_RECORD_UPDATED.value(); + var expectedPayload = "{" + + "\"id\":\"" + aRecord.getId() + "\"" + + ",\"eventType\":\"" + eventType + "\"" + + ",\"eventPayload\":\"" + parsedContent + "\"" + + "}"; + + // when + publisher.publishRecordUpdated(aRecord, givenHeaders); + + // thenÏ + verify(kafkaSender).sendEventToKafka(tenantId, expectedPayload, eventType, expectedHeaders, + aRecord.getId()); + } + + private List getKafkaHeaders(String okapiUrl, String tenantId, String token, Record aRecord) { + return List.of( + KafkaHeader.header(URL, okapiUrl), + KafkaHeader.header(TENANT, tenantId), + KafkaHeader.header(TOKEN, token), + KafkaHeader.header("folio.srs.recordType", aRecord.getRecordType().value()) + ); + } +}