Skip to content

Commit

Permalink
Minimise channelReadComplete traffic. (apple#136)
Browse files Browse the repository at this point in the history
Motivation:

Currently swift-nio-http2 is extremely naive with its use of
channelReadComplete calls in HTTP2StreamChannel, firing one
channelReadComplete call per frame read. In high frame load cases this
leads to a lot of excessive channelReadComplete traffic, which can cause
unnecessary time spent in flush calls.

We should try to minimise the amount of time we spend on this
bookkeeping and save the CPU cost.

Modifications:

- Store a linked-list of HTTP2StreamChannels that need to have
channelReadComplete fired on them.
- Fire channelReadComplete based on this linked list.

Result:

Moderate improvement in performance in high-throughput workloads.
  • Loading branch information
Lukasa authored Jun 18, 2019
1 parent 419cd4e commit 06880b0
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 5 deletions.
20 changes: 15 additions & 5 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
/// stream is flushed, at which time we deliver them all. This buffer holds the pending ones.
private var pendingWrites: MarkedCircularBuffer<(HTTP2Frame, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 8)

/// A list node used to hold stream channels.
internal var streamChannelListNode: StreamChannelListNode = StreamChannelListNode()

public func register0(promise: EventLoopPromise<Void>?) {
fatalError("not implemented \(#function)")
}
Expand Down Expand Up @@ -466,8 +469,6 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
return
}

assert(self.pendingReads.count > 0, "tryToRead called without reads!")

// If we're not active, we will hold on to these reads.
guard self.isActive else {
return
Expand Down Expand Up @@ -538,8 +539,11 @@ internal extension HTTP2StreamChannel {
return
}

self.pendingReads.append(frame)
self.tryToRead()
if self.unsatisfiedRead {
self.pipeline.fireChannelRead(NIOAny(frame))
} else {
self.pendingReads.append(frame)
}
}


Expand All @@ -563,6 +567,9 @@ internal extension HTTP2StreamChannel {
/// - parameters:
/// - reason: The reason received from the network, if any.
func receiveStreamClosed(_ reason: HTTP2ErrorCode?) {
// The stream is closed, we should aim to deliver any read frames we have for it.
self.tryToRead()

if let reason = reason {
let err = NIOHTTP2Errors.StreamClosed(streamID: self.streamID, errorCode: reason)
self.errorEncountered(error: err)
Expand All @@ -588,5 +595,8 @@ internal extension HTTP2StreamChannel {
self.multiplexer.childChannelFlush()
}
}
}

func receiveParentChannelReadComplete() {
self.tryToRead()
}
}
14 changes: 14 additions & 0 deletions Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
private var nextOutboundStreamID: HTTP2StreamID
private var connectionFlowControlManager: InboundWindowManager
private var flushState: FlushState = .notReading
private var didReadChannels: StreamChannelList = StreamChannelList()

