diff --git a/benchmarking/tpch/ray_job_runner.py b/benchmarking/tpch/ray_job_runner.py index 89301cf647..dc6141ad18 100644 --- a/benchmarking/tpch/ray_job_runner.py +++ b/benchmarking/tpch/ray_job_runner.py @@ -1,3 +1,11 @@ +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "ray[default]", +# "getdaft", +# ] +# /// + from __future__ import annotations import argparse diff --git a/pyproject.toml b/pyproject.toml index 225bb8fc9a..7673bfbf24 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,6 +93,9 @@ venvPath = "." [[tool.pyright.executionEnvironments]] root = ".github/ci-scripts" +[[tool.pyright.executionEnvironments]] +root = "tools" + [tool.pytest.ini_options] addopts = "-m 'not (integration or benchmark or hypothesis)'" minversion = "6.0" diff --git a/tools/git_utils.py b/tools/git_utils.py new file mode 100644 index 0000000000..a149363bdc --- /dev/null +++ b/tools/git_utils.py @@ -0,0 +1,91 @@ +import subprocess +import time +import typing +from typing import Optional + +import gha_run_cluster_job +from github import Auth, Github +from github.Workflow import Workflow +from github.WorkflowRun import WorkflowRun + +RETRY_ATTEMPTS = 5 + +auth = Auth.Token(gha_run_cluster_job.get_oauth_token()) +g = Github(auth=auth) +repo = g.get_repo("Eventual-Inc/Daft") + + +def dispatch(workflow: Workflow, branch_name: str, inputs: dict) -> WorkflowRun: + pre_creation_latest_run = get_latest_run(workflow) + + print(f"Launching workflow '{workflow.name}' on the branch '{branch_name}'") + created = workflow.create_dispatch( + ref=branch_name, + inputs=inputs, + ) + if not created: + raise RuntimeError("Could not create workflow, suggestion: run again with --verbose") + + post_creation_latest_run = None + for _ in range(RETRY_ATTEMPTS): + post_creation_latest_run = get_latest_run(workflow) + if pre_creation_latest_run.run_number == post_creation_latest_run.run_number: + sleep_and_then_retry() + elif pre_creation_latest_run.run_number < 
post_creation_latest_run.run_number: + break + else: + typing.assert_never( + "Run numbers are always returned in sorted order and are always monotonically increasing" + ) + if not post_creation_latest_run: + raise RuntimeError(f"Unable to locate the new run request for the '{workflow.name}' workflow") + + print(f"Launched new '{workflow.name}' workflow with id: {post_creation_latest_run.id}") + print(f"View the workflow run at: {post_creation_latest_run.html_url}") + + return post_creation_latest_run + + +def sleep_and_then_retry(sleep_amount_sec: int = 3): + time.sleep(sleep_amount_sec) + + +def get_latest_run(workflow: Workflow) -> WorkflowRun: + for _ in range(RETRY_ATTEMPTS): + runs = workflow.get_runs() + + if runs.totalCount > 0: + return runs[0] + + sleep_and_then_retry() + + raise RuntimeError("Unable to list all workflow invocations") + + +def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]: + branch_name = branch_name or "HEAD" + name = ( + subprocess.check_output(["git", "rev-parse", "--abbrev-ref", branch_name], stderr=subprocess.STDOUT) + .strip() + .decode("utf-8") + ) + commit_hash = ( + subprocess.check_output(["git", "rev-parse", branch_name], stderr=subprocess.STDOUT).strip().decode("utf-8") + ) + return name, commit_hash + + +def parse_questions(questions: Optional[str], total_number_of_questions: int) -> list[int]: + if questions is None: + return list(range(1, total_number_of_questions + 1)) + else: + + def to_int(q: str) -> int: + question = int(q) + if not (1 <= question <= total_number_of_questions): + raise ValueError( + f"Question number should be between 1 and {total_number_of_questions}, instead got {question}" + ) + return question + + return list(map(to_int, filter(lambda q: q, questions.split(",")))) diff --git a/tools/tpcds.py b/tools/tpcds.py new file mode 100644 index 0000000000..554abac914 --- /dev/null +++ b/tools/tpcds.py @@ -0,0 +1,97 @@ +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "PyGithub", +# 
"boto3", +# ] +# /// + +import argparse +import json +from typing import Optional + +import git_utils +import github + + +def run( + branch_name: str, + questions: Optional[str], + scale_factor: int, + cluster_profile: str, + env_vars: Optional[str], +): + branch_name, _ = git_utils.get_name_and_commit_hash(branch_name) + + expanded_questions = git_utils.parse_questions(questions, 99) + print(f"Running scale-factor of {scale_factor}GB on questions: {', '.join(map(str, expanded_questions))}") + args_as_list = [f"--question={q} --scale-factor={scale_factor}" for q in expanded_questions] + entrypoint_args = json.dumps(args_as_list) + + workflow = git_utils.repo.get_workflow("run-cluster.yaml") + git_utils.dispatch( + workflow=workflow, + branch_name=branch_name, + inputs={ + "cluster_profile": cluster_profile, + "working_dir": "benchmarking/tpcds", + "entrypoint_script": "ray_entrypoint.py", + "entrypoint_args": entrypoint_args, + "env_vars": env_vars, + }, + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--ref", type=str, required=False, help="The branch name to run on") + parser.add_argument("--questions", type=str, required=False, help="A comma separated list of questions to run") + parser.add_argument( + "--scale-factor", + choices=[ + 2, + 5, + 10, + 100, + 1000, + ], + type=int, + required=False, + default=2, + help="The scale factor to run on", + ) + parser.add_argument( + "--cluster-profile", + choices=["debug_xs-x86", "medium-x86"], + type=str, + required=False, + help="The ray cluster configuration to run on", + ) + parser.add_argument( + "--env-var", + type=str, + action="append", + required=False, + help="Environment variable in the format KEY=VALUE. 
Can be specified multiple times.", + ) + parser.add_argument("--verbose", action="store_true", help="Verbose debugging") + args = parser.parse_args() + + if args.verbose: + github.enable_console_debug_logging() + + env_vars = None + if args.env_var: + list_of_env_vars: list[str] = args.env_var + for env_var in list_of_env_vars: + if "=" not in env_var: + raise ValueError("Environment variables must be in the form `KEY=VALUE`") + env_vars = ",".join(list_of_env_vars) + + run( + branch_name=args.ref, + questions=args.questions, + scale_factor=args.scale_factor, + cluster_profile=args.cluster_profile, + env_vars=env_vars, + ) diff --git a/tools/tpch.py b/tools/tpch.py new file mode 100644 index 0000000000..744c524bf5 --- /dev/null +++ b/tools/tpch.py @@ -0,0 +1,101 @@ +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "PyGithub", +# "boto3", +# ] +# /// + +import argparse +import json +from typing import Optional + +import git_utils +import github + +TOTAL_NUMBER_OF_QUESTIONS = 22 + + +def run( + branch_name: str, + questions: Optional[str], + scale_factor: int, + num_partitions: int, + cluster_profile: str, + env_vars: Optional[str], +): + branch_name, _ = git_utils.get_name_and_commit_hash(branch_name) + + expanded_questions = git_utils.parse_questions(questions, TOTAL_NUMBER_OF_QUESTIONS) + print( + f"Running scale-factor of {scale_factor}GB with {num_partitions} partitions on questions: {', '.join(map(str, expanded_questions))}" + ) + args_as_list = [ + f"--question-number={q} --parquet-folder=s3://eventual-dev-benchmarking-fixtures/uncompressed/tpch-dbgen/{scale_factor}_0/{num_partitions}/parquet/" + for q in expanded_questions + ] + entrypoint_args = json.dumps(args_as_list) + + workflow = git_utils.repo.get_workflow("run-cluster.yaml") + git_utils.dispatch( + workflow=workflow, + branch_name=branch_name, + inputs={ + "cluster_profile": cluster_profile, + "working_dir": "benchmarking/tpch", + "entrypoint_script": "ray_job_runner.py", + 
"entrypoint_args": entrypoint_args, + "env_vars": env_vars, + }, + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--ref", type=str, required=False, help="The branch name to run on") + parser.add_argument("--questions", type=str, required=False, help="A comma separated list of questions to run") + parser.add_argument( + "--scale-factor", + choices=[2, 10, 100, 1000], + type=int, + required=False, + default=2, + help="The scale factor to run on", + ) + parser.add_argument("--num-partitions", type=int, required=True, help="The number of partitions") + parser.add_argument( + "--cluster-profile", + choices=["debug_xs-x86", "medium-x86"], + type=str, + required=False, + help="The ray cluster configuration to run on", + ) + parser.add_argument( + "--env-var", + type=str, + action="append", + required=False, + help="Environment variable in the format KEY=VALUE. Can be specified multiple times.", + ) + parser.add_argument("--verbose", action="store_true", help="Verbose debugging") + args = parser.parse_args() + + if args.verbose: + github.enable_console_debug_logging() + + env_vars = None + if args.env_var: + list_of_env_vars: list[str] = args.env_var + for env_var in list_of_env_vars: + if "=" not in env_var: + raise ValueError("Environment variables must be in the form `KEY=VALUE`") + env_vars = ",".join(list_of_env_vars) + + run( + branch_name=args.ref, + questions=args.questions, + scale_factor=args.scale_factor, + num_partitions=args.num_partitions, + cluster_profile=args.cluster_profile, + env_vars=env_vars, + )