From 6c103d48de0d4d3b6795a537caf94cb2bd6927bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E5=90=91=E5=A4=9C?= Date: Wed, 24 Apr 2024 21:16:01 +0800 Subject: [PATCH] feat(packet): remove ack resend and add lock for socket --- src/bin/main.rs | 2 +- src/models/client.rs | 33 +++----- src/models/packet.rs | 184 ++++++++++++------------------------------ src/models/render.rs | 20 ++--- src/models/server.rs | 55 ++++++++----- src/models/session.rs | 50 +++++++----- src/utils/gear.rs | 56 ++++++------- 7 files changed, 164 insertions(+), 236 deletions(-) diff --git a/src/bin/main.rs b/src/bin/main.rs index 7760c41..4502eeb 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -99,7 +99,7 @@ async fn main() -> Result<()> { path_route!(router, "/json" => json); path_route!(router, "/alive" => alive); - let mut server = Server::new("0.0.0.0", 7076, router); + let server = Server::new("0.0.0.0", 7076, router); server.run().await?; } _ => { diff --git a/src/models/client.rs b/src/models/client.rs index 0e5b9c1..2685f30 100644 --- a/src/models/client.rs +++ b/src/models/client.rs @@ -2,11 +2,7 @@ use std::{collections::VecDeque, sync::Arc}; use anyhow::{Error, Result}; -use tokio::{ - net::TcpStream, - sync::{Mutex, RwLock}, - task::JoinHandle, -}; +use tokio::{net::TcpStream, sync::Mutex, task::JoinHandle}; use crate::exceptions::Exception; #[cfg(feature = "python")] @@ -125,7 +121,7 @@ pub struct Client { pub entrance: String, pub path: OblivionPath, pub header: String, - pub session: Arc>, + pub session: Arc, pub responses: Arc>>, } @@ -151,26 +147,21 @@ impl Client { entrance: entrance.to_string(), path, header, - session: Arc::new(RwLock::new(session)), + session: Arc::new(session), responses: Arc::new(Mutex::new(VecDeque::new())), }) } pub async fn send(&self, data: Vec, status_code: u32) -> Result<()> { - let session = self.session.read().await; - Ok(session.send(data, status_code).await?) + Ok(self.session.send(data, status_code).await?) } pub async fn send_json(&self, json: Value, status_code: u32) -> Result<()> { - let session = self.session.read().await; - Ok(session - .send(json.to_string().into_bytes(), status_code) - .await?) + Ok(self.session.send_json(json, status_code).await?) } pub async fn recv(&self) -> Result { - let session = self.session.read().await; - Ok(session.recv().await?) + Ok(self.session.recv().await?) } pub async fn listen(&self) -> Result>> { @@ -178,10 +169,9 @@ impl Client { let responses = Arc::clone(&self.responses); Ok(tokio::spawn(async move { loop { - let rsess = session.read().await; let mut wres = responses.lock().await; - if !rsess.closed().await { - match rsess.recv().await { + if !session.closed().await { + match session.recv().await { Ok(res) => { if &res.flag == &1 { wres.push_back(res); @@ -190,9 +180,9 @@ impl Client { wres.push_back(res); } Err(e) => { - if !rsess.closed().await { + if !session.closed().await { eprintln!("{:?}", e); - rsess.close().await?; + session.close().await?; } break; } @@ -210,8 +200,7 @@ impl Client { } pub async fn close(&self) -> Result<()> { - let session = self.session.read().await; - session.close().await?; + self.session.close().await?; Ok(()) } } diff --git a/src/models/packet.rs b/src/models/packet.rs index ad6c2ea..91d3792 100644 --- a/src/models/packet.rs +++ b/src/models/packet.rs @@ -6,41 +6,13 @@ use crate::utils::gear::Socket; use crate::utils::generator::{generate_random_salt, SharedKey}; use crate::utils::parser::length; -use anyhow::{Error, Result}; +use anyhow::Result; use p256::ecdh::EphemeralSecret; use p256::PublicKey; -use rand::Rng; use serde_json::Value; const STOP_FLAG: [u8; 4] = u32::MIN.to_be_bytes(); -pub struct ACK { - sequence: u32, -} - -impl ACK { - pub fn new() -> Self { - Self { - sequence: rand::thread_rng().gen_range(1000..=9999), - } - } - - pub async fn from_stream(&mut self, stream: &mut Socket) -> Result { - Ok(Self { - sequence: stream.recv_u32().await?, - }) - } - - pub async fn to_stream(&mut self, stream: &mut Socket) -> Result<()> { - stream.send(&self.plain_data()).await?; - Ok(()) - } - - pub fn plain_data(&mut self) -> [u8; 4] { - self.sequence.to_be_bytes() - } -} - pub struct OSC { pub status_code: u32, } @@ -50,12 +22,12 @@ impl OSC { Self { status_code } } - pub async fn from_stream(stream: &mut Socket) -> Result { + pub async fn from_stream(stream: &Socket) -> Result { let status_code = stream.recv_u32().await?; Ok(Self { status_code }) } - pub async fn to_stream(&mut self, stream: &mut Socket) -> Result<()> { + pub async fn to_stream(&mut self, stream: &Socket) -> Result<()> { stream.send(&self.plain_data()).await?; Ok(()) } @@ -99,7 +71,7 @@ impl<'a> OKE<'a> { Ok(self) } - pub async fn from_stream(&mut self, stream: &mut Socket) -> Result<&mut Self> { + pub async fn from_stream(&mut self, stream: &Socket) -> Result<&mut Self> { let remote_public_key_length = stream.recv_usize().await?; let remote_public_key_bytes = stream.recv(remote_public_key_length).await?; self.remote_public_key = Some(PublicKey::from_sec1_bytes(&remote_public_key_bytes)?); @@ -111,7 +83,7 @@ impl<'a> OKE<'a> { Ok(self) } - pub async fn from_stream_with_salt(&mut self, stream: &mut Socket) -> Result<&mut Self> { + pub async fn from_stream_with_salt(&mut self, stream: &Socket) -> Result<&mut Self> { let remote_public_key_length = stream.recv_usize().await?; let remote_public_key_bytes = stream.recv(remote_public_key_length).await?; self.remote_public_key = Some(PublicKey::from_sec1_bytes(&remote_public_key_bytes)?); @@ -125,12 +97,12 @@ impl<'a> OKE<'a> { Ok(self) } - pub async fn to_stream(&mut self, stream: &mut Socket) -> Result<()> { + pub async fn to_stream(&mut self, stream: &Socket) -> Result<()> { stream.send(&self.plain_data()?).await?; Ok(()) } - pub async fn to_stream_with_salt(&mut self, stream: &mut Socket) -> Result<()> { + pub async fn to_stream_with_salt(&mut self, stream: &Socket) -> Result<()> { stream.send(&self.plain_data()?).await?; stream.send(&self.plain_salt()?).await?; Ok(()) @@ -176,10 +148,7 @@ impl OED { } } - pub fn from_json_or_string( - &mut self, - json_or_str: String, - ) -> Result<&mut Self, Exception> { + pub fn from_json_or_string(&mut self, json_or_str: String) -> Result<&mut Self, Exception> { let (encrypted_data, tag, nonce) = encrypt_plaintext(json_or_str, &self.aes_key.as_ref().unwrap())?; (self.encrypted_data, self.tag, self.nonce) = @@ -207,110 +176,63 @@ impl OED { Ok(self) } - pub async fn from_stream( - &mut self, - stream: &mut Socket, - total_attemps: u32, - ) -> Result<&mut Self> { - let mut attemp = 0; - let mut ack = false; - - while attemp < total_attemps { - let mut ack_packet = ACK::new(); - let mut ack_packet = ack_packet.from_stream(stream).await?; - - let len_nonce = stream.recv_usize().await?; - let len_tag = stream.recv_usize().await?; - - self.nonce = Some(stream.recv(len_nonce).await?); - self.tag = Some(stream.recv(len_tag).await?); - - let mut encrypted_data: Vec = Vec::new(); - self.chunk_count = 0; - - loop { - let prefix = stream.recv_usize().await?; - if prefix == 0 { - self.encrypted_data = Some(encrypted_data); - break; - } - - let mut add: Vec = Vec::new(); - while add.len() != prefix { - add.extend(stream.recv(prefix - add.len()).await?) - } - - encrypted_data.extend(add); - self.chunk_count += 1; - } + pub async fn from_stream(&mut self, stream: &Socket) -> Result<&mut Self> { + let len_nonce = stream.recv_usize().await?; + let len_tag = stream.recv_usize().await?; - match decrypt_bytes( - self.encrypted_data.clone().unwrap(), - self.tag.as_ref().unwrap(), - self.aes_key.as_ref().unwrap(), - self.nonce.as_ref().unwrap(), - ) { - Ok(data) => { - self.data = Some(data); - ack_packet.to_stream(stream).await?; - ack = true; - break; - } - Err(error) => { - stream.send(&STOP_FLAG).await?; - eprintln!("An error occured: {error}\nRetried {attemp} times."); - attemp += 1; - continue; - } - } - } - if !ack { - stream.close().await?; - return Err(Error::from(Exception::AllAttemptsRetryFailed { - times: total_attemps, - })); - } + self.nonce = Some(stream.recv(len_nonce).await?); + self.tag = Some(stream.recv(len_tag).await?); - Ok(self) - } + let mut encrypted_data: Vec = Vec::new(); + self.chunk_count = 0; - pub async fn to_stream(&mut self, stream: &mut Socket, total_attemps: u32) -> Result<()> { - let attemp = 0; - let mut ack = false; + loop { + let prefix = stream.recv_usize().await?; + if prefix == 0 { + self.encrypted_data = Some(encrypted_data); + break; + } - while attemp <= total_attemps { - let mut ack_packet = ACK::new(); - ack_packet.to_stream(stream).await?; + let mut add: Vec = Vec::new(); + while add.len() != prefix { + add.extend(stream.recv(prefix - add.len()).await?) + } - stream.send(&self.plain_data()?).await?; + encrypted_data.extend(add); + self.chunk_count += 1; + } - self.chunk_count = 0; - let encrypted_data = self.encrypted_data.as_ref().unwrap(); - let mut remaining_data = &encrypted_data[..]; - while !remaining_data.is_empty() { - let chunk_size = remaining_data.len().min(2048); + match decrypt_bytes( + self.encrypted_data.clone().unwrap(), + self.tag.as_ref().unwrap(), + self.aes_key.as_ref().unwrap(), + self.nonce.as_ref().unwrap(), + ) { + Ok(data) => { + self.data = Some(data); + Ok(self) + } + Err(error) => Err(Exception::DecryptError { error }.into()), + } + } - let chunk_length = chunk_size as u32; + pub async fn to_stream(&mut self, stream: &Socket) -> Result<()> { + stream.send(&self.plain_data()?).await?; - stream.send(&chunk_length.to_be_bytes()).await?; - stream.send(&remaining_data[..chunk_size]).await?; + self.chunk_count = 0; + let encrypted_data = self.encrypted_data.as_ref().unwrap(); + let mut remaining_data = &encrypted_data[..]; + while !remaining_data.is_empty() { + let chunk_size = remaining_data.len().min(2048); - remaining_data = &remaining_data[chunk_size..]; - } - stream.send(&STOP_FLAG).await?; + let chunk_length = chunk_size as u32; - if ack_packet.sequence == stream.recv_u32().await? { - ack = true; - break; - } - } + stream.send(&chunk_length.to_be_bytes()).await?; + stream.send(&remaining_data[..chunk_size]).await?; - if !ack { - stream.close().await?; - return Err(Error::from(Exception::AllAttemptsRetryFailed { - times: total_attemps, - })); + remaining_data = &remaining_data[chunk_size..]; } + stream.send(&STOP_FLAG).await?; Ok(()) } diff --git a/src/models/render.rs b/src/models/render.rs index fad57de..0e7cb4d 100644 --- a/src/models/render.rs +++ b/src/models/render.rs @@ -29,11 +29,11 @@ impl TextResponse { } } - pub fn as_bytes(&mut self) -> Vec { + pub fn as_bytes(&self) -> Vec { self.text.as_bytes().to_vec() } - pub fn get_status_code(&mut self) -> u32 { + pub fn get_status_code(&self) -> u32 { self.status_code } } @@ -48,43 +48,43 @@ impl JsonResponse { Self { data, status_code } } - pub fn as_bytes(&mut self) -> Vec { + pub fn as_bytes(&self) -> Vec { self.data.to_string().as_bytes().to_vec() } - pub fn get_status_code(&mut self) -> u32 { + pub fn get_status_code(&self) -> u32 { self.status_code } } impl BaseResponse { - pub fn as_bytes(&mut self) -> Result, Exception> { + pub fn as_bytes(&self) -> Result, Exception> { match self { Self::FileResponse(_, _) => Err(Exception::UnsupportedMethod { method: "FileResponse".to_string(), }), Self::TextResponse(text, status_code) => { - let mut tres = TextResponse::new(&text, *status_code); + let tres = TextResponse::new(&text, *status_code); Ok(tres.as_bytes()) } Self::JsonResponse(data, status_code) => { - let mut jres = JsonResponse::new(data.clone(), *status_code); + let jres = JsonResponse::new(data.clone(), *status_code); Ok(jres.as_bytes()) } } } - pub fn get_status_code(&mut self) -> Result { + pub fn get_status_code(&self) -> Result { match self { Self::FileResponse(_, _) => Err(Exception::UnsupportedMethod { method: "FileResponse".to_string(), }), Self::TextResponse(text, status_code) => { - let mut tres = TextResponse::new(&text, *status_code); + let tres = TextResponse::new(&text, *status_code); Ok(tres.get_status_code()) } Self::JsonResponse(data, status_code) => { - let mut jres = JsonResponse::new(data.clone(), *status_code); + let jres = JsonResponse::new(data.clone(), *status_code); Ok(jres.get_status_code()) } } diff --git a/src/models/server.rs b/src/models/server.rs index 665c7b2..c108119 100644 --- a/src/models/server.rs +++ b/src/models/server.rs @@ -13,45 +13,53 @@ use super::packet::{OED, OSC}; use super::router::Router; use super::session::Session; -async fn _handle(router: &mut Router, mut socket: Socket, peer: SocketAddr) -> Result { - socket.set_ttl(20)?; - - let mut session = Session::new(socket)?; +async fn _handle(router: &Router, stream: TcpStream, peer: SocketAddr) -> Result<()> { + stream.set_ttl(20)?; + let mut session = Session::new(Socket::new(stream))?; if let Err(error) = session.handshake(1).await { eprintln!( "{} -> [{}] \"{}\" {}", peer.ip().to_string().cyan(), Local::now().format("%d/%m/%Y %H:%M:%S"), - "CONNECT".yellow(), + "CONNECT - Oblivion/2.0".yellow(), "500".red() ); - return Err(Error::from(error)); + eprintln!("{}", error.to_string().bright_red()); + return Ok(()); } let header = session.header(); let ip_addr = session.get_ip(); let aes_key = session.aes_key.clone().unwrap(); - let arc_socket = Arc::clone(&session.socket); + println!( + "{} -> [{}] \"{}\" {}", + ip_addr.cyan(), + Local::now().format("%d/%m/%Y %H:%M:%S"), + header.green(), + "OK".cyan() + ); + + let socket = Arc::clone(&session.socket); let mut route = router.get_handler(&session.request.as_ref().unwrap().olps)?; - let mut callback = route.get_handler()(session).await?; + let callback = route.get_handler()(session).await?; let status_code = callback.get_status_code()?; - let mut socket = arc_socket.lock().await; - OSC::from_u32(1).to_stream(&mut socket).await?; + OSC::from_u32(1).to_stream(&socket).await?; OED::new(Some(aes_key)) .from_bytes(callback.as_bytes()?)? - .to_stream(&mut socket, 5) + .to_stream(&socket) .await?; OSC::from_u32(callback.get_status_code()?) - .to_stream(&mut socket) + .to_stream(&socket) .await?; + socket.close().await?; - let display = format!( - "{} -> [{}] \"{}\" {}", + println!( + "{} <- [{}] \"{}\" {}", ip_addr.cyan(), Local::now().format("%d/%m/%Y %H:%M:%S"), header.green(), @@ -64,17 +72,20 @@ async fn _handle(router: &mut Router, mut socket: Socket, peer: SocketAddr) -> R } ); - Ok(display) + Ok(()) } pub async fn handle(router: Router, stream: TcpStream, peer: SocketAddr) { - let socket = Socket::new(stream); - let mut router = router; - match _handle(&mut router, socket, peer).await { - Ok(display) => { - println!("{}", display) - } + match _handle(&router, stream, peer).await { + Ok(()) => {} Err(error) => { + eprintln!( + "{} <-> [{}] \"{}\" {}", + peer.ip().to_string().cyan(), + Local::now().format("%d/%m/%Y %H:%M:%S"), + "CONNECT - Oblivion/2.0".yellow(), + "501".red() + ); eprintln!("{}", error.to_string().bright_red()) } } @@ -96,7 +107,7 @@ impl Server { } } - pub async fn run(&mut self) -> Result<()> { + pub async fn run(&self) -> Result<()> { println!("Performing system checks...\n"); let address = format!("{}:{}", self.host, self.port); diff --git a/src/models/session.rs b/src/models/session.rs index 496a90b..36b4197 100644 --- a/src/models/session.rs +++ b/src/models/session.rs @@ -3,7 +3,8 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use chrono::{DateTime, Local}; use p256::{ecdh::EphemeralSecret, PublicKey}; -use tokio::sync::{Mutex, RwLock}; +use serde_json::Value; +use tokio::sync::RwLock; use crate::exceptions::Exception; use crate::utils::gear::Socket; @@ -12,6 +13,7 @@ use crate::utils::parser::{length, OblivionRequest}; use super::client::Response; use super::packet::{OED, OKE, OSC}; +use super::render::BaseResponse; pub struct Session { pub header: Option, @@ -20,8 +22,8 @@ pub struct Session { pub(crate) aes_key: Option>, pub request_time: DateTime, pub request: Option, - pub socket: Arc>, - closed: Arc>, + pub socket: Arc, + closed: RwLock, } impl Session { @@ -34,8 +36,8 @@ impl Session { aes_key: None, request_time: Local::now(), request: None, - socket: Arc::new(Mutex::new(socket)), - closed: Arc::new(RwLock::new(false)), + socket: Arc::new(socket), + closed: RwLock::new(false), }) } @@ -48,36 +50,36 @@ impl Session { aes_key: None, request_time: Local::now(), request: None, - socket: Arc::new(Mutex::new(socket)), - closed: Arc::new(RwLock::new(false)), + socket: Arc::new(socket), + closed: RwLock::new(false), }) } pub async fn first_hand(&mut self) -> Result<()> { - let socket = &mut self.socket.lock().await; + let socket = Arc::clone(&self.socket); let header = self.header.as_ref().unwrap().as_bytes(); socket .send(&[&length(&header.to_vec())?, header].concat()) .await?; let mut oke = OKE::new(Some(&self.private_key), Some(self.public_key))?; - oke.from_stream_with_salt(socket).await?; + oke.from_stream_with_salt(&socket).await?; self.aes_key = Some(oke.get_aes_key()); - oke.to_stream(socket).await?; + oke.to_stream(&socket).await?; Ok(()) } pub async fn second_hand(&mut self) -> Result<()> { - let socket = &mut self.socket.lock().await; - let peer = socket.peer_addr()?; + let socket = Arc::clone(&self.socket); + let peer = socket.peer_addr().await?; let len_header = socket.recv_usize().await?; let header = socket.recv_str(len_header).await?; let mut request = OblivionRequest::new(&header)?; request.set_remote_peer(&peer); let mut oke = OKE::new(Some(&self.private_key), Some(self.public_key))?; - oke.to_stream_with_salt(socket).await?; - oke.from_stream(socket).await?; + oke.to_stream_with_salt(&socket).await?; + oke.from_stream(&socket).await?; request.aes_key = Some(oke.get_aes_key()); self.aes_key = Some(oke.get_aes_key()); @@ -101,27 +103,36 @@ impl Session { return Err(Exception::ConnectionClosed.into()); } - let socket = &mut self.socket.lock().await; + let socket = &self.socket; OSC::from_u32(0).to_stream(socket).await?; OED::new(self.aes_key.clone()) .from_bytes(data)? - .to_stream(socket, 5) + .to_stream(socket) .await?; OSC::from_u32(status_code).to_stream(socket).await?; Ok(()) } + pub async fn send_json(&self, json: Value, status_code: u32) -> Result<()> { + self.send(json.to_string().into_bytes(), status_code).await + } + + pub async fn response(&self, response: BaseResponse) -> Result<()> { + self.send(response.as_bytes()?, response.get_status_code()?) + .await + } + pub async fn recv(&self) -> Result { if self.closed().await { return Err(Exception::ConnectionClosed.into()); } - let socket = &mut self.socket.lock().await; + let socket = &self.socket; let flag = OSC::from_stream(socket).await?.status_code; let content = OED::new(self.aes_key.clone()) - .from_stream(socket, 5) + .from_stream(socket) .await? .get_data(); let status_code = OSC::from_stream(socket).await?.status_code; @@ -135,9 +146,8 @@ impl Session { pub async fn close(&self) -> Result<()> { if !self.closed().await { - let socket = &mut self.socket.lock().await; *self.closed.write().await = true; - socket.close().await + self.socket.close().await } else { Ok(()) } diff --git a/src/utils/gear.rs b/src/utils/gear.rs index f47ae58..65ad05e 100644 --- a/src/utils/gear.rs +++ b/src/utils/gear.rs @@ -6,10 +6,9 @@ use ring::aead::{Nonce, NonceSequence}; use ring::error::Unspecified; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::{ - tcp::{ReadHalf, WriteHalf}, - TcpStream, -}; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; +use tokio::net::TcpStream; +use tokio::sync::Mutex; /// Absolute Nonce Sequence Structure /// @@ -38,60 +37,57 @@ impl<'a> AbsoluteNonceSequence<'a> { /// Used to abstract Oblivion's handling of transmitted data, wrapping all data type conversions. #[derive(Debug)] pub struct Socket { - pub tcp: TcpStream, + pub reader: Mutex, + pub writer: Mutex, // pub tcp: TcpStream, } impl Socket { pub fn new(tcp: TcpStream) -> Self { - Self { tcp } + let (reader, writer) = tcp.into_split(); + Self { + reader: Mutex::new(reader), + writer: Mutex::new(writer), + } } - pub fn set_ttl(&mut self, ttl: u32) -> Result<()> { - self.tcp.set_ttl(ttl)?; - Ok(()) - } - - pub fn peer_addr(&mut self) -> Result { - Ok(self.tcp.peer_addr()?) + pub async fn peer_addr(&self) -> Result { + Ok(self.writer.lock().await.peer_addr()?) } - pub async fn recv_usize(&mut self) -> Result { + pub async fn recv_usize(&self) -> Result { let mut len_bytes = [0; 4]; - self.tcp.read_exact(&mut len_bytes).await?; + self.reader.lock().await.read_exact(&mut len_bytes).await?; Ok(u32::from_be_bytes(len_bytes) as usize) } - pub async fn recv_u32(&mut self) -> Result { + pub async fn recv_u32(&self) -> Result { let mut len_bytes = [0; 4]; - self.tcp.read_exact(&mut len_bytes).await?; + self.reader.lock().await.read_exact(&mut len_bytes).await?; Ok(u32::from_be_bytes(len_bytes)) } - pub async fn recv(&mut self, len: usize) -> Result> { + pub async fn recv(&self, len: usize) -> Result> { let mut recv_bytes: Vec = vec![0; len]; - self.tcp.read_exact(&mut recv_bytes).await?; + self.reader.lock().await.read_exact(&mut recv_bytes).await?; Ok(recv_bytes) } - pub async fn recv_str(&mut self, len: usize) -> Result { + pub async fn recv_str(&self, len: usize) -> Result { let mut recv_bytes: Vec = vec![0; len]; - self.tcp.read_exact(&mut recv_bytes).await?; + self.reader.lock().await.read_exact(&mut recv_bytes).await?; Ok(String::from_utf8(recv_bytes)?.trim().to_string()) } - pub async fn send(&mut self, data: &[u8]) -> Result<()> { - self.tcp.write_all(&data).await?; - self.tcp.flush().await?; + pub async fn send(&self, data: &[u8]) -> Result<()> { + let mut writer = self.writer.lock().await; + writer.write_all(&data).await?; + writer.flush().await?; Ok(()) } - pub async fn close(&mut self) -> Result<()> { - self.tcp.shutdown().await?; + pub async fn close(&self) -> Result<()> { + self.writer.lock().await.shutdown().await?; Ok(()) } - - pub async fn split(&mut self) -> Result<(ReadHalf, WriteHalf)> { - Ok(self.tcp.split()) - } }