Skip to content

Commit

Permalink
Merge branch 'develop' into dependabot/github_actions/JamesIves/githu…
Browse files Browse the repository at this point in the history
…b-pages-deploy-action-4.7.3
  • Loading branch information
yamilmedina authored Feb 25, 2025
2 parents f12ef73 + e422297 commit b893879
Show file tree
Hide file tree
Showing 37 changed files with 1,489 additions and 1,109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import com.wire.kalium.logic.feature.publicuser.GetAllContactsResult
import kotlinx.coroutines.flow.first

suspend fun getConversations(userSession: UserSessionScope): List<Conversation> {
userSession.syncManager.waitUntilLive()
userSession.syncExecutor.request {
waitUntilLiveOrFailure()
}

val conversations = userSession.conversations.getConversations().let {
when (it) {
Expand All @@ -53,7 +55,9 @@ suspend fun UserSessionScope.listConversations(): List<Conversation> {
}

suspend fun UserSessionScope.selectConversation(): Conversation {
syncManager.waitUntilLive()
syncExecutor.request {
waitUntilLiveOrFailure()
}

val conversations = listConversations()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.wire.kalium.logic.feature.auth.autoVersioningAuth.AutoVersionAuthScop
import com.wire.kalium.logic.feature.client.RegisterClientResult
import com.wire.kalium.logic.feature.client.RegisterClientUseCase
import com.wire.kalium.logic.feature.server.GetServerConfigResult
import com.wire.kalium.util.DelicateKaliumApi
import kotlinx.coroutines.runBlocking

class LoginCommand : CliktCommand(name = "login") {
Expand Down Expand Up @@ -110,6 +111,7 @@ class LoginCommand : CliktCommand(name = "login") {
}
}

@OptIn(DelicateKaliumApi::class)
override fun run(): Unit = runBlocking {
val loginResult = authenticate().let {
if (it !is AuthenticationResult.Success) {
Expand All @@ -132,6 +134,10 @@ class LoginCommand : CliktCommand(name = "login") {
}
}

userSession = currentContext.findOrSetObject { coreLogic.getSessionScope(userId) }
userSession = currentContext.findOrSetObject {
coreLogic.getSessionScope(userId).also {
it.syncExecutor.request { keepSyncAlwaysOn() }
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ class UpdateSupportedProtocolsCommand : CliktCommand(name = "update-supported-pr
private val userSession by requireObject<UserSessionScope>()

override fun run() = runBlocking {
userSession.syncManager.waitUntilLive()
userSession.syncExecutor.request {
waitUntilLiveOrFailure()
}
userSession.users.updateSupportedProtocols().fold({ failure ->
throw PrintMessage("updating supported protocols failed: $failure")
}, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ import com.wire.crypto.BufferedDecryptedMessage
import com.wire.crypto.Ciphersuite
import com.wire.crypto.ConversationConfiguration
import com.wire.crypto.CoreCrypto
import com.wire.crypto.CoreCryptoCommand
import com.wire.crypto.CoreCryptoContext
import com.wire.crypto.CoreCryptoException
import com.wire.crypto.CustomConfiguration
import com.wire.crypto.DecryptedMessage
import com.wire.crypto.E2eiConversationState
import com.wire.crypto.MlsCredentialType
import com.wire.crypto.MlsException
import com.wire.crypto.MlsGroupInfoEncryptionType
import com.wire.crypto.MlsRatchetTreeType
import com.wire.crypto.MlsWirePolicy
Expand Down Expand Up @@ -137,22 +141,41 @@ class MLSClientImpl(
return applicationMessage
}

@Suppress("TooGenericExceptionCaught")
override suspend fun decryptMessage(groupId: MLSGroupId, message: ApplicationMessage): List<DecryptedMessageBundle> {
val decryptedMessage = coreCrypto.decryptMessage(
groupId.decodeBase64Bytes(),
message
)
var decryptedMessage: DecryptedMessage? = null

coreCrypto.transaction(object : CoreCryptoCommand {
override suspend fun execute(context: CoreCryptoContext) {
try {
val result = context.decryptMessage(
groupId.decodeBase64Bytes(),
message
)
decryptedMessage = result

} catch (throwable: Throwable) {
val isBufferedFutureError = (
throwable is CoreCryptoException.Mls && throwable.v1 is MlsException.BufferedFutureMessage
) || throwable.message
?.contains("Incoming message is a commit for which we have not yet received all the proposals") == true
if (!isBufferedFutureError) {
throw throwable
}
}
}
})

val messageBundle = listOf(
toDecryptedMessageBundle(
decryptedMessage
)
)
val bufferedMessages = decryptedMessage.bufferedMessages?.map {
toDecryptedMessageBundle(it)
} ?: emptyList()
if (decryptedMessage == null) {
return emptyList()
}

val mainMessageBundle = listOf(toDecryptedMessageBundle(decryptedMessage!!))
val bufferedBundles = decryptedMessage!!.bufferedMessages
?.map { toDecryptedMessageBundle(it) }
?: emptyList()

return messageBundle + bufferedMessages
return mainMessageBundle + bufferedBundles
}

override suspend fun commitAccepted(groupId: MLSGroupId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,105 @@ class MLSClientTest : BaseMLSClientTest() {
assertNull(aliceClient.decryptMessage(welcomeBundle.groupId, commit).first().message)
}

@Test
fun givenThreeClients_whenProcessingCommitOutOfOrder_shouldCatchBufferedFutureMessageAndBuffer() = runTest {
// Bob creates a conversation.
val bobClient = createClient(BOB1)
bobClient.createConversation(MLS_CONVERSATION_ID, externalSenderKey)

// Bob adds Alice, Alice processes the welcome.
val aliceClient = createClient(ALICE1)
val welcomeAlice = bobClient.addMember(
MLS_CONVERSATION_ID,
listOf(aliceClient.generateKeyPackages(1).first())
)!!.welcome!!
bobClient.commitAccepted(MLS_CONVERSATION_ID)
aliceClient.processWelcomeMessage(welcomeAlice)

// Bob adds Carol but Alice does NOT process that commit => out of order for Alice later.
val carolClient = createClient(CAROL1)
val addCarolResult = bobClient.addMember(
MLS_CONVERSATION_ID,
listOf(carolClient.generateKeyPackages(1).first())
)
val commitAddCarol = addCarolResult!!.commit
bobClient.commitAccepted(MLS_CONVERSATION_ID)

// Bob immediately removes Carol => definitely out of order for Alice.
val removeCarolResult = bobClient.removeMember(MLS_CONVERSATION_ID, listOf(CAROL1.qualifiedClientId))
val commitRemoveCarol = removeCarolResult.commit
bobClient.commitAccepted(MLS_CONVERSATION_ID)

// Alice tries to decrypt the removeCarol commit, which references an epoch Alice hasn't seen yet.
// In normal MLS logic, this triggers a "buffering" error, typically thrown as MlsException.BufferedFutureMessage
// wrapped in CoreCryptoException.Mls. The client code is supposed to swallow that error in a transaction
// and return an empty DecryptedMessage list.

val decryptedBundlesResult = runCatching {
aliceClient.decryptMessage(MLS_CONVERSATION_ID, commitRemoveCarol)
}

// The exception should be caught internally, so from the caller's perspective we succeed with an empty result.
// That indicates the message was buffered instead of fully decrypted.
assertTrue(
decryptedBundlesResult.isSuccess,
"Out-of-order commit should not propagate BufferedFutureMessage as an unhandled exception."
)

val decryptedBundles = decryptedBundlesResult.getOrThrow()
assertTrue(
decryptedBundles.isEmpty(),
"Decryption result should be empty for a buffered out-of-order commit."
)
}

@Test
fun givenOutOfOrderCommits_whenProcessingMissingCommitLater_shouldAlsoProcessBufferedOne() = runTest {
val bobClient = createClient(BOB1)
bobClient.createConversation(MLS_CONVERSATION_ID, externalSenderKey)

val aliceClient = createClient(ALICE1)
// Bob adds Alice to the conversation
val welcomeForAlice = bobClient.addMember(
MLS_CONVERSATION_ID,
listOf(aliceClient.generateKeyPackages(1).first())
)!!.welcome!!
bobClient.commitAccepted(MLS_CONVERSATION_ID)
aliceClient.processWelcomeMessage(welcomeForAlice)

// Bob adds Carol, but Alice never sees this commit => out-of-order for Alice
val carolClient = createClient(CAROL1)
val addCarolResult = bobClient.addMember(
MLS_CONVERSATION_ID,
listOf(carolClient.generateKeyPackages(1).first())
)
val commitAddCarol = addCarolResult!!.commit
bobClient.commitAccepted(MLS_CONVERSATION_ID)

// Immediately Bob removes Carol => definitely out-of-order for Alice
val removeCarolResult = bobClient.removeMember(MLS_CONVERSATION_ID, listOf(CAROL1.qualifiedClientId))
val commitRemoveCarol = removeCarolResult.commit
bobClient.commitAccepted(MLS_CONVERSATION_ID)

// Alice tries to decrypt the removeCarol commit first => out-of-order => should buffer
val removeResult = aliceClient.decryptMessage(MLS_CONVERSATION_ID, commitRemoveCarol)
assertTrue(
removeResult.isEmpty(),
"Out-of-order remove commit should be buffered and return an empty list."
)

// Now Alice processes the missing 'addCarol' commit.
// By processing the addCarol commit, MLS should also flush any previously buffered commits (the removeCarol).
val addResult = aliceClient.decryptMessage(MLS_CONVERSATION_ID, commitAddCarol)

// We expect 2 total commits to be processed now: (1) addCarol, (2) removeCarol.
assertEquals(
2,
addResult.size,
"Processing the older 'addCarol' commit should also flush the buffered 'removeCarol' commit, resulting in 2 items."
)
}

companion object {
val externalSenderKey = ByteArray(32)
val DEFAULT_CIPHER_SUITES = 1.toUShort()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,9 @@ internal class MLSConversationDataSource(

private suspend fun keepCommitAndRetry(groupID: GroupID): Either<CoreFailure, Unit> {
kaliumLogger.w("Migrating failed commit to new epoch and re-trying.")

// FIXME: Sync Cyclic Dependency.
// This function can be called DURING sync. And at the same time, it waits for Sync.
// Perhaps it should be scheduled for a retry in the future, after Sync is done.
return syncManager.waitUntilLiveOrFailure().flatMap {
commitPendingProposalsWithoutRetry(groupID)
}
Expand All @@ -878,6 +880,9 @@ internal class MLSConversationDataSource(
wrapMLSRequest {
mlsClient.clearPendingCommit(idMapper.toCryptoModel(groupID))
}.flatMap {
// FIXME: Sync Cyclic Dependency.
// This function can be called DURING sync. And at the same time, it waits for Sync.
// Perhaps it should be scheduled for a retry in the future, after Sync is done.
syncManager.waitUntilLiveOrFailure()
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,7 @@ internal interface IncrementalSyncRepository {
*/
val incrementalSyncState: Flow<IncrementalSyncStatus>

/**
* Buffered flow of [ConnectionPolicy].
* - Has a replay size of 1, so the latest
* value is always immediately available for new observers.
* - Doesn't emit repeated values.
* - It has a limited buffer of size [BUFFER_SIZE]
* that will drop the oldest values if the buffer is full
* to prevent emissions from being suspended due to slow
* collectors.
* @see [BufferOverflow]
*/
val connectionPolicyState: Flow<ConnectionPolicy>
suspend fun updateIncrementalSyncState(newState: IncrementalSyncStatus)
suspend fun setConnectionPolicy(connectionPolicy: ConnectionPolicy)

companion object {
// The same default buffer size used by Coroutines channels
Expand All @@ -80,28 +67,13 @@ internal class InMemoryIncrementalSyncRepository(
.asSharedFlow()
.distinctUntilChanged()

private val _connectionPolicy = MutableSharedFlow<ConnectionPolicy>(
replay = 1,
extraBufferCapacity = BUFFER_SIZE,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

override val connectionPolicyState = _connectionPolicy
.asSharedFlow()
.distinctUntilChanged()

init {
_syncState.tryEmit(IncrementalSyncStatus.Pending)
_connectionPolicy.tryEmit(ConnectionPolicy.KEEP_ALIVE)
}

override suspend fun updateIncrementalSyncState(newState: IncrementalSyncStatus) {
logger.i("IncrementalSyncStatus Updated FROM:${_syncState.first()}; TO: $newState")
_syncState.emit(newState)
}

override suspend fun setConnectionPolicy(connectionPolicy: ConnectionPolicy) {
logger.i("IncrementalSync Connection Policy changed: $connectionPolicy")
_connectionPolicy.emit(connectionPolicy)
}
}
Loading

0 comments on commit b893879

Please sign in to comment.