From 7036fcf71484a59851034094fff7f2ae17d4aece Mon Sep 17 00:00:00 2001 From: Elton Leander Pinto Date: Tue, 3 Dec 2024 14:45:06 -0500 Subject: [PATCH] add spark-master-ip flag --- rpc/launch_tpch_queries.py | 8 ++- scripts/run_service_experiments.py | 81 ++++++++++++++++++------------ 2 files changed, 55 insertions(+), 34 deletions(-) diff --git a/rpc/launch_tpch_queries.py b/rpc/launch_tpch_queries.py index 47965d1b..04bff0ad 100644 --- a/rpc/launch_tpch_queries.py +++ b/rpc/launch_tpch_queries.py @@ -29,7 +29,7 @@ def launch_query(query_number, args): cmd = [ f"{args.spark_mirror_path.resolve()}/bin/spark-submit", *("--deploy-mode", "cluster"), - *("--master", "spark://130.207.125.81:7077"), + *("--master", f"spark://{args.spark_master_ip}:7077"), *("--conf", "'spark.port.maxRetries=132'"), *("--conf", "'spark.eventLog.enabled=true'"), *("--conf", f"'spark.eventLog.dir={args.spark_eventlog_dir.resolve()}'"), @@ -123,6 +123,12 @@ def main(): required=True, help="Path to spark-mirror repository", ) + parser.add_argument( + "--spark-master-ip", + type=str, + required=True, + help="IP address of node running Spark master", + ) parser.add_argument( "--tpch-spark-path", type=Path, diff --git a/scripts/run_service_experiments.py b/scripts/run_service_experiments.py index 92bde625..55e074d7 100644 --- a/scripts/run_service_experiments.py +++ b/scripts/run_service_experiments.py @@ -5,9 +5,6 @@ from pathlib import Path from dataclasses import dataclass -SPARK_MIRROR_PATH = str(Path("../spark_mirror").resolve()) -TPCH_SPARK_PATH = str(Path("../tpch-spark").resolve()) - def bang(cmd, dry_run, stdout=subprocess.PIPE, stderr=subprocess.PIPE): cmd = [str(part) for part in cmd] @@ -29,6 +26,8 @@ def must(cmd, dry_run, stdout=subprocess.PIPE, stderr=subprocess.PIPE): @dataclass class Service: service_args: any + spark_mirror_path: Path + spark_master_ip: str output_dir: Path dry_run: bool @@ -59,22 +58,22 @@ def __enter__(self): # launch spark master and worker self._master = must( [ - f"{SPARK_MIRROR_PATH}/sbin/start-master.sh", - *("--host", "130.207.125.81"), + f"{self.spark_mirror_path}/sbin/start-master.sh", + *("--host", self.spark_master_ip), *( "--properties-file", - f"{SPARK_MIRROR_PATH}/conf/spark-dg-config.conf", + f"{self.spark_mirror_path}/conf/spark-dg-config.conf", ), ], self.dry_run, ) self._worker = must( [ - f"{SPARK_MIRROR_PATH}/sbin/start-worker.sh", - "spark://130.207.125.81:7077", + f"{self.spark_mirror_path}/sbin/start-worker.sh", + f"spark://{self.spark_master_ip}:7077", *( "--properties-file", - f"{SPARK_MIRROR_PATH}/conf/spark-dg-config.conf", + f"{self.spark_mirror_path}/conf/spark-dg-config.conf", ), ], self.dry_run, @@ -90,9 +89,9 @@ def clean(self): if self._service: self._service.wait() if self._master: - must([f"{SPARK_MIRROR_PATH}/sbin/stop-master.sh"], self.dry_run) + must([f"{self.spark_mirror_path}/sbin/stop-master.sh"], self.dry_run) if self._worker: - must([f"{SPARK_MIRROR_PATH}/sbin/stop-worker.sh"], self.dry_run) + must([f"{self.spark_mirror_path}/sbin/stop-worker.sh"], self.dry_run) def __exit__(self, type, value, traceback): self.clean() @@ -101,6 +100,9 @@ def __exit__(self, type, value, traceback): @dataclass class Launcher: launcher_args: any + spark_mirror_path: Path + spark_master_ip: str + tpch_spark_path: Path output_dir: Path dry_run: bool @@ -113,8 +115,9 @@ def launch(self): [ *("python3", "-u", "-m", "rpc.launch_tpch_queries"), *self.launcher_args, - *("--spark-mirror-path", SPARK_MIRROR_PATH), - *("--tpch-spark-path", TPCH_SPARK_PATH), + *("--spark-master-ip", self.spark_master_ip), + *("--spark-mirror-path", self.spark_mirror_path), + *("--tpch-spark-path", self.tpch_spark_path), ], self.dry_run, stdout=f_out, @@ -127,19 +130,27 @@ class Experiment: name: str service_args: any launcher_args: any - args: any - def run(self): - output_dir = self.args.output_dir / self.name + def run(self, args): + output_dir = args.output_dir / self.name if not output_dir.exists(): output_dir.mkdir(parents=True) with Service( service_args=self.service_args, + spark_mirror_path=args.spark_mirror_path, + spark_master_ip=args.spark_master_ip, output_dir=output_dir, - dry_run=self.args.dry_run, + dry_run=args.dry_run, ) as s: - Launcher(self.launcher_args, output_dir, self.args.dry_run).launch() + Launcher( + launcher_args=self.launcher_args, + spark_mirror_path=args.spark_mirror_path, + spark_master_ip=args.spark_master_ip, + tpch_spark_path=args.tpch_spark_path, + output_dir=output_dir, + dry_run=args.dry_run, + ).launch() def main(): @@ -149,6 +160,24 @@ def main(): action="store_true", help="Prints commands that will be executed for each experiment", ) + parser.add_argument( + "--spark-mirror-path", + type=Path, + required=True, + help="Path to spark-mirror repository", + ) + parser.add_argument( + "--spark-master-ip", + type=str, + required=True, + help="IP address of node running Spark master", + ) + parser.add_argument( + "--tpch-spark-path", + type=Path, + required=True, + help="Path to TPC-H Spark repository", + ) parser.add_argument("--output-dir", type=Path, default=Path("exp-output")) args = parser.parse_args() @@ -183,19 +212,6 @@ def main(): *("--scheduler_plan_ahead_no_consideration_gap", 1), ] experiments = [ - Experiment( - name="edf-q300-hard", - service_args=[ - *base_args, - *edf_args, - *variance_args, - ], - launcher_args=[ - *("--num_queries", 300), - *("--variable_arrival_rate", 0.052), - ], - args=args, - ), Experiment( name="dsched-q300-hard", service_args=[ @@ -207,14 +223,13 @@ def main(): *("--num_queries", 300), *("--variable_arrival_rate", 0.052), ], - args=args, ), ] for i, experiment in enumerate(experiments): try: print(f"=== {experiment.name} ({i+1}/{len(experiments)}) ===") - experiment.run() + experiment.run(args) print("=== done ===") except Exception as e: print(traceback.format_exc())