From cc493fe139a9fcc4052c0f480e0ec286186da9cc Mon Sep 17 00:00:00 2001 From: Austin Schey Date: Mon, 13 Jan 2025 16:55:35 -0800 Subject: [PATCH 1/5] add call to get sequence number range --- .../cms/bfd/pipeline/rda/grpc/RdaSink.java | 8 ++++ .../rda/grpc/server/EmptyMessageSource.java | 6 +++ .../grpc/server/ExceptionMessageSource.java | 6 +++ .../rda/grpc/server/JsonMessageSource.java | 7 ++++ .../rda/grpc/server/MessageSource.java | 9 +++++ .../grpc/server/RandomFissClaimSource.java | 6 +++ .../rda/grpc/server/RandomMcsClaimSource.java | 6 +++ .../pipeline/rda/grpc/server/RdaService.java | 38 +++++++++++++++++++ .../server/S3BucketMessageSourceFactory.java | 6 +++ .../sink/concurrent/ConcurrentRdaSink.java | 6 +++ .../sink/direct/AbstractClaimRdaSink.java | 26 ++++++++++++- .../grpc/source/FissClaimStreamCaller.java | 11 ++++++ .../rda/grpc/source/GrpcStreamCaller.java | 11 ++++++ .../rda/grpc/source/McsClaimStreamCaller.java | 11 ++++++ .../grpc/source/StandardGrpcRdaSource.java | 4 ++ .../bfd/pipeline/rda/grpc/RdaSinkTest.java | 4 ++ .../S3BucketMessageSourceFactoryTest.java | 6 +++ .../grpc/sink/concurrent/TestDatabase.java | 4 ++ .../sink/direct/FissClaimRdaSinkTest.java | 1 + .../grpc/sink/direct/McsClaimRdaSinkTest.java | 1 + .../grpc/source/StandardGrpcRdaSourceIT.java | 36 ++++++++++++++++++ 21 files changed, 212 insertions(+), 1 deletion(-) diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaSink.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaSink.java index 7973fbaf70..f6ae35f525 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaSink.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaSink.java @@ -2,6 +2,7 @@ import gov.cms.bfd.model.rda.MessageError; import gov.cms.model.dsl.codegen.library.DataTransformer; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import jakarta.annotation.Nonnull; import java.io.IOException; import java.time.Duration; @@ -191,6 +192,13 @@ Optional transformMessage(String apiVersion, TMessage message) */ int writeClaims(Collection objects) throws ProcessingException; + /** + * test. + * + * @param sequenceNumberRange test + */ + void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange); + /** * Return count of records processed since the most recent call to a write method or this method. * Calls to this method collect the current value and resets the counter. The sum of this method's diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/EmptyMessageSource.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/EmptyMessageSource.java index 83a4fe6998..c69c3fb53c 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/EmptyMessageSource.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/EmptyMessageSource.java @@ -1,5 +1,6 @@ package gov.cms.bfd.pipeline.rda.grpc.server; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import java.util.NoSuchElementException; /** @@ -45,6 +46,11 @@ public T next() throws Exception { throw new NoSuchElementException(); } + @Override + public ClaimSequenceNumberRange getSequenceNumberRange() { + return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(0).build(); + } + @Override public void close() throws Exception {} } diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/ExceptionMessageSource.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/ExceptionMessageSource.java index 14ce5d02ed..9b016d2d0b 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/ExceptionMessageSource.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/ExceptionMessageSource.java @@ -1,5 +1,6 @@ package gov.cms.bfd.pipeline.rda.grpc.server; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import java.util.function.Supplier; /** @@ -62,6 +63,11 @@ public T next() throws Exception { return source.next(); } + @Override + public ClaimSequenceNumberRange getSequenceNumberRange() { + return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(remainingBeforeThrow).build(); + } + @Override public void close() throws Exception { source.close(); diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/JsonMessageSource.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/JsonMessageSource.java index bb4ca9b4e0..d6f512c5ac 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/JsonMessageSource.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/JsonMessageSource.java @@ -3,6 +3,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.io.CharSource; import com.google.protobuf.util.JsonFormat; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import gov.cms.mpsm.rda.v1.FissClaimChange; import gov.cms.mpsm.rda.v1.McsClaimChange; import java.io.BufferedReader; @@ -167,6 +168,12 @@ public T next() throws Exception { return answer; } + @Override + public ClaimSequenceNumberRange getSequenceNumberRange() { + // Can't easily support this since it would require deserializing the entire contents + return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(0).build(); + } + @Override public void close() throws Exception { reader.close(); diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/MessageSource.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/MessageSource.java index 10d0f9c130..e03e6a50d2 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/MessageSource.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/MessageSource.java @@ -1,5 +1,7 @@ package gov.cms.bfd.pipeline.rda.grpc.server; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; + /** * Interface for objects that produce message objects from some source (e.g. a file, an array, a * database, etc). Mirrors the Iterator protocol but allows for unwrapped exceptions to be passed @@ -36,4 +38,11 @@ public interface MessageSource extends AutoCloseable { * @throws Exception if there is an issue getting the next claim */ MessageSource skipTo(long startingSequenceNumber) throws Exception; + + /** + * test. + * + * @return test + */ + ClaimSequenceNumberRange getSequenceNumberRange(); } diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RandomFissClaimSource.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RandomFissClaimSource.java index cee03e4a37..9a7f7c46cf 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RandomFissClaimSource.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RandomFissClaimSource.java @@ -2,6 +2,7 @@ import com.google.protobuf.Timestamp; import gov.cms.mpsm.rda.v1.ChangeType; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import gov.cms.mpsm.rda.v1.FissClaimChange; import gov.cms.mpsm.rda.v1.RecordSource; import java.time.Clock; @@ -89,6 +90,11 @@ public FissClaimChange next() { return change; } + @Override + public ClaimSequenceNumberRange getSequenceNumberRange() { + return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(maxToSend).build(); + } + @Override public void close() {} } diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RandomMcsClaimSource.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RandomMcsClaimSource.java index 75150a7362..e73f2be6a3 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RandomMcsClaimSource.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RandomMcsClaimSource.java @@ -2,6 +2,7 @@ import com.google.protobuf.Timestamp; import gov.cms.mpsm.rda.v1.ChangeType; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import gov.cms.mpsm.rda.v1.McsClaimChange; import gov.cms.mpsm.rda.v1.RecordSource; import java.time.Clock; @@ -87,6 +88,11 @@ public McsClaimChange next() { return change; } + @Override + public ClaimSequenceNumberRange getSequenceNumberRange() { + return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(maxToSend).build(); + } + @Override public void close() {} } diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RdaService.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RdaService.java index 682de35a65..705730c991 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RdaService.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RdaService.java @@ -50,6 +50,44 @@ public void getVersion(Empty request, StreamObserver responseObserve } } + @Override + public void getFissClaimsSequenceNumberRange( + com.google.protobuf.Empty request, + StreamObserver responseObserver) { + LOGGER.info("start getFissClaimsSequenceNumberRange"); + try { + responseObserver.onNext( + messageSourceFactory.createFissMessageSource(0).getSequenceNumberRange()); + responseObserver.onCompleted(); + LOGGER.info("end getFissClaimsSequenceNumberRange"); + } catch (Exception ex) { + responseObserver.onError(Status.fromThrowable(ex).asException()); + LOGGER.error( + "end getFissClaimsSequenceNumberRange call - call failed with exception: message={}", + ex.getMessage(), + ex); + } + } + + @Override + public void getMcsClaimsSequenceNumberRange( + com.google.protobuf.Empty request, + StreamObserver responseObserver) { + LOGGER.info("start getMcsClaimsSequenceNumberRange"); + try { + responseObserver.onNext( + messageSourceFactory.createFissMessageSource(0).getSequenceNumberRange()); + responseObserver.onCompleted(); + LOGGER.info("end getMcsClaimsSequenceNumberRange"); + } catch (Exception ex) { + responseObserver.onError(Status.fromThrowable(ex).asException()); + LOGGER.error( + "end getMcsClaimsSequenceNumberRange call - call failed with exception: message={}", + ex.getMessage(), + ex); + } + } + @Override public void getFissClaims( ClaimRequest request, StreamObserver responseObserver) { diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/S3BucketMessageSourceFactory.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/S3BucketMessageSourceFactory.java index 911f63f0ac..14e004134f 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/S3BucketMessageSourceFactory.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/S3BucketMessageSourceFactory.java @@ -3,6 +3,7 @@ import com.google.common.annotations.VisibleForTesting; import gov.cms.bfd.pipeline.rda.grpc.RdaChange; import gov.cms.bfd.pipeline.sharedutils.s3.S3DirectoryDao; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import java.util.ArrayList; import java.util.List; import java.util.function.Function; @@ -186,6 +187,11 @@ public synchronized T next() throws Exception { return current.next(); } + @Override + public ClaimSequenceNumberRange getSequenceNumberRange() { + return current.getSequenceNumberRange(); + } + @Override public synchronized void close() throws Exception { current.close(); diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/concurrent/ConcurrentRdaSink.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/concurrent/ConcurrentRdaSink.java index 49c53e42f8..79f01d4fe5 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/concurrent/ConcurrentRdaSink.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/concurrent/ConcurrentRdaSink.java @@ -5,6 +5,7 @@ import gov.cms.bfd.pipeline.sharedutils.MultiCloser; import gov.cms.bfd.pipeline.sharedutils.SequenceNumberTracker; import gov.cms.model.dsl.codegen.library.DataTransformer; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import jakarta.annotation.Nonnull; import java.io.Closeable; import java.io.IOException; @@ -272,6 +273,11 @@ public Optional readMaxExistingSequenceNumber() throws ProcessingException return sink.readMaxExistingSequenceNumber(); } + @Override + public void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange) { + sink.updateSequenceNumberRange(sequenceNumberRange); + } + /** * This method is not implemented since that would bypass the queue used to schedule writes. * diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/AbstractClaimRdaSink.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/AbstractClaimRdaSink.java index fcd9c0a820..775849437b 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/AbstractClaimRdaSink.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/AbstractClaimRdaSink.java @@ -13,6 +13,7 @@ import gov.cms.bfd.pipeline.sharedutils.PipelineApplicationState; import gov.cms.bfd.pipeline.sharedutils.TransactionManager; import gov.cms.model.dsl.codegen.library.DataTransformer; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.MeterRegistry; @@ -238,6 +239,11 @@ public int writeClaims(Collection> claims) throws ProcessingEx return claims.size(); } + @Override + public void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange) { + metrics.setMaxSequenceNumber(sequenceNumberRange.getUpper()); + } + /** * Always returns zero since all claims are written synchronously by writeMessages. * @@ -525,7 +531,7 @@ static class Metrics { /** Tracks the number of updates per database transaction. */ private final DistributionSummary dbBatchSize; - /** Latest sequnce number from writing a batch. * */ + /** Latest sequence number from writing a batch. * */ private final AtomicLong latestSequenceNumber; /** The value returned by the latestSequenceNumber gauge. * */ @@ -534,6 +540,12 @@ static class Metrics { /** The number of insert statements executed. */ private final DistributionSummary insertCount; + /** test. */ + private final AtomicLong maxSequenceNumber; + + /** test. */ + private final AtomicLong maxSequenceNumberValue; + /** * Initializes all the metrics. * @@ -560,6 +572,9 @@ private Metrics(Class klass, MeterRegistry appMetrics) { latestSequenceNumber = GAUGES.getGaugeForName(appMetrics, latestSequenceNumberGaugeName); latestSequenceNumberValue = GAUGES.getValueForName(latestSequenceNumberGaugeName); insertCount = appMetrics.summary(MetricRegistry.name(base, "insertCount")); + String maxSequenceNumberGaugeName = MetricRegistry.name(base, "maxSeq"); + maxSequenceNumber = GAUGES.getGaugeForName(appMetrics, maxSequenceNumberGaugeName); + maxSequenceNumberValue = GAUGES.getValueForName(maxSequenceNumberGaugeName); } /** @@ -571,5 +586,14 @@ private Metrics(Class klass, MeterRegistry appMetrics) { void setLatestSequenceNumber(long value) { latestSequenceNumberValue.set(value); } + + /** + * test. + * + * @param value test + */ + void setMaxSequenceNumber(long value) { + maxSequenceNumberValue.set(value); + } } } diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/FissClaimStreamCaller.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/FissClaimStreamCaller.java index 081f1635ed..e6c2b5d084 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/FissClaimStreamCaller.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/FissClaimStreamCaller.java @@ -1,7 +1,9 @@ package gov.cms.bfd.pipeline.rda.grpc.source; import com.google.common.base.Preconditions; +import com.google.protobuf.Empty; import gov.cms.mpsm.rda.v1.ClaimRequest; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import gov.cms.mpsm.rda.v1.FissClaimChange; import gov.cms.mpsm.rda.v1.RDAServiceGrpc; import io.grpc.CallOptions; @@ -45,4 +47,13 @@ public GrpcResponseStream callService( ClientCalls.blockingServerStreamingCall(call, request); return new GrpcResponseStream<>(call, apiResults); } + + @Override + public ClaimSequenceNumberRange callSequenceNumberRangeService( + ManagedChannel channel, CallOptions callOptions) { + final MethodDescriptor method = + RDAServiceGrpc.getGetFissClaimsSequenceNumberRangeMethod(); + final ClientCall call = channel.newCall(method, callOptions); + return ClientCalls.blockingUnaryCall(call, Empty.getDefaultInstance()); + } } diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/GrpcStreamCaller.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/GrpcStreamCaller.java index ab20f7211b..d11314556e 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/GrpcStreamCaller.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/GrpcStreamCaller.java @@ -5,6 +5,7 @@ import com.google.protobuf.Empty; import gov.cms.bfd.pipeline.rda.grpc.RdaServerJob; import gov.cms.mpsm.rda.v1.ApiVersion; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import gov.cms.mpsm.rda.v1.RDAServiceGrpc; import io.grpc.CallOptions; import io.grpc.ClientCall; @@ -62,6 +63,16 @@ public abstract GrpcResponseStream callService( ManagedChannel channel, CallOptions callOptions, long startingSequenceNumber) throws Exception; + /** + * test. + * + * @param channel test + * @param callOptions test + * @return test + */ + public abstract ClaimSequenceNumberRange callSequenceNumberRangeService( + ManagedChannel channel, CallOptions callOptions); + /** * Make a call to the server's {@code getVersion()} service and return the version component. Will * retry several times if the call fails. Retries allow the job to handle with a race condition diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/McsClaimStreamCaller.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/McsClaimStreamCaller.java index 79aa2e949a..35fc2336e5 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/McsClaimStreamCaller.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/McsClaimStreamCaller.java @@ -1,7 +1,9 @@ package gov.cms.bfd.pipeline.rda.grpc.source; import com.google.common.base.Preconditions; +import com.google.protobuf.Empty; import gov.cms.mpsm.rda.v1.ClaimRequest; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import gov.cms.mpsm.rda.v1.McsClaimChange; import gov.cms.mpsm.rda.v1.RDAServiceGrpc; import io.grpc.CallOptions; @@ -44,4 +46,13 @@ public GrpcResponseStream callService( ClientCalls.blockingServerStreamingCall(call, request); return new GrpcResponseStream<>(call, apiResults); } + + @Override + public ClaimSequenceNumberRange callSequenceNumberRangeService( + ManagedChannel channel, CallOptions callOptions) { + final MethodDescriptor method = + RDAServiceGrpc.getGetMcsClaimsSequenceNumberRangeMethod(); + final ClientCall call = channel.newCall(method, callOptions); + return ClientCalls.blockingUnaryCall(call, Empty.getDefaultInstance()); + } } diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSource.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSource.java index b045153c50..8f7977d2c5 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSource.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSource.java @@ -8,6 +8,7 @@ import gov.cms.bfd.pipeline.rda.grpc.RdaSink; import gov.cms.bfd.pipeline.rda.grpc.source.GrpcResponseStream.DroppedConnectionException; import gov.cms.bfd.pipeline.sharedutils.MultiCloser; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import io.grpc.CallOptions; import io.grpc.ManagedChannel; import io.micrometer.core.instrument.MeterRegistry; @@ -224,6 +225,9 @@ public int retrieveAndProcessObjects(int maxPerBatch, RdaSink } else if (sink.isValidMessage(result)) { batch.put(sink.getClaimIdForMessage(result), result); if (batch.size() >= maxPerBatch) { + ClaimSequenceNumberRange sequenceNumberRange = + caller.callSequenceNumberRangeService(channel, callOptionsFactory.get()); + sink.updateSequenceNumberRange(sequenceNumberRange); processResult.addCount(submitBatchToSink(apiVersion, sink, batch)); } lastProcessedTime = clock.millis(); diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/RdaSinkTest.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/RdaSinkTest.java index 17c02d5128..f1bbac8821 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/RdaSinkTest.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/RdaSinkTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import jakarta.annotation.Nonnull; import java.io.IOException; import java.time.Duration; @@ -86,6 +87,9 @@ public long getSequenceNumberForObject(Integer object) { return object; } + @Override + public void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange) {} + /** {@inheritDoc} */ @Override public void close() throws Exception {} diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/server/S3BucketMessageSourceFactoryTest.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/server/S3BucketMessageSourceFactoryTest.java index d985a11dae..cb2c0521e2 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/server/S3BucketMessageSourceFactoryTest.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/server/S3BucketMessageSourceFactoryTest.java @@ -6,6 +6,7 @@ import static org.mockito.Mockito.doReturn; import gov.cms.bfd.pipeline.sharedutils.s3.S3DirectoryDao; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -214,6 +215,11 @@ public Long next() throws Exception { return currentValue++; } + @Override + public ClaimSequenceNumberRange getSequenceNumberRange() { + return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(maxValue).build(); + } + @Override public void close() throws Exception { closed = true; diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/concurrent/TestDatabase.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/concurrent/TestDatabase.java index 723dd1b1e8..2830c5be5e 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/concurrent/TestDatabase.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/concurrent/TestDatabase.java @@ -6,6 +6,7 @@ import gov.cms.bfd.pipeline.rda.grpc.ProcessingException; import gov.cms.bfd.pipeline.rda.grpc.RdaSink; import gov.cms.model.dsl.codegen.library.DataTransformer; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import jakarta.annotation.Nonnull; import java.io.IOException; import java.time.Duration; @@ -193,6 +194,9 @@ public void updateLastSequenceNumber(long lastSequenceNumber) { setLastSequenceNumber(lastSequenceNumber); } + @Override + public void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange) {} + /** {@inheritDoc} */ @Override public void writeError( diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/FissClaimRdaSinkTest.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/FissClaimRdaSinkTest.java index 1711e01cf8..45ecccf3aa 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/FissClaimRdaSinkTest.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/FissClaimRdaSinkTest.java @@ -122,6 +122,7 @@ public void metricNames() { "FissClaimRdaSink.failures", "FissClaimRdaSink.insertCount", "FissClaimRdaSink.lastSeq", + "FissClaimRdaSink.maxSeq", "FissClaimRdaSink.successes", "FissClaimRdaSink.transform.failures", "FissClaimRdaSink.transform.successes", diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/McsClaimRdaSinkTest.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/McsClaimRdaSinkTest.java index a5a288e3d6..8d46f7f570 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/McsClaimRdaSinkTest.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/McsClaimRdaSinkTest.java @@ -125,6 +125,7 @@ public void metricNames() { "McsClaimRdaSink.failures", "McsClaimRdaSink.insertCount", "McsClaimRdaSink.lastSeq", + "McsClaimRdaSink.maxSeq", "McsClaimRdaSink.successes", "McsClaimRdaSink.transform.failures", "McsClaimRdaSink.transform.successes", diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSourceIT.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSourceIT.java index 8631f26928..66dd3860ad 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSourceIT.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSourceIT.java @@ -20,6 +20,7 @@ import gov.cms.bfd.pipeline.rda.grpc.server.RdaService; import gov.cms.bfd.pipeline.rda.grpc.sink.direct.MbiCache; import gov.cms.bfd.pipeline.sharedutils.IdHasher; +import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange; import gov.cms.mpsm.rda.v1.FissClaimChange; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -333,6 +334,33 @@ public void grpcCallWithCorrectAuthToken() throws Exception { }); } + /** + * Verifies that a GRPC call updates the current sequence number range. + * + * @throws Exception indicates test failure + */ + @Test + public void grpcCallUpdatesSequenceNumber() throws Exception { + createServerConfig() + .authorizedToken("secret") + .build() + .runWithPortParam( + port -> { + int count; + RdaSourceConfig config = + createSourceConfig(port).authenticationToken("secret").build(); + try (StandardGrpcRdaSource> source = + createSource(config)) { + count = source.retrieveAndProcessObjects(1, sink); + } + assertEquals(2, count); + assertEquals(2, sink.getValues().size()); + assertEquals(EXPECTED_CLAIM_1, sink.getValues().get(0)); + assertEquals(EXPECTED_CLAIM_2, sink.getValues().get(1)); + assertEquals(2, sink.updateSequenceNumberCallCount); + }); + } + /** Verifies that a GRPC call with an incompatible RDA version will throw an exception. */ @Test public void grpcCallWithIncompatibleRdaVersion() { @@ -477,6 +505,9 @@ private class JsonCaptureSink implements RdaSink Date: Fri, 24 Jan 2025 15:02:12 -0800 Subject: [PATCH 2/5] limit update interval for sequence number range --- .../pipeline/rda/grpc/server/RdaService.java | 10 +++---- .../grpc/source/StandardGrpcRdaSource.java | 27 ++++++++++++++++--- .../grpc/source/StandardGrpcRdaSourceIT.java | 3 ++- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RdaService.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RdaService.java index 705730c991..951035912a 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RdaService.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/RdaService.java @@ -55,9 +55,8 @@ public void getFissClaimsSequenceNumberRange( com.google.protobuf.Empty request, StreamObserver responseObserver) { LOGGER.info("start getFissClaimsSequenceNumberRange"); - try { - responseObserver.onNext( - messageSourceFactory.createFissMessageSource(0).getSequenceNumberRange()); + try (MessageSource source = messageSourceFactory.createFissMessageSource(0)) { + responseObserver.onNext(source.getSequenceNumberRange()); responseObserver.onCompleted(); LOGGER.info("end getFissClaimsSequenceNumberRange"); } catch (Exception ex) { @@ -74,9 +73,8 @@ public void getMcsClaimsSequenceNumberRange( com.google.protobuf.Empty request, StreamObserver responseObserver) { LOGGER.info("start getMcsClaimsSequenceNumberRange"); - try { - responseObserver.onNext( - messageSourceFactory.createFissMessageSource(0).getSequenceNumberRange()); + try (MessageSource source = messageSourceFactory.createMcsMessageSource(0)) { + responseObserver.onNext(source.getSequenceNumberRange()); responseObserver.onCompleted(); LOGGER.info("end getMcsClaimsSequenceNumberRange"); } catch (Exception ex) { diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSource.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSource.java index 8f7977d2c5..6013982ad2 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSource.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSource.java @@ -14,6 +14,7 @@ import io.micrometer.core.instrument.MeterRegistry; import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; @@ -51,6 +52,12 @@ public class StandardGrpcRdaSource /** The type of RDA API server to connect to. */ private final RdaSourceConfig.ServerType serverType; + /** The last sequence number range update timestamp. */ + private Instant lastSequenceNumberRangeUpdate = Instant.MIN; + + /** The interval to update the sequence number range. */ + private static final Long SEQUENCE_NUMBER_RANGE_UPDATE_MILLIS = 5000L; + /** * The primary constructor for this class. Constructs a GrpcRdaSource and opens a channel to the * gRPC service. @@ -225,9 +232,7 @@ public int retrieveAndProcessObjects(int maxPerBatch, RdaSink } else if (sink.isValidMessage(result)) { batch.put(sink.getClaimIdForMessage(result), result); if (batch.size() >= maxPerBatch) { - ClaimSequenceNumberRange sequenceNumberRange = - caller.callSequenceNumberRangeService(channel, callOptionsFactory.get()); - sink.updateSequenceNumberRange(sequenceNumberRange); + updateSequenceNumberRange(sink); processResult.addCount(submitBatchToSink(apiVersion, sink, batch)); } lastProcessedTime = clock.millis(); @@ -287,6 +292,22 @@ public int retrieveAndProcessObjects(int maxPerBatch, RdaSink }); } + /** + * Updates the available sequence number range. + * + * @param sink RDA sink + */ + private void updateSequenceNumberRange(RdaSink sink) { + Instant now = Instant.now(); + if (now.minusMillis(SEQUENCE_NUMBER_RANGE_UPDATE_MILLIS) + .isAfter(lastSequenceNumberRangeUpdate)) { + ClaimSequenceNumberRange sequenceNumberRange = + caller.callSequenceNumberRangeService(channel, callOptionsFactory.get()); + sink.updateSequenceNumberRange(sequenceNumberRange); + this.lastSequenceNumberRangeUpdate = now; + } + } + /** * The RDA API server drops open connections abruptly when it has no data to transmit for some * period of time. These closures are not clean at the protocol level, so they appear as errors to diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSourceIT.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSourceIT.java index 66dd3860ad..bb462ea328 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSourceIT.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/source/StandardGrpcRdaSourceIT.java @@ -357,7 +357,8 @@ public void grpcCallUpdatesSequenceNumber() throws Exception { assertEquals(2, sink.getValues().size()); assertEquals(EXPECTED_CLAIM_1, sink.getValues().get(0)); assertEquals(EXPECTED_CLAIM_2, sink.getValues().get(1)); - assertEquals(2, sink.updateSequenceNumberCallCount); + // Should only be called once per 5 seconds + assertEquals(1, sink.updateSequenceNumberCallCount); }); } From 280f3204d889d8bd31e770a5aca87f75b37db080 Mon Sep 17 00:00:00 2001 From: Austin Schey Date: Tue, 28 Jan 2025 11:08:08 -0800 Subject: [PATCH 3/5] add sequence number metrics to allowed metric names --- .../main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java index 919404f418..74042d5b5e 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java @@ -377,6 +377,10 @@ public final class AppConfiguration extends BaseAppConfiguration { Set.of( "FissClaimRdaSink.change.latency.millis", "McsClaimRdaSink.change.latency.millis", + "FissClaimRdaSink.lastSeq", + "McsClaimRdaSink.lastSeq", + "FissClaimRdaSink.maxSeq", + "McsClaimRdaSink.maxSeq", CcwRifLoadJob.Metrics.DATASET_PROCESSING_ACTIVE_TIMER_NAME, CcwRifLoadJob.Metrics.DATASET_PROCESSING_TOTAL_TIMER_NAME, CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME, From 901c2259cc81bb4c5e475ce1adffb7c0cc267a1e Mon Sep 17 00:00:00 2001 From: Austin Schey Date: Tue, 28 Jan 2025 14:25:45 -0800 Subject: [PATCH 4/5] add micrometer settings for ephemeral envs --- ops/terraform/services/base/ephemeral.tf | 9 +++++---- ops/terraform/services/base/values/ephemeral.yaml | 2 ++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ops/terraform/services/base/ephemeral.tf b/ops/terraform/services/base/ephemeral.tf index 7577420854..8cab5bdea3 100644 --- a/ops/terraform/services/base/ephemeral.tf +++ b/ops/terraform/services/base/ephemeral.tf @@ -22,10 +22,11 @@ locals { # Targeted PIPELINE hierarchy paths to be "copied" from the seed environment into requested ephemeral environment pipeline_seed_paths = local.is_ephemeral_env ? { - "/bfd/${local.env}/pipeline/sensitive/db/password" = "/bfd/${local.seed_env}/pipeline/sensitive/db/password" - "/bfd/${local.env}/pipeline/sensitive/db/username" = "/bfd/${local.seed_env}/pipeline/sensitive/db/username" - "/bfd/${local.env}/pipeline/sensitive/hicn_hash/iterations" = "/bfd/${local.seed_env}/pipeline/sensitive/hicn_hash/iterations" - "/bfd/${local.env}/pipeline/sensitive/hicn_hash/pepper" = "/bfd/${local.seed_env}/pipeline/sensitive/hicn_hash/pepper" + "/bfd/${local.env}/pipeline/sensitive/db/password" = "/bfd/${local.seed_env}/pipeline/sensitive/db/password" + "/bfd/${local.env}/pipeline/sensitive/db/username" = "/bfd/${local.seed_env}/pipeline/sensitive/db/username" + "/bfd/${local.env}/pipeline/sensitive/hicn_hash/iterations" = "/bfd/${local.seed_env}/pipeline/sensitive/hicn_hash/iterations" + "/bfd/${local.env}/pipeline/sensitive/hicn_hash/pepper" = "/bfd/${local.seed_env}/pipeline/sensitive/hicn_hash/pepper" + "/bfd/${local.env}/pipeline/nonsensitive/micrometer_cw/interval" = "/bfd/${local.seed_env}/pipeline/nonsensitive/micrometer_cw/interval" # The prod-sbx environment includes an in-process server instead of a communicating with an external, gRPC host "/bfd/${local.env}/pipeline/sensitive/rda/grpc/auth_token" = local.seed_env == "prod-sbx" ? "" : "/bfd/${local.seed_env}/pipeline/sensitive/rda/grpc/auth_token" "/bfd/${local.env}/pipeline/sensitive/rda/grpc/port" = local.seed_env == "prod-sbx" ? "" : "/bfd/${local.seed_env}/pipeline/sensitive/rda/grpc/port" diff --git a/ops/terraform/services/base/values/ephemeral.yaml b/ops/terraform/services/base/values/ephemeral.yaml index a7a2927fdd..e4725fdebe 100644 --- a/ops/terraform/services/base/values/ephemeral.yaml +++ b/ops/terraform/services/base/values/ephemeral.yaml @@ -22,6 +22,8 @@ /bfd/${env}/pipeline/nonsensitive/ccw/job/enabled: false /bfd/${env}/pipeline/nonsensitive/ccw/idempotency_enabled: true /bfd/${env}/pipeline/nonsensitive/ccw/instance_type: c6a.4xlarge +/bfd/${env}/pipeline/nonsensitive/micrometer_cw/enabled: "false" +/bfd/${env}/pipeline/nonsensitive/micrometer_cw/namespace: "bfd-${env}/bfd-pipeline" /bfd/${env}/pipeline/nonsensitive/ccw/rif_thread_multiple_claims: "25" /bfd/${env}/pipeline/nonsensitive/ccw/slis_repeater_lambda_invoke_rate: 15 minutes /bfd/${env}/pipeline/nonsensitive/ccw/slo/weekend_data_availability/verifier/enabled: false From 6654b3e427f2ac9bb11dd33701332766737e7169 Mon Sep 17 00:00:00 2001 From: Austin Schey Date: Wed, 29 Jan 2025 09:17:30 -0800 Subject: [PATCH 5/5] docs --- .../main/java/gov/cms/bfd/pipeline/rda/grpc/RdaSink.java | 4 ++-- .../cms/bfd/pipeline/rda/grpc/server/MessageSource.java | 4 ++-- .../rda/grpc/sink/direct/AbstractClaimRdaSink.java | 8 ++++---- .../bfd/pipeline/rda/grpc/source/GrpcStreamCaller.java | 8 ++++---- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaSink.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaSink.java index f6ae35f525..e5e4a44da2 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaSink.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaSink.java @@ -193,9 +193,9 @@ Optional transformMessage(String apiVersion, TMessage message) int writeClaims(Collection objects) throws ProcessingException; /** - * test. + * Updates the available range of sequence numbers. * - * @param sequenceNumberRange test + * @param sequenceNumberRange Current range of sequence numbers. */ void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange); diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/MessageSource.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/MessageSource.java index e03e6a50d2..335ec045c4 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/MessageSource.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/server/MessageSource.java @@ -40,9 +40,9 @@ public interface MessageSource extends AutoCloseable { MessageSource skipTo(long startingSequenceNumber) throws Exception; /** - * test. + * Returns the current range of sequence numbers. * - * @return test + * @return sequence number range */ ClaimSequenceNumberRange getSequenceNumberRange(); } diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/AbstractClaimRdaSink.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/AbstractClaimRdaSink.java index 775849437b..9ea2dae12c 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/AbstractClaimRdaSink.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/sink/direct/AbstractClaimRdaSink.java @@ -540,10 +540,10 @@ static class Metrics { /** The number of insert statements executed. */ private final DistributionSummary insertCount; - /** test. */ + /** Maximum available sequence number. */ private final AtomicLong maxSequenceNumber; - /** test. */ + /** The value returned by the maxSequenceNumber gauge. */ private final AtomicLong maxSequenceNumberValue; /** @@ -588,9 +588,9 @@ void setLatestSequenceNumber(long value) { } /** - * test. + * Sets the {@link #maxSequenceNumber}. * - * @param value test + * @param value value to set */ void setMaxSequenceNumber(long value) { maxSequenceNumberValue.set(value); diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/GrpcStreamCaller.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/GrpcStreamCaller.java index d11314556e..3e7d34ff9e 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/GrpcStreamCaller.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/source/GrpcStreamCaller.java @@ -64,11 +64,11 @@ public abstract GrpcResponseStream callService( throws Exception; /** - * test. + * Calls the service to get the sequence number range. * - * @param channel test - * @param callOptions test - * @return test + * @param channel an already open channel to the service being called + * @param callOptions the CallOptions object to use for the API call + * @return sequence number range */ public abstract ClaimSequenceNumberRange callSequenceNumberRangeService( ManagedChannel channel, CallOptions callOptions);