diff --git a/Cilkscale_vis/cilkscale.py b/Cilkscale_vis/cilkscale.py
new file mode 100644
index 0000000..c94eacd
--- /dev/null
+++ b/Cilkscale_vis/cilkscale.py
@@ -0,0 +1,35 @@
+import argparse
+from runner import run
+from plotter import plot
+
+def main():
+    ap = argparse.ArgumentParser()
+    ap.add_argument("--cilkscale", "-c", help="binary compiled with -fcilktool=cilkscale", required=True)
+    ap.add_argument("--cilkscale-benchmark", "-b", help="binary compiled with -fcilktool=cilkscale-benchmark", required=True)
+    ap.add_argument("--output-csv", "-ocsv", help="csv file for output data")
+    ap.add_argument("--output-plot", "-oplot", help="destination file for the plot")
+    ap.add_argument("--rows-to-plot", "-rplot", help="comma-separated list of rows to generate plots for (e.g. 1,2,3)")
+    ap.add_argument("--args", "-a", nargs="*", help="binary arguments")
+
+    args = ap.parse_args()
+    print(args)
+
+    out_csv = args.output_csv or "out.csv"
+    out_plot = args.output_plot or "plot.pdf"
+
+    bin_instrument = args.cilkscale
+    bin_bench = args.cilkscale_benchmark
+    bin_args = args.args or []
+
+    # generate data and save to out_csv (defaults to out.csv)
+    run(bin_instrument, bin_bench, bin_args, out_csv)
+
+    # generate plot
+    # (out_plot defaults to plot.pdf)
+    # (rows defaults to just the last row in the csv)
+    rows_to_plot = list(map(int, args.rows_to_plot.split(","))) if args.rows_to_plot else [0]
+    plot(out_csv, out_plot, rows_to_plot)
+
+if __name__ == '__main__':
+    main()
+
diff --git a/Cilkscale_vis/out.pdf b/Cilkscale_vis/out.pdf
new file mode 100644
index 0000000..5d3e96e
Binary files /dev/null and b/Cilkscale_vis/out.pdf differ
diff --git a/Cilkscale_vis/plotter.py b/Cilkscale_vis/plotter.py
new file mode 100644
index 0000000..670160c
--- /dev/null
+++ b/Cilkscale_vis/plotter.py
@@ -0,0 +1,131 @@
+import csv
+import matplotlib
+import matplotlib.pyplot as plt
+
+# upper bound on the P-worker runtime of a program with work T1 and parallelism PAR
+def bound_runtime(T1, PAR, P):
+    # We scale the span term by 1.7 based on the span coefficient in the
+    # Cilkview paper.
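+    # For illustration, with hypothetical values T1 = 100s, PAR = 20, and P = 10
+    # workers, the bound below evaluates to
+    # 100/10 + (1 - 1/10)*1.7*(100/20) = 10 + 7.65 = 17.65s.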
+    Tp = T1/P + (1.0-1.0/P)*1.7*T1/PAR
+    return Tp
+
+def sanitize_rows_to_plot(out_csv, rows_to_plot):
+    # if any row numbers are invalid, set them to the last row by default
+    num_rows = 0
+    with open(out_csv, "r") as out_csv_file:
+        rows = csv.reader(out_csv_file, delimiter=",")
+        num_rows = len(list(rows))
+
+    new_rows_to_plot = set()
+    for r in rows_to_plot:
+        if r in range(1, num_rows):
+            new_rows_to_plot.add(r)
+        else:
+            new_rows_to_plot.add(num_rows-1)
+
+    return new_rows_to_plot
+
+def get_row_data(out_csv, rows_to_plot):
+    # list of (tag, data)
+    all_data = []
+
+    with open(out_csv, "r") as out_csv_file:
+        rows = csv.reader(out_csv_file, delimiter=",")
+
+        row_num = 0
+        num_cpus = 0
+
+        par_col = 0
+        bench_col_start = 0
+
+        for row in rows:
+            if row_num == 0:
+                # get num cpus (extracts num_cpus from, for example, "32c time (seconds)")
+                num_cpus = int(row[-1].split()[0][:-1])
+                # find parallelism col and start of benchmark cols
+                for i in range(len(row)):
+                    if row[i] == "burdened_parallelism":
+                        par_col = i
+                    elif row[i].startswith("1c"):
+                        # subtract 1 because we will add 1-indexed cpu counts to this value
+                        bench_col_start = i-1
+
+            elif row_num in rows_to_plot:
+                # collect data from this row of the csv file
+                tag = row[0]
+                parallelism = float(row[par_col])
+                single_core_runtime = float(row[bench_col_start+1])
+
+                data = {}
+                data["num_workers"] = []
+                data["obs_runtime"] = []
+                data["perf_lin_runtime"] = []
+                data["greedy_runtime"] = []
+                data["span_runtime"] = []
+                data["obs_speedup"] = []
+                data["perf_lin_speedup"] = []
+                data["greedy_speedup"] = []
+                data["span_speedup"] = []
+
+                for i in range(1, num_cpus+1):
+                    data["num_workers"].append(i)
+
+                    data["obs_runtime"].append(float(row[bench_col_start+i]))
+                    data["perf_lin_runtime"].append(single_core_runtime/i)
+                    data["greedy_runtime"].append(bound_runtime(single_core_runtime, parallelism, i))
+                    data["span_runtime"].append(single_core_runtime/parallelism)
+
+                    data["obs_speedup"].append(single_core_runtime/float(row[bench_col_start+i]))
+                    data["perf_lin_speedup"].append(1.0*i)
+                    data["greedy_speedup"].append(single_core_runtime/(bound_runtime(single_core_runtime, parallelism, i)))
+                    data["span_speedup"].append(parallelism)
+
+                all_data.append((tag, data))
+
+            row_num += 1
+
+    return all_data
+
+# by default, plots the last row (i.e. overall execution)
+def plot(out_csv="out.csv", out_plot="plot.pdf", rows_to_plot=[0]):
+
+    rows_to_plot = sanitize_rows_to_plot(out_csv, rows_to_plot)
+    all_data = get_row_data(out_csv, rows_to_plot)
+
+    num_plots = len(all_data)
+
+    print("Generate plot")
+    matplotlib.use('PDF')
+    fig, axs = plt.subplots(nrows=num_plots, ncols=2, figsize=(12,6*num_plots), squeeze=False)  # squeeze=False keeps axs 2-D even with a single row
+
+    for r in range(num_plots):
+        tag, data = all_data[r]
+
+        # the legend is shared between the two subplots; it is drawn once on the speedup subplot
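+        # Four series are plotted in each subplot (values computed in get_row_data):
+        #   Observed               - measured runtimes from the benchmark runs
+        #   Perfect linear speedup - single-core runtime divided by the worker count
+        #   Burdened-dag bound     - bound_runtime() evaluated with the burdened parallelism
+        #   Span bound             - single-core runtime divided by the burdened parallelism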
+        axs[r,0].plot(data["num_workers"], data["obs_runtime"], "mo", label="Observed", linestyle='None', markersize=5)
+        axs[r,0].plot(data["num_workers"], data["perf_lin_runtime"], "g", label="Perfect linear speedup")
+        axs[r,0].plot(data["num_workers"], data["greedy_runtime"], "c", label="Burdened-dag bound")
+        axs[r,0].plot(data["num_workers"], data["span_runtime"], "y", label="Span bound")
+
+        num_workers = data["num_workers"][-1]
+
+        axs[r,0].set_xlabel("Num workers")
+        axs[r,0].set_ylabel("Runtime")
+        axs[r,0].set_title(tag + " execution time")
+        axs[r,0].set_aspect(1.0/axs[r,0].get_data_ratio())
+
+
+        axs[r,1].plot(data["num_workers"], data["obs_speedup"], "mo", label="Observed", linestyle='None', markersize=5)
+        axs[r,1].plot(data["num_workers"], data["perf_lin_speedup"], "g", label="Perfect linear speedup")
+        axs[r,1].plot(data["num_workers"], data["greedy_speedup"], "c", label="Burdened-dag bound")
+        axs[r,1].plot(data["num_workers"], data["span_speedup"], "y", label="Span bound")
+
+        axs[r,1].set_xlabel("Num workers")
+        axs[r,1].set_ylabel("Speedup")
+        axs[r,1].set_title(tag + " speedup")
+        axs[r,1].set(xlim=[0,num_workers], ylim=[0,num_workers])
+
+        axs[r,1].legend(loc="upper left")
+
+    plt.savefig(out_plot)
+
diff --git a/Cilkscale_vis/runner.py b/Cilkscale_vis/runner.py
new file mode 100644
index 0000000..9baeef9
--- /dev/null
+++ b/Cilkscale_vis/runner.py
@@ -0,0 +1,111 @@
+import subprocess
+import sys
+import os
+import time
+import csv
+
+# generate csv with parallelism numbers
+def get_parallelism(bin_instrument, bin_args, out_csv):
+    out, err = run_command("CILKSCALE_OUT=" + out_csv + " " + bin_instrument + " " + " ".join(bin_args))
+    return
+
+def run_command(cmd, asyn=False):
+    proc = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    if not asyn:
+        out, err = proc.communicate()
+        return out, err
+    else:
+        return ""
+
+def get_n_cpus():
+    return len(get_cpu_ordering())
+
+def benchmark_tmp_output(n):
+    return ".out.bench." + str(n) + ".csv"
+
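+# For illustration, with hypothetical CPU ids 0,1,2,3 and P = 4, the command built
+# below ends up looking roughly like:
+#   CILK_NWORKERS=4 CILKSCALE_OUT=.out.bench.4.csv taskset -c 0,1,2,3 ./prog_bench <args>
+# (the actual CPU ids come from get_cpu_ordering(), one per physical core)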
+ str(n) + ".csv" + +def run_on_p_workers(P, rcommand): + cpu_ordering = get_cpu_ordering() + cpu_online = cpu_ordering[:P] + + time.sleep(0.1) + rcommand = "taskset -c " + ",".join([str(p) for (p,m) in cpu_online]) + " " + rcommand + print(rcommand) + bench_out_csv = benchmark_tmp_output(P) + proc = subprocess.Popen(['CILK_NWORKERS=' + str(P) + ' ' + "CILKSCALE_OUT=" + bench_out_csv + " " + rcommand], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out,err=proc.communicate() + err = str(err, "utf-8") + +def get_cpu_ordering(): + out,err = run_command("lscpu --parse") + + out = str(out, 'utf-8') + + avail_cpus = [] + for l in out.splitlines(): + if l.startswith('#'): + continue + items = l.strip().split(',') + cpu_id = int(items[0]) + node_id = int(items[1]) + socket_id = int(items[2]) + avail_cpus.append((socket_id, node_id, cpu_id)) + + avail_cpus = sorted(avail_cpus) + ret = [] + added_nodes = dict() + for x in avail_cpus: + if x[1] not in added_nodes: + added_nodes[x[1]] = True + else: + continue + ret.append((x[2], x[0])) + return ret + +def run(bin_instrument, bin_bench, bin_args, out_csv="out.csv"): + # get parallelism + get_parallelism(bin_instrument, bin_args, out_csv) + + # get benchmark runtimes + NCPUS = get_n_cpus() + + print("Generating scalability data for " + str(NCPUS) + " cpus.") + + # this will be prepended with CILK_NWORKERS and CILKSCALE_OUT in run_on_p_workers + # any tmp files will be destroyed + run_command = bin_bench + " " + " ".join(bin_args) + results = dict() + for i in range(1, NCPUS+1): + results[i] = run_on_p_workers(i, run_command) + + new_rows = [] + + # read data from out_csv + with open(out_csv, "r") as out_csv_file: + rows = out_csv_file.readlines() + new_rows = rows.copy() + for i in range(len(new_rows)): + new_rows[i] = new_rows[i].strip("\n") + + # join all the csv data + for i in range(1, NCPUS+1): + with open(benchmark_tmp_output(i), "r") as csvfile: + reader = csv.reader(csvfile, delimiter=',') + row_num = 0 + for row in reader: + if row_num == 0: + col_header = str(i) + "c " + row[1].strip() + new_rows[row_num] += "," + col_header + else: + # second col contains runtime + time = row[1].strip() + new_rows[row_num] += "," + time + row_num += 1 + os.remove(benchmark_tmp_output(i)) + + for i in range(len(new_rows)): + new_rows[i] += "\n" + + # write the joined data to out_csv + with open(out_csv, "w") as out_csv_file: + out_csv_file.writelines(new_rows) +