Skip to content

Commit

Permalink
Merge pull request #3354 from tediou5/opt/improve-farm-maintenance-pe…
Browse files Browse the repository at this point in the history
…rformance-via-parallelization

opt: improve farms maintenance performance via parallelization
  • Loading branch information
nazar-pc authored Feb 19, 2025
2 parents 3622c72 + 4250caa commit 186a844
Show file tree
Hide file tree
Showing 2 changed files with 308 additions and 76 deletions.
234 changes: 158 additions & 76 deletions crates/subspace-farmer/src/cluster/controller/farms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
//! about which pieces are plotted in which sectors of which farm up to date. Implementation
//! automatically handles dynamic farm addition and removal, etc.
#[cfg(test)]
mod tests;

use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
use crate::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast};
use crate::cluster::nats_client::NatsClient;
Expand All @@ -12,29 +15,107 @@ use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
use anyhow::anyhow;
use async_lock::RwLock as AsyncRwLock;
use futures::channel::oneshot;
use futures::future::FusedFuture;
use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, StreamExt};
use futures::stream::{FusedStream, FuturesUnordered};
use futures::{select, FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::future::{ready, Future};
use std::future::Future;
use std::mem;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use subspace_core_primitives::hashes::Blake3Hash;
use subspace_core_primitives::sectors::SectorIndex;
use tokio::task;
use tokio::time::MissedTickBehavior;
use tokio_stream::StreamMap;
use tracing::{error, info, trace, warn};

type AddRemoveFuture<'a> =
Pin<Box<dyn Future<Output = Option<(FarmIndex, oneshot::Receiver<()>, ClusterFarm)>> + 'a>>;

/// Number of farms in a cluster is currently limited to 2^16
pub type FarmIndex = u16;

/// Outcome of a single farm add/remove operation: resolves with `Some((farm_index,
/// expired_receiver, farm))` when a farm was added, `None` when it was removed (or failed to
/// initialize)
type AddRemoveResult = Option<(FarmIndex, oneshot::Receiver<()>, ClusterFarm)>;
/// Boxed future driving one add/remove operation to completion
type AddRemoveFuture<'a, R> = Pin<Box<dyn Future<Output = R> + 'a>>;
/// Boxed single-item stream wrapping an [`AddRemoveFuture`], the value type stored in `StreamMap`
type AddRemoveStream<'a, R> = Pin<Box<dyn Stream<Item = R> + Unpin + 'a>>;

/// A `FarmsAddRemoveStreamMap` that keeps track of futures that are currently being processed for
/// each `FarmIndex`
struct FarmsAddRemoveStreamMap<'a, R> {
    /// Tasks currently being polled, at most one per farm index
    in_progress: StreamMap<FarmIndex, AddRemoveStream<'a, R>>,
    /// Per-farm backlog of tasks waiting for the in-progress task of the same farm index to finish
    farms_to_add_remove: HashMap<FarmIndex, VecDeque<AddRemoveFuture<'a, R>>>,
}

impl<R> Default for FarmsAddRemoveStreamMap<'_, R> {
    /// Creates an empty stream map: nothing in progress, no queued tasks
    fn default() -> Self {
        Self {
            in_progress: StreamMap::new(),
            farms_to_add_remove: HashMap::new(),
        }
    }
}

impl<'a, R: 'a> FarmsAddRemoveStreamMap<'a, R> {
    /// Queues a new add/remove task for `farm_index`.
    ///
    /// When pushing a new task, it first checks if there is already a future for the given
    /// `FarmIndex` in `in_progress`:
    /// - If there is, the task is added to `farms_to_add_remove`, so tasks for the same farm index
    ///   run strictly one at a time, in submission order.
    /// - If not, the task is directly added to `in_progress`; tasks for different farm indices
    ///   therefore make progress concurrently.
    fn push(&mut self, farm_index: FarmIndex, fut: AddRemoveFuture<'a, R>) {
        if self.in_progress.contains_key(&farm_index) {
            let queue = self.farms_to_add_remove.entry(farm_index).or_default();
            queue.push_back(fut);
        } else {
            // `StreamMap` polls streams, so wrap the future into a single-item stream before
            // inserting it
            self.in_progress
                .insert(farm_index, Box::pin(fut.into_stream()) as _);
        }
    }

    /// Polls the next entry in `in_progress` and moves the next task from `farms_to_add_remove` to
    /// `in_progress` if there is any. If there are no more tasks to execute, returns `None`.
    fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<R>> {
        if let Some((farm_index, res)) = std::task::ready!(self.in_progress.poll_next_unpin(cx)) {
            // Current task completed, remove from in_progress queue and check for more tasks
            self.in_progress.remove(&farm_index);
            self.process_farm_queue(farm_index);
            Poll::Ready(Some(res))
        } else {
            // No more tasks to execute; a backlog entry only exists while a task for the same farm
            // index is in progress (see `push`), so the backlog must be empty here
            assert!(self.farms_to_add_remove.is_empty());
            Poll::Ready(None)
        }
    }

    /// Process the next task from the farm queue for the given `farm_index`, promoting it into
    /// `in_progress` so it starts being polled
    fn process_farm_queue(&mut self, farm_index: FarmIndex) {
        if let Entry::Occupied(mut next_entry) = self.farms_to_add_remove.entry(farm_index) {
            let task_queue = next_entry.get_mut();
            if let Some(fut) = task_queue.pop_front() {
                self.in_progress
                    .insert(farm_index, Box::pin(fut.into_stream()) as _);
            }

            // Remove the farm index from the map if there are no more tasks
            if task_queue.is_empty() {
                next_entry.remove();
            }
        }
    }
}

