From 45203f61a2f76dffd4c4c264980e30d989b9a997 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=84=80=E1=85=B5=E1=86=B7=E1=84=92=E1=85=AD=E1=84=8B?= =?UTF-8?q?=E1=85=AE=5BSE=5D=5BSmartEditor=5D?= Date: Fri, 14 Jun 2024 07:58:41 +0900 Subject: [PATCH 1/2] changes timeouts to more reasonable values --- yorkie/src/main/kotlin/dev/yorkie/core/Client.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt index 1e3cf4fd..11afdc0a 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt @@ -312,7 +312,7 @@ public class Client @VisibleForTesting internal constructor( while (true) { ensureActive() latestStream.safeClose() - val stream = withTimeoutOrNull(1_000) { + val stream = withTimeoutOrNull(60_000) { service.watchDocument( attachment.document.key.documentBasedRequestHeader, ).also { @@ -322,7 +322,7 @@ public class Client @VisibleForTesting internal constructor( val streamJob = launch(start = CoroutineStart.UNDISPATCHED) { val channel = stream.responseChannel() while (!stream.isReceiveClosed() && !channel.isClosedForReceive) { - withTimeoutOrNull(60_000) { + withTimeoutOrNull(180_000) { val receiveResult = channel.receiveCatching() receiveResult.onSuccess { attachment.document.publishEvent(StreamConnectionChanged.Connected) From 8bb6a9c72ecf5382a4b810b915a4300443a7ef5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=84=80=E1=85=B5=E1=86=B7=E1=84=92=E1=85=AD=E1=84=8B?= =?UTF-8?q?=E1=85=AE=5BSE=5D=5BSmartEditor=5D?= Date: Fri, 14 Jun 2024 09:33:53 +0900 Subject: [PATCH 2/2] use streamClient's timeout values for timeout check --- yorkie/src/main/kotlin/dev/yorkie/core/Client.kt | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt index 11afdc0a..a81eccb4 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt @@ -45,6 +45,7 @@ import kotlin.collections.Map.Entry import kotlin.coroutines.coroutineContext import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.minutes import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope @@ -118,6 +119,10 @@ public class Client @VisibleForTesting internal constructor( private val Document.mutex get() = mutexForDocuments.getOrPut(key) { Mutex() } + private val streamTimeout = with(streamClient) { + callTimeoutMillis.takeIf { it > 0 } ?: (connectTimeoutMillis + readTimeoutMillis) + }.takeIf { it > 0 }?.milliseconds ?: 5.minutes + public constructor( host: String, options: Options = Options(), @@ -312,7 +317,7 @@ public class Client @VisibleForTesting internal constructor( while (true) { ensureActive() latestStream.safeClose() - val stream = withTimeoutOrNull(60_000) { + val stream = withTimeoutOrNull(streamTimeout) { service.watchDocument( attachment.document.key.documentBasedRequestHeader, ).also { @@ -322,7 +327,7 @@ public class Client @VisibleForTesting internal constructor( val streamJob = launch(start = CoroutineStart.UNDISPATCHED) { val channel = stream.responseChannel() while (!stream.isReceiveClosed() && !channel.isClosedForReceive) { - withTimeoutOrNull(180_000) { + withTimeoutOrNull(streamTimeout) { val receiveResult = channel.receiveCatching() receiveResult.onSuccess { attachment.document.publishEvent(StreamConnectionChanged.Connected)