Skip to content

Commit

Permalink
Add Message enum and remove wasm::stream::io mod
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Jan 24, 2025
1 parent 8c630bb commit 12a8d4a
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 180 deletions.
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ use std::time::Duration;
pub use futures_util;
pub use url::{self, Url};

pub mod message;
#[cfg(not(target_arch = "wasm32"))]
pub mod native;
pub mod prelude;
mod socket;
#[cfg(target_arch = "wasm32")]
pub mod wasm;

pub use self::message::Message;
#[cfg(not(target_arch = "wasm32"))]
pub use self::native::{Error, Message as WsMessage};
pub use self::native::Error;
pub use self::socket::WebSocket;
#[cfg(target_arch = "wasm32")]
pub use self::wasm::{Error, WsMessage};
pub use self::wasm::Error;

#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionMode {
Expand Down
121 changes: 121 additions & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) 2022-2024 Yuki Kishimoto
// Distributed under the MIT software license

#[cfg(not(target_arch = "wasm32"))]
use std::borrow::Cow;

#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::tungstenite::protocol::CloseFrame as TungsteniteCloseFrame;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::tungstenite::protocol::Message as TungsteniteMessage;

#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CloseFrame {
/// The reason as a code.
pub code: u16,
/// The reason as text string.
pub reason: String,
}

/// An enum representing the various forms of a WebSocket message.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Message {
/// A text WebSocket message
Text(String),
/// A binary WebSocket message
Binary(Vec<u8>),
/// A ping message with the specified payload
///
/// The payload here must have a length less than 125 bytes
#[cfg(not(target_arch = "wasm32"))]
Ping(Vec<u8>),
/// A pong message with the specified payload
///
/// The payload here must have a length less than 125 bytes
#[cfg(not(target_arch = "wasm32"))]
Pong(Vec<u8>),
/// A close message with the optional close frame.
#[cfg(not(target_arch = "wasm32"))]
Close(Option<CloseFrame>),
}

// impl Message {
// /// Get the length of the WebSocket message.
// #[inline]
// pub fn len(&self) -> usize {
// match self {
// Self::Text(string) => string.len(),
// Self::Binary(data) => data.len(),
// Self::Ping(data) => data.len(),
// Self::Pong(data) => data.len(),
// Self::Close(frame) => frame.map(|f| f.reason.len()).unwrap_or(0),
// }
// }
//
// /// Returns true if the WebSocket message has no content.
// /// For example, if the other side of the connection sent an empty string.
// #[inline]
// pub fn is_empty(&self) -> bool {
// self.len() == 0
// }
//
// /// Consume the message and return it as binary data.
// pub fn into_data(self) -> Vec<u8> {
// match self {
// Self::Text(string) => string.into_bytes(),
// Self::Binary(data) => data,
// }
// }
// }

#[cfg(not(target_arch = "wasm32"))]
impl From<CloseFrame> for TungsteniteCloseFrame<'_> {
fn from(frame: CloseFrame) -> Self {
Self {
code: CloseCode::from(frame.code),
reason: Cow::Owned(frame.reason),
}
}
}

#[cfg(not(target_arch = "wasm32"))]
impl From<Message> for TungsteniteMessage {
fn from(msg: Message) -> Self {
match msg {
Message::Text(text) => Self::Text(text),
Message::Binary(data) => Self::Binary(data),
Message::Ping(data) => Self::Ping(data),
Message::Pong(data) => Self::Pong(data),
Message::Close(frame) => Self::Close(frame.map(|f| f.into())),
}
}
}

#[cfg(not(target_arch = "wasm32"))]
impl From<TungsteniteCloseFrame<'_>> for CloseFrame {
fn from(frame: TungsteniteCloseFrame<'_>) -> Self {
Self {
code: frame.code.into(),
reason: frame.reason.into_owned(),
}
}
}

#[cfg(not(target_arch = "wasm32"))]
impl From<TungsteniteMessage> for Message {
fn from(msg: TungsteniteMessage) -> Self {
match msg {
TungsteniteMessage::Text(text) => Self::Text(text),
TungsteniteMessage::Binary(data) => Self::Binary(data),
TungsteniteMessage::Ping(data) => Self::Ping(data),
TungsteniteMessage::Pong(data) => Self::Pong(data),
TungsteniteMessage::Close(frame) => Self::Close(frame.map(|f| f.into())),
// SAFETY: from tungstenite docs: "you're not going to get this value while reading the message".
// SAFETY: this conversion is used only in Stream trait, so when reading the messages.
TungsteniteMessage::Frame(..) => unreachable!(),
}
}
}
1 change: 0 additions & 1 deletion src/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::time;
use tokio_tungstenite::tungstenite::protocol::Role;
pub use tokio_tungstenite::tungstenite::Message;
pub use tokio_tungstenite::WebSocketStream;
use url::Url;

Expand Down
22 changes: 14 additions & 8 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};

#[cfg(target_arch = "wasm32")]
use crate::wasm::WsStream;
use crate::{Error, WsMessage};
use crate::{Error, Message};

#[cfg(not(target_arch = "wasm32"))]
type WsStream<T> = WebSocketStream<MaybeTlsStream<T>>;
Expand All @@ -29,7 +29,7 @@ pub enum WebSocket {
Wasm(WsStream),
}

