This repository has been archived by the owner on Jan 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #159 from UrbanCompass/thomas/observable-to-publisher
ObservableType (Snail) -> Publisher (Combine)
- Loading branch information
Showing
11 changed files
with
1,896 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
// | ||
// DemandBuffer.swift | ||
// RxCombine | ||
// | ||
// Created by Shai Mishali on 21/02/2020. | ||
// Copyright © 2020 Combine Community. All rights reserved. | ||
// | ||
|
||
import Combine | ||
import Darwin | ||
import Foundation | ||
|
||
/// A buffer responsible for managing the demand of a downstream | ||
/// subscriber for an upstream publisher | ||
/// | ||
/// It buffers values and completion events and forwards them dynamically | ||
/// according to the demand requested by the downstream | ||
/// | ||
/// In a sense, the subscription only relays the requests for demand, as well | ||
/// the events emitted by the upstream — to this buffer, which manages | ||
/// the entire behavior and backpressure contract | ||
@available(iOS 13.0, *) | ||
class DemandBuffer<S: Combine.Subscriber> { | ||
private let lock = NSRecursiveLock() | ||
private var buffer = [S.Input]() | ||
private let subscriber: S | ||
private var completion: Subscribers.Completion<S.Failure>? | ||
private var demandState = Demand() | ||
|
||
/// Initialize a new demand buffer for a provided downstream subscriber | ||
/// | ||
/// - parameter subscriber: The downstream subscriber demanding events | ||
init(subscriber: S) { | ||
self.subscriber = subscriber | ||
} | ||
|
||
/// Buffer an upstream value to later be forwarded to | ||
/// the downstream subscriber, once it demands it | ||
/// | ||
/// - parameter value: Upstream value to buffer | ||
/// | ||
/// - returns: The demand fulfilled by the bufferr | ||
func buffer(value: S.Input) -> Subscribers.Demand { | ||
precondition(self.completion == nil, | ||
"How could a completed publisher sent values?! Beats me 🤷♂️") | ||
|
||
switch demandState.requested { | ||
case .unlimited: | ||
return subscriber.receive(value) | ||
default: | ||
buffer.append(value) | ||
return flush() | ||
} | ||
} | ||
|
||
/// Complete the demand buffer with an upstream completion event | ||
/// | ||
/// This method will deplete the buffer immediately, | ||
/// based on the currently accumulated demand, and relay the | ||
/// completion event down as soon as demand is fulfilled | ||
/// | ||
/// - parameter completion: Completion event | ||
func complete(completion: Subscribers.Completion<S.Failure>) { | ||
precondition(self.completion == nil, | ||
"Completion have already occured, which is quite awkward 🥺") | ||
|
||
self.completion = completion | ||
_ = flush() | ||
} | ||
|
||
/// Signal to the buffer that the downstream requested new demand | ||
/// | ||
/// - note: The buffer will attempt to flush as many events rqeuested | ||
/// by the downstream at this point | ||
func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand { | ||
flush(adding: demand) | ||
} | ||
|
||
/// Flush buffered events to the downstream based on the current | ||
/// state of the downstream's demand | ||
/// | ||
/// - parameter newDemand: The new demand to add. If `nil`, the flush isn't the | ||
/// result of an explicit demand change | ||
/// | ||
/// - note: After fulfilling the downstream's request, if completion | ||
/// has already occured, the buffer will be cleared and the | ||
/// completion event will be sent to the downstream subscriber | ||
private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand { | ||
lock.lock() | ||
defer { lock.unlock() } | ||
|
||
if let newDemand = newDemand { | ||
demandState.requested += newDemand | ||
} | ||
|
||
// If buffer isn't ready for flushing, return immediately | ||
guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none } | ||
|
||
while !buffer.isEmpty && demandState.processed < demandState.requested { | ||
demandState.requested += subscriber.receive(buffer.remove(at: 0)) | ||
demandState.processed += 1 | ||
} | ||
|
||
if let completion = completion { | ||
// Completion event was already sent | ||
buffer = [] | ||
demandState = .init() | ||
self.completion = nil | ||
subscriber.receive(completion: completion) | ||
return .none | ||
} | ||
|
||
let sentDemand = demandState.requested - demandState.sent | ||
demandState.sent += sentDemand | ||
return sentDemand | ||
} | ||
} | ||
|
||
// MARK: - Private Helpers | ||
@available(iOS 13.0, *) | ||
private extension DemandBuffer { | ||
/// A model that tracks the downstream's | ||
/// accumulated demand state | ||
struct Demand { | ||
var processed: Subscribers.Demand = .none | ||
var requested: Subscribers.Demand = .none | ||
var sent: Subscribers.Demand = .none | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright © 2021 Compass. All rights reserved. | ||
|
||
import Combine | ||
import Foundation | ||
|
||
@available(iOS 13.0, *) | ||
public extension ObservableType { | ||
func asAnyPublisher() -> AnyPublisher<T, Error> { | ||
return SnailPublisher(upstream: self).eraseToAnyPublisher() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
// Copyright © 2021 Compass. All rights reserved. | ||
|
||
import Combine | ||
import Foundation | ||
|
||
@available(iOS 13.0, *) | ||
public class SnailPublisher<Upstream: ObservableType>: Publisher { | ||
public typealias Output = Upstream.T | ||
public typealias Failure = Error | ||
|
||
private let upstream: Upstream | ||
|
||
init(upstream: Upstream) { | ||
self.upstream = upstream | ||
} | ||
|
||
public func receive<S: Combine.Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input { | ||
subscriber.receive(subscription: SnailSubscription(upstream: upstream, | ||
downstream: subscriber)) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
// Copyright © 2021 Compass. All rights reserved. | ||
|
||
import Combine | ||
import Foundation | ||
|
||
@available(iOS 13.0, *) | ||
class SnailSubscription<Upstream: ObservableType, Downstream: Combine.Subscriber>: Combine.Subscription where Downstream.Input == Upstream.T, Downstream.Failure == Error { | ||
private var disposable: Subscriber<Upstream.T>? | ||
private let buffer: DemandBuffer<Downstream> | ||
|
||
init(upstream: Upstream, | ||
downstream: Downstream) { | ||
buffer = DemandBuffer(subscriber: downstream) | ||
disposable = upstream.subscribe(queue: nil, | ||
onNext: { [weak self] value in | ||
guard let self = self else { return } | ||
_ = self.buffer.buffer(value: value) | ||
}, | ||
onError: { [weak self] error in | ||
guard let self = self else { return } | ||
self.buffer.complete(completion: .failure(error)) | ||
}, | ||
onDone: { [weak self] in | ||
guard let self = self else { return } | ||
self.buffer.complete(completion: .finished) | ||
}) | ||
} | ||
|
||
func request(_ demand: Subscribers.Demand) { | ||
_ = self.buffer.demand(demand) | ||
} | ||
|
||
func cancel() { | ||
disposable?.dispose() | ||
disposable = nil | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
// Copyright © 2021 Compass. All rights reserved. | ||
|
||
import Combine | ||
import Foundation | ||
@testable import Snail | ||
import XCTest | ||
|
||
@available(iOS 13.0, *) | ||
class FailAsPublisherTests: XCTestCase { | ||
enum TestError: Error { | ||
case test | ||
} | ||
|
||
private var subject: Observable<String>! | ||
private var strings: [String]! | ||
private var error: Error? | ||
private var subscriptions: Set<AnyCancellable>! | ||
|
||
override func setUp() { | ||
super.setUp() | ||
subject = Fail(TestError.test) | ||
strings = [] | ||
subscriptions = Set<AnyCancellable>() | ||
error = nil | ||
} | ||
|
||
override func tearDown() { | ||
subject = nil | ||
strings = nil | ||
error = nil | ||
subscriptions = nil | ||
} | ||
|
||
func testOnErrorIsRun() { | ||
subject.asAnyPublisher() | ||
.sink(receiveCompletion: { [unowned self] completion in | ||
if case let .failure(underlying) = completion { | ||
self.error = underlying as? TestError | ||
} | ||
}, | ||
receiveValue: { _ in }) | ||
.store(in: &subscriptions) | ||
|
||
XCTAssertEqual((error as? TestError), TestError.test) | ||
} | ||
|
||
func testOnNextIsNotRun() { | ||
subject.asAnyPublisher() | ||
.sink(receiveCompletion: { _ in }, | ||
receiveValue: { [unowned self] in self.strings.append($0) }) | ||
.store(in: &subscriptions) | ||
subject?.on(.next("1")) | ||
|
||
XCTAssertEqual(strings?.count, 0) | ||
} | ||
} |
Oops, something went wrong.