diff --git a/Package.swift b/Package.swift index fc233cd2..73e5cf0a 100644 --- a/Package.swift +++ b/Package.swift @@ -39,6 +39,10 @@ let package = Package( name: "Ably", package: "ably-cocoa" ), + .product( + name: "AsyncAlgorithms", + package: "swift-async-algorithms" + ), ] ), .testTarget( diff --git a/Sources/AblyChat/RoomLifecycleManager.swift b/Sources/AblyChat/RoomLifecycleManager.swift index 867f6d3c..9333d0a4 100644 --- a/Sources/AblyChat/RoomLifecycleManager.swift +++ b/Sources/AblyChat/RoomLifecycleManager.swift @@ -1,4 +1,5 @@ -import Ably +@preconcurrency import Ably +import AsyncAlgorithms /// The interface that the lifecycle manager expects its contributing realtime channels to conform to. /// @@ -11,6 +12,13 @@ internal protocol RoomLifecycleContributorChannel: Sendable { var state: ARTRealtimeChannelState { get async } var errorReason: ARTErrorInfo? { get async } + + // TODO: consider the consequences of this async (right now, it's just to make it easy to write a mock using an actor), but from a semantics point of view does it make sense in the same way as the above ones? + func subscribeToState() async -> Subscription + + // TODO: this really isn't the right place for this to go, move elsewhere + // TODO: again, this `async` is a bit dodgy, it's just there so we can use an actor to manage some subscription state + func emitDiscontinuity(_ error: ARTErrorInfo) async } internal actor RoomLifecycleManager { @@ -22,8 +30,22 @@ internal actor RoomLifecycleManager { internal var channel: Channel } + // TODO: something intelligent to say about this beyond that it's a term used in the spec. Exposed for tests + internal struct PendingDiscontinuityEvent { + internal var error: ARTErrorInfo + } + + /// Stores manager state relating to a given contributor. + private struct ContributorAnnotation { + var pendingDiscontinuityEvents: [PendingDiscontinuityEvent] = [] + } + internal private(set) var current: RoomLifecycle internal private(set) var error: ARTErrorInfo? + // TODO: link this to what the manager is actually doing + private var hasOperationInProgress: Bool + /// The annotation at a given index belongs to the element of ``contributors`` at the same index. + private var contributorAnnotations: [ContributorAnnotation] private let logger: InternalLogger private let clock: SimpleClock @@ -32,6 +54,7 @@ internal actor RoomLifecycleManager { internal init(contributors: [Contributor], logger: InternalLogger, clock: SimpleClock) async { await self.init( current: nil, + hasOperationInProgress: nil, contributors: contributors, logger: logger, clock: clock @@ -39,9 +62,10 @@ internal actor RoomLifecycleManager { } #if DEBUG - internal init(testsOnly_current current: RoomLifecycle? = nil, contributors: [Contributor], logger: InternalLogger, clock: SimpleClock) async { + internal init(testsOnly_current current: RoomLifecycle? = nil, testsOnly_hasOperationInProgress hasOperationInProgress: Bool? = nil, contributors: [Contributor], logger: InternalLogger, clock: SimpleClock) async { await self.init( current: current, + hasOperationInProgress: hasOperationInProgress, contributors: contributors, logger: logger, clock: clock @@ -49,13 +73,49 @@ internal actor RoomLifecycleManager { } #endif - private init(current: RoomLifecycle?, contributors: [Contributor], logger: InternalLogger, clock: SimpleClock) async { + private init(current: RoomLifecycle?, hasOperationInProgress: Bool?, contributors: [Contributor], logger: InternalLogger, clock: SimpleClock) async { self.current = current ?? .initialized + self.hasOperationInProgress = hasOperationInProgress ?? false self.contributors = contributors + contributorAnnotations = Array(repeating: .init(), count: contributors.count) self.logger = logger self.clock = clock + + // The idea here is to make sure that, before the initializer completes, we are already listening for state changes, so that e.g. tests don’t miss a state change. + let subscriptions = await withTaskGroup(of: Subscription.self) { group in + for contributor in contributors { + group.addTask { + await contributor.channel.subscribeToState() + } + } + + return await Array(group) + } + + // TODO: who owns this task? how does it get cancelled? how do we know when it's started and that the manager is "ready to go"? Do we need an async initializer? + Task { + await withTaskGroup(of: Void.self) { group in + for (index, subscription) in subscriptions.enumerated() { + // TODO: this capture + // TODO: is await what we want? is there a way to make the manager's initializer isolated to the manager? + // TODO: this @Sendable was to make a mysterious compiler error go away + group.addTask { @Sendable in + for await stateChange in subscription { + // TODO: why does this not inherit the actor isolation? (I mean, now that I have @Sendable, I get it) + await self.didReceiveStateChange(stateChange, forContributorAt: index) + } + } + } + } + } } + #if DEBUG + internal func testsOnly_pendingDiscontinuityEventsForContributor(at index: Int) -> [PendingDiscontinuityEvent] { + contributorAnnotations[index].pendingDiscontinuityEvents + } + #endif + // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) private var subscriptions: [Subscription] = [] @@ -65,6 +125,110 @@ internal actor RoomLifecycleManager { return subscription } + #if DEBUG + // TODO: explain — these are to let the tests know that it's handled a state change + private var stateChangeHandledSubscriptions: [Subscription] = [] + + internal func testsOnly_subscribeToHandledContributorStateChanges() -> Subscription { + let subscription = Subscription(bufferingPolicy: .unbounded) + stateChangeHandledSubscriptions.append(subscription) + return subscription + } + #endif + + // TODO: this is only async because it needs to call `emitDiscontinuity`; that's not great. update: now it's also calling detach on channels but that probably shouldn't be inline, need clarification) + /// Implements CHA-RL4b’s contributor state change handling. + private func didReceiveStateChange(_ stateChange: ARTChannelStateChange, forContributorAt index: Int) async { + logger.log(message: "Got state change \(stateChange) for contributor at index \(index)", level: .info) + + switch stateChange.event { + case .update: + // CHA-RL4a1 — if RESUMED then no-op + guard !stateChange.resumed else { + break + } + + guard let reason = stateChange.reason else { + // TODO: is this OK? would be good if ably-cocoa could communicate this in types + preconditionFailure("State change event with resumed == false should have a reason") + } + + if hasOperationInProgress { + // CHA-RL4a3 + logger.log(message: "Recording pending discontinuity event for contributor at index \(index)", level: .info) + + contributorAnnotations[index].pendingDiscontinuityEvents.append( + .init(error: reason) + ) + } else { + // CHA-RL4a4 + logger.log(message: "Emitting discontinuity event for contributor at index \(index)", level: .info) + + let contributor = contributors[index] + await contributor.channel.emitDiscontinuity(reason) + } + case .attached: + if hasOperationInProgress { + if !stateChange.resumed { + // CHA-RL4b1 + logger.log(message: "Recording pending discontinuity event for contributor at index \(index)", level: .info) + + guard let reason = stateChange.reason else { + // TODO: same question as above about whether this is OK + preconditionFailure("State change event with resumed == false should have a reason") + } + + contributorAnnotations[index].pendingDiscontinuityEvents.append( + .init(error: reason) + ) + } + } else if current != .attached { + if await (contributors.async.map { await $0.channel.state }.allSatisfy { $0 == .attached }) { + // CHA-RL4b8 + logger.log(message: "Now that all contributors are ATTACHED, transitioning room to ATTACHED", level: .info) + changeStatus(to: .attached) + } + } + case .failed: + if !hasOperationInProgress { + // CHA-RL4b5 + guard let reason = stateChange.reason else { + // TODO: same question as above about whether this is OK + preconditionFailure("FAILED state change event should have a reason") + } + + changeStatus(to: .failed, error: reason) + + // TODO: CHA-RL4b5 is a bit unclear about how to handle failure, and whether they can be detached concurrently (asked in https://github.com/ably/specification/pull/200/files#r1777471810) + for contributor in contributors { + do { + try await contributor.channel.detach() + } catch { + logger.log(message: "Failed to detach contributor \(contributor), error \(error)", level: .info) + } + } + } + case .suspended: + if !hasOperationInProgress { + // CHA-RL4b9 + guard let reason = stateChange.reason else { + // TODO: same question as above about whether this is OK + preconditionFailure("SUSPENDED state change event should have a reason") + } + + changeStatus(to: .suspended, error: reason) + } + default: + break + } + + #if DEBUG + for subscription in stateChangeHandledSubscriptions { + subscription.emit(stateChange) + } + #endif + } + /// Updates ``current`` and ``error`` and emits a status change event. private func changeStatus(to new: RoomLifecycle, error: ARTErrorInfo? = nil) { logger.log(message: "Transitioning from \(current) to \(new), error \(String(describing: error))", level: .info) diff --git a/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift b/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift index d23539af..c13b046b 100644 --- a/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift +++ b/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift @@ -1,4 +1,4 @@ -import Ably +@preconcurrency import Ably @testable import AblyChat final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel { @@ -7,9 +7,12 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel var state: ARTRealtimeChannelState var errorReason: ARTErrorInfo? + // TODO: clean up + private var subscriptions: [Subscription] = [] private(set) var attachCallCount = 0 private(set) var detachCallCount = 0 + private(set) var emitDiscontinuityArguments: [ARTErrorInfo] = [] init( initialState: ARTRealtimeChannelState, @@ -92,4 +95,20 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel throw error } } + + func subscribeToState() -> Subscription { + let subscription = Subscription(bufferingPolicy: .unbounded) + subscriptions.append(subscription) + return subscription + } + + func emitStateChange(_ stateChange: ARTChannelStateChange) { + for subscription in subscriptions { + subscription.emit(stateChange) + } + } + + func emitDiscontinuity(_ error: ARTErrorInfo) async { + emitDiscontinuityArguments.append(error) + } } diff --git a/Tests/AblyChatTests/RoomLifecycleManagerTests.swift b/Tests/AblyChatTests/RoomLifecycleManagerTests.swift index f632fb62..1b6a5b1c 100644 --- a/Tests/AblyChatTests/RoomLifecycleManagerTests.swift +++ b/Tests/AblyChatTests/RoomLifecycleManagerTests.swift @@ -1,4 +1,4 @@ -import Ably +@preconcurrency import Ably @testable import AblyChat import Testing @@ -29,10 +29,11 @@ struct RoomLifecycleManagerTests { private func createManager( forTestingWhatHappensWhenCurrentlyIn current: RoomLifecycle? = nil, + forTestingWhatHappensWhenHasOperationInProgress hasOperationInProgress: Bool? = nil, contributors: [RoomLifecycleManager.Contributor] = [], clock: SimpleClock = MockSimpleClock() ) async -> RoomLifecycleManager { - await .init(testsOnly_current: current, contributors: contributors, logger: TestLogger(), clock: clock) + await .init(testsOnly_current: current, testsOnly_hasOperationInProgress: hasOperationInProgress, contributors: contributors, logger: TestLogger(), clock: clock) } private func createContributor( @@ -662,4 +663,243 @@ struct RoomLifecycleManagerTests { #expect(await manager.current == .released) } + + // MARK: - Handling contributor UPDATE events + + // @spec CHA-RL4a1 + @Test + func contributorUpdate_withResumedTrue_doesNothing() async throws { + // Given: A RoomLifecycleManager + let contributor = createContributor() + let manager = await createManager(contributors: [contributor]) + + // When: A contributor emits an UPDATE event with `resumed` flag set to true + let contributorStateChange = ARTChannelStateChange( + current: .attached, // arbitrary + previous: .attached, // arbitrary + event: .update, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: true + ) + + // TODO: explain + let handledContributorStateChangesSubscription = await manager.testsOnly_subscribeToHandledContributorStateChanges() + async let handledContributorStateChangeSignal = handledContributorStateChangesSubscription.first { $0 === contributorStateChange } + + await contributor.channel.emitStateChange(contributorStateChange) + + // TODO: explain + _ = try #require(await handledContributorStateChangeSignal) + + // Then: The manager does not record a pending discontinuity event for this contributor, nor does it call `emitDiscontinuity` on the contributor (this is my interpretation of "no action should be taken" in CHA-RL4a1; i.e. that the actions described in CHA-RL4a2 and CHA-RL4a3 shouldn’t happen) (TODO: get clarification; have asked in https://github.com/ably/specification/pull/200#discussion_r1777385499) + #expect(await manager.testsOnly_pendingDiscontinuityEventsForContributor(at: 0).isEmpty) + #expect(await contributor.channel.emitDiscontinuityArguments.isEmpty) + } + + // @spec CHA-RL4a3 + @Test + func contributorUpdate_withResumedFalse_withOperationInProgress_recordsPendingDiscontinuityEvent() async throws { + // Given: A RoomLifecycleManager, with a room lifecycle operation in progress + let contributor = createContributor() + let manager = await createManager(forTestingWhatHappensWhenHasOperationInProgress: true, contributors: [contributor]) + + // When: A contributor emits an UPDATE event with `resumed` flag set to false + let contributorStateChange = ARTChannelStateChange( + current: .attached, // arbitrary + previous: .attached, // arbitrary + event: .update, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false + ) + + // TODO: explain + let handledContributorStateChangesSubscription = await manager.testsOnly_subscribeToHandledContributorStateChanges() + async let handledContributorStateChangeSignal = handledContributorStateChangesSubscription.first { $0 === contributorStateChange } + + await contributor.channel.emitStateChange(contributorStateChange) + + // TODO: explain + _ = try #require(await handledContributorStateChangeSignal) + + // Then: The manager records a pending discontinuity event for this contributor, and this discontinuity event has error equal to the contributor UPDATE event’s `reason` + let pendingDiscontinuityEvents = await manager.testsOnly_pendingDiscontinuityEventsForContributor(at: 0) + try #require(pendingDiscontinuityEvents.count == 1) + + let pendingDiscontinuityEvent = pendingDiscontinuityEvents[0] + #expect(pendingDiscontinuityEvent.error === contributorStateChange.reason) + } + + // @spec CHA-RL4a4 + @Test + func contributorUpdate_withResumedTrue_withNoOperationInProgress_emitsDiscontinuityEvent() async throws { + // Given: A RoomLifecycleManager, with no room lifecycle operation in progress + let contributor = createContributor() + let manager = await createManager(forTestingWhatHappensWhenHasOperationInProgress: false, contributors: [contributor]) + + // When: A contributor emits an UPDATE event with `resumed` flag set to false + let contributorStateChange = ARTChannelStateChange( + current: .attached, // arbitrary + previous: .attached, // arbitrary + event: .update, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false + ) + + // TODO: explain + let handledContributorStateChangesSubscription = await manager.testsOnly_subscribeToHandledContributorStateChanges() + async let handledContributorStateChangeSignal = handledContributorStateChangesSubscription.first { $0 === contributorStateChange } + + await contributor.channel.emitStateChange(contributorStateChange) + + // TODO: explain + _ = try #require(await handledContributorStateChangeSignal) + + // Then: The manager calls `emitDiscontinuity` on the contributor, with error equal to the contributor UPDATE event’s `reason` + let emitDiscontinuityArguments = await contributor.channel.emitDiscontinuityArguments + try #require(emitDiscontinuityArguments.count == 1) + + let discontinuity = emitDiscontinuityArguments[0] + #expect(discontinuity === contributorStateChange.reason) + } + + // @specPartial CHA-RL4b1 - I don’t know the meaning of "and the particular contributor has been attached previously" so haven’t implemented that part of the spec point (TODO: asked in https://github.com/ably/specification/pull/200/files#r1775552624) + @Test + func contributorAttachEvent_withResumeFalse_withOperationInProgress_recordsPendingDiscontinuityEvent() async throws { + // Given: A RoomLifecycleManager, with a room lifecycle operation in progress + let contributor = createContributor() + let manager = await createManager(forTestingWhatHappensWhenHasOperationInProgress: true, contributors: [contributor]) + + // When: A contributor emits an ATTACHED event with `resumed` flag set to false + let contributorStateChange = ARTChannelStateChange( + current: .attached, + previous: .attaching, // arbitrary + event: .attached, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false + ) + // TODO: (threading) — read this from the event or the contributor? + + // TODO: explain + let handledContributorStateChangesSubscription = await manager.testsOnly_subscribeToHandledContributorStateChanges() + async let handledContributorStateChangeSignal = handledContributorStateChangesSubscription.first { $0 === contributorStateChange } + + await contributor.channel.emitStateChange(contributorStateChange) + + // TODO: explain + _ = try #require(await handledContributorStateChangeSignal) + + // Then: The manager records a pending discontinuity event for this contributor, and this discontinuity event has error equal to the contributor ATTACHED event’s `reason` + let pendingDiscontinuityEvents = await manager.testsOnly_pendingDiscontinuityEventsForContributor(at: 0) + try #require(pendingDiscontinuityEvents.count == 1) + + let pendingDiscontinuityEvent = pendingDiscontinuityEvents[0] + #expect(pendingDiscontinuityEvent.error === contributorStateChange.reason) + } + + // @specPartial CHA-RL4b5 - Haven’t implemented the part that refers to "transient disconnect timeouts"; TODO do this (https://github.com/ably-labs/ably-chat-swift/issues/48) + @Test + func contributorFailedEvent_withNoOperationInProgress() async throws { + // Given: A RoomLifecycleManager, with no room lifecycle operation in progress + let contributors = [ + // TODO: I think success is fine, the spec doesn't say what to do in response anyway + createContributor(detachBehavior: .success), + createContributor(detachBehavior: .success), + ] + let manager = await createManager(forTestingWhatHappensWhenHasOperationInProgress: false, contributors: contributors) + + let roomStatusSubscription = await manager.onChange(bufferingPolicy: .unbounded) + async let failedStatusChange = roomStatusSubscription.first { $0.current == .failed } + + // When: A contributor emits an FAILED event + let contributorStateChange = ARTChannelStateChange( + current: .failed, + previous: .attached, // arbitrary + event: .failed, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false // arbitrary + ) + + // TODO: explain + let handledContributorStateChangesSubscription = await manager.testsOnly_subscribeToHandledContributorStateChanges() + async let handledContributorStateChangeSignal = handledContributorStateChangesSubscription.first { $0 === contributorStateChange } + + await contributors[0].channel.emitStateChange(contributorStateChange) + + // TODO: explain + _ = try #require(await handledContributorStateChangeSignal) + + // Then: + // - the room status transitions to failed, with the error of the status change being the `reason` of the contributor FAILED event + // - and it calls `detach` on all contributors + _ = try #require(await failedStatusChange) + #expect(await manager.current == .failed) + + for contributor in contributors { + #expect(await contributor.channel.detachCallCount == 1) + } + } + + // @spec CHA-RL4b8 + // TODO: should I a test for the case where not all attached? but again you have the business of "how do you know it hasn't" + @Test + func contributorAttachedEvent_withNoOperationInProgress_roomNotAttached_allContributorsAttached() async throws { + // Given: A RoomLifecycleManager, not in the ATTACHED state, all of whose contributors are in the ATTACHED state (to satisfy the condition of CHA-RL4b8; for the purposes of this test I don’t care that they’re in this state even _before_ the state change of the When) + let contributors = [ + createContributor(initialState: .attached), + createContributor(initialState: .attached), + ] + + let manager = await createManager( + forTestingWhatHappensWhenCurrentlyIn: .initialized, // arbitrary non-ATTACHED + forTestingWhatHappensWhenHasOperationInProgress: false, + contributors: contributors + ) + + let roomStatusSubscription = await manager.onChange(bufferingPolicy: .unbounded) + async let maybeAttachedRoomStatusChange = roomStatusSubscription.first { $0.current == .attached } + + // When: A contributor emits a state change to ATTACHED + let contributorStateChange = ARTChannelStateChange( + current: .attached, + previous: .attaching, // arbitrary + event: .attached, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false // arbitrary + ) + + await contributors[0].channel.emitStateChange(contributorStateChange) + + // Then: The room status transitions to ATTACHED + _ = try #require(await maybeAttachedRoomStatusChange) + #expect(await manager.current == .attached) + } + + // @specPartial CHA-RL4b9 - Haven’t implemented the part that refers to "transient disconnect timeouts"; TODO do this (https://github.com/ably-labs/ably-chat-swift/issues/48). Nor have I implemented "the room enters the RETRY loop"; TODO do this (https://github.com/ably-labs/ably-chat-swift/issues/51) + @Test + func contributorSuspendedEvent_withNoOperationInProgress() async throws { + // Given: A RoomLifecycleManager with no lifecycle operation in progress + let contributor = createContributor() + let manager = await createManager(forTestingWhatHappensWhenHasOperationInProgress: false, contributors: [contributor]) + + let roomStatusSubscription = await manager.onChange(bufferingPolicy: .unbounded) + async let maybeSuspendedRoomStatusChange = roomStatusSubscription.first { $0.current == .suspended } + + // When: A contributor emits a state change to SUSPENDED + let contributorStateChange = ARTChannelStateChange( + current: .suspended, + previous: .attached, // arbitrary + event: .suspended, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false // arbitrary + ) + + await contributor.channel.emitStateChange(contributorStateChange) + + // Then: The room transitions to SUSPENDED, and this state change has error equal to the contributor state change’s `reason` + let suspendedRoomStatusChange = try #require(await maybeSuspendedRoomStatusChange) + #expect(suspendedRoomStatusChange.error === contributorStateChange.reason) + + #expect(await manager.current == .suspended) + #expect(await manager.error === contributorStateChange.reason) + } }