NIOAsyncChannel doesn't flush all its writes if connection is closed #2757

Open
adam-fowler opened this issue Jun 27, 2024 · 5 comments

@adam-fowler
Contributor

Expected behavior

All writes made via NIOAsyncChannel's outbound writer (outbound.write) should be flushed before the connection is closed.

Actual behavior

If I write a large buffer (around 1 MB or more) and immediately exit the closure passed to NIOAsyncChannel.executeThenClose, the buffer doesn't get fully flushed before the connection closes.

Here is the related Slack conversation https://swift-open-source.slack.com/archives/C9MMT6VGB/p1713405447642699

Steps to reproduce

  1. Create an HTTP/1 server using NIOAsyncChannel that returns 1 MB for every request
  2. curl -H "Connection: close" localhost:8888 --output -

SwiftNIO version/commit hash

2.67.0

System & version information

swift-driver version: 1.90.11.1 Apple Swift version 5.10 (swiftlang-5.10.0.13 clang-1500.3.9.4)
Target: arm64-apple-macosx14.0

@FranzBusch
Member

Was outbound half closure enabled on the channel?

@Lukasa
Contributor

Lukasa commented Jun 27, 2024

It should always be enabled.

@adam-fowler
Contributor Author

> Was outbound half closure enabled on the channel?

Yes, I've always used half closure

@FranzBusch
Member

@adam-fowler Could you do me a favour and add a reproducer test for this? I think I know what's going on, and the solution to this might be more interesting. My gut feeling is that we buffer the writes somewhere. Since writer.write doesn't wait for the write to make it out, we can get the close before all writes have been flushed, which means that some buffered writes might get dropped.

Now, solving this might be a little complicated. The easy way would be to just attach a promise to each write, but that's going to be very costly. @Lukasa and I have discussed a "soft-close" mode a few times, which tells the pipeline to flush everything out and then close when possible.
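
To make the suspected failure mode concrete, here is a minimal toy model in plain Swift. It is only a sketch of the hypothesis above, not SwiftNIO code: `ToyConnection`, `socketCapacityPerFlush`, `hardClose`, and `softClose` are hypothetical names invented for illustration, and the 256 KiB per-flush capacity is an arbitrary assumption standing in for "the socket did not accept everything in one go".

```swift
// Toy model only. None of these types or methods are SwiftNIO API.
struct ToyConnection {
    /// Assumed number of bytes the socket accepts per flush attempt.
    let socketCapacityPerFlush: Int
    /// Bytes accepted by write() but not yet handed to the socket.
    private(set) var pendingByteCount = 0
    /// Bytes that actually reached the socket.
    private(set) var sentByteCount = 0
    private(set) var isClosed = false

    init(socketCapacityPerFlush: Int) {
        precondition(socketCapacityPerFlush > 0, "capacity must be positive")
        self.socketCapacityPerFlush = socketCapacityPerFlush
    }

    /// Enqueues bytes and returns immediately, without waiting for them to be sent.
    mutating func write(byteCount: Int) {
        guard !isClosed else { return }
        pendingByteCount += byteCount
    }

    /// Sends at most `socketCapacityPerFlush` bytes; the rest stays buffered.
    mutating func flushOnce() {
        let sent = min(pendingByteCount, socketCapacityPerFlush)
        pendingByteCount -= sent
        sentByteCount += sent
    }

    /// Closes right away and drops whatever is still buffered.
    mutating func hardClose() {
        pendingByteCount = 0
        isClosed = true
    }

    /// "Soft close": keeps flushing until the buffer is empty, then closes.
    mutating func softClose() {
        while pendingByteCount > 0 {
            flushOnce()
        }
        isClosed = true
    }
}

/// Writes a 1,000,000-byte body, flushes once, then closes in the requested way.
func sentBytes(closingSoftly: Bool) -> Int {
    var connection = ToyConnection(socketCapacityPerFlush: 256 * 1024)
    connection.write(byteCount: 1_000_000)
    connection.flushOnce()
    if closingSoftly {
        connection.softClose()
    } else {
        connection.hardClose()
    }
    return connection.sentByteCount
}

print(sentBytes(closingSoftly: false)) // 262144: the remainder was dropped
print(sentBytes(closingSoftly: true))  // 1000000: everything was flushed first
```

In this model the promise-per-write alternative would correspond to the caller waiting after every `write` until `pendingByteCount` reaches zero, which is why it is expected to be costly compared with draining once at close time.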

@adam-fowler
Contributor Author

adam-fowler commented Jul 1, 2024

Making me do half your work 🤣. Here's a snippet:

import NIOPosix
import NIOCore
import NIOHTTP1

/// Sendable server response that doesn't use ``IOData``
public typealias SendableHTTPServerResponsePart = HTTPPart<HTTPResponseHead, ByteBuffer>

/// Channel handler that converts the Sendable type SendableHTTPServerResponsePart to HTTPServerResponsePart
final class HTTPSendableResponseChannelHandler: ChannelOutboundHandler, RemovableChannelHandler {
    typealias OutboundIn = SendableHTTPServerResponsePart
    typealias OutboundOut = HTTPServerResponsePart

    func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        let part = unwrapOutboundIn(data)
        switch part {
        case .head(let head):
            context.write(self.wrapOutboundOut(.head(head)), promise: promise)
        case .body(let buffer):
            context.write(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
        case .end:
            context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: promise)
        }
    }
}

@available(macOS 14, *)
func server() async throws {
    let asyncChannel = try await ServerBootstrap(group: MultiThreadedEventLoopGroup.singleton)
        // Enable SO_REUSEADDR for the server itself
        .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
        .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
        .bind(host: "127.0.0.1", port: 8888, serverBackPressureStrategy: nil) { channel in
            return channel.eventLoop.makeCompletedFuture {
                try channel.pipeline.syncOperations.configureHTTPServerPipeline()
                try channel.pipeline.syncOperations.addHandler(HTTPSendableResponseChannelHandler())
                return try NIOAsyncChannel<HTTPServerRequestPart, SendableHTTPServerResponsePart>(
                    wrappingChannelSynchronously: channel,
                    configuration: .init()
                )
            }
        }
    await withDiscardingTaskGroup { group in
        do {
            try await asyncChannel.executeThenClose { inbound in
                for try await childChannel in inbound {
                    group.addTask {
                        try? await childChannel.executeThenClose { inbound, outbound in
                            for try await part in inbound {
                                if case .end = part {
                                    let buffer = ByteBuffer(repeating: 0, count: 1000000)
                                    try await outbound.write(.head(.init(version: .http1_1, status: .ok, headers: ["content-length": "1000000"])))
                                    try await outbound.write(.body(buffer))
                                    try await outbound.write(.end(nil))
                                    break
                                }
                            }
                        }
                    }
                }
            }
        } catch {
            print("ERROR: Waiting on child channel: \(error)")
        }
    }
}

if #available(macOS 14, *) {
    try await server()
}
