Skip to content

Commit

Permalink
refactor(benchmarks): Make benchmarks return sig.time.Duration & ad…
Browse files Browse the repository at this point in the history
…just imports (#302)

* Make benchmarks return Duration & adjust imports

* Use dir handles in snapshot load bench & refactor
  • Loading branch information
InKryption authored Oct 7, 2024
1 parent 8f4ddae commit d0752e4
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 59 deletions.
60 changes: 32 additions & 28 deletions src/accountsdb/db.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4027,40 +4027,47 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
// },
};

pub fn loadSnapshot(bench_args: BenchArgs) !u64 {
pub fn loadSnapshot(bench_args: BenchArgs) !sig.time.Duration {
const allocator = std.heap.c_allocator;

// unpack the snapshot
// NOTE: usually this will be an incremental snapshot
// renamed as a full snapshot (mv {inc-snap-fmt}.tar.zstd {full-snap-fmt}.tar.zstd)
// (because test snapshots are too small and full snapshots are too big)
const dir_path = sig.TEST_DATA_DIR ++ "bench_snapshot/";
const accounts_path = dir_path ++ "accounts";

// const logger = Logger{ .noop = {} };
const logger = Logger.init(allocator, .debug);
defer logger.deinit();
logger.spawn();

const snapshot_dir = std.fs.cwd().openDir(dir_path, .{ .iterate = true }) catch {
// unpack the snapshot
// NOTE: usually this will be an incremental snapshot
// renamed as a full snapshot (mv {inc-snap-fmt}.tar.zstd {full-snap-fmt}.tar.zstd)
// (because test snapshots are too small and full snapshots are too big)
const dir_path = sig.TEST_DATA_DIR ++ "bench_snapshot/";
var snapshot_dir = std.fs.cwd().openDir(dir_path, .{ .iterate = true }) catch {
std.debug.print("need to setup a snapshot in {s} for this benchmark...\n", .{dir_path});
return 0;
return sig.time.Duration.fromNanos(0);
};
defer snapshot_dir.close();

const snapshot_files = try SnapshotFiles.find(allocator, snapshot_dir);

std.fs.cwd().access(accounts_path, .{}) catch {
const archive_file = try snapshot_dir.openFile(snapshot_files.full_snapshot.snapshotNameStr().constSlice(), .{});
defer archive_file.close();
try parallelUnpackZstdTarBall(
allocator,
logger,
archive_file,
snapshot_dir,
try std.Thread.getCpuCount() / 2,
true,
);
};
var accounts_dir = inline for (0..2) |attempt| {
if (snapshot_dir.openDir("accounts", .{ .iterate = true })) |accounts_dir|
break accounts_dir
else |err| switch (err) {
else => |e| return e,
error.FileNotFound => if (attempt == 0) {
const archive_file = try snapshot_dir.openFile(snapshot_files.full_snapshot.snapshotNameStr().constSlice(), .{});
defer archive_file.close();
try parallelUnpackZstdTarBall(
allocator,
logger,
archive_file,
snapshot_dir,
try std.Thread.getCpuCount() / 2,
true,
);
},
}
} else return error.SnapshotMissingAccountsDir;
defer accounts_dir.close();

var snapshots = try AllSnapshotFields.fromFiles(allocator, logger, snapshot_dir, snapshot_files);
defer snapshots.deinit(allocator);
Expand All @@ -4072,9 +4079,6 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
}, null);
defer accounts_db.deinit();

var accounts_dir = try std.fs.cwd().openDir(accounts_path, .{ .iterate = true });
defer accounts_dir.close();

const duration = try accounts_db.loadFromSnapshot(
snapshot.accounts_db_fields,
bench_args.n_threads,
Expand All @@ -4090,7 +4094,7 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
});
std.debug.print("r: hash: {}, lamports: {}\n", .{ accounts_hash, total_lamports });

return duration.asNanos();
return duration;
}
};

Expand Down Expand Up @@ -4235,7 +4239,7 @@ pub const BenchmarkAccountsDB = struct {
// },
};