impl Sink<WsMessage> for WebSocket {
impl Sink<Message> for WebSocket {
type Error = Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand All @@ -43,12 +43,12 @@ impl Sink<WsMessage> for WebSocket {
}
}

fn start_send(mut self: Pin<&mut Self>, item: WsMessage) -> Result<(), Self::Error> {
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
match self.deref_mut() {
#[cfg(not(target_arch = "wasm32"))]
Self::Tokio(s) => Pin::new(s).start_send(item).map_err(Into::into),
Self::Tokio(s) => Pin::new(s).start_send(item.into()).map_err(Into::into),
#[cfg(all(feature = "tor", not(target_arch = "wasm32")))]
Self::Tor(s) => Pin::new(s).start_send(item).map_err(Into::into),
Self::Tor(s) => Pin::new(s).start_send(item.into()).map_err(Into::into),
#[cfg(target_arch = "wasm32")]
Self::Wasm(s) => Pin::new(s).start_send(item).map_err(Into::into),
}
Expand Down Expand Up @@ -78,14 +78,20 @@ impl Sink<WsMessage> for WebSocket {
}

impl Stream for WebSocket {
type Item = Result<WsMessage, Error>;
type Item = Result<Message, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.deref_mut() {
#[cfg(not(target_arch = "wasm32"))]
Self::Tokio(s) => Pin::new(s).poll_next(cx).map_err(Into::into),
Self::Tokio(s) => Pin::new(s)
.poll_next(cx)
.map(|i| i.map(|res| res.map(|msg| msg.into())))
.map_err(Into::into),
#[cfg(all(feature = "tor", not(target_arch = "wasm32")))]
Self::Tor(s) => Pin::new(s).poll_next(cx).map_err(Into::into),
Self::Tor(s) => Pin::new(s)
.poll_next(cx)
.map(|i| i.map(|res| res.map(|msg| msg.into())))
.map_err(Into::into),
#[cfg(target_arch = "wasm32")]
Self::Wasm(s) => Pin::new(s).poll_next(cx).map_err(Into::into),
}
Expand Down
88 changes: 6 additions & 82 deletions src/wasm/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,24 @@
// Copyright (c) 2023-2024 Yuki Kishimoto
// Distributed under the MIT software license

use core::{fmt, str};

use js_sys::{ArrayBuffer, Uint8Array};
use wasm_bindgen::JsCast;
use web_sys::{Blob, MessageEvent};

use crate::message::Message;
use crate::wasm::Error;

/// Represents a WebSocket Message, after converting from JavaScript type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum WsMessage {
/// The data of the message is a string.
Text(String),
/// The message contains binary data.
Binary(Vec<u8>),
}

impl WsMessage {
/// Get the length of the WebSocket message.
#[inline]
pub fn len(&self) -> usize {
match self {
Self::Text(string) => string.len(),
Self::Binary(data) => data.len(),
}
}

/// Returns true if the WebSocket message has no content.
/// For example, if the other side of the connection sent an empty string.
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Consume the message and return it as binary data.
pub fn into_data(self) -> Vec<u8> {
match self {
Self::Text(string) => string.into_bytes(),
Self::Binary(data) => data,
}
}

/// Attempt to get a &str from the WebSocket message,
/// this will try to convert binary data to utf8.
pub fn to_text(&self) -> Result<&str, Error> {
match self {
Self::Text(string) => Ok(string),
Self::Binary(data) => Ok(str::from_utf8(data)?),
}
}
}

/// This will convert the JavaScript event into a WsMessage. Note that this
/// will only work if the connection is set to use the binary type ArrayBuffer.
/// On binary type Blob, this will panic.
impl TryFrom<MessageEvent> for WsMessage {
impl TryFrom<MessageEvent> for Message {
type Error = Error;

fn try_from(evt: MessageEvent) -> Result<Self, Self::Error> {
match evt.data() {
d if d.is_instance_of::<ArrayBuffer>() => Ok(WsMessage::Binary(
Uint8Array::new(d.unchecked_ref()).to_vec(),
)),
d if d.is_instance_of::<ArrayBuffer>() => {
Ok(Message::Binary(Uint8Array::new(d.unchecked_ref()).to_vec()))
}

// We don't allow invalid encodings. In principle if needed,
// we could add a variant to WsMessage with a CString or an OsString
Expand All @@ -74,7 +29,7 @@ impl TryFrom<MessageEvent> for WsMessage {
// idea to begin with. If you need data that is not a valid string, use a binary
// message.
d if d.is_string() => match d.as_string() {
Some(text) => Ok(WsMessage::Text(text)),
Some(text) => Ok(Message::Text(text)),
None => Err(Error::InvalidEncoding),
},

Expand All @@ -87,34 +42,3 @@ impl TryFrom<MessageEvent> for WsMessage {
}
}
}

impl From<Vec<u8>> for WsMessage {
fn from(vec: Vec<u8>) -> Self {
WsMessage::Binary(vec)
}
}

impl From<String> for WsMessage {
fn from(s: String) -> Self {
WsMessage::Text(s)
}
}

impl AsRef<[u8]> for WsMessage {
fn as_ref(&self) -> &[u8] {
match self {
Self::Text(string) => string.as_ref(),
Self::Binary(data) => data.as_ref(),
}
}
}

impl fmt::Display for WsMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Ok(string) = self.to_text() {
write!(f, "{string}")
} else {
write!(f, "Binary Data<length={}>", self.len())
}
}
}
1 change: 0 additions & 1 deletion src/wasm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ mod stream;

pub use self::error::Error;
use self::event::{CloseEvent, WsEvent};
pub use self::message::WsMessage;
use self::pharos::SharedPharos;
use self::socket::WebSocket as WasmWebSocket;
use self::state::WsState;
Expand Down
Loading

0 comments on commit 12a8d4a

Please sign in to comment.