diff --git a/src/lib.rs b/src/lib.rs index acbd3c4..36765ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,7 @@ use std::time::Duration; pub use futures_util; pub use url::{self, Url}; +pub mod message; #[cfg(not(target_arch = "wasm32"))] pub mod native; pub mod prelude; @@ -23,11 +24,12 @@ mod socket; #[cfg(target_arch = "wasm32")] pub mod wasm; +pub use self::message::Message; #[cfg(not(target_arch = "wasm32"))] -pub use self::native::{Error, Message as WsMessage}; +pub use self::native::Error; pub use self::socket::WebSocket; #[cfg(target_arch = "wasm32")] -pub use self::wasm::{Error, WsMessage}; +pub use self::wasm::Error; #[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum ConnectionMode { diff --git a/src/message.rs b/src/message.rs new file mode 100644 index 0000000..d77cee7 --- /dev/null +++ b/src/message.rs @@ -0,0 +1,121 @@ +// Copyright (c) 2022-2024 Yuki Kishimoto +// Distributed under the MIT software license + +#[cfg(not(target_arch = "wasm32"))] +use std::borrow::Cow; + +#[cfg(not(target_arch = "wasm32"))] +use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; +#[cfg(not(target_arch = "wasm32"))] +use tokio_tungstenite::tungstenite::protocol::CloseFrame as TungsteniteCloseFrame; +#[cfg(not(target_arch = "wasm32"))] +use tokio_tungstenite::tungstenite::protocol::Message as TungsteniteMessage; + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct CloseFrame { + /// The reason as a code. + pub code: u16, + /// The reason as text string. + pub reason: String, +} + +/// An enum representing the various forms of a WebSocket message. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum Message { + /// A text WebSocket message + Text(String), + /// A binary WebSocket message + Binary(Vec), + /// A ping message with the specified payload + /// + /// The payload here must have a length less than 125 bytes + #[cfg(not(target_arch = "wasm32"))] + Ping(Vec), + /// A pong message with the specified payload + /// + /// The payload here must have a length less than 125 bytes + #[cfg(not(target_arch = "wasm32"))] + Pong(Vec), + /// A close message with the optional close frame. + #[cfg(not(target_arch = "wasm32"))] + Close(Option), +} + +// impl Message { +// /// Get the length of the WebSocket message. +// #[inline] +// pub fn len(&self) -> usize { +// match self { +// Self::Text(string) => string.len(), +// Self::Binary(data) => data.len(), +// Self::Ping(data) => data.len(), +// Self::Pong(data) => data.len(), +// Self::Close(frame) => frame.map(|f| f.reason.len()).unwrap_or(0), +// } +// } +// +// /// Returns true if the WebSocket message has no content. +// /// For example, if the other side of the connection sent an empty string. +// #[inline] +// pub fn is_empty(&self) -> bool { +// self.len() == 0 +// } +// +// /// Consume the message and return it as binary data. +// pub fn into_data(self) -> Vec { +// match self { +// Self::Text(string) => string.into_bytes(), +// Self::Binary(data) => data, +// } +// } +// } + +#[cfg(not(target_arch = "wasm32"))] +impl From for TungsteniteCloseFrame<'_> { + fn from(frame: CloseFrame) -> Self { + Self { + code: CloseCode::from(frame.code), + reason: Cow::Owned(frame.reason), + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +impl From for TungsteniteMessage { + fn from(msg: Message) -> Self { + match msg { + Message::Text(text) => Self::Text(text), + Message::Binary(data) => Self::Binary(data), + Message::Ping(data) => Self::Ping(data), + Message::Pong(data) => Self::Pong(data), + Message::Close(frame) => Self::Close(frame.map(|f| f.into())), + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +impl From> for CloseFrame { + fn from(frame: TungsteniteCloseFrame<'_>) -> Self { + Self { + code: frame.code.into(), + reason: frame.reason.into_owned(), + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +impl From for Message { + fn from(msg: TungsteniteMessage) -> Self { + match msg { + TungsteniteMessage::Text(text) => Self::Text(text), + TungsteniteMessage::Binary(data) => Self::Binary(data), + TungsteniteMessage::Ping(data) => Self::Ping(data), + TungsteniteMessage::Pong(data) => Self::Pong(data), + TungsteniteMessage::Close(frame) => Self::Close(frame.map(|f| f.into())), + // SAFETY: from tungstenite docs: "you're not going to get this value while reading the message". + // SAFETY: this conversion is used only in Stream trait, so when reading the messages. + TungsteniteMessage::Frame(..) => unreachable!(), + } + } +} diff --git a/src/native/mod.rs b/src/native/mod.rs index ece4938..c073310 100644 --- a/src/native/mod.rs +++ b/src/native/mod.rs @@ -16,7 +16,6 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio::time; use tokio_tungstenite::tungstenite::protocol::Role; -pub use tokio_tungstenite::tungstenite::Message; pub use tokio_tungstenite::WebSocketStream; use url::Url; diff --git a/src/socket.rs b/src/socket.rs index 64ecdb2..d932c7d 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -15,7 +15,7 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; #[cfg(target_arch = "wasm32")] use crate::wasm::WsStream; -use crate::{Error, WsMessage}; +use crate::{Error, Message}; #[cfg(not(target_arch = "wasm32"))] type WsStream = WebSocketStream>; @@ -29,7 +29,7 @@ pub enum WebSocket { Wasm(WsStream), } -impl Sink for WebSocket { +impl Sink for WebSocket { type Error = Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -43,12 +43,12 @@ impl Sink for WebSocket { } } - fn start_send(mut self: Pin<&mut Self>, item: WsMessage) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { match self.deref_mut() { #[cfg(not(target_arch = "wasm32"))] - Self::Tokio(s) => Pin::new(s).start_send(item).map_err(Into::into), + Self::Tokio(s) => Pin::new(s).start_send(item.into()).map_err(Into::into), #[cfg(all(feature = "tor", not(target_arch = "wasm32")))] - Self::Tor(s) => Pin::new(s).start_send(item).map_err(Into::into), + Self::Tor(s) => Pin::new(s).start_send(item.into()).map_err(Into::into), #[cfg(target_arch = "wasm32")] Self::Wasm(s) => Pin::new(s).start_send(item).map_err(Into::into), } @@ -78,14 +78,20 @@ impl Sink for WebSocket { } impl Stream for WebSocket { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.deref_mut() { #[cfg(not(target_arch = "wasm32"))] - Self::Tokio(s) => Pin::new(s).poll_next(cx).map_err(Into::into), + Self::Tokio(s) => Pin::new(s) + .poll_next(cx) + .map(|i| i.map(|res| res.map(|msg| msg.into()))) + .map_err(Into::into), #[cfg(all(feature = "tor", not(target_arch = "wasm32")))] - Self::Tor(s) => Pin::new(s).poll_next(cx).map_err(Into::into), + Self::Tor(s) => Pin::new(s) + .poll_next(cx) + .map(|i| i.map(|res| res.map(|msg| msg.into()))) + .map_err(Into::into), #[cfg(target_arch = "wasm32")] Self::Wasm(s) => Pin::new(s).poll_next(cx).map_err(Into::into), } diff --git a/src/wasm/message.rs b/src/wasm/message.rs index f08709b..19eba9a 100644 --- a/src/wasm/message.rs +++ b/src/wasm/message.rs @@ -2,69 +2,24 @@ // Copyright (c) 2023-2024 Yuki Kishimoto // Distributed under the MIT software license -use core::{fmt, str}; - use js_sys::{ArrayBuffer, Uint8Array}; use wasm_bindgen::JsCast; use web_sys::{Blob, MessageEvent}; +use crate::message::Message; use crate::wasm::Error; -/// Represents a WebSocket Message, after converting from JavaScript type. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum WsMessage { - /// The data of the message is a string. - Text(String), - /// The message contains binary data. - Binary(Vec), -} - -impl WsMessage { - /// Get the length of the WebSocket message. - #[inline] - pub fn len(&self) -> usize { - match self { - Self::Text(string) => string.len(), - Self::Binary(data) => data.len(), - } - } - - /// Returns true if the WebSocket message has no content. - /// For example, if the other side of the connection sent an empty string. - #[inline] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Consume the message and return it as binary data. - pub fn into_data(self) -> Vec { - match self { - Self::Text(string) => string.into_bytes(), - Self::Binary(data) => data, - } - } - - /// Attempt to get a &str from the WebSocket message, - /// this will try to convert binary data to utf8. - pub fn to_text(&self) -> Result<&str, Error> { - match self { - Self::Text(string) => Ok(string), - Self::Binary(data) => Ok(str::from_utf8(data)?), - } - } -} - /// This will convert the JavaScript event into a WsMessage. Note that this /// will only work if the connection is set to use the binary type ArrayBuffer. /// On binary type Blob, this will panic. -impl TryFrom for WsMessage { +impl TryFrom for Message { type Error = Error; fn try_from(evt: MessageEvent) -> Result { match evt.data() { - d if d.is_instance_of::() => Ok(WsMessage::Binary( - Uint8Array::new(d.unchecked_ref()).to_vec(), - )), + d if d.is_instance_of::() => { + Ok(Message::Binary(Uint8Array::new(d.unchecked_ref()).to_vec())) + } // We don't allow invalid encodings. In principle if needed, // we could add a variant to WsMessage with a CString or an OsString @@ -74,7 +29,7 @@ impl TryFrom for WsMessage { // idea to begin with. If you need data that is not a valid string, use a binary // message. d if d.is_string() => match d.as_string() { - Some(text) => Ok(WsMessage::Text(text)), + Some(text) => Ok(Message::Text(text)), None => Err(Error::InvalidEncoding), }, @@ -87,34 +42,3 @@ impl TryFrom for WsMessage { } } } - -impl From> for WsMessage { - fn from(vec: Vec) -> Self { - WsMessage::Binary(vec) - } -} - -impl From for WsMessage { - fn from(s: String) -> Self { - WsMessage::Text(s) - } -} - -impl AsRef<[u8]> for WsMessage { - fn as_ref(&self) -> &[u8] { - match self { - Self::Text(string) => string.as_ref(), - Self::Binary(data) => data.as_ref(), - } - } -} - -impl fmt::Display for WsMessage { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if let Ok(string) = self.to_text() { - write!(f, "{string}") - } else { - write!(f, "Binary Data", self.len()) - } - } -} diff --git a/src/wasm/mod.rs b/src/wasm/mod.rs index b71d20d..682ee9a 100644 --- a/src/wasm/mod.rs +++ b/src/wasm/mod.rs @@ -21,7 +21,6 @@ mod stream; pub use self::error::Error; use self::event::{CloseEvent, WsEvent}; -pub use self::message::WsMessage; use self::pharos::SharedPharos; use self::socket::WebSocket as WasmWebSocket; use self::state::WsState; diff --git a/src/wasm/stream/mod.rs b/src/wasm/stream.rs similarity index 95% rename from src/wasm/stream/mod.rs rename to src/wasm/stream.rs index a61c304..47fba2e 100644 --- a/src/wasm/stream/mod.rs +++ b/src/wasm/stream.rs @@ -17,12 +17,11 @@ use wasm_bindgen::closure::Closure; use wasm_bindgen::JsCast; use web_sys::{CloseEvent as JsCloseEvt, WebSocket, *}; -pub mod io; - +use crate::message::Message; use crate::wasm::pharos::{Filter, Observable, SharedPharos}; -use crate::wasm::{notify, Error, WsEvent, WsMessage, WsState}; +use crate::wasm::{notify, Error, WsEvent, WsState}; -/// A futures 0.3 Sink/Stream of [WsMessage]. Created with [WsMeta::connect](crate::WsMeta::connect). +/// A futures 0.3 Sink/Stream of [Message]. Created with [WsMeta::connect](crate::WsMeta::connect). /// /// ## Closing the connection /// @@ -45,7 +44,7 @@ pub struct WsStream { ws: Arc, // The queue of received messages - queue: Arc>>, + queue: Arc>>, // Last waker of task that wants to read incoming messages to be woken up on a new message waker: Arc>>, @@ -86,7 +85,7 @@ impl WsStream { // Send the incoming ws messages to the WsMeta object #[allow(trivial_casts)] let on_msg = Closure::wrap(Box::new(move |msg_evt: MessageEvent| { - match WsMessage::try_from(msg_evt) { + match Message::try_from(msg_evt) { Ok(msg) => q2.borrow_mut().push_back(msg), Err(err) => notify(ph2.clone(), WsEvent::WsErr(err)), } @@ -198,7 +197,7 @@ impl Drop for WsStream { } impl Stream for WsStream { - type Item = Result; + type Item = Result; // Using `Result` to keep same format of `tungstenite` code @@ -222,7 +221,7 @@ impl Stream for WsStream { } } -impl Sink for WsStream { +impl Sink for WsStream { type Error = Error; // Web API does not really seem to let us check for readiness, other than the connection state. @@ -238,7 +237,7 @@ impl Sink for WsStream { } } - fn start_send(self: Pin<&mut Self>, item: WsMessage) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { match self.ready_state()? { WsState::Open => { // The send method can return 2 errors: @@ -248,11 +247,11 @@ impl Sink for WsStream { // So if this returns an error, we will return ConnectionNotOpen. In principle, // we just checked that it's open, but this guarantees correctness. match item { - WsMessage::Binary(d) => self + Message::Binary(d) => self .ws .send_with_u8_array(&d) .map_err(|_| Error::ConnectionNotOpen)?, - WsMessage::Text(s) => self + Message::Text(s) => self .ws .send_with_str(&s) .map_err(|_| Error::ConnectionNotOpen)?, diff --git a/src/wasm/stream/io.rs b/src/wasm/stream/io.rs deleted file mode 100644 index 4d5d4ae..0000000 --- a/src/wasm/stream/io.rs +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (c) 2019-2022 Naja Melan -// Copyright (c) 2023-2024 Yuki Kishimoto -// Distributed under the MIT software license - -use std::io::{self, ErrorKind}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use futures::{Sink, Stream}; - -use crate::wasm::{Error, WsStream}; - -/// A wrapper around WsStream that converts errors into io::Error so that it can be -/// used for io (like `AsyncRead`/`AsyncWrite`). -/// -/// You shouldn't need to use this manually. It is passed to [`IoStream`] when calling -/// [`WsStream::into_io`]. -#[derive(Debug)] -pub struct WsStreamIo { - inner: WsStream, -} - -impl Stream for WsStreamIo { - type Item = Result, io::Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_next(cx).map(|opt| { - opt.map(|msg| { - msg.map(|m| m.into_data()) - .map_err(|e| io::Error::new(ErrorKind::Other, e)) - }) - }) - } -} - -impl Sink> for WsStreamIo { - type Error = io::Error; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner) - .poll_ready(cx) - .map(convert_res_tuple) - } - - fn start_send(mut self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { - Pin::new(&mut self.inner) - .start_send(item.into()) - .map_err(convert_err) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner) - .poll_flush(cx) - .map(convert_res_tuple) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner) - .poll_close(cx) - .map(convert_res_tuple) - } -} - -#[inline] -fn convert_res_tuple(res: Result<(), Error>) -> Result<(), io::Error> { - res.map_err(convert_err) -} - -fn convert_err(err: Error) -> io::Error { - match err { - Error::ConnectionNotOpen => io::Error::from(ErrorKind::NotConnected), - // This shouldn't happen, so panic for early detection. - _ => io::Error::from(ErrorKind::Other), - } -}