From 62a2e204e5e365bc09b3eab867f3c4d38148b54d Mon Sep 17 00:00:00 2001 From: Kamil Piechowiak <32928185+KamilPiechowiak@users.noreply.github.com> Date: Tue, 31 Dec 2024 11:47:35 +0100 Subject: [PATCH] fix high cpu idle usage in `_buffer` and `_forget` (#7904) GitOrigin-RevId: 822f7d7fae87fca8629169fa1f5c62d0f0720e79 --- CHANGELOG.md | 3 +++ .../communication/src/allocator/counters.rs | 17 +++++++++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b838524..71a0c04d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- temporal behaviors in temporal operators (`windowby`, `interval_join`) now consume no CPU when no data passes through them. + ## [0.16.2] - 2024-12-19 ### Added diff --git a/external/timely-dataflow/communication/src/allocator/counters.rs b/external/timely-dataflow/communication/src/allocator/counters.rs index 4bee0611..feb09362 100644 --- a/external/timely-dataflow/communication/src/allocator/counters.rs +++ b/external/timely-dataflow/communication/src/allocator/counters.rs @@ -34,7 +34,7 @@ impl<T, P: Push<T>> Push<T> for Pusher<T, P> { // if self.count != 0 { // self.events // .borrow_mut() - // .push_back(self.index); + // .push(self.index); // self.count = 0; // } // } @@ -43,10 +43,19 @@ impl<T, P: Push<T>> Push<T> for Pusher<T, P> { // } // TODO: Version above is less chatty, but can be a bit late in // moving information along. Better, but needs cooperation. - self.events - .borrow_mut() - .push(self.index); + // [Pathway extension]: do not push index if the message is none. + // Without it, the program never parks if Variable is used. Then None messages + // circulate and in every call to `step_or_park` some operators are activated.
+ // This Pusher is only used by Thread allocator and apparently ignoring None + // messages here doesn't block the computation. (The Thread allocator is used + // if only one worker is used and always for Pipeline pact). + if element.is_some() { + self.events + .borrow_mut() + .push(self.index); + } + self.pusher.push(element) } }