Skip to content

Commit

Permalink
Avoid creation of list instances in a while true loop
Browse files Browse the repository at this point in the history
  • Loading branch information
skhugh committed Jun 4, 2024
1 parent 9cae02b commit b2b2051
Showing 1 changed file with 30 additions and 16 deletions.
46 changes: 30 additions & 16 deletions yorkie/src/main/kotlin/dev/yorkie/core/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import okhttp3.OkHttpClient
Expand Down Expand Up @@ -167,7 +168,7 @@ public class Client @VisibleForTesting internal constructor(
private fun runSyncLoop() {
scope.launch(activationJob) {
while (true) {
filterRealTimeSyncNeeded().asSyncFlow().collect { (document, result) ->
attachments.value.entries.asSyncFlow(true).collect { (document, result) ->
document.publishEvent(
if (result.isSuccess) {
SyncStatusChanged.Synced
Expand All @@ -181,12 +182,6 @@ public class Client @VisibleForTesting internal constructor(
}
}

private fun filterRealTimeSyncNeeded() =
attachments.value.filterValues(Attachment::needRealTimeSync).map { (key, attachment) ->
attachments.value += key to attachment.copy(remoteChangeEventReceived = false)
attachment
}

/**
* Pushes local changes of the attached documents to the server and
* receives changes of the remote replica from the server then apply them to local documents.
Expand All @@ -195,12 +190,12 @@ public class Client @VisibleForTesting internal constructor(
return scope.async {
var failure: Throwable? = null
val attachments = document?.let {
listOf(
attachments.value[it.key]?.copy(syncMode = SyncMode.Realtime)
?: throw IllegalArgumentException("document is not attached"),
)
} ?: attachments.value.values
attachments.asSyncFlow().collect { (document, result) ->
val attachment = attachments.value[it.key]?.copy(syncMode = SyncMode.Realtime)
?: throw IllegalArgumentException("document is not attached")

listOf(AttachmentEntry(it.key, attachment))
} ?: attachments.value.entries
attachments.asSyncFlow(false).collect { (document, result) ->
document.publishEvent(
if (result.isSuccess) {
SyncStatusChanged.Synced
Expand All @@ -217,10 +212,24 @@ public class Client @VisibleForTesting internal constructor(
}
}

private suspend fun Collection<Attachment>.asSyncFlow(): Flow<SyncResult> {
private suspend fun Collection<Map.Entry<Document.Key, Attachment>>.asSyncFlow(realTimeOnly: Boolean): Flow<SyncResult> {
return asFlow()
.map { attachment ->
val (document, documentID, syncMode) = attachment
.mapNotNull { (key, attachment) ->
val (document, documentID, syncMode) = if (realTimeOnly) {
if (!attachment.needRealTimeSync()) {
return@mapNotNull null
}
if (attachment.remoteChangeEventReceived) {
attachment.copy(remoteChangeEventReceived = false).also {
attachments.value += key to it
}
} else {
attachment
}
} else {
attachment
}

SyncResult(
document,
runCatching {
Expand Down Expand Up @@ -629,6 +638,11 @@ public class Client @VisibleForTesting internal constructor(
streamClient.dispatcher.executorService.shutdown()
}

private class AttachmentEntry(
override val key: Document.Key,
override val value: Attachment,
) : Map.Entry<Document.Key, Attachment>

private data class SyncResult(val document: Document, val result: OperationResult)

private class WatchJobHolder(val documentID: String, val job: Job)
Expand Down

0 comments on commit b2b2051

Please sign in to comment.