public func handlerAdded(context: ChannelHandlerContext) {
// We now need to check that we're on the same event loop as the one we were originally given.
Expand All @@ -44,6 +45,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun

public func handlerRemoved(context: ChannelHandlerContext) {
self.context = nil
self.didReadChannels.removeAll()
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
Expand All @@ -66,6 +68,9 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun

if let channel = streams[streamID] {
channel.receiveInboundFrame(frame)
if !channel.inList {
self.didReadChannels.append(channel)
}
} else if case .headers = frame.payload {
let channel = HTTP2StreamChannel(allocator: self.channel.allocator,
parent: self.channel,
Expand All @@ -75,6 +80,10 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
self.streams[streamID] = channel
channel.configure(initializer: self.inboundStreamStateInitializer, userPromise: nil)
channel.receiveInboundFrame(frame)

if !channel.inList {
self.didReadChannels.append(channel)
}
} else {
// This frame is for a stream we know nothing about. We can't do much about it, so we
// are going to fire an error and drop the frame.
Expand All @@ -84,6 +93,11 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
}

public func channelReadComplete(context: ChannelHandlerContext) {
// Call channelReadComplete on the children until this has been propagated enough.
while let channel = self.didReadChannels.removeFirst() {
channel.receiveParentChannelReadComplete()
}

if case .flushPending = self.flushState {
self.flushState = .notReading
context.flush()
Expand Down
99 changes: 99 additions & 0 deletions Sources/NIOHTTP2/StreamChannelList.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2019 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//


/// A linked list for storing HTTP2StreamChannels.
///
/// Note that while this object *could* conform to `Sequence`, there is minimal value in doing
/// that here, as it's so single-use. If we find ourselves needing to expand on this data type
/// in future we can revisit that idea.
struct StreamChannelList {
private var head: HTTP2StreamChannel?
private var tail: HTTP2StreamChannel?
}

/// A node for objects stored in an intrusive linked list.
///
/// Any object that wishes to be stored in a linked list must embed one of these nodes.
struct StreamChannelListNode {
fileprivate enum ListState {
case inList(next: HTTP2StreamChannel?)
case notInList
}

fileprivate var state: ListState = .notInList

internal init() { }
}


extension StreamChannelList {
/// Append an element to the linked list.
mutating func append(_ element: HTTP2StreamChannel) {
precondition(!element.inList)

guard case .notInList = element.streamChannelListNode.state else {
preconditionFailure("Appended an element already in a list")
}

element.streamChannelListNode.state = .inList(next: nil)

if let tail = self.tail {
tail.streamChannelListNode.state = .inList(next: element)
self.tail = element
} else {
assert(self.head == nil)
self.head = element
self.tail = element
}
}

mutating func removeFirst() -> HTTP2StreamChannel? {
guard let head = self.head else {
assert(self.tail == nil)
return nil
}

guard case .inList(let next) = head.streamChannelListNode.state else {
preconditionFailure("Popped an element not in a list")
}

self.head = next
if self.head == nil {
assert(self.tail === head)
self.tail = nil
}

head.streamChannelListNode = .init()
return head
}

mutating func removeAll() {
while self.removeFirst() != nil { }
}
}


// MARK:- IntrusiveLinkedListElement helpers.
extension HTTP2StreamChannel {
/// Whether this element is currently in a list.
internal var inList: Bool {
switch self.streamChannelListNode.state {
case .inList:
return true
case .notInList:
return false
}
}
}
2 changes: 2 additions & 0 deletions Tests/NIOHTTP2Tests/HTTP2StreamMultiplexerTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ extension HTTP2StreamMultiplexerTests {
("testCreatedChildChannelCanBeClosedImmediatelyWhenBaseIsActive", testCreatedChildChannelCanBeClosedImmediatelyWhenBaseIsActive),
("testCreatedChildChannelCanBeClosedBeforeWritingHeadersWhenBaseIsActive", testCreatedChildChannelCanBeClosedBeforeWritingHeadersWhenBaseIsActive),
("testMultiplexerCoalescesFlushCallsDuringChannelRead", testMultiplexerCoalescesFlushCallsDuringChannelRead),
("testMultiplexerDoesntFireReadCompleteForEachFrame", testMultiplexerDoesntFireReadCompleteForEachFrame),
("testMultiplexerCorrectlyTellsAllStreamsAboutReadComplete", testMultiplexerCorrectlyTellsAllStreamsAboutReadComplete),
]
}
}
Expand Down
127 changes: 127 additions & 0 deletions Tests/NIOHTTP2Tests/HTTP2StreamMultiplexerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,19 @@ final class FlushCounter: ChannelOutboundHandler {
}


final class ReadCompleteCounter: ChannelInboundHandler {
typealias InboundIn = Any
typealias InboundOut = Any

var readCompleteCount = 0

func channelReadComplete(context: ChannelHandlerContext) {
self.readCompleteCount += 1
context.fireChannelReadComplete()
}
}


/// A channel handler that sends a response in response to a HEADERS frame.
final class QuickResponseHandler: ChannelInboundHandler {
typealias InboundIn = HTTP2Frame
Expand Down Expand Up @@ -1497,4 +1510,118 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
XCTAssertEqual(try self.channel.sentFrames().count, 10)
XCTAssertEqual(flushCounter.flushCount, 1)
}

func testMultiplexerDoesntFireReadCompleteForEachFrame() {
// We need to activate the underlying channel here.
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait())

let frameRecorder = InboundFrameRecorder()
let readCompleteCounter = ReadCompleteCounter()

let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { (childChannel, _) in
return childChannel.pipeline.addHandler(frameRecorder).flatMap {
childChannel.pipeline.addHandler(readCompleteCounter)
}
}
XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait())

XCTAssertEqual(frameRecorder.receivedFrames.count, 0)
XCTAssertEqual(readCompleteCounter.readCompleteCount, 0)

// Wake up and activate the stream.
let requestHeaders = HPACKHeaders([(":path", "/"), (":method", "GET"), (":authority", "localhost"), (":scheme", "https")])
let requestFrame = HTTP2Frame(streamID: 1, payload: .headers(.init(headers: requestHeaders, endStream: false)))
self.channel.pipeline.fireChannelRead(NIOAny(requestFrame))
self.activateStream(1)
self.channel.embeddedEventLoop.run()

XCTAssertEqual(frameRecorder.receivedFrames.count, 1)
XCTAssertEqual(readCompleteCounter.readCompleteCount, 0)

// Now we're going to send 9 data frames.
var requestData = self.channel.allocator.buffer(capacity: 1024)
requestData.writeBytes("Hello world!".utf8)
let dataFrames = repeatElement(HTTP2Frame(streamID: 1, payload: .data(.init(data: .byteBuffer(requestData), endStream: false))), count: 9)

for frame in dataFrames {
self.channel.pipeline.fireChannelRead(NIOAny(frame))
}

// We should have 10 reads, and zero read completes.
XCTAssertEqual(frameRecorder.receivedFrames.count, 10)
XCTAssertEqual(readCompleteCounter.readCompleteCount, 0)

// Fire read complete on the parent and it'll propagate to the child.
self.channel.pipeline.fireChannelReadComplete()

// We should have 10 reads, and one read complete.
XCTAssertEqual(frameRecorder.receivedFrames.count, 10)
XCTAssertEqual(readCompleteCounter.readCompleteCount, 1)

// If we fire a new read complete on the parent, the child doesn't see it this time, as it received no frames.
self.channel.pipeline.fireChannelReadComplete()
XCTAssertEqual(frameRecorder.receivedFrames.count, 10)
XCTAssertEqual(readCompleteCounter.readCompleteCount, 1)
}

func testMultiplexerCorrectlyTellsAllStreamsAboutReadComplete() {
// We need to activate the underlying channel here.
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait())

// These are deliberately getting inserted to all streams. The test above confirms the single-stream
// behaviour is correct.
let frameRecorder = InboundFrameRecorder()
let readCompleteCounter = ReadCompleteCounter()

let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { (childChannel, _) in
return childChannel.pipeline.addHandler(frameRecorder).flatMap {
childChannel.pipeline.addHandler(readCompleteCounter)
}
}
XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait())

