From 2443a5d3dd1ef6cc78b6e17cd5f820475c96f897 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 10 Dec 2024 16:01:30 +0800 Subject: [PATCH 01/13] perf: Ray runner steps with big heap memory inflations --- benchmarking/ooms/big_task_heap_usage.py | 65 ++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 benchmarking/ooms/big_task_heap_usage.py diff --git a/benchmarking/ooms/big_task_heap_usage.py b/benchmarking/ooms/big_task_heap_usage.py new file mode 100644 index 0000000000..9de447e031 --- /dev/null +++ b/benchmarking/ooms/big_task_heap_usage.py @@ -0,0 +1,65 @@ +# /// script +# dependencies = ['numpy'] +# /// + +import argparse +import functools + +import daft +from daft.io._generator import read_generator +from daft.table.table import Table + +NUM_PARTITIONS = 8 + + +@daft.udf(return_dtype=daft.DataType.binary()) +def mock_inflate_data(data, inflation_factor): + return [x * inflation_factor for x in data.to_pylist()] + + +@daft.udf(return_dtype=daft.DataType.binary()) +def mock_deflate_data(data, deflation_factor): + return [x[: int(len(x) / deflation_factor)] for x in data.to_pylist()] + + +def generate(num_rows_per_partition): + yield Table.from_pydict({"foo": [b"x" for _ in range(num_rows_per_partition)]}) + + +def generator( + num_partitions: int, + num_rows_per_partition: int, +): + """Generate data for all partitions.""" + for i in range(num_partitions): + yield functools.partial(generate, num_rows_per_partition) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + "Runs a workload which is a simple map workload, but it will run 2 custom UDFs which first inflates the data, and then deflates it." + ) + parser.add_argument("--num-partitions", default=8) + parser.add_argument("--num-rows-per-partition", default=1000) + parser.add_argument("--inflation-factor", type=float, default=100) + parser.add_argument("--deflation-factor", type=float, default=100) + args = parser.parse_args() + + df = read_generator( + generator(args.num_partitions, args.num_rows_per_partition), + schema=daft.Schema._from_field_name_and_types([("foo", daft.DataType.binary())]), + ) + + df.collect() + print(df) + + # Big memory explosion + df = df.with_column("foo", mock_inflate_data(df["foo"], args.inflation_factor)) + + # Big memory reduction + df = df.with_column("foo", mock_deflate_data(df["foo"], args.deflation_factor)) + + df.explain(True) + + df.collect() + print(df) From 97ab1e2cdc7762dc00e320ccb71d6750dcdd2b3e Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 10 Dec 2024 16:07:57 +0800 Subject: [PATCH 02/13] Add args --- benchmarking/ooms/big_task_heap_usage.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/benchmarking/ooms/big_task_heap_usage.py b/benchmarking/ooms/big_task_heap_usage.py index 9de447e031..abe5e4e81a 100644 --- a/benchmarking/ooms/big_task_heap_usage.py +++ b/benchmarking/ooms/big_task_heap_usage.py @@ -37,12 +37,14 @@ def generator( if __name__ == "__main__": parser = argparse.ArgumentParser( - "Runs a workload which is a simple map workload, but it will run 2 custom UDFs which first inflates the data, and then deflates it." + "Runs a workload which is a simple map workload, but it will run 2 custom UDFs which first inflates the data, and then deflates it. " + "It starts with 1KB partitions, then runs inflation and subsequently deflation. We expect this to OOM if the heap memory usage exceeds " + "`MEM / N_CPUS` on a given worker node." ) parser.add_argument("--num-partitions", default=8) parser.add_argument("--num-rows-per-partition", default=1000) - parser.add_argument("--inflation-factor", type=float, default=100) - parser.add_argument("--deflation-factor", type=float, default=100) + parser.add_argument("--inflation-factor", type=int, default=100) + parser.add_argument("--deflation-factor", type=int, default=100) args = parser.parse_args() df = read_generator( From f6eae7ba3f6e5194f31cb5fec42c0066b2baf5af Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 10 Dec 2024 16:26:24 +0800 Subject: [PATCH 03/13] ints --- benchmarking/ooms/big_task_heap_usage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarking/ooms/big_task_heap_usage.py b/benchmarking/ooms/big_task_heap_usage.py index abe5e4e81a..ad573ee56f 100644 --- a/benchmarking/ooms/big_task_heap_usage.py +++ b/benchmarking/ooms/big_task_heap_usage.py @@ -41,8 +41,8 @@ def generator( "It starts with 1KB partitions, then runs inflation and subsequently deflation. We expect this to OOM if the heap memory usage exceeds " "`MEM / N_CPUS` on a given worker node." ) - parser.add_argument("--num-partitions", default=8) - parser.add_argument("--num-rows-per-partition", default=1000) + parser.add_argument("--num-partitions", type=int, default=8) + parser.add_argument("--num-rows-per-partition", type=int, default=1000) parser.add_argument("--inflation-factor", type=int, default=100) parser.add_argument("--deflation-factor", type=int, default=100) args = parser.parse_args() From 741a9fb35d196aac55b45b39858d96bcf89094d5 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 10 Dec 2024 18:12:39 +0800 Subject: [PATCH 04/13] set runner ray --- benchmarking/ooms/big_task_heap_usage.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/benchmarking/ooms/big_task_heap_usage.py b/benchmarking/ooms/big_task_heap_usage.py index ad573ee56f..f66923fcc6 100644 --- a/benchmarking/ooms/big_task_heap_usage.py +++ b/benchmarking/ooms/big_task_heap_usage.py @@ -47,6 +47,8 @@ def generator( parser.add_argument("--deflation-factor", type=int, default=100) args = parser.parse_args() + daft.context.set_runner_ray() + df = read_generator( generator(args.num_partitions, args.num_rows_per_partition), schema=daft.Schema._from_field_name_and_types([("foo", daft.DataType.binary())]), From aa6eb38c3a4b62aadf04be45c06ca3e43432d0d7 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 10 Dec 2024 18:47:00 +0800 Subject: [PATCH 05/13] Use pa large binary array --- benchmarking/ooms/big_task_heap_usage.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/benchmarking/ooms/big_task_heap_usage.py b/benchmarking/ooms/big_task_heap_usage.py index f66923fcc6..b8c8746d6a 100644 --- a/benchmarking/ooms/big_task_heap_usage.py +++ b/benchmarking/ooms/big_task_heap_usage.py @@ -5,6 +5,8 @@ import argparse import functools +import pyarrow as pa + import daft from daft.io._generator import read_generator from daft.table.table import Table @@ -14,7 +16,7 @@ @daft.udf(return_dtype=daft.DataType.binary()) def mock_inflate_data(data, inflation_factor): - return [x * inflation_factor for x in data.to_pylist()] + return pa.array([x * inflation_factor for x in data.to_pylist()], type=pa.large_binary()) @daft.udf(return_dtype=daft.DataType.binary()) From e68f2300e7af869c4d4e6cbce857a2aee5e9005c Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 10 Dec 2024 18:49:00 +0800 Subject: [PATCH 06/13] Upload logs even if job submission fails --- .github/workflows/run-cluster.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/run-cluster.yaml b/.github/workflows/run-cluster.yaml index 644250d1f1..214e9d879b 100644 --- a/.github/workflows/run-cluster.yaml +++ b/.github/workflows/run-cluster.yaml @@ -123,6 +123,7 @@ jobs: --runtime-env-json "$ray_env_var" \ -- python ${{ inputs.entrypoint_script }} ${{ inputs.entrypoint_args }} - name: Download log files from ray cluster + if: always() run: | source .venv/bin/activate ray rsync-down .github/assets/ray.yaml /tmp/ray/session_*/logs ray-daft-logs @@ -152,6 +153,7 @@ jobs: source .venv/bin/activate ray down .github/assets/ray.yaml -y - name: Upload log files + if: always() uses: actions/upload-artifact@v4 with: name: ray-daft-logs From b38cdcce5752652d441e2891a9c8a819ed0ba82f Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Wed, 11 Dec 2024 12:08:05 +0800 Subject: [PATCH 07/13] Add memray outputs to traces --- benchmarking/ooms/big_task_heap_usage.py | 2 +- daft/runners/ray_metrics.py | 20 ++++++++++++-- daft/runners/ray_tracing.py | 33 ++++++++++++++++++++++-- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/benchmarking/ooms/big_task_heap_usage.py b/benchmarking/ooms/big_task_heap_usage.py index b8c8746d6a..b04087ab30 100644 --- a/benchmarking/ooms/big_task_heap_usage.py +++ b/benchmarking/ooms/big_task_heap_usage.py @@ -1,5 +1,5 @@ # /// script -# dependencies = ['numpy'] +# dependencies = ['numpy', 'memray'] # /// import argparse diff --git a/daft/runners/ray_metrics.py b/daft/runners/ray_metrics.py index df542446c6..31bafc871e 100644 --- a/daft/runners/ray_metrics.py +++ b/daft/runners/ray_metrics.py @@ -51,6 +51,14 @@ class EndTaskEvent(TaskEvent): # End Unix timestamp end: float + memory_stats: TaskMemoryStats + + +@dataclasses.dataclass(frozen=True) +class TaskMemoryStats: + peak_memory_allocated: int + total_memory_allocated: int + total_num_allocations: int class _NodeInfo: @@ -123,9 +131,15 @@ def mark_task_start( ) ) - def mark_task_end(self, execution_id: str, task_id: str, end: float): + def mark_task_end( + self, + execution_id: str, + task_id: str, + end: float, + memory_stats: TaskMemoryStats, + ): # Add an EndTaskEvent - self._task_events[execution_id].append(EndTaskEvent(task_id=task_id, end=end)) + self._task_events[execution_id].append(EndTaskEvent(task_id=task_id, end=end, memory_stats=memory_stats)) def get_task_events(self, execution_id: str, idx: int) -> tuple[list[TaskEvent], int]: events = self._task_events[execution_id] @@ -177,11 +191,13 @@ def mark_task_end( self, task_id: str, end: float, + memory_stats: TaskMemoryStats, ) -> None: self.actor.mark_task_end.remote( self.execution_id, task_id, end, + memory_stats, ) def get_task_events(self, idx: int) -> tuple[list[TaskEvent], int]: diff --git a/daft/runners/ray_tracing.py b/daft/runners/ray_tracing.py index b200651a76..c395903e65 100644 --- a/daft/runners/ray_tracing.py +++ b/daft/runners/ray_tracing.py @@ -10,6 +10,7 @@ import dataclasses import json import logging +import os import pathlib import time from datetime import datetime @@ -255,6 +256,11 @@ def _flush_task_metrics(self): "ph": RunnerTracer.PHASE_ASYNC_END, "pid": 1, "tid": 2, + "args": { + "memray_peak_memory_allocated": task_event.memory_stats.peak_memory_allocated, + "memray_total_memory_allocated": task_event.memory_stats.total_memory_allocated, + "memray_total_num_allocations": task_event.memory_stats.total_num_allocations, + }, }, ts=end_ts, ) @@ -272,6 +278,11 @@ def _flush_task_metrics(self): "ph": RunnerTracer.PHASE_DURATION_END, "pid": node_idx + RunnerTracer.NODE_PIDS_START, "tid": worker_idx, + "args": { + "memray_peak_memory_allocated": task_event.memory_stats.peak_memory_allocated, + "memray_total_memory_allocated": task_event.memory_stats.total_memory_allocated, + "memray_total_num_allocations": task_event.memory_stats.total_num_allocations, + }, }, ts=end_ts, ) @@ -656,8 +667,12 @@ def __next__(self): def collect_ray_task_metrics(execution_id: str, task_id: str, stage_id: int, execution_config: PyDaftExecutionConfig): """Context manager that will ping the metrics actor to record various execution metrics about a given task.""" if execution_config.enable_ray_tracing: + import tempfile import time + import memray + from memray._memray import compute_statistics + runtime_context = ray.get_runtime_context() metrics_actor = ray_metrics.get_metrics_actor(execution_id) @@ -670,7 +685,21 @@ def collect_ray_task_metrics(execution_id: str, task_id: str, stage_id: int, exe runtime_context.get_assigned_resources(), runtime_context.get_task_id(), ) - yield - metrics_actor.mark_task_end(task_id, time.time()) + with tempfile.TemporaryDirectory() as tmpdir: + memray_tmpfile = os.path.join(tmpdir, f"task-{task_id}.memray.bin") + try: + with memray.Tracker(memray_tmpfile): + yield + finally: + stats = compute_statistics(memray_tmpfile) + metrics_actor.mark_task_end( + task_id, + time.time(), + ray_metrics.TaskMemoryStats( + peak_memory_allocated=stats.peak_memory_allocated, + total_memory_allocated=stats.total_memory_allocated, + total_num_allocations=stats.total_num_allocations, + ), + ) else: yield From d878e395d62c2701a49950e10119d036e3d184e1 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Wed, 11 Dec 2024 16:47:15 +0800 Subject: [PATCH 08/13] Add a test harness for testing estimations against actual --- tests/memory/__init__.py | 0 tests/memory/test_projections.py | 71 ++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 tests/memory/__init__.py create mode 100644 tests/memory/test_projections.py diff --git a/tests/memory/__init__.py b/tests/memory/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/memory/test_projections.py b/tests/memory/test_projections.py new file mode 100644 index 0000000000..d40b38b7a8 --- /dev/null +++ b/tests/memory/test_projections.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +import contextlib +import dataclasses +import os +import tempfile +from typing import Iterator +from unittest import mock + +import memray +from memray._memray import compute_statistics + +import daft +from daft.context import get_context +from daft.execution.physical_plan import MaterializedPhysicalPlan +from daft.runners.ray_runner import build_partitions + + +@dataclasses.dataclass +class LazyMemrayStats: + memray_stats: memray._stats.Stats | None + + def unwrap(self) -> memray._stats.Stats: + assert self.memray_stats is not None + return self.memray_stats + + +@contextlib.contextmanager +def track_memory() -> Iterator[LazyMemrayStats]: + tracked = LazyMemrayStats(None) + with tempfile.TemporaryDirectory() as tmpdir: + tmpfile = os.path.join(tmpdir, "memray.bin") + with memray.Tracker(tmpfile): + yield tracked + + stats = compute_statistics(tmpfile) + tracked.memray_stats = stats + + +def df_to_tasks(df: daft.DataFrame) -> MaterializedPhysicalPlan: + cfg = get_context().daft_execution_config + physical_plan = df._builder.to_physical_plan_scheduler(cfg) + + return physical_plan.to_partition_tasks( + psets={ + k: v.values() + for k, v in get_context().get_or_create_runner()._part_set_cache.get_all_partition_sets().items() + }, + actor_pool_manager=mock.Mock(), + results_buffer_size=None, + ) + + +def test_simple_project(): + df = daft.read_parquet("tests/assets/parquet-data/mvp.parquet") + df = df.with_column("c", df["a"] + 100) + + tasks = df_to_tasks(df) + partition_task = next(tasks) + + with track_memory() as lazy_memray_stats: + _ = build_partitions( + partition_task.instructions, + partition_task.partial_metadatas, + *partition_task.inputs, + ) + + assert partition_task.resource_request.memory_bytes is not None, "Partition Task must have resource request" + assert ( + lazy_memray_stats.unwrap().peak_memory_allocated < partition_task.resource_request.memory_bytes + ), "Execution must use less memory than requested" From c3bab22464edd2299c65d48a5be2dac4367169c2 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Sat, 14 Dec 2024 13:47:06 +0800 Subject: [PATCH 09/13] Add a __main__ script --- daft/datatype.py | 43 +++++++------ tests/memory/__main__.py | 76 +++++++++++++++++++++++ tests/memory/test_projections.py | 71 --------------------- tests/memory/test_ray_build_partitions.py | 25 ++++++++ tests/memory/utils.py | 50 +++++++++++++++ 5 files changed, 175 insertions(+), 90 deletions(-) create mode 100644 tests/memory/__main__.py delete mode 100644 tests/memory/test_projections.py create mode 100644 tests/memory/test_ray_build_partitions.py create mode 100644 tests/memory/utils.py diff --git a/daft/datatype.py b/daft/datatype.py index b15902c41d..a47491477f 100644 --- a/daft/datatype.py +++ b/daft/datatype.py @@ -576,38 +576,43 @@ def __hash__(self) -> int: DataTypeLike = Union[DataType, type] +import threading + _EXT_TYPE_REGISTERED = False _STATIC_DAFT_EXTENSION = None +_ext_type_lock = threading.Lock() def _ensure_registered_super_ext_type(): global _EXT_TYPE_REGISTERED global _STATIC_DAFT_EXTENSION - if not _EXT_TYPE_REGISTERED: - class DaftExtension(pa.ExtensionType): - def __init__(self, dtype, metadata=b""): - # attributes need to be set first before calling - # super init (as that calls serialize) - self._metadata = metadata - super().__init__(dtype, "daft.super_extension") + with _ext_type_lock: + if not _EXT_TYPE_REGISTERED: + + class DaftExtension(pa.ExtensionType): + def __init__(self, dtype, metadata=b""): + # attributes need to be set first before calling + # super init (as that calls serialize) + self._metadata = metadata + super().__init__(dtype, "daft.super_extension") - def __reduce__(self): - return type(self).__arrow_ext_deserialize__, (self.storage_type, self.__arrow_ext_serialize__()) + def __reduce__(self): + return type(self).__arrow_ext_deserialize__, (self.storage_type, self.__arrow_ext_serialize__()) - def __arrow_ext_serialize__(self): - return self._metadata + def __arrow_ext_serialize__(self): + return self._metadata - @classmethod - def __arrow_ext_deserialize__(cls, storage_type, serialized): - return cls(storage_type, serialized) + @classmethod + def __arrow_ext_deserialize__(cls, storage_type, serialized): + return cls(storage_type, serialized) - _STATIC_DAFT_EXTENSION = DaftExtension - pa.register_extension_type(DaftExtension(pa.null())) - import atexit + _STATIC_DAFT_EXTENSION = DaftExtension + pa.register_extension_type(DaftExtension(pa.null())) + import atexit - atexit.register(lambda: pa.unregister_extension_type("daft.super_extension")) - _EXT_TYPE_REGISTERED = True + atexit.register(lambda: pa.unregister_extension_type("daft.super_extension")) + _EXT_TYPE_REGISTERED = True def get_super_ext_type(): diff --git a/tests/memory/__main__.py b/tests/memory/__main__.py new file mode 100644 index 0000000000..1c432e95e0 --- /dev/null +++ b/tests/memory/__main__.py @@ -0,0 +1,76 @@ +import argparse + +import daft + + +@daft.udf(return_dtype=str) +def to_pylist_identity(s): + data = s.to_pylist() + return data + + +@daft.udf(return_dtype=str) +def to_arrow_identity(s): + data = s.to_arrow() + return data + + +@daft.udf(return_dtype=str) +def to_series_identity(s): + data = s + return data + + +def transform_pylist(df, batch_size=None): + tfm = to_pylist_identity + + if batch_size is not None: + tfm = tfm.override_options(batch_size=batch_size) + + df = df.with_column("c", tfm(df["b"])) + return df + + +def transform_arrow(df, batch_size=None): + tfm = to_arrow_identity + + if batch_size is not None: + tfm = tfm.override_options(batch_size=batch_size) + + df = df.with_column("c", tfm(df["b"])) + return df + + +def transform_series(df, batch_size=None): + tfm = to_series_identity + + if batch_size is not None: + tfm = tfm.override_options(batch_size=batch_size) + + df = df.with_column("c", tfm(df["b"])) + return df + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--transform", choices=["arrow", "pylist", "series"], default="series") + parser.add_argument("--batch-size", default=None, type=int) + args = parser.parse_args() + + if args.transform == "series": + transform = transform_series + elif args.transform == "pylist": + transform = transform_pylist + elif args.transform == "arrow": + transform = transform_arrow + else: + raise ValueError(f"Unrecignized transform {args.transform}") + + df = daft.read_parquet("tests/assets/parquet-data/mvp.parquet") + + for i in range(128): + df = df.concat(daft.read_parquet("tests/assets/parquet-data/mvp.parquet")) + + df = df.transform(transform, batch_size=args.batch_size) + df.collect() + print(df) diff --git a/tests/memory/test_projections.py b/tests/memory/test_projections.py deleted file mode 100644 index d40b38b7a8..0000000000 --- a/tests/memory/test_projections.py +++ /dev/null @@ -1,71 +0,0 @@ -from __future__ import annotations - -import contextlib -import dataclasses -import os -import tempfile -from typing import Iterator -from unittest import mock - -import memray -from memray._memray import compute_statistics - -import daft -from daft.context import get_context -from daft.execution.physical_plan import MaterializedPhysicalPlan -from daft.runners.ray_runner import build_partitions - - -@dataclasses.dataclass -class LazyMemrayStats: - memray_stats: memray._stats.Stats | None - - def unwrap(self) -> memray._stats.Stats: - assert self.memray_stats is not None - return self.memray_stats - - -@contextlib.contextmanager -def track_memory() -> Iterator[LazyMemrayStats]: - tracked = LazyMemrayStats(None) - with tempfile.TemporaryDirectory() as tmpdir: - tmpfile = os.path.join(tmpdir, "memray.bin") - with memray.Tracker(tmpfile): - yield tracked - - stats = compute_statistics(tmpfile) - tracked.memray_stats = stats - - -def df_to_tasks(df: daft.DataFrame) -> MaterializedPhysicalPlan: - cfg = get_context().daft_execution_config - physical_plan = df._builder.to_physical_plan_scheduler(cfg) - - return physical_plan.to_partition_tasks( - psets={ - k: v.values() - for k, v in get_context().get_or_create_runner()._part_set_cache.get_all_partition_sets().items() - }, - actor_pool_manager=mock.Mock(), - results_buffer_size=None, - ) - - -def test_simple_project(): - df = daft.read_parquet("tests/assets/parquet-data/mvp.parquet") - df = df.with_column("c", df["a"] + 100) - - tasks = df_to_tasks(df) - partition_task = next(tasks) - - with track_memory() as lazy_memray_stats: - _ = build_partitions( - partition_task.instructions, - partition_task.partial_metadatas, - *partition_task.inputs, - ) - - assert partition_task.resource_request.memory_bytes is not None, "Partition Task must have resource request" - assert ( - lazy_memray_stats.unwrap().peak_memory_allocated < partition_task.resource_request.memory_bytes - ), "Execution must use less memory than requested" diff --git a/tests/memory/test_ray_build_partitions.py b/tests/memory/test_ray_build_partitions.py new file mode 100644 index 0000000000..e8bbd84a98 --- /dev/null +++ b/tests/memory/test_ray_build_partitions.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import daft +from daft.runners.ray_runner import build_partitions +from tests.memory.utils import df_to_tasks, track_memory + + +def test_simple_project(): + df = daft.read_parquet("tests/assets/parquet-data/mvp.parquet") + df = df.with_column("c", df["a"] + 100) + + tasks = df_to_tasks(df) + partition_task = next(tasks) + + with track_memory() as lazy_memray_stats: + _ = build_partitions( + partition_task.instructions, + partition_task.partial_metadatas, + *partition_task.inputs, + ) + + assert partition_task.resource_request.memory_bytes is not None, "Partition Task must have resource request" + assert ( + lazy_memray_stats.unwrap().peak_memory_allocated < partition_task.resource_request.memory_bytes + ), "Execution must use less memory than requested" diff --git a/tests/memory/utils.py b/tests/memory/utils.py new file mode 100644 index 0000000000..fc73c2d899 --- /dev/null +++ b/tests/memory/utils.py @@ -0,0 +1,50 @@ +import contextlib +import dataclasses +import logging +import os +from typing import Iterator +from unittest import mock + +import memray +from memray._memray.stats import compute_statistics + +import daft +from daft.context import get_context +from daft.execution.physical_plan import MaterializedPhysicalPlan + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass +class LazyMemrayStats: + memray_stats: memray._stats.Stats | None + + def unwrap(self) -> memray._stats.Stats: + assert self.memray_stats is not None + return self.memray_stats + + +@contextlib.contextmanager +def track_memory(tmpdir) -> Iterator[LazyMemrayStats]: + tracked = LazyMemrayStats(None) + tmpfile = os.path.join(tmpdir, "memray.bin") + + with memray.Tracker(tmpfile): + yield tracked + + stats = compute_statistics(tmpfile) + tracked.memray_stats = stats + + +def df_to_tasks(df: daft.DataFrame) -> MaterializedPhysicalPlan: + cfg = get_context().daft_execution_config + physical_plan = df._builder.to_physical_plan_scheduler(cfg) + + return physical_plan.to_partition_tasks( + psets={ + k: v.values() + for k, v in get_context().get_or_create_runner()._part_set_cache.get_all_partition_sets().items() + }, + actor_pool_manager=mock.Mock(), + results_buffer_size=None, + ) From 91852ead06a22c1a68715aa12b8a591327b2ba67 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Sat, 14 Dec 2024 21:34:33 +0800 Subject: [PATCH 10/13] Add build_partition test fixture --- daft/runners/ray_tracing.py | 34 +++++----- tests/memory/__main__.py | 76 ----------------------- tests/memory/test_ray_build_partitions.py | 25 -------- tests/memory/test_udf_project.py | 36 +++++++++++ tests/memory/utils.py | 65 ++++++------------- 5 files changed, 73 insertions(+), 163 deletions(-) delete mode 100644 tests/memory/__main__.py delete mode 100644 tests/memory/test_ray_build_partitions.py create mode 100644 tests/memory/test_udf_project.py diff --git a/daft/runners/ray_tracing.py b/daft/runners/ray_tracing.py index c395903e65..74980bcc21 100644 --- a/daft/runners/ray_tracing.py +++ b/daft/runners/ray_tracing.py @@ -667,7 +667,6 @@ def __next__(self): def collect_ray_task_metrics(execution_id: str, task_id: str, stage_id: int, execution_config: PyDaftExecutionConfig): """Context manager that will ping the metrics actor to record various execution metrics about a given task.""" if execution_config.enable_ray_tracing: - import tempfile import time import memray @@ -685,21 +684,22 @@ def collect_ray_task_metrics(execution_id: str, task_id: str, stage_id: int, exe runtime_context.get_assigned_resources(), runtime_context.get_task_id(), ) - with tempfile.TemporaryDirectory() as tmpdir: - memray_tmpfile = os.path.join(tmpdir, f"task-{task_id}.memray.bin") - try: - with memray.Tracker(memray_tmpfile): - yield - finally: - stats = compute_statistics(memray_tmpfile) - metrics_actor.mark_task_end( - task_id, - time.time(), - ray_metrics.TaskMemoryStats( - peak_memory_allocated=stats.peak_memory_allocated, - total_memory_allocated=stats.total_memory_allocated, - total_num_allocations=stats.total_num_allocations, - ), - ) + tmpdir = "/tmp/ray/session_latest/logs/daft/task_memray_dumps" + os.makedirs(tmpdir, exist_ok=True) + memray_tmpfile = os.path.join(tmpdir, f"task-{task_id}.memray.bin") + try: + with memray.Tracker(memray_tmpfile, native_traces=True, follow_fork=True): + yield + finally: + stats = compute_statistics(memray_tmpfile) + metrics_actor.mark_task_end( + task_id, + time.time(), + ray_metrics.TaskMemoryStats( + peak_memory_allocated=stats.peak_memory_allocated, + total_memory_allocated=stats.total_memory_allocated, + total_num_allocations=stats.total_num_allocations, + ), + ) else: yield diff --git a/tests/memory/__main__.py b/tests/memory/__main__.py deleted file mode 100644 index 1c432e95e0..0000000000 --- a/tests/memory/__main__.py +++ /dev/null @@ -1,76 +0,0 @@ -import argparse - -import daft - - -@daft.udf(return_dtype=str) -def to_pylist_identity(s): - data = s.to_pylist() - return data - - -@daft.udf(return_dtype=str) -def to_arrow_identity(s): - data = s.to_arrow() - return data - - -@daft.udf(return_dtype=str) -def to_series_identity(s): - data = s - return data - - -def transform_pylist(df, batch_size=None): - tfm = to_pylist_identity - - if batch_size is not None: - tfm = tfm.override_options(batch_size=batch_size) - - df = df.with_column("c", tfm(df["b"])) - return df - - -def transform_arrow(df, batch_size=None): - tfm = to_arrow_identity - - if batch_size is not None: - tfm = tfm.override_options(batch_size=batch_size) - - df = df.with_column("c", tfm(df["b"])) - return df - - -def transform_series(df, batch_size=None): - tfm = to_series_identity - - if batch_size is not None: - tfm = tfm.override_options(batch_size=batch_size) - - df = df.with_column("c", tfm(df["b"])) - return df - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--transform", choices=["arrow", "pylist", "series"], default="series") - parser.add_argument("--batch-size", default=None, type=int) - args = parser.parse_args() - - if args.transform == "series": - transform = transform_series - elif args.transform == "pylist": - transform = transform_pylist - elif args.transform == "arrow": - transform = transform_arrow - else: - raise ValueError(f"Unrecignized transform {args.transform}") - - df = daft.read_parquet("tests/assets/parquet-data/mvp.parquet") - - for i in range(128): - df = df.concat(daft.read_parquet("tests/assets/parquet-data/mvp.parquet")) - - df = df.transform(transform, batch_size=args.batch_size) - df.collect() - print(df) diff --git a/tests/memory/test_ray_build_partitions.py b/tests/memory/test_ray_build_partitions.py deleted file mode 100644 index e8bbd84a98..0000000000 --- a/tests/memory/test_ray_build_partitions.py +++ /dev/null @@ -1,25 +0,0 @@ -from __future__ import annotations - -import daft -from daft.runners.ray_runner import build_partitions -from tests.memory.utils import df_to_tasks, track_memory - - -def test_simple_project(): - df = daft.read_parquet("tests/assets/parquet-data/mvp.parquet") - df = df.with_column("c", df["a"] + 100) - - tasks = df_to_tasks(df) - partition_task = next(tasks) - - with track_memory() as lazy_memray_stats: - _ = build_partitions( - partition_task.instructions, - partition_task.partial_metadatas, - *partition_task.inputs, - ) - - assert partition_task.resource_request.memory_bytes is not None, "Partition Task must have resource request" - assert ( - lazy_memray_stats.unwrap().peak_memory_allocated < partition_task.resource_request.memory_bytes - ), "Execution must use less memory than requested" diff --git a/tests/memory/test_udf_project.py b/tests/memory/test_udf_project.py new file mode 100644 index 0000000000..f5c0666eee --- /dev/null +++ b/tests/memory/test_udf_project.py @@ -0,0 +1,36 @@ +import uuid + +import pytest +from memray._memray import compute_statistics + +import daft +from daft.execution.execution_step import ExpressionsProjection, Project +from tests.memory.utils import run_wrapper_build_partitions + + +@daft.udf(return_dtype=str) +def to_arrow_identity(s): + data = s.to_arrow() + return data + + +@daft.udf(return_dtype=str) +def to_pylist_identity(s): + data = s.to_pylist() + return data + + +@pytest.mark.parametrize( + "udf", + [ + to_arrow_identity, + to_pylist_identity, + ], +) +def test_string_identity_projection(udf): + instructions = [Project(ExpressionsProjection([udf(daft.col("a"))]))] + inputs = [{"a": [str(uuid.uuid4()) for _ in range(62500)]}] + _, memray_file = run_wrapper_build_partitions(inputs, instructions) + stats = compute_statistics(memray_file) + + assert stats.peak_memory_allocated < 100 diff --git a/tests/memory/utils.py b/tests/memory/utils.py index fc73c2d899..bc92c3dc3c 100644 --- a/tests/memory/utils.py +++ b/tests/memory/utils.py @@ -1,50 +1,25 @@ -import contextlib -import dataclasses -import logging import os -from typing import Iterator +import tempfile +import uuid from unittest import mock import memray -from memray._memray.stats import compute_statistics -import daft -from daft.context import get_context -from daft.execution.physical_plan import MaterializedPhysicalPlan - -logger = logging.getLogger(__name__) - - -@dataclasses.dataclass -class LazyMemrayStats: - memray_stats: memray._stats.Stats | None - - def unwrap(self) -> memray._stats.Stats: - assert self.memray_stats is not None - return self.memray_stats - - -@contextlib.contextmanager -def track_memory(tmpdir) -> Iterator[LazyMemrayStats]: - tracked = LazyMemrayStats(None) - tmpfile = os.path.join(tmpdir, "memray.bin") - - with memray.Tracker(tmpfile): - yield tracked - - stats = compute_statistics(tmpfile) - tracked.memray_stats = stats - - -def df_to_tasks(df: daft.DataFrame) -> MaterializedPhysicalPlan: - cfg = get_context().daft_execution_config - physical_plan = df._builder.to_physical_plan_scheduler(cfg) - - return physical_plan.to_partition_tasks( - psets={ - k: v.values() - for k, v in get_context().get_or_create_runner()._part_set_cache.get_all_partition_sets().items() - }, - actor_pool_manager=mock.Mock(), - results_buffer_size=None, - ) +from daft.execution.execution_step import Instruction +from daft.runners.ray_runner import build_partitions +from daft.table import MicroPartition + + +def run_wrapper_build_partitions( + input_partitions: list[dict], instructions: list[Instruction] +) -> tuple[list[MicroPartition], str]: + inputs = [MicroPartition.from_pydict(p) for p in input_partitions] + tmpdir = tempfile.gettempdir() + memray_path = os.path.join(tmpdir, f"memray-{uuid.uuid4()}.bin") + with memray.Tracker(memray_path, native_traces=True, follow_fork=True): + results = build_partitions( + instructions, + [mock.Mock() for _ in range(len(input_partitions))], + *inputs, + ) + return results[1:], memray_path From d4f3337abbb86a1750837d0753cf01385ee1dd65 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Sat, 14 Dec 2024 21:38:35 +0800 Subject: [PATCH 11/13] Add batched UDF tests --- tests/memory/test_udf_project.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/memory/test_udf_project.py b/tests/memory/test_udf_project.py index f5c0666eee..9ce2afff36 100644 --- a/tests/memory/test_udf_project.py +++ b/tests/memory/test_udf_project.py @@ -20,16 +20,30 @@ def to_pylist_identity(s): return data +@daft.udf(return_dtype=str, batch_size=128) +def to_arrow_identity_batched(s): + data = s.to_arrow() + return data + + +@daft.udf(return_dtype=str, batch_size=128) +def to_pylist_identity_batched(s): + data = s.to_pylist() + return data + + @pytest.mark.parametrize( "udf", [ to_arrow_identity, to_pylist_identity, + to_arrow_identity_batched, + to_pylist_identity_batched, ], ) def test_string_identity_projection(udf): instructions = [Project(ExpressionsProjection([udf(daft.col("a"))]))] - inputs = [{"a": [str(uuid.uuid4()) for _ in range(62500)]}] + inputs = [{"a": [str(uuid.uuid4()) for _ in range(625000)]}] _, memray_file = run_wrapper_build_partitions(inputs, instructions) stats = compute_statistics(memray_file) From 1803b0cc70037bd50dc81bcbda92137e4cc7fc08 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Sat, 14 Dec 2024 21:40:32 +0800 Subject: [PATCH 12/13] Add arrow return test --- tests/memory/test_udf_project.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/memory/test_udf_project.py b/tests/memory/test_udf_project.py index 9ce2afff36..7ac1bfee98 100644 --- a/tests/memory/test_udf_project.py +++ b/tests/memory/test_udf_project.py @@ -1,5 +1,6 @@ import uuid +import pyarrow as pa import pytest from memray._memray import compute_statistics @@ -32,6 +33,12 @@ def to_pylist_identity_batched(s): return data +@daft.udf(return_dtype=str, batch_size=128) +def to_pylist_identity_batched_arrow_return(s): + data = s.to_pylist() + return pa.array(data) + + @pytest.mark.parametrize( "udf", [ @@ -39,6 +46,7 @@ def to_pylist_identity_batched(s): to_pylist_identity, to_arrow_identity_batched, to_pylist_identity_batched, + to_pylist_identity_batched_arrow_return, ], ) def test_string_identity_projection(udf): From 60eed9dd8e1e7c5a8a5c7a9089a786a98e0a03d5 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 17 Dec 2024 06:05:29 +0800 Subject: [PATCH 13/13] Add more failing tests --- tests/memory/test_udf_project.py | 40 ++++++++++++++++++++++++++++++-- tests/memory/utils.py | 6 +++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/tests/memory/test_udf_project.py b/tests/memory/test_udf_project.py index 7ac1bfee98..c67cda0e28 100644 --- a/tests/memory/test_udf_project.py +++ b/tests/memory/test_udf_project.py @@ -9,6 +9,15 @@ from tests.memory.utils import run_wrapper_build_partitions +def format_bytes(bytes_value): + """Format bytes into human readable string with appropriate unit.""" + for unit in ["B", "KB", "MB", "GB"]: + if bytes_value < 1024: + return f"{bytes_value:.2f} {unit}" + bytes_value /= 1024 + return f"{bytes_value:.2f} GB" + + @daft.udf(return_dtype=str) def to_arrow_identity(s): data = s.to_arrow() @@ -49,10 +58,37 @@ def to_pylist_identity_batched_arrow_return(s): to_pylist_identity_batched_arrow_return, ], ) -def test_string_identity_projection(udf): +def test_short_string_identity_projection(udf): + instructions = [Project(ExpressionsProjection([udf(daft.col("a"))]))] + inputs = [{"a": [str(uuid.uuid4()) for _ in range(62500)]}] + _, memray_file = run_wrapper_build_partitions(inputs, instructions) + stats = compute_statistics(memray_file) + + expected_peak_bytes = 100 + assert stats.peak_memory_allocated < expected_peak_bytes, ( + f"Peak memory ({format_bytes(stats.peak_memory_allocated)}) " + f"exceeded threshold ({format_bytes(expected_peak_bytes)})" + ) + + +@pytest.mark.parametrize( + "udf", + [ + to_arrow_identity, + to_pylist_identity, + to_arrow_identity_batched, + to_pylist_identity_batched, + to_pylist_identity_batched_arrow_return, + ], +) +def test_long_string_identity_projection(udf): instructions = [Project(ExpressionsProjection([udf(daft.col("a"))]))] inputs = [{"a": [str(uuid.uuid4()) for _ in range(625000)]}] _, memray_file = run_wrapper_build_partitions(inputs, instructions) stats = compute_statistics(memray_file) - assert stats.peak_memory_allocated < 100 + expected_peak_bytes = 100 + assert stats.peak_memory_allocated < expected_peak_bytes, ( + f"Peak memory ({format_bytes(stats.peak_memory_allocated)}) " + f"exceeded threshold ({format_bytes(expected_peak_bytes)})" + ) diff --git a/tests/memory/utils.py b/tests/memory/utils.py index bc92c3dc3c..a1f17f706d 100644 --- a/tests/memory/utils.py +++ b/tests/memory/utils.py @@ -1,3 +1,4 @@ +import logging import os import tempfile import uuid @@ -9,11 +10,16 @@ from daft.runners.ray_runner import build_partitions from daft.table import MicroPartition +logger = logging.getLogger(__name__) + def run_wrapper_build_partitions( input_partitions: list[dict], instructions: list[Instruction] ) -> tuple[list[MicroPartition], str]: inputs = [MicroPartition.from_pydict(p) for p in input_partitions] + + logger.info("Input total size: %s", sum(i.size_bytes() for i in inputs)) + tmpdir = tempfile.gettempdir() memray_path = os.path.join(tmpdir, f"memray-{uuid.uuid4()}.bin") with memray.Tracker(memray_path, native_traces=True, follow_fork=True):