From fc1d1cd136ffb547cf92ddb26a7bfc6d26919ee0 Mon Sep 17 00:00:00 2001 From: Manolo van Ee Date: Thu, 9 Jan 2025 16:05:36 +0100 Subject: [PATCH 1/2] Add NIOAsyncWriter test for suspending a buffered yield when writer is finished --- .../AsyncSequences/NIOAsyncWriterTests.swift | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift index 31c680b8bf..4f15ac9af9 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift @@ -606,6 +606,51 @@ final class NIOAsyncWriterTests: XCTestCase { self.assert(suspendCallCount: 1, yieldCallCount: 1, terminateCallCount: 1) } + func testSuspendingBufferedYield_whenWriterFinished() async throws { + self.sink.setWritability(to: false) + + let bothSuspended = expectation(description: "suspended on both yields") + let suspendedAgain = ConditionLock(value: false) + self.delegate.didSuspendHandler = { + if self.delegate.didSuspendCallCount == 2 { + bothSuspended.fulfill() + } else if self.delegate.didSuspendCallCount > 2 { + suspendedAgain.lock() + suspendedAgain.unlock(withValue: true) + } + } + + self.delegate.didYieldHandler = { _ in + if self.delegate.didYieldCallCount == 1 { + // Delay this yield until the other yield is suspended again. + // FIXME: This will never finish if no other thread is handling the other yield. + suspendedAgain.lock(whenValue: true) + suspendedAgain.unlock() + } + } + + let task1 = Task { [writer] in + try await writer!.yield("message1") + } + let task2 = Task { [writer] in + try await writer!.yield("message2") + } + + await fulfillment(of: [bothSuspended], timeout: 1) + self.writer.finish() + + self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0) + + // We have to become writable again to unbuffer the yields + // The first call to didYield will pause, so that the other yield will be suspended again. + self.sink.setWritability(to: true) + + await XCTAssertNoThrow(try await task1.value) + await XCTAssertNoThrow(try await task2.value) + + self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1) + } + func testWriterFinish_whenFinished() { // This tests just checks that finishing again is a no-op self.writer.finish() From f15a87b7b29d97a5f9fdc5ba2fa0a0a23d89f73c Mon Sep 17 00:00:00 2001 From: Manolo van Ee Date: Thu, 9 Jan 2025 16:19:24 +0100 Subject: [PATCH 2/2] Fix suspending of buffered yield when NIOAsyncWriter is finished --- .../AsyncSequences/NIOAsyncWriter.swift | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift b/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift index badc34f967..f232d518f9 100644 --- a/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift +++ b/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift @@ -1173,7 +1173,38 @@ extension NIOAsyncWriter { delegate: delegate ) - case .initial, .finished, .writerFinished: + case .writerFinished( + let isWritable, + let inDelegateOutcall, + var suspendedYields, + let cancelledYields, + let bufferedYieldIDs, + let delegate, + let error + ): + // We have a suspended yield at this point that hasn't been cancelled yet. + // It was buffered before we became finished, so we still have to deliver it. + // We need to store the yield now. + + self._state = .modifying + + let suspendedYield = SuspendedYield( + yieldID: yieldID, + continuation: continuation + ) + suspendedYields.append(suspendedYield) + + self._state = .writerFinished( + isWritable: isWritable, + inDelegateOutcall: inDelegateOutcall, + suspendedYields: suspendedYields, + cancelledYields: cancelledYields, + bufferedYieldIDs: bufferedYieldIDs, + delegate: delegate, + error: error + ) + + case .initial, .finished: preconditionFailure("This should have already been handled by `yield()`") case .modifying: @@ -1501,7 +1532,7 @@ extension NIOAsyncWriter { self._state = .writerFinished( isWritable: isWritable, - inDelegateOutcall: inDelegateOutcall, + inDelegateOutcall: false, suspendedYields: .init(), cancelledYields: cancelledYields, bufferedYieldIDs: bufferedYieldIDs,