Skip to content

Commit

Permalink
better logging for watch stream failure
Browse files Browse the repository at this point in the history
  • Loading branch information
skhugh committed Apr 12, 2024
1 parent 25ec2c6 commit 2b76b2c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
38 changes: 29 additions & 9 deletions yorkie/src/main/kotlin/dev/yorkie/core/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ import dev.yorkie.document.time.ActorID
import dev.yorkie.util.YorkieLogger
import dev.yorkie.util.createSingleThreadDispatcher
import java.io.Closeable
import java.io.InterruptedIOException
import java.util.UUID
import kotlin.coroutines.coroutineContext
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
Expand Down Expand Up @@ -303,19 +305,12 @@ public class Client @VisibleForTesting internal constructor(
handleWatchDocumentsResponse(attachment.document.key, response)
retry = 0
}.onFailure {
_streamConnectionStatus.emit(StreamConnectionStatus.Disconnected)
retry++
if (retry > 3 || it is ClosedReceiveChannelException) {
_streamConnectionStatus.emit(StreamConnectionStatus.Disconnected)
stream.safeClose()
}
val errorMessage = if (it is ConnectException) {
it.toString()
} else {
it.message.orEmpty()
}
if (errorMessage.isNotEmpty()) {
YorkieLogger.e("watchStream", errorMessage)
}
sendWatchStreamException(it)
ensureActive()
delay(options.reconnectStreamDelay.inWholeMilliseconds)
}
Expand All @@ -338,6 +333,31 @@ public class Client @VisibleForTesting internal constructor(
}
}

private fun sendWatchStreamException(t: Throwable) {
val tag = "Client.WatchStream"
when (t) {
is CancellationException -> {
return
}

is ConnectException -> {
if (t.cause is InterruptedIOException) {
YorkieLogger.d(tag, t.toString())
} else {
YorkieLogger.e(tag, t.toString())
}
}

is ClosedReceiveChannelException -> {
YorkieLogger.d(tag, t.message ?: "stream closed")
}

else -> {
YorkieLogger.e(tag, t.message.orEmpty())
}
}
}

private suspend fun ServerOnlyStreamInterface<*, *>?.safeClose() {
if (this == null || isReceiveClosed()) {
return
Expand Down
6 changes: 6 additions & 0 deletions yorkie/src/main/kotlin/dev/yorkie/util/YorkieLogger.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@ public object YorkieLogger {
public var logger: Logger? = null

fun d(tag: String, message: String) {
if (message.isBlank()) {
return
}
logger?.d("$TAG_PREFIX$tag", message)
}

fun e(tag: String, message: String) {
if (message.isBlank()) {
return
}
logger?.e("$TAG_PREFIX$tag", message)
}
}
Expand Down

0 comments on commit 2b76b2c

Please sign in to comment.