diff --git a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt index c4d01d25b..1bf763da7 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt @@ -280,7 +280,9 @@ public class Client @VisibleForTesting internal constructor( document.onlineClients.value += clientIDs val presences = document.presences.first { it.keys.containsAll(clientIDs) } - document.publish(PresenceChange.MyPresence.Initialized(presences.asPresences())) + document.publishPresenceEvent( + PresenceChange.MyPresence.Initialized(presences.asPresences()), + ) return } @@ -298,7 +300,7 @@ public class Client @VisibleForTesting internal constructor( document.onlineClients.value += publisher if (publisher in document.allPresences.value) { val presence = document.presences.first { publisher in it }[publisher] ?: return - document.publish( + document.publishPresenceEvent( PresenceChange.Others.Watched(PresenceInfo(publisher, presence)), ) } @@ -310,7 +312,9 @@ public class Client @VisibleForTesting internal constructor( // the 'unwatched' event is triggered while handling the PresenceChange. val presence = document.presences.value[publisher] ?: return document.onlineClients.value -= publisher - document.publish(PresenceChange.Others.Unwatched(PresenceInfo(publisher, presence))) + document.publishPresenceEvent( + PresenceChange.Others.Unwatched(PresenceInfo(publisher, presence)), + ) } DocEventType.DOC_EVENT_TYPE_DOCUMENT_CHANGED -> { diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt index df12077f4..73b6a6467 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt @@ -46,6 +46,7 @@ import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.withContext +import kotlinx.coroutines.withTimeoutOrNull /** * A CRDT-based data type. @@ -144,7 +145,7 @@ public class Document(public val key: Key, private val options: Options = Option } if (change.hasPresenceChange) { val presence = _presences.value[actorID] ?: return@async false - eventStream.emit(createPresenceChangedEvent(actorID, presence)) + publishPresenceEvent(createPresenceChangedEvent(actorID, presence)) } true } @@ -304,7 +305,7 @@ public class Document(public val key: Key, private val options: Options = Option } newPresences?.let { emitPresences(it) } - presenceEvent?.let { eventStream.emit(it) } + presenceEvent?.let { publishPresenceEvent(it) } changeID = changeID.syncLamport(change.id.lamport) } } @@ -369,21 +370,25 @@ public class Document(public val key: Key, private val options: Options = Option /** * Triggers an event in this [Document]. */ - internal suspend fun publish(event: Event) { - when (event) { - is Others.Watched -> { - presences.first { event.changed.actorID in it.keys } - } - - is Others.Unwatched -> { - presences.first { event.changed.actorID !in it.keys } - } + internal suspend fun publishPresenceEvent(event: Event.PresenceChange) { + val predicate: (Presences) -> Boolean = { presences -> + when (event) { + is MyPresence.Initialized -> event.initialized == presences + is MyPresence.PresenceChanged -> { + val actorID = event.changed.actorID + actorID !in presences || event.changed.presence == presences[actorID] + } - is MyPresence.Initialized -> { - presences.first { event.initialized == it } + is Others.Watched -> event.changed.actorID in presences + is Others.Unwatched -> event.changed.actorID !in presences + is Others.PresenceChanged -> { + val actorID = event.changed.actorID + actorID !in presences || event.changed.presence == presences[actorID] + } } - - else -> {} + } + withTimeoutOrNull(3_000) { + presences.first(predicate) } eventStream.emit(event) }