Skip to content

Commit

Permalink
feat(python,rust): Support engine callback for LazyFrame.profile
Browse files Browse the repository at this point in the history
So that we can support the GPU engine in profiled collection of a
lazyframe, plumb through a mechanism for recording raw timings for
nodes that were executed through the PythonScan node.

This necessitates some small changes to the internals of the
NodeTimer, since `Instant`s are opaque. We instead directly store
durations (as nanoseconds since the query start) and, when calling into
the IR post-optimization callback, provide a duration that is the
number of nanoseconds since the query was started. On the Python side
we can then keep track of and record durations independently, offset
by this optimisation duration.

As a side-effect, `profile` now correctly measures the optimisation
time in the logical plan, rather than as previously just the time to
produce the physical plan from the optimised logical plan.
  • Loading branch information
wence- committed Mar 3, 2025
1 parent 69612d4 commit adf45d6
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 91 deletions.
17 changes: 15 additions & 2 deletions crates/polars-expr/src/state/execution_state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::borrow::Cow;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU8, Ordering};
use std::sync::{Mutex, RwLock};
use std::time::Duration;

use bitflags::bitflags;
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -136,8 +137,8 @@ impl ExecutionState {
}

/// Toggle this to measure execution times.
pub fn time_nodes(&mut self) {
self.node_timer = Some(NodeTimer::new())
pub fn time_nodes(&mut self, start: std::time::Instant) {
self.node_timer = Some(NodeTimer::new(start))
}
pub fn has_node_timer(&self) -> bool {
self.node_timer.is_some()
Expand All @@ -147,6 +148,18 @@ impl ExecutionState {
self.node_timer.unwrap().finish()
}

/// Record externally measured node timings with the node timer.
///
/// Each entry of `timings` is `(start, end, name)`, where `start` and `end`
/// are raw offsets from the query start, in nanoseconds.
///
/// # Panics
///
/// Panics if `timings` is non-empty and no node timer has been installed
/// (see `time_nodes`).
pub fn record_raw_timings(&self, timings: &[(u64, u64, String)]) {
    // Nothing to record: return before touching the timer so an empty slice
    // never panics, whether or not profiling is enabled.
    if timings.is_empty() {
        return;
    }
    let timer = self
        .node_timer
        .as_ref()
        .expect("record_raw_timings requires a node timer; call `time_nodes` first");
    for (start, end, name) in timings {
        timer.store_duration(
            Duration::from_nanos(*start),
            Duration::from_nanos(*end),
            name.clone(),
        );
    }
}

// This is wrong when the U64 overflows which will never happen.
pub fn should_stop(&self) -> PolarsResult<()> {
try_raise_keyboard_interrupt();
Expand Down
22 changes: 15 additions & 7 deletions crates/polars-expr/src/state/node_timer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::Mutex;
use std::time::Instant;
use std::time::{Duration, Instant};

use polars_core::prelude::*;
use polars_core::utils::NoNull;
Expand All @@ -8,7 +8,7 @@ type StartInstant = Instant;
type EndInstant = Instant;

type Nodes = Vec<String>;
type Ticks = Vec<(StartInstant, EndInstant)>;
type Ticks = Vec<(Duration, Duration)>;

#[derive(Clone)]
pub(super) struct NodeTimer {
Expand All @@ -17,14 +17,22 @@ pub(super) struct NodeTimer {
}

impl NodeTimer {
pub(super) fn new() -> Self {
pub(super) fn new(query_start: Instant) -> Self {
Self {
query_start: Instant::now(),
query_start,
data: Arc::new(Mutex::new((Vec::with_capacity(16), Vec::with_capacity(16)))),
}
}

/// Record a node's execution window given as absolute `Instant`s.
///
/// Both instants are converted to durations measured from `query_start`
/// and then handed to `store_duration` together with `name`.
pub(super) fn store(&self, start: StartInstant, end: EndInstant, name: String) {
    let origin = self.query_start;
    let start_offset = start.duration_since(origin);
    let end_offset = end.duration_since(origin);
    self.store_duration(start_offset, end_offset, name)
}

pub(super) fn store_duration(&self, start: Duration, end: Duration, name: String) {
let mut data = self.data.lock().unwrap();
let nodes = &mut data.0;
nodes.push(name);
Expand All @@ -41,18 +49,18 @@ impl NodeTimer {
// first value is end of optimization
polars_ensure!(!ticks.is_empty(), ComputeError: "no data to time");
let start = ticks[0].0;
ticks.push((self.query_start, start));
ticks.push((Duration::from_nanos(0), start));
let nodes_s = Column::new(PlSmallStr::from_static("node"), nodes);
let start: NoNull<UInt64Chunked> = ticks
.iter()
.map(|(start, _)| (start.duration_since(self.query_start)).as_micros() as u64)
.map(|(start, _)| start.as_micros() as u64)
.collect();
let mut start = start.into_inner();
start.rename(PlSmallStr::from_static("start"));

let end: NoNull<UInt64Chunked> = ticks
.iter()
.map(|(_, end)| (end.duration_since(self.query_start)).as_micros() as u64)
.map(|(_, end)| end.as_micros() as u64)
.collect();
let mut end = end.into_inner();
end.rename(PlSmallStr::from_static("end"));
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/frame/exitable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::*;

impl LazyFrame {
pub fn collect_concurrently(self) -> PolarsResult<InProcessQuery> {
let (mut state, mut physical_plan, _) = self.prepare_collect(false)?;
let (mut state, mut physical_plan, _) = self.prepare_collect(false, None)?;

let (tx, rx) = channel();
let token = state.cancel_token();
Expand Down
54 changes: 35 additions & 19 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ use crate::frame::cached_arenas::CachedArena;
use crate::physical_plan::streaming::insert_streaming_nodes;
use crate::prelude::*;

// Function called after logical plan optimization that can potentially change the plan.
// Arguments, in order: the root node of the optimized plan, the IR arena, the expression
// arena, and an optional duration. The caller in this file passes the time elapsed since
// the query start when a start instant was supplied, and `None` otherwise.
// NOTE(review): a bare `Fn(..)` in type position names an unsized trait-object type (and
// must be spelled `dyn Fn(..)` on edition 2021+), yet this alias is used as a by-value
// parameter type while call sites pass closures. A generic `P: Fn(..)` bound looks like
// what was intended — confirm.
type PostOptFn =
Fn(Node, &mut Arena<IR>, &mut Arena<AExpr>, Option<std::time::Duration>) -> PolarsResult<()>;

pub trait IntoLazy {
fn lazy(self) -> LazyFrame;
}
Expand Down Expand Up @@ -668,18 +672,23 @@ impl LazyFrame {
fn prepare_collect_post_opt<P>(
mut self,
check_sink: bool,
post_opt: P,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
where
P: Fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<()>,
{
query_start: Option<std::time::Instant>,
post_opt: PostOptFn,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
let (mut lp_arena, mut expr_arena) = self.get_arenas();

let mut scratch = vec![];
let lp_top =
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;

post_opt(lp_top, &mut lp_arena, &mut expr_arena)?;
post_opt(
lp_top,
&mut lp_arena,
&mut expr_arena,
// Post optimization callback gets the time since the
// query was started as its "base" timepoint.
query_start.map(|s| s.elapsed()),
)?;

// sink should be replaced
let no_file_sink = if check_sink {
Expand All @@ -694,20 +703,19 @@ impl LazyFrame {
}

// post_opt: A function that is called after optimization. This can be used to modify the IR jit.
pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
where
P: Fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<()>,
{
let (mut state, mut physical_plan, _) = self.prepare_collect_post_opt(false, post_opt)?;
pub fn _collect_post_opt<P>(self, post_opt: PostOptFn) -> PolarsResult<DataFrame> {
let (mut state, mut physical_plan, _) =
self.prepare_collect_post_opt(false, None, post_opt)?;
physical_plan.execute(&mut state)
}

#[allow(unused_mut)]
fn prepare_collect(
self,
check_sink: bool,
query_start: Option<std::time::Instant>,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
self.prepare_collect_post_opt(check_sink, |_, _, _| Ok(()))
self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
}

/// Execute all the lazy operations and collect them into a [`DataFrame`].
Expand Down Expand Up @@ -745,7 +753,19 @@ impl LazyFrame {
physical_plan.execute(&mut state)
}
#[cfg(not(feature = "new_streaming"))]
self._collect_post_opt(|_, _, _| Ok(()))
self._collect_post_opt(|_, _, _, _| Ok(()))
}

// post_opt: A function that is called after optimization. This can be used to modify the IR jit.
// This version does profiling of the node execution.
pub fn _profile_post_opt<P>(self, post_opt: PostOptFn) -> PolarsResult<(DataFrame, DataFrame)> {
let query_start = std::time::Instant::now();
let (mut state, mut physical_plan, _) =
self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
state.time_nodes(query_start);
let out = physical_plan.execute(&mut state)?;
let timer_df = state.finish_timer()?;
Ok((out, timer_df))
}

/// Profile a LazyFrame.
Expand All @@ -756,11 +776,7 @@ impl LazyFrame {
///
/// The units of the timings are microseconds.
pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
let (mut state, mut physical_plan, _) = self.prepare_collect(false)?;
state.time_nodes();
let out = physical_plan.execute(&mut state)?;
let timer_df = state.finish_timer()?;
Ok((out, timer_df))
self._profile_post_opt(|_, _, _, _| Ok(()))
}

/// Stream a query result into a parquet file. This is useful if the final result doesn't fit
Expand Down Expand Up @@ -919,7 +935,7 @@ impl LazyFrame {
payload,
};
self.opt_state |= OptFlags::STREAMING;
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true, None)?;
polars_ensure!(
is_streaming,
ComputeError: format!("cannot run the whole query in a streaming order; \
Expand Down
29 changes: 25 additions & 4 deletions crates/polars-mem-engine/src/executors/scan/python_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,31 @@ impl Executor for PythonScanExec {
},
};

let generator_init = if matches!(
self.options.python_source,
PythonScanSource::Pyarrow | PythonScanSource::Cuda
) {
let generator_init = if matches!(self.options.python_source, PythonScanSource::Cuda) {
let args = (
python_scan_function,
with_columns.map(|x| x.into_iter().map(|x| x.to_string()).collect::<Vec<_>>()),
predicate,
n_rows,
// If this boolean is true, callback should return
// a dataframe and list of timings [(start, end,
// name)]
state.has_node_timer(),
);
let result = callable.call1(args).map_err(to_compute_err)?;
if state.has_node_timer() {
let df = result.get_item(0).map_err(to_compute_err);
let timing_info: Vec<(u64, u64, String)> = result
.get_item(1)
.map_err(to_compute_err)?
.extract()
.map_err(to_compute_err)?;
state.record_raw_timings(&timing_info);
df
} else {
Ok(result)
}
} else if matches!(self.options.python_source, PythonScanSource::Pyarrow) {
let args = (
python_scan_function,
with_columns.map(|x| x.into_iter().map(|x| x.to_string()).collect::<Vec<_>>()),
Expand Down
76 changes: 49 additions & 27 deletions crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, IR};
use polars_utils::arena::{Arena, Node};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyList};
Expand All @@ -35,6 +37,35 @@ fn pyobject_to_first_path_and_scan_sources(
})
}

/// Run a Python post-optimization callback against the optimized query plan.
///
/// The IR and expression arenas are moved into a `NodeTraverser`, which is passed to
/// `lambda` together with `duration_since_start` converted to nanoseconds (or `None`).
/// After the call, the arenas are swapped back into `lp_arena` and `expr_arena`.
///
/// # Errors
///
/// Returns a `ComputeError` if calling `lambda` fails. In that case the function returns
/// before the swap-back, so `lp_arena` and `expr_arena` keep the values left behind by
/// `std::mem::take`.
///
/// # Panics
///
/// Panics if locking either arena handle fails (`lock().unwrap()`).
fn post_opt_callback(
lambda: &PyObject,
root: Node,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
Python::with_gil(|py| {
// Move both arenas into the traverser; the caller's arenas hold default values
// until they are swapped back below.
let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

// Get a copy of the arenas.
// NOTE(review): presumably shared handles rather than deep copies, since they are
// locked and swapped from below — confirm against `NodeTraverser::get_arenas`.
let arenas = nt.get_arenas();

// Pass the node visitor which allows the python callback to replace parts of the query plan.
// Remove "cuda" or specify better once we have multiple post-opt callbacks.
lambda
// `as_nanos()` yields a u128; the `as u64` cast truncates silently if it does not fit.
.call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
.map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

// Unpack the arenas.
// At this point the `nt` is useless.

std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

Ok(())
})
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
Expand Down Expand Up @@ -613,8 +644,22 @@ impl PyLazyFrame {
ldf.cache().into()
}

fn profile(&self, py: Python) -> PyResult<(PyDataFrame, PyDataFrame)> {
let (df, time_df) = py.enter_polars(|| self.ldf.clone().profile())?;
// Profile the query. When `lambda_post_opt` is given, it is invoked through
// `post_opt_callback` after optimization; otherwise the plain `profile` path is used.
// Returns the collected frame followed by the timings frame.
#[pyo3(signature = (lambda_post_opt=None))]
fn profile(
    &self,
    py: Python,
    lambda_post_opt: Option<PyObject>,
) -> PyResult<(PyDataFrame, PyDataFrame)> {
    let (result_df, timings_df) = py.enter_polars(|| {
        let ldf = self.ldf.clone();
        match lambda_post_opt {
            Some(callback) => {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, since_start| {
                    post_opt_callback(&callback, root, lp_arena, expr_arena, since_start)
                })
            },
            None => ldf.profile(),
        }
    })?;
    Ok((result_df.into(), timings_df.into()))
}

Expand All @@ -623,31 +668,8 @@ impl PyLazyFrame {
py.enter_polars_df(|| {
let ldf = self.ldf.clone();
if let Some(lambda) = lambda_post_opt {
ldf._collect_post_opt(|root, lp_arena, expr_arena| {
Python::with_gil(|py| {
let nt = NodeTraverser::new(
root,
std::mem::take(lp_arena),
std::mem::take(expr_arena),
);

// Get a copy of the arena's.
let arenas = nt.get_arenas();

// Pass the node visitor which allows the python callback to replace parts of the query plan.
// Remove "cuda" or specify better once we have multiple post-opt callbacks.
lambda.call1(py, (nt,)).map_err(
|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e),
)?;

// Unpack the arena's.
// At this point the `nt` is useless.

std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

Ok(())
})
ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
})
} else {
ldf.collect()
Expand Down
Loading

0 comments on commit adf45d6

Please sign in to comment.