Skip to content

Commit

Permalink
better batch handling
Browse files Browse the repository at this point in the history
  • Loading branch information
erhant committed Dec 13, 2024
1 parent 6f66d52 commit 0ef85b5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
2 changes: 1 addition & 1 deletion compute/src/payloads/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl TaskStats {
}

/// Records the execution time of the task.
#[deprecated = "will be removed later"]
/// TODO: #[deprecated = "will be removed later"]
pub fn record_execution_time(mut self, started_at: Instant) -> Self {
self.execution_time = Instant::now().duration_since(started_at).as_nanos();
self
Expand Down
39 changes: 28 additions & 11 deletions compute/src/workers/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,39 @@ impl WorkflowsWorker {
///
/// Batch size must NOT be larger than `MAX_BATCH_SIZE`, otherwise will panic.
pub async fn run_batch(&mut self, batch_size: usize) {
// TODO: need some better batch_size error handling here
assert!(
batch_size <= Self::MAX_BATCH_SIZE,
"Batch size must not be larger than {}",
Self::MAX_BATCH_SIZE
);

loop {
// get tasks in batch from the channel
let mut task_buffer = Vec::new();
let num_tasks = self
.workflow_rx
.recv_many(&mut task_buffer, batch_size)
.await;

if num_tasks == 0 {
return self.shutdown();
let mut tasks = Vec::new();

// get tasks in batch from the channel, we enter the loop if:
// (1) there are no tasks, or,
// (2) there are tasks less than the batch size and the channel is not empty
while tasks.len() == 0 || (tasks.len() < batch_size && !self.workflow_rx.is_empty()) {
let limit = batch_size - tasks.len();
match self.workflow_rx.recv_many(&mut tasks, limit).await {
// 0 tasks returned means that the channel is closed
0 => return self.shutdown(),
_ => {
// wait a small amount of time to allow for more tasks to be sent into the channel
tokio::time::sleep(std::time::Duration::from_millis(256)).await;
}
}
}

// process the batch
let num_tasks = tasks.len();
debug_assert!(
num_tasks <= batch_size,
"number of tasks cant be larger than batch size"
);
debug_assert!(num_tasks != 0, "number of tasks cant be zero");
log::info!("Processing {} workflows in batch", num_tasks);
let mut batch = task_buffer.into_iter().map(|b| (b, &self.publish_tx));
let mut batch = tasks.into_iter().map(|b| (b, &self.publish_tx));
match num_tasks {
1 => {
WorkflowsWorker::execute(batch.next().unwrap()).await;
Expand Down

0 comments on commit 0ef85b5

Please sign in to comment.