diff --git a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt index c729bffca..44c30b95b 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt @@ -35,10 +35,12 @@ import dev.yorkie.document.time.ActorID import dev.yorkie.util.YorkieLogger import dev.yorkie.util.createSingleThreadDispatcher import java.io.Closeable +import java.io.InterruptedIOException import java.util.UUID import kotlin.coroutines.coroutineContext import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineStart @@ -303,19 +305,12 @@ public class Client @VisibleForTesting internal constructor( handleWatchDocumentsResponse(attachment.document.key, response) retry = 0 }.onFailure { - _streamConnectionStatus.emit(StreamConnectionStatus.Disconnected) retry++ if (retry > 3 || it is ClosedReceiveChannelException) { + _streamConnectionStatus.emit(StreamConnectionStatus.Disconnected) stream.safeClose() } - val errorMessage = if (it is ConnectException) { - it.toString() - } else { - it.message.orEmpty() - } - if (errorMessage.isNotEmpty()) { - YorkieLogger.e("watchStream", errorMessage) - } + sendWatchStreamException(it) ensureActive() delay(options.reconnectStreamDelay.inWholeMilliseconds) } @@ -338,6 +333,31 @@ public class Client @VisibleForTesting internal constructor( } } + private fun sendWatchStreamException(t: Throwable) { + val tag = "Client.WatchStream" + when (t) { + is CancellationException -> { + return + } + + is ConnectException -> { + if (t.cause is InterruptedIOException) { + YorkieLogger.d(tag, t.toString()) + } else { + YorkieLogger.e(tag, t.toString()) + } + } + + is ClosedReceiveChannelException -> { + YorkieLogger.d(tag, t.message ?: "stream closed") + } + + else -> { + YorkieLogger.e(tag, t.message.orEmpty()) + } + } + } + private suspend fun ServerOnlyStreamInterface<*, *>?.safeClose() { if (this == null || isReceiveClosed()) { return diff --git a/yorkie/src/main/kotlin/dev/yorkie/util/YorkieLogger.kt b/yorkie/src/main/kotlin/dev/yorkie/util/YorkieLogger.kt index 3bd0860ca..db7d1cdaf 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/util/YorkieLogger.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/util/YorkieLogger.kt @@ -6,10 +6,16 @@ public object YorkieLogger { public var logger: Logger? = null fun d(tag: String, message: String) { + if (message.isBlank()) { + return + } logger?.d("$TAG_PREFIX$tag", message) } fun e(tag: String, message: String) { + if (message.isBlank()) { + return + } logger?.e("$TAG_PREFIX$tag", message) } }