Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
skhugh committed Apr 11, 2024
1 parent 5964392 commit 25ec2c6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 37 deletions.
19 changes: 5 additions & 14 deletions yorkie/src/androidTest/kotlin/dev/yorkie/core/TestUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,19 @@ package dev.yorkie.core

import dev.yorkie.document.Document
import java.util.UUID
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import okhttp3.OkHttpClient
import okhttp3.Protocol

const val GENERAL_TIMEOUT = 3_000L

fun createClient() = run {
val unaryClient = OkHttpClient.Builder()
fun createClient() = Client(
"http://10.0.2.2:8080",
unaryClient = OkHttpClient.Builder()
.protocols(listOf(Protocol.HTTP_1_1))
.build()

Client(
"http://10.0.2.2:8080",
unaryClient = unaryClient,
streamClient = unaryClient.newBuilder()
.pingInterval(10L, TimeUnit.SECONDS)
.readTimeout(10L, TimeUnit.MINUTES)
.build(),
)
}
.build(),
)

fun String.toDocKey(): Document.Key {
return Document.Key(
Expand Down
43 changes: 20 additions & 23 deletions yorkie/src/main/kotlin/dev/yorkie/core/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.channels.onSuccess
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
Expand Down Expand Up @@ -299,29 +297,28 @@ public class Client @VisibleForTesting internal constructor(
val channel = stream.responseChannel()
var retry = 0
while (!stream.isReceiveClosed() && !channel.isClosedForReceive) {
channel.receiveCatching()
.onSuccess {
_streamConnectionStatus.emit(StreamConnectionStatus.Connected)
handleWatchDocumentsResponse(attachment.document.key, it)
retry = 0
runCatching {
val response = channel.receive()
_streamConnectionStatus.emit(StreamConnectionStatus.Connected)
handleWatchDocumentsResponse(attachment.document.key, response)
retry = 0
}.onFailure {
_streamConnectionStatus.emit(StreamConnectionStatus.Disconnected)
retry++
if (retry > 3 || it is ClosedReceiveChannelException) {
stream.safeClose()
}
.onFailure {
_streamConnectionStatus.emit(StreamConnectionStatus.Disconnected)
retry++
if (retry > 3 || it is ClosedReceiveChannelException) {
stream.safeClose()
}
val errorMessage = if (it is ConnectException) {
it.toString()
} else {
it?.message
}
if (!errorMessage.isNullOrEmpty()) {
YorkieLogger.e("watchStream", errorMessage)
}
ensureActive()
delay(options.reconnectStreamDelay.inWholeMilliseconds)
val errorMessage = if (it is ConnectException) {
it.toString()
} else {
it.message.orEmpty()
}
if (errorMessage.isNotEmpty()) {
YorkieLogger.e("watchStream", errorMessage)
}
ensureActive()
delay(options.reconnectStreamDelay.inWholeMilliseconds)
}
}
}
stream.sendAndClose(
Expand Down

0 comments on commit 25ec2c6

Please sign in to comment.