From 3bb07617452554f2238f1ef7255910004b772eab Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Sun, 12 Jan 2025 13:46:39 +0000 Subject: [PATCH 1/9] Add flush and writeAndFlush to NIOAsyncChannelOutboundWriter --- .../NIOCore/AsyncChannel/AsyncChannel.swift | 2 + .../AsyncChannel/AsyncChannelHandler.swift | 50 ++++++++++++++----- .../AsyncChannelOutboundWriter.swift | 43 ++++++++++++++-- 3 files changed, 79 insertions(+), 16 deletions(-) diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift index 105862576d..d43d488b6b 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift @@ -293,6 +293,7 @@ public struct NIOAsyncChannel: Sendable { result = try await body(self._inbound, self._outbound) } catch let bodyError { do { + try await self._outbound.flush() self._outbound.finish() try await self.channel.close().get() throw bodyError @@ -301,6 +302,7 @@ public struct NIOAsyncChannel: Sendable { } } + try await self._outbound.flush() self._outbound.finish() // We ignore errors from close, since all we care about is that the channel has been closed // at this point. diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift index 149c0ff5ea..c7504b11b2 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift @@ -14,6 +14,16 @@ import DequeModule +@usableFromInline +enum OutboundAction: Sendable where OutboundOut: Sendable { + /// Write value + case write(OutboundOut) + /// Write value and flush pipeline + case writeAndFlush(OutboundOut, EventLoopPromise) + /// flush pipeline + case flush(EventLoopPromise) +} + /// A ``ChannelHandler`` that is used to transform the inbound portion of a NIO /// ``Channel`` into an asynchronous sequence that supports back-pressure. It's also used /// to write the outbound portion of a NIO ``Channel`` from Swift Concurrency with back-pressure @@ -77,7 +87,7 @@ internal final class NIOAsyncChannelHandler, NIOAsyncChannelHandlerWriterDelegate > @@ -372,7 +382,10 @@ struct NIOAsyncChannelHandlerProducerDelegate: @unchecked Sendable, NIOAsyncSequ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @usableFromInline -struct NIOAsyncChannelHandlerWriterDelegate: NIOAsyncWriterSinkDelegate, @unchecked Sendable { +struct NIOAsyncChannelHandlerWriterDelegate: NIOAsyncWriterSinkDelegate, @unchecked Sendable { + @usableFromInline + typealias Element = OutboundAction + @usableFromInline let eventLoop: EventLoop @@ -386,7 +399,7 @@ struct NIOAsyncChannelHandlerWriterDelegate: NIOAsyncWriterSi let _didTerminate: ((any Error)?) -> Void @inlinable - init(handler: NIOAsyncChannelHandler) { + init(handler: NIOAsyncChannelHandler) { self.eventLoop = handler.eventLoop self._didYieldContentsOf = handler._didYield(sequence:) self._didYield = handler._didYield(element:) @@ -430,7 +443,7 @@ struct NIOAsyncChannelHandlerWriterDelegate: NIOAsyncWriterSi @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension NIOAsyncChannelHandler { @inlinable - func _didYield(sequence: Deque) { + func _didYield(sequence: Deque>) { // This is always called from an async context, so we must loop-hop. // Because we always loop-hop, we're always at the top of a stack frame. As this // is the only source of writes for us, and as this channel handler doesn't implement @@ -447,7 +460,7 @@ extension NIOAsyncChannelHandler { } @inlinable - func _didYield(element: OutboundOut) { + func _didYield(element: OutboundAction) { // This is always called from an async context, so we must loop-hop. // Because we always loop-hop, we're always at the top of a stack frame. As this // is the only source of writes for us, and as this channel handler doesn't implement @@ -475,18 +488,31 @@ extension NIOAsyncChannelHandler { } @inlinable - func _doOutboundWrites(context: ChannelHandlerContext, writes: Deque) { + func _doOutboundWrites(context: ChannelHandlerContext, writes: Deque>) { for write in writes { - context.write(Self.wrapOutboundOut(write), promise: nil) + switch write { + case .write(let value): + context.write(Self.wrapOutboundOut(value), promise: nil) + case .flush(let promise): + context.flush() + promise.succeed() + case .writeAndFlush(let value, let promise): + context.writeAndFlush(Self.wrapOutboundOut(value), promise: promise) + } } - - context.flush() } @inlinable - func _doOutboundWrite(context: ChannelHandlerContext, write: OutboundOut) { - context.write(Self.wrapOutboundOut(write), promise: nil) - context.flush() + func _doOutboundWrite(context: ChannelHandlerContext, write: OutboundAction) { + switch write { + case .write(let value): + context.write(Self.wrapOutboundOut(value), promise: nil) + case .flush(let promise): + context.flush() + promise.succeed() + case .writeAndFlush(let value, let promise): + context.writeAndFlush(Self.wrapOutboundOut(value), promise: promise) + } } } diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift index dfdeeb0fda..31cec9c0ca 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift @@ -21,7 +21,7 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { @usableFromInline typealias _Writer = NIOAsyncWriter< - OutboundOut, + OutboundAction, NIOAsyncChannelHandlerWriterDelegate > @@ -72,6 +72,9 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { @usableFromInline internal let _backing: Backing + @usableFromInline + internal let eventLoop: EventLoop? + /// Creates a new ``NIOAsyncChannelOutboundWriter`` backed by a ``NIOAsyncChannelOutboundWriter/TestSink``. /// This is mostly useful for testing purposes where one wants to observe the written data. @inlinable @@ -93,7 +96,7 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { ) throws { eventLoop.preconditionInEventLoop() let writer = _Writer.makeWriter( - elementType: OutboundOut.self, + elementType: OutboundAction.self, isWritable: true, finishOnDeinit: closeOnDeinit, delegate: .init(handler: handler) @@ -103,11 +106,13 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { handler.writer = writer.writer self._backing = .writer(writer.writer) + self.eventLoop = eventLoop } @inlinable init(continuation: AsyncStream.Continuation) { self._backing = .asyncStream(continuation) + self.eventLoop = nil } /// Send a write into the ``ChannelPipeline`` and flush it right away. @@ -119,7 +124,26 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { case .asyncStream(let continuation): continuation.yield(data) case .writer(let writer): - try await writer.yield(data) + try await writer.yield(.write(data)) + } + } + + /// Send a write into the ``ChannelPipeline`` and flush it right away. + /// + /// This method suspends if the underlying channel is not writable and will resume once the it becomes writable again. + @inlinable + public func writeAndFlush(_ data: OutboundOut) async throws { + switch self._backing { + case .asyncStream(let continuation): + continuation.yield(data) + case .writer(let writer): + if let eventLoop { + let promise = eventLoop.makePromise(of: Void.self) + try await writer.yield(.writeAndFlush(data, promise)) + try await promise.futureResult.get() + } else { + try await writer.yield(.write(data)) + } } } @@ -134,7 +158,7 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { continuation.yield(data) } case .writer(let writer): - try await writer.yield(contentsOf: sequence) + try await writer.yield(contentsOf: sequence.dropLast().map { .write($0) }) } } @@ -151,6 +175,17 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { } } + @inlinable + public func flush() async throws { + if case .writer(let writer) = self._backing, + let eventLoop + { + let promise = eventLoop.makePromise(of: Void.self) + try await writer.yield(.flush(promise)) + try await promise.futureResult.get() + } + } + /// Finishes the writer. /// /// This might trigger a half closure if the ``NIOAsyncChannel`` was configured to support it. From f59a045762e0f8d31e6824b266114dcedddd3733 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Sun, 12 Jan 2025 20:55:32 +0000 Subject: [PATCH 2/9] Still parse flush outboundActions when channel is closed --- .../NIOCore/AsyncChannel/AsyncChannel.swift | 1 + .../AsyncChannel/AsyncChannelHandler.swift | 36 ++++++++++++------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift index d43d488b6b..427b112402 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift @@ -302,6 +302,7 @@ public struct NIOAsyncChannel: Sendable { } } + // ensure everything written to outbound is written to channel try await self._outbound.flush() self._outbound.finish() // We ignore errors from close, since all we care about is that the channel has been closed diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift index c7504b11b2..4fc693081d 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift @@ -20,7 +20,7 @@ enum OutboundAction: Sendable where OutboundOut: Sendable { case write(OutboundOut) /// Write value and flush pipeline case writeAndFlush(OutboundOut, EventLoopPromise) - /// flush pipeline + /// flush writes to writer case flush(EventLoopPromise) } @@ -451,10 +451,6 @@ extension NIOAsyncChannelHandler { // awkward re-entrancy protections NIO usually requires, and can safely just do an iterative // write. self.eventLoop.preconditionInEventLoop() - guard let context = self.context else { - // Already removed from the channel by now, we can stop. - return - } self._doOutboundWrites(context: context, writes: sequence) } @@ -468,10 +464,6 @@ extension NIOAsyncChannelHandler { // awkward re-entrancy protections NIO usually requires, and can safely just do an iterative // write. self.eventLoop.preconditionInEventLoop() - guard let context = self.context else { - // Already removed from the channel by now, we can stop. - return - } self._doOutboundWrite(context: context, write: element) } @@ -488,29 +480,47 @@ extension NIOAsyncChannelHandler { } @inlinable - func _doOutboundWrites(context: ChannelHandlerContext, writes: Deque>) { + func _doOutboundWrites(context: ChannelHandlerContext?, writes: Deque>) { for write in writes { switch write { case .write(let value): + guard let context = self.context else { + // Already removed from the channel by now, we can stop. + return + } context.write(Self.wrapOutboundOut(value), promise: nil) - case .flush(let promise): context.flush() + case .flush(let promise): promise.succeed() case .writeAndFlush(let value, let promise): + guard let context = self.context else { + // Already removed from the channel by now, we can stop. + promise.succeed() + return + } context.writeAndFlush(Self.wrapOutboundOut(value), promise: promise) } } } @inlinable - func _doOutboundWrite(context: ChannelHandlerContext, write: OutboundAction) { + func _doOutboundWrite(context: ChannelHandlerContext?, write: OutboundAction) { switch write { case .write(let value): + guard let context = self.context else { + // Already removed from the channel by now, we can stop. + return + } context.write(Self.wrapOutboundOut(value), promise: nil) - case .flush(let promise): context.flush() + case .flush(let promise): promise.succeed() case .writeAndFlush(let value, let promise): + guard let context = self.context else { + // Already removed from the channel by now, we can stop. + promise.succeed() + return + } context.writeAndFlush(Self.wrapOutboundOut(value), promise: promise) } } From f69545688be51f3cdc0bfe8294e6652849720b6d Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Sun, 12 Jan 2025 20:55:46 +0000 Subject: [PATCH 3/9] Add withPromise --- .../AsyncChannelOutboundWriter.swift | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift index 31cec9c0ca..aa9a5815de 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift @@ -138,9 +138,9 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { continuation.yield(data) case .writer(let writer): if let eventLoop { - let promise = eventLoop.makePromise(of: Void.self) - try await writer.yield(.writeAndFlush(data, promise)) - try await promise.futureResult.get() + try await self.withPromise(eventLoop: eventLoop) { promise in + try await writer.yield(.writeAndFlush(data, promise)) + } } else { try await writer.yield(.write(data)) } @@ -180,9 +180,9 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { if case .writer(let writer) = self._backing, let eventLoop { - let promise = eventLoop.makePromise(of: Void.self) - try await writer.yield(.flush(promise)) - try await promise.futureResult.get() + try await self.withPromise(eventLoop: eventLoop) { promise in + try await writer.yield(.flush(promise)) + } } } @@ -197,6 +197,20 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { writer.finish() } } + + @usableFromInline + func withPromise( + eventLoop: EventLoop, + _ process: (EventLoopPromise) async throws -> Void + ) async throws { + let promise = eventLoop.makePromise(of: Void.self) + do { + try await process(promise) + try await promise.futureResult.get() + } catch { + promise.fail(error) + } + } } @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) From 53575b5dda1da3268e1c4e9eb3165b253e5c4caf Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Sun, 12 Jan 2025 20:56:06 +0000 Subject: [PATCH 4/9] Add testAllWritesAreWritten --- .../AsyncChannel/AsyncChannelTests.swift | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift index 3d21c2c7a9..43523c8bd2 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift @@ -180,6 +180,34 @@ final class AsyncChannelTests: XCTestCase { } } + func testAllWritesAreWritten() async throws { + let channel = NIOAsyncTestingChannel() + let promise = channel.testingEventLoop.makePromise(of: Void.self) + let wrapped = try await channel.testingEventLoop.executeInContext { + try channel.pipeline.syncOperations.addHandler(DelayingChannelHandler(promise: promise)) + return try NIOAsyncChannel(wrappingChannelSynchronously: channel) + } + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await wrapped.executeThenClose { inbound, outbound in + try await outbound.write("hello") + try await outbound.writeAndFlush("world") + } + } + group.addTask { + let firstRead = try await channel.waitForOutboundWrite(as: String.self) + let secondRead = try await channel.waitForOutboundWrite(as: String.self) + + XCTAssertEqual(firstRead, "hello") + XCTAssertEqual(secondRead, "world") + } + + try await Task.sleep(for: .milliseconds(50)) + promise.succeed() + await channel.testingEventLoop.advanceTime(by: .seconds(1)) + } + } + func testErrorsArePropagatedButAfterReads() async throws { let channel = NIOAsyncTestingChannel() let wrapped = try await channel.testingEventLoop.executeInContext { @@ -429,6 +457,17 @@ private final class CloseRecorder: ChannelOutboundHandler, @unchecked Sendable { } } +struct UnsafeContext: @unchecked Sendable { + private let _context: ChannelHandlerContext + var context: ChannelHandlerContext { + self._context.eventLoop.preconditionInEventLoop() + return _context + } + init(_ context: ChannelHandlerContext) { + self._context = context + } +} + private final class CloseSuppressor: ChannelOutboundHandler, RemovableChannelHandler, Sendable { typealias OutboundIn = Any @@ -438,6 +477,22 @@ private final class CloseSuppressor: ChannelOutboundHandler, RemovableChannelHan } } +private final class DelayingChannelHandler: ChannelOutboundHandler, RemovableChannelHandler, Sendable { + typealias OutboundIn = Any + typealias OutboundOut = Any + let waitPromise: EventLoopPromise + + init(promise: EventLoopPromise) { + self.waitPromise = promise + } + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + let unsafeTransfer = UnsafeTransfer((context: context, data: data)) + self.waitPromise.futureResult.whenComplete { _ in + unsafeTransfer.wrappedValue.context.writeAndFlush(unsafeTransfer.wrappedValue.data, promise: promise) + } + } +} + @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension NIOAsyncTestingChannel { fileprivate func closeIgnoringSuppression() async throws { From 8604b9e76e4fabc7c36882ebcd6fe1952470ba22 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Sun, 12 Jan 2025 21:15:50 +0000 Subject: [PATCH 5/9] Update comments --- Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift index aa9a5815de..19544b5380 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift @@ -130,7 +130,7 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { /// Send a write into the ``ChannelPipeline`` and flush it right away. /// - /// This method suspends if the underlying channel is not writable and will resume once the it becomes writable again. + /// This method suspends until the write has been written and flushed. @inlinable public func writeAndFlush(_ data: OutboundOut) async throws { switch self._backing { @@ -175,6 +175,7 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { } } + /// Ensure all writes to the writer have been read @inlinable public func flush() async throws { if case .writer(let writer) = self._backing, From d35055f7a0dafba030929cccaad31da87415accb Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 13 Jan 2025 09:14:57 +0000 Subject: [PATCH 6/9] Remove `flush` from `AsyncChannel.executeAndClose` as user should have flushed pipeline already. --- Sources/NIOCore/AsyncChannel/AsyncChannel.swift | 2 -- 1 file changed, 2 deletions(-) diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift index 427b112402..6cd2053366 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift @@ -302,8 +302,6 @@ public struct NIOAsyncChannel: Sendable { } } - // ensure everything written to outbound is written to channel - try await self._outbound.flush() self._outbound.finish() // We ignore errors from close, since all we care about is that the channel has been closed // at this point. From ce2b5665263473df8eca610e39c3e8c4ce67c43f Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 13 Jan 2025 09:31:28 +0000 Subject: [PATCH 7/9] Add writeAndFlush(contentsOf:) --- .../AsyncChannel/AsyncChannelHandler.swift | 33 ++++++++++++++----- .../AsyncChannelOutboundWriter.swift | 24 +++++++++++++- .../AsyncChannel/AsyncChannelTests.swift | 31 ++++++++++++++++- 3 files changed, 77 insertions(+), 11 deletions(-) diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift index 4fc693081d..83e48cd229 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift @@ -481,9 +481,10 @@ extension NIOAsyncChannelHandler { @inlinable func _doOutboundWrites(context: ChannelHandlerContext?, writes: Deque>) { - for write in writes { + // write everything but the last item + for write in writes.dropLast() { switch write { - case .write(let value): + case .write(let value), .writeAndFlush(let value, _): guard let context = self.context else { // Already removed from the channel by now, we can stop. return @@ -492,15 +493,29 @@ extension NIOAsyncChannelHandler { context.flush() case .flush(let promise): promise.succeed() - case .writeAndFlush(let value, let promise): - guard let context = self.context else { - // Already removed from the channel by now, we can stop. - promise.succeed() - return - } - context.writeAndFlush(Self.wrapOutboundOut(value), promise: promise) } } + // write last item + switch writes.last { + case .write(let value): + guard let context = self.context else { + // Already removed from the channel by now, we can stop. + return + } + context.write(Self.wrapOutboundOut(value), promise: nil) + context.flush() + case .flush(let promise): + promise.succeed() + case .writeAndFlush(let value, let promise): + guard let context = self.context else { + // Already removed from the channel by now, we can stop. + promise.succeed() + return + } + context.writeAndFlush(Self.wrapOutboundOut(value), promise: promise) + case .none: + break + } } @inlinable diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift index 19544b5380..03ec9d34ff 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift @@ -158,7 +158,29 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { continuation.yield(data) } case .writer(let writer): - try await writer.yield(contentsOf: sequence.dropLast().map { .write($0) }) + try await writer.yield(contentsOf: sequence.map { .write($0) }) + } + } + + /// Send a sequence of writes into the ``ChannelPipeline`` and flush them right away. + /// + /// This method suspends if the underlying channel is not writable and will resume once the it becomes writable again. + @inlinable + public func writeAndFlush(contentsOf sequence: Writes) async throws + where Writes.Element == OutboundOut { + switch self._backing { + case .asyncStream(let continuation): + for data in sequence { + continuation.yield(data) + } + case .writer(let writer): + if let eventLoop { + try await withPromise(eventLoop: eventLoop) { promise in + try await writer.yield(contentsOf: sequence.map { .writeAndFlush($0, promise) }) + } + } else { + try await writer.yield(contentsOf: sequence.map { .write($0) }) + } } } diff --git a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift index 43523c8bd2..581d5a4548 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift @@ -202,9 +202,38 @@ final class AsyncChannelTests: XCTestCase { XCTAssertEqual(secondRead, "world") } + // wait 50 milliseconds to ensure we are inside write and flush then + // trigger pipeline flush by succeeding promise in DelayingChannelHandler + try await Task.sleep(for: .milliseconds(50)) + promise.succeed() + } + } + + func testAllWritesInSequenceAreWritten() async throws { + let channel = NIOAsyncTestingChannel() + let promise = channel.testingEventLoop.makePromise(of: Void.self) + let wrapped = try await channel.testingEventLoop.executeInContext { + try channel.pipeline.syncOperations.addHandler(DelayingChannelHandler(promise: promise)) + return try NIOAsyncChannel(wrappingChannelSynchronously: channel) + } + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await wrapped.executeThenClose { inbound, outbound in + try await outbound.writeAndFlush(contentsOf: ["hello", "world"]) + } + } + group.addTask { + let firstRead = try await channel.waitForOutboundWrite(as: String.self) + let secondRead = try await channel.waitForOutboundWrite(as: String.self) + + XCTAssertEqual(firstRead, "hello") + XCTAssertEqual(secondRead, "world") + } + + // wait 50 milliseconds to ensure we are inside write and flush then + // trigger pipeline flush by succeeding promise in DelayingChannelHandler try await Task.sleep(for: .milliseconds(50)) promise.succeed() - await channel.testingEventLoop.advanceTime(by: .seconds(1)) } } From 294098df7253873d6de2bc7bea06c94ff8731b38 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 13 Jan 2025 09:33:05 +0000 Subject: [PATCH 8/9] Remove flush from executeThenClose error handling --- Sources/NIOCore/AsyncChannel/AsyncChannel.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift index 6cd2053366..105862576d 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift @@ -293,7 +293,6 @@ public struct NIOAsyncChannel: Sendable { result = try await body(self._inbound, self._outbound) } catch let bodyError { do { - try await self._outbound.flush() self._outbound.finish() try await self.channel.close().get() throw bodyError From 9afbf7709a3ee6dadffb99a1b2f572c01070860a Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 13 Jan 2025 09:45:40 +0000 Subject: [PATCH 9/9] Move eventLoop as associated value of writer case --- .../AsyncChannelOutboundWriter.swift | 39 ++++++------------- 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift index 03ec9d34ff..604f5eff6f 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift @@ -66,15 +66,12 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { @usableFromInline enum Backing: Sendable { case asyncStream(AsyncStream.Continuation) - case writer(_Writer) + case writer(_Writer, EventLoop) } @usableFromInline internal let _backing: Backing - @usableFromInline - internal let eventLoop: EventLoop? - /// Creates a new ``NIOAsyncChannelOutboundWriter`` backed by a ``NIOAsyncChannelOutboundWriter/TestSink``. /// This is mostly useful for testing purposes where one wants to observe the written data. @inlinable @@ -105,14 +102,12 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { handler.sink = writer.sink handler.writer = writer.writer - self._backing = .writer(writer.writer) - self.eventLoop = eventLoop + self._backing = .writer(writer.writer, eventLoop) } @inlinable init(continuation: AsyncStream.Continuation) { self._backing = .asyncStream(continuation) - self.eventLoop = nil } /// Send a write into the ``ChannelPipeline`` and flush it right away. @@ -123,7 +118,7 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { switch self._backing { case .asyncStream(let continuation): continuation.yield(data) - case .writer(let writer): + case .writer(let writer, _): try await writer.yield(.write(data)) } } @@ -136,13 +131,9 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { switch self._backing { case .asyncStream(let continuation): continuation.yield(data) - case .writer(let writer): - if let eventLoop { - try await self.withPromise(eventLoop: eventLoop) { promise in - try await writer.yield(.writeAndFlush(data, promise)) - } - } else { - try await writer.yield(.write(data)) + case .writer(let writer, let eventLoop): + try await self.withPromise(eventLoop: eventLoop) { promise in + try await writer.yield(.writeAndFlush(data, promise)) } } } @@ -157,7 +148,7 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { for data in sequence { continuation.yield(data) } - case .writer(let writer): + case .writer(let writer, _): try await writer.yield(contentsOf: sequence.map { .write($0) }) } } @@ -173,13 +164,9 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { for data in sequence { continuation.yield(data) } - case .writer(let writer): - if let eventLoop { - try await withPromise(eventLoop: eventLoop) { promise in - try await writer.yield(contentsOf: sequence.map { .writeAndFlush($0, promise) }) - } - } else { - try await writer.yield(contentsOf: sequence.map { .write($0) }) + case .writer(let writer, let eventLoop): + try await withPromise(eventLoop: eventLoop) { promise in + try await writer.yield(contentsOf: sequence.map { .writeAndFlush($0, promise) }) } } } @@ -200,9 +187,7 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { /// Ensure all writes to the writer have been read @inlinable public func flush() async throws { - if case .writer(let writer) = self._backing, - let eventLoop - { + if case .writer(let writer, let eventLoop) = self._backing { try await self.withPromise(eventLoop: eventLoop) { promise in try await writer.yield(.flush(promise)) } @@ -216,7 +201,7 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { switch self._backing { case .asyncStream(let continuation): continuation.finish() - case .writer(let writer): + case .writer(let writer, _): writer.finish() } }