Skip to content

Commit

Permalink
Don't use file-as-struct
Browse files Browse the repository at this point in the history
  • Loading branch information
InKryption committed Jan 20, 2025
1 parent d2f2e56 commit 945988d
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 135 deletions.
2 changes: 1 addition & 1 deletion src/rpc/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub const WorkPool = union(enum) {
.no => noreturn,
},

pub const LinuxIoUring = @import("server/LinuxIoUring.zig");
pub const LinuxIoUring = @import("server/linux_io_uring.zig").LinuxIoUring;

const BasicAASCError =
connection.AcceptHandledError ||
Expand Down
271 changes: 137 additions & 134 deletions src/rpc/server/LinuxIoUring.zig → src/rpc/server/linux_io_uring.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,155 +8,158 @@ const requests = @import("requests.zig");
const IoUring = std.os.linux.IoUring;
const ServerCtx = sig.rpc.server.Context;

const LinuxIoUring = @This();
io_uring: IoUring,
multishot_accept_submitted: bool,
pending_cqes_count: u8,
pending_cqes_buf: [255]std.os.linux.io_uring_cqe,

pub const can_use: enum { no, yes, check } = switch (builtin.os.getVersionRange()) {
.linux => |version| can_use: {
const min_version: std.SemanticVersion = .{ .major = 6, .minor = 0, .patch = 0 };
const is_at_least = version.isAtLeast(min_version) orelse break :can_use .check;
break :can_use if (is_at_least) .yes else .no;
},
else => .no,
};

