Skip to content

Commit

Permalink
feat: discover legal hold when sending message [WPB-5999] (#2333)
Browse files Browse the repository at this point in the history
* feat: discover legal hold when receiving message [WPB-5837]

* removed unused code

* handle only once for each user id

* reduce db queries

* use date of received message to create system message for conversation

* replace DebounceBuffer with simpler TriggerBuffer

* handle live messages right away and use message timestamp for new system messages

* change name of the buffer

* fix detekt

* feat: discover legal hold when sending message [WPB-5999]

* return dedicated failure type when legal hold enabled

* return messageId with legal hold enabled failure

* fix detekt issues
  • Loading branch information
saleniuk authored Dec 28, 2023
1 parent 10e2eec commit 19491b1
Show file tree
Hide file tree
Showing 11 changed files with 526 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/

package com.wire.kalium.logic.failure

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.id.MessageId

data class LegalHoldEnabledForConversationFailure(val messageId: MessageId) : CoreFailure.FeatureFailure()
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,7 @@ class UserSessionScope internal constructor(
conversationRepository,
mlsConversationRepository,
clientRepository,
clientRemoteRepository,
clientIdProvider,
proteusClientProvider,
mlsClientProvider,
Expand All @@ -1613,6 +1614,7 @@ class UserSessionScope internal constructor(
selfConversationIdProvider,
staleEpochVerifier,
eventProcessor,
legalHoldHandler,
this
)
val messages: MessageScope
Expand All @@ -1625,6 +1627,7 @@ class UserSessionScope internal constructor(
conversationRepository,
mlsConversationRepository,
clientRepository,
clientRemoteRepository,
proteusClientProvider,
mlsClientProvider,
preKeyRepository,
Expand All @@ -1641,6 +1644,7 @@ class UserSessionScope internal constructor(
observeSelfDeletingMessages,
messageMetadataRepository,
staleEpochVerifier,
legalHoldHandler,
this
)
val users: UserScope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import com.wire.kalium.logic.data.asset.AssetRepository
import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.data.client.MLSClientProvider
import com.wire.kalium.logic.data.client.ProteusClientProvider
import com.wire.kalium.logic.data.client.remote.ClientRemoteRepository
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.conversation.LegalHoldStatusMapperImpl
import com.wire.kalium.logic.data.conversation.MLSConversationRepository
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.message.MessageRepository
Expand All @@ -35,7 +37,6 @@ import com.wire.kalium.logic.data.prekey.PreKeyRepository
import com.wire.kalium.logic.data.sync.SlowSyncRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.data.conversation.LegalHoldStatusMapperImpl
import com.wire.kalium.logic.feature.message.MLSMessageCreator
import com.wire.kalium.logic.feature.message.MLSMessageCreatorImpl
import com.wire.kalium.logic.feature.message.MessageEnvelopeCreator
Expand All @@ -53,6 +54,7 @@ import com.wire.kalium.logic.feature.message.ephemeral.DeleteEphemeralMessageFor
import com.wire.kalium.logic.feature.message.ephemeral.EphemeralMessageDeletionHandlerImpl
import com.wire.kalium.logic.sync.SyncManager
import com.wire.kalium.logic.sync.incremental.EventProcessor
import com.wire.kalium.logic.sync.receiver.handler.legalhold.LegalHoldHandler
import com.wire.kalium.logic.util.MessageContentEncoder
import com.wire.kalium.util.KaliumDispatcher
import com.wire.kalium.util.KaliumDispatcherImpl
Expand All @@ -67,6 +69,7 @@ class DebugScope internal constructor(
private val conversationRepository: ConversationRepository,
private val mlsConversationRepository: MLSConversationRepository,
private val clientRepository: ClientRepository,
private val clientRemoteRepository: ClientRemoteRepository,
private val currentClientIdProvider: CurrentClientIdProvider,
private val proteusClientProvider: ProteusClientProvider,
private val mlsClientProvider: MLSClientProvider,
Expand All @@ -80,6 +83,7 @@ class DebugScope internal constructor(
private val selfConversationIdProvider: SelfConversationIdProvider,
private val staleEpochVerifier: StaleEpochVerifier,
private val eventProcessor: EventProcessor,
private val legalHoldHandler: LegalHoldHandler,
private val scope: CoroutineScope,
internal val dispatcher: KaliumDispatcher = KaliumDispatcherImpl
) {
Expand Down Expand Up @@ -113,9 +117,10 @@ class DebugScope internal constructor(
get() = MessageSendFailureHandlerImpl(
userRepository,
clientRepository,
clientRemoteRepository,
messageRepository,
messageSendingScheduler,
conversationRepository
conversationRepository,
)

private val sessionEstablisher: SessionEstablisher
Expand Down Expand Up @@ -153,6 +158,7 @@ class DebugScope internal constructor(
mlsConversationRepository,
syncManager,
messageSendFailureHandler,
legalHoldHandler,
sessionEstablisher,
messageEnvelopeCreator,
mlsMessageCreator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.wire.kalium.logic.data.asset.AssetRepository
import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.data.client.MLSClientProvider
import com.wire.kalium.logic.data.client.ProteusClientProvider
import com.wire.kalium.logic.data.client.remote.ClientRemoteRepository
import com.wire.kalium.logic.data.connection.ConnectionRepository
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.conversation.LegalHoldStatusMapper
Expand Down Expand Up @@ -65,6 +66,7 @@ import com.wire.kalium.logic.feature.selfDeletingMessages.ObserveSelfDeletionTim
import com.wire.kalium.logic.feature.sessionreset.ResetSessionUseCase
import com.wire.kalium.logic.feature.sessionreset.ResetSessionUseCaseImpl
import com.wire.kalium.logic.sync.SyncManager
import com.wire.kalium.logic.sync.receiver.handler.legalhold.LegalHoldHandler
import com.wire.kalium.logic.util.MessageContentEncoder
import com.wire.kalium.util.KaliumDispatcher
import com.wire.kalium.util.KaliumDispatcherImpl
Expand All @@ -80,6 +82,7 @@ class MessageScope internal constructor(
private val conversationRepository: ConversationRepository,
private val mlsConversationRepository: MLSConversationRepository,
private val clientRepository: ClientRepository,
private val clientRemoteRepository: ClientRemoteRepository,
private val proteusClientProvider: ProteusClientProvider,
private val mlsClientProvider: MLSClientProvider,
private val preKeyRepository: PreKeyRepository,
Expand All @@ -96,6 +99,7 @@ class MessageScope internal constructor(
private val observeSelfDeletingMessages: ObserveSelfDeletionTimerSettingsForConversationUseCase,
private val messageMetadataRepository: MessageMetadataRepository,
private val staleEpochVerifier: StaleEpochVerifier,
private val legalHoldHandler: LegalHoldHandler,
private val scope: CoroutineScope,
internal val dispatcher: KaliumDispatcher = KaliumDispatcherImpl,
private val legalHoldStatusMapper: LegalHoldStatusMapper = LegalHoldStatusMapperImpl
Expand All @@ -105,6 +109,7 @@ class MessageScope internal constructor(
get() = MessageSendFailureHandlerImpl(
userRepository,
clientRepository,
clientRemoteRepository,
messageRepository,
messageSendingScheduler,
conversationRepository
Expand Down Expand Up @@ -158,6 +163,7 @@ class MessageScope internal constructor(
mlsConversationRepository,
syncManager,
messageSendFailureHandler,
legalHoldHandler,
sessionEstablisher,
messageEnvelopeCreator,
mlsMessageCreator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ package com.wire.kalium.logic.feature.message
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.NetworkFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.client.ClientMapper
import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.data.client.remote.ClientRemoteRepository
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.failure.ProteusSendMessageFailure
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
Expand Down Expand Up @@ -66,12 +69,15 @@ interface MessageSendFailureHandler {
)
}

@Suppress("LongParameterList")
class MessageSendFailureHandlerImpl internal constructor(
private val userRepository: UserRepository,
private val clientRepository: ClientRepository,
private val clientRemoteRepository: ClientRemoteRepository,
private val messageRepository: MessageRepository,
private val messageSendingScheduler: MessageSendingScheduler,
private val conversationRepository: ConversationRepository,
private val clientMapper: ClientMapper = MapperProvider.clientMapper(),
) : MessageSendFailureHandler {

override suspend fun handleClientsHaveChangedFailure(
Expand Down Expand Up @@ -108,10 +114,16 @@ class MessageSendFailureHandlerImpl internal constructor(
else userRepository.fetchUsersByIds(userId)
}

private suspend fun addMissingClients(missingClients: Map<UserId, List<ClientId>>): Either<CoreFailure, Unit> {
return if (missingClients.isEmpty()) Either.Right(Unit)
else clientRepository.storeMapOfUserToClientId(missingClients)
}
private suspend fun addMissingClients(missingClients: Map<UserId, List<ClientId>>): Either<CoreFailure, Unit> =
if (missingClients.isEmpty()) Either.Right(Unit)
else clientRemoteRepository.fetchOtherUserClients(missingClients.keys.toList())
.flatMap {
it.map { (userId, clientList) -> clientMapper.toInsertClientParam(clientList, userId) }
.flatten().let { insertClientParamList ->
if (insertClientParamList.isEmpty()) Either.Right(Unit)
else clientRepository.storeUserClientListAndRemoveRedundantClients(insertClientParamList)
}
}

override suspend fun handleFailureAndUpdateMessageStatus(
failure: CoreFailure,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import com.wire.kalium.logic.data.conversation.MLSConversationRepository
import com.wire.kalium.logic.data.conversation.Recipient
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.GroupID
import com.wire.kalium.logic.data.id.MessageId
import com.wire.kalium.logic.data.message.BroadcastMessage
import com.wire.kalium.logic.data.message.BroadcastMessageOption
import com.wire.kalium.logic.data.message.BroadcastMessageTarget
Expand All @@ -44,6 +45,7 @@ import com.wire.kalium.logic.data.message.getType
import com.wire.kalium.logic.data.prekey.UsersWithoutSessions
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.failure.LegalHoldEnabledForConversationFailure
import com.wire.kalium.logic.failure.ProteusSendMessageFailure
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
Expand All @@ -53,6 +55,7 @@ import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.sync.SyncManager
import com.wire.kalium.logic.sync.receiver.handler.legalhold.LegalHoldHandler
import com.wire.kalium.network.exceptions.KaliumException
import com.wire.kalium.network.exceptions.isMlsStaleMessage
import com.wire.kalium.util.DateTimeUtil
Expand Down Expand Up @@ -135,6 +138,7 @@ internal class MessageSenderImpl internal constructor(
private val mlsConversationRepository: MLSConversationRepository,
private val syncManager: SyncManager,
private val messageSendFailureHandler: MessageSendFailureHandler,
private val legalHoldHandler: LegalHoldHandler,
private val sessionEstablisher: SessionEstablisher,
private val messageEnvelopeCreator: MessageEnvelopeCreator,
private val mlsMessageCreator: MLSMessageCreator,
Expand Down Expand Up @@ -359,6 +363,8 @@ internal class MessageSenderImpl internal constructor(
failure = it,
action = "Send",
messageLogString = message.toLogString(),
messageId = message.id,
messageTimestampIso = message.date,
conversationId = message.conversationId,
remainingAttempts = remainingAttempts
) { remainingAttempts ->
Expand All @@ -385,7 +391,7 @@ internal class MessageSenderImpl internal constructor(
messageRepository
.broadcastEnvelope(envelope, option)
.fold({
handleProteusError(it, "Broadcast", message.toLogString(), null, remainingAttempts = 1) {
handleProteusError(it, "Broadcast", message.toLogString(), message.id, message.date, null, remainingAttempts = 1) {
attemptToBroadcastWithProteus(
message,
target,
Expand All @@ -401,6 +407,8 @@ internal class MessageSenderImpl internal constructor(
failure: CoreFailure,
action: String, // Send or Broadcast
messageLogString: String,
messageId: MessageId,
messageTimestampIso: String,
conversationId: ConversationId?,
remainingAttempts: Int,
retry: suspend (remainingAttempts: Int) -> Either<CoreFailure, String>
Expand All @@ -410,21 +418,33 @@ internal class MessageSenderImpl internal constructor(
logger.w(
"Proteus $action Failure: { \"message\" : \"${messageLogString}\", \"errorInfo\" : \"${failure}\" }"
)
messageSendFailureHandler
.handleClientsHaveChangedFailure(failure, conversationId)
.flatMap {
if (remainingAttempts > 0) {
logger.w(
"Retrying (remaining attempts: $remainingAttempts) after Proteus $action " +
"Failure: { \"message\" : \"${messageLogString}\"}"
)
retry(remainingAttempts - 1)
} else {
logger.e(
"No remaining attempts to retry after Proteus $action " +
"Failure: { \"message\" : \"${messageLogString}\"}"
)
Either.Left(failure)
handleLegalHoldChanges(conversationId, messageTimestampIso) {
messageSendFailureHandler
.handleClientsHaveChangedFailure(failure, conversationId)
}
.flatMap { legalHoldEnabled ->
when {
legalHoldEnabled -> {
logger.w(
"Legal hold enabled, no retry after Proteus $action " +
"Failure: { \"message\" : \"${messageLogString}\", \"errorInfo\" : \"${failure}\" }"
)
Either.Left(LegalHoldEnabledForConversationFailure(messageId))
}
remainingAttempts > 0 -> {
logger.w(
"Retrying (remaining attempts: $remainingAttempts) after Proteus $action " +
"Failure: { \"message\" : \"${messageLogString}\", \"errorInfo\" : \"${failure}\" }"
)
retry(remainingAttempts - 1)
}
else -> {
logger.e(
"No remaining attempts to retry after Proteus $action " +
"Failure: { \"message\" : \"${messageLogString}\", \"errorInfo\" : \"${failure}\" }"
)
Either.Left(failure)
}
}
}
.onFailure {
Expand All @@ -443,6 +463,14 @@ internal class MessageSenderImpl internal constructor(
}
}

private suspend fun handleLegalHoldChanges(
conversationId: ConversationId?,
messageTimestampIso: String,
handleClientsHaveChangedFailure: suspend () -> Either<CoreFailure, Unit>
) =
if (conversationId == null) handleClientsHaveChangedFailure().map { false }
else legalHoldHandler.handleMessageSendFailure(conversationId, messageTimestampIso, handleClientsHaveChangedFailure)

private fun getBroadcastParams(
selfUserId: UserId,
selfClientId: ClientId,
Expand Down
Loading

0 comments on commit 19491b1

Please sign in to comment.