pub fn readWriteAccounts(bench_args: BenchArgs) !u64 {
pub fn readWriteAccounts(bench_args: BenchArgs) !sig.time.Duration {
const n_accounts = bench_args.n_accounts;
const slot_list_len = bench_args.slot_list_len;
const total_n_accounts = n_accounts * slot_list_len;
Expand Down Expand Up @@ -4375,7 +4379,7 @@ pub const BenchmarkAccountsDB = struct {
std.debug.print("WRITE: {d}\n", .{elapsed});
}

var timer = try std.time.Timer.start();
var timer = try sig.time.Timer.start();
for (0..n_accounts) |i| {
const pubkey = &pubkeys[i];
const account = try accounts_db.getAccount(pubkey);
Expand Down
14 changes: 7 additions & 7 deletions src/accountsdb/swiss_map.zig
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ pub const BenchmarkSwissMap = struct {
},
};

pub fn swissmapReadWriteBenchmark(bench_args: BenchArgs) !u64 {
pub fn swissmapReadWriteBenchmark(bench_args: BenchArgs) !sig.time.Duration {
const allocator = std.heap.c_allocator;
const n_accounts = bench_args.n_accounts;

Expand Down Expand Up @@ -679,18 +679,18 @@ pub const BenchmarkSwissMap = struct {
null,
);

const write_speedup = @as(f32, @floatFromInt(std_write_time)) / @as(f32, @floatFromInt(write_time));
const write_speedup = @as(f32, @floatFromInt(std_write_time.asNanos())) / @as(f32, @floatFromInt(write_time.asNanos()));
const write_faster_or_slower = if (write_speedup < 1.0) "slower" else "faster";
std.debug.print("\tWRITE: {} ({d:.2}x {s} than std)\n", .{
std.fmt.fmtDuration(write_time),
std.fmt.fmtDuration(write_time.asNanos()),
write_speedup,
write_faster_or_slower,
});

const read_speedup = @as(f32, @floatFromInt(std_read_time)) / @as(f32, @floatFromInt(read_time));
const read_speedup = @as(f32, @floatFromInt(std_read_time.asNanos())) / @as(f32, @floatFromInt(read_time.asNanos()));
const read_faster_or_slower = if (read_speedup < 1.0) "slower" else "faster";
std.debug.print("\tREAD: {} ({d:.2}x {s} than std)\n", .{
std.fmt.fmtDuration(read_time),
std.fmt.fmtDuration(read_time.asNanos()),
read_speedup,
read_faster_or_slower,
});
Expand All @@ -705,10 +705,10 @@ fn benchGetOrPut(
accounts: []accounts_db.index.AccountRef,
pubkeys: []sig.core.Pubkey,
read_amount: ?usize,
) !struct { usize, usize } {
) !struct { sig.time.Duration, sig.time.Duration } {
var t = try T.initCapacity(allocator, accounts.len);

var timer = try std.time.Timer.start();
var timer = try sig.time.Timer.start();
for (0..accounts.len) |i| {
const result = t.getOrPutAssumeCapacity(accounts[i].pubkey);
if (!result.found_existing) {
Expand Down
14 changes: 8 additions & 6 deletions src/benchmarks.zig
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const std = @import("std");
const builtin = @import("builtin");
const logger = @import("./trace/log.zig");

const sig = @import("sig.zig");

const Decl = std.builtin.Type.Declaration;

Expand All @@ -12,7 +13,7 @@ const meta = std.meta;
/// zig build benchmark -- gossip
pub fn main() !void {
const allocator = std.heap.c_allocator;
logger.default_logger.* = logger.Logger.init(allocator, .debug);
sig.trace.log.default_logger.* = sig.trace.Logger.init(allocator, .debug);

if (builtin.mode == .Debug) std.debug.print("warning: running benchmark in Debug mode\n", .{});

Expand Down Expand Up @@ -213,12 +214,13 @@ pub fn benchmark(
while (i < min_iterations or
(i < max_iterations and runtime_sum < max_time)) : (i += 1)
{
const ns_time = try switch (@TypeOf(arg)) {
void => @field(B, def.name)(),
else => @field(B, def.name)(arg),
const benchFunction = @field(B, def.name);
const ns_duration: sig.time.Duration = try switch (@TypeOf(arg)) {
void => benchFunction(),
else => benchFunction(arg),
};

const runtime = try time_unit.unitsfromNanoseconds(ns_time);
const runtime = try time_unit.unitsfromNanoseconds(ns_duration.asNanos());

runtimes[i] = runtime;
runtime_sum += runtime;
Expand Down
8 changes: 4 additions & 4 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3229,7 +3229,7 @@ pub const BenchmarkGossipServiceGeneral = struct {
},
};

pub fn benchmarkGossipService(bench_args: BenchmarkArgs) !usize {
pub fn benchmarkGossipService(bench_args: BenchmarkArgs) !sig.time.Duration {
const allocator = std.heap.c_allocator;
var keypair = try KeyPair.create(null);
var address = SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8888);
Expand Down Expand Up @@ -3312,7 +3312,7 @@ pub const BenchmarkGossipServiceGeneral = struct {
});

// wait for all messages to be processed
var timer = try std.time.Timer.start();
var timer = try sig.time.Timer.start();

gossip_service.shutdown();
packet_handle.join();
Expand Down Expand Up @@ -3345,7 +3345,7 @@ pub const BenchmarkGossipServicePullRequests = struct {
},
};

pub fn benchmarkPullRequests(bench_args: BenchmarkArgs) !usize {
pub fn benchmarkPullRequests(bench_args: BenchmarkArgs) !sig.time.Duration {
const allocator = std.heap.c_allocator;
var keypair = try KeyPair.create(null);
var address = SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8888);
Expand Down Expand Up @@ -3434,7 +3434,7 @@ pub const BenchmarkGossipServicePullRequests = struct {
},
});

var timer = try std.time.Timer.start();
var timer = try sig.time.Timer.start();

// wait for all messages to be processed
gossip_service.shutdown();
Expand Down
21 changes: 12 additions & 9 deletions src/net/socket_utils.zig
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
const std = @import("std");
const Allocator = std.mem.Allocator;
const Atomic = std.atomic.Value;

const sig = @import("../sig.zig");
const Packet = sig.net.Packet;
const PACKET_DATA_SIZE = sig.net.PACKET_DATA_SIZE;
const Channel = sig.sync.Channel;
const Logger = sig.trace.Logger;

const UdpSocket = @import("zig-network").Socket;
const Packet = @import("packet.zig").Packet;
const PACKET_DATA_SIZE = @import("packet.zig").PACKET_DATA_SIZE;
const Channel = @import("../sync/channel.zig").Channel;
const std = @import("std");
const Logger = @import("../trace/log.zig").Logger;

pub const SOCKET_TIMEOUT_US: usize = 1 * std.time.us_per_s;
pub const PACKETS_PER_BATCH: usize = 64;
Expand Down Expand Up @@ -147,7 +150,7 @@ pub const BenchmarkPacketProcessing = struct {
},
};

pub fn benchmarkReadSocket(bench_args: BenchmarkArgs) !u64 {
pub fn benchmarkReadSocket(bench_args: BenchmarkArgs) !sig.time.Duration {
const n_packets = bench_args.n_packets;
const allocator = std.heap.c_allocator;

Expand All @@ -174,7 +177,7 @@ pub const BenchmarkPacketProcessing = struct {

var rand = std.rand.DefaultPrng.init(0);
var packet_buf: [PACKET_DATA_SIZE]u8 = undefined;
var timer = try std.time.Timer.start();
var timer = try sig.time.Timer.start();

// NOTE: send more packets than we need because UDP drops some
for (1..(n_packets * 2 + 1)) |i| {
Expand All @@ -186,8 +189,8 @@ pub const BenchmarkPacketProcessing = struct {
// = 10 packets per second
if (i % 10 == 0) {
const elapsed = timer.read();
if (elapsed < std.time.ns_per_s) {
std.time.sleep(std.time.ns_per_s - elapsed);
if (elapsed.asNanos() < std.time.ns_per_s) {
std.time.sleep(std.time.ns_per_s - elapsed.asNanos());
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/sync/channel.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ const std = @import("std");
const Atomic = std.atomic.Value;
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
const Mux = @import("mux.zig").Mux;

const sig = @import("../sig.zig");
const Backoff = @import("backoff.zig").Backoff;

pub fn Channel(T: type) type {
Expand Down Expand Up @@ -532,12 +533,12 @@ pub const BenchmarkChannel = struct {
},
};

pub fn benchmarkSimpleUsizeBetterChannel(argss: BenchmarkArgs) !usize {
pub fn benchmarkSimpleUsizeBetterChannel(argss: BenchmarkArgs) !sig.time.Duration {
var thread_handles: [64]?std.Thread = [_]?std.Thread{null} ** 64;
const n_items = argss.n_items;
const senders_count = argss.n_senders;
const receivers_count = argss.n_receivers;
var timer = try std.time.Timer.start();
var timer = try sig.time.Timer.start();

const allocator = std.heap.c_allocator;
var channel = try Channel(usize).init(allocator);
Expand Down Expand Up @@ -567,12 +568,12 @@ pub const BenchmarkChannel = struct {
return elapsed;
}

pub fn benchmarkSimplePacketBetterChannel(argss: BenchmarkArgs) !usize {
pub fn benchmarkSimplePacketBetterChannel(argss: BenchmarkArgs) !sig.time.Duration {
var thread_handles: [64]?std.Thread = [_]?std.Thread{null} ** 64;
const n_items = argss.n_items;
const senders_count = argss.n_senders;
const receivers_count = argss.n_receivers;
var timer = try std.time.Timer.start();
var timer = try sig.time.Timer.start();

const allocator = std.heap.c_allocator;
var channel = try Channel(Packet).init(allocator);
Expand Down

0 comments on commit d0752e4

Please sign in to comment.