feat: Support On-Demand Repartition #14411
Conversation
@Weijun-H has been working on this with the Synnada team for a while. The initial benchmark results were promising, so we decided to continue development while receiving community feedback 🚀
This is still at a somewhat early stage, and there is work to do. But it might be good to get feedback early on from the community, as the performance of this code is somewhat sensitive to idioms used with channels etc.
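To make the pull-based idea concrete, below is a minimal, self-contained sketch of the on-demand pattern using plain tokio channels. This is not the PR's actual implementation; the channel types, capacities, and task structure are illustrative assumptions, and they are exactly the kind of idiom the performance is sensitive to.

```rust
use tokio::sync::mpsc;

// Stand-in for arrow::record_batch::RecordBatch to keep the sketch dependency-free.
type Batch = Vec<i32>;

#[tokio::main]
async fn main() {
    let n_out = 4;

    // Shared request channel: an output partition sends its index when it wants data.
    let (req_tx, mut req_rx) = mpsc::channel::<usize>(n_out);
    let mut data_txs = Vec::new();
    let mut handles = Vec::new();

    for out in 0..n_out {
        // Small per-output buffer: nothing is prefetched beyond what was asked for.
        let (data_tx, mut data_rx) = mpsc::channel::<Batch>(1);
        data_txs.push(data_tx);
        let req_tx = req_tx.clone();
        handles.push(tokio::spawn(async move {
            // Pull-based consumer: request a batch, then wait for it.
            while req_tx.send(out).await.is_ok() {
                match data_rx.recv().await {
                    Some(batch) => println!("partition {out} received {} rows", batch.len()),
                    None => break, // input exhausted
                }
            }
        }));
    }
    drop(req_tx);

    // Single input task: hand the next batch to whichever output asked first,
    // instead of eagerly pushing batches round-robin into every output buffer.
    let mut batches = (0..16).map(|i| vec![i; 8]);
    while let Some(requester) = req_rx.recv().await {
        match batches.next() {
            Some(batch) => {
                let _ = data_txs[requester].send(batch).await;
            }
            None => break,
        }
    }
    // Dropping the data senders closes every output channel and ends the consumers.
    drop(data_txs);

    for handle in handles {
        let _ = handle.await;
    }
}
```

In contrast, a round-robin repartition distributes batches into every output's buffer as fast as the input produces them, regardless of whether those outputs are ready to consume, which is where the extra prefetching memory mentioned later in this PR comes from.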
Thanks for all the work! I've just left some comments.
Maybe I am missing something, but the benchmark numbers reported above don't really show much of an improvement. For example, this branch appears to be basically the same.
Are there any benchmarks that show a performance benefit of all this new code?
@Weijun-H did some benchmarks a while back and the approach seemed promising in TPCH/SF50. @mertak-synnada will do a detailed review of this tomorrow, and then @Weijun-H can run the latest benchmarks for us to see how the performance changes.
This might be a silly question, but did you set the config flag for
I updated the latest benchmark results.
Impressive work! I have a suggestion and a high-level question.

Suggestion
I think that to justify this change, we have to make sure:
I tried to check the memory usage. Here is a script for checking memory usage in benchmark queries:

```python
# This script should be placed under benchmarks/
#
# Supported benchmarks are 'tpch' and 'clickbench'
#
# Example usage:
#   Run TPCH benchmark and save results:
#     python3 membench.py run --benchmark tpch --result tpch_main.csv
#     python3 membench.py run --benchmark tpch --result tpch_optimized.csv
#
#   Compare results:
#     python3 membench.py compare tpch_main.csv tpch_optimized.csv
import subprocess
import re
import csv
import argparse


def human_readable_size(size):
    units = ["B", "K", "M", "G", "T"]
    index = 0
    while size >= 1024 and index < len(units) - 1:
        size /= 1024.0
        index += 1
    return f"{size:.2f}{units[index]}"


def run_tpch_queries(label, result_file):
    results = []
    for query in range(1, 23):
        cmd = [
            "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench",
            "tpch", "--format", "parquet", "--path", "./data/tpch_sf10",
            "--query", str(query), "--iterations", "1"
        ]
        process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
        stderr_output = process.stderr
        match = re.search(r"(\d+)\s+maximum resident set size", stderr_output)
        max_rss = human_readable_size(int(match.group(1))) if match else "N/A"
        results.append((query, max_rss))
    with open(result_file, "w", newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Query", "Memory"])
        writer.writerows(results)
    print(f"Results saved to {result_file}")


def run_clickbench_queries(label, result_file):
    results = []
    for query in range(0, 43):
        cmd = [
            "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench",
            "clickbench", "--path", "./data/hits.parquet",
            "--queries-path", "./queries/clickbench/queries.sql",
            "--query", str(query), "--iterations", "1"
        ]
        process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
        stderr_output = process.stderr
        match = re.search(r"(\d+)\s+maximum resident set size", stderr_output)
        max_rss = human_readable_size(int(match.group(1))) if match else "N/A"
        results.append((query, max_rss))
    with open(result_file, "w", newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Query", "Memory"])
        writer.writerows(results)
    print(f"Results saved to {result_file}")


def compare_results(file1, file2):
    results1, results2 = {}, {}
    with open(file1, "r") as f1, open(file2, "r") as f2:
        reader1, reader2 = csv.reader(f1), csv.reader(f2)
        next(reader1)  # Skip header
        next(reader2)  # Skip header
        for row in reader1:
            results1[row[0]] = row[1]
        for row in reader2:
            results2[row[0]] = row[1]
    print(f"{'Query':<10}{'Branch1':<10}{'Branch2':<10}{'Change'}")
    for query in results1:
        mem1 = results1[query]
        mem2 = results2.get(query, "N/A")
        if mem1 != "N/A" and mem2 != "N/A":
            # Note: assumes both values carry the same unit suffix (e.g. both in M or G)
            size1 = float(mem1[:-1])
            size2 = float(mem2[:-1])
            ratio = size2 / size1 if size1 > 0 else 1.0
            change = f"{ratio:.2f}X" if abs(ratio - 1) > 0.05 else "No Change"
        else:
            change = "N/A"
        print(f"{query:<10}{mem1:<10}{mem2:<10}{change}")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("mode", choices=["run", "compare"], help="Run benchmarks or compare results")
    parser.add_argument("--result", help="Output result file for benchmarking")
    parser.add_argument("--benchmark", choices=["tpch", "clickbench"], help="Specify which benchmark to run")
    parser.add_argument("file1", nargs="?", help="First result file for comparison")
    parser.add_argument("file2", nargs="?", help="Second result file for comparison")
    args = parser.parse_args()
    if args.mode == "run" and args.result and args.benchmark:
        if args.benchmark == "tpch":
            run_tpch_queries("run", args.result)
        elif args.benchmark == "clickbench":
            run_clickbench_queries("run", args.result)
    elif args.mode == "compare" and args.file1 and args.file2:
        compare_results(args.file1, args.file2)
    else:
        print("Invalid arguments. Use --help for usage information.")


if __name__ == "__main__":
    main()
```

Results:
Question
In my understanding, the new repartition executor is a wrapper on
I wonder why tpch_mem_sf10 is slower for some queries. Might it be possible that the MemTable is not populated evenly because of the new round robin? That might be fixable, e.g. by introducing another repartition after the MemoryExec.
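For illustration only, here is a minimal sketch of what inserting such an extra repartition directly after a MemoryExec could look like. This is not code from this PR; it assumes the MemoryExec, RepartitionExec, and Partitioning::RoundRobinBatch APIs roughly as they existed around the time of this change, and uses a toy single-partition input.

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{displayable, Partitioning};

fn main() -> Result<()> {
    // Toy single-partition, MemTable-like input; in tpch_mem the data could end up
    // distributed unevenly across partitions.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4]))],
    )?;
    let memory = Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?);

    // Extra round-robin repartition right after the in-memory scan, rebalancing the
    // data across 8 partitions before the rest of the plan runs.
    let repartition = RepartitionExec::try_new(memory, Partitioning::RoundRobinBatch(8))?;
    println!("{}", displayable(&repartition).indent(false));
    Ok(())
}
```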
Thank you for the advice @Dandandan. We will certainly check that after completing the on-demand optimizations.
I agree with @berkaysynnada, because
I wonder if we can make it the default once we know it generally leads to faster execution.
Specifically, I think we can try this approach together with on-demand repartition 🤔
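For anyone wanting to try this out, a minimal sketch of flipping the flag on a SessionConfig is below. The exact option key is an assumption based on the PR description (a `prefer_round_robin_repartition` option in the optimizer config); it is not a confirmed name, so check this branch's config docs before relying on it.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Hypothetical key name: the PR only says the flag lives in the optimizer config,
    // so the exact namespace/spelling here is an assumption, not a confirmed option.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.prefer_round_robin_repartition", false);

    // Plans built from this context would then use OnDemandRepartitionExec in place
    // of RoundRobinBatch repartitioning, per the PR description.
    let ctx = SessionContext::new_with_config(config);
    let _ = ctx;
}
```

Making it the default later would then just be a matter of changing the built-in default value of that option once the benchmarks consistently favor the on-demand path.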
Which issue does this PR close?
Closes #14287
Rationale for this change
- Add `prefer_round_robin_repartition` to the optimizer config; when it is false, replace all `RoundRobinBatch` with `OnDemandRepartition`.
- Use `OnDemandRepartitionExec` instead of the customized Distributed Channel.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
Benchmark
UPDATE:
I reviewed this PR again and verified that the optimizer's behavior remains consistent with `RoundRobinBatch` after incorporating `OnDemandRepartitionExec`. Based on the current results, performance is comparable for `tpch` and `tpch_10`. This PR also reduces excessive memory usage caused by prefetching.