impl<'a, R: 'a> Stream for FarmsAddRemoveStreamMap<'a, R> {
    type Item = R;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Unwrapping the pin is sound because `Self: Unpin` (all fields are `Unpin`); delegate to
        // the inherent polling helper
        Pin::into_inner(self).poll_next_entry(cx)
    }
}

impl<'a, R: 'a> FusedStream for FarmsAddRemoveStreamMap<'a, R> {
    /// The stream is exhausted once nothing is queued and nothing is executing
    fn is_terminated(&self) -> bool {
        self.farms_to_add_remove.is_empty() && self.in_progress.is_empty()
    }
}

#[derive(Debug)]
struct KnownFarm {
farm_id: FarmId,
Expand Down Expand Up @@ -142,13 +223,8 @@ pub async fn maintain_farms(
identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
let mut known_farms = KnownFarms::new(identification_broadcast_interval);

// Futures that need to be processed sequentially in order to add/remove farms, if farm was
// added, future will resolve with `Some`, `None` if removed
let mut farms_to_add_remove = VecDeque::<AddRemoveFuture<'_>>::new();
// Farm that is being added/removed right now (if any)
let mut farm_add_remove_in_progress = (Box::pin(ready(None)) as AddRemoveFuture<'_>).fuse();
// Initialize with pending future so it never ends
// Stream map for adding/removing farms
let mut farms_to_add_remove = FarmsAddRemoveStreamMap::default();
let mut farms = FuturesUnordered::new();

let farmer_identify_subscription = pin!(nats_client
Expand All @@ -174,16 +250,10 @@ pub async fn maintain_farms(
farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

loop {
if farm_add_remove_in_progress.is_terminated() {
if let Some(fut) = farms_to_add_remove.pop_front() {
farm_add_remove_in_progress = fut.fuse();
}
}

select! {
(farm_index, result) = farms.select_next_some() => {
known_farms.remove(farm_index);
farms_to_add_remove.push_back(Box::pin(async move {
farms_to_add_remove.push(farm_index, Box::pin(async move {
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
Expand Down Expand Up @@ -240,7 +310,7 @@ pub async fn maintain_farms(
);
}

farms_to_add_remove.push_back(Box::pin(async move {
farms_to_add_remove.push(farm_index, Box::pin(async move {
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
Expand All @@ -259,7 +329,7 @@ pub async fn maintain_farms(
}));
}
}
result = farm_add_remove_in_progress => {
result = farms_to_add_remove.select_next_some() => {
if let Some((farm_index, expired_receiver, farm)) = result {
farms.push(async move {
select! {
Expand All @@ -282,7 +352,7 @@ fn process_farm_identify_message<'a>(
identify_message: ClusterFarmerIdentifyFarmBroadcast,
nats_client: &'a NatsClient,
known_farms: &mut KnownFarms,
farms_to_add_remove: &mut VecDeque<AddRemoveFuture<'a>>,
farms_to_add_remove: &mut FarmsAddRemoveStreamMap<'a, AddRemoveResult>,
plotted_pieces: &'a Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
) {
let ClusterFarmerIdentifyFarmBroadcast {
Expand Down Expand Up @@ -327,62 +397,68 @@ fn process_farm_identify_message<'a>(
};

if remove {
farms_to_add_remove.push_back(Box::pin(async move {
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
%farm_index,
%farm_id,
%error,
"Failed to delete farm that was replaced"
);
}
farms_to_add_remove.push(
farm_index,
Box::pin(async move {
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
%farm_index,
%farm_id,
%error,
"Failed to delete farm that was replaced"
);
}

None
}));
None
}),
);
}

if add {
farms_to_add_remove.push_back(Box::pin(async move {
match initialize_farm(
farm_index,
farm_id,
total_sectors_count,
Arc::clone(plotted_pieces),
nats_client,
)
.await
{
Ok(farm) => {
if remove {
info!(
%farm_index,
%farm_id,
"Farm re-initialized successfully"
);
} else {
info!(
%farm_index,
%farm_id,
"Farm initialized successfully"
farms_to_add_remove.push(
farm_index,
Box::pin(async move {
match initialize_farm(
farm_index,
farm_id,
total_sectors_count,
Arc::clone(plotted_pieces),
nats_client,
)
.await
{
Ok(farm) => {
if remove {
info!(
%farm_index,
%farm_id,
"Farm re-initialized successfully"
);
} else {
info!(
%farm_index,
%farm_id,
"Farm initialized successfully"
);
}

Some((farm_index, expired_receiver, farm))
}
Err(error) => {
warn!(
%error,
"Failed to initialize farm {farm_id}"
);
None
}

Some((farm_index, expired_receiver, farm))
}
Err(error) => {
warn!(
%error,
"Failed to initialize farm {farm_id}"
);
None
}
}
}));
}),
);
}
}

Expand Down Expand Up @@ -424,15 +500,21 @@ async fn initialize_farm(
.map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?;

{
let mut plotted_pieces = plotted_pieces.write().await;
plotted_pieces.add_farm(farm_index, farm.piece_reader());
plotted_pieces
.write()
.await
.add_farm(farm_index, farm.piece_reader());

while let Some(plotted_sector_result) = plotted_sectors.next().await {
let plotted_sector = plotted_sector_result.map_err(|error| {
anyhow!("Failed to get plotted sector for farm {farm_id}: {error}")
})?;

plotted_pieces.add_sector(farm_index, &plotted_sector);
let mut plotted_pieces_guard = plotted_pieces.write().await;
plotted_pieces_guard.add_sector(farm_index, &plotted_sector);

// Drop the guard immediately to make sure other tasks are able to access the plotted pieces
drop(plotted_pieces_guard);

task::yield_now().await;
}
Expand Down
Loading

0 comments on commit 186a844

Please sign in to comment.