From 72c0b7f85c84de5debd95a39d0314a215c67b3fc Mon Sep 17 00:00:00 2001 From: Trevor Berrange Sanchez Date: Mon, 20 Jan 2025 20:23:20 +0100 Subject: [PATCH] Don't use file-as-struct --- src/rpc/server.zig | 2 +- .../{LinuxIoUring.zig => linux_io_uring.zig} | 271 +++++++++--------- 2 files changed, 138 insertions(+), 135 deletions(-) rename src/rpc/server/{LinuxIoUring.zig => linux_io_uring.zig} (80%) diff --git a/src/rpc/server.zig b/src/rpc/server.zig index 3c66d1e64..17282fd05 100644 --- a/src/rpc/server.zig +++ b/src/rpc/server.zig @@ -100,7 +100,7 @@ pub const WorkPool = union(enum) { .no => noreturn, }, - pub const LinuxIoUring = @import("server/LinuxIoUring.zig"); + pub const LinuxIoUring = @import("server/linux_io_uring.zig").LinuxIoUring; const BasicAASCError = connection.AcceptHandledError || diff --git a/src/rpc/server/LinuxIoUring.zig b/src/rpc/server/linux_io_uring.zig similarity index 80% rename from src/rpc/server/LinuxIoUring.zig rename to src/rpc/server/linux_io_uring.zig index e4992e09d..4d35529cb 100644 --- a/src/rpc/server/LinuxIoUring.zig +++ b/src/rpc/server/linux_io_uring.zig @@ -8,155 +8,158 @@ const requests = @import("requests.zig"); const IoUring = std.os.linux.IoUring; const ServerCtx = sig.rpc.server.Context; -const LinuxIoUring = @This(); -io_uring: IoUring, -multishot_accept_submitted: bool, -pending_cqes_count: u8, -pending_cqes_buf: [255]std.os.linux.io_uring_cqe, - -pub const can_use: enum { no, yes, check } = switch (builtin.os.getVersionRange()) { - .linux => |version| can_use: { - const min_version: std.SemanticVersion = .{ .major = 6, .minor = 0, .patch = 0 }; - const is_at_least = version.isAtLeast(min_version) orelse break :can_use .check; - break :can_use if (is_at_least) .yes else .no; - }, - else => .no, -}; - -pub const InitError = std.posix.MMapError || error{ - EntriesZero, - EntriesNotPowerOfTwo, +pub const LinuxIoUring = struct { + io_uring: IoUring, + multishot_accept_submitted: bool, + pending_cqes_count: u8, + pending_cqes_buf: [255]std.os.linux.io_uring_cqe, + + pub const can_use: enum { no, yes, check } = switch (builtin.os.getVersionRange()) { + .linux => |version| can_use: { + const min_version: std.SemanticVersion = .{ .major = 6, .minor = 0, .patch = 0 }; + const is_at_least = version.isAtLeast(min_version) orelse break :can_use .check; + break :can_use if (is_at_least) .yes else .no; + }, + else => .no, + }; - ParamsOutsideAccessibleAddressSpace, - ArgumentsInvalid, - ProcessFdQuotaExceeded, - SystemFdQuotaExceeded, - SystemResources, + pub const InitError = std.posix.MMapError || error{ + EntriesZero, + EntriesNotPowerOfTwo, - PermissionDenied, - SystemOutdated, -}; + ParamsOutsideAccessibleAddressSpace, + ArgumentsInvalid, + ProcessFdQuotaExceeded, + SystemFdQuotaExceeded, + SystemResources, -// NOTE(ink): constructing the return type as `E!?T`, where `E` and `T` are resolved -// separately seems to help ZLS with understanding the types involved better, which is -// why I've done it like that here. If ZLS gets smarter in the future, you could probably -// inline this into a single branch in the return type expression. -const InitErrOrEmpty = if (can_use == .no) error{} else InitError; -const InitResultOrNoreturn = if (can_use == .no) noreturn else LinuxIoUring; -pub fn init() InitErrOrEmpty!?InitResultOrNoreturn { - const need_runtime_check = switch (can_use) { - .no => return null, - .yes => false, - .check => true, + PermissionDenied, + SystemOutdated, }; - var io_uring = IoUring.init(4096, 0) catch |err| return switch (err) { - error.SystemOutdated, - error.PermissionDenied, - => |e| if (!need_runtime_check) e else return null, - else => |e| e, - }; - errdefer io_uring.deinit(); + // NOTE(ink): constructing the return type as `E!?T`, where `E` and `T` are resolved + // separately seems to help ZLS with understanding the types involved better, which is + // why I've done it like that here. If ZLS gets smarter in the future, you could probably + // inline this into a single branch in the return type expression. + const InitErrOrEmpty = if (can_use == .no) error{} else InitError; + const InitResultOrNoreturn = if (can_use == .no) noreturn else LinuxIoUring; + pub fn init() InitErrOrEmpty!?InitResultOrNoreturn { + const need_runtime_check = switch (can_use) { + .no => return null, + .yes => false, + .check => true, + }; - return .{ - .io_uring = io_uring, - .multishot_accept_submitted = false, - .pending_cqes_count = 0, - .pending_cqes_buf = undefined, - }; -} + var io_uring = IoUring.init(4096, 0) catch |err| return switch (err) { + error.SystemOutdated, + error.PermissionDenied, + => |e| if (!need_runtime_check) e else return null, + else => |e| e, + }; + errdefer io_uring.deinit(); -pub fn deinit(self: *LinuxIoUring) void { - self.io_uring.deinit(); -} + return .{ + .io_uring = io_uring, + .multishot_accept_submitted = false, + .pending_cqes_count = 0, + .pending_cqes_buf = undefined, + }; + } -pub const AcceptAndServeConnectionsError = error{ - /// This was the first call, and we failed to prep, queue, and submit the multishot accept. - FailedToAcceptMultishot, - SubmissionQueueFull, -} || IouSubmitError || - HandleOurCqeError || - std.mem.Allocator.Error; + pub fn deinit(self: *LinuxIoUring) void { + self.io_uring.deinit(); + } -pub fn acceptAndServeConnections( - self: *LinuxIoUring, - server_ctx: *ServerCtx, -) AcceptAndServeConnectionsError!void { - if (!self.multishot_accept_submitted) { - self.multishot_accept_submitted = true; - errdefer self.multishot_accept_submitted = false; - _ = self.io_uring.accept_multishot( - @bitCast(Entry.ACCEPT), - server_ctx.tcp.stream.handle, - null, - null, - std.os.linux.SOCK.CLOEXEC, - ) catch |err| return switch (err) { - error.SubmissionQueueFull => { - server_ctx.logger.err().log( - "Under normal circumstances the accept_multishot would be" ++ - " the first SQE to be queued, but somehow the queue was full.", - ); + pub const AcceptAndServeConnectionsError = error{ + /// This was the first call, and we failed to prep, queue, and submit the multishot accept. + FailedToAcceptMultishot, + SubmissionQueueFull, + } || IouSubmitError || + HandleOurCqeError || + std.mem.Allocator.Error; + + pub fn acceptAndServeConnections( + self: *LinuxIoUring, + server_ctx: *ServerCtx, + ) AcceptAndServeConnectionsError!void { + if (!self.multishot_accept_submitted) { + self.multishot_accept_submitted = true; + errdefer self.multishot_accept_submitted = false; + _ = self.io_uring.accept_multishot( + @bitCast(Entry.ACCEPT), + server_ctx.tcp.stream.handle, + null, + null, + std.os.linux.SOCK.CLOEXEC, + ) catch |err| return switch (err) { + error.SubmissionQueueFull => { + server_ctx.logger.err().log( + "Under normal circumstances the accept_multishot would be" ++ + " the first SQE to be queued, but somehow the queue was full.", + ); + return error.FailedToAcceptMultishot; + }, + }; + if (try self.io_uring.submit() != 1) { return error.FailedToAcceptMultishot; - }, - }; - if (try self.io_uring.submit() != 1) { - return error.FailedToAcceptMultishot; + } + return; } - return; - } - _ = try self.io_uring.submit(); + _ = try self.io_uring.submit(); - if (self.pending_cqes_count != self.pending_cqes_buf.len) { - self.pending_cqes_count += @intCast(try self.io_uring.copy_cqes(self.pending_cqes_buf[self.pending_cqes_count..], 0)); - } - const cqes_pending = self.pending_cqes_buf[0..self.pending_cqes_count]; - - for (cqes_pending, 0..) |raw_cqe, i| { - self.pending_cqes_count -= 1; - errdefer std.mem.copyForwards( - std.os.linux.io_uring_cqe, - self.pending_cqes_buf[0..self.pending_cqes_count], - self.pending_cqes_buf[i + 1 ..][0..self.pending_cqes_count], - ); - const our_cqe = OurCqe.fromCqe(raw_cqe); - consumeOurCqe(self, server_ctx, our_cqe) catch |err| switch (err) { - // connection errors - error.ConnectionAborted, - error.ConnectionRefused, - error.ConnectionResetByPeer, - error.ConnectionTimedOut, - - // our http parse errors - error.RequestHeadersTooBig, - error.RequestTargetTooLong, - error.RequestContentTypeUnrecognized, - - // std http parse errors - error.UnknownHttpMethod, - error.HttpHeadersInvalid, - error.InvalidContentLength, - error.HttpHeaderContinuationsUnsupported, - error.HttpTransferEncodingUnsupported, - error.HttpConnectionHeaderUnsupported, - error.CompressionUnsupported, - error.MissingFinalNewline, - - // splice errors - error.BadFileDescriptors, - error.BadFdOffset, - error.InvalidSplice, - => |e| { - server_ctx.logger.err().logf("{s}", .{@errorName(e)}); - continue; - }, + if (self.pending_cqes_count != self.pending_cqes_buf.len) { + const unused = self.pending_cqes_buf[self.pending_cqes_count..]; + const new_cqe_count = try self.io_uring.copy_cqes(unused, 0); + self.pending_cqes_count += @intCast(new_cqe_count); + } + const cqes_pending = self.pending_cqes_buf[0..self.pending_cqes_count]; + + for (cqes_pending, 0..) |raw_cqe, i| { + self.pending_cqes_count -= 1; + errdefer std.mem.copyForwards( + std.os.linux.io_uring_cqe, + self.pending_cqes_buf[0..self.pending_cqes_count], + self.pending_cqes_buf[i + 1 ..][0..self.pending_cqes_count], + ); + const our_cqe = OurCqe.fromCqe(raw_cqe); + consumeOurCqe(self, server_ctx, our_cqe) catch |err| switch (err) { + // connection errors + error.ConnectionAborted, + error.ConnectionRefused, + error.ConnectionResetByPeer, + error.ConnectionTimedOut, + + // our http parse errors + error.RequestHeadersTooBig, + error.RequestTargetTooLong, + error.RequestContentTypeUnrecognized, + + // std http parse errors + error.UnknownHttpMethod, + error.HttpHeadersInvalid, + error.InvalidContentLength, + error.HttpHeaderContinuationsUnsupported, + error.HttpTransferEncodingUnsupported, + error.HttpConnectionHeaderUnsupported, + error.CompressionUnsupported, + error.MissingFinalNewline, + + // splice errors + error.BadFileDescriptors, + error.BadFdOffset, + error.InvalidSplice, + => |e| { + server_ctx.logger.err().logf("{s}", .{@errorName(e)}); + continue; + }, - error.SubmissionQueueFull => |e| return e, - else => |e| return e, - }; + error.SubmissionQueueFull => |e| return e, + else => |e| return e, + }; + } } -} +}; const HandleOurCqeError = error{ SubmissionQueueFull,