pub const InitError = std.posix.MMapError || error{
EntriesZero,
EntriesNotPowerOfTwo,
pub const LinuxIoUring = struct {
io_uring: IoUring,
multishot_accept_submitted: bool,
pending_cqes_count: u8,
pending_cqes_buf: [255]std.os.linux.io_uring_cqe,

pub const can_use: enum { no, yes, check } = switch (builtin.os.getVersionRange()) {
.linux => |version| can_use: {
const min_version: std.SemanticVersion = .{ .major = 6, .minor = 0, .patch = 0 };
const is_at_least = version.isAtLeast(min_version) orelse break :can_use .check;
break :can_use if (is_at_least) .yes else .no;
},
else => .no,
};

ParamsOutsideAccessibleAddressSpace,
ArgumentsInvalid,
ProcessFdQuotaExceeded,
SystemFdQuotaExceeded,
SystemResources,
pub const InitError = std.posix.MMapError || error{
EntriesZero,
EntriesNotPowerOfTwo,

PermissionDenied,
SystemOutdated,
};
ParamsOutsideAccessibleAddressSpace,
ArgumentsInvalid,
ProcessFdQuotaExceeded,
SystemFdQuotaExceeded,
SystemResources,

// NOTE(ink): constructing the return type as `E!?T`, where `E` and `T` are resolved
// separately seems to help ZLS with understanding the types involved better, which is
// why I've done it like that here. If ZLS gets smarter in the future, you could probably
// inline this into a single branch in the return type expression.
const InitErrOrEmpty = if (can_use == .no) error{} else InitError;
const InitResultOrNoreturn = if (can_use == .no) noreturn else LinuxIoUring;
pub fn init() InitErrOrEmpty!?InitResultOrNoreturn {
const need_runtime_check = switch (can_use) {
.no => return null,
.yes => false,
.check => true,
PermissionDenied,
SystemOutdated,
};

var io_uring = IoUring.init(4096, 0) catch |err| return switch (err) {
error.SystemOutdated,
error.PermissionDenied,
=> |e| if (!need_runtime_check) e else return null,
else => |e| e,
};
errdefer io_uring.deinit();
// NOTE(ink): constructing the return type as `E!?T`, where `E` and `T` are resolved
// separately seems to help ZLS with understanding the types involved better, which is
// why I've done it like that here. If ZLS gets smarter in the future, you could probably
// inline this into a single branch in the return type expression.
const InitErrOrEmpty = if (can_use == .no) error{} else InitError;
const InitResultOrNoreturn = if (can_use == .no) noreturn else LinuxIoUring;
pub fn init() InitErrOrEmpty!?InitResultOrNoreturn {
const need_runtime_check = switch (can_use) {
.no => return null,
.yes => false,
.check => true,
};

return .{
.io_uring = io_uring,
.multishot_accept_submitted = false,
.pending_cqes_count = 0,
.pending_cqes_buf = undefined,
};
}
var io_uring = IoUring.init(4096, 0) catch |err| return switch (err) {
error.SystemOutdated,
error.PermissionDenied,
=> |e| if (!need_runtime_check) e else return null,
else => |e| e,
};
errdefer io_uring.deinit();

pub fn deinit(self: *LinuxIoUring) void {
self.io_uring.deinit();
}
return .{
.io_uring = io_uring,
.multishot_accept_submitted = false,
.pending_cqes_count = 0,
.pending_cqes_buf = undefined,
};
}

pub const AcceptAndServeConnectionsError = error{
/// This was the first call, and we failed to prep, queue, and submit the multishot accept.
FailedToAcceptMultishot,
SubmissionQueueFull,
} || IouSubmitError ||
HandleOurCqeError ||
std.mem.Allocator.Error;
pub fn deinit(self: *LinuxIoUring) void {
self.io_uring.deinit();
}

pub fn acceptAndServeConnections(
self: *LinuxIoUring,
server_ctx: *ServerCtx,
) AcceptAndServeConnectionsError!void {
if (!self.multishot_accept_submitted) {
self.multishot_accept_submitted = true;
errdefer self.multishot_accept_submitted = false;
_ = self.io_uring.accept_multishot(
@bitCast(Entry.ACCEPT),
server_ctx.tcp.stream.handle,
null,
null,
std.os.linux.SOCK.CLOEXEC,
) catch |err| return switch (err) {
error.SubmissionQueueFull => {
server_ctx.logger.err().log(
"Under normal circumstances the accept_multishot would be" ++
" the first SQE to be queued, but somehow the queue was full.",
);
pub const AcceptAndServeConnectionsError = error{
/// This was the first call, and we failed to prep, queue, and submit the multishot accept.
FailedToAcceptMultishot,
SubmissionQueueFull,
} || IouSubmitError ||
HandleOurCqeError ||
std.mem.Allocator.Error;

pub fn acceptAndServeConnections(
self: *LinuxIoUring,
server_ctx: *ServerCtx,
) AcceptAndServeConnectionsError!void {
if (!self.multishot_accept_submitted) {
self.multishot_accept_submitted = true;
errdefer self.multishot_accept_submitted = false;
_ = self.io_uring.accept_multishot(
@bitCast(Entry.ACCEPT),
server_ctx.tcp.stream.handle,
null,
null,
std.os.linux.SOCK.CLOEXEC,
) catch |err| return switch (err) {
error.SubmissionQueueFull => {
server_ctx.logger.err().log(
"Under normal circumstances the accept_multishot would be" ++
" the first SQE to be queued, but somehow the queue was full.",
);
return error.FailedToAcceptMultishot;
},
};
if (try self.io_uring.submit() != 1) {
return error.FailedToAcceptMultishot;
},
};
if (try self.io_uring.submit() != 1) {
return error.FailedToAcceptMultishot;
}
return;
}
return;
}

_ = try self.io_uring.submit();
_ = try self.io_uring.submit();

if (self.pending_cqes_count != self.pending_cqes_buf.len) {
self.pending_cqes_count += @intCast(try self.io_uring.copy_cqes(self.pending_cqes_buf[self.pending_cqes_count..], 0));
}
const cqes_pending = self.pending_cqes_buf[0..self.pending_cqes_count];

for (cqes_pending, 0..) |raw_cqe, i| {
self.pending_cqes_count -= 1;
errdefer std.mem.copyForwards(
std.os.linux.io_uring_cqe,
self.pending_cqes_buf[0..self.pending_cqes_count],
self.pending_cqes_buf[i + 1 ..][0..self.pending_cqes_count],
);
const our_cqe = OurCqe.fromCqe(raw_cqe);
consumeOurCqe(self, server_ctx, our_cqe) catch |err| switch (err) {
// connection errors
error.ConnectionAborted,
error.ConnectionRefused,
error.ConnectionResetByPeer,
error.ConnectionTimedOut,

// our http parse errors
error.RequestHeadersTooBig,
error.RequestTargetTooLong,
error.RequestContentTypeUnrecognized,

// std http parse errors
error.UnknownHttpMethod,
error.HttpHeadersInvalid,
error.InvalidContentLength,
error.HttpHeaderContinuationsUnsupported,
error.HttpTransferEncodingUnsupported,
error.HttpConnectionHeaderUnsupported,
error.CompressionUnsupported,
error.MissingFinalNewline,

// splice errors
error.BadFileDescriptors,
error.BadFdOffset,
error.InvalidSplice,
=> |e| {
server_ctx.logger.err().logf("{s}", .{@errorName(e)});
continue;
},
if (self.pending_cqes_count != self.pending_cqes_buf.len) {
const unused = self.pending_cqes_buf[self.pending_cqes_count..];
const new_cqe_count = try self.io_uring.copy_cqes(unused, 0);
self.pending_cqes_count += @intCast(new_cqe_count);
}
const cqes_pending = self.pending_cqes_buf[0..self.pending_cqes_count];

for (cqes_pending, 0..) |raw_cqe, i| {
self.pending_cqes_count -= 1;
errdefer std.mem.copyForwards(
std.os.linux.io_uring_cqe,
self.pending_cqes_buf[0..self.pending_cqes_count],
self.pending_cqes_buf[i + 1 ..][0..self.pending_cqes_count],
);
const our_cqe = OurCqe.fromCqe(raw_cqe);
consumeOurCqe(self, server_ctx, our_cqe) catch |err| switch (err) {
// connection errors
error.ConnectionAborted,
error.ConnectionRefused,
error.ConnectionResetByPeer,
error.ConnectionTimedOut,

// our http parse errors
error.RequestHeadersTooBig,
error.RequestTargetTooLong,
error.RequestContentTypeUnrecognized,

// std http parse errors
error.UnknownHttpMethod,
error.HttpHeadersInvalid,
error.InvalidContentLength,
error.HttpHeaderContinuationsUnsupported,
error.HttpTransferEncodingUnsupported,
error.HttpConnectionHeaderUnsupported,
error.CompressionUnsupported,
error.MissingFinalNewline,

// splice errors
error.BadFileDescriptors,
error.BadFdOffset,
error.InvalidSplice,
=> |e| {
server_ctx.logger.err().logf("{s}", .{@errorName(e)});
continue;
},

error.SubmissionQueueFull => |e| return e,
else => |e| return e,
};
error.SubmissionQueueFull => |e| return e,
else => |e| return e,
};
}
}
}
};

const HandleOurCqeError = error{
SubmissionQueueFull,
Expand Down

0 comments on commit 945988d

Please sign in to comment.