Skip to content

Commit

Permalink
use channel wait in geyser + fix retransmitShreds comment
Browse files Browse the repository at this point in the history
  • Loading branch information
kprotty committed Jan 21, 2025
1 parent 84836dd commit 64dae09
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/geyser/core.zig
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ pub const GeyserWriter = struct {
}

pub fn IOStreamLoop(self: *Self) !void {
while (!self.exit.load(.acquire)) {
while (true) {
self.io_channel.waitToReceive(.{ .unordered = self.exit }) catch break;

while (self.io_channel.tryReceive()) |payload| {
_ = self.writeToPipe(payload) catch |err| {
if (err == WritePipeError.PipeBlockedWithExitSignaled) {
Expand Down
4 changes: 3 additions & 1 deletion src/geyser/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,9 @@ pub fn csvDumpIOWriter(
var timer = try sig.time.Timer.start();
errdefer exit.store(true, .monotonic);

while (!exit.load(.monotonic)) {
while (true) {
io_channel.waitToReceive(.{ .unordered = exit }) catch break;

while (io_channel.tryReceive()) |csv_row| {
// write to file
try csv_file.writeAll(csv_row);
Expand Down
3 changes: 2 additions & 1 deletion src/shred_network/shred_retransmitter.zig
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ fn retransmitShreds(
while (!exit.load(.acquire)) {
var retransmit_shred_timer = try sig.time.Timer.start();

// NOTE: multiple `retransmitShreds` run concurrently can't use receiver.wait() here.
// NOTE: multiple `retransmitShreds` run concurrently so we can't use
// `receiver.waitToReceive()` here as it only supports one caller thread.
const retransmit_info: RetransmitShredInfo = receiver.tryReceive() orelse continue;
defer retransmit_info.turbine_tree.releaseUnsafe();

Expand Down

0 comments on commit 64dae09

Please sign in to comment.