diff --git a/src/gossip/service.zig b/src/gossip/service.zig index 93b7d121f..ca59db5af 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -53,7 +53,7 @@ const PingAndSocketAddr = sig.gossip.ping_pong.PingAndSocketAddr; const ServiceManager = sig.utils.service_manager.ServiceManager; const Duration = sig.time.Duration; const ExitCondition = sig.sync.ExitCondition; -const SocketPipe = sig.net.SocketPipe; +const SocketThread = sig.net.SocketThread; const endpointToString = sig.net.endpointToString; const globalRegistry = sig.prometheus.globalRegistry; @@ -104,7 +104,7 @@ const GOSSIP_PRNG_SEED = 19; /// The flow of data goes as follows: /// -/// `SocketPipe` -> +/// `SocketThread.initReceiver` -> /// - reads from the gossip socket /// - puts the new packet onto `packet_incoming_channel` /// - repeat until exit @@ -120,14 +120,14 @@ const GOSSIP_PRNG_SEED = 19; /// - processes the verified message it has received /// - depending on the type of message received, it may put something onto `packet_outgoing_channel` /// -/// `SocketPipe` -> +/// `SocketThread.initSender` -> /// - receives from `packet_outgoing_channel` /// - sends the outgoing packet onto the gossip socket /// - repeats while `exit` is false and `packet_outgoing_channel` -/// - when `SocketPipe` sees that `exit` has become `true`, it will begin waiting on +/// - when `SocketThread` sees that `exit` has become `true`, it will begin waiting on /// the previous thing in the chain to close, that usually being `processMessages`. /// this ensures that `processMessages` doesn't add new items to `packet_outgoing_channel` -/// after the `SocketPipe` thread exits. +/// after the `SocketThread` exits. /// pub const GossipService = struct { /// used for general allocation purposes @@ -148,10 +148,10 @@ pub const GossipService = struct { /// Indicates if the gossip service is closed. closed: bool, - /// Piping between the gossip_socket. + /// Piping data between the gossip_socket and the channels. /// Set to null until start() is called as they represent threads. - incoming_pipe: ?*SocketPipe = null, - outgoing_pipe: ?*SocketPipe = null, + incoming_socket_thread: ?*SocketThread = null, + outgoing_socket_thread: ?*SocketThread = null, /// communication between threads packet_incoming_channel: *Channel(Packet), @@ -334,8 +334,8 @@ pub const GossipService = struct { self.service_manager.deinit(); // Wait for pipes to shutdown if any - if (self.incoming_pipe) |pipe| pipe.deinit(self.allocator); - if (self.outgoing_pipe) |pipe| pipe.deinit(self.allocator); + if (self.incoming_socket_thread) |thread| thread.join(); + if (self.outgoing_socket_thread) |thread| thread.join(); // assert the channels are empty in order to make sure no data was lost. // everything should be cleaned up when the thread-pool joins. @@ -403,7 +403,7 @@ pub const GossipService = struct { }, }; - self.incoming_pipe = try SocketPipe.initReceiver( + self.incoming_socket_thread = try SocketThread.spawnReceiver( self.allocator, self.logger.unscoped(), self.gossip_socket, @@ -434,7 +434,7 @@ pub const GossipService = struct { exit_condition.ordered.exit_index += 1; } - self.outgoing_pipe = try SocketPipe.initSender( + self.outgoing_socket_thread = try SocketThread.spawnSender( self.allocator, self.logger.unscoped(), self.gossip_socket, diff --git a/src/net/lib.zig b/src/net/lib.zig index 99571b034..5d24bc2f7 100644 --- a/src/net/lib.zig +++ b/src/net/lib.zig @@ -7,7 +7,7 @@ pub const quic_client = @import("quic_client.zig"); pub const IpAddr = net.IpAddr; pub const SocketAddr = net.SocketAddr; pub const Packet = packet.Packet; -pub const SocketPipe = socket_utils.SocketPipe; +pub const SocketThread = socket_utils.SocketThread; pub const requestIpEcho = echo.requestIpEcho; pub const enablePortReuse = net.enablePortReuse; diff --git a/src/net/socket_utils.zig b/src/net/socket_utils.zig index 0cce7c84d..afd2a9e86 100644 --- a/src/net/socket_utils.zig +++ b/src/net/socket_utils.zig @@ -18,51 +18,55 @@ pub const PACKETS_PER_BATCH: usize = 64; // The identifier for the scoped logger used in this file. const LOG_SCOPE: []const u8 = "socket_utils"; -pub const SocketPipe = struct { +pub const SocketThread = struct { + allocator: Allocator, handle: std.Thread, - const Self = @This(); - - pub fn initSender( + pub fn spawnSender( allocator: Allocator, logger: Logger, socket: UdpSocket, outgoing_channel: *Channel(Packet), exit: ExitCondition, - ) !*Self { - // TODO(king): store event-loop data in SocketPipe (hence, heap-alloc).. - const self = try allocator.create(Self); - errdefer allocator.destroy(self); - - self.handle = try std.Thread.spawn( - .{}, - runSender, - .{ logger, socket, outgoing_channel, exit }, - ); - - return self; + ) !*SocketThread { + return spawn(allocator, logger, socket, outgoing_channel, exit, runSender); } - pub fn initReceiver( + pub fn spawnReceiver( allocator: Allocator, logger: Logger, socket: UdpSocket, incoming_channel: *Channel(Packet), exit: ExitCondition, - ) !*Self { - // TODO(king): store event-loop data in SocketPipe (hence, heap-alloc).. - const self = try allocator.create(Self); + ) !*SocketThread { + return spawn(allocator, logger, socket, incoming_channel, exit, runReceiver); + } + + fn spawn( + allocator: Allocator, + logger: Logger, + socket: UdpSocket, + channel: *Channel(Packet), + exit: ExitCondition, + comptime runFn: anytype, + ) !*SocketThread { + // TODO(king): store event-loop data in SocketThread (hence, heap-alloc).. + const self = try allocator.create(SocketThread); errdefer allocator.destroy(self); - self.handle = try std.Thread.spawn( - .{}, - runReceiver, - .{ logger, socket, incoming_channel, exit }, - ); + self.* = .{ + .allocator = allocator, + .handle = try std.Thread.spawn(.{}, runFn, .{ logger, socket, channel, exit }), + }; return self; } + pub fn join(self: *SocketThread) void { + self.handle.join(); + self.allocator.destroy(self); + } + fn runReceiver( logger_: Logger, socket_: UdpSocket, @@ -123,11 +127,6 @@ pub const SocketPipe = struct { } } } - - pub fn deinit(self: *Self, allocator: Allocator) void { - self.handle.join(); - allocator.destroy(self); - } }; pub const BenchmarkPacketProcessing = struct { @@ -164,9 +163,14 @@ pub const BenchmarkPacketProcessing = struct { var incoming_channel = try Channel(Packet).init(allocator); defer incoming_channel.deinit(); - const incoming_pipe = try SocketPipe - .init(allocator, .receiver, .noop, socket, &incoming_channel, exit_condition); - defer incoming_pipe.deinit(allocator); + const incoming_pipe = try SocketThread.spawnReceiver( + allocator, + .noop, + socket, + &incoming_channel, + exit_condition, + ); + defer incoming_pipe.join(); // Start outgoing @@ -200,9 +204,14 @@ pub const BenchmarkPacketProcessing = struct { var outgoing_channel = try Channel(Packet).init(allocator); defer outgoing_channel.deinit(); - const outgoing_pipe = try SocketPipe - .init(allocator, .sender, .noop, socket, &outgoing_channel, exit_condition); - defer outgoing_pipe.deinit(allocator); + const outgoing_pipe = try SocketThread.spawnSender( + allocator, + .noop, + socket, + &outgoing_channel, + exit_condition, + ); + defer outgoing_pipe.join(); const outgoing_handle = try std.Thread.spawn( .{}, diff --git a/src/shred_network/repair_service.zig b/src/shred_network/repair_service.zig index ed516c421..7b0573409 100644 --- a/src/shred_network/repair_service.zig +++ b/src/shred_network/repair_service.zig @@ -29,7 +29,7 @@ const Registry = sig.prometheus.Registry; const RwMux = sig.sync.RwMux; const SignedGossipData = sig.gossip.SignedGossipData; const SocketAddr = sig.net.SocketAddr; -const SocketPipe = sig.net.SocketPipe; +const SocketThread = sig.net.SocketThread; const Channel = sig.sync.Channel; const Slot = sig.core.Slot; @@ -252,7 +252,7 @@ pub const RepairRequester = struct { logger: ScopedLogger(@typeName(Self)), random: Random, keypair: *const KeyPair, - sender_pipe: *SocketPipe, + sender_thread: *SocketThread, sender_channel: *Channel(Packet), metrics: Metrics, @@ -277,28 +277,27 @@ pub const RepairRequester = struct { const channel = try Channel(Packet).create(allocator); errdefer channel.destroy(); - const pipe = try SocketPipe.initSender( + const thread = try SocketThread.spawnSender( allocator, logger, udp_send_socket, channel, .{ .unordered = exit }, ); - errdefer pipe.deinit(allocator); return .{ .allocator = allocator, .logger = logger.withScope(@typeName(Self)), .random = random, .keypair = keypair, - .sender_pipe = pipe, + .sender_thread = thread, .sender_channel = channel, .metrics = try registry.initStruct(Metrics), }; } pub fn deinit(self: Self) void { - self.sender_pipe.deinit(self.allocator); + self.sender_thread.join(); self.sender_channel.destroy(); } diff --git a/src/shred_network/shred_receiver.zig b/src/shred_network/shred_receiver.zig index 6af1d44e6..00f5866ad 100644 --- a/src/shred_network/shred_receiver.zig +++ b/src/shred_network/shred_receiver.zig @@ -20,7 +20,7 @@ const Ping = sig.gossip.Ping; const Pong = sig.gossip.Pong; const RepairMessage = shred_network.repair_message.RepairMessage; const Slot = sig.core.Slot; -const SocketPipe = sig.net.SocketPipe; +const SocketThread = sig.net.SocketThread; const ExitCondition = sig.sync.ExitCondition; const VariantCounter = sig.prometheus.VariantCounter; @@ -74,42 +74,42 @@ pub const ShredReceiver = struct { const response_sender = try Channel(Packet).create(self.allocator); defer response_sender.destroy(); - const response_sender_pipe = try SocketPipe.initSender( + const response_sender_thread = try SocketThread.spawnSender( self.allocator, self.logger.unscoped(), self.repair_socket, response_sender, exit, ); - defer response_sender_pipe.deinit(self.allocator); + defer response_sender_thread.join(); // Create pipe from repair_socket -> response_receiver. const response_receiver = try Channel(Packet).create(self.allocator); response_receiver.send_hook = &receive_signal.hook; defer response_receiver.destroy(); - const response_receiver_pipe = try SocketPipe.initReceiver( + const response_receiver_thread = try SocketThread.spawnReceiver( self.allocator, self.logger.unscoped(), self.repair_socket, response_receiver, exit, ); - defer response_receiver_pipe.deinit(self.allocator); + defer response_receiver_thread.join(); // Create pipe from turbine_socket -> turbine_receiver. const turbine_receiver = try Channel(Packet).create(self.allocator); turbine_receiver.send_hook = &receive_signal.hook; defer turbine_receiver.destroy(); - const turbine_receiver_pipe = try SocketPipe.initReceiver( + const turbine_receiver_thread = try SocketThread.spawnReceiver( self.allocator, self.logger.unscoped(), self.turbine_socket, turbine_receiver, .{ .unordered = self.exit }, ); - defer turbine_receiver_pipe.deinit(self.allocator); + defer turbine_receiver_thread.join(); // Run thread to handle incoming packets. Stops when exit is set. while (true) { diff --git a/src/shred_network/shred_retransmitter.zig b/src/shred_network/shred_retransmitter.zig index 18b93b148..45027778e 100644 --- a/src/shred_network/shred_retransmitter.zig +++ b/src/shred_network/shred_retransmitter.zig @@ -113,14 +113,14 @@ pub fn runShredRetransmitter(params: struct { )); } - const pipe = try socket_utils.SocketPipe.initSender( + const sender_thread = try socket_utils.SocketThread.spawnSender( params.allocator, params.logger, retransmit_socket, &retransmit_to_socket_channel, .{ .unordered = params.exit }, ); - defer pipe.deinit(params.allocator); + defer sender_thread.join(); for (thread_handles.items) |thread| thread.join(); } diff --git a/src/sync/channel.zig b/src/sync/channel.zig index 49fad8169..632f0e747 100644 --- a/src/sync/channel.zig +++ b/src/sync/channel.zig @@ -451,45 +451,51 @@ test "spsc" { } test "send-hook" { - const Intercept = struct { - collect: *std.ArrayList(u64), - hook: Channel(u64).SendHook = .{ .before_send = beforeSend }, + const Counter = struct { + count: usize = 0, + hook: Channel(u64).SendHook = .{ .after_send = afterSend }, - fn beforeSend(hook: *Channel(u64).SendHook, _: *Channel(u64), value: u64) bool { + fn afterSend(hook: *Channel(u64).SendHook, channel: *Channel(u64)) void { const self: *@This() = @alignCast(@fieldParentPtr("hook", hook)); - self.collect.append(value) catch @panic("oom"); - return false; + self.count += 1; + std.debug.assert(channel.len() == self.count); } }; - const Reaction = struct { - collect: *std.ArrayList(u64), + const Consumer = struct { + collected: std.ArrayList(u64), hook: Channel(u64).SendHook = .{ .after_send = afterSend }, fn afterSend(hook: *Channel(u64).SendHook, channel: *Channel(u64)) void { const self: *@This() = @alignCast(@fieldParentPtr("hook", hook)); const value = channel.tryReceive() orelse @panic("empty channel after send"); - self.collect.append(value) catch @panic("oom"); + self.collected.append(value) catch @panic("oom"); } }; - var ch = try Channel(u64).init(std.testing.allocator); + const to_send = 100; + const allocator = std.testing.allocator; + + var ch = try Channel(u64).init(allocator); defer ch.deinit(); - var list = std.ArrayList(u64).init(std.testing.allocator); - defer list.deinit(); + // Check that afterSend counts sent channel items. + var counter = Counter{}; + ch.send_hook = &counter.hook; - inline for (.{ Intercept, Reaction }) |HookImpl| { - const to_send = 100; - list.clearRetainingCapacity(); + for (0..to_send) |i| try ch.send(i); + try expect(ch.len() == to_send); + try expect(counter.count == to_send); - var hook_impl = HookImpl{ .collect = &list }; - ch.send_hook = &hook_impl.hook; + // Check that afterSend consumes any sent values. + var consumer = Consumer{ .collected = std.ArrayList(u64).init(allocator) }; + ch.send_hook = &consumer.hook; + defer consumer.collected.deinit(); - for (0..to_send) |i| try ch.send(i); - try expect(ch.isEmpty()); - try expect(list.items.len == to_send); - } + while (ch.tryReceive()) |_| {} // drain before starting. + for (0..to_send) |i| try ch.send(i); + try expect(ch.isEmpty()); + try expect(consumer.collected.items.len == to_send); } test "mpmc" {