Skip to content

Commit

Permalink
Exert backpressure via stream flow control windows (apple#191)
Browse files Browse the repository at this point in the history
Motivation:

When a window update event is received in the stream channel we notify
the inbound window manager. Doing so may emit a WINDOW_UPDATE frame
to indicate the remote may send us more DATA. However, this happens even
if there are inbound frames waiting to be delivered to the channel. This
may lead to the remote sending us more DATA which we have to buffer
until it is read.

Modifications:

- Update the inbound window manager to track the number of bytes
  buffered in the stream channel
- Account for buffered bytes when receiving a new window size event
- Possibly emit WINDOW_UPDATE after unbuffering inbound frames into the
  stream channel's pipeline

Result:

The timing of WINDOW_UPDATE frames is better aligned with when frames
are actually read in a stream.
  • Loading branch information
glbrntt authored Mar 12, 2020
1 parent 0841f64 commit fbfe24c
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 25 deletions.
28 changes: 26 additions & 2 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -554,11 +554,27 @@ private extension HTTP2StreamChannel {
self.pendingReads = CircularBuffer(initialCapacity: 0)
}

/// DinitialCapacityreads to the channel.
/// Deliver all pending reads to the channel.
private func deliverPendingReads() {
assert(self.isActive)
while self.pendingReads.count > 0 {
self.pipeline.fireChannelRead(NIOAny(self.pendingReads.removeFirst()))
let frame = self.pendingReads.removeFirst()

let dataLength: Int?
if case .data(let data) = frame.payload {
dataLength = data.data.readableBytes
} else {
dataLength = nil
}

self.pipeline.fireChannelRead(NIOAny(frame))

if let size = dataLength, let increment = self.windowManager.bufferedFrameEmitted(size: size) {
let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment))
self.receiveOutboundFrame(frame, promise: nil)
// This flush should really go away, but we need it for now until we sort out window management.
self.multiplexer.childChannelFlush()
}
}
self.pipeline.fireChannelReadComplete()
}
Expand Down Expand Up @@ -599,8 +615,16 @@ internal extension HTTP2StreamChannel {
}

if self.unsatisfiedRead {
// We don't need to account for this frame in the window manager: it's being delivered
// straight into the pipeline.
self.pipeline.fireChannelRead(NIOAny(frame))
} else {
// Record the size of the frame so that when we receive a window update event our
// calculation on whether we emit a WINDOW_UPDATE frame is based on the bytes we have
// actually delivered into the pipeline.
if case .data(let dataPayload) = frame.payload {
self.windowManager.bufferedFrameReceived(size: dataPayload.data.readableBytes)
}
self.pendingReads.append(frame)
}
}
Expand Down
72 changes: 50 additions & 22 deletions Sources/NIOHTTP2/InboundWindowManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
//
//===----------------------------------------------------------------------===//

