Skip to content

Commit

Permalink
Avoid creation of list instances in a while true loop (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
skhugh authored Jun 4, 2024
1 parent 9cae02b commit 080f2cc
Showing 1 changed file with 33 additions and 16 deletions.
49 changes: 33 additions & 16 deletions yorkie/src/main/kotlin/dev/yorkie/core/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import dev.yorkie.util.createSingleThreadDispatcher
import java.io.Closeable
import java.io.InterruptedIOException
import java.util.UUID
import kotlin.collections.Map.Entry
import kotlin.coroutines.coroutineContext
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
Expand Down Expand Up @@ -69,6 +70,7 @@ import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import okhttp3.OkHttpClient
Expand Down Expand Up @@ -167,7 +169,7 @@ public class Client @VisibleForTesting internal constructor(
private fun runSyncLoop() {
scope.launch(activationJob) {
while (true) {
filterRealTimeSyncNeeded().asSyncFlow().collect { (document, result) ->
attachments.value.entries.asSyncFlow(true).collect { (document, result) ->
document.publishEvent(
if (result.isSuccess) {
SyncStatusChanged.Synced
Expand All @@ -181,12 +183,6 @@ public class Client @VisibleForTesting internal constructor(
}
}

private fun filterRealTimeSyncNeeded() =
attachments.value.filterValues(Attachment::needRealTimeSync).map { (key, attachment) ->
attachments.value += key to attachment.copy(remoteChangeEventReceived = false)
attachment
}

/**
* Pushes local changes of the attached documents to the server and
* receives changes of the remote replica from the server then apply them to local documents.
Expand All @@ -195,12 +191,12 @@ public class Client @VisibleForTesting internal constructor(
return scope.async {
var failure: Throwable? = null
val attachments = document?.let {
listOf(
attachments.value[it.key]?.copy(syncMode = SyncMode.Realtime)
?: throw IllegalArgumentException("document is not attached"),
)
} ?: attachments.value.values
attachments.asSyncFlow().collect { (document, result) ->
val attachment = attachments.value[it.key]?.copy(syncMode = SyncMode.Realtime)
?: throw IllegalArgumentException("document is not attached")

listOf(AttachmentEntry(it.key, attachment))
} ?: attachments.value.entries
attachments.asSyncFlow(false).collect { (document, result) ->
document.publishEvent(
if (result.isSuccess) {
SyncStatusChanged.Synced
Expand All @@ -217,10 +213,26 @@ public class Client @VisibleForTesting internal constructor(
}
}

private suspend fun Collection<Attachment>.asSyncFlow(): Flow<SyncResult> {
private suspend fun Collection<Entry<Document.Key, Attachment>>.asSyncFlow(
realTimeOnly: Boolean,
): Flow<SyncResult> {
return asFlow()
.map { attachment ->
val (document, documentID, syncMode) = attachment
.mapNotNull { (key, attachment) ->
val (document, documentID, syncMode) = if (realTimeOnly) {
if (!attachment.needRealTimeSync()) {
return@mapNotNull null
}
if (attachment.remoteChangeEventReceived) {
attachment.copy(remoteChangeEventReceived = false).also {
attachments.value += key to it
}
} else {
attachment
}
} else {
attachment
}

SyncResult(
document,
runCatching {
Expand Down Expand Up @@ -629,6 +641,11 @@ public class Client @VisibleForTesting internal constructor(
streamClient.dispatcher.executorService.shutdown()
}

private class AttachmentEntry(
override val key: Document.Key,
override val value: Attachment,
) : Entry<Document.Key, Attachment>

private data class SyncResult(val document: Document, val result: OperationResult)

private class WatchJobHolder(val documentID: String, val job: Job)
Expand Down

0 comments on commit 080f2cc

Please sign in to comment.