From 593f652ffb2170f92fd299c93d5da3fcf2af54cb Mon Sep 17 00:00:00 2001 From: Thomas Bajis Date: Sun, 16 May 2021 20:36:33 -0400 Subject: [PATCH 01/13] Add SnailPublisher, SnailSubscription --- Snail.xcodeproj/project.pbxproj | 16 +++++++++ Snail/Snail+Combine/Observable+Combine.swift | 34 ++++++++++++++++++++ Snail/Snail+Combine/SnailSubscription.swift | 34 ++++++++++++++++++++ 3 files changed, 84 insertions(+) create mode 100644 Snail/Snail+Combine/Observable+Combine.swift create mode 100644 Snail/Snail+Combine/SnailSubscription.swift diff --git a/Snail.xcodeproj/project.pbxproj b/Snail.xcodeproj/project.pbxproj index 2e5ec04..50a819b 100644 --- a/Snail.xcodeproj/project.pbxproj +++ b/Snail.xcodeproj/project.pbxproj @@ -16,6 +16,8 @@ 24FABD581DFEF7EC005CF84E /* FailTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 24FABD571DFEF7EC005CF84E /* FailTests.swift */; }; 24FABD5A1DFF0B48005CF84E /* Replay.swift in Sources */ = {isa = PBXBuildFile; fileRef = 24FABD591DFF0B48005CF84E /* Replay.swift */; }; 24FABD5C1DFF0BAF005CF84E /* ReplayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 24FABD5B1DFF0BAF005CF84E /* ReplayTests.swift */; }; + 2E53BDEC264ECC940030B9FB /* Observable+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */; }; + 2E53BDEE264ED4C70030B9FB /* SnailSubscription.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */; }; CB2936771DFE151B00792E6B /* Just.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936761DFE151B00792E6B /* Just.swift */; }; CB2936791DFEF77500792E6B /* JustTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936781DFEF77500792E6B /* JustTests.swift */; }; CBE54A7A1E5A16AC00971F74 /* Subscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBE54A791E5A16AC00971F74 /* Subscriber.swift */; }; @@ -59,6 +61,8 @@ 24FABD571DFEF7EC005CF84E /* FailTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = FailTests.swift; sourceTree = ""; }; 24FABD591DFF0B48005CF84E /* Replay.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Replay.swift; sourceTree = ""; }; 24FABD5B1DFF0BAF005CF84E /* ReplayTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ReplayTests.swift; sourceTree = ""; }; + 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Observable+Combine.swift"; sourceTree = ""; }; + 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SnailSubscription.swift; sourceTree = ""; }; CB2936761DFE151B00792E6B /* Just.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Just.swift; sourceTree = ""; }; CB2936781DFEF77500792E6B /* JustTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = JustTests.swift; sourceTree = ""; }; CBE54A791E5A16AC00971F74 /* Subscriber.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Subscriber.swift; sourceTree = ""; }; @@ -112,6 +116,15 @@ path = Extensions; sourceTree = ""; }; + 2E53BDEA264ECC770030B9FB /* Snail+Combine */ = { + isa = PBXGroup; + children = ( + 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */, + 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */, + ); + path = "Snail+Combine"; + sourceTree = ""; + }; CBE54E371DFB36DF0008DD64 = { isa = PBXGroup; children = ( @@ -133,6 +146,7 @@ CBE54E431DFB36DF0008DD64 /* Snail */ = { isa = PBXGroup; children = ( + 2E53BDEA264ECC770030B9FB /* Snail+Combine */, F569538B2320476100D35C80 /* Closure.swift */, CBE54E441DFB36DF0008DD64 /* Snail.h */, CBE54E451DFB36DF0008DD64 /* Info.plist */, @@ -314,6 +328,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + 2E53BDEC264ECC940030B9FB /* Observable+Combine.swift in Sources */, CBE54E621DFB39510008DD64 /* ObservableType.swift in Sources */, CBE54A7A1E5A16AC00971F74 /* Subscriber.swift in Sources */, 2421BA721E09801000EA9064 /* UIGestureRecognizerExtensions.swift in Sources */, @@ -327,6 +342,7 @@ 24CF1FA81EF875A400F34234 /* URLSessionExtensions.swift in Sources */, 2408FA902112A15900B9F59E /* Scheduler.swift in Sources */, CBE54E601DFB39510008DD64 /* Event.swift in Sources */, + 2E53BDEE264ED4C70030B9FB /* SnailSubscription.swift in Sources */, F5C973A622F20C86003DB42C /* Disposer.swift in Sources */, CBE54E631DFB39510008DD64 /* Variable.swift in Sources */, CBE54E6D1DFB6A910008DD64 /* UIControlExtensions.swift in Sources */, diff --git a/Snail/Snail+Combine/Observable+Combine.swift b/Snail/Snail+Combine/Observable+Combine.swift new file mode 100644 index 0000000..2215395 --- /dev/null +++ b/Snail/Snail+Combine/Observable+Combine.swift @@ -0,0 +1,34 @@ +// Copyright © 2021 Compass. All rights reserved. + +#if canImport(Combine) +import Combine +import Foundation + +@available(iOS 13.0, *) +public extension ObservableType { + var publisher: AnyPublisher { + return SnailPublisher(upstream: self).eraseToAnyPublisher() + } + + func asPublisher() -> AnyPublisher { + return publisher + } +} + +@available(iOS 13.0, *) +public class SnailPublisher: Publisher { + public typealias Output = Upstream.T + public typealias Failure = Swift.Error + + private let upstream: Upstream + + init(upstream: Upstream) { + self.upstream = upstream + } + + public func receive(subscriber: S) where Failure == S.Failure, Output == S.Input { + subscriber.receive(subscription: SnailSubscription(upstream: upstream, + downstream: subscriber)) + } +} +#endif diff --git a/Snail/Snail+Combine/SnailSubscription.swift b/Snail/Snail+Combine/SnailSubscription.swift new file mode 100644 index 0000000..7058040 --- /dev/null +++ b/Snail/Snail+Combine/SnailSubscription.swift @@ -0,0 +1,34 @@ +// Copyright © 2021 Compass. All rights reserved. + +#if canImport(Combine) +import Combine +import Foundation + +@available(iOS 13.0, *) +class SnailSubscription: Combine.Subscription where Downstream.Input == Upstream.T, Downstream.Failure == Error { + private var disposable: Subscriber? + + init(upstream: Upstream, + downstream: Downstream) { + disposable = upstream.subscribe(queue: nil, + onNext: { value in + _ = downstream.receive(value) + }, + onError: { error in + downstream.receive(completion: .failure(error)) + }, + onDone: { + downstream.receive(completion: .finished) + }) + } + + func request(_ demand: Subscribers.Demand) { + // For now, not supporting changing any kind of demand + } + + func cancel() { + disposable?.dispose() + disposable = nil + } +} +#endif From 1b29b8630ca0b7c633587e1566119e4a06f81f18 Mon Sep 17 00:00:00 2001 From: Thomas Bajis Date: Sun, 16 May 2021 21:35:26 -0400 Subject: [PATCH 02/13] Add ObservableAsPublisherTests --- Snail.xcodeproj/project.pbxproj | 12 + .../Combine/ObservableAsPublisherTests.swift | 1132 +++++++++++++++++ 2 files changed, 1144 insertions(+) create mode 100644 SnailTests/Combine/ObservableAsPublisherTests.swift diff --git a/Snail.xcodeproj/project.pbxproj b/Snail.xcodeproj/project.pbxproj index 50a819b..861d3ed 100644 --- a/Snail.xcodeproj/project.pbxproj +++ b/Snail.xcodeproj/project.pbxproj @@ -18,6 +18,7 @@ 24FABD5C1DFF0BAF005CF84E /* ReplayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 24FABD5B1DFF0BAF005CF84E /* ReplayTests.swift */; }; 2E53BDEC264ECC940030B9FB /* Observable+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */; }; 2E53BDEE264ED4C70030B9FB /* SnailSubscription.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */; }; + 2E53BE072651F2990030B9FB /* ObservableAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE062651F2990030B9FB /* ObservableAsPublisherTests.swift */; }; CB2936771DFE151B00792E6B /* Just.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936761DFE151B00792E6B /* Just.swift */; }; CB2936791DFEF77500792E6B /* JustTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936781DFEF77500792E6B /* JustTests.swift */; }; CBE54A7A1E5A16AC00971F74 /* Subscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBE54A791E5A16AC00971F74 /* Subscriber.swift */; }; @@ -63,6 +64,7 @@ 24FABD5B1DFF0BAF005CF84E /* ReplayTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ReplayTests.swift; sourceTree = ""; }; 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Observable+Combine.swift"; sourceTree = ""; }; 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SnailSubscription.swift; sourceTree = ""; }; + 2E53BE062651F2990030B9FB /* ObservableAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ObservableAsPublisherTests.swift; sourceTree = ""; }; CB2936761DFE151B00792E6B /* Just.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Just.swift; sourceTree = ""; }; CB2936781DFEF77500792E6B /* JustTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = JustTests.swift; sourceTree = ""; }; CBE54A791E5A16AC00971F74 /* Subscriber.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Subscriber.swift; sourceTree = ""; }; @@ -125,6 +127,14 @@ path = "Snail+Combine"; sourceTree = ""; }; + 2E53BE052651F2710030B9FB /* Combine */ = { + isa = PBXGroup; + children = ( + 2E53BE062651F2990030B9FB /* ObservableAsPublisherTests.swift */, + ); + path = Combine; + sourceTree = ""; + }; CBE54E371DFB36DF0008DD64 = { isa = PBXGroup; children = ( @@ -169,6 +179,7 @@ CBE54E4E1DFB36DF0008DD64 /* SnailTests */ = { isa = PBXGroup; children = ( + 2E53BE052651F2710030B9FB /* Combine */, CBE54E511DFB36DF0008DD64 /* Info.plist */, F5695389232046AA00D35C80 /* ClosureTests.swift */, F5C973A722F4B359003DB42C /* DisposerTests.swift */, @@ -362,6 +373,7 @@ 24FABD5C1DFF0BAF005CF84E /* ReplayTests.swift in Sources */, DEF9B98D1FD5D8FD00F8514E /* UniqueTests.swift in Sources */, F569538A232046AA00D35C80 /* ClosureTests.swift in Sources */, + 2E53BE072651F2990030B9FB /* ObservableAsPublisherTests.swift in Sources */, CBE54E671DFB4F3F0008DD64 /* VariableTests.swift in Sources */, 24FABD581DFEF7EC005CF84E /* FailTests.swift in Sources */, DEEDA8EB2051BCB4000FE578 /* NotificationCenterExtensions.swift in Sources */, diff --git a/SnailTests/Combine/ObservableAsPublisherTests.swift b/SnailTests/Combine/ObservableAsPublisherTests.swift new file mode 100644 index 0000000..3e24ec2 --- /dev/null +++ b/SnailTests/Combine/ObservableAsPublisherTests.swift @@ -0,0 +1,1132 @@ +// Copyright © 2021 Compass. All rights reserved. + +// swiftlint:disable file_length + +#if canImport(Combine) +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class ObservableAsPublisherTests: XCTestCase { + enum TestError: Error { + case test + } + + private var subject: Observable! + private var strings: [String]! + private var error: Error? + private var done: Bool? + private var disposer: Disposer! + + private var subscription: AnyCancellable? + + override func setUp() { + super.setUp() + subject = Observable() + strings = [] + error = nil + done = nil + disposer = Disposer() + } + + override func tearDown() { + subject = nil + strings = nil + error = nil + done = nil + disposer = nil + subscription = nil + super.tearDown() + } + + func testNext() { + subscription = subject.asPublisher() + .sink(receiveCompletion: { [unowned self] completion in + switch completion { + case .finished: + self.done = true + case let .failure(underlyingError): + self.error = underlyingError + } + }, + receiveValue: { [unowned self] in self.strings?.append($0) }) + + ["1", "2"].forEach { subject?.on(.next($0)) } + + XCTAssertEqual(strings, ["1", "2"]) + } + + func testOnDone() { + subscription = subject.asPublisher() + .sink(receiveCompletion: { [unowned self] completion in + switch completion { + case .finished: + self.done = true + case let .failure(underlyingError): + self.error = underlyingError + } + }, + receiveValue: { [unowned self] in self.strings?.append($0) }) + + subject?.on(.next("1")) + subject?.on(.next("2")) + subject?.on(.done) + subject?.on(.next("3")) + + XCTAssertEqual(strings?.count, 2) + XCTAssertEqual(strings?[0], "1") + XCTAssertEqual(strings?[1], "2") + XCTAssertEqual(done, true) + } + + func testOnError() { + subscription = subject.asPublisher() + .sink(receiveCompletion: { [unowned self] completion in + switch completion { + case .finished: + self.done = true + case let .failure(underlyingError): + self.error = underlyingError + } + }, + receiveValue: { [unowned self] in self.strings?.append($0) }) + + subject?.on(.next("1")) + subject?.on(.next("2")) + subject?.on(.error(TestError.test)) + subject?.on(.next("3")) + XCTAssertEqual(strings?.count, 2) + XCTAssertEqual(strings?[0], "1") + XCTAssertEqual(strings?[1], "2") + XCTAssertEqual(error as? TestError, .test) + } + + func testFiresStoppedEventOnSubscribeIfStopped() { + subject?.on(.error(TestError.test)) + + var oldError: TestError? + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case let .failure(underlying) = completion { + oldError = underlying as? TestError + } + }, receiveValue: { _ in }) + XCTAssertEqual(oldError, .test) + } + + func testRemovingSubscribers() { + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { [unowned self] str in + self.strings.append(str) + }) + subject?.on(.next("1")) + subject?.removeSubscribers() + subject?.on(.next("2")) + XCTAssertEqual(strings?[0], "1") + XCTAssertEqual(strings?.count, 1) + } + + func testRemoveSubscriber() { + let subscriberToRemove = subject?.subscribe( + onNext: { string in self.strings?.append(string) }, + onError: { error in self.error = error }, + onDone: { self.done = true }) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { [unowned self] str in + self.strings.append(str) + }) + + subject?.on(.next("1")) + guard let subscriber = subscriberToRemove else { + return + } + subject?.removeSubscriber(subscriber: subscriber) + subject?.on(.next("2")) + XCTAssertEqual(strings?.count, 3) + XCTAssertEqual(strings?[0], "1") + XCTAssertEqual(strings?[1], "1") + XCTAssertEqual(strings?[2], "2") + subject?.removeSubscriber(subscriber: subscriber) + subject?.on(.next("3")) + XCTAssertEqual(strings?.count, 4) + XCTAssertEqual(strings?[0], "1") + XCTAssertEqual(strings?[1], "1") + XCTAssertEqual(strings?[2], "2") + XCTAssertEqual(strings?[3], "3") + } + + func testThrottle() { + let observable = Observable() + var received: [String] = [] + + let exp = expectation(description: "throttle") + let delay = 0.01 + + DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + delay) { + exp.fulfill() + } + + subscription = observable.throttle(delay) + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + observable.on(.next("1")) + observable.on(.next("2")) + waitForExpectations(timeout: delay*2) { _ in + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "2") + } + } + + func testThrottleDelays() { + let observable = Observable() + var received: [String] = [] + + let exp = expectation(description: "debounce") + let delay = 0.01 + + DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + delay) { + observable.on(.next("2")) + observable.on(.next("3")) + DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + delay) { + exp.fulfill() + } + } + + subscription = observable.throttle(delay) + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + observable.on(.next("1")) + + waitForExpectations(timeout: 1) { _ in + XCTAssertEqual(received.count, 2) + XCTAssertEqual(received.first, "1") + XCTAssertEqual(received.last, "3") + } + } + + func testDebounce() { + let observable = Observable() + var received: [String] = [] + + let exp = expectation(description: "debounce") + let delay = 0.02 + + DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + delay/2) { + observable.on(.next("2")) + DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + delay/2) { + observable.on(.next("3")) + DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + delay) { + exp.fulfill() + } + } + } + + subscription = observable.debounce(delay) + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + observable.on(.next("1")) + + waitForExpectations(timeout: 1) { _ in + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "3") + } + } + + func testSkipFirst() { + let observable = Observable() + var received: [String] = [] + + subscription = observable.skip(first: 2) + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + observable.on(.next("1")) + observable.on(.next("2")) + observable.on(.next("3")) + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "3") + } + + func testSkipError() { + let observable = Observable() + + var error: TestError? + subscription = observable.skip(first: 2) + .asPublisher() + .sink(receiveCompletion: { completion in + if case let .failure(underlying) = completion { + error = underlying as? TestError + } + }, + receiveValue: { _ in }) + observable.on(.error(TestError.test)) + + XCTAssertEqual(error, .test) + } + + func testSkipDone() { + let observable = Observable() + var done = false + + subscription = observable.skip(first: 2) + .asPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + done = true + } + }, + receiveValue: { _ in }) + + observable.on(.done) + + XCTAssertEqual(done, true) + } + + func testTakeFirst() { + let observable = Observable() + var received: [String] = [] + + subscription = observable.take(first: 2) + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + observable.on(.next("1")) + observable.on(.next("2")) + observable.on(.next("3")) + + XCTAssertEqual(received, ["1", "2"]) + } + + func testTakeError() { + let observable = Observable() + + var error: TestError? + subscription = observable.take(first: 2) + .asPublisher() + .sink(receiveCompletion: { completion in + if case let .failure(underlying) = completion { + error = underlying as? TestError + } + }, + receiveValue: { _ in }) + + observable.on(.error(TestError.test)) + + XCTAssertEqual(error, .test) + } + + func testTakeDone() { + let observable = Observable() + var done = false + + subscription = observable.take(first: 2) + .asPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + done = true + } + }, + receiveValue: { _ in }) + + observable.on(.done) + + XCTAssertEqual(done, true) + } + + func testTakeDoneWhenCountIsReached() { + let observable = Observable() + var done = false + + subscription = observable.take(first: 2) + .asPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + done = true + } + }, + receiveValue: { _ in }) + + observable.on(.next("1")) + observable.on(.next("2")) + + XCTAssertEqual(done, true) + } + + func testForward() { + var received: [String] = [] + var receivedError: TestError? + + let subject = Observable() + let observable = Observable() + observable.forward(to: subject) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case let .failure(underlying) = completion { + receivedError = underlying as? TestError + } + }, + receiveValue: { received.append($0) }) + + observable.on(.next("1")) + observable.on(.error(TestError.test)) + + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "1") + XCTAssertEqual(receivedError, .test) + } + + func testForwardDone() { + var received: [String] = [] + + let subject = Observable() + let observable = Observable() + observable.forward(to: subject) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + observable.on(.next("1")) + observable.on(.done) + + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "1") + } + + func testMerge() { + var received: [String] = [] + + let a = Observable() + let b = Observable() + + let subject = Observable.merge([a, b]) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + a.on(.next("1")) + b.on(.next("2")) + b.on(.done) + + XCTAssertEqual(received.count, 2) + XCTAssertEqual(received.first, "1") + XCTAssertEqual(received.last, "2") + } + + func testMergeWithoutArray() { + var received: [String] = [] + + let a = Observable() + let b = Observable() + + let subject = Observable.merge(a, b) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + a.on(.next("1")) + b.on(.next("2")) + b.on(.done) + + XCTAssertEqual(received.count, 2) + XCTAssertEqual(received.first, "1") + XCTAssertEqual(received.last, "2") + } + + func testCombineLatestNonOptional() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.combineLatest(string, int) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { string, int in + received.append(("\(string): \(int)")) + }) + + string.on(.next("The value")) + string.on(.next("The number")) + int.on(.next(1)) + int.on(.next(2)) + string.on(.next("The digit")) + int.on(.next(3)) + int.on(.done) + + XCTAssertEqual(received.count, 4) + XCTAssertEqual(received[0], "The number: 1") + XCTAssertEqual(received[1], "The number: 2") + XCTAssertEqual(received[2], "The digit: 2") + XCTAssertEqual(received[3], "The digit: 3") + } + + func testCombineLatestOptional() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.combineLatest(string, int) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { string, int in + received.append(("\(string ?? ""): \(int ?? 0)")) + }) + + string.on(.next("The value")) + string.on(.next("The number")) + int.on(.next(1)) + int.on(.next(nil)) + string.on(.next(nil)) + int.on(.next(3)) + string.on(.done) + + XCTAssertEqual(received.count, 4) + XCTAssertEqual(received[0], "The number: 1") + XCTAssertEqual(received[1], "The number: 0") + XCTAssertEqual(received[2], ": 0") + XCTAssertEqual(received[3], ": 3") + } + + func testCombineLatestError_firstMember() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.combineLatest(string, int) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + received.append("ERROR") + } + }, + receiveValue: { string, int in + received.append(("\(string): \(int)")) + }) + + string.on(.next("The number")) + int.on(.next(1)) + string.on(.error(TestError.test)) + + string.on(.next("The digit")) + int.on(.next(2)) + string.on(.error(TestError.test)) + + XCTAssertEqual(received.count, 2) + XCTAssertEqual(received.first, "The number: 1") + XCTAssertEqual(received.last, "ERROR") + } + + func testCombineLatestError_secondMember() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.combineLatest(string, int) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + received.append("ERROR") + } + }, + receiveValue: { _ in }) + + int.on(.error(TestError.test)) + int.on(.next(1)) + + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "ERROR") + } + + func testCombineLatestDone_whenAllDone() { + let obs1 = Observable() + let obs2 = Observable() + + var isDone = false + subscription = Observable.combineLatest(obs1, obs2) + .asPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + isDone = true + } + }, + receiveValue: { _ in }) + + obs1.on(.done) + XCTAssertFalse(isDone) + + obs2.on(.done) + XCTAssertTrue(isDone) + } + + func testCombineLatest3() { + let one = Observable() + let two = Observable() + let three = Observable() + + var received = [(String, Int, Double)]() + let subject = Observable.combineLatest(one, two, three) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + one.on(.next("The string")) + XCTAssertTrue(received.isEmpty) + + two.on(.next(1)) + XCTAssertTrue(received.isEmpty) + + three.on(.next(100.5)) + XCTAssertEqual(received[0].0, "The string") + XCTAssertEqual(received[0].1, 1) + XCTAssertEqual(received[0].2, 100.5) + } + + func testCombineLatest3Error_fromSecondMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let subject = Observable.combineLatest(one, two, three) + + let exp = expectation(description: "combineLatest3 forwards error from observable") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + two.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest3Error_fromThirdMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let subject = Observable.combineLatest(one, two, three) + + let exp = expectation(description: "combineLatest3 forwards error from observable") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + three.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest3Done_whenAllDone() { + let obs1 = Observable() + let obs2 = Observable() + let obs3 = Observable() + + var isDone = false + subscription = Observable.combineLatest(obs1, obs2, obs3) + .asPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + isDone = true + } + }, + receiveValue: { _ in }) + + obs1.on(.done) + XCTAssertFalse(isDone) + + obs2.on(.done) + XCTAssertFalse(isDone) + + obs3.on(.done) + XCTAssertTrue(isDone) + } + + func testCombineLatest3Optional() { + let one = Observable() + let two = Observable() + let three = Observable() + + var received = [(String, Int?, Double)]() + let subject = Observable.combineLatest(one, two, three) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + one.on(.next("The string")) + XCTAssertTrue(received.isEmpty) + + two.on(.next(nil)) + XCTAssertTrue(received.isEmpty) + + three.on(.next(100.5)) + XCTAssertEqual(received[0].0, "The string") + XCTAssertEqual(received[0].1, nil) + XCTAssertEqual(received[0].2, 100.5) + } + + func testCombineLatest4() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + + var received = [(String, Int, Double, String)]() + let subject = Observable.combineLatest(one, two, three, four) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + one.on(.next("The string")) + XCTAssertTrue(received.isEmpty) + + two.on(.next(1)) + XCTAssertTrue(received.isEmpty) + + three.on(.next(100.5)) + XCTAssertTrue(received.isEmpty) + + four.on(.next("The other string")) + XCTAssertEqual(received[0].0, "The string") + XCTAssertEqual(received[0].1, 1) + XCTAssertEqual(received[0].2, 100.5) + XCTAssertEqual(received[0].3, "The other string") + } + + func testCombineLatest4Error_fromFirstMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + let subject = Observable.combineLatest(one, two, three, four) + + let exp = expectation(description: "combineLatest4 forwards error from observable") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + one.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest4Error_fromSecondMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + let subject = Observable.combineLatest(one, two, three, four) + + let exp = expectation(description: "combineLatest4 forwards error from observable") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + two.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest4Error_fromThirdMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + let subject = Observable.combineLatest(one, two, three, four) + + let exp = expectation(description: "combineLatest4 forwards error from observable") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + three.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest4Error_fromFourthMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + let subject = Observable.combineLatest(one, two, three, four) + + let exp = expectation(description: "combineLatest4 forwards error from observable") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + four.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest4Done_whenAllDone() { + let obs1 = Observable() + let obs2 = Observable() + let obs3 = Observable() + let obs4 = Observable() + + var isDone = false + subscription = Observable.combineLatest(obs1, obs2, obs3, obs4) + .asPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + isDone = true + } + }, + receiveValue: { _ in }) + + obs1.on(.done) + XCTAssertFalse(isDone) + + obs2.on(.done) + XCTAssertFalse(isDone) + + obs3.on(.done) + XCTAssertFalse(isDone) + + obs4.on(.done) + XCTAssertTrue(isDone) + } + + func testCombineLatest4Optional() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + + var received = [(String, Int?, Double, String?)]() + let subject = Observable.combineLatest(one, two, three, four) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + one.on(.next("The string")) + XCTAssertTrue(received.isEmpty) + + two.on(.next(nil)) + XCTAssertTrue(received.isEmpty) + + three.on(.next(100.5)) + XCTAssertTrue(received.isEmpty) + + four.on(.next(nil)) + XCTAssertEqual(received[0].0, "The string") + XCTAssertEqual(received[0].1, nil) + XCTAssertEqual(received[0].2, 100.5) + XCTAssertEqual(received[0].3, nil) + } + + func testObservableMap() { + let observable = Observable() + let subject = observable.map { "Number: \($0)" } + var received = [String]() + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + observable.on(.next(1)) + observable.on(.next(10)) + + XCTAssertEqual(received, ["Number: 1", "Number: 10"]) + } + + func testObservableMapError() { + let observable = Observable() + let subject = observable.map { "Number: \($0)" } + + let exp = expectation(description: "observable map forwards error") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + + observable.on(.error(TestError.test)) + + waitForExpectations(timeout: 1) + } + + func testObservableMapDone() { + let observable = Observable() + let subject = observable.map { "Number: \($0)" } + + let exp = expectation(description: "observable map forwards done") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + + observable.on(.done) + + waitForExpectations(timeout: 1) + } + + func testObservableFilter() { + let observable = Observable() + let subject = observable.filter { $0 % 2 == 0 } + var received = [Int]() + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + + observable.on(.next(1)) + observable.on(.next(2)) + observable.on(.next(8)) + observable.on(.next(5)) + + XCTAssertEqual(received, [2, 8]) + } + + func testObservableFilterError() { + let observable = Observable() + let subject = observable.filter { $0 % 2 == 0 } + + let exp = expectation(description: "observable filter forwards error") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + + observable.on(.error(TestError.test)) + + waitForExpectations(timeout: 1) + } + + func testObservableFilterDone() { + let observable = Observable() + let subject = observable.filter { $0 % 2 == 0 } + + let exp = expectation(description: "observable filter forwards done") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + + observable.on(.done) + + waitForExpectations(timeout: 1) + } + + func testObservableFlatMap() { + let fetchTrigger = Observable() + let subject = fetchTrigger.flatMap { Variable(100).asObservable() } + var received = [Int]() + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + fetchTrigger.on(.next(())) + + XCTAssertEqual(received, [100]) + } + + func testObservableFlatMapError() { + let fetchTrigger = Observable() + let subject = fetchTrigger.flatMap { Variable(100).asObservable() } + + let exp = expectation(description: "observable flatMap forwards error") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + fetchTrigger.on(.error(TestError.test)) + + waitForExpectations(timeout: 1) + } + + func testObservableFlatMapDone() { + let fetchTrigger = Observable() + let subject = fetchTrigger.flatMap { Variable(100).asObservable() } + + let exp = expectation(description: "observable flatMap forwards done") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + fetchTrigger.on(.done) + + waitForExpectations(timeout: 1) + } + + func testZipNonOptional() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.zip(string, int) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { string, int in + received.append("\(string): \(int)") + }) + + string.on(.next("The value")) + string.on(.next("The number")) + int.on(.next(1)) + int.on(.next(2)) + string.on(.next("The digit")) + int.on(.next(3)) + int.on(.done) + + XCTAssertEqual(received.count, 3) + XCTAssertEqual(received[0], "The value: 1") + XCTAssertEqual(received[1], "The number: 2") + XCTAssertEqual(received[2], "The digit: 3") + } + + func testZipOptional() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.zip(string, int) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { string, int in + received.append("\(string ?? ""): \(int ?? 0)") + }) + + string.on(.next("The value")) + string.on(.next("The number")) + int.on(.next(1)) + int.on(.next(nil)) + string.on(.next(nil)) + int.on(.next(3)) + string.on(.done) + + XCTAssertEqual(received.count, 3) + XCTAssertEqual(received[0], "The value: 1") + XCTAssertEqual(received[1], "The number: 0") + XCTAssertEqual(received[2], ": 3") + } + + func testZipCountEqualToSourceWithFewestEmissions() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.zip(string, int) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { string, int in + received.append("\(string): \(int)") + }) + + string.on(.next("The value")) + string.on(.next("The number")) + int.on(.next(1)) + int.on(.next(2)) + string.on(.next("The digit")) + int.on(.next(3)) + int.on(.next(4)) + int.on(.next(5)) + int.on(.next(6)) + int.on(.done) + + XCTAssertEqual(received.count, 3) + XCTAssertEqual(received[0], "The value: 1") + XCTAssertEqual(received[1], "The number: 2") + XCTAssertEqual(received[2], "The digit: 3") + } + + func testZipError() { + let one = Observable() + let two = Observable() + + let subject = Observable.zip(one, two) + + let exp = expectation(description: "zip forwards error from observable") + subscription = subject.asPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + one.on(.error(TestError.test)) + + waitForExpectations(timeout: 1) + } + + func testZipDone() { + let one = Observable() + let two = Observable() + + var isDone = false + subscription = Observable.zip(one, two) + .asPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + isDone = true + } + }, + receiveValue: { _ in }) + + one.on(.done) + XCTAssertTrue(isDone) + } +} +#endif From 39c579115ea9f578eab6272102539d8498bf75d9 Mon Sep 17 00:00:00 2001 From: Thomas Bajis Date: Mon, 17 May 2021 12:23:45 -0400 Subject: [PATCH 03/13] add DemandBuffer implementation --- Snail.xcodeproj/project.pbxproj | 4 ++ Snail/Snail+Combine/DemandBuffer.swift | 76 +++++++++++++++++++++ Snail/Snail+Combine/SnailSubscription.swift | 20 ++++-- 3 files changed, 93 insertions(+), 7 deletions(-) create mode 100644 Snail/Snail+Combine/DemandBuffer.swift diff --git a/Snail.xcodeproj/project.pbxproj b/Snail.xcodeproj/project.pbxproj index 861d3ed..56a7925 100644 --- a/Snail.xcodeproj/project.pbxproj +++ b/Snail.xcodeproj/project.pbxproj @@ -19,6 +19,7 @@ 2E53BDEC264ECC940030B9FB /* Observable+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */; }; 2E53BDEE264ED4C70030B9FB /* SnailSubscription.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */; }; 2E53BE072651F2990030B9FB /* ObservableAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE062651F2990030B9FB /* ObservableAsPublisherTests.swift */; }; + 2E53BE092652B6D60030B9FB /* DemandBuffer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE082652B6D60030B9FB /* DemandBuffer.swift */; }; CB2936771DFE151B00792E6B /* Just.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936761DFE151B00792E6B /* Just.swift */; }; CB2936791DFEF77500792E6B /* JustTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936781DFEF77500792E6B /* JustTests.swift */; }; CBE54A7A1E5A16AC00971F74 /* Subscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBE54A791E5A16AC00971F74 /* Subscriber.swift */; }; @@ -65,6 +66,7 @@ 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Observable+Combine.swift"; sourceTree = ""; }; 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SnailSubscription.swift; sourceTree = ""; }; 2E53BE062651F2990030B9FB /* ObservableAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ObservableAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE082652B6D60030B9FB /* DemandBuffer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DemandBuffer.swift; sourceTree = ""; }; CB2936761DFE151B00792E6B /* Just.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Just.swift; sourceTree = ""; }; CB2936781DFEF77500792E6B /* JustTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = JustTests.swift; sourceTree = ""; }; CBE54A791E5A16AC00971F74 /* Subscriber.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Subscriber.swift; sourceTree = ""; }; @@ -123,6 +125,7 @@ children = ( 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */, 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */, + 2E53BE082652B6D60030B9FB /* DemandBuffer.swift */, ); path = "Snail+Combine"; sourceTree = ""; @@ -351,6 +354,7 @@ CB2936771DFE151B00792E6B /* Just.swift in Sources */, 241F15581E03124600DD70A2 /* UIBarButtonItemExtensions.swift in Sources */, 24CF1FA81EF875A400F34234 /* URLSessionExtensions.swift in Sources */, + 2E53BE092652B6D60030B9FB /* DemandBuffer.swift in Sources */, 2408FA902112A15900B9F59E /* Scheduler.swift in Sources */, CBE54E601DFB39510008DD64 /* Event.swift in Sources */, 2E53BDEE264ED4C70030B9FB /* SnailSubscription.swift in Sources */, diff --git a/Snail/Snail+Combine/DemandBuffer.swift b/Snail/Snail+Combine/DemandBuffer.swift new file mode 100644 index 0000000..ba189fb --- /dev/null +++ b/Snail/Snail+Combine/DemandBuffer.swift @@ -0,0 +1,76 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation + +@available(iOS 13.0, *) +class DemandBuffer { + private struct Demand { + var processed: Subscribers.Demand = .none + var requested: Subscribers.Demand = .none + var sent: Subscribers.Demand = .none + } + + private let lock = NSRecursiveLock() + private var buffer: [S.Input] = [] + private let subscriber: S + private var completion: Subscribers.Completion? + private var demandState = Demand() + + init(subscriber: S) { + self.subscriber = subscriber + } + + func buffer(value: S.Input) -> Subscribers.Demand { + precondition(self.completion == nil, + "Completed publisher should not be able to send values") + + switch demandState.requested { + case .unlimited: + return subscriber.receive(value) + default: + buffer.append(value) + return flush() + } + } + + func complete(completion: Subscribers.Completion) { + precondition(self.completion == nil, + "Completion should not be completed at this point") + + self.completion = completion + _ = flush() + } + + func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand { + flush(adding: demand) + } + + private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand { + lock.lock() + defer { lock.unlock() } + + if let newDemand = newDemand { + demandState.requested += newDemand + } + + guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none } + + while !buffer.isEmpty && demandState.processed < demandState.requested { + demandState.requested += subscriber.receive(buffer.remove(at: 0)) + demandState.processed += 1 + } + + if let completion = completion { + buffer = [] + demandState = .init() + self.completion = nil + subscriber.receive(completion: completion) + return .none + } + + let sentDemand = demandState.requested - demandState.sent + demandState.sent += sentDemand + return sentDemand + } +} diff --git a/Snail/Snail+Combine/SnailSubscription.swift b/Snail/Snail+Combine/SnailSubscription.swift index 7058040..6c874e7 100644 --- a/Snail/Snail+Combine/SnailSubscription.swift +++ b/Snail/Snail+Combine/SnailSubscription.swift @@ -7,23 +7,29 @@ import Foundation @available(iOS 13.0, *) class SnailSubscription: Combine.Subscription where Downstream.Input == Upstream.T, Downstream.Failure == Error { private var disposable: Subscriber? + private let buffer: DemandBuffer init(upstream: Upstream, downstream: Downstream) { + buffer = DemandBuffer(subscriber: downstream) disposable = upstream.subscribe(queue: nil, - onNext: { value in - _ = downstream.receive(value) + onNext: { [weak self] value in + guard let self = self else { return } + + _ = self.buffer.buffer(value: value) }, - onError: { error in - downstream.receive(completion: .failure(error)) + onError: { [weak self] error in + guard let self = self else { return } + self.buffer.complete(completion: .failure(error)) }, - onDone: { - downstream.receive(completion: .finished) + onDone: { [weak self] in + guard let self = self else { return } + self.buffer.complete(completion: .finished) }) } func request(_ demand: Subscribers.Demand) { - // For now, not supporting changing any kind of demand + _ = self.buffer.demand(demand) } func cancel() { From 61e5dea8df4160d6a79ce89b439728024c45f052 Mon Sep 17 00:00:00 2001 From: Thomas Bajis Date: Mon, 17 May 2021 12:24:18 -0400 Subject: [PATCH 04/13] fix spacing --- Snail/Snail+Combine/SnailSubscription.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Snail/Snail+Combine/SnailSubscription.swift b/Snail/Snail+Combine/SnailSubscription.swift index 6c874e7..39d295a 100644 --- a/Snail/Snail+Combine/SnailSubscription.swift +++ b/Snail/Snail+Combine/SnailSubscription.swift @@ -15,7 +15,6 @@ class SnailSubscription Date: Mon, 17 May 2021 13:06:42 -0400 Subject: [PATCH 05/13] Add remaining test cases for all different types of Observables --- Snail.xcodeproj/project.pbxproj | 20 ++++ SnailTests/Combine/FailAsPublisherTests.swift | 53 +++++++++ SnailTests/Combine/JustAsPublisherTests.swift | 31 +++++ .../Combine/ObservableAsPublisherTests.swift | 2 - .../Combine/ReplayAsPublisherTests.swift | 37 ++++++ .../Combine/UniqueAsPublisherTests.swift | 94 +++++++++++++++ .../Combine/VariableAsPublisherTests.swift | 108 ++++++++++++++++++ 7 files changed, 343 insertions(+), 2 deletions(-) create mode 100644 SnailTests/Combine/FailAsPublisherTests.swift create mode 100644 SnailTests/Combine/JustAsPublisherTests.swift create mode 100644 SnailTests/Combine/ReplayAsPublisherTests.swift create mode 100644 SnailTests/Combine/UniqueAsPublisherTests.swift create mode 100644 SnailTests/Combine/VariableAsPublisherTests.swift diff --git a/Snail.xcodeproj/project.pbxproj b/Snail.xcodeproj/project.pbxproj index 56a7925..4e7cef2 100644 --- a/Snail.xcodeproj/project.pbxproj +++ b/Snail.xcodeproj/project.pbxproj @@ -20,6 +20,11 @@ 2E53BDEE264ED4C70030B9FB /* SnailSubscription.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */; }; 2E53BE072651F2990030B9FB /* ObservableAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE062651F2990030B9FB /* ObservableAsPublisherTests.swift */; }; 2E53BE092652B6D60030B9FB /* DemandBuffer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE082652B6D60030B9FB /* DemandBuffer.swift */; }; + 2E53BE0B2652D0960030B9FB /* FailAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE0A2652D0960030B9FB /* FailAsPublisherTests.swift */; }; + 2E53BE0D2652D37D0030B9FB /* JustAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE0C2652D37D0030B9FB /* JustAsPublisherTests.swift */; }; + 2E53BE0F2652D4E50030B9FB /* ReplayAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE0E2652D4E50030B9FB /* ReplayAsPublisherTests.swift */; }; + 2E53BE112652D7370030B9FB /* UniqueAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE102652D7370030B9FB /* UniqueAsPublisherTests.swift */; }; + 2E53BE132652D8B80030B9FB /* VariableAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE122652D8B80030B9FB /* VariableAsPublisherTests.swift */; }; CB2936771DFE151B00792E6B /* Just.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936761DFE151B00792E6B /* Just.swift */; }; CB2936791DFEF77500792E6B /* JustTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936781DFEF77500792E6B /* JustTests.swift */; }; CBE54A7A1E5A16AC00971F74 /* Subscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBE54A791E5A16AC00971F74 /* Subscriber.swift */; }; @@ -67,6 +72,11 @@ 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SnailSubscription.swift; sourceTree = ""; }; 2E53BE062651F2990030B9FB /* ObservableAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ObservableAsPublisherTests.swift; sourceTree = ""; }; 2E53BE082652B6D60030B9FB /* DemandBuffer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DemandBuffer.swift; sourceTree = ""; }; + 2E53BE0A2652D0960030B9FB /* FailAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FailAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE0C2652D37D0030B9FB /* JustAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = JustAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE0E2652D4E50030B9FB /* ReplayAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReplayAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE102652D7370030B9FB /* UniqueAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UniqueAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE122652D8B80030B9FB /* VariableAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = VariableAsPublisherTests.swift; sourceTree = ""; }; CB2936761DFE151B00792E6B /* Just.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Just.swift; sourceTree = ""; }; CB2936781DFEF77500792E6B /* JustTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = JustTests.swift; sourceTree = ""; }; CBE54A791E5A16AC00971F74 /* Subscriber.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Subscriber.swift; sourceTree = ""; }; @@ -134,6 +144,11 @@ isa = PBXGroup; children = ( 2E53BE062651F2990030B9FB /* ObservableAsPublisherTests.swift */, + 2E53BE0A2652D0960030B9FB /* FailAsPublisherTests.swift */, + 2E53BE0C2652D37D0030B9FB /* JustAsPublisherTests.swift */, + 2E53BE0E2652D4E50030B9FB /* ReplayAsPublisherTests.swift */, + 2E53BE102652D7370030B9FB /* UniqueAsPublisherTests.swift */, + 2E53BE122652D8B80030B9FB /* VariableAsPublisherTests.swift */, ); path = Combine; sourceTree = ""; @@ -372,12 +387,17 @@ buildActionMask = 2147483647; files = ( CB2936791DFEF77500792E6B /* JustTests.swift in Sources */, + 2E53BE132652D8B80030B9FB /* VariableAsPublisherTests.swift in Sources */, + 2E53BE0F2652D4E50030B9FB /* ReplayAsPublisherTests.swift in Sources */, CBE54E651DFB395A0008DD64 /* ObservableTests.swift in Sources */, + 2E53BE0B2652D0960030B9FB /* FailAsPublisherTests.swift in Sources */, F5C973A822F4B359003DB42C /* DisposerTests.swift in Sources */, 24FABD5C1DFF0BAF005CF84E /* ReplayTests.swift in Sources */, DEF9B98D1FD5D8FD00F8514E /* UniqueTests.swift in Sources */, + 2E53BE112652D7370030B9FB /* UniqueAsPublisherTests.swift in Sources */, F569538A232046AA00D35C80 /* ClosureTests.swift in Sources */, 2E53BE072651F2990030B9FB /* ObservableAsPublisherTests.swift in Sources */, + 2E53BE0D2652D37D0030B9FB /* JustAsPublisherTests.swift in Sources */, CBE54E671DFB4F3F0008DD64 /* VariableTests.swift in Sources */, 24FABD581DFEF7EC005CF84E /* FailTests.swift in Sources */, DEEDA8EB2051BCB4000FE578 /* NotificationCenterExtensions.swift in Sources */, diff --git a/SnailTests/Combine/FailAsPublisherTests.swift b/SnailTests/Combine/FailAsPublisherTests.swift new file mode 100644 index 0000000..5a66352 --- /dev/null +++ b/SnailTests/Combine/FailAsPublisherTests.swift @@ -0,0 +1,53 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class FailAsPublisherTests: XCTestCase { + enum TestError: Error { + case test + } + + private var subject: Observable! + private var strings: [String]! + private var error: Error? + private var subscription: AnyCancellable? + + override func setUp() { + super.setUp() + subject = Fail(TestError.test) + strings = [] + error = nil + } + + override func tearDown() { + subject = nil + strings = nil + error = nil + subscription = nil + } + + func testOnErrorIsRun() { + subscription = subject.asPublisher() + .sink(receiveCompletion: { [unowned self] completion in + if case let .failure(underlying) = completion { + self.error = underlying as? TestError + } + }, + receiveValue: { _ in }) + + XCTAssertEqual((error as? TestError), TestError.test) + } + + func testOnNextIsNotRun() { + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { [unowned self] in strings.append($0) }) + subject?.on(.next("1")) + + XCTAssertEqual(strings?.count, 0) + } +} diff --git a/SnailTests/Combine/JustAsPublisherTests.swift b/SnailTests/Combine/JustAsPublisherTests.swift new file mode 100644 index 0000000..e880a2c --- /dev/null +++ b/SnailTests/Combine/JustAsPublisherTests.swift @@ -0,0 +1,31 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class JustAsPublisherTests: XCTestCase { + private var subscription: AnyCancellable? + + override func tearDown() { + subscription = nil + super.tearDown() + } + + func testJust() { + var result: Int? + var done = false + + subscription = Just(1).asPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + done = true + } + }, + receiveValue: { result = $0 }) + XCTAssertEqual(1, result) + XCTAssertTrue(done) + } +} diff --git a/SnailTests/Combine/ObservableAsPublisherTests.swift b/SnailTests/Combine/ObservableAsPublisherTests.swift index 3e24ec2..c28aaa1 100644 --- a/SnailTests/Combine/ObservableAsPublisherTests.swift +++ b/SnailTests/Combine/ObservableAsPublisherTests.swift @@ -2,7 +2,6 @@ // swiftlint:disable file_length -#if canImport(Combine) import Combine import Foundation @testable import Snail @@ -1129,4 +1128,3 @@ class ObservableAsPublisherTests: XCTestCase { XCTAssertTrue(isDone) } } -#endif diff --git a/SnailTests/Combine/ReplayAsPublisherTests.swift b/SnailTests/Combine/ReplayAsPublisherTests.swift new file mode 100644 index 0000000..4ae5bf1 --- /dev/null +++ b/SnailTests/Combine/ReplayAsPublisherTests.swift @@ -0,0 +1,37 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class ReplayAsPublisherTests: XCTestCase { + private var subject: Replay! + private var subscription: AnyCancellable? + + override func setUp() { + super.setUp() + subject = Replay(2) + } + + override func tearDown() { + subject = nil + subscription = nil + super.tearDown() + } + + func testReplay() { + var strings: [String] = [] + subject?.on(.next("1")) + subject?.on(.next("2")) + subject?.on(.done) + + subscription = subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { strings.append($0) }) + + XCTAssertEqual(strings[0], "1") + XCTAssertEqual(strings[1], "2") + } +} diff --git a/SnailTests/Combine/UniqueAsPublisherTests.swift b/SnailTests/Combine/UniqueAsPublisherTests.swift new file mode 100644 index 0000000..b1955d2 --- /dev/null +++ b/SnailTests/Combine/UniqueAsPublisherTests.swift @@ -0,0 +1,94 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class UniqueAsPublisherTests: XCTestCase { + private var subscription: AnyCancellable? + + override func tearDown() { + subscription = nil + } + + func testVariableChanges() { + var events: [String?] = [] + let subject = Unique(nil) + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { events.append($0) }) + + subject.value = nil + subject.value = "1" + subject.value = "2" + subject.value = "2" + subject.value = "2" + XCTAssertEqual(events[0], nil) + XCTAssertEqual(events[1], "1") + XCTAssertEqual(events[2], "2") + XCTAssertEqual(subject.value, "2") + XCTAssertEqual(events.count, 3) + } + + func testVariableNotifiesOnSubscribe() { + let subject = Unique("initial") + subject.value = "new" + var result: String? + + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { result = $0 }) + + XCTAssertEqual("new", result) + } + + func testVariableNotifiesInitialOnSubscribe() { + let subject = Unique("initial") + var result: String? + + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { result = $0 }) + + XCTAssertEqual("initial", result) + } + + func testVariableHandlesEquatableArrays() { + var events: [[String]] = [] + let subject = Unique<[String]>(["1", "2"]) + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { events.append($0) }) + + subject.value = ["1", "2"] + subject.value = ["2", "1"] + subject.value = ["2", "1"] + subject.value = ["1", "2"] + XCTAssertEqual(events[0], ["1", "2"]) + XCTAssertEqual(events[1], ["2", "1"]) + XCTAssertEqual(events[2], ["1", "2"]) + } + + func testVariableHandlesOptionalArrays() { + var events: [[String]?] = [] + let subject = Unique<[String]?>(nil) + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { events.append($0) }) + subject.value = ["1", "2"] + subject.value = nil + subject.value = nil + subject.value = ["1", "2"] + XCTAssertEqual(events[0], nil) + XCTAssertEqual(events[1], ["1", "2"]) + XCTAssertEqual(events[2], nil) + XCTAssertEqual(events[3], ["1", "2"]) + } +} diff --git a/SnailTests/Combine/VariableAsPublisherTests.swift b/SnailTests/Combine/VariableAsPublisherTests.swift new file mode 100644 index 0000000..3f14e62 --- /dev/null +++ b/SnailTests/Combine/VariableAsPublisherTests.swift @@ -0,0 +1,108 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class VariableAsPublisherTests: XCTestCase { + private var subscription: AnyCancellable? + + override func tearDown() { + subscription = nil + } + + func testVariableChanges() { + var events: [String?] = [] + let subject = Variable(nil) + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { events.append($0) }) + subject.value = "1" + subject.value = "2" + XCTAssertEqual(events[0], nil) + XCTAssertEqual(events[1], "1") + XCTAssertEqual(events[2], "2") + XCTAssertEqual(subject.value, "2") + } + + func testVariableNotifiesOnSubscribe() { + let subject = Variable("initial") + subject.value = "new" + var result: String? + + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { result = $0 }) + + XCTAssertEqual("new", result) + } + + func testVariableNotifiesInitialOnSubscribe() { + let subject = Variable("initial") + var result: String? + + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { result = $0 }) + + XCTAssertEqual("initial", result) + } + + func testMappedVariableNotifiesOnSubscribe() { + let subject = Variable("initial") + subject.value = "new" + var subjectCharactersCount: Int? + + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { subjectCharactersCount = $0.count }) + + XCTAssertEqual(subject.value.count, subjectCharactersCount) + } + + func testMappedVariableNotifiesInitialOnSubscribe() { + let subject = Variable("initial") + var subjectCharactersCount: Int? + + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { subjectCharactersCount = $0.count }) + + XCTAssertEqual(subject.value.count, subjectCharactersCount) + } + + func testUniqueFireCounts() { + let subject = Unique("sameValue") + var firedCount = 0 + + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in firedCount += 1 }) + + subject.value = "sameValue" + + XCTAssertEqual(firedCount, 1) + } + + func testVariableFireCounts() { + let subject = Variable("sameValue") + var firedCount = 0 + + subscription = subject.asObservable() + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in firedCount += 1 }) + + subject.value = "sameValue" + + XCTAssertEqual(firedCount, 2) + } +} From ed61a49b4242ddb29a588c927fe0f580b868fad8 Mon Sep 17 00:00:00 2001 From: Thomas Bajis Date: Mon, 17 May 2021 13:17:59 -0400 Subject: [PATCH 06/13] remove canImports for now --- Snail/Snail+Combine/Observable+Combine.swift | 2 -- Snail/Snail+Combine/SnailSubscription.swift | 2 -- 2 files changed, 4 deletions(-) diff --git a/Snail/Snail+Combine/Observable+Combine.swift b/Snail/Snail+Combine/Observable+Combine.swift index 2215395..0753a3b 100644 --- a/Snail/Snail+Combine/Observable+Combine.swift +++ b/Snail/Snail+Combine/Observable+Combine.swift @@ -1,6 +1,5 @@ // Copyright © 2021 Compass. All rights reserved. -#if canImport(Combine) import Combine import Foundation @@ -31,4 +30,3 @@ public class SnailPublisher: Publisher { downstream: subscriber)) } } -#endif diff --git a/Snail/Snail+Combine/SnailSubscription.swift b/Snail/Snail+Combine/SnailSubscription.swift index 39d295a..5be3ec3 100644 --- a/Snail/Snail+Combine/SnailSubscription.swift +++ b/Snail/Snail+Combine/SnailSubscription.swift @@ -1,6 +1,5 @@ // Copyright © 2021 Compass. All rights reserved. -#if canImport(Combine) import Combine import Foundation @@ -36,4 +35,3 @@ class SnailSubscription Date: Tue, 18 May 2021 18:08:10 -0400 Subject: [PATCH 07/13] Update tests to use subscription set --- SnailTests/Combine/FailAsPublisherTests.swift | 13 +- SnailTests/Combine/JustAsPublisherTests.swift | 12 +- .../Combine/ObservableAsPublisherTests.swift | 251 ++++++++++++++---- .../Combine/ReplayAsPublisherTests.swift | 8 +- .../Combine/UniqueAsPublisherTests.swift | 23 +- .../Combine/VariableAsPublisherTests.swift | 29 +- 6 files changed, 251 insertions(+), 85 deletions(-) diff --git a/SnailTests/Combine/FailAsPublisherTests.swift b/SnailTests/Combine/FailAsPublisherTests.swift index 5a66352..0f17687 100644 --- a/SnailTests/Combine/FailAsPublisherTests.swift +++ b/SnailTests/Combine/FailAsPublisherTests.swift @@ -14,12 +14,13 @@ class FailAsPublisherTests: XCTestCase { private var subject: Observable! private var strings: [String]! private var error: Error? - private var subscription: AnyCancellable? + private var subscriptions: Set! override func setUp() { super.setUp() subject = Fail(TestError.test) strings = [] + subscriptions = Set() error = nil } @@ -27,25 +28,27 @@ class FailAsPublisherTests: XCTestCase { subject = nil strings = nil error = nil - subscription = nil + subscriptions = nil } func testOnErrorIsRun() { - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { [unowned self] completion in if case let .failure(underlying) = completion { self.error = underlying as? TestError } }, receiveValue: { _ in }) + .store(in: &subscriptions) XCTAssertEqual((error as? TestError), TestError.test) } func testOnNextIsNotRun() { - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, - receiveValue: { [unowned self] in strings.append($0) }) + receiveValue: { [unowned self] in self.strings.append($0) }) + .store(in: &subscriptions) subject?.on(.next("1")) XCTAssertEqual(strings?.count, 0) diff --git a/SnailTests/Combine/JustAsPublisherTests.swift b/SnailTests/Combine/JustAsPublisherTests.swift index e880a2c..c411180 100644 --- a/SnailTests/Combine/JustAsPublisherTests.swift +++ b/SnailTests/Combine/JustAsPublisherTests.swift @@ -7,10 +7,14 @@ import XCTest @available(iOS 13.0, *) class JustAsPublisherTests: XCTestCase { - private var subscription: AnyCancellable? + private var subscriptions: Set! + + override func setUp() { + subscriptions = Set() + } override func tearDown() { - subscription = nil + subscriptions = nil super.tearDown() } @@ -18,13 +22,15 @@ class JustAsPublisherTests: XCTestCase { var result: Int? var done = false - subscription = Just(1).asPublisher() + Just(1).asPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { done = true } }, receiveValue: { result = $0 }) + .store(in: &subscriptions) + XCTAssertEqual(1, result) XCTAssertTrue(done) } diff --git a/SnailTests/Combine/ObservableAsPublisherTests.swift b/SnailTests/Combine/ObservableAsPublisherTests.swift index c28aaa1..f22b85c 100644 --- a/SnailTests/Combine/ObservableAsPublisherTests.swift +++ b/SnailTests/Combine/ObservableAsPublisherTests.swift @@ -19,12 +19,13 @@ class ObservableAsPublisherTests: XCTestCase { private var done: Bool? private var disposer: Disposer! - private var subscription: AnyCancellable? + private var subscriptions: Set! override func setUp() { super.setUp() subject = Observable() strings = [] + subscriptions = Set() error = nil done = nil disposer = Disposer() @@ -33,15 +34,15 @@ class ObservableAsPublisherTests: XCTestCase { override func tearDown() { subject = nil strings = nil + subscriptions = nil error = nil done = nil disposer = nil - subscription = nil super.tearDown() } func testNext() { - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { [unowned self] completion in switch completion { case .finished: @@ -51,6 +52,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { [unowned self] in self.strings?.append($0) }) + .store(in: &subscriptions) ["1", "2"].forEach { subject?.on(.next($0)) } @@ -58,7 +60,7 @@ class ObservableAsPublisherTests: XCTestCase { } func testOnDone() { - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { [unowned self] completion in switch completion { case .finished: @@ -68,6 +70,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { [unowned self] in self.strings?.append($0) }) + .store(in: &subscriptions) subject?.on(.next("1")) subject?.on(.next("2")) @@ -81,7 +84,7 @@ class ObservableAsPublisherTests: XCTestCase { } func testOnError() { - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { [unowned self] completion in switch completion { case .finished: @@ -91,6 +94,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { [unowned self] in self.strings?.append($0) }) + .store(in: &subscriptions) subject?.on(.next("1")) subject?.on(.next("2")) @@ -102,25 +106,110 @@ class ObservableAsPublisherTests: XCTestCase { XCTAssertEqual(error as? TestError, .test) } + func testMultipleSubscribers() { + var more: [String] = [] + subject.subscribe(onNext: { string in + more.append(string) + }).add(to: disposer) + subject.asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { [unowned self] string in + self.strings?.append(string) + } + ) + .store(in: &subscriptions) + + subject?.on(.next("1")) + XCTAssertEqual(strings?.first, more.first) + } + func testFiresStoppedEventOnSubscribeIfStopped() { subject?.on(.error(TestError.test)) var oldError: TestError? - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case let .failure(underlying) = completion { oldError = underlying as? TestError } - }, receiveValue: { _ in }) + }, + receiveValue: { _ in }) + .store(in: &subscriptions) XCTAssertEqual(oldError, .test) } + func testSubscribeOnMainThread() { + var isMainQueue = false + let exp = expectation(description: "queue") + + DispatchQueue.global().async { + self.subject.asPublisher() + .receive(on: DispatchQueue.main) + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in + exp.fulfill() + isMainQueue = Thread.isMainThread + }) + .store(in: &self.subscriptions) + self.subject?.on(.next("1")) + } + + waitForExpectations(timeout: 2) { error in + XCTAssertNil(error) + XCTAssertEqual(isMainQueue, true) + } + } + + func testOnMainThreadNotifiedOnMain() { + var isMainQueue = false + let exp = expectation(description: "queue") + + DispatchQueue.global().async { + self.subject.on(.main) + .asPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in + exp.fulfill() + isMainQueue = Thread.isMainThread + }) + .store(in: &self.subscriptions) + self.subject?.on(.next("1")) + } + + waitForExpectations(timeout: 2) { error in + XCTAssertNil(error) + XCTAssertEqual(isMainQueue, true) + } + } + + func testOnGlobalThreadNotifiedOnMain() { + var isMainQueue = false + let exp = expectation(description: "queue") + + self.subject.on(.global()) + .asPublisher() + .receive(on: DispatchQueue.main) + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in + exp.fulfill() + isMainQueue = Thread.isMainThread + }) + .store(in: &self.subscriptions) + self.subject?.on(.next("1")) + + waitForExpectations(timeout: 2) { error in + XCTAssertNil(error) + XCTAssertEqual(isMainQueue, true) + } + } + func testRemovingSubscribers() { - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { [unowned self] str in self.strings.append(str) - }) + }) + .store(in: &subscriptions) subject?.on(.next("1")) subject?.removeSubscribers() subject?.on(.next("2")) @@ -134,11 +223,12 @@ class ObservableAsPublisherTests: XCTestCase { onError: { error in self.error = error }, onDone: { self.done = true }) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { [unowned self] str in self.strings.append(str) }) + .store(in: &subscriptions) subject?.on(.next("1")) guard let subscriber = subscriberToRemove else { @@ -170,10 +260,11 @@ class ObservableAsPublisherTests: XCTestCase { exp.fulfill() } - subscription = observable.throttle(delay) + observable.throttle(delay) .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) observable.on(.next("1")) observable.on(.next("2")) @@ -198,10 +289,11 @@ class ObservableAsPublisherTests: XCTestCase { } } - subscription = observable.throttle(delay) + observable.throttle(delay) .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) observable.on(.next("1")) @@ -229,10 +321,11 @@ class ObservableAsPublisherTests: XCTestCase { } } - subscription = observable.debounce(delay) + observable.debounce(delay) .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) observable.on(.next("1")) @@ -246,10 +339,11 @@ class ObservableAsPublisherTests: XCTestCase { let observable = Observable() var received: [String] = [] - subscription = observable.skip(first: 2) + observable.skip(first: 2) .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) observable.on(.next("1")) observable.on(.next("2")) @@ -262,7 +356,7 @@ class ObservableAsPublisherTests: XCTestCase { let observable = Observable() var error: TestError? - subscription = observable.skip(first: 2) + observable.skip(first: 2) .asPublisher() .sink(receiveCompletion: { completion in if case let .failure(underlying) = completion { @@ -270,6 +364,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { _ in }) + .store(in: &subscriptions) observable.on(.error(TestError.test)) XCTAssertEqual(error, .test) @@ -279,7 +374,7 @@ class ObservableAsPublisherTests: XCTestCase { let observable = Observable() var done = false - subscription = observable.skip(first: 2) + observable.skip(first: 2) .asPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { @@ -287,6 +382,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { _ in }) + .store(in: &subscriptions) observable.on(.done) @@ -297,10 +393,11 @@ class ObservableAsPublisherTests: XCTestCase { let observable = Observable() var received: [String] = [] - subscription = observable.take(first: 2) + observable.take(first: 2) .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) observable.on(.next("1")) observable.on(.next("2")) @@ -313,7 +410,7 @@ class ObservableAsPublisherTests: XCTestCase { let observable = Observable() var error: TestError? - subscription = observable.take(first: 2) + observable.take(first: 2) .asPublisher() .sink(receiveCompletion: { completion in if case let .failure(underlying) = completion { @@ -321,6 +418,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { _ in }) + .store(in: &subscriptions) observable.on(.error(TestError.test)) @@ -331,7 +429,7 @@ class ObservableAsPublisherTests: XCTestCase { let observable = Observable() var done = false - subscription = observable.take(first: 2) + observable.take(first: 2) .asPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { @@ -339,6 +437,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { _ in }) + .store(in: &subscriptions) observable.on(.done) @@ -349,7 +448,7 @@ class ObservableAsPublisherTests: XCTestCase { let observable = Observable() var done = false - subscription = observable.take(first: 2) + observable.take(first: 2) .asPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { @@ -357,6 +456,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { _ in }) + .store(in: &subscriptions) observable.on(.next("1")) observable.on(.next("2")) @@ -372,13 +472,14 @@ class ObservableAsPublisherTests: XCTestCase { let observable = Observable() observable.forward(to: subject) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case let .failure(underlying) = completion { receivedError = underlying as? TestError } }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) observable.on(.next("1")) observable.on(.error(TestError.test)) @@ -395,9 +496,10 @@ class ObservableAsPublisherTests: XCTestCase { let observable = Observable() observable.forward(to: subject) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, - receiveValue: { received.append($0) }) + receiveValue: { received.append($0) }) + .store(in: &subscriptions) observable.on(.next("1")) observable.on(.done) @@ -414,9 +516,10 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.merge([a, b]) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, - receiveValue: { received.append($0) }) + receiveValue: { received.append($0) }) + .store(in: &subscriptions) a.on(.next("1")) b.on(.next("2")) @@ -435,9 +538,10 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.merge(a, b) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, - receiveValue: { received.append($0) }) + receiveValue: { received.append($0) }) + .store(in: &subscriptions) a.on(.next("1")) b.on(.next("2")) @@ -456,11 +560,12 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(string, int) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { string, int in received.append(("\(string): \(int)")) }) + .store(in: &subscriptions) string.on(.next("The value")) string.on(.next("The number")) @@ -485,11 +590,12 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(string, int) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { string, int in received.append(("\(string ?? ""): \(int ?? 0)")) }) + .store(in: &subscriptions) string.on(.next("The value")) string.on(.next("The number")) @@ -514,7 +620,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(string, int) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { received.append("ERROR") @@ -523,6 +629,7 @@ class ObservableAsPublisherTests: XCTestCase { receiveValue: { string, int in received.append(("\(string): \(int)")) }) + .store(in: &subscriptions) string.on(.next("The number")) int.on(.next(1)) @@ -545,13 +652,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(string, int) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { received.append("ERROR") } }, receiveValue: { _ in }) + .store(in: &subscriptions) int.on(.error(TestError.test)) int.on(.next(1)) @@ -565,7 +673,7 @@ class ObservableAsPublisherTests: XCTestCase { let obs2 = Observable() var isDone = false - subscription = Observable.combineLatest(obs1, obs2) + Observable.combineLatest(obs1, obs2) .asPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { @@ -573,6 +681,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { _ in }) + .store(in: &subscriptions) obs1.on(.done) XCTAssertFalse(isDone) @@ -589,9 +698,10 @@ class ObservableAsPublisherTests: XCTestCase { var received = [(String, Int, Double)]() let subject = Observable.combineLatest(one, two, three) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) one.on(.next("The string")) XCTAssertTrue(received.isEmpty) @@ -612,13 +722,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three) let exp = expectation(description: "combineLatest3 forwards error from observable") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) two.on(.error(TestError.test)) waitForExpectations(timeout: 1) } @@ -630,13 +741,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three) let exp = expectation(description: "combineLatest3 forwards error from observable") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) three.on(.error(TestError.test)) waitForExpectations(timeout: 1) } @@ -647,7 +759,7 @@ class ObservableAsPublisherTests: XCTestCase { let obs3 = Observable() var isDone = false - subscription = Observable.combineLatest(obs1, obs2, obs3) + Observable.combineLatest(obs1, obs2, obs3) .asPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { @@ -655,6 +767,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { _ in }) + .store(in: &subscriptions) obs1.on(.done) XCTAssertFalse(isDone) @@ -674,9 +787,10 @@ class ObservableAsPublisherTests: XCTestCase { var received = [(String, Int?, Double)]() let subject = Observable.combineLatest(one, two, three) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) one.on(.next("The string")) XCTAssertTrue(received.isEmpty) @@ -699,9 +813,10 @@ class ObservableAsPublisherTests: XCTestCase { var received = [(String, Int, Double, String)]() let subject = Observable.combineLatest(one, two, three, four) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) one.on(.next("The string")) XCTAssertTrue(received.isEmpty) @@ -727,13 +842,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three, four) let exp = expectation(description: "combineLatest4 forwards error from observable") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) one.on(.error(TestError.test)) waitForExpectations(timeout: 1) } @@ -746,13 +862,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three, four) let exp = expectation(description: "combineLatest4 forwards error from observable") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) two.on(.error(TestError.test)) waitForExpectations(timeout: 1) } @@ -765,13 +882,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three, four) let exp = expectation(description: "combineLatest4 forwards error from observable") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) three.on(.error(TestError.test)) waitForExpectations(timeout: 1) } @@ -784,13 +902,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three, four) let exp = expectation(description: "combineLatest4 forwards error from observable") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) four.on(.error(TestError.test)) waitForExpectations(timeout: 1) } @@ -802,7 +921,7 @@ class ObservableAsPublisherTests: XCTestCase { let obs4 = Observable() var isDone = false - subscription = Observable.combineLatest(obs1, obs2, obs3, obs4) + Observable.combineLatest(obs1, obs2, obs3, obs4) .asPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { @@ -810,6 +929,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { _ in }) + .store(in: &subscriptions) obs1.on(.done) XCTAssertFalse(isDone) @@ -833,9 +953,10 @@ class ObservableAsPublisherTests: XCTestCase { var received = [(String, Int?, Double, String?)]() let subject = Observable.combineLatest(one, two, three, four) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) one.on(.next("The string")) XCTAssertTrue(received.isEmpty) @@ -858,9 +979,10 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.map { "Number: \($0)" } var received = [String]() - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) observable.on(.next(1)) observable.on(.next(10)) @@ -873,13 +995,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.map { "Number: \($0)" } let exp = expectation(description: "observable map forwards error") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) observable.on(.error(TestError.test)) @@ -891,13 +1014,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.map { "Number: \($0)" } let exp = expectation(description: "observable map forwards done") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) observable.on(.done) @@ -909,9 +1033,10 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.filter { $0 % 2 == 0 } var received = [Int]() - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) observable.on(.next(1)) observable.on(.next(2)) @@ -926,13 +1051,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.filter { $0 % 2 == 0 } let exp = expectation(description: "observable filter forwards error") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) observable.on(.error(TestError.test)) @@ -944,13 +1070,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.filter { $0 % 2 == 0 } let exp = expectation(description: "observable filter forwards done") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) observable.on(.done) @@ -962,9 +1089,10 @@ class ObservableAsPublisherTests: XCTestCase { let subject = fetchTrigger.flatMap { Variable(100).asObservable() } var received = [Int]() - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) + .store(in: &subscriptions) fetchTrigger.on(.next(())) XCTAssertEqual(received, [100]) @@ -975,13 +1103,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = fetchTrigger.flatMap { Variable(100).asObservable() } let exp = expectation(description: "observable flatMap forwards error") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) fetchTrigger.on(.error(TestError.test)) waitForExpectations(timeout: 1) @@ -992,13 +1121,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = fetchTrigger.flatMap { Variable(100).asObservable() } let exp = expectation(description: "observable flatMap forwards done") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) fetchTrigger.on(.done) waitForExpectations(timeout: 1) @@ -1012,11 +1142,12 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.zip(string, int) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { string, int in received.append("\(string): \(int)") }) + .store(in: &subscriptions) string.on(.next("The value")) string.on(.next("The number")) @@ -1040,11 +1171,12 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.zip(string, int) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { string, int in received.append("\(string ?? ""): \(int ?? 0)") }) + .store(in: &subscriptions) string.on(.next("The value")) string.on(.next("The number")) @@ -1068,11 +1200,12 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.zip(string, int) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { string, int in received.append("\(string): \(int)") }) + .store(in: &subscriptions) string.on(.next("The value")) string.on(.next("The number")) @@ -1098,13 +1231,14 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.zip(one, two) let exp = expectation(description: "zip forwards error from observable") - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() } }, receiveValue: { _ in }) + .store(in: &subscriptions) one.on(.error(TestError.test)) waitForExpectations(timeout: 1) @@ -1115,7 +1249,7 @@ class ObservableAsPublisherTests: XCTestCase { let two = Observable() var isDone = false - subscription = Observable.zip(one, two) + Observable.zip(one, two) .asPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { @@ -1123,6 +1257,7 @@ class ObservableAsPublisherTests: XCTestCase { } }, receiveValue: { _ in }) + .store(in: &subscriptions) one.on(.done) XCTAssertTrue(isDone) diff --git a/SnailTests/Combine/ReplayAsPublisherTests.swift b/SnailTests/Combine/ReplayAsPublisherTests.swift index 4ae5bf1..1b80e28 100644 --- a/SnailTests/Combine/ReplayAsPublisherTests.swift +++ b/SnailTests/Combine/ReplayAsPublisherTests.swift @@ -8,16 +8,17 @@ import XCTest @available(iOS 13.0, *) class ReplayAsPublisherTests: XCTestCase { private var subject: Replay! - private var subscription: AnyCancellable? + private var subscriptions: Set! override func setUp() { super.setUp() subject = Replay(2) + subscriptions = Set() } override func tearDown() { subject = nil - subscription = nil + subscriptions = nil super.tearDown() } @@ -27,9 +28,10 @@ class ReplayAsPublisherTests: XCTestCase { subject?.on(.next("2")) subject?.on(.done) - subscription = subject.asPublisher() + subject.asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { strings.append($0) }) + .store(in: &subscriptions) XCTAssertEqual(strings[0], "1") XCTAssertEqual(strings[1], "2") diff --git a/SnailTests/Combine/UniqueAsPublisherTests.swift b/SnailTests/Combine/UniqueAsPublisherTests.swift index b1955d2..68df702 100644 --- a/SnailTests/Combine/UniqueAsPublisherTests.swift +++ b/SnailTests/Combine/UniqueAsPublisherTests.swift @@ -7,19 +7,24 @@ import XCTest @available(iOS 13.0, *) class UniqueAsPublisherTests: XCTestCase { - private var subscription: AnyCancellable? + private var subscriptions: Set! + + override func setUp() { + subscriptions = Set() + } override func tearDown() { - subscription = nil + subscriptions = nil } func testVariableChanges() { var events: [String?] = [] let subject = Unique(nil) - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { events.append($0) }) + .store(in: &subscriptions) subject.value = nil subject.value = "1" @@ -38,10 +43,11 @@ class UniqueAsPublisherTests: XCTestCase { subject.value = "new" var result: String? - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { result = $0 }) + .store(in: &subscriptions) XCTAssertEqual("new", result) } @@ -50,10 +56,11 @@ class UniqueAsPublisherTests: XCTestCase { let subject = Unique("initial") var result: String? - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { result = $0 }) + .store(in: &subscriptions) XCTAssertEqual("initial", result) } @@ -61,10 +68,11 @@ class UniqueAsPublisherTests: XCTestCase { func testVariableHandlesEquatableArrays() { var events: [[String]] = [] let subject = Unique<[String]>(["1", "2"]) - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { events.append($0) }) + .store(in: &subscriptions) subject.value = ["1", "2"] subject.value = ["2", "1"] @@ -78,10 +86,11 @@ class UniqueAsPublisherTests: XCTestCase { func testVariableHandlesOptionalArrays() { var events: [[String]?] = [] let subject = Unique<[String]?>(nil) - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { events.append($0) }) + .store(in: &subscriptions) subject.value = ["1", "2"] subject.value = nil subject.value = nil diff --git a/SnailTests/Combine/VariableAsPublisherTests.swift b/SnailTests/Combine/VariableAsPublisherTests.swift index 3f14e62..374de17 100644 --- a/SnailTests/Combine/VariableAsPublisherTests.swift +++ b/SnailTests/Combine/VariableAsPublisherTests.swift @@ -7,19 +7,24 @@ import XCTest @available(iOS 13.0, *) class VariableAsPublisherTests: XCTestCase { - private var subscription: AnyCancellable? + private var subscriptions: Set! + + override func setUp() { + subscriptions = Set() + } override func tearDown() { - subscription = nil + subscriptions = nil } func testVariableChanges() { var events: [String?] = [] let subject = Variable(nil) - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { events.append($0) }) + .store(in: &subscriptions) subject.value = "1" subject.value = "2" XCTAssertEqual(events[0], nil) @@ -33,10 +38,11 @@ class VariableAsPublisherTests: XCTestCase { subject.value = "new" var result: String? - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { result = $0 }) + .store(in: &subscriptions) XCTAssertEqual("new", result) } @@ -45,10 +51,11 @@ class VariableAsPublisherTests: XCTestCase { let subject = Variable("initial") var result: String? - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { result = $0 }) + .store(in: &subscriptions) XCTAssertEqual("initial", result) } @@ -58,10 +65,11 @@ class VariableAsPublisherTests: XCTestCase { subject.value = "new" var subjectCharactersCount: Int? - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { subjectCharactersCount = $0.count }) + .store(in: &subscriptions) XCTAssertEqual(subject.value.count, subjectCharactersCount) } @@ -70,10 +78,11 @@ class VariableAsPublisherTests: XCTestCase { let subject = Variable("initial") var subjectCharactersCount: Int? - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { subjectCharactersCount = $0.count }) + .store(in: &subscriptions) XCTAssertEqual(subject.value.count, subjectCharactersCount) } @@ -82,10 +91,11 @@ class VariableAsPublisherTests: XCTestCase { let subject = Unique("sameValue") var firedCount = 0 - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { _ in firedCount += 1 }) + .store(in: &subscriptions) subject.value = "sameValue" @@ -96,10 +106,11 @@ class VariableAsPublisherTests: XCTestCase { let subject = Variable("sameValue") var firedCount = 0 - subscription = subject.asObservable() + subject.asObservable() .asPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { _ in firedCount += 1 }) + .store(in: &subscriptions) subject.value = "sameValue" From 9de0fc0a7874829f98213e3172f79d55c7d2aa00 Mon Sep 17 00:00:00 2001 From: Thomas Bajis Date: Tue, 18 May 2021 18:08:28 -0400 Subject: [PATCH 08/13] Create separate file for SnailPublisher --- Snail.xcodeproj/project.pbxproj | 4 ++++ Snail/Snail+Combine/Observable+Combine.swift | 23 +------------------- Snail/Snail+Combine/SnailPublisher.swift | 21 ++++++++++++++++++ 3 files changed, 26 insertions(+), 22 deletions(-) create mode 100644 Snail/Snail+Combine/SnailPublisher.swift diff --git a/Snail.xcodeproj/project.pbxproj b/Snail.xcodeproj/project.pbxproj index 4e7cef2..89875a8 100644 --- a/Snail.xcodeproj/project.pbxproj +++ b/Snail.xcodeproj/project.pbxproj @@ -25,6 +25,7 @@ 2E53BE0F2652D4E50030B9FB /* ReplayAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE0E2652D4E50030B9FB /* ReplayAsPublisherTests.swift */; }; 2E53BE112652D7370030B9FB /* UniqueAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE102652D7370030B9FB /* UniqueAsPublisherTests.swift */; }; 2E53BE132652D8B80030B9FB /* VariableAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE122652D8B80030B9FB /* VariableAsPublisherTests.swift */; }; + 2E53BE512654717B0030B9FB /* SnailPublisher.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE502654717B0030B9FB /* SnailPublisher.swift */; }; CB2936771DFE151B00792E6B /* Just.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936761DFE151B00792E6B /* Just.swift */; }; CB2936791DFEF77500792E6B /* JustTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936781DFEF77500792E6B /* JustTests.swift */; }; CBE54A7A1E5A16AC00971F74 /* Subscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBE54A791E5A16AC00971F74 /* Subscriber.swift */; }; @@ -77,6 +78,7 @@ 2E53BE0E2652D4E50030B9FB /* ReplayAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReplayAsPublisherTests.swift; sourceTree = ""; }; 2E53BE102652D7370030B9FB /* UniqueAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UniqueAsPublisherTests.swift; sourceTree = ""; }; 2E53BE122652D8B80030B9FB /* VariableAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = VariableAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE502654717B0030B9FB /* SnailPublisher.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SnailPublisher.swift; sourceTree = ""; }; CB2936761DFE151B00792E6B /* Just.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Just.swift; sourceTree = ""; }; CB2936781DFEF77500792E6B /* JustTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = JustTests.swift; sourceTree = ""; }; CBE54A791E5A16AC00971F74 /* Subscriber.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Subscriber.swift; sourceTree = ""; }; @@ -136,6 +138,7 @@ 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */, 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */, 2E53BE082652B6D60030B9FB /* DemandBuffer.swift */, + 2E53BE502654717B0030B9FB /* SnailPublisher.swift */, ); path = "Snail+Combine"; sourceTree = ""; @@ -371,6 +374,7 @@ 24CF1FA81EF875A400F34234 /* URLSessionExtensions.swift in Sources */, 2E53BE092652B6D60030B9FB /* DemandBuffer.swift in Sources */, 2408FA902112A15900B9F59E /* Scheduler.swift in Sources */, + 2E53BE512654717B0030B9FB /* SnailPublisher.swift in Sources */, CBE54E601DFB39510008DD64 /* Event.swift in Sources */, 2E53BDEE264ED4C70030B9FB /* SnailSubscription.swift in Sources */, F5C973A622F20C86003DB42C /* Disposer.swift in Sources */, diff --git a/Snail/Snail+Combine/Observable+Combine.swift b/Snail/Snail+Combine/Observable+Combine.swift index 0753a3b..3d8f212 100644 --- a/Snail/Snail+Combine/Observable+Combine.swift +++ b/Snail/Snail+Combine/Observable+Combine.swift @@ -5,28 +5,7 @@ import Foundation @available(iOS 13.0, *) public extension ObservableType { - var publisher: AnyPublisher { - return SnailPublisher(upstream: self).eraseToAnyPublisher() - } - func asPublisher() -> AnyPublisher { - return publisher - } -} - -@available(iOS 13.0, *) -public class SnailPublisher: Publisher { - public typealias Output = Upstream.T - public typealias Failure = Swift.Error - - private let upstream: Upstream - - init(upstream: Upstream) { - self.upstream = upstream - } - - public func receive(subscriber: S) where Failure == S.Failure, Output == S.Input { - subscriber.receive(subscription: SnailSubscription(upstream: upstream, - downstream: subscriber)) + return SnailPublisher(upstream: self).eraseToAnyPublisher() } } diff --git a/Snail/Snail+Combine/SnailPublisher.swift b/Snail/Snail+Combine/SnailPublisher.swift new file mode 100644 index 0000000..7aca47f --- /dev/null +++ b/Snail/Snail+Combine/SnailPublisher.swift @@ -0,0 +1,21 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation + +@available(iOS 13.0, *) +public class SnailPublisher: Publisher { + public typealias Output = Upstream.T + public typealias Failure = Error + + private let upstream: Upstream + + init(upstream: Upstream) { + self.upstream = upstream + } + + public func receive(subscriber: S) where Failure == S.Failure, Output == S.Input { + subscriber.receive(subscription: SnailSubscription(upstream: upstream, + downstream: subscriber)) + } +} From 529ec1dcc6ca1fd1c9a3893a6df3756f005a3080 Mon Sep 17 00:00:00 2001 From: Thomas Bajis Date: Wed, 9 Jun 2021 21:31:02 -0400 Subject: [PATCH 09/13] remove whitespaces --- SnailTests/Combine/UniqueAsPublisherTests.swift | 2 +- SnailTests/Combine/VariableAsPublisherTests.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/SnailTests/Combine/UniqueAsPublisherTests.swift b/SnailTests/Combine/UniqueAsPublisherTests.swift index 68df702..c00c5e6 100644 --- a/SnailTests/Combine/UniqueAsPublisherTests.swift +++ b/SnailTests/Combine/UniqueAsPublisherTests.swift @@ -8,7 +8,7 @@ import XCTest @available(iOS 13.0, *) class UniqueAsPublisherTests: XCTestCase { private var subscriptions: Set! - + override func setUp() { subscriptions = Set() } diff --git a/SnailTests/Combine/VariableAsPublisherTests.swift b/SnailTests/Combine/VariableAsPublisherTests.swift index 374de17..832f2c8 100644 --- a/SnailTests/Combine/VariableAsPublisherTests.swift +++ b/SnailTests/Combine/VariableAsPublisherTests.swift @@ -8,7 +8,7 @@ import XCTest @available(iOS 13.0, *) class VariableAsPublisherTests: XCTestCase { private var subscriptions: Set! - + override func setUp() { subscriptions = Set() } From c2ba6308fd4cadef770d5dceb425cfa0eff6cbbd Mon Sep 17 00:00:00 2001 From: Thomas Bajis Date: Wed, 9 Jun 2021 21:31:15 -0400 Subject: [PATCH 10/13] Use mostly original DemandBuffer file --- Snail/Snail+Combine/DemandBuffer.swift | 74 ++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 11 deletions(-) diff --git a/Snail/Snail+Combine/DemandBuffer.swift b/Snail/Snail+Combine/DemandBuffer.swift index ba189fb..4d6aa1e 100644 --- a/Snail/Snail+Combine/DemandBuffer.swift +++ b/Snail/Snail+Combine/DemandBuffer.swift @@ -1,29 +1,47 @@ -// Copyright © 2021 Compass. All rights reserved. +// +// DemandBuffer.swift +// RxCombine +// +// Created by Shai Mishali on 21/02/2020. +// Copyright © 2020 Combine Community. All rights reserved. +// import Combine -import Foundation +import Darwin +/// A buffer responsible for managing the demand of a downstream +/// subscriber for an upstream publisher +/// +/// It buffers values and completion events and forwards them dynamically +/// according to the demand requested by the downstream +/// +/// In a sense, the subscription only relays the requests for demand, as well +/// the events emitted by the upstream — to this buffer, which manages +/// the entire behavior and backpressure contract @available(iOS 13.0, *) class DemandBuffer { - private struct Demand { - var processed: Subscribers.Demand = .none - var requested: Subscribers.Demand = .none - var sent: Subscribers.Demand = .none - } - private let lock = NSRecursiveLock() - private var buffer: [S.Input] = [] + private var buffer = [S.Input]() private let subscriber: S private var completion: Subscribers.Completion? private var demandState = Demand() + /// Initialize a new demand buffer for a provided downstream subscriber + /// + /// - parameter subscriber: The downstream subscriber demanding events init(subscriber: S) { self.subscriber = subscriber } + /// Buffer an upstream value to later be forwarded to + /// the downstream subscriber, once it demands it + /// + /// - parameter value: Upstream value to buffer + /// + /// - returns: The demand fulfilled by the bufferr func buffer(value: S.Input) -> Subscribers.Demand { precondition(self.completion == nil, - "Completed publisher should not be able to send values") + "How could a completed publisher sent values?! Beats me 🤷‍♂️") switch demandState.requested { case .unlimited: @@ -34,18 +52,38 @@ class DemandBuffer { } } + /// Complete the demand buffer with an upstream completion event + /// + /// This method will deplete the buffer immediately, + /// based on the currently accumulated demand, and relay the + /// completion event down as soon as demand is fulfilled + /// + /// - parameter completion: Completion event func complete(completion: Subscribers.Completion) { precondition(self.completion == nil, - "Completion should not be completed at this point") + "Completion have already occured, which is quite awkward 🥺") self.completion = completion _ = flush() } + /// Signal to the buffer that the downstream requested new demand + /// + /// - note: The buffer will attempt to flush as many events rqeuested + /// by the downstream at this point func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand { flush(adding: demand) } + /// Flush buffered events to the downstream based on the current + /// state of the downstream's demand + /// + /// - parameter newDemand: The new demand to add. If `nil`, the flush isn't the + /// result of an explicit demand change + /// + /// - note: After fulfilling the downstream's request, if completion + /// has already occured, the buffer will be cleared and the + /// completion event will be sent to the downstream subscriber private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand { lock.lock() defer { lock.unlock() } @@ -54,6 +92,7 @@ class DemandBuffer { demandState.requested += newDemand } + // If buffer isn't ready for flushing, return immediately guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none } while !buffer.isEmpty && demandState.processed < demandState.requested { @@ -62,6 +101,7 @@ class DemandBuffer { } if let completion = completion { + // Completion event was already sent buffer = [] demandState = .init() self.completion = nil @@ -74,3 +114,15 @@ class DemandBuffer { return sentDemand } } + +// MARK: - Private Helpers +@available(iOS 13.0, *) +private extension DemandBuffer { + /// A model that tracks the downstream's + /// accumulated demand state + struct Demand { + var processed: Subscribers.Demand = .none + var requested: Subscribers.Demand = .none + var sent: Subscribers.Demand = .none + } +} From 2fdc565a5758cd2f071891e83fb46ecdaad58ccb Mon Sep 17 00:00:00 2001 From: Thomas Bajis Date: Thu, 10 Jun 2021 11:55:13 -0400 Subject: [PATCH 11/13] add foundation import --- Snail/Snail+Combine/DemandBuffer.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Snail/Snail+Combine/DemandBuffer.swift b/Snail/Snail+Combine/DemandBuffer.swift index 4d6aa1e..1e82c19 100644 --- a/Snail/Snail+Combine/DemandBuffer.swift +++ b/Snail/Snail+Combine/DemandBuffer.swift @@ -8,6 +8,7 @@ import Combine import Darwin +import Foundation /// A buffer responsible for managing the demand of a downstream /// subscriber for an upstream publisher From dd328de59f3081b3c4be7996189ade738b84db53 Mon Sep 17 00:00:00 2001 From: Thomas Bajis Date: Tue, 21 Sep 2021 13:39:48 -0400 Subject: [PATCH 12/13] change bridge naming to .asAnyPublisher() --- Snail/Snail+Combine/Observable+Combine.swift | 2 +- SnailTests/Combine/FailAsPublisherTests.swift | 4 +- SnailTests/Combine/JustAsPublisherTests.swift | 2 +- .../Combine/ObservableAsPublisherTests.swift | 110 +++++++++--------- .../Combine/ReplayAsPublisherTests.swift | 2 +- .../Combine/UniqueAsPublisherTests.swift | 10 +- .../Combine/VariableAsPublisherTests.swift | 14 +-- 7 files changed, 72 insertions(+), 72 deletions(-) diff --git a/Snail/Snail+Combine/Observable+Combine.swift b/Snail/Snail+Combine/Observable+Combine.swift index 3d8f212..e47bb09 100644 --- a/Snail/Snail+Combine/Observable+Combine.swift +++ b/Snail/Snail+Combine/Observable+Combine.swift @@ -5,7 +5,7 @@ import Foundation @available(iOS 13.0, *) public extension ObservableType { - func asPublisher() -> AnyPublisher { + func asAnyPublisher() -> AnyPublisher { return SnailPublisher(upstream: self).eraseToAnyPublisher() } } diff --git a/SnailTests/Combine/FailAsPublisherTests.swift b/SnailTests/Combine/FailAsPublisherTests.swift index 0f17687..51d9aa7 100644 --- a/SnailTests/Combine/FailAsPublisherTests.swift +++ b/SnailTests/Combine/FailAsPublisherTests.swift @@ -32,7 +32,7 @@ class FailAsPublisherTests: XCTestCase { } func testOnErrorIsRun() { - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { [unowned self] completion in if case let .failure(underlying) = completion { self.error = underlying as? TestError @@ -45,7 +45,7 @@ class FailAsPublisherTests: XCTestCase { } func testOnNextIsNotRun() { - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { [unowned self] in self.strings.append($0) }) .store(in: &subscriptions) diff --git a/SnailTests/Combine/JustAsPublisherTests.swift b/SnailTests/Combine/JustAsPublisherTests.swift index c411180..5dc3273 100644 --- a/SnailTests/Combine/JustAsPublisherTests.swift +++ b/SnailTests/Combine/JustAsPublisherTests.swift @@ -22,7 +22,7 @@ class JustAsPublisherTests: XCTestCase { var result: Int? var done = false - Just(1).asPublisher() + Just(1).asAnyPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { done = true diff --git a/SnailTests/Combine/ObservableAsPublisherTests.swift b/SnailTests/Combine/ObservableAsPublisherTests.swift index f22b85c..a548a89 100644 --- a/SnailTests/Combine/ObservableAsPublisherTests.swift +++ b/SnailTests/Combine/ObservableAsPublisherTests.swift @@ -42,7 +42,7 @@ class ObservableAsPublisherTests: XCTestCase { } func testNext() { - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { [unowned self] completion in switch completion { case .finished: @@ -60,7 +60,7 @@ class ObservableAsPublisherTests: XCTestCase { } func testOnDone() { - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { [unowned self] completion in switch completion { case .finished: @@ -84,7 +84,7 @@ class ObservableAsPublisherTests: XCTestCase { } func testOnError() { - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { [unowned self] completion in switch completion { case .finished: @@ -111,7 +111,7 @@ class ObservableAsPublisherTests: XCTestCase { subject.subscribe(onNext: { string in more.append(string) }).add(to: disposer) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { [unowned self] string in self.strings?.append(string) @@ -127,7 +127,7 @@ class ObservableAsPublisherTests: XCTestCase { subject?.on(.error(TestError.test)) var oldError: TestError? - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case let .failure(underlying) = completion { oldError = underlying as? TestError @@ -143,7 +143,7 @@ class ObservableAsPublisherTests: XCTestCase { let exp = expectation(description: "queue") DispatchQueue.global().async { - self.subject.asPublisher() + self.subject.asAnyPublisher() .receive(on: DispatchQueue.main) .sink(receiveCompletion: { _ in }, receiveValue: { _ in @@ -166,7 +166,7 @@ class ObservableAsPublisherTests: XCTestCase { DispatchQueue.global().async { self.subject.on(.main) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { _ in exp.fulfill() @@ -187,7 +187,7 @@ class ObservableAsPublisherTests: XCTestCase { let exp = expectation(description: "queue") self.subject.on(.global()) - .asPublisher() + .asAnyPublisher() .receive(on: DispatchQueue.main) .sink(receiveCompletion: { _ in }, receiveValue: { _ in @@ -204,7 +204,7 @@ class ObservableAsPublisherTests: XCTestCase { } func testRemovingSubscribers() { - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { [unowned self] str in self.strings.append(str) @@ -223,7 +223,7 @@ class ObservableAsPublisherTests: XCTestCase { onError: { error in self.error = error }, onDone: { self.done = true }) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { [unowned self] str in self.strings.append(str) @@ -261,7 +261,7 @@ class ObservableAsPublisherTests: XCTestCase { } observable.throttle(delay) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -290,7 +290,7 @@ class ObservableAsPublisherTests: XCTestCase { } observable.throttle(delay) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -322,7 +322,7 @@ class ObservableAsPublisherTests: XCTestCase { } observable.debounce(delay) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -340,7 +340,7 @@ class ObservableAsPublisherTests: XCTestCase { var received: [String] = [] observable.skip(first: 2) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -357,7 +357,7 @@ class ObservableAsPublisherTests: XCTestCase { var error: TestError? observable.skip(first: 2) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { completion in if case let .failure(underlying) = completion { error = underlying as? TestError @@ -375,7 +375,7 @@ class ObservableAsPublisherTests: XCTestCase { var done = false observable.skip(first: 2) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { done = true @@ -394,7 +394,7 @@ class ObservableAsPublisherTests: XCTestCase { var received: [String] = [] observable.take(first: 2) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -411,7 +411,7 @@ class ObservableAsPublisherTests: XCTestCase { var error: TestError? observable.take(first: 2) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { completion in if case let .failure(underlying) = completion { error = underlying as? TestError @@ -430,7 +430,7 @@ class ObservableAsPublisherTests: XCTestCase { var done = false observable.take(first: 2) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { done = true @@ -449,7 +449,7 @@ class ObservableAsPublisherTests: XCTestCase { var done = false observable.take(first: 2) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { done = true @@ -472,7 +472,7 @@ class ObservableAsPublisherTests: XCTestCase { let observable = Observable() observable.forward(to: subject) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case let .failure(underlying) = completion { receivedError = underlying as? TestError @@ -496,7 +496,7 @@ class ObservableAsPublisherTests: XCTestCase { let observable = Observable() observable.forward(to: subject) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -516,7 +516,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.merge([a, b]) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -538,7 +538,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.merge(a, b) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -560,7 +560,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(string, int) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { string, int in received.append(("\(string): \(int)")) @@ -590,7 +590,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(string, int) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { string, int in received.append(("\(string ?? ""): \(int ?? 0)")) @@ -620,7 +620,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(string, int) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { received.append("ERROR") @@ -652,7 +652,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(string, int) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { received.append("ERROR") @@ -674,7 +674,7 @@ class ObservableAsPublisherTests: XCTestCase { var isDone = false Observable.combineLatest(obs1, obs2) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { isDone = true @@ -698,7 +698,7 @@ class ObservableAsPublisherTests: XCTestCase { var received = [(String, Int, Double)]() let subject = Observable.combineLatest(one, two, three) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -722,7 +722,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three) let exp = expectation(description: "combineLatest3 forwards error from observable") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() @@ -741,7 +741,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three) let exp = expectation(description: "combineLatest3 forwards error from observable") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() @@ -760,7 +760,7 @@ class ObservableAsPublisherTests: XCTestCase { var isDone = false Observable.combineLatest(obs1, obs2, obs3) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { isDone = true @@ -787,7 +787,7 @@ class ObservableAsPublisherTests: XCTestCase { var received = [(String, Int?, Double)]() let subject = Observable.combineLatest(one, two, three) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -813,7 +813,7 @@ class ObservableAsPublisherTests: XCTestCase { var received = [(String, Int, Double, String)]() let subject = Observable.combineLatest(one, two, three, four) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -842,7 +842,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three, four) let exp = expectation(description: "combineLatest4 forwards error from observable") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() @@ -862,7 +862,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three, four) let exp = expectation(description: "combineLatest4 forwards error from observable") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() @@ -882,7 +882,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three, four) let exp = expectation(description: "combineLatest4 forwards error from observable") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() @@ -902,7 +902,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.combineLatest(one, two, three, four) let exp = expectation(description: "combineLatest4 forwards error from observable") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() @@ -922,7 +922,7 @@ class ObservableAsPublisherTests: XCTestCase { var isDone = false Observable.combineLatest(obs1, obs2, obs3, obs4) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { isDone = true @@ -953,7 +953,7 @@ class ObservableAsPublisherTests: XCTestCase { var received = [(String, Int?, Double, String?)]() let subject = Observable.combineLatest(one, two, three, four) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -979,7 +979,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.map { "Number: \($0)" } var received = [String]() - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -995,7 +995,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.map { "Number: \($0)" } let exp = expectation(description: "observable map forwards error") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() @@ -1014,7 +1014,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.map { "Number: \($0)" } let exp = expectation(description: "observable map forwards done") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { exp.fulfill() @@ -1033,7 +1033,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.filter { $0 % 2 == 0 } var received = [Int]() - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -1051,7 +1051,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.filter { $0 % 2 == 0 } let exp = expectation(description: "observable filter forwards error") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() @@ -1070,7 +1070,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = observable.filter { $0 % 2 == 0 } let exp = expectation(description: "observable filter forwards done") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { exp.fulfill() @@ -1089,7 +1089,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = fetchTrigger.flatMap { Variable(100).asObservable() } var received = [Int]() - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { received.append($0) }) .store(in: &subscriptions) @@ -1103,7 +1103,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = fetchTrigger.flatMap { Variable(100).asObservable() } let exp = expectation(description: "observable flatMap forwards error") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() @@ -1121,7 +1121,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = fetchTrigger.flatMap { Variable(100).asObservable() } let exp = expectation(description: "observable flatMap forwards done") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { exp.fulfill() @@ -1142,7 +1142,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.zip(string, int) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { string, int in received.append("\(string): \(int)") @@ -1171,7 +1171,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.zip(string, int) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { string, int in received.append("\(string ?? ""): \(int ?? 0)") @@ -1200,7 +1200,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.zip(string, int) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { string, int in received.append("\(string): \(int)") @@ -1231,7 +1231,7 @@ class ObservableAsPublisherTests: XCTestCase { let subject = Observable.zip(one, two) let exp = expectation(description: "zip forwards error from observable") - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { completion in if case .failure = completion { exp.fulfill() @@ -1250,7 +1250,7 @@ class ObservableAsPublisherTests: XCTestCase { var isDone = false Observable.zip(one, two) - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { completion in if case .finished = completion { isDone = true diff --git a/SnailTests/Combine/ReplayAsPublisherTests.swift b/SnailTests/Combine/ReplayAsPublisherTests.swift index 1b80e28..8e34741 100644 --- a/SnailTests/Combine/ReplayAsPublisherTests.swift +++ b/SnailTests/Combine/ReplayAsPublisherTests.swift @@ -28,7 +28,7 @@ class ReplayAsPublisherTests: XCTestCase { subject?.on(.next("2")) subject?.on(.done) - subject.asPublisher() + subject.asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { strings.append($0) }) .store(in: &subscriptions) diff --git a/SnailTests/Combine/UniqueAsPublisherTests.swift b/SnailTests/Combine/UniqueAsPublisherTests.swift index c00c5e6..a19f1ea 100644 --- a/SnailTests/Combine/UniqueAsPublisherTests.swift +++ b/SnailTests/Combine/UniqueAsPublisherTests.swift @@ -21,7 +21,7 @@ class UniqueAsPublisherTests: XCTestCase { var events: [String?] = [] let subject = Unique(nil) subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { events.append($0) }) .store(in: &subscriptions) @@ -44,7 +44,7 @@ class UniqueAsPublisherTests: XCTestCase { var result: String? subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { result = $0 }) .store(in: &subscriptions) @@ -57,7 +57,7 @@ class UniqueAsPublisherTests: XCTestCase { var result: String? subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { result = $0 }) .store(in: &subscriptions) @@ -69,7 +69,7 @@ class UniqueAsPublisherTests: XCTestCase { var events: [[String]] = [] let subject = Unique<[String]>(["1", "2"]) subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { events.append($0) }) .store(in: &subscriptions) @@ -87,7 +87,7 @@ class UniqueAsPublisherTests: XCTestCase { var events: [[String]?] = [] let subject = Unique<[String]?>(nil) subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { events.append($0) }) .store(in: &subscriptions) diff --git a/SnailTests/Combine/VariableAsPublisherTests.swift b/SnailTests/Combine/VariableAsPublisherTests.swift index 832f2c8..0ae8edb 100644 --- a/SnailTests/Combine/VariableAsPublisherTests.swift +++ b/SnailTests/Combine/VariableAsPublisherTests.swift @@ -21,7 +21,7 @@ class VariableAsPublisherTests: XCTestCase { var events: [String?] = [] let subject = Variable(nil) subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { events.append($0) }) .store(in: &subscriptions) @@ -39,7 +39,7 @@ class VariableAsPublisherTests: XCTestCase { var result: String? subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { result = $0 }) .store(in: &subscriptions) @@ -52,7 +52,7 @@ class VariableAsPublisherTests: XCTestCase { var result: String? subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { result = $0 }) .store(in: &subscriptions) @@ -66,7 +66,7 @@ class VariableAsPublisherTests: XCTestCase { var subjectCharactersCount: Int? subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { subjectCharactersCount = $0.count }) .store(in: &subscriptions) @@ -79,7 +79,7 @@ class VariableAsPublisherTests: XCTestCase { var subjectCharactersCount: Int? subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { subjectCharactersCount = $0.count }) .store(in: &subscriptions) @@ -92,7 +92,7 @@ class VariableAsPublisherTests: XCTestCase { var firedCount = 0 subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { _ in firedCount += 1 }) .store(in: &subscriptions) @@ -107,7 +107,7 @@ class VariableAsPublisherTests: XCTestCase { var firedCount = 0 subject.asObservable() - .asPublisher() + .asAnyPublisher() .sink(receiveCompletion: { _ in }, receiveValue: { _ in firedCount += 1 }) .store(in: &subscriptions) From db3af24c617622c952d06fd251965d303ce3dc39 Mon Sep 17 00:00:00 2001 From: Thomas Bajis Date: Tue, 21 Sep 2021 13:49:39 -0400 Subject: [PATCH 13/13] update for missing test case --- .../Combine/ObservableAsPublisherTests.swift | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/SnailTests/Combine/ObservableAsPublisherTests.swift b/SnailTests/Combine/ObservableAsPublisherTests.swift index a548a89..b654929 100644 --- a/SnailTests/Combine/ObservableAsPublisherTests.swift +++ b/SnailTests/Combine/ObservableAsPublisherTests.swift @@ -1262,4 +1262,27 @@ class ObservableAsPublisherTests: XCTestCase { one.on(.done) XCTAssertTrue(isDone) } + + func testNoDeadLock() { + class TestObject { + let disposer = Disposer() + + init(variable: Variable) { + variable.asObservable().subscribe() + .add(to: disposer) + } + } + + let subject = Variable(true) + + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in + _ = TestObject(variable: subject) + }) + .store(in: &subscriptions) + + subject.value = true + } }