Skip to content

Commit

Permalink
chore
Browse files Browse the repository at this point in the history
  • Loading branch information
Weijun-H committed Feb 8, 2025
1 parent aa79feb commit 8b71674
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions datafusion/physical-plan/src/repartition/on_demand_repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ impl OnDemandRepartitionExec {
async fn process_input(
input: Arc<dyn ExecutionPlan>,
partition: usize,
buffer_tx: Sender<RecordBatch>,
buffer_tx: tokio::sync::mpsc::Sender<RecordBatch>,
context: Arc<TaskContext>,
fetch_time: metrics::Time,
send_buffer_time: metrics::Time,
Expand Down Expand Up @@ -476,7 +476,7 @@ impl OnDemandRepartitionExec {
context: Arc<TaskContext>,
) -> Result<()> {
// initialize buffer channel so that we can pre-fetch from input
let (buffer_tx, buffer_rx) = async_channel::bounded::<RecordBatch>(2);
let (buffer_tx, mut buffer_rx) = tokio::sync::mpsc::channel(2);
// execute the child operator in a separate task
// that pushes batches into buffer channel with limited capacity
let processing_task = SpawnedTask::spawn(Self::process_input(
Expand All @@ -491,12 +491,6 @@ impl OnDemandRepartitionExec {
let mut batches_until_yield = partitioning.partition_count();
// When the input is done, break the loop
while !output_channels.is_empty() {
// Fetch the batch from the buffer; ideally, this reduces the time gap between the requester and the input stream
let batch = match buffer_rx.recv().await {
Ok(batch) => batch,
_ => break,
};

// Wait until a partition is requested, then get the output partition information
let partition = output_partition_rx.recv().await.map_err(|e| {
internal_datafusion_err!(
Expand All @@ -505,6 +499,25 @@ impl OnDemandRepartitionExec {
)
})?;

// Fetch the batch from the buffer; ideally, this reduces the time gap between the requester and the input stream
let batch_opt = loop {
match buffer_rx.try_recv() {
Ok(batch) => break Some(batch),
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
tokio::task::yield_now().await;
}
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
break None
}
}
};

let batch = if let Some(batch) = batch_opt {
batch
} else {
break;
};

let size = batch.get_array_memory_size();

let timer = metrics.send_time[partition].timer();
Expand Down

0 comments on commit 8b71674

Please sign in to comment.