add spark-master-ip flag
1ntEgr8 committed Dec 3, 2024
1 parent cb96c3e commit 7036fcf
Showing 2 changed files with 55 additions and 34 deletions.
8 changes: 7 additions & 1 deletion rpc/launch_tpch_queries.py
@@ -29,7 +29,7 @@ def launch_query(query_number, args):
     cmd = [
         f"{args.spark_mirror_path.resolve()}/bin/spark-submit",
         *("--deploy-mode", "cluster"),
-        *("--master", "spark://130.207.125.81:7077"),
+        *("--master", f"spark://{args.spark_master_ip}:7077"),
         *("--conf", "'spark.port.maxRetries=132'"),
         *("--conf", "'spark.eventLog.enabled=true'"),
         *("--conf", f"'spark.eventLog.dir={args.spark_eventlog_dir.resolve()}'"),
@@ -123,6 +123,12 @@ def main():
         required=True,
         help="Path to spark-mirror repository",
     )
+    parser.add_argument(
+        "--spark-master-ip",
+        type=str,
+        required=True,
+        help="IP address of node running Spark master",
+    )
     parser.add_argument(
         "--tpch-spark-path",
         type=Path,
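Note: callers of the launcher must now pass the master's address explicitly instead of relying on the old hard-coded IP. A minimal sketch of an invocation after this change (the IP and paths below are placeholders, and other required flags, such as the event-log directory, are omitted):

    python3 -u -m rpc.launch_tpch_queries \
        --spark-mirror-path ../spark_mirror \
        --tpch-spark-path ../tpch-spark \
        --spark-master-ip 10.0.0.1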
81 changes: 48 additions & 33 deletions scripts/run_service_experiments.py
@@ -5,9 +5,6 @@
 from pathlib import Path
 from dataclasses import dataclass
 
-SPARK_MIRROR_PATH = str(Path("../spark_mirror").resolve())
-TPCH_SPARK_PATH = str(Path("../tpch-spark").resolve())
-
 
 def bang(cmd, dry_run, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
     cmd = [str(part) for part in cmd]
@@ -29,6 +26,8 @@ def must(cmd, dry_run, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
 @dataclass
 class Service:
     service_args: any
+    spark_mirror_path: Path
+    spark_master_ip: str
     output_dir: Path
     dry_run: bool
 
@@ -59,22 +58,22 @@ def __enter__(self):
         # launch spark master and worker
         self._master = must(
             [
-                f"{SPARK_MIRROR_PATH}/sbin/start-master.sh",
-                *("--host", "130.207.125.81"),
+                f"{self.spark_mirror_path}/sbin/start-master.sh",
+                *("--host", self.spark_master_ip),
                 *(
                     "--properties-file",
-                    f"{SPARK_MIRROR_PATH}/conf/spark-dg-config.conf",
+                    f"{self.spark_mirror_path}/conf/spark-dg-config.conf",
                 ),
             ],
             self.dry_run,
         )
         self._worker = must(
             [
-                f"{SPARK_MIRROR_PATH}/sbin/start-worker.sh",
-                "spark://130.207.125.81:7077",
+                f"{self.spark_mirror_path}/sbin/start-worker.sh",
+                f"spark://{self.spark_master_ip}:7077",
                 *(
                     "--properties-file",
-                    f"{SPARK_MIRROR_PATH}/conf/spark-dg-config.conf",
+                    f"{self.spark_mirror_path}/conf/spark-dg-config.conf",
                 ),
             ],
             self.dry_run,
@@ -90,9 +89,9 @@ def clean(self):
         if self._service:
             self._service.wait()
         if self._master:
-            must([f"{SPARK_MIRROR_PATH}/sbin/stop-master.sh"], self.dry_run)
+            must([f"{self.spark_mirror_path}/sbin/stop-master.sh"], self.dry_run)
         if self._worker:
-            must([f"{SPARK_MIRROR_PATH}/sbin/stop-worker.sh"], self.dry_run)
+            must([f"{self.spark_mirror_path}/sbin/stop-worker.sh"], self.dry_run)
 
     def __exit__(self, type, value, traceback):
         self.clean()
@@ -101,6 +100,9 @@ def __exit__(self, type, value, traceback):
 @dataclass
 class Launcher:
     launcher_args: any
+    spark_mirror_path: Path
+    spark_master_ip: str
+    tpch_spark_path: Path
     output_dir: Path
     dry_run: bool
 
@@ -113,8 +115,9 @@ def launch(self):
             [
                 *("python3", "-u", "-m", "rpc.launch_tpch_queries"),
                 *self.launcher_args,
-                *("--spark-mirror-path", SPARK_MIRROR_PATH),
-                *("--tpch-spark-path", TPCH_SPARK_PATH),
+                *("--spark-master-ip", self.spark_master_ip),
+                *("--spark-mirror-path", self.spark_mirror_path),
+                *("--tpch-spark-path", self.tpch_spark_path),
             ],
             self.dry_run,
             stdout=f_out,
@@ -127,19 +130,27 @@ class Experiment:
     name: str
     service_args: any
     launcher_args: any
-    args: any
 
-    def run(self):
-        output_dir = self.args.output_dir / self.name
+    def run(self, args):
+        output_dir = args.output_dir / self.name
         if not output_dir.exists():
             output_dir.mkdir(parents=True)
 
         with Service(
             service_args=self.service_args,
+            spark_mirror_path=args.spark_mirror_path,
+            spark_master_ip=args.spark_master_ip,
             output_dir=output_dir,
-            dry_run=self.args.dry_run,
+            dry_run=args.dry_run,
         ) as s:
-            Launcher(self.launcher_args, output_dir, self.args.dry_run).launch()
+            Launcher(
+                launcher_args=self.launcher_args,
+                spark_mirror_path=args.spark_mirror_path,
+                spark_master_ip=args.spark_master_ip,
+                tpch_spark_path=args.tpch_spark_path,
+                output_dir=output_dir,
+                dry_run=args.dry_run,
+            ).launch()
 
 
 def main():
@@ -149,6 +160,24 @@ def main():
         action="store_true",
         help="Prints commands that will be executed for each experiment",
     )
+    parser.add_argument(
+        "--spark-mirror-path",
+        type=Path,
+        required=True,
+        help="Path to spark-mirror repository",
+    )
+    parser.add_argument(
+        "--spark-master-ip",
+        type=str,
+        required=True,
+        help="IP address of node running Spark master",
+    )
+    parser.add_argument(
+        "--tpch-spark-path",
+        type=Path,
+        required=True,
+        help="Path to TPC-H Spark repository",
+    )
     parser.add_argument("--output-dir", type=Path, default=Path("exp-output"))
     args = parser.parse_args()
 
@@ -183,19 +212,6 @@ def main():
         *("--scheduler_plan_ahead_no_consideration_gap", 1),
     ]
     experiments = [
-        Experiment(
-            name="edf-q300-hard",
-            service_args=[
-                *base_args,
-                *edf_args,
-                *variance_args,
-            ],
-            launcher_args=[
-                *("--num_queries", 300),
-                *("--variable_arrival_rate", 0.052),
-            ],
-            args=args,
-        ),
         Experiment(
             name="dsched-q300-hard",
             service_args=[
@@ -207,14 +223,13 @@ def main():
                 *("--num_queries", 300),
                 *("--variable_arrival_rate", 0.052),
             ],
-            args=args,
         ),
     ]
 
     for i, experiment in enumerate(experiments):
        try:
             print(f"=== {experiment.name} ({i+1}/{len(experiments)}) ===")
-            experiment.run()
+            experiment.run(args)
             print("=== done ===")
         except Exception as e:
             print(traceback.format_exc())
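Note: the experiment runner now takes the same three flags in place of the old module-level constants, so it can target any Spark master. A hypothetical invocation from the repository root (placeholder paths and IP; --output-dir defaults to exp-output, and --dry-run, per its help text, prints the commands that would be executed):

    python3 scripts/run_service_experiments.py \
        --spark-mirror-path ../spark_mirror \
        --spark-master-ip 10.0.0.1 \
        --tpch-spark-path ../tpch-spark \
        --dry-run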
