Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nesium committed Dec 8, 2023
1 parent 49108a3 commit 9a61e03
Show file tree
Hide file tree
Showing 18 changed files with 153 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::app::deps::DynRoomFactory;
use crate::app::event_handlers::ClientEventDispatcherTrait;
use crate::client::ClientInner;
use crate::domain::rooms::models::RoomInternals;
use crate::domain::shared::models::RoomType;
use crate::{ClientDelegate, ClientEvent, ClientRoomEventType};

pub struct ClientEventDispatcher {
Expand Down Expand Up @@ -60,6 +61,11 @@ impl ClientEventDispatcherTrait for ClientEventDispatcher {
}

fn dispatch_room_event(&self, room: Arc<RoomInternals>, event: ClientRoomEventType) {
// We're not sending events for rooms that are still pending…
if room.r#type == RoomType::Pending {
return;
}

let Some(ref delegate) = self.delegate else {
return;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,51 @@ use std::sync::OnceLock;

use tracing::error;

use crate::app::event_handlers::{XMPPEvent, XMPPEventHandler};
use prose_xmpp::Event as XMPPEvent;

pub struct XMPPEventHandlerQueue {
handlers: OnceLock<Vec<Box<dyn XMPPEventHandler>>>,
use crate::app::event_handlers::{ServerEvent, ServerEventHandler};
use crate::infra::xmpp::event_parser::parse_xmpp_event;

pub struct ServerEventHandlerQueue {
handlers: OnceLock<Vec<Box<dyn ServerEventHandler>>>,
}

impl XMPPEventHandlerQueue {
impl ServerEventHandlerQueue {
pub fn new() -> Self {
Self {
handlers: Default::default(),
}
}

pub fn set_handlers(&self, handlers: Vec<Box<dyn XMPPEventHandler>>) {
pub fn set_handlers(&self, handlers: Vec<Box<dyn ServerEventHandler>>) {
self.handlers
.set(handlers)
.map_err(|_| ())
.expect("Tried to applied handlers XMPPEventHandlerQueue more than once");
.expect("Tried to applied handlers ServerEventHandlerQueue more than once");
}

pub async fn handle_event(&self, event: XMPPEvent) {
let events = match parse_xmpp_event(event) {
Ok(event) => event,
Err(err) => {
error!("Failed to parse XMPP event. Reason: {}", err.to_string());
return;
}
};

for event in events {
self.handle_server_event(event).await
}
}
}

impl ServerEventHandlerQueue {
async fn handle_server_event(&self, event: ServerEvent) {
let mut event = event;
let handlers = self
.handlers
.get()
.expect("Handlers were not set in XMPPEventHandlerQueue");
.expect("Handlers were not set in ServerEventHandlerQueue");

for handler in handlers.iter() {
match handler.handle_event(event).await {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@ use tracing::{debug, error};
use xmpp_parsers::message::MessageType;

use prose_proc_macros::InjectDependencies;
use prose_xmpp::mods::chat;
use prose_xmpp::mods::chat::Carbon;
use prose_xmpp::stanza::Message;
use prose_xmpp::Event;

use crate::app::deps::{
DynClientEventDispatcher, DynConnectedRoomsReadOnlyRepository, DynMessagesRepository,
DynMessagingService, DynSidebarDomainService, DynTimeProvider,
};
use crate::app::event_handlers::{XMPPEvent, XMPPEventHandler};
use crate::app::event_handlers::{MessageEvent, MessageEventType, ServerEvent, ServerEventHandler};
use crate::domain::messaging::models::{MessageLike, MessageLikeError, TimestampedMessage};
use crate::domain::rooms::services::CreateOrEnterRoomRequest;
use crate::domain::shared::models::{RoomId, UserId};
Expand All @@ -42,32 +40,19 @@ pub struct MessagesEventHandler {

#[cfg_attr(target_arch = "wasm32", async_trait(? Send))]
#[async_trait]
impl XMPPEventHandler for MessagesEventHandler {
impl ServerEventHandler for MessagesEventHandler {
fn name(&self) -> &'static str {
"messages"
}

async fn handle_event(&self, event: XMPPEvent) -> Result<Option<XMPPEvent>> {
async fn handle_event(&self, event: ServerEvent) -> Result<Option<ServerEvent>> {
match event {
Event::Chat(event) => match event {
chat::Event::Message(message) => {
self.handle_received_message(ReceivedMessage::Message(message))
.await?;
Ok(None)
}
chat::Event::Carbon(carbon) => {
self.handle_received_message(ReceivedMessage::Carbon(carbon))
.await?;
Ok(None)
}
chat::Event::Sent(message) => {
self.handle_sent_message(message).await?;
Ok(None)
}
_ => Ok(Some(Event::Chat(event))),
},
_ => Ok(Some(event)),
ServerEvent::Message(event) => {
self.handle_message_event(event).await?;
}
_ => return Ok(Some(event)),
}
Ok(None)
}
}

Expand Down Expand Up @@ -115,6 +100,21 @@ impl ReceivedMessage {
}

impl MessagesEventHandler {
async fn handle_message_event(&self, event: MessageEvent) -> Result<()> {
match event.r#type {
MessageEventType::Received(message) => {
self.handle_received_message(ReceivedMessage::Message(message))
.await?
}
MessageEventType::Sync(carbon) => {
self.handle_received_message(ReceivedMessage::Carbon(carbon))
.await?
}
MessageEventType::Sent(message) => self.handle_sent_message(message).await?,
}
Ok(())
}

async fn handle_received_message(&self, message: ReceivedMessage) -> Result<()> {
let Some(from) = message.sender() else {
error!("Received message from unknown sender.");
Expand Down Expand Up @@ -152,12 +152,10 @@ impl MessagesEventHandler {
let now = self.time_provider.now();

let parsed_message: Result<MessageLike> = match message {
ReceivedMessage::Message(message) => {
MessageLike::try_from(TimestampedMessage {
message,
timestamp: now.into(),
})
}
ReceivedMessage::Message(message) => MessageLike::try_from(TimestampedMessage {
message,
timestamp: now.into(),
}),
ReceivedMessage::Carbon(carbon) => MessageLike::try_from(TimestampedMessage {
message: carbon,
timestamp: now.into(),
Expand Down
14 changes: 3 additions & 11 deletions crates/prose-core-client/src/app/event_handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use async_trait::async_trait;
pub use bookmarks_event_handler::BookmarksEventHandler;
pub use client_event_dispatcher::ClientEventDispatcher;
pub use connection_event_handler::ConnectionEventHandler;
pub use event_handler_queue::XMPPEventHandlerQueue;
pub use event_handler_queue::ServerEventHandlerQueue;
pub use messages_event_handler::MessagesEventHandler;
use prose_wasm_utils::{SendUnlessWasm, SyncUnlessWasm};
pub use prose_xmpp::Event as XMPPEvent;
Expand All @@ -31,22 +31,14 @@ mod messages_event_handler;
mod requests_event_handler;
mod rooms_event_handler;
mod server_event;
mod server_event_handler_wrapper;
mod user_state_event_handler;

/// `XMPPEventHandler` is a trait representing a handler for XMPP events.
/// `ServerEventHandler` is a trait representing a handler for XMPP events.
///
/// Implementors of this trait should provide a `handle_event` method, which takes an `XMPPEvent`
/// and returns an `Option<XMPPEvent>`. If the handler returns `None`, it means the event has been
/// and returns an `Option<ServerEvent>`. If the handler returns `None`, it means the event has been
/// consumed and no further processing should be done. If it returns `Some(event)`, the event is
/// not consumed and should be passed to the next handler.
#[cfg_attr(target_arch = "wasm32", async_trait(? Send))]
#[async_trait]
pub trait XMPPEventHandler: SendUnlessWasm + SyncUnlessWasm {
fn name(&self) -> &'static str;
async fn handle_event(&self, event: XMPPEvent) -> Result<Option<XMPPEvent>>;
}

#[cfg_attr(target_arch = "wasm32", async_trait(? Send))]
#[async_trait]
pub trait ServerEventHandler: SendUnlessWasm + SyncUnlessWasm {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum ServerEvent {
Occupant(OccupantEvent),
/// Events about requests that are directed at us.
Request(RequestEvent),
// TODO…
/// Events about received messages.
Message(MessageEvent),
/// Events about changes to the sidebar.
SidebarBookmark(SidebarBookmarkEvent),
Expand Down Expand Up @@ -156,8 +156,9 @@ pub struct MessageEvent {

#[derive(Debug, Clone, PartialEq)]
pub enum MessageEventType {
Received, // Regular messages
Sync, // Carbons
Received(prose_xmpp::stanza::Message),
Sync(prose_xmpp::mods::chat::Carbon),
Sent(prose_xmpp::stanza::Message),
}

#[derive(Debug, Clone, PartialEq)]
Expand Down

This file was deleted.

17 changes: 7 additions & 10 deletions crates/prose-core-client/src/client_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use prose_xmpp::{ns, IDProvider, SystemTimeProvider, TimeProvider, UUIDProvider}
use crate::app::deps::{AppContext, AppDependencies};
use crate::app::event_handlers::{
BookmarksEventHandler, ClientEventDispatcher, ConnectionEventHandler, MessagesEventHandler,
RequestsEventHandler, RoomsEventHandler, UserStateEventHandler, XMPPEventHandlerQueue,
RequestsEventHandler, RoomsEventHandler, ServerEventHandlerQueue, UserStateEventHandler,
};
use crate::app::services::{
AccountService, ConnectionService, ContactsService, RoomsService, UserDataService,
Expand Down Expand Up @@ -154,7 +154,7 @@ impl<A: AvatarCache + 'static> ClientBuilder<Store<PlatformDriver>, A> {
],
);

let handler_queue = Arc::new(XMPPEventHandlerQueue::new());
let handler_queue = Arc::new(ServerEventHandlerQueue::new());

let xmpp_client = Arc::new(
{
Expand All @@ -181,16 +181,13 @@ impl<A: AvatarCache + 'static> ClientBuilder<Store<PlatformDriver>, A> {
}
.into();

// TODO: Fixme!
todo!("FIXME");

handler_queue.set_handlers(vec![
//Box::new(ConnectionEventHandler::from(&dependencies)),
//Box::new(RequestsEventHandler::from(&dependencies)),
//Box::new(UserStateEventHandler::from(&dependencies)),
Box::new(ConnectionEventHandler::from(&dependencies)),
Box::new(RequestsEventHandler::from(&dependencies)),
Box::new(UserStateEventHandler::from(&dependencies)),
Box::new(MessagesEventHandler::from(&dependencies)),
//Box::new(RoomsEventHandler::from(&dependencies)),
//Box::new(BookmarksEventHandler::from(&dependencies)),
Box::new(RoomsEventHandler::from(&dependencies)),
Box::new(BookmarksEventHandler::from(&dependencies)),
]);

let client_inner = Arc::new(ClientInner {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ impl RoomsDomainServiceTrait for RoomsDomainService {
.reconfigure_room(room_jid, spec, new_name)
.await?;

// TODO: Make public channels also members-only so that the member list translates to the private channel

let Some(room) = self.connected_rooms_repo.update(room_jid, {
Box::new(|room| room.by_changing_type(RoomType::PrivateChannel))
}) else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ impl RoomFactory {
let inner = Arc::new((self.builder)(room));

match room_type {
RoomType::Pending => panic!("Cannot convert pending room to RoomEnvelope"),
RoomType::Pending => {
panic!("Cannot convert pending room to RoomEnvelope")
}
RoomType::DirectMessage => RoomEnvelope::DirectMessage(inner.into()),
RoomType::Group => RoomEnvelope::Group(inner.into()),
RoomType::PrivateChannel => RoomEnvelope::PrivateChannel(inner.into()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,27 @@ use prose_xmpp::ns;
use prose_xmpp::stanza::muc::MucUser;
use prose_xmpp::stanza::Message;

use crate::app::event_handlers::{RoomEvent, RoomEventType};
use crate::app::event_handlers::{MessageEvent, MessageEventType, RoomEvent, RoomEventType};
use crate::dtos::RoomId;
use crate::infra::xmpp::type_conversions::event_parser::{
ignore_stanza, missing_attribute, Context,
};
use crate::infra::xmpp::event_parser::{ignore_stanza, missing_attribute, Context};

pub fn parse_message(ctx: &mut Context, message: Message) -> Result<()> {
let Some(from) = message.from.clone() else {
return missing_attribute(ctx, "from", message);
};

// Ignore messages that contain invites…
// TODO: Handle this in the XMPP lib
if message.direct_invite().is_some() || message.mediated_invite().is_some() {
return Ok(());
}

// Ignore messages that contain a chat state but no body…
// TODO: Handle this in the XMPP lib
if message.chat_state().is_some() && message.body().is_none() {
return Ok(());
}

match message.type_ {
MessageType::Groupchat => parse_group_chat_message(ctx, from, message)?,
MessageType::Chat | MessageType::Normal => parse_chat_message(ctx, from, message)?,
Expand Down Expand Up @@ -55,7 +65,8 @@ fn parse_group_chat_message(ctx: &mut Context, from: Jid, message: Message) -> R
ctx.push_event(RoomEvent {
room_id: from.clone(),
r#type: RoomEventType::RoomConfigChanged,
})
});
return Ok(());
}
}

Expand All @@ -69,9 +80,16 @@ fn parse_group_chat_message(ctx: &mut Context, from: Jid, message: Message) -> R
return Ok(());
}

ctx.push_event(MessageEvent {
r#type: MessageEventType::Received(message),
});

Ok(())
}

fn parse_chat_message(_ctx: &mut Context, _from: Jid, _message: Message) -> Result<()> {
fn parse_chat_message(ctx: &mut Context, _from: Jid, message: Message) -> Result<()> {
ctx.push_event(MessageEvent {
r#type: MessageEventType::Received(message),
});
Ok(())
}
Loading

0 comments on commit 9a61e03

Please sign in to comment.