/// A simple structure that managse an inbound flow control window.
/// A simple structure that manages an inbound flow control window.
///
/// For now, this just aims to emit window update frames whenever the flow control window drops below a certain size. It's very naive.
/// We'll worry about the rest of it later.
struct InboundWindowManager {
private var targetWindowSize: Int32

// The last window size we were told about. Used when we get changes to SETTINGS_INITIAL_WINDOW_SIZE.
/// The last window size we were told about. Used when we get changes to SETTINGS_INITIAL_WINDOW_SIZE.
private var lastWindowSize: Int?

/// The number of bytes of buffered frames.
private var bufferedBytes: Int = 0

init(targetSize: Int32) {
assert(targetSize <= HTTP2FlowControlWindow.maxSize)
assert(targetSize >= 0)
Expand All @@ -32,39 +35,64 @@ struct InboundWindowManager {
mutating func newWindowSize(_ newSize: Int) -> Int? {
self.lastWindowSize = newSize

// All math here happens on 64-bit ints, as it avoids overflow problems.
// The new size assumes all frames have been delivered to the stream. This isn't necessarily
// the case so we need to take the size of any buffered frames into account here.
return self.calculateWindowIncrement(windowSize: newSize + self.bufferedBytes)
}

mutating func initialWindowSizeChanged(delta: Int) -> Int? {
self.targetWindowSize += Int32(delta)

if let lastWindowSize = self.lastWindowSize {
// The delta applies to the current window size as well.
return self.newWindowSize(lastWindowSize + delta)
} else {
return nil
}
}

mutating func bufferedFrameReceived(size: Int) {
self.bufferedBytes += size
}

mutating func bufferedFrameEmitted(size: Int) -> Int? {
// Consume the bytes we just emitted.
self.bufferedBytes -= size
assert(self.bufferedBytes >= 0)

guard let lastWindowSize = self.lastWindowSize else {
return nil
}

return self.calculateWindowIncrement(windowSize: lastWindowSize + self.bufferedBytes)
}

private func calculateWindowIncrement(windowSize: Int) -> Int? {
// The simplest case is where newSize >= targetWindowSize. In that case, we do nothing.
// The next simplest case is where 0 <= newSize < targetWindowSize. In that case, if targetWindowSize >= newSize * 2, we update to full size.
// The other case is where newSize is negative. This can happen. In those cases, we want to increment by Int32.max or the total distance between
// newSize and targetWindowSize, whichever is *smaller*. This ensures the result fits into Int32.
if newSize >= targetWindowSize {
//
// The next simplest case is where 0 <= newSize < targetWindowSize. In that case,
// if targetWindowSize >= newSize * 2, we update to full size.
//
// The other case is where newSize is negative. This can happen. In those cases, we want to
// increment by Int32.max or the total distance between newSize and targetWindowSize,
// whichever is *smaller*. This ensures the result fits into Int32.
if windowSize >= self.targetWindowSize {
return nil
} else if newSize >= 0 {
let increment = self.targetWindowSize - Int32(newSize)
if increment >= newSize {
} else if windowSize >= 0 {
let increment = self.targetWindowSize - Int32(windowSize)
if increment >= windowSize {
return Int(increment)
} else {
return nil
}
} else {
// All math in here happens on 64-bit ints to avoid overflow issues.
let newSize = Int64(newSize)
let windowSize = Int64(windowSize)
let targetWindowSize = Int64(self.targetWindowSize)

let increment = min(abs(newSize) + targetWindowSize, Int64(Int32.max))
let increment = min(abs(windowSize) + targetWindowSize, Int64(Int32.max))
return Int(increment)
}
}

mutating func initialWindowSizeChanged(delta: Int) -> Int? {
self.targetWindowSize += Int32(delta)

if let lastWindowSize = self.lastWindowSize {
// The delta applies to the current window size as well.
return self.newWindowSize(lastWindowSize + delta)
} else {
return nil
}
}
}
1 change: 1 addition & 0 deletions Tests/LinuxMain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import XCTest
testCase(HTTP2ToHTTP1CodecTests.allTests),
testCase(HeaderTableTests.allTests),
testCase(HuffmanCodingTests.allTests),
testCase(InboundWindowManagerTests.allTests),
testCase(IntegerCodingTests.allTests),
testCase(OutboundFlowControlBufferTests.allTests),
testCase(ReentrancyTests.allTests),
Expand Down
37 changes: 37 additions & 0 deletions Tests/NIOHTTP2Tests/InboundWindowManagerTests+XCTest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//
// InboundWindowManagerTests+XCTest.swift
//
import XCTest

///
/// NOTE: This file was generated by generate_linux_tests.rb
///
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///

extension InboundWindowManagerTests {

static var allTests : [(String, (InboundWindowManagerTests) -> () throws -> Void)] {
return [
("testNewWindowSizeWhenNewSizeIsAtOrAboveTarget", testNewWindowSizeWhenNewSizeIsAtOrAboveTarget),
("testNewWindowSizeWhenNewSizeIsAtLeastHalfTarget", testNewWindowSizeWhenNewSizeIsAtLeastHalfTarget),
("testNewWindowSizeWhenNewSizeIsLessThanOrEqualHalfTarget", testNewWindowSizeWhenNewSizeIsLessThanOrEqualHalfTarget),
("testNewWindowSizeWithBufferedBytes", testNewWindowSizeWithBufferedBytes),
("testInitialWindowSizeChanged", testInitialWindowSizeChanged),
]
}
}

74 changes: 74 additions & 0 deletions Tests/NIOHTTP2Tests/InboundWindowManagerTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@testable import NIOHTTP2
import XCTest

class InboundWindowManagerTests: XCTestCase {

func testNewWindowSizeWhenNewSizeIsAtOrAboveTarget() {
var windowManager = InboundWindowManager(targetSize: 10)
XCTAssertNil(windowManager.newWindowSize(10))
XCTAssertNil(windowManager.newWindowSize(11))
}

func testNewWindowSizeWhenNewSizeIsAtLeastHalfTarget() {
var windowManager = InboundWindowManager(targetSize: 10)
XCTAssertNil(windowManager.newWindowSize(6))
XCTAssertNil(windowManager.newWindowSize(9))
}

func testNewWindowSizeWhenNewSizeIsLessThanOrEqualHalfTarget() {
var windowManager = InboundWindowManager(targetSize: 10)
XCTAssertEqual(windowManager.newWindowSize(0), 10)
XCTAssertEqual(windowManager.newWindowSize(5), 5)
}

func testNewWindowSizeWithBufferedBytes() {
var windowManager = InboundWindowManager(targetSize: 10)

// The adjusted newSize is 3 (new size 2, buffered 1) which is less than half the target so
// we need to increment.
windowManager.bufferedFrameReceived(size: 1)
XCTAssertEqual(windowManager.newWindowSize(2), 7)

// The adjusted newSize is 6 (new size 2, buffered 1+3=4) which is more than half the target
// so no need to increment.
windowManager.bufferedFrameReceived(size: 3)
XCTAssertNil(windowManager.newWindowSize(2))

// The adjusted newSize is 10 (new size 2, buffered 4+4=8) which is equal to the target so
// no need to increment.
windowManager.bufferedFrameReceived(size: 4)
XCTAssertNil(windowManager.newWindowSize(2))

// The last window size was 2; we're emitting 2 of our 8 buffered bytes so now the adjusted
// size is 8 (last size 2, buffered 6); no need to increment.
XCTAssertNil(windowManager.bufferedFrameEmitted(size: 2))

// Emit another 5 bytes: we're down to 3 (last size 2, buffered 1); we need to increment.
XCTAssertEqual(windowManager.bufferedFrameEmitted(size: 5), 7)
}

func testInitialWindowSizeChanged() {
var windowManager = InboundWindowManager(targetSize: 10)

// There's no lastWindow size, so no increment.
XCTAssertNil(windowManager.initialWindowSizeChanged(delta: 10))

windowManager.bufferedFrameReceived(size: 3)
// adjusted size is 8 (new size 5, buffered 3) which is less than half the target (now 20).
XCTAssertEqual(windowManager.newWindowSize(5), 12)
}
}
2 changes: 1 addition & 1 deletion scripts/sanity.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

function replace_acceptable_years() {
# this needs to replace all acceptable forms with 'YEARS'
sed -e 's/2017-201[89]/YEARS/g' -e 's/2019/YEARS/g'
sed -e 's/2017-201[89]/YEARS/g' -e 's/2019/YEARS/g' -e 's/2020/YEARS/g'
}

printf "=> Checking linux tests... "
Expand Down

0 comments on commit fbfe24c

Please sign in to comment.