Skip to content

Commit

Permalink
Error handling refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Nov 18, 2024
1 parent e9489ae commit b12ac5c
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 160 deletions.
95 changes: 4 additions & 91 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
collections::HashMap,
mem::Discriminant,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Expand All @@ -9,14 +8,11 @@ use std::{

use futures::stream::{self, FuturesUnordered, StreamExt};
use openmls::{
credentials::errors::BasicCredentialError,
framing::{MlsMessageBodyIn, MlsMessageIn},
group::GroupEpoch,
messages::Welcome,
prelude::tls_codec::{Deserialize, Error as TlsCodecError},
};
use openmls_traits::OpenMlsProvider;
use prost::EncodeError;
use thiserror::Error;
use tokio::sync::broadcast;

Expand All @@ -35,26 +31,23 @@ use xmtp_proto::xmtp::mls::api::v1::{
GroupMessage, WelcomeMessage,
};

use crate::storage::wallet_addresses::WalletEntry;
use crate::{
api::ApiClientWrapper,
groups::{
group_permissions::PolicySet, validated_commit::CommitValidationError, GroupError,
GroupMetadataOptions, IntentError, MlsGroup,
},
groups::{group_permissions::PolicySet, GroupError, GroupMetadataOptions, MlsGroup},
identity::{parse_credential, Identity, IdentityError},
identity_updates::{load_identity_updates, IdentityUpdateError},
intents::Intents,
mutex_registry::MutexRegistry,
retry::Retry,
retry_async, retryable,
storage::wallet_addresses::WalletEntry,
storage::{
consent_record::{ConsentState, ConsentType, StoredConsentRecord},
db_connection::DbConnection,
group::{GroupMembershipState, GroupQueryArgs, StoredGroup},
group_message::StoredGroupMessage,
refresh_state::EntityKind,
sql_key_store, EncryptedMessageStore, StorageError,
EncryptedMessageStore, StorageError,
},
subscriptions::LocalEvents,
verified_key_package_v2::{KeyPackageVerificationError, VerifiedKeyPackageV2},
Expand Down Expand Up @@ -91,8 +84,6 @@ pub enum ClientError {
TlsError(#[from] TlsCodecError),
#[error("key package verification: {0}")]
KeyPackageVerification(#[from] KeyPackageVerificationError),
#[error("syncing errors: {0:?}")]
SyncingError(Vec<MessageProcessingError>),
#[error("Stream inconsistency error: {0}")]
StreamInconsistency(String),
#[error("Association error: {0}")]
Expand Down Expand Up @@ -129,82 +120,6 @@ impl crate::retry::RetryableError for ClientError {
}
}

/// Errors that can occur when reading and processing a message off the network
#[derive(Debug, Error)]
pub enum MessageProcessingError {
#[error("[{0}] already processed")]
AlreadyProcessed(u64),
#[error("diesel error: {0}")]
Diesel(#[from] diesel::result::Error),
#[error("[{message_time_ns:?}] invalid sender with credential: {credential:?}")]
InvalidSender {
message_time_ns: u64,
credential: Vec<u8>,
},
#[error("invalid payload")]
InvalidPayload,
#[error(transparent)]
Identity(#[from] IdentityError),
#[error("openmls process message error: {0}")]
OpenMlsProcessMessage(#[from] openmls::prelude::ProcessMessageError),
#[error("merge staged commit: {0}")]
MergeStagedCommit(#[from] openmls::group::MergeCommitError<sql_key_store::SqlKeyStoreError>),
#[error(
"no pending commit to merge. group epoch is {group_epoch:?} and got {message_epoch:?}"
)]
NoPendingCommit {
message_epoch: GroupEpoch,
group_epoch: GroupEpoch,
},
#[error("intent error: {0}")]
Intent(#[from] IntentError),
#[error("storage error: {0}")]
Storage(#[from] crate::storage::StorageError),
#[error("TLS Codec error: {0}")]
TlsError(#[from] TlsCodecError),
#[error("unsupported message type: {0:?}")]
UnsupportedMessageType(Discriminant<MlsMessageBodyIn>),
#[error("commit validation")]
CommitValidation(#[from] CommitValidationError),
#[error("codec")]
Codec(#[from] crate::codecs::CodecError),
#[error("encode proto: {0}")]
EncodeProto(#[from] EncodeError),
#[error("epoch increment not allowed")]
EpochIncrementNotAllowed,
#[error("Welcome processing error: {0}")]
WelcomeProcessing(Box<GroupError>),
#[error("wrong credential type")]
WrongCredentialType(#[from] BasicCredentialError),
#[error("proto decode error: {0}")]
DecodeError(#[from] prost::DecodeError),
#[error("clear pending commit error: {0}")]
ClearPendingCommit(#[from] sql_key_store::SqlKeyStoreError),
#[error(transparent)]
Group(#[from] Box<GroupError>),
#[error("Serialization/Deserialization Error {0}")]
Serde(#[from] serde_json::Error),
#[error("generic:{0}")]
Generic(String),
#[error("intent is missing staged_commit field")]
IntentMissingStagedCommit,
}

impl crate::retry::RetryableError for MessageProcessingError {
fn is_retryable(&self) -> bool {
match self {
Self::Group(group_error) => retryable!(group_error),
Self::Identity(identity_error) => retryable!(identity_error),
Self::OpenMlsProcessMessage(err) => retryable!(err),
Self::MergeStagedCommit(err) => retryable!(err),
Self::Diesel(diesel_error) => retryable!(diesel_error),
Self::Storage(s) => retryable!(s),
Self::Generic(err) => err.contains("database is locked"),
_ => false,
}
}
}

impl From<String> for ClientError {
fn from(value: String) -> Self {
Self::Generic(value)
Expand Down Expand Up @@ -843,9 +758,7 @@ where
tracing::error!("failed to create group from welcome: {}", err);
}

Err(MessageProcessingError::WelcomeProcessing(
Box::new(err)
))
Err(err)
}
}
},
Expand Down
Loading

0 comments on commit b12ac5c

Please sign in to comment.