diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java index 6b1eb1274621..83320a00f22c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java @@ -122,12 +122,9 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce try { delegate.onError(cancellationException); } catch (IllegalStateException onErrorException) { - // If the delegate above was already terminated via onError or onComplete from another - // thread. - logger.warn( - "StreamObserver was already cancelled {} due to error.", - onErrorException, - cancellationException); + // The delegate above was already terminated via onError or onComplete. + // Fallthrough since this is possibly due to queued onNext() calls that are being made from + // previously blocked threads. } catch (RuntimeException onErrorException) { logger.warn( "Encountered unexpected error {} when cancelling due to error.",