diff --git a/Cargo.toml b/Cargo.toml index 1d0aab54..9f4e0e35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,9 @@ async-std = { version = "1.12.0", features = ["attributes"], optional = true } neovim-lib = { version = "0.6.1", optional = true } parity-tokio-ipc = { version = "0.9.0", optional = true } +[target.'cfg(windows)'.dependencies] +winapi = { version = '*', features = ["winerror"] } + [dev-dependencies] tempfile = "3.8.0" # TODO: if changing tempfile: the rand version is based on whatever version @@ -109,3 +112,15 @@ required-features = ["use_tokio"] [[test]] name = "basic" + +[[test]] +name = "regression" +path = "tests/regression/mod.rs" + +[[bin]] +name = "linebuffercrash" +required-features = ["use_tokio"] + +[[bin]] +name = "linebuffercrash_as" +required-features = ["use_async-std"] diff --git a/examples/basic.rs b/examples/basic.rs index dcb45fbb..25f2a599 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use rmpv::Value; -use tokio::io::Stdout; +use tokio::fs::File as TokioFile; use nvim_rs::{ compat::tokio::Compat, create::tokio as create, rpc::IntoVal, Handler, Neovim, @@ -17,13 +17,13 @@ struct NeovimHandler {} #[async_trait] impl Handler for NeovimHandler { - type Writer = Compat; + type Writer = Compat; async fn handle_request( &self, name: String, _args: Vec, - _neovim: Neovim>, + _neovim: Neovim>, ) -> Result { match name.as_ref() { "ping" => Ok(Value::from("pong")), @@ -35,7 +35,7 @@ impl Handler for NeovimHandler { #[tokio::main] async fn main() { let handler: NeovimHandler = NeovimHandler {}; - let (nvim, io_handler) = create::new_parent(handler).await; + let (nvim, io_handler) = create::new_parent(handler).await.unwrap(); let curbuf = nvim.get_current_buf().await.unwrap(); let mut envargs = env::args(); diff --git a/examples/bench_async-std.rs b/examples/bench_async-std.rs index 795cf62b..a72ce444 100644 --- a/examples/bench_async-std.rs +++ b/examples/bench_async-std.rs @@ -4,25 +4,22 @@ use async_trait::async_trait; use rmpv::Value; -use async_std::io::Stdout; -use async_std; +use async_std::{self, fs::File as ASFile}; -use nvim_rs::{ - create::async_std as create, Handler, Neovim, -}; +use nvim_rs::{create::async_std as create, Handler, Neovim}; #[derive(Clone)] -struct NeovimHandler{} +struct NeovimHandler {} #[async_trait] impl Handler for NeovimHandler { - type Writer =Stdout; + type Writer = ASFile; async fn handle_request( &self, name: String, _args: Vec, - neovim: Neovim, + neovim: Neovim, ) -> Result { match name.as_ref() { "file" => { @@ -31,30 +28,29 @@ impl Handler for NeovimHandler { let _x = c.get_lines(0, -1, false).await; } Ok(Value::Nil) - }, + } "buffer" => { for _ in 0..10_000_usize { let _ = neovim.get_current_buf().await.unwrap(); } Ok(Value::Nil) - }, + } "api" => { for _ in 0..1_000_usize { let _ = neovim.get_api_info().await.unwrap(); } Ok(Value::Nil) - }, - _ => Ok(Value::Nil) + } + _ => Ok(Value::Nil), } } } #[async_std::main] async fn main() { + let handler: NeovimHandler = NeovimHandler {}; - let handler: NeovimHandler = NeovimHandler{}; - - let (nvim, io_handler) = create::new_parent(handler).await; + let (nvim, io_handler) = create::new_parent(handler).await.unwrap(); // Any error should probably be logged, as stderr is not visible to users. match io_handler.await { diff --git a/examples/bench_tokio.rs b/examples/bench_tokio.rs index 450b7525..86897ad6 100644 --- a/examples/bench_tokio.rs +++ b/examples/bench_tokio.rs @@ -4,24 +4,24 @@ use async_trait::async_trait; use rmpv::Value; -use tokio::io::Stdout; +use tokio::fs::File as TokioFile; use nvim_rs::{ compat::tokio::Compat, create::tokio as create, Handler, Neovim, }; #[derive(Clone)] -struct NeovimHandler{} +struct NeovimHandler {} #[async_trait] impl Handler for NeovimHandler { - type Writer = Compat; + type Writer = Compat; async fn handle_request( &self, name: String, _args: Vec, - neovim: Neovim>, + neovim: Neovim>, ) -> Result { match name.as_ref() { "file" => { @@ -30,30 +30,29 @@ impl Handler for NeovimHandler { let _x = c.get_lines(0, -1, false).await; } Ok(Value::Nil) - }, + } "buffer" => { for _ in 0..10_000_usize { let _ = neovim.get_current_buf().await.unwrap(); } Ok(Value::Nil) - }, + } "api" => { for _ in 0..1_000_usize { let _ = neovim.get_api_info().await.unwrap(); } Ok(Value::Nil) - }, - _ => Ok(Value::Nil) + } + _ => Ok(Value::Nil), } } } #[tokio::main] async fn main() { + let handler: NeovimHandler = NeovimHandler {}; - let handler: NeovimHandler = NeovimHandler{}; - - let (nvim, io_handler) = create::new_parent(handler).await; + let (nvim, io_handler) = create::new_parent(handler).await.unwrap(); // Any error should probably be logged, as stderr is not visible to users. match io_handler.await { diff --git a/examples/scorched_earth.rs b/examples/scorched_earth.rs index 024e4716..ef9a78cc 100644 --- a/examples/scorched_earth.rs +++ b/examples/scorched_earth.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use rmpv::Value; use futures::lock::Mutex; -use tokio::io::Stdout; +use tokio::fs::File as TokioFile; use nvim_rs::{ compat::tokio::Compat, create::tokio as create, Handler, Neovim, @@ -44,13 +44,13 @@ struct NeovimHandler(Arc>); #[async_trait] impl Handler for NeovimHandler { - type Writer = Compat; + type Writer = Compat; async fn handle_notify( &self, name: String, args: Vec, - neovim: Neovim>, + neovim: Neovim>, ) { match name.as_ref() { "cursor-moved-i" => { @@ -102,7 +102,7 @@ async fn main() { }; let handler: NeovimHandler = NeovimHandler(Arc::new(Mutex::new(p))); - let (nvim, io_handler) = create::new_parent(handler).await; + let (nvim, io_handler) = create::new_parent(handler).await.unwrap(); // Any error should probably be logged, as stderr is not visible to users. match io_handler.await { diff --git a/examples/scorched_earth_as.rs b/examples/scorched_earth_as.rs index 23986a47..50d505c1 100644 --- a/examples/scorched_earth_as.rs +++ b/examples/scorched_earth_as.rs @@ -7,11 +7,9 @@ use rmpv::Value; use futures::lock::Mutex; -use nvim_rs::{ - create::async_std as create, Handler, Neovim, -}; +use nvim_rs::{create::async_std as create, Handler, Neovim}; -use async_std::{self, io::Stdout}; +use async_std::{self, fs::File as ASFile}; struct Posis { cursor_start: Option<(u64, u64)>, @@ -45,13 +43,13 @@ struct NeovimHandler(Arc>); #[async_trait] impl Handler for NeovimHandler { - type Writer = Stdout; + type Writer = ASFile; async fn handle_notify( &self, name: String, args: Vec, - neovim: Neovim, + neovim: Neovim, ) { match name.as_ref() { "cursor-moved-i" => { @@ -103,7 +101,7 @@ async fn main() { }; let handler: NeovimHandler = NeovimHandler(Arc::new(Mutex::new(p))); - let (nvim, io_handler) = create::new_parent(handler).await; + let (nvim, io_handler) = create::new_parent(handler).await.unwrap(); // Any error should probably be logged, as stderr is not visible to users. match io_handler.await { diff --git a/src/bin/linebuffercrash.rs b/src/bin/linebuffercrash.rs new file mode 100644 index 00000000..9c9fa56a --- /dev/null +++ b/src/bin/linebuffercrash.rs @@ -0,0 +1,24 @@ + +use nvim_rs::{ + create::tokio as create, + rpc::handler::Dummy as DummyHandler +}; + + +#[tokio::main] +async fn main() { + let handler = DummyHandler::new(); + let (nvim, _io_handler) = create::new_parent(handler).await.unwrap(); + let curbuf = nvim.get_current_buf().await.unwrap(); + + // If our Stdout is linebuffered, this has a high chance of crashing neovim + // Should probably befixed in neovim itself, but for now, let's just make + // sure we're not using linebuffering, or at least don't crash neovim with + // this. + for i in 0..20 { + curbuf.set_name(&format!("a{i}")).await.unwrap(); + } + + let _ = nvim.command("quit!").await; + +} diff --git a/src/bin/linebuffercrash_as.rs b/src/bin/linebuffercrash_as.rs new file mode 100644 index 00000000..10c13008 --- /dev/null +++ b/src/bin/linebuffercrash_as.rs @@ -0,0 +1,24 @@ + +use nvim_rs::{ + create::async_std as create, + rpc::handler::Dummy as DummyHandler +}; + + +#[async_std::main] +async fn main() { + let handler = DummyHandler::new(); + let (nvim, _io_handler) = create::new_parent(handler).await.unwrap(); + let curbuf = nvim.get_current_buf().await.unwrap(); + + // If our Stdout is linebuffered, this has a high chance of crashing neovim + // Should probably befixed in neovim itself, but for now, let's just make + // sure we're not using linebuffering, or at least don't crash neovim with + // this. + for i in 0..20 { + curbuf.set_name(&format!("a{i}")).await.unwrap(); + } + + let _ = nvim.command("quit!").await; + +} diff --git a/src/create/async_std.rs b/src/create/async_std.rs index 9603f8e0..378b5557 100644 --- a/src/create/async_std.rs +++ b/src/create/async_std.rs @@ -6,7 +6,8 @@ use std::{future::Future, io}; use async_std::os::unix::net::UnixStream; use async_std::{ - io::{stdin, stdout, Stdout}, + fs::File as ASFile, + io::stdin, net::{TcpStream, ToSocketAddrs}, task::{spawn, JoinHandle}, }; @@ -16,7 +17,12 @@ use async_std::path::Path; use futures::io::{AsyncReadExt, WriteHalf}; -use crate::{create::Spawner, error::LoopError, neovim::Neovim, Handler}; +use crate::{ + create::{unbuffered_stdout, Spawner}, + error::LoopError, + neovim::Neovim, + Handler, +}; impl Spawner for H where @@ -78,12 +84,13 @@ where /// Connect to the neovim instance that spawned this process over stdin/stdout pub async fn new_parent( handler: H, -) -> (Neovim, JoinHandle>>) +) -> io::Result<(Neovim, JoinHandle>>)> where - H: Handler, + H: Handler, { - let (neovim, io) = Neovim::::new(stdin(), stdout(), handler); + let sout: ASFile = unbuffered_stdout()?.into(); + let (neovim, io) = Neovim::::new(stdin(), sout, handler); let io_handle = spawn(io); - (neovim, io_handle) + Ok((neovim, io_handle)) } diff --git a/src/create/mod.rs b/src/create/mod.rs index 741b3e13..95e4891f 100644 --- a/src/create/mod.rs +++ b/src/create/mod.rs @@ -17,6 +17,7 @@ pub mod tokio; pub mod async_std; use core::future::Future; +use std::{fs::File, io}; use crate::rpc::handler::Handler; @@ -36,3 +37,20 @@ pub trait Spawner: Handler { where Fut: Future + Send + 'static; } + +/// Create a std::io::File for stdout, which is not line-buffered, as +/// opposed to std::io::Stdout. +#[cfg(unix)] +pub fn unbuffered_stdout() -> io::Result { + use std::{io::stdout, os::fd::AsFd}; + + let owned_sout_fd = stdout().as_fd().try_clone_to_owned()?; + Ok(File::from(owned_sout_fd)) +} +#[cfg(windows)] +pub fn unbuffered_stdout() -> io::Result { + use std::{io::stdout, os::windows::io::AsHandle}; + + let owned_sout_handle = stdout().as_handle().try_clone_to_owned()?; + Ok(File::from(owned_sout_handle)) +} diff --git a/src/create/tokio.rs b/src/create/tokio.rs index 80fe3c4c..339454c3 100644 --- a/src/create/tokio.rs +++ b/src/create/tokio.rs @@ -8,7 +8,8 @@ use std::{ }; use tokio::{ - io::{split, stdin, stdout, Stdout, WriteHalf}, + fs::File as TokioFile, + io::{split, stdin, WriteHalf}, net::{TcpStream, ToSocketAddrs}, process::{Child, ChildStdin, Command}, spawn, @@ -21,7 +22,12 @@ use tokio_util::compat::{ Compat, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt, }; -use crate::{create::Spawner, error::LoopError, neovim::Neovim, Handler}; +use crate::{ + create::{unbuffered_stdout, Spawner}, + error::LoopError, + neovim::Neovim, + Handler, +}; impl Spawner for H where @@ -152,19 +158,24 @@ where /// Connect to the neovim instance that spawned this process over stdin/stdout pub async fn new_parent( handler: H, -) -> ( - Neovim>, - JoinHandle>>, -) +) -> Result< + ( + Neovim>, + JoinHandle>>, + ), + Error, +> where - H: Handler>, + H: Handler>, { - let (neovim, io) = Neovim::>::new( + let sout = TokioFile::from_std(unbuffered_stdout()?); + + let (neovim, io) = Neovim::>::new( stdin().compat(), - stdout().compat_write(), + sout.compat(), handler, ); let io_handle = spawn(io); - (neovim, io_handle) + Ok((neovim, io_handle)) } diff --git a/src/neovim.rs b/src/neovim.rs index 55272edf..6825a23c 100644 --- a/src/neovim.rs +++ b/src/neovim.rs @@ -12,7 +12,7 @@ use futures::{ mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, oneshot, }, - io::{AsyncRead, AsyncWrite, BufWriter}, + io::{AsyncRead, AsyncWrite,}, lock::Mutex, sink::SinkExt, stream::StreamExt, @@ -54,7 +54,7 @@ pub struct Neovim where W: AsyncWrite + Send + Unpin + 'static, { - pub(crate) writer: Arc>>, + pub(crate) writer: Arc>, pub(crate) queue: Queue, pub(crate) msgid_counter: Arc, } @@ -100,7 +100,7 @@ where H: Handler + Spawner, { let req = Neovim { - writer: Arc::new(Mutex::new(BufWriter::new(writer))), + writer: Arc::new(Mutex::new(writer)), msgid_counter: Arc::new(AtomicU64::new(0)), queue: Arc::new(Mutex::new(Vec::new())), }; diff --git a/src/rpc/model.rs b/src/rpc/model.rs index 920d3336..bf5f0ff8 100644 --- a/src/rpc/model.rs +++ b/src/rpc/model.rs @@ -7,7 +7,7 @@ use std::{ }; use futures::{ - io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufWriter}, + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,}, lock::Mutex, }; use rmpv::{decode::read_value, encode::write_value, Value}; @@ -165,7 +165,7 @@ fn decode_buffer( /// Encode the given message into the `BufWriter`. Flushes the writer when /// finished. pub async fn encode( - writer: Arc>>, + writer: Arc>, msg: RpcMessage, ) -> std::result::Result<(), Box> { let mut v: Vec = vec![]; diff --git a/tests/regression/buffering.rs b/tests/regression/buffering.rs new file mode 100644 index 00000000..8e2a695a --- /dev/null +++ b/tests/regression/buffering.rs @@ -0,0 +1,45 @@ +#[path = "../common/mod.rs"] +mod common; +use common::*; + +use std::{path::PathBuf, process::Command}; + +fn viml_escape(in_str: &str) -> String { + in_str.replace('\\', r"\\") +} + +fn linebuffercrashbin() -> &'static str { + #[cfg(feature = "use_tokio")] + return "linebuffercrash"; + #[cfg(feature = "use_async-std")] + return "linebuffercrash_as"; +} + +#[test] +fn linebuffer_crash() { + let c1 = format!( + "let jobid = jobstart([\"{}\"], {{\"rpc\": v:true}})", + viml_escape( + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("target") + .join("debug") + .join(linebuffercrashbin()) + .to_str() + .unwrap() + ) + ); + + let status = Command::new(nvim_path()) + .args(&[ + "-u", + "NONE", + "--headless", + "-c", + &c1, + ]) + .status() + .unwrap(); + + assert!(status.success()); + +} diff --git a/tests/regression/mod.rs b/tests/regression/mod.rs new file mode 100644 index 00000000..cf8373db --- /dev/null +++ b/tests/regression/mod.rs @@ -0,0 +1,4 @@ +#[cfg(feature = "use_tokio")] +pub mod buffering; +#[cfg(feature = "use_async-std")] +pub mod buffering;