Skip to content

Commit

Permalink
PACA-1095: Improve RdaService logging and simplify callback handlers. (
Browse files Browse the repository at this point in the history
  • Loading branch information
brianburton authored Jun 7, 2023
1 parent e136b13 commit 7aff943
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Builder;
import lombok.Value;
import org.slf4j.Logger;
Expand Down Expand Up @@ -56,11 +57,13 @@ public void getFissClaims(
MessageSource<FissClaimChange> generator =
messageSourceFactory.createFissMessageSource(request.getSince() + 1);
Responder<FissClaimChange> responder = createFissResponder(responseObserver, generator);
responder.sendResponses();
responder.start();
LOGGER.info("end getFissClaims call - stream running in background");
} catch (Exception ex) {
responseObserver.onError(Status.fromThrowable(ex).asException());
LOGGER.error(
"end getFissClaims call - call failed with exception: message={}", ex.getMessage(), ex);
}
LOGGER.info("end getFissClaims call");
}

/**
Expand All @@ -83,11 +86,13 @@ public void getMcsClaims(ClaimRequest request, StreamObserver<McsClaimChange> re
MessageSource<McsClaimChange> generator =
messageSourceFactory.createMcsMessageSource(request.getSince() + 1);
Responder<McsClaimChange> responder = createMcsResponder(responseObserver, generator);
responder.sendResponses();
responder.start();
LOGGER.info("end getMcsClaims call - stream running in background");
} catch (Exception ex) {
responseObserver.onError(Status.fromThrowable(ex).asException());
LOGGER.error(
"end getMcsClaims call - call failed with exception: message={}", ex.getMessage(), ex);
}
LOGGER.info("end getMcsClaims call");
}

/**
Expand All @@ -114,54 +119,88 @@ static class Responder<TChange> {
private final ServerCallStreamObserver<TChange> responseObserver;
/** The message generator. */
private final MessageSource<TChange> generator;
/** If the responder is cancelled. */
/** True if the client has cancelled the connection. */
private final AtomicBoolean cancelled;
/** If the responder if running. */
private final AtomicBoolean running;
/**
* True if we have sent all changes to the client and called {@link StreamObserver#onCompleted}.
*/
private final AtomicBoolean completed;
/** Total number of changes we have sent. */
private final AtomicInteger totalSent;

/**
* Instantiates a new Responder.
* Initializes object.
*
* @param responseObserver the response observer
* @param generator the message generator
*/
private Responder(StreamObserver<TChange> responseObserver, MessageSource<TChange> generator) {
this.generator = generator;
this.cancelled = new AtomicBoolean(false);
this.running = new AtomicBoolean(true);
this.completed = new AtomicBoolean(false);
this.totalSent = new AtomicInteger(0);
this.responseObserver = (ServerCallStreamObserver<TChange>) responseObserver;
this.responseObserver.setOnReadyHandler(this::sendResponses);
this.responseObserver.setOnCancelHandler(() -> cancelled.set(true));
}

/** Sends responses from the {@link #generator}. */
/**
* Initializes callbacks to start sending data when client is ready and receive notification if
* client cancels.
*/
void start() {
responseObserver.setOnCancelHandler(this::onCancelled);
responseObserver.setOnReadyHandler(this::onReady);
}

/** Callback from client indicating that is has cancelled the connection. */
void onCancelled() {
cancelled.set(true);
LOGGER.info("call cancelled by client: total={}", totalSent.get());
closeGenerator();
}

/**
* Callback from client indicating that it is ready to receive some records. Send as many as
* they are ready to receive or until generator runs out of data.
*/
@VisibleForTesting
void sendResponses() {
if (running.get()) {
try {
while (running.get()
&& responseObserver.isReady()
&& !responseObserver.isCancelled()
&& !cancelled.get()
&& generator.hasNext()) {
responseObserver.onNext(generator.next());
}
if (responseObserver.isCancelled() || cancelled.get()) {
running.set(false);
responseObserver.onCompleted();
generator.close();
LOGGER.info("call cancelled by client");
} else if (!generator.hasNext()) {
running.set(false);
responseObserver.onCompleted();
generator.close();
LOGGER.info("call complete");
}
} catch (Exception ex) {
running.set(false);
LOGGER.error("caught exception: {}", ex.getMessage(), ex);
responseObserver.onError(Status.fromThrowable(ex).asException());
void onReady() {
// docs indicate a race condition can trigger a redundant call
if (completed.get() || cancelled.get()) {
return;
}

int sent = 0;
int total = totalSent.get();

try {
while (responseObserver.isReady() && generator.hasNext()) {
var change = generator.next();
responseObserver.onNext(change);
sent += 1;
total = totalSent.incrementAndGet();
}
if (generator.hasNext()) {
LOGGER.debug("pausing: sent={} total={}", sent, total);
} else {
completed.set(true);
responseObserver.onCompleted();
closeGenerator();
LOGGER.info("stream complete: sent={} total={}", sent, total);
}
} catch (Exception ex) {
LOGGER.error(
"caught exception: sent={} total={} message={}", sent, total, ex.getMessage(), ex);
responseObserver.onError(Status.fromThrowable(ex).asException());
closeGenerator();
}
}

/** Closes the random claim generator so it can release any resources. */
void closeGenerator() {
try {
generator.close();
} catch (Exception ex) {
LOGGER.error("caught exception closing generator: message={}", ex.getMessage(), ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void shouldSendFissResponses() {

serviceSpy.getFissClaims(mockRequest, mockFissObserver);

verify(mockFissResponder, times(1)).sendResponses();
verify(mockFissResponder, times(1)).start();
verify(mockFissObserver, times(0)).onError(any(Exception.class));
}

Expand All @@ -139,15 +139,15 @@ void shouldInvokeOnErrorWhenExceptionRaisedOnFissCall() {

RdaService serviceSpy = spy(new RdaService(mockRdaMessageSourceFactory));

doThrow(originalException).when(mockFissResponder).sendResponses();
doThrow(originalException).when(mockFissResponder).start();

doReturn(mockFissResponder)
.when(serviceSpy)
.createFissResponder(mockFissObserver, mockFissSource);

serviceSpy.getFissClaims(mockRequest, mockFissObserver);

verify(mockFissResponder, times(1)).sendResponses();
verify(mockFissResponder, times(1)).start();

ArgumentCaptor<StatusException> captor = ArgumentCaptor.forClass(StatusException.class);

Expand All @@ -169,7 +169,7 @@ void shouldSendMcsResponses() {

serviceSpy.getMcsClaims(mockRequest, mockMcsObserver);

verify(mockMcsResponder, times(1)).sendResponses();
verify(mockMcsResponder, times(1)).start();
verify(mockMcsObserver, times(0)).onError(any(Exception.class));
}

Expand All @@ -185,13 +185,13 @@ void shouldInvokeOnErrorWhenExceptionRaisedOnMcsCall() {

RdaService serviceSpy = spy(new RdaService(mockRdaMessageSourceFactory));

doThrow(originalException).when(mockMcsResponder).sendResponses();
doThrow(originalException).when(mockMcsResponder).start();

doReturn(mockMcsResponder).when(serviceSpy).createMcsResponder(mockMcsObserver, mockMcsSource);

serviceSpy.getMcsClaims(mockRequest, mockMcsObserver);

verify(mockMcsResponder, times(1)).sendResponses();
verify(mockMcsResponder, times(1)).start();

ArgumentCaptor<StatusException> captor = ArgumentCaptor.forClass(StatusException.class);

Expand Down

0 comments on commit 7aff943

Please sign in to comment.