diff --git a/Snail.xcodeproj/project.pbxproj b/Snail.xcodeproj/project.pbxproj index 2e5ec04..89875a8 100644 --- a/Snail.xcodeproj/project.pbxproj +++ b/Snail.xcodeproj/project.pbxproj @@ -16,6 +16,16 @@ 24FABD581DFEF7EC005CF84E /* FailTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 24FABD571DFEF7EC005CF84E /* FailTests.swift */; }; 24FABD5A1DFF0B48005CF84E /* Replay.swift in Sources */ = {isa = PBXBuildFile; fileRef = 24FABD591DFF0B48005CF84E /* Replay.swift */; }; 24FABD5C1DFF0BAF005CF84E /* ReplayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 24FABD5B1DFF0BAF005CF84E /* ReplayTests.swift */; }; + 2E53BDEC264ECC940030B9FB /* Observable+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */; }; + 2E53BDEE264ED4C70030B9FB /* SnailSubscription.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */; }; + 2E53BE072651F2990030B9FB /* ObservableAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE062651F2990030B9FB /* ObservableAsPublisherTests.swift */; }; + 2E53BE092652B6D60030B9FB /* DemandBuffer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE082652B6D60030B9FB /* DemandBuffer.swift */; }; + 2E53BE0B2652D0960030B9FB /* FailAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE0A2652D0960030B9FB /* FailAsPublisherTests.swift */; }; + 2E53BE0D2652D37D0030B9FB /* JustAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE0C2652D37D0030B9FB /* JustAsPublisherTests.swift */; }; + 2E53BE0F2652D4E50030B9FB /* ReplayAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE0E2652D4E50030B9FB /* ReplayAsPublisherTests.swift */; }; + 2E53BE112652D7370030B9FB /* UniqueAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE102652D7370030B9FB /* UniqueAsPublisherTests.swift */; }; + 2E53BE132652D8B80030B9FB /* VariableAsPublisherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE122652D8B80030B9FB /* VariableAsPublisherTests.swift */; }; + 2E53BE512654717B0030B9FB /* SnailPublisher.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2E53BE502654717B0030B9FB /* SnailPublisher.swift */; }; CB2936771DFE151B00792E6B /* Just.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936761DFE151B00792E6B /* Just.swift */; }; CB2936791DFEF77500792E6B /* JustTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB2936781DFEF77500792E6B /* JustTests.swift */; }; CBE54A7A1E5A16AC00971F74 /* Subscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBE54A791E5A16AC00971F74 /* Subscriber.swift */; }; @@ -59,6 +69,16 @@ 24FABD571DFEF7EC005CF84E /* FailTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = FailTests.swift; sourceTree = ""; }; 24FABD591DFF0B48005CF84E /* Replay.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Replay.swift; sourceTree = ""; }; 24FABD5B1DFF0BAF005CF84E /* ReplayTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ReplayTests.swift; sourceTree = ""; }; + 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Observable+Combine.swift"; sourceTree = ""; }; + 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SnailSubscription.swift; sourceTree = ""; }; + 2E53BE062651F2990030B9FB /* ObservableAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ObservableAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE082652B6D60030B9FB /* DemandBuffer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DemandBuffer.swift; sourceTree = ""; }; + 2E53BE0A2652D0960030B9FB /* FailAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FailAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE0C2652D37D0030B9FB /* JustAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = JustAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE0E2652D4E50030B9FB /* ReplayAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReplayAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE102652D7370030B9FB /* UniqueAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UniqueAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE122652D8B80030B9FB /* VariableAsPublisherTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = VariableAsPublisherTests.swift; sourceTree = ""; }; + 2E53BE502654717B0030B9FB /* SnailPublisher.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SnailPublisher.swift; sourceTree = ""; }; CB2936761DFE151B00792E6B /* Just.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Just.swift; sourceTree = ""; }; CB2936781DFEF77500792E6B /* JustTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = JustTests.swift; sourceTree = ""; }; CBE54A791E5A16AC00971F74 /* Subscriber.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Subscriber.swift; sourceTree = ""; }; @@ -112,6 +132,30 @@ path = Extensions; sourceTree = ""; }; + 2E53BDEA264ECC770030B9FB /* Snail+Combine */ = { + isa = PBXGroup; + children = ( + 2E53BDEB264ECC940030B9FB /* Observable+Combine.swift */, + 2E53BDED264ED4C70030B9FB /* SnailSubscription.swift */, + 2E53BE082652B6D60030B9FB /* DemandBuffer.swift */, + 2E53BE502654717B0030B9FB /* SnailPublisher.swift */, + ); + path = "Snail+Combine"; + sourceTree = ""; + }; + 2E53BE052651F2710030B9FB /* Combine */ = { + isa = PBXGroup; + children = ( + 2E53BE062651F2990030B9FB /* ObservableAsPublisherTests.swift */, + 2E53BE0A2652D0960030B9FB /* FailAsPublisherTests.swift */, + 2E53BE0C2652D37D0030B9FB /* JustAsPublisherTests.swift */, + 2E53BE0E2652D4E50030B9FB /* ReplayAsPublisherTests.swift */, + 2E53BE102652D7370030B9FB /* UniqueAsPublisherTests.swift */, + 2E53BE122652D8B80030B9FB /* VariableAsPublisherTests.swift */, + ); + path = Combine; + sourceTree = ""; + }; CBE54E371DFB36DF0008DD64 = { isa = PBXGroup; children = ( @@ -133,6 +177,7 @@ CBE54E431DFB36DF0008DD64 /* Snail */ = { isa = PBXGroup; children = ( + 2E53BDEA264ECC770030B9FB /* Snail+Combine */, F569538B2320476100D35C80 /* Closure.swift */, CBE54E441DFB36DF0008DD64 /* Snail.h */, CBE54E451DFB36DF0008DD64 /* Info.plist */, @@ -155,6 +200,7 @@ CBE54E4E1DFB36DF0008DD64 /* SnailTests */ = { isa = PBXGroup; children = ( + 2E53BE052651F2710030B9FB /* Combine */, CBE54E511DFB36DF0008DD64 /* Info.plist */, F5695389232046AA00D35C80 /* ClosureTests.swift */, F5C973A722F4B359003DB42C /* DisposerTests.swift */, @@ -314,6 +360,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + 2E53BDEC264ECC940030B9FB /* Observable+Combine.swift in Sources */, CBE54E621DFB39510008DD64 /* ObservableType.swift in Sources */, CBE54A7A1E5A16AC00971F74 /* Subscriber.swift in Sources */, 2421BA721E09801000EA9064 /* UIGestureRecognizerExtensions.swift in Sources */, @@ -325,8 +372,11 @@ CB2936771DFE151B00792E6B /* Just.swift in Sources */, 241F15581E03124600DD70A2 /* UIBarButtonItemExtensions.swift in Sources */, 24CF1FA81EF875A400F34234 /* URLSessionExtensions.swift in Sources */, + 2E53BE092652B6D60030B9FB /* DemandBuffer.swift in Sources */, 2408FA902112A15900B9F59E /* Scheduler.swift in Sources */, + 2E53BE512654717B0030B9FB /* SnailPublisher.swift in Sources */, CBE54E601DFB39510008DD64 /* Event.swift in Sources */, + 2E53BDEE264ED4C70030B9FB /* SnailSubscription.swift in Sources */, F5C973A622F20C86003DB42C /* Disposer.swift in Sources */, CBE54E631DFB39510008DD64 /* Variable.swift in Sources */, CBE54E6D1DFB6A910008DD64 /* UIControlExtensions.swift in Sources */, @@ -341,11 +391,17 @@ buildActionMask = 2147483647; files = ( CB2936791DFEF77500792E6B /* JustTests.swift in Sources */, + 2E53BE132652D8B80030B9FB /* VariableAsPublisherTests.swift in Sources */, + 2E53BE0F2652D4E50030B9FB /* ReplayAsPublisherTests.swift in Sources */, CBE54E651DFB395A0008DD64 /* ObservableTests.swift in Sources */, + 2E53BE0B2652D0960030B9FB /* FailAsPublisherTests.swift in Sources */, F5C973A822F4B359003DB42C /* DisposerTests.swift in Sources */, 24FABD5C1DFF0BAF005CF84E /* ReplayTests.swift in Sources */, DEF9B98D1FD5D8FD00F8514E /* UniqueTests.swift in Sources */, + 2E53BE112652D7370030B9FB /* UniqueAsPublisherTests.swift in Sources */, F569538A232046AA00D35C80 /* ClosureTests.swift in Sources */, + 2E53BE072651F2990030B9FB /* ObservableAsPublisherTests.swift in Sources */, + 2E53BE0D2652D37D0030B9FB /* JustAsPublisherTests.swift in Sources */, CBE54E671DFB4F3F0008DD64 /* VariableTests.swift in Sources */, 24FABD581DFEF7EC005CF84E /* FailTests.swift in Sources */, DEEDA8EB2051BCB4000FE578 /* NotificationCenterExtensions.swift in Sources */, diff --git a/Snail/Snail+Combine/DemandBuffer.swift b/Snail/Snail+Combine/DemandBuffer.swift new file mode 100644 index 0000000..1e82c19 --- /dev/null +++ b/Snail/Snail+Combine/DemandBuffer.swift @@ -0,0 +1,129 @@ +// +// DemandBuffer.swift +// RxCombine +// +// Created by Shai Mishali on 21/02/2020. +// Copyright © 2020 Combine Community. All rights reserved. +// + +import Combine +import Darwin +import Foundation + +/// A buffer responsible for managing the demand of a downstream +/// subscriber for an upstream publisher +/// +/// It buffers values and completion events and forwards them dynamically +/// according to the demand requested by the downstream +/// +/// In a sense, the subscription only relays the requests for demand, as well +/// the events emitted by the upstream — to this buffer, which manages +/// the entire behavior and backpressure contract +@available(iOS 13.0, *) +class DemandBuffer { + private let lock = NSRecursiveLock() + private var buffer = [S.Input]() + private let subscriber: S + private var completion: Subscribers.Completion? + private var demandState = Demand() + + /// Initialize a new demand buffer for a provided downstream subscriber + /// + /// - parameter subscriber: The downstream subscriber demanding events + init(subscriber: S) { + self.subscriber = subscriber + } + + /// Buffer an upstream value to later be forwarded to + /// the downstream subscriber, once it demands it + /// + /// - parameter value: Upstream value to buffer + /// + /// - returns: The demand fulfilled by the bufferr + func buffer(value: S.Input) -> Subscribers.Demand { + precondition(self.completion == nil, + "How could a completed publisher sent values?! Beats me 🤷‍♂️") + + switch demandState.requested { + case .unlimited: + return subscriber.receive(value) + default: + buffer.append(value) + return flush() + } + } + + /// Complete the demand buffer with an upstream completion event + /// + /// This method will deplete the buffer immediately, + /// based on the currently accumulated demand, and relay the + /// completion event down as soon as demand is fulfilled + /// + /// - parameter completion: Completion event + func complete(completion: Subscribers.Completion) { + precondition(self.completion == nil, + "Completion have already occured, which is quite awkward 🥺") + + self.completion = completion + _ = flush() + } + + /// Signal to the buffer that the downstream requested new demand + /// + /// - note: The buffer will attempt to flush as many events rqeuested + /// by the downstream at this point + func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand { + flush(adding: demand) + } + + /// Flush buffered events to the downstream based on the current + /// state of the downstream's demand + /// + /// - parameter newDemand: The new demand to add. If `nil`, the flush isn't the + /// result of an explicit demand change + /// + /// - note: After fulfilling the downstream's request, if completion + /// has already occured, the buffer will be cleared and the + /// completion event will be sent to the downstream subscriber + private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand { + lock.lock() + defer { lock.unlock() } + + if let newDemand = newDemand { + demandState.requested += newDemand + } + + // If buffer isn't ready for flushing, return immediately + guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none } + + while !buffer.isEmpty && demandState.processed < demandState.requested { + demandState.requested += subscriber.receive(buffer.remove(at: 0)) + demandState.processed += 1 + } + + if let completion = completion { + // Completion event was already sent + buffer = [] + demandState = .init() + self.completion = nil + subscriber.receive(completion: completion) + return .none + } + + let sentDemand = demandState.requested - demandState.sent + demandState.sent += sentDemand + return sentDemand + } +} + +// MARK: - Private Helpers +@available(iOS 13.0, *) +private extension DemandBuffer { + /// A model that tracks the downstream's + /// accumulated demand state + struct Demand { + var processed: Subscribers.Demand = .none + var requested: Subscribers.Demand = .none + var sent: Subscribers.Demand = .none + } +} diff --git a/Snail/Snail+Combine/Observable+Combine.swift b/Snail/Snail+Combine/Observable+Combine.swift new file mode 100644 index 0000000..e47bb09 --- /dev/null +++ b/Snail/Snail+Combine/Observable+Combine.swift @@ -0,0 +1,11 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation + +@available(iOS 13.0, *) +public extension ObservableType { + func asAnyPublisher() -> AnyPublisher { + return SnailPublisher(upstream: self).eraseToAnyPublisher() + } +} diff --git a/Snail/Snail+Combine/SnailPublisher.swift b/Snail/Snail+Combine/SnailPublisher.swift new file mode 100644 index 0000000..7aca47f --- /dev/null +++ b/Snail/Snail+Combine/SnailPublisher.swift @@ -0,0 +1,21 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation + +@available(iOS 13.0, *) +public class SnailPublisher: Publisher { + public typealias Output = Upstream.T + public typealias Failure = Error + + private let upstream: Upstream + + init(upstream: Upstream) { + self.upstream = upstream + } + + public func receive(subscriber: S) where Failure == S.Failure, Output == S.Input { + subscriber.receive(subscription: SnailSubscription(upstream: upstream, + downstream: subscriber)) + } +} diff --git a/Snail/Snail+Combine/SnailSubscription.swift b/Snail/Snail+Combine/SnailSubscription.swift new file mode 100644 index 0000000..5be3ec3 --- /dev/null +++ b/Snail/Snail+Combine/SnailSubscription.swift @@ -0,0 +1,37 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation + +@available(iOS 13.0, *) +class SnailSubscription: Combine.Subscription where Downstream.Input == Upstream.T, Downstream.Failure == Error { + private var disposable: Subscriber? + private let buffer: DemandBuffer + + init(upstream: Upstream, + downstream: Downstream) { + buffer = DemandBuffer(subscriber: downstream) + disposable = upstream.subscribe(queue: nil, + onNext: { [weak self] value in + guard let self = self else { return } + _ = self.buffer.buffer(value: value) + }, + onError: { [weak self] error in + guard let self = self else { return } + self.buffer.complete(completion: .failure(error)) + }, + onDone: { [weak self] in + guard let self = self else { return } + self.buffer.complete(completion: .finished) + }) + } + + func request(_ demand: Subscribers.Demand) { + _ = self.buffer.demand(demand) + } + + func cancel() { + disposable?.dispose() + disposable = nil + } +} diff --git a/SnailTests/Combine/FailAsPublisherTests.swift b/SnailTests/Combine/FailAsPublisherTests.swift new file mode 100644 index 0000000..51d9aa7 --- /dev/null +++ b/SnailTests/Combine/FailAsPublisherTests.swift @@ -0,0 +1,56 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class FailAsPublisherTests: XCTestCase { + enum TestError: Error { + case test + } + + private var subject: Observable! + private var strings: [String]! + private var error: Error? + private var subscriptions: Set! + + override func setUp() { + super.setUp() + subject = Fail(TestError.test) + strings = [] + subscriptions = Set() + error = nil + } + + override func tearDown() { + subject = nil + strings = nil + error = nil + subscriptions = nil + } + + func testOnErrorIsRun() { + subject.asAnyPublisher() + .sink(receiveCompletion: { [unowned self] completion in + if case let .failure(underlying) = completion { + self.error = underlying as? TestError + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + XCTAssertEqual((error as? TestError), TestError.test) + } + + func testOnNextIsNotRun() { + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { [unowned self] in self.strings.append($0) }) + .store(in: &subscriptions) + subject?.on(.next("1")) + + XCTAssertEqual(strings?.count, 0) + } +} diff --git a/SnailTests/Combine/JustAsPublisherTests.swift b/SnailTests/Combine/JustAsPublisherTests.swift new file mode 100644 index 0000000..5dc3273 --- /dev/null +++ b/SnailTests/Combine/JustAsPublisherTests.swift @@ -0,0 +1,37 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class JustAsPublisherTests: XCTestCase { + private var subscriptions: Set! + + override func setUp() { + subscriptions = Set() + } + + override func tearDown() { + subscriptions = nil + super.tearDown() + } + + func testJust() { + var result: Int? + var done = false + + Just(1).asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + done = true + } + }, + receiveValue: { result = $0 }) + .store(in: &subscriptions) + + XCTAssertEqual(1, result) + XCTAssertTrue(done) + } +} diff --git a/SnailTests/Combine/ObservableAsPublisherTests.swift b/SnailTests/Combine/ObservableAsPublisherTests.swift new file mode 100644 index 0000000..b654929 --- /dev/null +++ b/SnailTests/Combine/ObservableAsPublisherTests.swift @@ -0,0 +1,1288 @@ +// Copyright © 2021 Compass. All rights reserved. + +// swiftlint:disable file_length + +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class ObservableAsPublisherTests: XCTestCase { + enum TestError: Error { + case test + } + + private var subject: Observable! + private var strings: [String]! + private var error: Error? + private var done: Bool? + private var disposer: Disposer! + + private var subscriptions: Set! + + override func setUp() { + super.setUp() + subject = Observable() + strings = [] + subscriptions = Set() + error = nil + done = nil + disposer = Disposer() + } + + override func tearDown() { + subject = nil + strings = nil + subscriptions = nil + error = nil + done = nil + disposer = nil + super.tearDown() + } + + func testNext() { + subject.asAnyPublisher() + .sink(receiveCompletion: { [unowned self] completion in + switch completion { + case .finished: + self.done = true + case let .failure(underlyingError): + self.error = underlyingError + } + }, + receiveValue: { [unowned self] in self.strings?.append($0) }) + .store(in: &subscriptions) + + ["1", "2"].forEach { subject?.on(.next($0)) } + + XCTAssertEqual(strings, ["1", "2"]) + } + + func testOnDone() { + subject.asAnyPublisher() + .sink(receiveCompletion: { [unowned self] completion in + switch completion { + case .finished: + self.done = true + case let .failure(underlyingError): + self.error = underlyingError + } + }, + receiveValue: { [unowned self] in self.strings?.append($0) }) + .store(in: &subscriptions) + + subject?.on(.next("1")) + subject?.on(.next("2")) + subject?.on(.done) + subject?.on(.next("3")) + + XCTAssertEqual(strings?.count, 2) + XCTAssertEqual(strings?[0], "1") + XCTAssertEqual(strings?[1], "2") + XCTAssertEqual(done, true) + } + + func testOnError() { + subject.asAnyPublisher() + .sink(receiveCompletion: { [unowned self] completion in + switch completion { + case .finished: + self.done = true + case let .failure(underlyingError): + self.error = underlyingError + } + }, + receiveValue: { [unowned self] in self.strings?.append($0) }) + .store(in: &subscriptions) + + subject?.on(.next("1")) + subject?.on(.next("2")) + subject?.on(.error(TestError.test)) + subject?.on(.next("3")) + XCTAssertEqual(strings?.count, 2) + XCTAssertEqual(strings?[0], "1") + XCTAssertEqual(strings?[1], "2") + XCTAssertEqual(error as? TestError, .test) + } + + func testMultipleSubscribers() { + var more: [String] = [] + subject.subscribe(onNext: { string in + more.append(string) + }).add(to: disposer) + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { [unowned self] string in + self.strings?.append(string) + } + ) + .store(in: &subscriptions) + + subject?.on(.next("1")) + XCTAssertEqual(strings?.first, more.first) + } + + func testFiresStoppedEventOnSubscribeIfStopped() { + subject?.on(.error(TestError.test)) + + var oldError: TestError? + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case let .failure(underlying) = completion { + oldError = underlying as? TestError + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + XCTAssertEqual(oldError, .test) + } + + func testSubscribeOnMainThread() { + var isMainQueue = false + let exp = expectation(description: "queue") + + DispatchQueue.global().async { + self.subject.asAnyPublisher() + .receive(on: DispatchQueue.main) + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in + exp.fulfill() + isMainQueue = Thread.isMainThread + }) + .store(in: &self.subscriptions) + self.subject?.on(.next("1")) + } + + waitForExpectations(timeout: 2) { error in + XCTAssertNil(error) + XCTAssertEqual(isMainQueue, true) + } + } + + func testOnMainThreadNotifiedOnMain() { + var isMainQueue = false + let exp = expectation(description: "queue") + + DispatchQueue.global().async { + self.subject.on(.main) + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in + exp.fulfill() + isMainQueue = Thread.isMainThread + }) + .store(in: &self.subscriptions) + self.subject?.on(.next("1")) + } + + waitForExpectations(timeout: 2) { error in + XCTAssertNil(error) + XCTAssertEqual(isMainQueue, true) + } + } + + func testOnGlobalThreadNotifiedOnMain() { + var isMainQueue = false + let exp = expectation(description: "queue") + + self.subject.on(.global()) + .asAnyPublisher() + .receive(on: DispatchQueue.main) + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in + exp.fulfill() + isMainQueue = Thread.isMainThread + }) + .store(in: &self.subscriptions) + self.subject?.on(.next("1")) + + waitForExpectations(timeout: 2) { error in + XCTAssertNil(error) + XCTAssertEqual(isMainQueue, true) + } + } + + func testRemovingSubscribers() { + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { [unowned self] str in + self.strings.append(str) + }) + .store(in: &subscriptions) + subject?.on(.next("1")) + subject?.removeSubscribers() + subject?.on(.next("2")) + XCTAssertEqual(strings?[0], "1") + XCTAssertEqual(strings?.count, 1) + } + + func testRemoveSubscriber() { + let subscriberToRemove = subject?.subscribe( + onNext: { string in self.strings?.append(string) }, + onError: { error in self.error = error }, + onDone: { self.done = true }) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { [unowned self] str in + self.strings.append(str) + }) + .store(in: &subscriptions) + + subject?.on(.next("1")) + guard let subscriber = subscriberToRemove else { + return + } + subject?.removeSubscriber(subscriber: subscriber) + subject?.on(.next("2")) + XCTAssertEqual(strings?.count, 3) + XCTAssertEqual(strings?[0], "1") + XCTAssertEqual(strings?[1], "1") + XCTAssertEqual(strings?[2], "2") + subject?.removeSubscriber(subscriber: subscriber) + subject?.on(.next("3")) + XCTAssertEqual(strings?.count, 4) + XCTAssertEqual(strings?[0], "1") + XCTAssertEqual(strings?[1], "1") + XCTAssertEqual(strings?[2], "2") + XCTAssertEqual(strings?[3], "3") + } + + func testThrottle() { + let observable = Observable() + var received: [String] = [] + + let exp = expectation(description: "throttle") + let delay = 0.01 + + DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + delay) { + exp.fulfill() + } + + observable.throttle(delay) + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + observable.on(.next("1")) + observable.on(.next("2")) + waitForExpectations(timeout: delay*2) { _ in + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "2") + } + } + + func testThrottleDelays() { + let observable = Observable() + var received: [String] = [] + + let exp = expectation(description: "debounce") + let delay = 0.01 + + DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + delay) { + observable.on(.next("2")) + observable.on(.next("3")) + DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + delay) { + exp.fulfill() + } + } + + observable.throttle(delay) + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + observable.on(.next("1")) + + waitForExpectations(timeout: 1) { _ in + XCTAssertEqual(received.count, 2) + XCTAssertEqual(received.first, "1") + XCTAssertEqual(received.last, "3") + } + } + + func testDebounce() { + let observable = Observable() + var received: [String] = [] + + let exp = expectation(description: "debounce") + let delay = 0.02 + + DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + delay/2) { + observable.on(.next("2")) + DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + delay/2) { + observable.on(.next("3")) + DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + delay) { + exp.fulfill() + } + } + } + + observable.debounce(delay) + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + observable.on(.next("1")) + + waitForExpectations(timeout: 1) { _ in + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "3") + } + } + + func testSkipFirst() { + let observable = Observable() + var received: [String] = [] + + observable.skip(first: 2) + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + observable.on(.next("1")) + observable.on(.next("2")) + observable.on(.next("3")) + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "3") + } + + func testSkipError() { + let observable = Observable() + + var error: TestError? + observable.skip(first: 2) + .asAnyPublisher() + .sink(receiveCompletion: { completion in + if case let .failure(underlying) = completion { + error = underlying as? TestError + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + observable.on(.error(TestError.test)) + + XCTAssertEqual(error, .test) + } + + func testSkipDone() { + let observable = Observable() + var done = false + + observable.skip(first: 2) + .asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + done = true + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + observable.on(.done) + + XCTAssertEqual(done, true) + } + + func testTakeFirst() { + let observable = Observable() + var received: [String] = [] + + observable.take(first: 2) + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + observable.on(.next("1")) + observable.on(.next("2")) + observable.on(.next("3")) + + XCTAssertEqual(received, ["1", "2"]) + } + + func testTakeError() { + let observable = Observable() + + var error: TestError? + observable.take(first: 2) + .asAnyPublisher() + .sink(receiveCompletion: { completion in + if case let .failure(underlying) = completion { + error = underlying as? TestError + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + observable.on(.error(TestError.test)) + + XCTAssertEqual(error, .test) + } + + func testTakeDone() { + let observable = Observable() + var done = false + + observable.take(first: 2) + .asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + done = true + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + observable.on(.done) + + XCTAssertEqual(done, true) + } + + func testTakeDoneWhenCountIsReached() { + let observable = Observable() + var done = false + + observable.take(first: 2) + .asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + done = true + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + observable.on(.next("1")) + observable.on(.next("2")) + + XCTAssertEqual(done, true) + } + + func testForward() { + var received: [String] = [] + var receivedError: TestError? + + let subject = Observable() + let observable = Observable() + observable.forward(to: subject) + + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case let .failure(underlying) = completion { + receivedError = underlying as? TestError + } + }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + observable.on(.next("1")) + observable.on(.error(TestError.test)) + + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "1") + XCTAssertEqual(receivedError, .test) + } + + func testForwardDone() { + var received: [String] = [] + + let subject = Observable() + let observable = Observable() + observable.forward(to: subject) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + observable.on(.next("1")) + observable.on(.done) + + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "1") + } + + func testMerge() { + var received: [String] = [] + + let a = Observable() + let b = Observable() + + let subject = Observable.merge([a, b]) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + a.on(.next("1")) + b.on(.next("2")) + b.on(.done) + + XCTAssertEqual(received.count, 2) + XCTAssertEqual(received.first, "1") + XCTAssertEqual(received.last, "2") + } + + func testMergeWithoutArray() { + var received: [String] = [] + + let a = Observable() + let b = Observable() + + let subject = Observable.merge(a, b) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + a.on(.next("1")) + b.on(.next("2")) + b.on(.done) + + XCTAssertEqual(received.count, 2) + XCTAssertEqual(received.first, "1") + XCTAssertEqual(received.last, "2") + } + + func testCombineLatestNonOptional() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.combineLatest(string, int) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { string, int in + received.append(("\(string): \(int)")) + }) + .store(in: &subscriptions) + + string.on(.next("The value")) + string.on(.next("The number")) + int.on(.next(1)) + int.on(.next(2)) + string.on(.next("The digit")) + int.on(.next(3)) + int.on(.done) + + XCTAssertEqual(received.count, 4) + XCTAssertEqual(received[0], "The number: 1") + XCTAssertEqual(received[1], "The number: 2") + XCTAssertEqual(received[2], "The digit: 2") + XCTAssertEqual(received[3], "The digit: 3") + } + + func testCombineLatestOptional() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.combineLatest(string, int) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { string, int in + received.append(("\(string ?? ""): \(int ?? 0)")) + }) + .store(in: &subscriptions) + + string.on(.next("The value")) + string.on(.next("The number")) + int.on(.next(1)) + int.on(.next(nil)) + string.on(.next(nil)) + int.on(.next(3)) + string.on(.done) + + XCTAssertEqual(received.count, 4) + XCTAssertEqual(received[0], "The number: 1") + XCTAssertEqual(received[1], "The number: 0") + XCTAssertEqual(received[2], ": 0") + XCTAssertEqual(received[3], ": 3") + } + + func testCombineLatestError_firstMember() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.combineLatest(string, int) + + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + received.append("ERROR") + } + }, + receiveValue: { string, int in + received.append(("\(string): \(int)")) + }) + .store(in: &subscriptions) + + string.on(.next("The number")) + int.on(.next(1)) + string.on(.error(TestError.test)) + + string.on(.next("The digit")) + int.on(.next(2)) + string.on(.error(TestError.test)) + + XCTAssertEqual(received.count, 2) + XCTAssertEqual(received.first, "The number: 1") + XCTAssertEqual(received.last, "ERROR") + } + + func testCombineLatestError_secondMember() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.combineLatest(string, int) + + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + received.append("ERROR") + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + int.on(.error(TestError.test)) + int.on(.next(1)) + + XCTAssertEqual(received.count, 1) + XCTAssertEqual(received.first, "ERROR") + } + + func testCombineLatestDone_whenAllDone() { + let obs1 = Observable() + let obs2 = Observable() + + var isDone = false + Observable.combineLatest(obs1, obs2) + .asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + isDone = true + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + obs1.on(.done) + XCTAssertFalse(isDone) + + obs2.on(.done) + XCTAssertTrue(isDone) + } + + func testCombineLatest3() { + let one = Observable() + let two = Observable() + let three = Observable() + + var received = [(String, Int, Double)]() + let subject = Observable.combineLatest(one, two, three) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + one.on(.next("The string")) + XCTAssertTrue(received.isEmpty) + + two.on(.next(1)) + XCTAssertTrue(received.isEmpty) + + three.on(.next(100.5)) + XCTAssertEqual(received[0].0, "The string") + XCTAssertEqual(received[0].1, 1) + XCTAssertEqual(received[0].2, 100.5) + } + + func testCombineLatest3Error_fromSecondMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let subject = Observable.combineLatest(one, two, three) + + let exp = expectation(description: "combineLatest3 forwards error from observable") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + two.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest3Error_fromThirdMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let subject = Observable.combineLatest(one, two, three) + + let exp = expectation(description: "combineLatest3 forwards error from observable") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + three.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest3Done_whenAllDone() { + let obs1 = Observable() + let obs2 = Observable() + let obs3 = Observable() + + var isDone = false + Observable.combineLatest(obs1, obs2, obs3) + .asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + isDone = true + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + obs1.on(.done) + XCTAssertFalse(isDone) + + obs2.on(.done) + XCTAssertFalse(isDone) + + obs3.on(.done) + XCTAssertTrue(isDone) + } + + func testCombineLatest3Optional() { + let one = Observable() + let two = Observable() + let three = Observable() + + var received = [(String, Int?, Double)]() + let subject = Observable.combineLatest(one, two, three) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + one.on(.next("The string")) + XCTAssertTrue(received.isEmpty) + + two.on(.next(nil)) + XCTAssertTrue(received.isEmpty) + + three.on(.next(100.5)) + XCTAssertEqual(received[0].0, "The string") + XCTAssertEqual(received[0].1, nil) + XCTAssertEqual(received[0].2, 100.5) + } + + func testCombineLatest4() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + + var received = [(String, Int, Double, String)]() + let subject = Observable.combineLatest(one, two, three, four) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + one.on(.next("The string")) + XCTAssertTrue(received.isEmpty) + + two.on(.next(1)) + XCTAssertTrue(received.isEmpty) + + three.on(.next(100.5)) + XCTAssertTrue(received.isEmpty) + + four.on(.next("The other string")) + XCTAssertEqual(received[0].0, "The string") + XCTAssertEqual(received[0].1, 1) + XCTAssertEqual(received[0].2, 100.5) + XCTAssertEqual(received[0].3, "The other string") + } + + func testCombineLatest4Error_fromFirstMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + let subject = Observable.combineLatest(one, two, three, four) + + let exp = expectation(description: "combineLatest4 forwards error from observable") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + one.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest4Error_fromSecondMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + let subject = Observable.combineLatest(one, two, three, four) + + let exp = expectation(description: "combineLatest4 forwards error from observable") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + two.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest4Error_fromThirdMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + let subject = Observable.combineLatest(one, two, three, four) + + let exp = expectation(description: "combineLatest4 forwards error from observable") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + three.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest4Error_fromFourthMember() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + let subject = Observable.combineLatest(one, two, three, four) + + let exp = expectation(description: "combineLatest4 forwards error from observable") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + four.on(.error(TestError.test)) + waitForExpectations(timeout: 1) + } + + func testCombineLatest4Done_whenAllDone() { + let obs1 = Observable() + let obs2 = Observable() + let obs3 = Observable() + let obs4 = Observable() + + var isDone = false + Observable.combineLatest(obs1, obs2, obs3, obs4) + .asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + isDone = true + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + obs1.on(.done) + XCTAssertFalse(isDone) + + obs2.on(.done) + XCTAssertFalse(isDone) + + obs3.on(.done) + XCTAssertFalse(isDone) + + obs4.on(.done) + XCTAssertTrue(isDone) + } + + func testCombineLatest4Optional() { + let one = Observable() + let two = Observable() + let three = Observable() + let four = Observable() + + var received = [(String, Int?, Double, String?)]() + let subject = Observable.combineLatest(one, two, three, four) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + one.on(.next("The string")) + XCTAssertTrue(received.isEmpty) + + two.on(.next(nil)) + XCTAssertTrue(received.isEmpty) + + three.on(.next(100.5)) + XCTAssertTrue(received.isEmpty) + + four.on(.next(nil)) + XCTAssertEqual(received[0].0, "The string") + XCTAssertEqual(received[0].1, nil) + XCTAssertEqual(received[0].2, 100.5) + XCTAssertEqual(received[0].3, nil) + } + + func testObservableMap() { + let observable = Observable() + let subject = observable.map { "Number: \($0)" } + var received = [String]() + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + observable.on(.next(1)) + observable.on(.next(10)) + + XCTAssertEqual(received, ["Number: 1", "Number: 10"]) + } + + func testObservableMapError() { + let observable = Observable() + let subject = observable.map { "Number: \($0)" } + + let exp = expectation(description: "observable map forwards error") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + observable.on(.error(TestError.test)) + + waitForExpectations(timeout: 1) + } + + func testObservableMapDone() { + let observable = Observable() + let subject = observable.map { "Number: \($0)" } + + let exp = expectation(description: "observable map forwards done") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + observable.on(.done) + + waitForExpectations(timeout: 1) + } + + func testObservableFilter() { + let observable = Observable() + let subject = observable.filter { $0 % 2 == 0 } + var received = [Int]() + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + + observable.on(.next(1)) + observable.on(.next(2)) + observable.on(.next(8)) + observable.on(.next(5)) + + XCTAssertEqual(received, [2, 8]) + } + + func testObservableFilterError() { + let observable = Observable() + let subject = observable.filter { $0 % 2 == 0 } + + let exp = expectation(description: "observable filter forwards error") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + observable.on(.error(TestError.test)) + + waitForExpectations(timeout: 1) + } + + func testObservableFilterDone() { + let observable = Observable() + let subject = observable.filter { $0 % 2 == 0 } + + let exp = expectation(description: "observable filter forwards done") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + observable.on(.done) + + waitForExpectations(timeout: 1) + } + + func testObservableFlatMap() { + let fetchTrigger = Observable() + let subject = fetchTrigger.flatMap { Variable(100).asObservable() } + var received = [Int]() + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { received.append($0) }) + .store(in: &subscriptions) + fetchTrigger.on(.next(())) + + XCTAssertEqual(received, [100]) + } + + func testObservableFlatMapError() { + let fetchTrigger = Observable() + let subject = fetchTrigger.flatMap { Variable(100).asObservable() } + + let exp = expectation(description: "observable flatMap forwards error") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + fetchTrigger.on(.error(TestError.test)) + + waitForExpectations(timeout: 1) + } + + func testObservableFlatMapDone() { + let fetchTrigger = Observable() + let subject = fetchTrigger.flatMap { Variable(100).asObservable() } + + let exp = expectation(description: "observable flatMap forwards done") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + fetchTrigger.on(.done) + + waitForExpectations(timeout: 1) + } + + func testZipNonOptional() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.zip(string, int) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { string, int in + received.append("\(string): \(int)") + }) + .store(in: &subscriptions) + + string.on(.next("The value")) + string.on(.next("The number")) + int.on(.next(1)) + int.on(.next(2)) + string.on(.next("The digit")) + int.on(.next(3)) + int.on(.done) + + XCTAssertEqual(received.count, 3) + XCTAssertEqual(received[0], "The value: 1") + XCTAssertEqual(received[1], "The number: 2") + XCTAssertEqual(received[2], "The digit: 3") + } + + func testZipOptional() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.zip(string, int) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { string, int in + received.append("\(string ?? ""): \(int ?? 0)") + }) + .store(in: &subscriptions) + + string.on(.next("The value")) + string.on(.next("The number")) + int.on(.next(1)) + int.on(.next(nil)) + string.on(.next(nil)) + int.on(.next(3)) + string.on(.done) + + XCTAssertEqual(received.count, 3) + XCTAssertEqual(received[0], "The value: 1") + XCTAssertEqual(received[1], "The number: 0") + XCTAssertEqual(received[2], ": 3") + } + + func testZipCountEqualToSourceWithFewestEmissions() { + var received: [String] = [] + + let string = Observable() + let int = Observable() + + let subject = Observable.zip(string, int) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { string, int in + received.append("\(string): \(int)") + }) + .store(in: &subscriptions) + + string.on(.next("The value")) + string.on(.next("The number")) + int.on(.next(1)) + int.on(.next(2)) + string.on(.next("The digit")) + int.on(.next(3)) + int.on(.next(4)) + int.on(.next(5)) + int.on(.next(6)) + int.on(.done) + + XCTAssertEqual(received.count, 3) + XCTAssertEqual(received[0], "The value: 1") + XCTAssertEqual(received[1], "The number: 2") + XCTAssertEqual(received[2], "The digit: 3") + } + + func testZipError() { + let one = Observable() + let two = Observable() + + let subject = Observable.zip(one, two) + + let exp = expectation(description: "zip forwards error from observable") + subject.asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .failure = completion { + exp.fulfill() + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + one.on(.error(TestError.test)) + + waitForExpectations(timeout: 1) + } + + func testZipDone() { + let one = Observable() + let two = Observable() + + var isDone = false + Observable.zip(one, two) + .asAnyPublisher() + .sink(receiveCompletion: { completion in + if case .finished = completion { + isDone = true + } + }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + one.on(.done) + XCTAssertTrue(isDone) + } + + func testNoDeadLock() { + class TestObject { + let disposer = Disposer() + + init(variable: Variable) { + variable.asObservable().subscribe() + .add(to: disposer) + } + } + + let subject = Variable(true) + + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in + _ = TestObject(variable: subject) + }) + .store(in: &subscriptions) + + subject.value = true + } +} diff --git a/SnailTests/Combine/ReplayAsPublisherTests.swift b/SnailTests/Combine/ReplayAsPublisherTests.swift new file mode 100644 index 0000000..8e34741 --- /dev/null +++ b/SnailTests/Combine/ReplayAsPublisherTests.swift @@ -0,0 +1,39 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class ReplayAsPublisherTests: XCTestCase { + private var subject: Replay! + private var subscriptions: Set! + + override func setUp() { + super.setUp() + subject = Replay(2) + subscriptions = Set() + } + + override func tearDown() { + subject = nil + subscriptions = nil + super.tearDown() + } + + func testReplay() { + var strings: [String] = [] + subject?.on(.next("1")) + subject?.on(.next("2")) + subject?.on(.done) + + subject.asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { strings.append($0) }) + .store(in: &subscriptions) + + XCTAssertEqual(strings[0], "1") + XCTAssertEqual(strings[1], "2") + } +} diff --git a/SnailTests/Combine/UniqueAsPublisherTests.swift b/SnailTests/Combine/UniqueAsPublisherTests.swift new file mode 100644 index 0000000..a19f1ea --- /dev/null +++ b/SnailTests/Combine/UniqueAsPublisherTests.swift @@ -0,0 +1,103 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class UniqueAsPublisherTests: XCTestCase { + private var subscriptions: Set! + + override func setUp() { + subscriptions = Set() + } + + override func tearDown() { + subscriptions = nil + } + + func testVariableChanges() { + var events: [String?] = [] + let subject = Unique(nil) + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { events.append($0) }) + .store(in: &subscriptions) + + subject.value = nil + subject.value = "1" + subject.value = "2" + subject.value = "2" + subject.value = "2" + XCTAssertEqual(events[0], nil) + XCTAssertEqual(events[1], "1") + XCTAssertEqual(events[2], "2") + XCTAssertEqual(subject.value, "2") + XCTAssertEqual(events.count, 3) + } + + func testVariableNotifiesOnSubscribe() { + let subject = Unique("initial") + subject.value = "new" + var result: String? + + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { result = $0 }) + .store(in: &subscriptions) + + XCTAssertEqual("new", result) + } + + func testVariableNotifiesInitialOnSubscribe() { + let subject = Unique("initial") + var result: String? + + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { result = $0 }) + .store(in: &subscriptions) + + XCTAssertEqual("initial", result) + } + + func testVariableHandlesEquatableArrays() { + var events: [[String]] = [] + let subject = Unique<[String]>(["1", "2"]) + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { events.append($0) }) + .store(in: &subscriptions) + + subject.value = ["1", "2"] + subject.value = ["2", "1"] + subject.value = ["2", "1"] + subject.value = ["1", "2"] + XCTAssertEqual(events[0], ["1", "2"]) + XCTAssertEqual(events[1], ["2", "1"]) + XCTAssertEqual(events[2], ["1", "2"]) + } + + func testVariableHandlesOptionalArrays() { + var events: [[String]?] = [] + let subject = Unique<[String]?>(nil) + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { events.append($0) }) + .store(in: &subscriptions) + subject.value = ["1", "2"] + subject.value = nil + subject.value = nil + subject.value = ["1", "2"] + XCTAssertEqual(events[0], nil) + XCTAssertEqual(events[1], ["1", "2"]) + XCTAssertEqual(events[2], nil) + XCTAssertEqual(events[3], ["1", "2"]) + } +} diff --git a/SnailTests/Combine/VariableAsPublisherTests.swift b/SnailTests/Combine/VariableAsPublisherTests.swift new file mode 100644 index 0000000..0ae8edb --- /dev/null +++ b/SnailTests/Combine/VariableAsPublisherTests.swift @@ -0,0 +1,119 @@ +// Copyright © 2021 Compass. All rights reserved. + +import Combine +import Foundation +@testable import Snail +import XCTest + +@available(iOS 13.0, *) +class VariableAsPublisherTests: XCTestCase { + private var subscriptions: Set! + + override func setUp() { + subscriptions = Set() + } + + override func tearDown() { + subscriptions = nil + } + + func testVariableChanges() { + var events: [String?] = [] + let subject = Variable(nil) + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { events.append($0) }) + .store(in: &subscriptions) + subject.value = "1" + subject.value = "2" + XCTAssertEqual(events[0], nil) + XCTAssertEqual(events[1], "1") + XCTAssertEqual(events[2], "2") + XCTAssertEqual(subject.value, "2") + } + + func testVariableNotifiesOnSubscribe() { + let subject = Variable("initial") + subject.value = "new" + var result: String? + + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { result = $0 }) + .store(in: &subscriptions) + + XCTAssertEqual("new", result) + } + + func testVariableNotifiesInitialOnSubscribe() { + let subject = Variable("initial") + var result: String? + + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { result = $0 }) + .store(in: &subscriptions) + + XCTAssertEqual("initial", result) + } + + func testMappedVariableNotifiesOnSubscribe() { + let subject = Variable("initial") + subject.value = "new" + var subjectCharactersCount: Int? + + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { subjectCharactersCount = $0.count }) + .store(in: &subscriptions) + + XCTAssertEqual(subject.value.count, subjectCharactersCount) + } + + func testMappedVariableNotifiesInitialOnSubscribe() { + let subject = Variable("initial") + var subjectCharactersCount: Int? + + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { subjectCharactersCount = $0.count }) + .store(in: &subscriptions) + + XCTAssertEqual(subject.value.count, subjectCharactersCount) + } + + func testUniqueFireCounts() { + let subject = Unique("sameValue") + var firedCount = 0 + + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in firedCount += 1 }) + .store(in: &subscriptions) + + subject.value = "sameValue" + + XCTAssertEqual(firedCount, 1) + } + + func testVariableFireCounts() { + let subject = Variable("sameValue") + var firedCount = 0 + + subject.asObservable() + .asAnyPublisher() + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in firedCount += 1 }) + .store(in: &subscriptions) + + subject.value = "sameValue" + + XCTAssertEqual(firedCount, 2) + } +}