XCTAssertEqual(frameRecorder.receivedFrames.count, 0)
XCTAssertEqual(readCompleteCounter.readCompleteCount, 0)

// Wake up and activate the streams.
let requestHeaders = HPACKHeaders([(":path", "/"), (":method", "GET"), (":authority", "localhost"), (":scheme", "https")])

for streamID in [HTTP2StreamID(1), HTTP2StreamID(3), HTTP2StreamID(5)] {
let requestFrame = HTTP2Frame(streamID: streamID, payload: .headers(.init(headers: requestHeaders, endStream: false)))
self.channel.pipeline.fireChannelRead(NIOAny(requestFrame))
self.activateStream(streamID)
}
self.channel.embeddedEventLoop.run()

XCTAssertEqual(frameRecorder.receivedFrames.count, 3)
XCTAssertEqual(readCompleteCounter.readCompleteCount, 0)

// Firing in readComplete causes a readComplete for each stream.
self.channel.pipeline.fireChannelReadComplete()
XCTAssertEqual(frameRecorder.receivedFrames.count, 3)
XCTAssertEqual(readCompleteCounter.readCompleteCount, 3)

// Now we're going to send a data frame on stream 1.
var requestData = self.channel.allocator.buffer(capacity: 1024)
requestData.writeBytes("Hello world!".utf8)
let frame = HTTP2Frame(streamID: 1, payload: .data(.init(data: .byteBuffer(requestData), endStream: false)))
self.channel.pipeline.fireChannelRead(NIOAny(frame))

// We should have 4 reads, and 3 read completes.
XCTAssertEqual(frameRecorder.receivedFrames.count, 4)
XCTAssertEqual(readCompleteCounter.readCompleteCount, 3)

// Fire read complete on the parent and it'll propagate to the child, but only to the one
// that saw a frame.
self.channel.pipeline.fireChannelReadComplete()

// We should have 4 reads, and 4 read completes.
XCTAssertEqual(frameRecorder.receivedFrames.count, 4)
XCTAssertEqual(readCompleteCounter.readCompleteCount, 4)

// If we fire a new read complete on the parent, the children don't see it.
self.channel.pipeline.fireChannelReadComplete()
XCTAssertEqual(frameRecorder.receivedFrames.count, 4)
XCTAssertEqual(readCompleteCounter.readCompleteCount, 4)
}
}

0 comments on commit 06880b0

Please sign in to comment.