-
Notifications
You must be signed in to change notification settings - Fork 653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PERF-#4494: Get partition widths/lengths in parallel instead of serially #4683
base: master
Are you sure you want to change the base?
Conversation
Codecov Report
@@ Coverage Diff @@
## master #4683 +/- ##
===========================================
- Coverage 85.28% 72.15% -13.13%
===========================================
Files 259 259
Lines 19378 19496 +118
===========================================
- Hits 16527 14068 -2459
- Misses 2851 5428 +2577
📣 Codecov can now indicate which changes are the most critical in Pull Requests. Learn more |
@@ -377,6 +377,20 @@ def length(self): | |||
""" | |||
if self._length_cache is None: | |||
if self.axis == 0: | |||
caches = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is duplicated from the PartitionManager
classes above, but I'm not sure how to access the correct partition manager from here.
Haven't taken a closer look at the implementation details, but do you have any benchmarks or performance measurements to compare with master? |
Sadly no, and I’d appreciate some suggestions on what code to run. Rehan
suggested manually invalidating the ._row_lengths_cache and .length_cache
fields on a dataframe and its partitions, then ensuring they’re recomputed
properly. It succeeds for simple examples, but I had trouble producing a
Ray timeline, and I’m not sure how else to benchmark it (most API-level
dataframe manipulations would probably hit the cached length/width).
…On Wed, Jul 20, 2022 at 19:18 Karthik Velayutham ***@***.***> wrote:
Haven't taken a closer look at the implementation details, but do you have
any benchmarks or performance measurements to compare with master?
—
Reply to this email directly, view it on GitHub
<#4683 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AFFY4GR46CDY7NCNZ722GSDVVCXO7ANCNFSM535Z7DMA>
.
You are receiving this because you authored the thread.Message ID:
***@***.***>
|
modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py
Outdated
Show resolved
Hide resolved
I spent a while today trying to get a script that showcases the performance here without breaking anything in Modin, but I failed. Getting a reproducer is hard for a few reasons. For one thing, this optimization is only useful for unusual cases like in #4493 where the partitions' call queues include costly operations. When there is no call queue, the partitions will execute all dataframe functions eagerly, simultaneously calculating shapes. The call queues are generally meant to carry cheap operations like transpose and reindexing, but the reproducer in that issue has a frame that is very expensive to serialize, so that even the transpose was expensive. There the slow code was in Looking at all the serial shape computations I listed here, most are in internal length computations. One is I think it's good practice to get multiple ray objects in parallel (see also this note about a similar improvement in |
This adds a certain bit of complexity (judging by the number of lines change, haven't looked at the diff yet), and I haven't yet seen any performance proof for that. I would like to see some measurements before increasing our (already huge) codebase... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments, but great work!
@@ -1214,6 +1233,19 @@ def apply_func_to_indices_both_axis( | |||
if col_widths is None: | |||
col_widths = [None] * len(col_partitions_list) | |||
|
|||
if row_lengths is None and col_widths is None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to compute dimensions here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The length
and width
values of each partition are accessed in the local compute_part_size
, defined immediately below. The double for
loop structure where compute_part_size
is called makes it hard to parallelize the computation of these dimensions, so I thought it would be simplest to precompute the relevant dimensions before the loop.
@@ -273,13 +274,42 @@ def length(self): | |||
int | |||
The length of the object. | |||
""" | |||
self.try_build_length_cache() | |||
return self._length_cache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to unwrap _length_cache
here, since its type will be PandasDataframePartition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by unwrap
? Also, as far as I can tell, the logic for this method should be the same as it originally was (the code was just moved into the try_build_length_cache
, so does this mean the original code returned PandasDataframePartition
as well?
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
Outdated
Show resolved
Hide resolved
for i, cache in enumerate(caches): | ||
if isinstance(cache, Future): | ||
self.list_of_partitions_to_combine[i].try_set_length_cache( | ||
new_lengths[dask_idx] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this just be i
as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, since new_lengths
may have fewer elements than caches
in the case where some length values were already computed (and are filtered out by the isinstance(cache, Future)
check). The value computed at new_lengths[dask_idx]
should correspond to the promise at caches[i]
.
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
Outdated
Show resolved
Hide resolved
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py
Outdated
Show resolved
Hide resolved
modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
Outdated
Show resolved
Hide resolved
modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py
Outdated
Show resolved
Hide resolved
modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py
Outdated
Show resolved
Hide resolved
modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
Outdated
Show resolved
Hide resolved
@vnlitvinov that makes sense, I'll look into coming up with concrete benchmarks. |
@pyrito please have a look at https://github.com/vnlitvinov/modin/tree/speedup-masking and #4726, it might be doing somewhat the same in terms of getting the sizes in parallel |
Related discussion on handling metadata (index and columns) in #3673. |
6a17fc3
to
e0bb5fa
Compare
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
…end) Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Co-authored-by: Rehan Sohail Durrani <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
e0bb5fa
to
490778c
Compare
What do these changes do?
Computes widths and lengths of block partitions in parallel as batched calls to
ray.get
/DaskWrapper.materialize
rather than in serial.This adds the
try_build_[length|width]_cache
andtry_set_[length|width]_cache
methods to block partitions; the former returns a promise/future for computing the partition's length, and the latter should be called by the partition manager to inform the block partition of the computation's value. This also adds the_update_partition_dimension_caches
to thePartitionManager
class, which will call the length/width futures returned by its constituent partitions.flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
git commit -s
docs/development/architecture.rst
is up-to-date