Skip to content

Commit

Permalink
rename SocketPIpe to SocketThread
Browse files Browse the repository at this point in the history
- `init{Sender/Receiver}` -> `spawn{Sender/Receiver}`
- `deinit(allocator)` -> `join()`

also fixes Channel "send hook" tests after beforeSend was removed
  • Loading branch information
kprotty committed Jan 17, 2025
1 parent 3281d26 commit 3a56b91
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 85 deletions.
24 changes: 12 additions & 12 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const PingAndSocketAddr = sig.gossip.ping_pong.PingAndSocketAddr;
const ServiceManager = sig.utils.service_manager.ServiceManager;
const Duration = sig.time.Duration;
const ExitCondition = sig.sync.ExitCondition;
const SocketPipe = sig.net.SocketPipe;
const SocketThread = sig.net.SocketThread;

const endpointToString = sig.net.endpointToString;
const globalRegistry = sig.prometheus.globalRegistry;
Expand Down Expand Up @@ -104,7 +104,7 @@ const GOSSIP_PRNG_SEED = 19;

/// The flow of data goes as follows:
///
/// `SocketPipe` ->
/// `SocketThread.initReceiver` ->
/// - reads from the gossip socket
/// - puts the new packet onto `packet_incoming_channel`
/// - repeat until exit
Expand All @@ -120,14 +120,14 @@ const GOSSIP_PRNG_SEED = 19;
/// - processes the verified message it has received
/// - depending on the type of message received, it may put something onto `packet_outgoing_channel`
///
/// `SocketPipe` ->
/// `SocketThread.initSender` ->
/// - receives from `packet_outgoing_channel`
/// - sends the outgoing packet onto the gossip socket
/// - repeats while `exit` is false and `packet_outgoing_channel`
/// - when `SocketPipe` sees that `exit` has become `true`, it will begin waiting on
/// - when `SocketThread` sees that `exit` has become `true`, it will begin waiting on
/// the previous thing in the chain to close, that usually being `processMessages`.
/// this ensures that `processMessages` doesn't add new items to `packet_outgoing_channel`
/// after the `SocketPipe` thread exits.
/// after the `SocketThread` exits.
///
pub const GossipService = struct {
/// used for general allocation purposes
Expand All @@ -148,10 +148,10 @@ pub const GossipService = struct {
/// Indicates if the gossip service is closed.
closed: bool,

/// Piping between the gossip_socket.
/// Piping data between the gossip_socket and the channels.
/// Set to null until start() is called as they represent threads.
incoming_pipe: ?*SocketPipe = null,
outgoing_pipe: ?*SocketPipe = null,
incoming_socket_thread: ?*SocketThread = null,
outgoing_socket_thread: ?*SocketThread = null,

/// communication between threads
packet_incoming_channel: *Channel(Packet),
Expand Down Expand Up @@ -334,8 +334,8 @@ pub const GossipService = struct {
self.service_manager.deinit();

// Wait for pipes to shutdown if any
if (self.incoming_pipe) |pipe| pipe.deinit(self.allocator);
if (self.outgoing_pipe) |pipe| pipe.deinit(self.allocator);
if (self.incoming_socket_thread) |thread| thread.join();
if (self.outgoing_socket_thread) |thread| thread.join();

// assert the channels are empty in order to make sure no data was lost.
// everything should be cleaned up when the thread-pool joins.
Expand Down Expand Up @@ -403,7 +403,7 @@ pub const GossipService = struct {
},
};

self.incoming_pipe = try SocketPipe.initReceiver(
self.incoming_socket_thread = try SocketThread.spawnReceiver(
self.allocator,
self.logger.unscoped(),
self.gossip_socket,
Expand Down Expand Up @@ -434,7 +434,7 @@ pub const GossipService = struct {
exit_condition.ordered.exit_index += 1;
}

self.outgoing_pipe = try SocketPipe.initSender(
self.outgoing_socket_thread = try SocketThread.spawnSender(
self.allocator,
self.logger.unscoped(),
self.gossip_socket,
Expand Down
2 changes: 1 addition & 1 deletion src/net/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub const quic_client = @import("quic_client.zig");
pub const IpAddr = net.IpAddr;
pub const SocketAddr = net.SocketAddr;
pub const Packet = packet.Packet;
pub const SocketPipe = socket_utils.SocketPipe;
pub const SocketThread = socket_utils.SocketThread;

pub const requestIpEcho = echo.requestIpEcho;
pub const enablePortReuse = net.enablePortReuse;
Expand Down
81 changes: 45 additions & 36 deletions src/net/socket_utils.zig
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,55 @@ pub const PACKETS_PER_BATCH: usize = 64;
// The identifier for the scoped logger used in this file.
const LOG_SCOPE: []const u8 = "socket_utils";

pub const SocketPipe = struct {
pub const SocketThread = struct {
allocator: Allocator,
handle: std.Thread,

const Self = @This();

pub fn initSender(
pub fn spawnSender(
allocator: Allocator,
logger: Logger,
socket: UdpSocket,
outgoing_channel: *Channel(Packet),
exit: ExitCondition,
) !*Self {
// TODO(king): store event-loop data in SocketPipe (hence, heap-alloc)..
const self = try allocator.create(Self);
errdefer allocator.destroy(self);

self.handle = try std.Thread.spawn(
.{},
runSender,
.{ logger, socket, outgoing_channel, exit },
);

return self;
) !*SocketThread {
return spawn(allocator, logger, socket, outgoing_channel, exit, runSender);
}

pub fn initReceiver(
pub fn spawnReceiver(
allocator: Allocator,
logger: Logger,
socket: UdpSocket,
incoming_channel: *Channel(Packet),
exit: ExitCondition,
) !*Self {
// TODO(king): store event-loop data in SocketPipe (hence, heap-alloc)..
const self = try allocator.create(Self);
) !*SocketThread {
return spawn(allocator, logger, socket, incoming_channel, exit, runReceiver);
}

fn spawn(
allocator: Allocator,
logger: Logger,
socket: UdpSocket,
channel: *Channel(Packet),
exit: ExitCondition,
comptime runFn: anytype,
) !*SocketThread {
// TODO(king): store event-loop data in SocketThread (hence, heap-alloc)..
const self = try allocator.create(SocketThread);
errdefer allocator.destroy(self);

self.handle = try std.Thread.spawn(
.{},
runReceiver,
.{ logger, socket, incoming_channel, exit },
);
self.* = .{
.allocator = allocator,
.handle = try std.Thread.spawn(.{}, runFn, .{ logger, socket, channel, exit }),
};

return self;
}

pub fn join(self: *SocketThread) void {
self.handle.join();
self.allocator.destroy(self);
}

fn runReceiver(
logger_: Logger,
socket_: UdpSocket,
Expand Down Expand Up @@ -123,11 +127,6 @@ pub const SocketPipe = struct {
}
}
}

pub fn deinit(self: *Self, allocator: Allocator) void {
self.handle.join();
allocator.destroy(self);
}
};

pub const BenchmarkPacketProcessing = struct {
Expand Down Expand Up @@ -164,9 +163,14 @@ pub const BenchmarkPacketProcessing = struct {
var incoming_channel = try Channel(Packet).init(allocator);
defer incoming_channel.deinit();

const incoming_pipe = try SocketPipe
.init(allocator, .receiver, .noop, socket, &incoming_channel, exit_condition);
defer incoming_pipe.deinit(allocator);
const incoming_pipe = try SocketThread.spawnReceiver(
allocator,
.noop,
socket,
&incoming_channel,
exit_condition,
);
defer incoming_pipe.join();

// Start outgoing

Expand Down Expand Up @@ -200,9 +204,14 @@ pub const BenchmarkPacketProcessing = struct {
var outgoing_channel = try Channel(Packet).init(allocator);
defer outgoing_channel.deinit();

const outgoing_pipe = try SocketPipe
.init(allocator, .sender, .noop, socket, &outgoing_channel, exit_condition);
defer outgoing_pipe.deinit(allocator);
const outgoing_pipe = try SocketThread.spawnSender(
allocator,
.noop,
socket,
&outgoing_channel,
exit_condition,
);
defer outgoing_pipe.join();

const outgoing_handle = try std.Thread.spawn(
.{},
Expand Down
11 changes: 5 additions & 6 deletions src/shred_network/repair_service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const Registry = sig.prometheus.Registry;
const RwMux = sig.sync.RwMux;
const SignedGossipData = sig.gossip.SignedGossipData;
const SocketAddr = sig.net.SocketAddr;
const SocketPipe = sig.net.SocketPipe;
const SocketThread = sig.net.SocketThread;
const Channel = sig.sync.Channel;
const Slot = sig.core.Slot;

Expand Down Expand Up @@ -252,7 +252,7 @@ pub const RepairRequester = struct {
logger: ScopedLogger(@typeName(Self)),
random: Random,
keypair: *const KeyPair,
sender_pipe: *SocketPipe,
sender_thread: *SocketThread,
sender_channel: *Channel(Packet),
metrics: Metrics,

Expand All @@ -277,28 +277,27 @@ pub const RepairRequester = struct {
const channel = try Channel(Packet).create(allocator);
errdefer channel.destroy();

const pipe = try SocketPipe.initSender(
const thread = try SocketThread.spawnSender(
allocator,
logger,
udp_send_socket,
channel,
.{ .unordered = exit },
);
errdefer pipe.deinit(allocator);

return .{
.allocator = allocator,
.logger = logger.withScope(@typeName(Self)),
.random = random,
.keypair = keypair,
.sender_pipe = pipe,
.sender_thread = thread,
.sender_channel = channel,
.metrics = try registry.initStruct(Metrics),
};
}

pub fn deinit(self: Self) void {
self.sender_pipe.deinit(self.allocator);
self.sender_thread.join();
self.sender_channel.destroy();
}

Expand Down
14 changes: 7 additions & 7 deletions src/shred_network/shred_receiver.zig
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const Ping = sig.gossip.Ping;
const Pong = sig.gossip.Pong;
const RepairMessage = shred_network.repair_message.RepairMessage;
const Slot = sig.core.Slot;
const SocketPipe = sig.net.SocketPipe;
const SocketThread = sig.net.SocketThread;
const ExitCondition = sig.sync.ExitCondition;
const VariantCounter = sig.prometheus.VariantCounter;

Expand Down Expand Up @@ -74,42 +74,42 @@ pub const ShredReceiver = struct {
const response_sender = try Channel(Packet).create(self.allocator);
defer response_sender.destroy();

const response_sender_pipe = try SocketPipe.initSender(
const response_sender_thread = try SocketThread.spawnSender(
self.allocator,
self.logger.unscoped(),
self.repair_socket,
response_sender,
exit,
);
defer response_sender_pipe.deinit(self.allocator);
defer response_sender_thread.join();

// Create pipe from repair_socket -> response_receiver.
const response_receiver = try Channel(Packet).create(self.allocator);
response_receiver.send_hook = &receive_signal.hook;
defer response_receiver.destroy();

const response_receiver_pipe = try SocketPipe.initReceiver(
const response_receiver_thread = try SocketThread.spawnReceiver(
self.allocator,
self.logger.unscoped(),
self.repair_socket,
response_receiver,
exit,
);
defer response_receiver_pipe.deinit(self.allocator);
defer response_receiver_thread.join();

// Create pipe from turbine_socket -> turbine_receiver.
const turbine_receiver = try Channel(Packet).create(self.allocator);
turbine_receiver.send_hook = &receive_signal.hook;
defer turbine_receiver.destroy();

const turbine_receiver_pipe = try SocketPipe.initReceiver(
const turbine_receiver_thread = try SocketThread.spawnReceiver(
self.allocator,
self.logger.unscoped(),
self.turbine_socket,
turbine_receiver,
.{ .unordered = self.exit },
);
defer turbine_receiver_pipe.deinit(self.allocator);
defer turbine_receiver_thread.join();

// Run thread to handle incoming packets. Stops when exit is set.
while (true) {
Expand Down
4 changes: 2 additions & 2 deletions src/shred_network/shred_retransmitter.zig
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ pub fn runShredRetransmitter(params: struct {
));
}

const pipe = try socket_utils.SocketPipe.initSender(
const sender_thread = try socket_utils.SocketThread.spawnSender(
params.allocator,
params.logger,
retransmit_socket,
&retransmit_to_socket_channel,
.{ .unordered = params.exit },
);
defer pipe.deinit(params.allocator);
defer sender_thread.join();

for (thread_handles.items) |thread| thread.join();
}
Expand Down
Loading

0 comments on commit 3a56b91

Please sign in to comment.