From 4e12f2ef95791d932bfe49c352cd69b608baa3ff Mon Sep 17 00:00:00 2001 From: Niklas Fiekas Date: Mon, 1 Jan 2024 18:55:04 +0100 Subject: [PATCH 01/10] get(0) -> first --- src/ipc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ipc.rs b/src/ipc.rs index 89f06a7b..6fea651f 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -87,7 +87,7 @@ impl Matrix { pub fn best(&self) -> Option<&T> { self.matrix - .get(0) + .first() .and_then(|row| row.last().and_then(|v| v.as_ref())) } } From 408590b577593b13e3e324a857aa612bef679411 Mon Sep 17 00:00:00 2001 From: Niklas Fiekas Date: Mon, 1 Jan 2024 19:55:56 +0100 Subject: [PATCH 02/10] do not immediately discard skipped positions --- src/api.rs | 5 ++++- src/ipc.rs | 10 ++++----- src/logger.rs | 4 ++-- src/queue.rs | 56 ++++++++++++++++++++++-------------------------- src/stockfish.rs | 1 + 5 files changed, 38 insertions(+), 38 deletions(-) diff --git a/src/api.rs b/src/api.rs index 15c04171..6e99a899 100644 --- a/src/api.rs +++ b/src/api.rs @@ -288,6 +288,9 @@ impl From for Duration { } } +#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize)] +pub struct PositionId(pub usize); + #[serde_as] #[derive(Debug, Deserialize)] pub struct AcquireResponseBody { @@ -303,7 +306,7 @@ pub struct AcquireResponseBody { #[serde_as(as = "StringWithSeparator::")] pub moves: Vec, #[serde(rename = "skipPositions", default)] - pub skip_positions: Vec, + pub skip_positions: Vec, } impl AcquireResponseBody { diff --git a/src/ipc.rs b/src/ipc.rs index 6fea651f..8b1df4bc 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -5,14 +5,10 @@ use tokio::sync::oneshot; use url::Url; use crate::{ - api::{AnalysisPart, BatchId, Score, Work}, + api::{AnalysisPart, BatchId, PositionId, Score, Work}, assets::EngineFlavor, }; -/// Uniquely identifies a position within a batch. -#[derive(Debug, Copy, Clone)] -pub struct PositionId(pub usize); - #[derive(Debug, Clone)] pub struct Position { pub work: Work, @@ -20,6 +16,8 @@ pub struct Position { pub flavor: EngineFlavor, pub url: Option, + pub skip: bool, + pub variant: Variant, pub root_fen: Fen, pub moves: Vec, @@ -31,6 +29,8 @@ pub struct PositionResponse { pub position_id: PositionId, pub url: Option, + pub skip: bool, + pub scores: Matrix, pub pvs: Matrix>, pub best_move: Option, diff --git a/src/logger.rs b/src/logger.rs index edcd35be..893dfc7b 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -10,9 +10,9 @@ use shakmaty::variant::Variant; use url::Url; use crate::{ - api::BatchId, + api::{BatchId, PositionId}, configure::Verbose, - ipc::{Position, PositionId, PositionResponse}, + ipc::{Position, PositionResponse}, util::NevermindExt as _, }; diff --git a/src/queue.rs b/src/queue.rs index 09a1a883..59355f17 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -20,10 +20,13 @@ use tokio::{ use url::Url; use crate::{ - api::{AcquireQuery, AcquireResponseBody, Acquired, AnalysisPart, ApiStub, BatchId, Work}, + api::{ + AcquireQuery, AcquireResponseBody, Acquired, AnalysisPart, ApiStub, BatchId, PositionId, + Work, + }, assets::{EngineFlavor, EvalFlavor}, configure::{BacklogOpt, Endpoint, MaxBackoff, StatsOpt}, - ipc::{Position, PositionFailed, PositionId, PositionResponse, Pull}, + ipc::{Position, PositionFailed, PositionResponse, Pull}, logger::{short_variant_name, Logger, ProgressAt, QueueStatusBar}, stats::{NpsRecorder, Stats, StatsRecorder}, util::{NevermindExt as _, RandomizedBackoff}, @@ -163,12 +166,11 @@ impl QueueState { for pos in batch.positions.into_iter().rev() { positions.insert( 0, - match pos { - Skip::Present(pos) => { - self.incoming.push_back(pos); - None - } - Skip::Skip => Some(Skip::Skip), + if pos.skip { + Some(Skip::Skip) + } else { + self.incoming.push_back(pos); + None }, ); } @@ -505,18 +507,12 @@ enum Skip { Skip, } -impl Skip { - fn is_skipped(&self) -> bool { - matches!(self, Skip::Skip) - } -} - #[derive(Debug, Clone)] pub struct IncomingBatch { work: Work, flavor: EngineFlavor, variant: Variant, - positions: Vec>, + positions: Vec, url: Option, } @@ -564,56 +560,56 @@ impl IncomingBatch { variant: body.variant, positions: match body.work { Work::Move { .. } => { - vec![Skip::Present(Position { + vec![Position { work: body.work, url, + skip: false, flavor, position_id: PositionId(0), variant: body.variant, root_fen, moves: body_moves, - })] + }] } Work::Analysis { .. } => { let mut moves = Vec::new(); - let mut positions = vec![Skip::Present(Position { + let mut positions = Vec::with_capacity(body_moves.len() + 1); + + positions.push(Position { work: body.work.clone(), url: url.clone().map(|mut url| { url.set_fragment(Some("0")); url }), + skip: body.skip_positions.contains(&PositionId(0)), flavor, position_id: PositionId(0), variant: body.variant, root_fen: root_fen.clone(), moves: moves.clone(), - })]; + }); for (i, m) in body_moves.into_iter().enumerate() { + let position_id = PositionId(i + 1); moves.push(m); - positions.push(Skip::Present(Position { + positions.push(Position { work: body.work.clone(), url: url.clone().map(|mut url| { - url.set_fragment(Some(&(1 + i).to_string())); + url.set_fragment(Some(&position_id.0.to_string())); url }), + skip: body.skip_positions.contains(&position_id), flavor, - position_id: PositionId(1 + i), + position_id, variant: body.variant, root_fen: root_fen.clone(), moves: moves.clone(), - })); - } - - for skip in body.skip_positions.into_iter() { - if let Some(pos) = positions.get_mut(skip) { - *pos = Skip::Skip; - } + }); } // Edge case: Batch is immediately completed, because all // positions are skipped. - if positions.iter().all(Skip::is_skipped) { + if positions.iter().all(|p| p.skip) { let now = Instant::now(); return Err(IncomingError::AllSkipped(CompletedBatch { work: body.work, diff --git a/src/stockfish.rs b/src/stockfish.rs index ca8cc943..ef0d517a 100644 --- a/src/stockfish.rs +++ b/src/stockfish.rs @@ -353,6 +353,7 @@ impl StockfishActor { work: position.work, position_id: position.position_id, url: position.url, + skip: position.skip, best_move: parts.next().and_then(|m| m.parse().ok()), scores, depth, From eedd72f3d92cd98c0fdc90ad23507e1047bb0b4a Mon Sep 17 00:00:00 2001 From: Niklas Fiekas Date: Mon, 1 Jan 2024 23:05:52 +0100 Subject: [PATCH 03/10] wip chunking --- src/api.rs | 2 +- src/ipc.rs | 37 ++++++---- src/logger.rs | 18 +++-- src/main.rs | 28 ++++---- src/queue.rs | 181 ++++++++++++++++++++++++++++------------------- src/stockfish.rs | 60 +++++++++------- 6 files changed, 197 insertions(+), 129 deletions(-) diff --git a/src/api.rs b/src/api.rs index 6e99a899..551d5e59 100644 --- a/src/api.rs +++ b/src/api.rs @@ -155,7 +155,7 @@ impl Work { } } - pub fn timeout(&self) -> Duration { + pub fn timeout_per_position(&self) -> Duration { match *self { Work::Analysis { timeout, .. } => timeout, Work::Move { .. } => Duration::from_secs(2), diff --git a/src/ipc.rs b/src/ipc.rs index 8b1df4bc..5f812236 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -9,16 +9,29 @@ use crate::{ assets::EngineFlavor, }; +#[derive(Debug)] +pub struct Chunk { + pub work: Work, + pub variant: Variant, + pub flavor: EngineFlavor, + pub positions: Vec, +} + +impl Chunk { + pub const MAX_POSITIONS: usize = 5; + + pub fn timeout(&self) -> Duration { + self.positions.len() as u32 * self.work.timeout_per_position() + } +} + #[derive(Debug, Clone)] pub struct Position { pub work: Work, - pub position_id: PositionId, - pub flavor: EngineFlavor, + pub position_id: Option, pub url: Option, - pub skip: bool, - pub variant: Variant, pub root_fen: Fen, pub moves: Vec, } @@ -26,11 +39,9 @@ pub struct Position { #[derive(Debug, Clone)] pub struct PositionResponse { pub work: Work, - pub position_id: PositionId, + pub position_id: Option, pub url: Option, - pub skip: bool, - pub scores: Matrix, pub pvs: Matrix>, pub best_move: Option, @@ -93,23 +104,23 @@ impl Matrix { } #[derive(Debug)] -pub struct PositionFailed { +pub struct ChunkFailed { pub batch_id: BatchId, } #[derive(Debug)] pub struct Pull { - pub response: Option>, - pub callback: oneshot::Sender, + pub responses: Result, ChunkFailed>, + pub callback: oneshot::Sender, } impl Pull { pub fn split( self, ) -> ( - Option>, - oneshot::Sender, + Result, ChunkFailed>, + oneshot::Sender, ) { - (self.response, self.callback) + (self.responses, self.callback) } } diff --git a/src/logger.rs b/src/logger.rs index 893dfc7b..87cddd3d 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -12,7 +12,7 @@ use url::Url; use crate::{ api::{BatchId, PositionId}, configure::Verbose, - ipc::{Position, PositionResponse}, + ipc::{Chunk, Position, PositionResponse}, util::NevermindExt as _, }; @@ -84,7 +84,7 @@ impl Logger { P: Into, { let line = format!( - "{} {} cores, {} queued, latest: {}", + "{} {} cores, {} chunks, latest: {}", queue, queue.cores, queue.pending, @@ -129,12 +129,22 @@ impl fmt::Display for ProgressAt { } } +impl From<&Chunk> for ProgressAt { + fn from(chunk: &Chunk) -> ProgressAt { + ProgressAt { + batch_id: chunk.work.id(), + batch_url: chunk.positions.last().and_then(|pos| pos.url.clone()), + position_id: chunk.positions.last().and_then(|pos| pos.position_id), + } + } +} + impl From<&Position> for ProgressAt { fn from(pos: &Position) -> ProgressAt { ProgressAt { batch_id: pos.work.id(), batch_url: pos.url.clone(), - position_id: Some(pos.position_id), + position_id: pos.position_id, } } } @@ -144,7 +154,7 @@ impl From<&PositionResponse> for ProgressAt { ProgressAt { batch_id: pos.work.id(), batch_url: pos.url.clone(), - position_id: Some(pos.position_id), + position_id: pos.position_id, } } } diff --git a/src/main.rs b/src/main.rs index 6e2c7687..6f0a065a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,7 +35,7 @@ use tokio::{ use crate::{ assets::{Assets, ByEngineFlavor, Cpu, EngineFlavor}, configure::{Command, Cores, CpuPriority, Opt}, - ipc::{Position, PositionFailed, Pull}, + ipc::{ChunkFailed, Pull, Chunk}, logger::{Logger, ProgressAt}, util::RandomizedBackoff, }; @@ -249,7 +249,7 @@ async fn run(opt: Opt, logger: &Logger) { } } - // Shutdown queue to abort remaining jobs. + // Shutdown queue to abort remaining chunks. queue.shutdown().await; // Wait for all workers. @@ -266,7 +266,7 @@ async fn run(opt: Opt, logger: &Logger) { async fn worker(i: usize, assets: Arc, tx: mpsc::Sender, logger: Logger) { logger.debug(&format!("Started worker {i}.")); - let mut job: Option = None; + let mut chunk: Option = None; let mut engine = ByEngineFlavor { official: None, multi_variant: None, @@ -277,10 +277,10 @@ async fn worker(i: usize, assets: Arc, tx: mpsc::Sender, logger: L let mut budget = default_budget; loop { - let response = if let Some(job) = job.take() { + let responses = if let Some(chunk) = chunk.take() { // Ensure engine process is ready. - let flavor = job.flavor; - let context = ProgressAt::from(&job); + let flavor = chunk.flavor; + let context = ProgressAt::from(&chunk); let (mut sf, join_handle) = if let Some((sf, join_handle)) = engine.get_mut(flavor).take() { (sf, join_handle) @@ -310,11 +310,11 @@ async fn worker(i: usize, assets: Arc, tx: mpsc::Sender, logger: L }; // Provide time budget. - budget = min(default_budget, budget) + job.work.timeout(); + budget = min(default_budget, budget) + chunk.timeout(); // Analyse or play. let timer = Instant::now(); - let batch_id = job.work.id(); + let batch_id = chunk.work.id(); let res = tokio::select! { _ = tx.closed() => { logger.debug(&format!("Worker {i} shutting down engine early")); @@ -322,7 +322,7 @@ async fn worker(i: usize, assets: Arc, tx: mpsc::Sender, logger: L join_handle.await.expect("join"); break; } - res = sf.go(job) => { + res = sf.go_multiple(chunk) => { match res { Ok(res) => { *engine.get_mut(flavor) = Some((sf, join_handle)); @@ -344,7 +344,7 @@ async fn worker(i: usize, assets: Arc, tx: mpsc::Sender, logger: L }); drop(sf); join_handle.await.expect("join"); - Err(PositionFailed { batch_id }) + Err(ChunkFailed { batch_id }) } }; @@ -354,14 +354,14 @@ async fn worker(i: usize, assets: Arc, tx: mpsc::Sender, logger: L logger.debug(&format!("Low engine timeout budget: {budget:?}")); } - Some(res) + res } else { - None + Ok(Vec::new()) }; let (callback, waiter) = oneshot::channel(); - if tx.send(Pull { response, callback }).await.is_err() { + if tx.send(Pull { responses, callback }).await.is_err() { logger.debug(&format!( "Worker {i} was about to send result, but shutting down" )); @@ -372,7 +372,7 @@ async fn worker(i: usize, assets: Arc, tx: mpsc::Sender, logger: L _ = tx.closed() => break, res = waiter => { match res { - Ok(next_job) => job = Some(next_job), + Ok(next_chunk) => chunk = Some(next_chunk), Err(_) => break, } } diff --git a/src/queue.rs b/src/queue.rs index 59355f17..a785d7b1 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -2,6 +2,7 @@ use std::{ cmp::{max, min}, collections::{hash_map::Entry, HashMap, VecDeque}, convert::TryInto, + iter::{once, zip}, num::NonZeroUsize, sync::Arc, time::{Duration, Instant}, @@ -26,7 +27,7 @@ use crate::{ }, assets::{EngineFlavor, EvalFlavor}, configure::{BacklogOpt, Endpoint, MaxBackoff, StatsOpt}, - ipc::{Position, PositionFailed, PositionResponse, Pull}, + ipc::{Chunk, Position, ChunkFailed, PositionResponse, Pull}, logger::{short_variant_name, Logger, ProgressAt, QueueStatusBar}, stats::{NpsRecorder, Stats, StatsRecorder}, util::{NevermindExt as _, RandomizedBackoff}, @@ -76,10 +77,8 @@ pub struct QueueStub { impl QueueStub { pub async fn pull(&mut self, pull: Pull) { let mut state = self.state.lock().await; - let (response, callback) = pull.split(); - if let Some(response) = response { - state.handle_position_response(self.clone(), response); - } + let (responses, callback) = pull.split(); + state.handle_position_responses(self, responses); if let Err(callback) = state.try_pull(callback) { if let Some(ref mut tx) = self.tx { tx.send(QueueMessage::Pull { callback }) @@ -125,7 +124,7 @@ impl QueueStub { struct QueueState { shutdown_soon: bool, cores: NonZeroUsize, - incoming: VecDeque, + incoming: VecDeque, pending: HashMap, move_submissions: VecDeque, stats_recorder: StatsRecorder, @@ -161,18 +160,18 @@ impl QueueState { Entry::Vacant(entry) => { let progress_at = ProgressAt::from(&batch); - // Reversal only for cosmetics when displaying progress. - let mut positions = Vec::with_capacity(batch.positions.len()); - for pos in batch.positions.into_iter().rev() { - positions.insert( - 0, - if pos.skip { - Some(Skip::Skip) - } else { - self.incoming.push_back(pos); - None - }, - ); + let mut positions = Vec::new(); + for chunk in &batch.chunks { + for pos in &chunk.positions { + if let Some(position_id) = pos.position_id { + if positions.len() <= position_id.0 { + positions.resize(position_id.0 + 1, None); + } + if pos.skip { + positions[position_id.0] = Some(Skip::Skip); + } + } + } } entry.insert(PendingBatch { @@ -189,22 +188,32 @@ impl QueueState { } } - fn handle_position_response( + fn handle_position_responses( &mut self, - queue: QueueStub, - res: Result, + queue: &QueueStub, + responses: Result, ChunkFailed>, ) { - match res { - Ok(res) => { - let progress_at = ProgressAt::from(&res); - let batch_id = res.work.id(); - if let Some(pending) = self.pending.get_mut(&batch_id) { - if let Some(pos) = pending.positions.get_mut(res.position_id.0) { - *pos = Some(Skip::Present(res)); - } + match responses { + Ok(responses) => { + let mut progress_at = None; + for res in responses { + let Some(position_id) = res.position_id else { + continue; + }; + let batch_id = res.work.id(); + let Some(pending) = self.pending.get_mut(&batch_id) else { + continue; + }; + let Some(pos) = pending.positions.get_mut(position_id.0) else { + continue; + }; + progress_at = Some(ProgressAt::from(&res)); + *pos = Some(Skip::Present(res)); + self.maybe_finished(queue.clone(), batch_id); + } + if let Some(progress_at) = progress_at { + self.logger.progress(self.status_bar(), progress_at); } - self.logger.progress(self.status_bar(), progress_at); - self.maybe_finished(queue, batch_id); } Err(failed) => { // Just forget about batches with failed positions, @@ -216,12 +225,9 @@ impl QueueState { } } - fn try_pull( - &mut self, - callback: oneshot::Sender, - ) -> Result<(), oneshot::Sender> { - if let Some(position) = self.incoming.pop_front() { - if let Err(err) = callback.send(position) { + fn try_pull(&mut self, callback: oneshot::Sender) -> Result<(), oneshot::Sender> { + if let Some(chunk) = self.incoming.pop_front() { + if let Err(err) = callback.send(chunk) { self.incoming.push_front(err); } Ok(()) @@ -288,17 +294,11 @@ impl QueueState { Err(pending) => { if !pending.work.matrix_wanted() { // Send partially analysis as progress report. - let progress_report = pending.progress_report(); - if progress_report.iter().filter(|p| p.is_some()).count() - % (self.cores.get() * 2) - == 0 - { - queue.api.submit_analysis( - pending.work.id(), - pending.flavor.eval_flavor(), - progress_report, - ); - } + /* TODO: queue.api.submit_analysis( + pending.work.id(), + pending.flavor.eval_flavor(), + pending.progress_report(), + ); */ } self.pending.insert(pending.work.id(), pending); @@ -310,7 +310,7 @@ impl QueueState { #[derive(Debug)] enum QueueMessage { - Pull { callback: oneshot::Sender }, + Pull { callback: oneshot::Sender }, MoveSubmitted, } @@ -507,12 +507,12 @@ enum Skip { Skip, } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct IncomingBatch { work: Work, flavor: EngineFlavor, variant: Variant, - positions: Vec, + chunks: Vec, url: Option, } @@ -558,23 +558,27 @@ impl IncomingBatch { url: url.clone(), flavor, variant: body.variant, - positions: match body.work { + chunks: match body.work { Work::Move { .. } => { - vec![Position { - work: body.work, - url, - skip: false, + vec![Chunk { + work: body.work.clone(), flavor, - position_id: PositionId(0), variant: body.variant, - root_fen, - moves: body_moves, + positions: vec![Position { + work: body.work, + url, + skip: false, + position_id: Some(PositionId(0)), + root_fen, + moves: body_moves, + }], }] } Work::Analysis { .. } => { + // Iterate forwards to prepare positions. let mut moves = Vec::new(); - let mut positions = Vec::with_capacity(body_moves.len() + 1); - + let num_positions = body_moves.len() + 1; + let mut positions = Vec::with_capacity(num_positions); positions.push(Position { work: body.work.clone(), url: url.clone().map(|mut url| { @@ -582,13 +586,10 @@ impl IncomingBatch { url }), skip: body.skip_positions.contains(&PositionId(0)), - flavor, - position_id: PositionId(0), - variant: body.variant, + position_id: Some(PositionId(0)), root_fen: root_fen.clone(), moves: moves.clone(), }); - for (i, m) in body_moves.into_iter().enumerate() { let position_id = PositionId(i + 1); moves.push(m); @@ -599,30 +600,68 @@ impl IncomingBatch { url }), skip: body.skip_positions.contains(&position_id), - flavor, - position_id, - variant: body.variant, + position_id: Some(position_id), root_fen: root_fen.clone(), moves: moves.clone(), }); } + // Reverse for backwards analysis. + positions.reverse(); + + // Prepare dummy positions, so the respective previous + // position is available when creating chunks. + let prev_and_current: Vec<_> = zip( + once(None).chain(positions.clone().into_iter().map(|pos| { + Some(Position { + position_id: None, + ..pos + }) + })), + positions, + ) + .collect(); + + // Create chunks with overlap. + let mut chunks = Vec::new(); + for prev_and_current_chunked in prev_and_current.chunks(Chunk::MAX_POSITIONS) { + let mut chunk_positions = Vec::new(); + for (prev, current) in prev_and_current_chunked { + if !current.skip { + if let Some(prev) = prev { + if prev.skip || chunk_positions.is_empty() { + chunk_positions.push(prev.clone()); + } + } + chunk_positions.push(current.clone()); + } + } + if !chunk_positions.is_empty() { + chunks.push(Chunk { + work: body.work.clone(), + flavor, + variant: body.variant, + positions: chunk_positions, + }); + } + } + // Edge case: Batch is immediately completed, because all // positions are skipped. - if positions.iter().all(|p| p.skip) { + if chunks.is_empty() { let now = Instant::now(); return Err(IncomingError::AllSkipped(CompletedBatch { work: body.work, url, flavor, variant: body.variant, - positions: positions.into_iter().map(|_| Skip::Skip).collect(), + positions: vec![Skip::Skip; num_positions], started_at: now, completed_at: now, })); } - positions + chunks } }, }) diff --git a/src/stockfish.rs b/src/stockfish.rs index ef0d517a..2fa96663 100644 --- a/src/stockfish.rs +++ b/src/stockfish.rs @@ -14,8 +14,8 @@ use tokio::{ use crate::{ api::{Score, Work}, - assets::EngineFlavor, - ipc::{Matrix, Position, PositionFailed, PositionResponse}, + assets::{EngineFlavor, EvalFlavor}, + ipc::{Matrix, Position, ChunkFailed, PositionResponse, Chunk}, logger::Logger, util::NevermindExt as _, }; @@ -38,14 +38,14 @@ pub struct StockfishStub { } impl StockfishStub { - pub async fn go(&mut self, position: Position) -> Result { - let (callback, response) = oneshot::channel(); - let batch_id = position.work.id(); + pub async fn go_multiple(&mut self, chunk: Chunk) -> Result, ChunkFailed> { + let (callback, responses) = oneshot::channel(); + let batch_id = chunk.work.id(); self.tx - .send(StockfishMessage::Go { position, callback }) + .send(StockfishMessage::GoMultiple { chunk, callback }) .await - .map_err(|_| PositionFailed { batch_id })?; - response.await.map_err(|_| PositionFailed { batch_id }) + .map_err(|_| ChunkFailed { batch_id })?; + responses.await.map_err(|_| ChunkFailed { batch_id }) } } @@ -58,9 +58,9 @@ pub struct StockfishActor { #[derive(Debug)] enum StockfishMessage { - Go { - position: Position, - callback: oneshot::Sender, + GoMultiple { + chunk: Chunk, + callback: oneshot::Sender>, }, } @@ -176,13 +176,13 @@ impl StockfishActor { msg: StockfishMessage, ) -> Result<(), EngineError> { match msg { - StockfishMessage::Go { + StockfishMessage::GoMultiple { mut callback, - position, + chunk, } => { tokio::select! { _ = callback.closed() => Err(EngineError::Shutdown), - res = self.go(stdout, stdin, position) => { + res = self.go_multiple(stdout, stdin, chunk) => { callback.send(res?).nevermind("go receiver dropped"); Ok(()) } @@ -220,12 +220,7 @@ impl StockfishActor { Ok(()) } - async fn go( - &mut self, - stdout: &mut Stdout, - stdin: &mut BufWriter, - position: Position, - ) -> io::Result { + async fn go_multiple(&mut self, stdout: &mut Stdout, stdin: &mut BufWriter, chunk: Chunk) -> io::Result> { // Set global options (once). self.init(stdout, stdin).await?; @@ -237,17 +232,17 @@ impl StockfishActor { .write_all( format!( "setoption name Use NNUE value {}\n", - position.flavor.eval_flavor().is_nnue() + chunk.flavor.eval_flavor().is_nnue() ) .as_bytes(), ) .await?; - if position.flavor == EngineFlavor::MultiVariant { + if chunk.flavor == EngineFlavor::MultiVariant { stdin .write_all( format!( "setoption name UCI_Variant value {}\n", - position.variant.uci() + chunk.variant.uci() ) .as_bytes(), ) @@ -255,10 +250,24 @@ impl StockfishActor { } stdin .write_all( - format!("setoption name MultiPV value {}\n", position.work.multipv()).as_bytes(), + format!("setoption name MultiPV value {}\n", chunk.work.multipv()).as_bytes(), ) .await?; + let mut responses = Vec::new(); + for position in chunk.positions { + responses.push(self.go(stdout, stdin, chunk.flavor.eval_flavor(), position).await?); + } + Ok(responses) + } + + async fn go( + &mut self, + stdout: &mut Stdout, + stdin: &mut BufWriter, + eval_flavor: EvalFlavor, + position: Position, + ) -> io::Result { // Setup position. let moves = position .moves @@ -317,7 +326,7 @@ impl StockfishActor { let mut go = vec![ "go".to_owned(), "nodes".to_owned(), - nodes.get(position.flavor.eval_flavor()).to_string(), + nodes.get(eval_flavor).to_string(), ]; if let Some(depth) = depth { @@ -353,7 +362,6 @@ impl StockfishActor { work: position.work, position_id: position.position_id, url: position.url, - skip: position.skip, best_move: parts.next().and_then(|m| m.parse().ok()), scores, depth, From 71801918b55950b66880ee9e79797e1da1a5dc09 Mon Sep 17 00:00:00 2001 From: Niklas Fiekas Date: Tue, 2 Jan 2024 00:01:35 +0100 Subject: [PATCH 04/10] fix incoming chunks not added --- src/main.rs | 11 +++++++++-- src/queue.rs | 23 +++++++++++++++-------- src/stockfish.rs | 26 +++++++++++++++++--------- 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6f0a065a..3faa354f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,7 +35,7 @@ use tokio::{ use crate::{ assets::{Assets, ByEngineFlavor, Cpu, EngineFlavor}, configure::{Command, Cores, CpuPriority, Opt}, - ipc::{ChunkFailed, Pull, Chunk}, + ipc::{Chunk, ChunkFailed, Pull}, logger::{Logger, ProgressAt}, util::RandomizedBackoff, }; @@ -361,7 +361,14 @@ async fn worker(i: usize, assets: Arc, tx: mpsc::Sender, logger: L let (callback, waiter) = oneshot::channel(); - if tx.send(Pull { responses, callback }).await.is_err() { + if tx + .send(Pull { + responses, + callback, + }) + .await + .is_err() + { logger.debug(&format!( "Worker {i} was about to send result, but shutting down" )); diff --git a/src/queue.rs b/src/queue.rs index a785d7b1..293a83ad 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -27,7 +27,7 @@ use crate::{ }, assets::{EngineFlavor, EvalFlavor}, configure::{BacklogOpt, Endpoint, MaxBackoff, StatsOpt}, - ipc::{Chunk, Position, ChunkFailed, PositionResponse, Pull}, + ipc::{Chunk, ChunkFailed, Position, PositionResponse, Pull}, logger::{short_variant_name, Logger, ProgressAt, QueueStatusBar}, stats::{NpsRecorder, Stats, StatsRecorder}, util::{NevermindExt as _, RandomizedBackoff}, @@ -146,7 +146,7 @@ impl QueueState { fn status_bar(&self) -> QueueStatusBar { QueueStatusBar { - pending: self.pending.values().map(|p| p.pending()).sum(), + pending: self.incoming.len(), cores: self.cores, } } @@ -161,7 +161,7 @@ impl QueueState { let progress_at = ProgressAt::from(&batch); let mut positions = Vec::new(); - for chunk in &batch.chunks { + for chunk in batch.chunks { for pos in &chunk.positions { if let Some(position_id) = pos.position_id { if positions.len() <= position_id.0 { @@ -172,6 +172,7 @@ impl QueueState { } } } + self.incoming.push_back(chunk); } entry.insert(PendingBatch { @@ -196,6 +197,7 @@ impl QueueState { match responses { Ok(responses) => { let mut progress_at = None; + let mut batch_ids = Vec::new(); for res in responses { let Some(position_id) = res.position_id else { continue; @@ -209,11 +211,16 @@ impl QueueState { }; progress_at = Some(ProgressAt::from(&res)); *pos = Some(Skip::Present(res)); - self.maybe_finished(queue.clone(), batch_id); + if !batch_ids.contains(&batch_id) { + batch_ids.push(batch_id); + } } if let Some(progress_at) = progress_at { self.logger.progress(self.status_bar(), progress_at); } + for batch_id in batch_ids { + self.maybe_finished(queue.clone(), batch_id); + } } Err(failed) => { // Just forget about batches with failed positions, @@ -293,12 +300,12 @@ impl QueueState { } Err(pending) => { if !pending.work.matrix_wanted() { - // Send partially analysis as progress report. - /* TODO: queue.api.submit_analysis( + // Send partial analysis as progress report. + queue.api.submit_analysis( pending.work.id(), pending.flavor.eval_flavor(), pending.progress_report(), - ); */ + ); } self.pending.insert(pending.work.id(), pending); @@ -711,7 +718,7 @@ struct PendingBatch { impl PendingBatch { #[allow(clippy::result_large_err)] fn try_into_completed(self) -> Result { - match self.positions.clone().into_iter().collect() { + match dbg!(self.positions.clone()).into_iter().collect() { Some(positions) => Ok(CompletedBatch { work: self.work, url: self.url, diff --git a/src/stockfish.rs b/src/stockfish.rs index 2fa96663..792f19db 100644 --- a/src/stockfish.rs +++ b/src/stockfish.rs @@ -15,7 +15,7 @@ use tokio::{ use crate::{ api::{Score, Work}, assets::{EngineFlavor, EvalFlavor}, - ipc::{Matrix, Position, ChunkFailed, PositionResponse, Chunk}, + ipc::{Chunk, ChunkFailed, Matrix, Position, PositionResponse}, logger::Logger, util::NevermindExt as _, }; @@ -38,7 +38,10 @@ pub struct StockfishStub { } impl StockfishStub { - pub async fn go_multiple(&mut self, chunk: Chunk) -> Result, ChunkFailed> { + pub async fn go_multiple( + &mut self, + chunk: Chunk, + ) -> Result, ChunkFailed> { let (callback, responses) = oneshot::channel(); let batch_id = chunk.work.id(); self.tx @@ -220,7 +223,12 @@ impl StockfishActor { Ok(()) } - async fn go_multiple(&mut self, stdout: &mut Stdout, stdin: &mut BufWriter, chunk: Chunk) -> io::Result> { + async fn go_multiple( + &mut self, + stdout: &mut Stdout, + stdin: &mut BufWriter, + chunk: Chunk, + ) -> io::Result> { // Set global options (once). self.init(stdout, stdin).await?; @@ -240,11 +248,8 @@ impl StockfishActor { if chunk.flavor == EngineFlavor::MultiVariant { stdin .write_all( - format!( - "setoption name UCI_Variant value {}\n", - chunk.variant.uci() - ) - .as_bytes(), + format!("setoption name UCI_Variant value {}\n", chunk.variant.uci()) + .as_bytes(), ) .await?; } @@ -256,7 +261,10 @@ impl StockfishActor { let mut responses = Vec::new(); for position in chunk.positions { - responses.push(self.go(stdout, stdin, chunk.flavor.eval_flavor(), position).await?); + responses.push( + self.go(stdout, stdin, chunk.flavor.eval_flavor(), position) + .await?, + ); } Ok(responses) } From 8fd07d6bed604e2263943d157341244e9b9ef243 Mon Sep 17 00:00:00 2001 From: Niklas Fiekas Date: Tue, 2 Jan 2024 00:10:31 +0100 Subject: [PATCH 05/10] fix some positions not marked as skip --- src/queue.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index 293a83ad..6c53f1ca 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -165,10 +165,10 @@ impl QueueState { for pos in &chunk.positions { if let Some(position_id) = pos.position_id { if positions.len() <= position_id.0 { - positions.resize(position_id.0 + 1, None); + positions.resize(position_id.0 + 1, Some(Skip::Skip)); } - if pos.skip { - positions[position_id.0] = Some(Skip::Skip); + if !pos.skip { + positions[position_id.0] = None; } } } @@ -631,7 +631,7 @@ impl IncomingBatch { // Create chunks with overlap. let mut chunks = Vec::new(); - for prev_and_current_chunked in prev_and_current.chunks(Chunk::MAX_POSITIONS) { + for prev_and_current_chunked in prev_and_current.chunks(Chunk::MAX_POSITIONS - 1) { let mut chunk_positions = Vec::new(); for (prev, current) in prev_and_current_chunked { if !current.skip { @@ -718,7 +718,7 @@ struct PendingBatch { impl PendingBatch { #[allow(clippy::result_large_err)] fn try_into_completed(self) -> Result { - match dbg!(self.positions.clone()).into_iter().collect() { + match self.positions.clone().into_iter().collect() { Some(positions) => Ok(CompletedBatch { work: self.work, url: self.url, From e592ff4fb68fd1a6582ad24b746c11ec20edb7ce Mon Sep 17 00:00:00 2001 From: Niklas Fiekas Date: Tue, 2 Jan 2024 00:26:17 +0100 Subject: [PATCH 06/10] back to reporting pending positions --- src/logger.rs | 2 +- src/queue.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/logger.rs b/src/logger.rs index 87cddd3d..8ab94cc5 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -84,7 +84,7 @@ impl Logger { P: Into, { let line = format!( - "{} {} cores, {} chunks, latest: {}", + "{} {} cores, {} queued, latest: {}", queue, queue.cores, queue.pending, diff --git a/src/queue.rs b/src/queue.rs index 6c53f1ca..23582124 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -146,7 +146,7 @@ impl QueueState { fn status_bar(&self) -> QueueStatusBar { QueueStatusBar { - pending: self.incoming.len(), + pending: self.pending.values().map(|p| p.pending()).sum(), cores: self.cores, } } From acfe0a01d7b0aed5a078343950c50f3cc2c7779c Mon Sep 17 00:00:00 2001 From: Niklas Fiekas Date: Tue, 2 Jan 2024 00:30:24 +0100 Subject: [PATCH 07/10] PositionId -> PositionIndex --- src/api.rs | 4 ++-- src/ipc.rs | 6 +++--- src/logger.rs | 14 +++++++------- src/queue.rs | 34 +++++++++++++++++----------------- src/stockfish.rs | 2 +- 5 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/api.rs b/src/api.rs index 551d5e59..b8782bf1 100644 --- a/src/api.rs +++ b/src/api.rs @@ -289,7 +289,7 @@ impl From for Duration { } #[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize)] -pub struct PositionId(pub usize); +pub struct PositionIndex(pub usize); #[serde_as] #[derive(Debug, Deserialize)] @@ -306,7 +306,7 @@ pub struct AcquireResponseBody { #[serde_as(as = "StringWithSeparator::")] pub moves: Vec, #[serde(rename = "skipPositions", default)] - pub skip_positions: Vec, + pub skip_positions: Vec, } impl AcquireResponseBody { diff --git a/src/ipc.rs b/src/ipc.rs index 5f812236..b7e8215a 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -5,7 +5,7 @@ use tokio::sync::oneshot; use url::Url; use crate::{ - api::{AnalysisPart, BatchId, PositionId, Score, Work}, + api::{AnalysisPart, BatchId, PositionIndex, Score, Work}, assets::EngineFlavor, }; @@ -28,7 +28,7 @@ impl Chunk { #[derive(Debug, Clone)] pub struct Position { pub work: Work, - pub position_id: Option, + pub position_index: Option, pub url: Option, pub skip: bool, @@ -39,7 +39,7 @@ pub struct Position { #[derive(Debug, Clone)] pub struct PositionResponse { pub work: Work, - pub position_id: Option, + pub position_index: Option, pub url: Option, pub scores: Matrix, diff --git a/src/logger.rs b/src/logger.rs index 8ab94cc5..3d738575 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -10,7 +10,7 @@ use shakmaty::variant::Variant; use url::Url; use crate::{ - api::{BatchId, PositionId}, + api::{BatchId, PositionIndex}, configure::Verbose, ipc::{Chunk, Position, PositionResponse}, util::NevermindExt as _, @@ -108,20 +108,20 @@ impl Logger { pub struct ProgressAt { pub batch_id: BatchId, pub batch_url: Option, - pub position_id: Option, + pub position_index: Option, } impl fmt::Display for ProgressAt { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if let Some(ref batch_url) = self.batch_url { let mut url = batch_url.clone(); - if let Some(PositionId(positon_id)) = self.position_id { + if let Some(PositionIndex(positon_id)) = self.position_index { url.set_fragment(Some(&positon_id.to_string())); } fmt::Display::fmt(&url, f) } else { write!(f, "{}", self.batch_id)?; - if let Some(PositionId(positon_id)) = self.position_id { + if let Some(PositionIndex(positon_id)) = self.position_index { write!(f, "#{positon_id}")?; } Ok(()) @@ -134,7 +134,7 @@ impl From<&Chunk> for ProgressAt { ProgressAt { batch_id: chunk.work.id(), batch_url: chunk.positions.last().and_then(|pos| pos.url.clone()), - position_id: chunk.positions.last().and_then(|pos| pos.position_id), + position_index: chunk.positions.last().and_then(|pos| pos.position_index), } } } @@ -144,7 +144,7 @@ impl From<&Position> for ProgressAt { ProgressAt { batch_id: pos.work.id(), batch_url: pos.url.clone(), - position_id: pos.position_id, + position_index: pos.position_index, } } } @@ -154,7 +154,7 @@ impl From<&PositionResponse> for ProgressAt { ProgressAt { batch_id: pos.work.id(), batch_url: pos.url.clone(), - position_id: pos.position_id, + position_index: pos.position_index, } } } diff --git a/src/queue.rs b/src/queue.rs index 23582124..daefde22 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -22,7 +22,7 @@ use url::Url; use crate::{ api::{ - AcquireQuery, AcquireResponseBody, Acquired, AnalysisPart, ApiStub, BatchId, PositionId, + AcquireQuery, AcquireResponseBody, Acquired, AnalysisPart, ApiStub, BatchId, PositionIndex, Work, }, assets::{EngineFlavor, EvalFlavor}, @@ -163,12 +163,12 @@ impl QueueState { let mut positions = Vec::new(); for chunk in batch.chunks { for pos in &chunk.positions { - if let Some(position_id) = pos.position_id { - if positions.len() <= position_id.0 { - positions.resize(position_id.0 + 1, Some(Skip::Skip)); + if let Some(position_index) = pos.position_index { + if positions.len() <= position_index.0 { + positions.resize(position_index.0 + 1, Some(Skip::Skip)); } if !pos.skip { - positions[position_id.0] = None; + positions[position_index.0] = None; } } } @@ -199,14 +199,14 @@ impl QueueState { let mut progress_at = None; let mut batch_ids = Vec::new(); for res in responses { - let Some(position_id) = res.position_id else { + let Some(position_index) = res.position_index else { continue; }; let batch_id = res.work.id(); let Some(pending) = self.pending.get_mut(&batch_id) else { continue; }; - let Some(pos) = pending.positions.get_mut(position_id.0) else { + let Some(pos) = pending.positions.get_mut(position_index.0) else { continue; }; progress_at = Some(ProgressAt::from(&res)); @@ -384,7 +384,7 @@ impl QueueActor { let context = ProgressAt { batch_id: body.work.id(), batch_url: body.batch_url(self.api.endpoint()), - position_id: None, + position_index: None, }; match IncomingBatch::from_acquired(self.api.endpoint(), body) { @@ -575,7 +575,7 @@ impl IncomingBatch { work: body.work, url, skip: false, - position_id: Some(PositionId(0)), + position_index: Some(PositionIndex(0)), root_fen, moves: body_moves, }], @@ -592,22 +592,22 @@ impl IncomingBatch { url.set_fragment(Some("0")); url }), - skip: body.skip_positions.contains(&PositionId(0)), - position_id: Some(PositionId(0)), + skip: body.skip_positions.contains(&PositionIndex(0)), + position_index: Some(PositionIndex(0)), root_fen: root_fen.clone(), moves: moves.clone(), }); for (i, m) in body_moves.into_iter().enumerate() { - let position_id = PositionId(i + 1); + let position_index = PositionIndex(i + 1); moves.push(m); positions.push(Position { work: body.work.clone(), url: url.clone().map(|mut url| { - url.set_fragment(Some(&position_id.0.to_string())); + url.set_fragment(Some(&position_index.0.to_string())); url }), - skip: body.skip_positions.contains(&position_id), - position_id: Some(position_id), + skip: body.skip_positions.contains(&position_index), + position_index: Some(position_index), root_fen: root_fen.clone(), moves: moves.clone(), }); @@ -621,7 +621,7 @@ impl IncomingBatch { let prev_and_current: Vec<_> = zip( once(None).chain(positions.clone().into_iter().map(|pos| { Some(Position { - position_id: None, + position_index: None, ..pos }) })), @@ -680,7 +680,7 @@ impl From<&IncomingBatch> for ProgressAt { ProgressAt { batch_id: batch.work.id(), batch_url: batch.url.clone(), - position_id: None, + position_index: None, } } } diff --git a/src/stockfish.rs b/src/stockfish.rs index 792f19db..8694cd0b 100644 --- a/src/stockfish.rs +++ b/src/stockfish.rs @@ -368,7 +368,7 @@ impl StockfishActor { return Ok(PositionResponse { work: position.work, - position_id: position.position_id, + position_index: position.position_index, url: position.url, best_move: parts.next().and_then(|m| m.parse().ok()), scores, From 408eac3c46441bb8097828f75b3ea62354a3d915 Mon Sep 17 00:00:00 2001 From: Niklas Fiekas Date: Tue, 2 Jan 2024 00:32:11 +0100 Subject: [PATCH 08/10] target 5 real positions per chunk --- src/ipc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ipc.rs b/src/ipc.rs index b7e8215a..c9a27a32 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -18,7 +18,7 @@ pub struct Chunk { } impl Chunk { - pub const MAX_POSITIONS: usize = 5; + pub const MAX_POSITIONS: usize = 6; pub fn timeout(&self) -> Duration { self.positions.len() as u32 * self.work.timeout_per_position() From b13785a1ac31418645983da596d4a4c8d8ad4ef7 Mon Sep 17 00:00:00 2001 From: Niklas Fiekas Date: Tue, 2 Jan 2024 12:36:41 +0100 Subject: [PATCH 09/10] only count real positions toward time budget --- src/ipc.rs | 6 +++++- src/queue.rs | 4 +++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/ipc.rs b/src/ipc.rs index c9a27a32..4e4a43d3 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -21,7 +21,11 @@ impl Chunk { pub const MAX_POSITIONS: usize = 6; pub fn timeout(&self) -> Duration { - self.positions.len() as u32 * self.work.timeout_per_position() + self.positions + .iter() + .filter(|pos| pos.position_index.is_some()) + .count() as u32 + * self.work.timeout_per_position() } } diff --git a/src/queue.rs b/src/queue.rs index daefde22..d41f05b4 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -631,7 +631,9 @@ impl IncomingBatch { // Create chunks with overlap. let mut chunks = Vec::new(); - for prev_and_current_chunked in prev_and_current.chunks(Chunk::MAX_POSITIONS - 1) { + for prev_and_current_chunked in + prev_and_current.chunks(Chunk::MAX_POSITIONS - 1) + { let mut chunk_positions = Vec::new(); for (prev, current) in prev_and_current_chunked { if !current.skip { From 986b9e014390b7c0d9f88353b7be19561cac1068 Mon Sep 17 00:00:00 2001 From: Niklas Fiekas Date: Tue, 2 Jan 2024 13:10:09 +0100 Subject: [PATCH 10/10] reduce default budget (can be amortized for each chunk) --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 3faa354f..8c36e564 100644 --- a/src/main.rs +++ b/src/main.rs @@ -273,7 +273,7 @@ async fn worker(i: usize, assets: Arc, tx: mpsc::Sender, logger: L }; let mut engine_backoff = RandomizedBackoff::default(); - let default_budget = Duration::from_secs(60); + let default_budget = Duration::from_secs(30); let mut budget = default_budget; loop {