feat: Support On-Demand Repartition #14411

Open
wants to merge 23 commits into
base: main
from

Conversation

Weijun-H
Member

@Weijun-H Weijun-H commented Feb 2, 2025

Which issue does this PR close?

Closes #14287

Rationale for this change

  • Introduce prefer_round_robin_repartition in the optimizer config; when it is false, every RoundRobinBatch is replaced with OnDemandRepartition
  • Use an mpmc channel so that the repartition operator polls one batch only when an output partition requests it, instead of pre-assigning batches
  • Use the Tokio channel in OnDemandRepartitionExec instead of the customized distributed channel
/// The OnDemandRepartitionExec operator repartitions the input data based on a pull-based (on-demand) model.
/// It is similar to the RepartitionExec operator, but it doesn't distribute the data to the output
/// partitions until the output partitions request the data.
///
/// When an output partition is polled, the operator sends that partition's number to the shared partition channel; the prefetch buffer then distributes data in the order in which the partition numbers arrive.
/// Each input stream has a prefetch buffer (channel) that distributes data to the output partitions.
///
/// The following diagram illustrates the data flow of the OnDemandRepartitionExec operator with 3 output partitions for the input stream 1:
/// ```text
///         /\                     /\                     /\
///         ││                     ││                     ││
///         ││                     ││                     ││
///         ││                     ││                     ││
/// ┌───────┴┴────────┐    ┌───────┴┴────────┐    ┌───────┴┴────────┐
/// │     Stream      │    │     Stream      │    │     Stream      │
/// │       (1)       │    │       (2)       │    │       (3)       │
/// └────────┬────────┘    └───────┬─────────┘    └────────┬────────┘
///          │                     │                       │    / \
///          │                     │                       │    | |
///          │                     │                       │    | |
///          └────────────────┐    │    ┌──────────────────┘    | |
///                           │    │    │                       | |
///                           ▼    ▼    ▼                       | |
///                       ┌─────────────────┐                   | |
///  Send the partition   │partition channel│                   | |
///  number when polling  │                 │                   | |
///                       └────────┬────────┘                   | |
///                                │                            | |
///                                │                            | |
///                                │  Get the partition number  | |
///                                ▼  then send data            | |
///                       ┌─────────────────┐                   | |
///                       │ Prefetch Buffer │───────────────────┘ |
///                       │       (1)       │─────────────────────┘
///                       └─────────────────┘ Distribute data to the output partitions
///
/// ```
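
The request-then-distribute flow in the doc comment above can be sketched as a toy model. This is a minimal Python asyncio illustration, not the PR's Rust implementation: it assumes a single input stream, and the names `on_demand_repartition`, `requests`, and `outputs` are hypothetical. Each consumer announces its partition number on a shared request queue when it wants data, and the distributor hands out the next batch in the order the requests arrive.

```python
import asyncio

async def on_demand_repartition(batches, num_partitions, consumer_delays):
    """Toy model: batches are only handed to partitions that asked for one."""
    requests = asyncio.Queue()  # plays the role of the "partition channel"
    outputs = [asyncio.Queue(maxsize=1) for _ in range(num_partitions)]
    received = [[] for _ in range(num_partitions)]

    async def distributor():
        source = iter(batches)
        finished = 0
        while finished < num_partitions:
            partition = await requests.get()  # wait until a partition asks
            batch = next(source, None)        # None marks end of input
            if batch is None:
                finished += 1
            await outputs[partition].put(batch)

    async def consumer(partition):
        while True:
            await requests.put(partition)     # announce demand when polled
            batch = await outputs[partition].get()
            if batch is None:
                return
            received[partition].append(batch)
            # Simulate downstream work of differing cost per partition.
            await asyncio.sleep(consumer_delays[partition])

    await asyncio.gather(
        distributor(), *(consumer(p) for p in range(num_partitions))
    )
    return received

if __name__ == "__main__":
    result = asyncio.run(on_demand_repartition(list(range(12)), 3, [0.0, 0.0, 0.02]))
    print([len(r) for r in result])
```

In this model a slow partition simply requests less often, so it receives fewer batches and nothing is queued for it ahead of time.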

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

Benchmark

UPDATE:
I reviewed this PR again and verified that the optimizer's behavior remains consistent with RoundRobinBatch after incorporating OnDemandRepartitionExec. Based on the current results, performance is comparable for tpch and tpch_10. This PR also reduces excessive memory usage caused by prefetching.

Comparing main and on-demand-repartition-with-config
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ on-demand-repartition-with-config ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │    1.38ms │                            1.36ms │     no change │
│ QQuery 1     │   24.85ms │                           21.66ms │ +1.15x faster │
│ QQuery 2     │   68.20ms │                           61.85ms │ +1.10x faster │
│ QQuery 3     │   58.28ms │                           50.94ms │ +1.14x faster │
│ QQuery 4     │  528.42ms │                          475.99ms │ +1.11x faster │
│ QQuery 5     │  600.49ms │                          545.88ms │ +1.10x faster │
│ QQuery 6     │   26.50ms │                           22.23ms │ +1.19x faster │
│ QQuery 7     │   29.55ms │                           24.92ms │ +1.19x faster │
│ QQuery 8     │  593.82ms │                          528.14ms │ +1.12x faster │
│ QQuery 9     │  825.73ms │                          758.74ms │ +1.09x faster │
│ QQuery 10    │  184.29ms │                          156.35ms │ +1.18x faster │
│ QQuery 11    │  199.85ms │                          184.58ms │ +1.08x faster │
│ QQuery 12    │  628.97ms │                          574.94ms │ +1.09x faster │
│ QQuery 13    │  956.18ms │                          846.00ms │ +1.13x faster │
│ QQuery 14    │  589.47ms │                          540.67ms │ +1.09x faster │
│ QQuery 15    │  581.46ms │                          583.13ms │     no change │
│ QQuery 16    │ 1146.36ms │                         1139.16ms │     no change │
│ QQuery 17    │ 1061.61ms │                         1060.72ms │     no change │
│ QQuery 18    │ 2981.00ms │                         3135.04ms │  1.05x slower │
│ QQuery 19    │   45.52ms │                           47.64ms │     no change │
│ QQuery 20    │  749.51ms │                          808.07ms │  1.08x slower │
│ QQuery 21    │  948.92ms │                         1041.79ms │  1.10x slower │
│ QQuery 22    │ 1762.67ms │                         1945.83ms │  1.10x slower │
│ QQuery 23    │ 6135.95ms │                         6001.12ms │     no change │
│ QQuery 24    │  347.65ms │                          314.43ms │ +1.11x faster │
│ QQuery 25    │  299.04ms │                          280.35ms │ +1.07x faster │
│ QQuery 26    │  389.45ms │                          352.87ms │ +1.10x faster │
│ QQuery 27    │ 1270.61ms │                         1107.89ms │ +1.15x faster │
│ QQuery 28    │ 9219.13ms │                         9246.19ms │     no change │
│ QQuery 29    │  413.52ms │                          410.10ms │     no change │
│ QQuery 30    │  507.99ms │                          517.29ms │     no change │
│ QQuery 31    │  523.95ms │                          516.16ms │     no change │
│ QQuery 32    │ 4086.74ms │                         3822.42ms │ +1.07x faster │
│ QQuery 33    │ 4527.02ms │                         4281.42ms │ +1.06x faster │
│ QQuery 34    │ 4620.38ms │                         4507.69ms │     no change │
│ QQuery 35    │  807.47ms │                          782.89ms │     no change │
│ QQuery 36    │  101.85ms │                          102.97ms │     no change │
│ QQuery 37    │   49.30ms │                           48.18ms │     no change │
│ QQuery 38    │   69.31ms │                           68.05ms │     no change │
│ QQuery 39    │  187.85ms │                          188.15ms │     no change │
│ QQuery 40    │   25.24ms │                           21.63ms │ +1.17x faster │
│ QQuery 41    │   19.93ms │                           20.37ms │     no change │
│ QQuery 42    │   28.83ms │                           27.58ms │     no change │
└──────────────┴───────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                │ 48224.23ms │
│ Total Time (on-demand-repartition-with-config)   │ 47173.35ms │
│ Average Time (main)                              │  1121.49ms │
│ Average Time (on-demand-repartition-with-config) │  1097.05ms │
│ Queries Faster                                   │         21 │
│ Queries Slower                                   │          4 │
│ Queries with No Change                           │         18 │
└──────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ on-demand-repartition-with-config ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  74.67ms │                           81.91ms │  1.10x slower │
│ QQuery 2     │  12.86ms │                           13.10ms │     no change │
│ QQuery 3     │  21.95ms │                           22.46ms │     no change │
│ QQuery 4     │  10.63ms │                           11.24ms │  1.06x slower │
│ QQuery 5     │  34.58ms │                           34.76ms │     no change │
│ QQuery 6     │   4.29ms │                            4.56ms │  1.06x slower │
│ QQuery 7     │  65.73ms │                           66.18ms │     no change │
│ QQuery 8     │  15.16ms │                           14.89ms │     no change │
│ QQuery 9     │  37.87ms │                           36.69ms │     no change │
│ QQuery 10    │  31.83ms │                           31.63ms │     no change │
│ QQuery 11    │   5.86ms │                            5.55ms │ +1.06x faster │
│ QQuery 12    │  19.80ms │                           22.36ms │  1.13x slower │
│ QQuery 13    │  16.25ms │                           16.86ms │     no change │
│ QQuery 14    │   5.13ms │                            5.00ms │     no change │
│ QQuery 15    │  11.39ms │                           11.51ms │     no change │
│ QQuery 16    │  12.91ms │                           12.53ms │     no change │
│ QQuery 17    │  57.56ms │                           57.26ms │     no change │
│ QQuery 18    │ 123.26ms │                          117.69ms │     no change │
│ QQuery 19    │  24.02ms │                           24.64ms │     no change │
│ QQuery 20    │  19.34ms │                           19.70ms │     no change │
│ QQuery 21    │  84.93ms │                           83.70ms │     no change │
│ QQuery 22    │  18.57ms │                           18.51ms │     no change │
└──────────────┴──────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary                                ┃          ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (main)                                │ 708.57ms │
│ Total Time (on-demand-repartition-with-config)   │ 712.72ms │
│ Average Time (main)                              │  32.21ms │
│ Average Time (on-demand-repartition-with-config) │  32.40ms │
│ Queries Faster                                   │        1 │
│ Queries Slower                                   │        4 │
│ Queries with No Change                           │       17 │
└──────────────────────────────────────────────────┴──────────┘
--------------------
Benchmark tpch_mem_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ on-demand-repartition-with-config ┃         Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 1205.24ms │                         1221.21ms │      no change │
│ QQuery 2     │  124.11ms │                          123.65ms │      no change │
│ QQuery 3     │  219.29ms │                          257.70ms │   1.18x slower │
│ QQuery 4     │  126.91ms │                          120.37ms │  +1.05x faster │
│ QQuery 5     │  852.54ms │                          543.09ms │  +1.57x faster │
│ QQuery 6     │  459.33ms │                           38.20ms │ +12.02x faster │
│ QQuery 7     │ 1336.75ms │                         1428.79ms │   1.07x slower │
│ QQuery 8     │  337.11ms │                          374.10ms │   1.11x slower │
│ QQuery 9     │ 1301.22ms │                         1032.98ms │  +1.26x faster │
│ QQuery 10    │  380.01ms │                          358.18ms │  +1.06x faster │
│ QQuery 11    │  104.96ms │                          105.24ms │      no change │
│ QQuery 12    │  247.07ms │                          246.46ms │      no change │
│ QQuery 13    │  274.90ms │                          355.60ms │   1.29x slower │
│ QQuery 14    │   64.29ms │                          108.13ms │   1.68x slower │
│ QQuery 15    │  131.72ms │                          127.82ms │      no change │
│ QQuery 16    │   95.77ms │                          159.78ms │   1.67x slower │
│ QQuery 17    │  892.58ms │                          875.12ms │      no change │
│ QQuery 18    │ 4457.59ms │                         4259.71ms │      no change │
│ QQuery 19    │  228.19ms │                          163.73ms │  +1.39x faster │
│ QQuery 20    │  293.38ms │                          238.83ms │  +1.23x faster │
│ QQuery 21    │ 1827.50ms │                         1738.36ms │      no change │
│ QQuery 22    │   90.64ms │                          100.17ms │   1.11x slower │
└──────────────┴───────────┴───────────────────────────────────┴────────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                │ 15051.13ms │
│ Total Time (on-demand-repartition-with-config)   │ 13977.26ms │
│ Average Time (main)                              │   684.14ms │
│ Average Time (on-demand-repartition-with-config) │   635.33ms │
│ Queries Faster                                   │          7 │
│ Queries Slower                                   │          7 │
│ Queries with No Change                           │          8 │
└──────────────────────────────────────────────────┴────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                │ 13520.93ms │
│ Total Time (on-demand-repartition-with-config)   │ 13997.54ms │
│ Average Time (main)                              │   614.59ms │
│ Average Time (on-demand-repartition-with-config) │   636.25ms │
│ Queries Faster                                   │          8 │
│ Queries Slower                                   │          7 │
│ Queries with No Change                           │          7 │
└──────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ on-demand-repartition-with-config ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  93.60ms │                           98.43ms │  1.05x slower │
│ QQuery 2     │  20.71ms │                           19.28ms │ +1.07x faster │
│ QQuery 3     │  37.50ms │                           37.86ms │     no change │
│ QQuery 4     │  22.01ms │                           21.43ms │     no change │
│ QQuery 5     │  55.11ms │                           53.96ms │     no change │
│ QQuery 6     │  18.41ms │                           18.40ms │     no change │
│ QQuery 7     │  74.99ms │                           74.32ms │     no change │
│ QQuery 8     │  48.71ms │                           48.85ms │     no change │
│ QQuery 9     │  67.27ms │                           67.94ms │     no change │
│ QQuery 10    │  56.82ms │                           56.80ms │     no change │
│ QQuery 11    │  13.96ms │                           14.17ms │     no change │
│ QQuery 12    │  33.16ms │                           35.51ms │  1.07x slower │
│ QQuery 13    │  33.36ms │                           31.52ms │ +1.06x faster │
│ QQuery 14    │  27.97ms │                           28.48ms │     no change │
│ QQuery 15    │  43.40ms │                           41.47ms │     no change │
│ QQuery 16    │  15.27ms │                           14.51ms │ +1.05x faster │
│ QQuery 17    │  89.93ms │                           88.28ms │     no change │
│ QQuery 18    │ 117.42ms │                          115.98ms │     no change │
│ QQuery 19    │  45.06ms │                           46.41ms │     no change │
│ QQuery 20    │  39.22ms │                           38.78ms │     no change │
│ QQuery 21    │  92.68ms │                           91.44ms │     no change │
│ QQuery 22    │  15.93ms │                           15.85ms │     no change │
└──────────────┴──────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                                │ 1062.51ms │
│ Total Time (on-demand-repartition-with-config)   │ 1059.66ms │
│ Average Time (main)                              │   48.30ms │
│ Average Time (on-demand-repartition-with-config) │   48.17ms │
│ Queries Faster                                   │         3 │
│ Queries Slower                                   │         2 │
│ Queries with No Change                           │        17 │
└──────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃      main ┃ on-demand-repartition-with-config ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 1     │  829.39ms │                          857.63ms │ no change │
│ QQuery 2     │  125.20ms │                          121.53ms │ no change │
│ QQuery 3     │  401.00ms │                          398.08ms │ no change │
│ QQuery 4     │  197.26ms │                          194.64ms │ no change │
│ QQuery 5     │  595.67ms │                          598.63ms │ no change │
│ QQuery 6     │  138.55ms │                          140.14ms │ no change │
│ QQuery 7     │  877.30ms │                          899.47ms │ no change │
│ QQuery 8     │  613.26ms │                          639.66ms │ no change │
│ QQuery 9     │  984.13ms │                         1018.76ms │ no change │
│ QQuery 10    │  549.10ms │                          543.43ms │ no change │
│ QQuery 11    │   87.79ms │                           88.50ms │ no change │
│ QQuery 12    │  280.14ms │                          280.92ms │ no change │
│ QQuery 13    │  410.62ms │                          415.76ms │ no change │
│ QQuery 14    │  226.57ms │                          224.97ms │ no change │
│ QQuery 15    │  372.03ms │                          385.68ms │ no change │
│ QQuery 16    │   92.60ms │                           95.83ms │ no change │
│ QQuery 17    │ 1061.41ms │                         1068.29ms │ no change │
│ QQuery 18    │ 1562.40ms │                         1576.68ms │ no change │
│ QQuery 19    │  389.10ms │                          387.42ms │ no change │
│ QQuery 20    │  363.99ms │                          366.32ms │ no change │
│ QQuery 21    │ 1340.67ms │                         1329.64ms │ no change │
│ QQuery 22    │  135.89ms │                          130.24ms │ no change │
└──────────────┴───────────┴───────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                │ 11634.06ms │
│ Total Time (on-demand-repartition-with-config)   │ 11762.23ms │
│ Average Time (main)                              │   528.82ms │
│ Average Time (on-demand-repartition-with-config) │   534.65ms │
│ Queries Faster                                   │          0 │
│ Queries Slower                                   │          0 │
│ Queries with No Change                           │         22 │
└──────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf50.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ on-demand-repartition-with-config ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  4056.73ms │                         4105.18ms │     no change │
│ QQuery 2     │   821.39ms │                          824.71ms │     no change │
│ QQuery 3     │  2003.29ms │                         1992.30ms │     no change │
│ QQuery 4     │   960.06ms │                          974.04ms │     no change │
│ QQuery 5     │  3328.71ms │                         3326.73ms │     no change │
│ QQuery 6     │   672.96ms │                          672.13ms │     no change │
│ QQuery 7     │  9750.95ms │                         9099.02ms │ +1.07x faster │
│ QQuery 8     │  3829.53ms │                         3268.53ms │ +1.17x faster │
│ QQuery 9     │  6594.97ms │                         6189.83ms │ +1.07x faster │
│ QQuery 10    │  2779.14ms │                         2818.25ms │     no change │
│ QQuery 11    │   767.51ms │                          777.75ms │     no change │
│ QQuery 12    │  1328.17ms │                         1326.20ms │     no change │
│ QQuery 13    │  2438.08ms │                         2453.81ms │     no change │
│ QQuery 14    │  1061.81ms │                         1083.64ms │     no change │
│ QQuery 15    │  2412.39ms │                         2405.18ms │     no change │
│ QQuery 16    │   412.71ms │                          408.90ms │     no change │
│ QQuery 17    │  5928.70ms │                         5945.75ms │     no change │
│ QQuery 18    │ 15010.01ms │                        14812.32ms │     no change │
│ QQuery 19    │  1765.70ms │                         1822.16ms │     no change │
│ QQuery 20    │  2134.86ms │                         2071.58ms │     no change │
│ QQuery 21    │  8523.05ms │                         8563.92ms │     no change │
│ QQuery 22    │   670.66ms │                          667.75ms │     no change │
└──────────────┴────────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                │ 77251.37ms │
│ Total Time (on-demand-repartition-with-config)   │ 75609.67ms │
│ Average Time (main)                              │  3511.43ms │
│ Average Time (on-demand-repartition-with-config) │  3436.80ms │
│ Queries Faster                                   │          3 │
│ Queries Slower                                   │          0 │
│ Queries with No Change                           │         19 │
└──────────────────────────────────────────────────┴────────────┘
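
For readers checking the Change column by hand, the labels in the tables above are consistent with a ratio of the two timings and a 5% noise threshold. The sketch below reproduces that rule; the threshold and the function name `classify_change` are inferred from the reported numbers and are assumptions, not taken from the benchmark tooling itself.

```python
def classify_change(main_ms, branch_ms, threshold=0.05):
    """Label a timing pair the way the tables above appear to.

    The 5% threshold is an assumption inferred from the reported rows.
    """
    if main_ms <= 0 or branch_ms <= 0:
        raise ValueError("timings must be positive")
    if main_ms >= branch_ms:
        ratio = main_ms / branch_ms  # branch is at least as fast as main
        return f"+{ratio:.2f}x faster" if ratio - 1 > threshold else "no change"
    ratio = branch_ms / main_ms      # branch is slower than main
    return f"{ratio:.2f}x slower" if ratio - 1 > threshold else "no change"

if __name__ == "__main__":
    print(classify_change(24.85, 21.66))      # clickbench Query 1
    print(classify_change(2981.00, 3135.04))  # clickbench Query 18
    print(classify_change(6135.95, 6001.12))  # clickbench Query 23
```

Under this rule, a query such as clickbench Query 23 (about 2% apart) counts as no change, which is why small differences in the totals do not show up as faster or slower rows.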

@github-actions github-actions bot added physical-expr Physical Expressions optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) common Related to common crate proto Related to proto crate labels Feb 2, 2025
@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from 54db067 to 6ffe62c Compare February 2, 2025 16:23
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Feb 2, 2025
@ozankabak
Contributor

@Weijun-H has been working on this with the Synnada team for a while. The initial benchmark results were promising, so we decided to continue development while receiving community feedback 🚀

@ozankabak
Contributor

This is still in somewhat early stages, and there is work to do. But it might be good to get feedback early on from the community as the performance of this code is somewhat sensitive to idioms used with channels etc.

Contributor

@mertak-synnada mertak-synnada left a comment


Thanks for all the work! I've just left some comments.

@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch 3 times, most recently from 69a3c4f to f6934d1 Compare February 6, 2025 14:42
@Weijun-H Weijun-H marked this pull request as ready for review February 6, 2025 15:20
@alamb
Contributor

alamb commented Feb 6, 2025

This is still in somewhat early stages, and there is work to do. But it might be good to get feedback early on from the community as the performance of this code is somewhat sensitive to idioms used with channels etc.

Maybe I am missing something, but the benchmark numbers reported above don't really show much of an improvement

For example, this branch appears to be basically the same

│ Total Time (main)                                │ 11767.43ms │
│ Total Time (on-demand-repartition-with-config)   │ 11787.12ms │

Are there any benchmarks that show a performance benefit of all this new code?

@ozankabak
Contributor

@Weijun-H did some benchmarks a while back and the approach seemed promising in TPCH/SF50.

@mertak-synnada will do a detailed review of this tomorrow and then @Weijun-H can run the latest benchmarks for us to see how the performance changes

@berkaysynnada
Contributor

Maybe I am missing something, but the benchmark numbers reported above don't really show much of an improvement

This might be a silly question, but did you set the config flag for the on-demand-repartition-with-config branch? 😅

@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from fa91ea3 to beacced Compare February 7, 2025 04:18
@Weijun-H
Member Author

Weijun-H commented Feb 7, 2025

I updated the latest benchmark results. It appears that the OnDemandRepartition improved performance on clickbench_partitioned and large datasets such as tpch_sf50. For tpch_sf1 and tpch_sf10, the results are similar. I will check again to ensure everything is functioning correctly in the coming days.
@ozankabak @alamb @berkaysynnada @mertak-synnada

UPDATE:
I reviewed this PR again and verified that the optimizer's behavior remains consistent with RoundRobinBatch after incorporating OnDemandRepartitionExec. Based on the current results, performance is comparable for tpch and tpch_10. Additionally, some queries in clickbench_partitioned benefit from this change. This PR also reduces excessive memory usage caused by prefetching.

@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch 2 times, most recently from 2ac6849 to df119c3 Compare February 7, 2025 05:14
@Weijun-H Weijun-H marked this pull request as draft February 7, 2025 07:22
@github-actions github-actions bot added core Core DataFusion crate and removed core Core DataFusion crate labels Feb 7, 2025
@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from 794951b to 8b71674 Compare February 8, 2025 14:56
@Weijun-H Weijun-H marked this pull request as ready for review February 8, 2025 15:06
@2010YOUY01
Contributor

Impressive work! I have a suggestion and a high-level question:

Suggestion

I think to justify this change, we have to make sure of the following:

  • No performance regression (the benchmarks already show this)
  • Reduced memory footprint for queries in which batches can accumulate in RepartitionExec (as the original issue describes)

I tried to check the memory usage for tpch-sf10 and clickbench; there is no noticeable change for those queries.
Perhaps we should construct queries with this anti-pattern and demonstrate that memory usage can actually be reduced by this on-demand repartition executor?
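
To make the anti-pattern concrete, here is a small discrete-time toy model of what such a query would need to exhibit. It is a sketch under stated assumptions, not a model of RepartitionExec's actual channels: the producer emits one batch per tick, the per-partition queues are unbounded, and `peak_buffered` and `ticks_per_batch` are hypothetical names. It compares the peak number of buffered batches when batches are pre-assigned round-robin with the peak when a batch is handed over only on request.

```python
from collections import deque

def peak_buffered(num_batches, ticks_per_batch, on_demand):
    """Peak number of batches buffered across all output partitions.

    ticks_per_batch[p] is a positive int: partition p consumes one batch
    on every tick that is a multiple of it (1 = fast, 10 = slow).
    """
    n = len(ticks_per_batch)
    queues = [deque() for _ in range(n)]
    requests = deque(range(n))  # on-demand: every partition starts by asking
    produced = consumed = tick = peak = 0
    while consumed < num_batches:
        if produced < num_batches:
            if not on_demand:
                # Eager round-robin: pre-assign regardless of demand.
                queues[produced % n].append(produced)
                produced += 1
            elif requests:
                # On-demand: serve the oldest outstanding request.
                queues[requests.popleft()].append(produced)
                produced += 1
        peak = max(peak, sum(len(q) for q in queues))
        for p in range(n):
            if queues[p] and tick % ticks_per_batch[p] == 0:
                queues[p].popleft()
                consumed += 1
                if on_demand:
                    requests.append(p)  # ask for the next batch
        tick += 1
    return peak

if __name__ == "__main__":
    print("eager:", peak_buffered(300, [1, 1, 10], on_demand=False))
    print("on-demand:", peak_buffered(300, [1, 1, 10], on_demand=True))
```

In this model the on-demand variant never holds more than one batch per partition, while the eager variant piles batches up behind the slow partition. A benchmark query would need a similarly skewed consumer to show a difference in resident memory.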

Here is a script for checking memory usage in benchmark queries

# This script should be placed under benchmarks/
#
# Supported benchmarks are 'tpch' and 'clickbench'
#
# Example usage:
# Run TPCH benchmark and save results:
#   python3 membench.py run --benchmark tpch --result tpch_main.csv
#   python3 membench.py run --benchmark tpch --result tpch_optimized.csv
#
# Compare results:
#   python3 membench.py compare tpch_main.csv tpch_optimized.csv

import subprocess
import re
import csv
import argparse

def human_readable_size(size):
    units = ["B", "K", "M", "G", "T"]
    index = 0
    while size >= 1024 and index < len(units) - 1:
        size /= 1024.0
        index += 1
    return f"{size:.2f}{units[index]}"

def run_tpch_queries(label, result_file):
    results = []
    for query in range(1, 23):
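        # `-l` is the BSD/macOS flag that makes /usr/bin/time report resource usage,
        # including "maximum resident set size". GNU time on Linux uses `-v` and a
        # different output format, so the regex below would not match there.
        cmd = [
            "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench",
            "tpch", "--format", "parquet", "--path", "./data/tpch_sf10",
            "--query", str(query), "--iterations", "1"
        ]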
        
        process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
        stderr_output = process.stderr
        
        match = re.search(r"(\d+)\s+maximum resident set size", stderr_output)
        max_rss = human_readable_size(int(match.group(1))) if match else "N/A"
        results.append((query, max_rss))
    
    with open(result_file, "w", newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Query", "Memory"])
        writer.writerows(results)
    
    print(f"Results saved to {result_file}")

def run_clickbench_queries(label, result_file):
    results = []
    for query in range(0, 43):
        cmd = [
            "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench", 
            "clickbench", "--path", "./data/hits.parquet", 
            "--queries-path", "./queries/clickbench/queries.sql", 
            "--query", str(query), "--iterations", "1"
        ]
        
        process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
        stderr_output = process.stderr
        
        match = re.search(r"(\d+)\s+maximum resident set size", stderr_output)
        max_rss = human_readable_size(int(match.group(1))) if match else "N/A"
        results.append((query, max_rss))
    
    with open(result_file, "w", newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Query", "Memory"])
        writer.writerows(results)
    
    print(f"Results saved to {result_file}")

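def parse_size(text):
    # Convert a string produced by human_readable_size (e.g. "1.05G") back to
    # bytes, so that values with different units (982.59M vs 1.05G) compare correctly.
    units = {"B": 0, "K": 1, "M": 2, "G": 3, "T": 4}
    return float(text[:-1]) * 1024 ** units[text[-1]]

def compare_results(file1, file2):
    results1, results2 = {}, {}

    with open(file1, "r") as f1, open(file2, "r") as f2:
        reader1, reader2 = csv.reader(f1), csv.reader(f2)
        next(reader1)  # Skip header
        next(reader2)  # Skip header

        for row in reader1:
            results1[row[0]] = row[1]
        for row in reader2:
            results2[row[0]] = row[1]

    print(f"{'Query':<10}{'Branch1':<10}{'Branch2':<10}{'Change'}")
    for query in results1:
        mem1 = results1[query]
        mem2 = results2.get(query, "N/A")

        if mem1 != "N/A" and mem2 != "N/A":
            # Compare in bytes rather than on the bare number, which ignored the unit suffix.
            size1 = parse_size(mem1)
            size2 = parse_size(mem2)
            ratio = size2 / size1 if size1 > 0 else 1.0
            # Differences within 5% are treated as noise.
            change = f"{ratio:.2f}X" if abs(ratio - 1) > 0.05 else "No Change"
        else:
            change = "N/A"

        print(f"{query:<10}{mem1:<10}{mem2:<10}{change}")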

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("mode", choices=["run", "compare"], help="Run benchmarks or compare results")
    parser.add_argument("--result", help="Output result file for benchmarking")
    parser.add_argument("--benchmark", choices=["tpch", "clickbench"], help="Specify which benchmark to run")
    parser.add_argument("file1", nargs="?", help="First result file for comparison")
    parser.add_argument("file2", nargs="?", help="Second result file for comparison")
    args = parser.parse_args()
    
    if args.mode == "run" and args.result and args.benchmark:
        if args.benchmark == "tpch":
            run_tpch_queries("run", args.result)
        elif args.benchmark == "clickbench":
            run_clickbench_queries("run", args.result)
    elif args.mode == "compare" and args.file1 and args.file2:
        compare_results(args.file1, args.file2)
    else:
        print("Invalid arguments. Use --help for usage information.")

if __name__ == "__main__":
    main()
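
The script above relies on `/usr/bin/time -l`, which is the BSD/macOS form of the flag and reports max RSS in bytes. For anyone reproducing these numbers on Linux, here is a minimal sketch of measuring peak memory directly from Python with `os.wait4`. The helper name `peak_rss_bytes` is hypothetical and not part of the posted script; it assumes a Unix platform.

```python
import os
import subprocess
import sys


def peak_rss_bytes(cmd):
    """Run cmd and return (exit_code, peak RSS in bytes) for that child.

    Hypothetical helper, not part of the script above. Unix only.
    """
    proc = subprocess.Popen(
        cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )
    # os.wait4 reaps the child and returns its resource usage.
    _, status, usage = os.wait4(proc.pid, 0)
    if os.WIFEXITED(status):
        exit_code = os.WEXITSTATUS(status)
    else:
        exit_code = -os.WTERMSIG(status)
    # Tell Popen the child is already reaped so it does not wait again.
    proc.returncode = exit_code
    # ru_maxrss is in bytes on macOS and in kilobytes on Linux.
    scale = 1 if sys.platform == "darwin" else 1024
    return exit_code, usage.ru_maxrss * scale


if __name__ == "__main__":
    code, rss = peak_rss_bytes([sys.executable, "-c", "pass"])
    print(code, rss)
```

With something like this, the `cmd` list could start at `"cargo"` and the regex over stderr would no longer be needed; the returned byte count could be passed to `human_readable_size` as before.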

Results:

TPCH:
----
Query     Branch1   Branch2   Change
1         464.05M   460.78M   No Change
2         397.00M   412.77M   No Change
3         714.56M   630.64M   0.88X
4         408.53M   418.78M   No Change
5         741.30M   769.73M   No Change
6         390.02M   398.72M   No Change
7         3.41G     3.45G     No Change
8         1.08G     1.05G     No Change
9         2.37G     2.31G     No Change
10        1.11G     1.16G     No Change
11        260.78M   267.41M   No Change
12        429.95M   449.06M   No Change
13        675.67M   668.22M   No Change
14        666.56M   700.22M   No Change
15        673.66M   656.70M   No Change
16        485.81M   474.59M   No Change
17        605.38M   631.92M   No Change
18        3.26G     3.29G     No Change
19        500.77M   577.95M   1.15X
20        1.07G     1.05G     No Change
21        982.59M   978.69M   No Change
22        303.86M   302.14M   No Change

Clickbench:
...(no change)

Question

As I understand it, the new repartition executor is a wrapper around RepartitionExec that enables lazy evaluation, so it should support both RoundRobin and Hash repartitioning, right? This PR only replaces RoundRobin. Do you also plan to add on-demand hash repartitioning in the future?

@xudong963 xudong963 self-requested a review February 10, 2025 04:43
@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from 5f6ecdb to 4263b0f Compare February 10, 2025 06:19
@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from 739fb65 to 846afdd Compare February 10, 2025 11:28
@Dandandan
Contributor

Dandandan commented Feb 11, 2025

I wonder why tpch_mem_sf10 is slower for some queries. Could it be that the memtable is not populated evenly because of the new round robin? That might be fixable, e.g. by introducing another repartition after MemoryExec.
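
To illustrate the hypothesis, here is a toy simulation (not DataFusion code) contrasting round-robin pre-assignment with on-demand hand-out. The function names and the per-batch `costs` are made up for illustration; the point is only that when batches go to whichever partition asks next, slower consumers end up holding fewer batches.

```python
import heapq


def round_robin(num_batches, num_partitions):
    """Pre-assign batch i to partition i % n, regardless of consumer speed."""
    counts = [0] * num_partitions
    for i in range(num_batches):
        counts[i % num_partitions] += 1
    return counts


def on_demand(num_batches, costs):
    """Hand each batch to whichever partition requests one next.

    costs[p] is the (hypothetical) time partition p spends per batch.
    """
    counts = [0] * len(costs)
    # Heap of (time at which the partition next requests a batch, partition).
    ready = [(0.0, p) for p in range(len(costs))]
    heapq.heapify(ready)
    for _ in range(num_batches):
        now, p = heapq.heappop(ready)
        counts[p] += 1
        heapq.heappush(ready, (now + costs[p], p))
    return counts


print(round_robin(12, 3))              # [4, 4, 4]
print(on_demand(12, [1.0, 1.0, 4.0]))  # [5, 5, 2]
```

If the in-memory table is built from partitions sized like the second result, later operators reading it would see skewed input, which is the situation an extra repartition after the memory scan would even out.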

@berkaysynnada
Contributor

> I wonder why tpch_mem_sf10 is slower for some queries. Could it be that the memtable is not populated evenly because of the new round robin? That might be fixable, e.g. by introducing another repartition after MemoryExec.

Thank you for the advice @Dandandan. We will certainly check that after completing on-demand optimizations.

@Weijun-H
Member Author

Weijun-H commented Feb 12, 2025

> > I wonder why tpch_mem_sf10 is slower for some queries. Could it be that the memtable is not populated evenly because of the new round robin? That might be fixable, e.g. by introducing another repartition after MemoryExec.
>
> Thank you for the advice @Dandandan. We will certainly check that after completing on-demand optimizations.

I agree with @berkaysynnada: OnDemandRepartition is not enabled by default and should only be turned on when necessary. That said, it is a good point for a follow-up optimization, @Dandandan 👍.

@Dandandan
Contributor

> > > I wonder why tpch_mem_sf10 is slower for some queries. Could it be that the memtable is not populated evenly because of the new round robin? That might be fixable, e.g. by introducing another repartition after MemoryExec.
> >
> > Thank you for the advice @Dandandan. We will certainly check that after completing on-demand optimizations.
>
> I agree with @berkaysynnada: OnDemandRepartition is not enabled by default and should only be turned on when necessary. That said, it is a good point for a follow-up optimization, @Dandandan 👍.

I wonder if we can make it the default once we know it generally leads to faster execution?

@Dandandan
Contributor

Specifically, I think we can try this approach together with on-demand repartition 🤔

Labels
common (Related to common crate); core (Core DataFusion crate); documentation (Improvements or additions to documentation); optimizer (Optimizer rules); physical-expr (Physical Expressions); proto (Related to proto crate); sqllogictest (SQL Logic Tests (.slt))
Development

Successfully merging this pull request may close these issues.

Alternative approaches to "fan-out" style RepartitionExec
7 participants