Skip to content

Commit

Permalink
ensure to post event after presences flow emission
Browse files Browse the repository at this point in the history
  • Loading branch information
7hong13 committed Jan 19, 2024
1 parent f334ce4 commit b799c0d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
10 changes: 7 additions & 3 deletions yorkie/src/main/kotlin/dev/yorkie/core/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,9 @@ public class Client @VisibleForTesting internal constructor(
document.onlineClients.value += clientIDs

val presences = document.presences.first { it.keys.containsAll(clientIDs) }
document.publish(PresenceChange.MyPresence.Initialized(presences.asPresences()))
document.publishPresenceEvent(
PresenceChange.MyPresence.Initialized(presences.asPresences()),
)
return
}

Expand All @@ -298,7 +300,7 @@ public class Client @VisibleForTesting internal constructor(
document.onlineClients.value += publisher
if (publisher in document.allPresences.value) {
val presence = document.presences.first { publisher in it }[publisher] ?: return
document.publish(
document.publishPresenceEvent(
PresenceChange.Others.Watched(PresenceInfo(publisher, presence)),
)
}
Expand All @@ -310,7 +312,9 @@ public class Client @VisibleForTesting internal constructor(
// the 'unwatched' event is triggered while handling the PresenceChange.
val presence = document.presences.value[publisher] ?: return
document.onlineClients.value -= publisher
document.publish(PresenceChange.Others.Unwatched(PresenceInfo(publisher, presence)))
document.publishPresenceEvent(
PresenceChange.Others.Unwatched(PresenceInfo(publisher, presence)),
)
}

DocEventType.DOC_EVENT_TYPE_DOCUMENT_CHANGED -> {
Expand Down
35 changes: 20 additions & 15 deletions yorkie/src/main/kotlin/dev/yorkie/document/Document.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull

/**
* A CRDT-based data type.
Expand Down Expand Up @@ -144,7 +145,7 @@ public class Document(public val key: Key, private val options: Options = Option
}
if (change.hasPresenceChange) {
val presence = _presences.value[actorID] ?: return@async false
eventStream.emit(createPresenceChangedEvent(actorID, presence))
publishPresenceEvent(createPresenceChangedEvent(actorID, presence))
}
true
}
Expand Down Expand Up @@ -304,7 +305,7 @@ public class Document(public val key: Key, private val options: Options = Option
}

newPresences?.let { emitPresences(it) }
presenceEvent?.let { eventStream.emit(it) }
presenceEvent?.let { publishPresenceEvent(it) }
changeID = changeID.syncLamport(change.id.lamport)
}
}
Expand Down Expand Up @@ -369,21 +370,25 @@ public class Document(public val key: Key, private val options: Options = Option
/**
* Triggers an event in this [Document].
*/
internal suspend fun publish(event: Event) {
when (event) {
is Others.Watched -> {
presences.first { event.changed.actorID in it.keys }
}

is Others.Unwatched -> {
presences.first { event.changed.actorID !in it.keys }
}
internal suspend fun publishPresenceEvent(event: Event.PresenceChange) {
val predicate: (Presences) -> Boolean = { presences ->
when (event) {
is MyPresence.Initialized -> event.initialized == presences
is MyPresence.PresenceChanged -> {
val actorID = event.changed.actorID
actorID !in presences || event.changed.presence == presences[actorID]
}

is MyPresence.Initialized -> {
presences.first { event.initialized == it }
is Others.Watched -> event.changed.actorID in presences
is Others.Unwatched -> event.changed.actorID !in presences
is Others.PresenceChanged -> {
val actorID = event.changed.actorID
actorID !in presences || event.changed.presence == presences[actorID]
}
}

else -> {}
}
withTimeoutOrNull(3_000) {
presences.first(predicate)
}
eventStream.emit(event)
}
Expand Down

0 comments on commit b799c0d

Please sign in to comment.