Skip to content

Commit

Permalink
Avoid abstraction overheads on isolated ELF (#3055)
Browse files Browse the repository at this point in the history
Motivation:

Unfortunately, closure composition is really expensive: closures that
capture closures always heap allocate. To make ELF.Isolated perform
well, then, we need to inline the method bodies directly.

Modifications:

- Add some isolated functions into ELF for enqueueing callbacks.
- Inline the implementation of the ELF methods into the isolated view.

Result:

Allocation counts match between isolated/nonisolated.
  • Loading branch information
Lukasa authored Jan 13, 2025
1 parent 588523e commit e19202c
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 51 deletions.
2 changes: 1 addition & 1 deletion IntegrationTests/tests_04_performance/Thresholds/5.10.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"encode_1000_ws_frames_new_buffer_with_space": 3050,
"encode_1000_ws_frames_new_buffer_with_space_with_mask": 5050,
"execute_hop_10000_tasks": 0,
"future_assume_isolated_lots_of_callbacks": 92050,
"future_assume_isolated_lots_of_callbacks": 74050,
"future_erase_result": 4050,
"future_lots_of_callbacks": 74050,
"get_100000_headers_canonical_form": 700050,
Expand Down
2 changes: 1 addition & 1 deletion IntegrationTests/tests_04_performance/Thresholds/5.9.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"encode_1000_ws_frames_new_buffer_with_space": 3050,
"encode_1000_ws_frames_new_buffer_with_space_with_mask": 5050,
"execute_hop_10000_tasks": 0,
"future_assume_isolated_lots_of_callbacks": 91050,
"future_assume_isolated_lots_of_callbacks": 74050,
"future_erase_result": 4050,
"future_lots_of_callbacks": 74050,
"get_100000_headers_canonical_form": 700050,
Expand Down
2 changes: 1 addition & 1 deletion IntegrationTests/tests_04_performance/Thresholds/6.0.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"encode_1000_ws_frames_new_buffer_with_space": 3050,
"encode_1000_ws_frames_new_buffer_with_space_with_mask": 5050,
"execute_hop_10000_tasks": 0,
"future_assume_isolated_lots_of_callbacks": 92050,
"future_assume_isolated_lots_of_callbacks": 74050,
"future_erase_result": 4050,
"future_lots_of_callbacks": 74050,
"get_100000_headers_canonical_form": 700050,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"encode_1000_ws_frames_new_buffer_with_space": 3050,
"encode_1000_ws_frames_new_buffer_with_space_with_mask": 5050,
"execute_hop_10000_tasks": 0,
"future_assume_isolated_lots_of_callbacks": 92050,
"future_assume_isolated_lots_of_callbacks": 74050,
"future_erase_result": 4050,
"future_lots_of_callbacks": 74050,
"get_100000_headers_canonical_form": 700050,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"encode_1000_ws_frames_new_buffer_with_space": 3050,
"encode_1000_ws_frames_new_buffer_with_space_with_mask": 5050,
"execute_hop_10000_tasks": 0,
"future_assume_isolated_lots_of_callbacks": 92050,
"future_assume_isolated_lots_of_callbacks": 74050,
"future_erase_result": 4050,
"future_lots_of_callbacks": 74050,
"get_100000_headers_canonical_form": 500050,
Expand Down
167 changes: 126 additions & 41 deletions Sources/NIOCore/EventLoopFuture+AssumeIsolated.swift
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,25 @@ extension EventLoopFuture {
public func flatMap<NewValue: Sendable>(
_ callback: @escaping (Value) -> EventLoopFuture<NewValue>
) -> EventLoopFuture<NewValue>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.flatMap {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<NewValue>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let t):
let futureU = callback(t)
if futureU.eventLoop.inEventLoop {
return futureU._addCallback {
next._setValue(value: futureU._value!)
}
} else {
futureU.cascade(to: next)
return CallbackList()
}
case .failure(let error):
return next._setValue(value: .failure(error))
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// When the current `EventLoopFuture<Value>` is fulfilled, run the provided callback, which
Expand All @@ -238,10 +253,22 @@ extension EventLoopFuture {
public func flatMapThrowing<NewValue>(
_ callback: @escaping (Value) throws -> NewValue
) -> EventLoopFuture<NewValue>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.flatMapThrowing {
try unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<NewValue>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let t):
do {
let r = try callback(t)
return next._setValue(value: .success(r))
} catch {
return next._setValue(value: .failure(error))
}
case .failure(let e):
return next._setValue(value: .failure(e))
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// When the current `EventLoopFuture<Value>` is in an error state, run the provided callback, which
Expand All @@ -263,10 +290,22 @@ extension EventLoopFuture {
public func flatMapErrorThrowing(
_ callback: @escaping (Error) throws -> Value
) -> EventLoopFuture<Value>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.flatMapErrorThrowing {
try unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<Value>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let t):
return next._setValue(value: .success(t))
case .failure(let e):
do {
let r = try callback(e)
return next._setValue(value: .success(r))
} catch {
return next._setValue(value: .failure(error))
}
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// When the current `EventLoopFuture<Value>` is fulfilled, run the provided callback, which
Expand Down Expand Up @@ -300,10 +339,17 @@ extension EventLoopFuture {
public func map<NewValue>(
_ callback: @escaping (Value) -> (NewValue)
) -> EventLoopFuture<NewValue>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.map {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
if NewValue.self == Value.self && NewValue.self == Void.self {
self.whenSuccess(callback as! (Value) -> Void)
return self as! EventLoopFuture<NewValue>.Isolated
} else {
let next = EventLoopPromise<NewValue>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
next._setValue(value: base._value!.map(callback))
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}
}

/// When the current `EventLoopFuture<Value>` is in an error state, run the provided callback, which
Expand All @@ -325,10 +371,25 @@ extension EventLoopFuture {
public func flatMapError(
_ callback: @escaping (Error) -> EventLoopFuture<Value>
) -> EventLoopFuture<Value>.Isolated where Value: Sendable {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.flatMapError {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<Value>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let t):
return next._setValue(value: .success(t))
case .failure(let e):
let t = callback(e)
if t.eventLoop.inEventLoop {
return t._addCallback {
next._setValue(value: t._value!)
}
} else {
t.cascade(to: next)
return CallbackList()
}
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// When the current `EventLoopFuture<Value>` is fulfilled, run the provided callback, which
Expand All @@ -349,10 +410,22 @@ extension EventLoopFuture {
public func flatMapResult<NewValue, SomeError: Error>(
_ body: @escaping (Value) -> Result<NewValue, SomeError>
) -> EventLoopFuture<NewValue>.Isolated {
let unsafeTransfer = UnsafeTransfer(body)
return self._wrapped.flatMapResult {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<NewValue>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let value):
switch body(value) {
case .success(let newValue):
return next._setValue(value: .success(newValue))
case .failure(let error):
return next._setValue(value: .failure(error))
}
case .failure(let e):
return next._setValue(value: .failure(e))
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// When the current `EventLoopFuture<Value>` is in an error state, run the provided callback, which
Expand All @@ -372,10 +445,17 @@ extension EventLoopFuture {
public func recover(
_ callback: @escaping (Error) -> Value
) -> EventLoopFuture<Value>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.recover {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<Value>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let t):
return next._setValue(value: .success(t))
case .failure(let e):
return next._setValue(value: .success(callback(e)))
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// Adds an observer callback to this `EventLoopFuture` that is called when the
Expand All @@ -391,9 +471,12 @@ extension EventLoopFuture {
@inlinable
@available(*, noasync)
public func whenSuccess(_ callback: @escaping (Value) -> Void) {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.whenSuccess {
unsafeTransfer.wrappedValue($0)
let base = self._wrapped
base._whenCompleteIsolated {
if case .success(let t) = base._value! {
callback(t)
}
return CallbackList()
}
}

Expand All @@ -410,9 +493,12 @@ extension EventLoopFuture {
@inlinable
@available(*, noasync)
public func whenFailure(_ callback: @escaping (Error) -> Void) {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.whenFailure {
unsafeTransfer.wrappedValue($0)
let base = self._wrapped
base._whenCompleteIsolated {
if case .failure(let e) = base._value! {
callback(e)
}
return CallbackList()
}
}

Expand All @@ -426,9 +512,10 @@ extension EventLoopFuture {
public func whenComplete(
_ callback: @escaping (Result<Value, Error>) -> Void
) {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.whenComplete {
unsafeTransfer.wrappedValue($0)
let base = self._wrapped
base._whenCompleteIsolated {
callback(base._value!)
return CallbackList()
}
}

Expand All @@ -443,10 +530,8 @@ extension EventLoopFuture {
public func always(
_ callback: @escaping (Result<Value, Error>) -> Void
) -> EventLoopFuture<Value>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.always {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
self.whenComplete { result in callback(result) }
return self
}

/// Unwrap an `EventLoopFuture` where its type parameter is an `Optional`.
Expand Down
20 changes: 15 additions & 5 deletions Sources/NIOCore/EventLoopFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import Dispatch
/// In particular, note that _run() here continues to obtain and execute lists of callbacks until it completes.
/// This eliminates recursion when processing `flatMap()` chains.
@usableFromInline
internal struct CallbackList: Sendable {
internal struct CallbackList {
@usableFromInline
internal typealias Element = @Sendable () -> CallbackList
internal typealias Element = () -> CallbackList
@usableFromInline
internal var firstCallback: Optional<Element>
@usableFromInline
Expand Down Expand Up @@ -115,6 +115,9 @@ internal struct CallbackList: Sendable {
}
}

@available(*, unavailable)
extension CallbackList: Sendable {}

/// Internal error for operations that return results that were not replaced
@usableFromInline
internal struct OperationPlaceholderError: Error {
Expand Down Expand Up @@ -779,7 +782,7 @@ extension EventLoopFuture {

/// Add a callback. If there's already a value, invoke it and return the resulting list of new callback functions.
@inlinable
internal func _addCallback(_ callback: @escaping @Sendable () -> CallbackList) -> CallbackList {
internal func _addCallback(_ callback: @escaping () -> CallbackList) -> CallbackList {
self.eventLoop.assertInEventLoop()
if self._value == nil {
self._callbacks.append(callback)
Expand All @@ -800,14 +803,21 @@ extension EventLoopFuture {
@inlinable
internal func _internalWhenComplete(_ callback: @escaping @Sendable () -> CallbackList) {
if self.eventLoop.inEventLoop {
self._addCallback(callback)._run()
self._whenCompleteIsolated(callback)
} else {
self.eventLoop.execute {
self._addCallback(callback)._run()
self._whenCompleteIsolated(callback)
}
}
}

/// Add a callback. If there's already a value, run as much of the chain as we can.
@inlinable
internal func _whenCompleteIsolated(_ callback: @escaping () -> CallbackList) {
self.eventLoop.assertInEventLoop()
self._addCallback(callback)._run()
}

/// Adds an observer callback to this `EventLoopFuture` that is called when the
/// `EventLoopFuture` has a success result.
///
Expand Down

0 comments on commit e19202c

Please sign in to comment.