From 7994dcfd4a676506f9f892ab8c983ef2397ca328 Mon Sep 17 00:00:00 2001 From: ZHANG Hao Date: Thu, 28 Oct 2021 00:12:14 +0800 Subject: [PATCH] fix: consumer records stats (#29) --- bin/bench.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/bin/bench.py b/bin/bench.py index e3ce05e7108d1..3815a0eacb6a5 100755 --- a/bin/bench.py +++ b/bin/bench.py @@ -222,11 +222,18 @@ def parse_consumer(line): line = line.strip() toks = line.split(",") - if len(toks) >= 4 and toks[1].strip() == '0': - toks = line.split(",") + if len(toks) == 4 and "records received" in toks[0]: + r_t = toks[1].strip().split(" ") + rec_per_s = float(r_t[0].strip()) + thr_per_s = float(r_t[2].strip('(')) + + avg_lat = float(toks[2].strip().split(' ')[0].strip()) + max_lat = float(toks[3].strip().split(' ')[0].strip()) + + aggr_records = int(toks[0].strip().split(' ')[0]) + elif len(toks) >= 4 and toks[1].strip() == '0': rec_per_s = float(toks[5].strip()) thr_per_s = float(toks[3].strip()) - thr_unit = "MB/s" avg_lat = float(toks[-1].strip()) max_lat = float(toks[-2].strip()) @@ -383,6 +390,10 @@ def print_res(cum_records, total_rec_per_s, total_thr_per_s, thr_unit, total_avg out_f = open(args.output_file, "w+", encoding="UTF8") out_writer = csv.writer(out_f) + hist_aggr_records = {} + for host in hosts: + hist_aggr_records[host] = [0] * threads + while True: completed = 0 @@ -398,9 +409,11 @@ def print_res(cum_records, total_rec_per_s, total_thr_per_s, thr_unit, total_avg res_count = 0 for host, outs_per_host in outs.items(): + i = 0 for out in outs_per_host: try: line = next(out) + # print("{}:{} {}".format(host, res_count, line)) if args.type == "producer": rec_per_s, thr_per_s, thr_unit, avg_lat, max_lat, aggr_records, throughput_limit = parse_producer(line) total_throughput_limit += throughput_limit @@ -413,11 +426,16 @@ def print_res(cum_records, total_rec_per_s, total_thr_per_s, thr_unit, total_avg total_avg_lat = total_avg_lat + avg_lat * aggr_records total_max_lat = max(total_max_lat, max_lat) total_aggr_records = total_aggr_records + aggr_records + + hist_aggr_records[host][i] = aggr_records + # print("{}:{} {}".format(host, res_count, throughput_limit)) - # print("{}:{} {}".format(host, res_count, line)) # print(rec_per_s, thr_per_s, thr_unit, avg_lat, max_lat, aggr_records, total_rec_per_s, total_thr_per_s, total_avg_lat / total_aggr_records, total_max_lat, total_aggr_records) except StopIteration: completed += 1 + if args.type == "consumer": + total_aggr_records = total_aggr_records + hist_aggr_records[host][i] + if args.wait_for_all and completed >= total_clients: print("All " + str(completed) + " clients completed") break @@ -425,6 +443,8 @@ def print_res(cum_records, total_rec_per_s, total_thr_per_s, thr_unit, total_avg print("{} clients completed. Kill the {} other clients".format(completed, (total_clients - completed))) kill_clients(hosts, typ=args.type) break + + i += 1 else: continue break