diff --git a/chat-android/build.gradle.kts b/chat-android/build.gradle.kts index 9f116f4b..8890d600 100644 --- a/chat-android/build.gradle.kts +++ b/chat-android/build.gradle.kts @@ -45,6 +45,7 @@ buildConfig { dependencies { api(libs.ably.android) implementation(libs.gson) + implementation(libs.coroutine.core) testImplementation(libs.junit) testImplementation(libs.mockk) diff --git a/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt b/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt new file mode 100644 index 00000000..e9770635 --- /dev/null +++ b/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt @@ -0,0 +1,93 @@ +package com.ably.chat + +import java.util.concurrent.PriorityBlockingQueue +import kotlin.coroutines.cancellation.CancellationException +import kotlin.coroutines.coroutineContext +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.launch + +/** + * AtomicCoroutineScope is a thread safe wrapper to run multiple operations mutually exclusive. + * All operations are atomic and run with given priority. + * Accepts scope as a constructor parameter to run operations under the given scope. + * See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information. + */ +class AtomicCoroutineScope(private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default)) { + + private val sequentialScope: CoroutineScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1)) + + private class Job( + private val priority: Int, + val coroutineBlock: suspend CoroutineScope.() -> T, + val deferredResult: CompletableDeferred, + val queuedPriority: Int, + ) : Comparable> { + override fun compareTo(other: Job<*>) = when { + this.priority == other.priority -> this.queuedPriority.compareTo(other.queuedPriority) + else -> this.priority.compareTo(other.priority) + } + } + + // Handles jobs of any type + private val jobs: PriorityBlockingQueue> = PriorityBlockingQueue() // Accessed from both sequentialScope and async method + private var isRunning = false // Only accessed from sequentialScope + private var queueCounter = 0 // Only accessed from synchronized method + + val finishedProcessing: Boolean + get() = jobs.isEmpty() && !isRunning + + val pendingJobCount: Int + get() = jobs.size + + /** + * Defines priority for the operation execution and + * executes given coroutineBlock mutually exclusive under given scope. + */ + @Synchronized + fun async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred { + val deferredResult = CompletableDeferred() + jobs.add(Job(priority, coroutineBlock, deferredResult, queueCounter++)) + sequentialScope.launch { + if (!isRunning) { + isRunning = true + while (jobs.isNotEmpty()) { + val job = jobs.poll() + job?.let { + safeExecute(it) + } + } + isRunning = false + } + } + return deferredResult + } + + private suspend fun safeExecute(job: Job) { + try { + // Appends coroutineContext to cancel current/pending jobs when AtomicCoroutineScope is cancelled + scope.launch(coroutineContext) { + try { + val result = job.coroutineBlock(this) + job.deferredResult.complete(result) + } catch (t: Throwable) { + job.deferredResult.completeExceptionally(t) + } + }.join() + } catch (t: Throwable) { + job.deferredResult.completeExceptionally(t) + } + } + + /** + * Cancels ongoing and pending operations with given error. + * See [Coroutine cancellation](https://kt.academy/article/cc-cancellation#cancellation-in-a-coroutine-scope) for more information. + */ + @Synchronized + fun cancel(message: String?, cause: Throwable? = null) { + queueCounter = 0 + sequentialScope.coroutineContext.cancelChildren(CancellationException(message, cause)) + } +} diff --git a/chat-android/src/main/java/com/ably/chat/ChatApi.kt b/chat-android/src/main/java/com/ably/chat/ChatApi.kt index 2bc74d91..db7af8aa 100644 --- a/chat-android/src/main/java/com/ably/chat/ChatApi.kt +++ b/chat-android/src/main/java/com/ably/chat/ChatApi.kt @@ -87,7 +87,7 @@ internal class ChatApi(private val realtimeClient: RealtimeClient, private val c ErrorInfo( "Metadata contains reserved 'ably-chat' key", HttpStatusCodes.BadRequest, - ErrorCodes.InvalidRequestBody, + ErrorCodes.InvalidRequestBody.errorCode, ), ) } @@ -98,7 +98,7 @@ internal class ChatApi(private val realtimeClient: RealtimeClient, private val c ErrorInfo( "Headers contains reserved key with reserved 'ably-chat' prefix", HttpStatusCodes.BadRequest, - ErrorCodes.InvalidRequestBody, + ErrorCodes.InvalidRequestBody.errorCode, ), ) } diff --git a/chat-android/src/main/java/com/ably/chat/Discontinuities.kt b/chat-android/src/main/java/com/ably/chat/Discontinuities.kt new file mode 100644 index 00000000..61048411 --- /dev/null +++ b/chat-android/src/main/java/com/ably/chat/Discontinuities.kt @@ -0,0 +1,55 @@ +package com.ably.chat + +import io.ably.lib.types.ErrorInfo +import io.ably.lib.util.EventEmitter +import io.ably.lib.util.Log +import io.ably.lib.realtime.ChannelBase as AblyRealtimeChannel + +/** + * Represents an object that has a channel and therefore may care about discontinuities. + */ +interface HandlesDiscontinuity { + /** + * A promise of the channel that this object is associated with. The promise + * is resolved when the feature has finished initializing. + */ + val channel: AblyRealtimeChannel + + /** + * Called when a discontinuity is detected on the channel. + * @param reason The error that caused the discontinuity. + */ + fun discontinuityDetected(reason: ErrorInfo?) +} + +/** + * An interface to be implemented by objects that can emit discontinuities to listeners. + */ +interface EmitsDiscontinuities { + /** + * Register a listener to be called when a discontinuity is detected. + * @param listener The listener to be called when a discontinuity is detected. + */ + fun onDiscontinuity(listener: Listener): Subscription + + /** + * An interface for listening when discontinuity happens + */ + fun interface Listener { + /** + * A function that can be called when discontinuity happens. + * @param reason reason for discontinuity + */ + fun discontinuityEmitted(reason: ErrorInfo?) + } +} + +class DiscontinuityEmitter : EventEmitter() { + override fun apply(listener: EmitsDiscontinuities.Listener?, event: String?, vararg args: Any?) { + try { + listener?.discontinuityEmitted(args[0] as? ErrorInfo?) + } catch (t: Throwable) { + Log.e("DiscontinuityEmitter", "Unexpected exception calling Discontinuity Listener", t) + } + } +} diff --git a/chat-android/src/main/java/com/ably/chat/EmitsDiscontinuities.kt b/chat-android/src/main/java/com/ably/chat/EmitsDiscontinuities.kt deleted file mode 100644 index 07412f42..00000000 --- a/chat-android/src/main/java/com/ably/chat/EmitsDiscontinuities.kt +++ /dev/null @@ -1,25 +0,0 @@ -package com.ably.chat - -import io.ably.lib.types.ErrorInfo - -/** - * An interface to be implemented by objects that can emit discontinuities to listeners. - */ -interface EmitsDiscontinuities { - /** - * Register a listener to be called when a discontinuity is detected. - * @param listener The listener to be called when a discontinuity is detected. - */ - fun onDiscontinuity(listener: Listener): Subscription - - /** - * An interface for listening when discontinuity happens - */ - fun interface Listener { - /** - * A function that can be called when discontinuity happens. - * @param reason reason for discontinuity - */ - fun discontinuityEmitted(reason: ErrorInfo?) - } -} diff --git a/chat-android/src/main/java/com/ably/chat/ErrorCodes.kt b/chat-android/src/main/java/com/ably/chat/ErrorCodes.kt index b683aed9..a86dd8ef 100644 --- a/chat-android/src/main/java/com/ably/chat/ErrorCodes.kt +++ b/chat-android/src/main/java/com/ably/chat/ErrorCodes.kt @@ -3,105 +3,106 @@ package com.ably.chat /** * Error codes for the Chat SDK. */ -object ErrorCodes { +enum class ErrorCodes(val errorCode: Int) { + /** * The messages feature failed to attach. */ - const val MessagesAttachmentFailed = 102_001 + MessagesAttachmentFailed(102_001), /** * The presence feature failed to attach. */ - const val PresenceAttachmentFailed = 102_002 + PresenceAttachmentFailed(102_002), /** * The reactions feature failed to attach. */ - const val ReactionsAttachmentFailed = 102_003 + ReactionsAttachmentFailed(102_003), /** * The occupancy feature failed to attach. */ - const val OccupancyAttachmentFailed = 102_004 + OccupancyAttachmentFailed(102_004), /** * The typing feature failed to attach. */ - const val TypingAttachmentFailed = 102_005 - // 102006 - 102049 reserved for future use for attachment errors + TypingAttachmentFailed(102_005), + // 102_006 - 102_049 reserved for future use for attachment errors /** * The messages feature failed to detach. */ - const val MessagesDetachmentFailed = 102_050 + MessagesDetachmentFailed(102_050), /** * The presence feature failed to detach. */ - const val PresenceDetachmentFailed = 102_051 + PresenceDetachmentFailed(102_051), /** * The reactions feature failed to detach. */ - const val ReactionsDetachmentFailed = 102_052 + ReactionsDetachmentFailed(102_052), /** * The occupancy feature failed to detach. */ - const val OccupancyDetachmentFailed = 102_053 + OccupancyDetachmentFailed(102_053), /** * The typing feature failed to detach. */ - const val TypingDetachmentFailed = 102_054 - // 102055 - 102099 reserved for future use for detachment errors + TypingDetachmentFailed(102_054), + // 102_055 - 102_099 reserved for future use for detachment errors /** * The room has experienced a discontinuity. */ - const val RoomDiscontinuity = 102_100 + RoomDiscontinuity(102_100), // Unable to perform operation; /** * Cannot perform operation because the room is in a failed state. */ - const val RoomInFailedState = 102_101 + RoomInFailedState(102_101), /** * Cannot perform operation because the room is in a releasing state. */ - const val RoomIsReleasing = 102_102 + RoomIsReleasing(102_102), /** * Cannot perform operation because the room is in a released state. */ - const val RoomIsReleased = 102_103 + RoomIsReleased(102_103), /** * Cannot perform operation because the previous operation failed. */ - const val PreviousOperationFailed = 102_104 + PreviousOperationFailed(102_104), /** * An unknown error has happened in the room lifecycle. */ - const val RoomLifecycleError = 102_105 + RoomLifecycleError(102_105), /** * The request cannot be understood */ - const val BadRequest = 40_000 + BadRequest(40_000), /** * Invalid request body */ - const val InvalidRequestBody = 40_001 + InvalidRequestBody(40_001), /** * Internal error */ - const val InternalError = 50_000 + InternalError(50_000), } /** diff --git a/chat-android/src/main/java/com/ably/chat/Messages.kt b/chat-android/src/main/java/com/ably/chat/Messages.kt index e0c8ca84..c30b072b 100644 --- a/chat-android/src/main/java/com/ably/chat/Messages.kt +++ b/chat-android/src/main/java/com/ably/chat/Messages.kt @@ -5,13 +5,13 @@ package com.ably.chat import com.ably.chat.QueryOptions.MessageOrder.NewestFirst import com.google.gson.JsonObject import io.ably.lib.realtime.AblyRealtime -import io.ably.lib.realtime.Channel import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.ChannelStateListener import io.ably.lib.types.AblyException import io.ably.lib.types.ErrorInfo +import io.ably.lib.realtime.Channel as AblyRealtimeChannel -typealias PubSubMessageListener = Channel.MessageListener +typealias PubSubMessageListener = AblyRealtimeChannel.MessageListener typealias PubSubMessage = io.ably.lib.types.Message /** @@ -26,7 +26,7 @@ interface Messages : EmitsDiscontinuities { * * @returns the realtime channel */ - val channel: Channel + val channel: AblyRealtimeChannel /** * Subscribe to new messages in this chat room. @@ -207,7 +207,7 @@ internal class DefaultMessagesSubscription( ErrorInfo( "The `end` parameter is specified and is more recent than the subscription point timeserial", HttpStatusCodes.BadRequest, - ErrorCodes.BadRequest, + ErrorCodes.BadRequest.errorCode, ), ) } @@ -225,7 +225,7 @@ internal class DefaultMessages( private val roomId: String, realtimeChannels: AblyRealtime.Channels, private val chatApi: ChatApi, -) : Messages { +) : Messages, ContributesToRoomLifecycleImpl(), ResolvedContributor { private var listeners: Map> = emptyMap() @@ -239,7 +239,13 @@ internal class DefaultMessages( */ private val messagesChannelName = "$roomId::\$chat::\$chatMessages" - override val channel: Channel = realtimeChannels.get(messagesChannelName, ChatChannelOptions()) + override val channel = realtimeChannels.get(messagesChannelName, ChatChannelOptions()) + + override val contributor: ContributesToRoomLifecycle = this + + override val attachmentErrorCode: ErrorCodes = ErrorCodes.MessagesAttachmentFailed + + override val detachmentErrorCode: ErrorCodes = ErrorCodes.MessagesDetachmentFailed init { channelStateListener = ChannelStateListener { @@ -253,7 +259,7 @@ internal class DefaultMessages( addListener(listener, deferredChannelSerial) val messageListener = PubSubMessageListener { val pubSubMessage = it ?: throw AblyException.fromErrorInfo( - ErrorInfo("Got empty pubsub channel message", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), + ErrorInfo("Got empty pubsub channel message", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest.errorCode), ) val data = parsePubSubMessageData(pubSubMessage.data) val chatMessage = Message( @@ -284,7 +290,7 @@ internal class DefaultMessages( ErrorInfo( "This messages subscription instance was already unsubscribed", HttpStatusCodes.BadRequest, - ErrorCodes.BadRequest, + ErrorCodes.BadRequest.errorCode, ), ) }, @@ -295,10 +301,6 @@ internal class DefaultMessages( override suspend fun send(params: SendMessageParams): Message = chatApi.sendMessage(roomId, params) - override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription { - TODO("Not yet implemented") - } - fun release() { channel.off(channelStateListener) } @@ -323,14 +325,22 @@ internal class DefaultMessages( private fun requireChannelSerial(): String { return channel.properties.channelSerial ?: throw AblyException.fromErrorInfo( - ErrorInfo("Channel has been attached, but channelSerial is not defined", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), + ErrorInfo( + "Channel has been attached, but channelSerial is not defined", + HttpStatusCodes.BadRequest, + ErrorCodes.BadRequest.errorCode, + ), ) } private fun requireAttachSerial(): String { return channel.properties.attachSerial ?: throw AblyException.fromErrorInfo( - ErrorInfo("Channel has been attached, but attachSerial is not defined", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), + ErrorInfo( + "Channel has been attached, but attachSerial is not defined", + HttpStatusCodes.BadRequest, + ErrorCodes.BadRequest.errorCode, + ), ) } diff --git a/chat-android/src/main/java/com/ably/chat/Occupancy.kt b/chat-android/src/main/java/com/ably/chat/Occupancy.kt index a9e18ed1..83e9ec4c 100644 --- a/chat-android/src/main/java/com/ably/chat/Occupancy.kt +++ b/chat-android/src/main/java/com/ably/chat/Occupancy.kt @@ -2,7 +2,7 @@ package com.ably.chat -import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.Channel as AblyRealtimeChannel /** * This interface is used to interact with occupancy in a chat room: subscribing to occupancy updates and @@ -16,7 +16,7 @@ interface Occupancy : EmitsDiscontinuities { * * @returns The underlying Ably channel for occupancy events. */ - val channel: Channel + val channel: AblyRealtimeChannel /** * Subscribe a given listener to occupancy updates of the chat room. @@ -61,9 +61,15 @@ data class OccupancyEvent( internal class DefaultOccupancy( private val messages: Messages, -) : Occupancy { - override val channel: Channel - get() = messages.channel +) : Occupancy, ContributesToRoomLifecycleImpl(), ResolvedContributor { + + override val channel = messages.channel + + override val contributor: ContributesToRoomLifecycle = this + + override val attachmentErrorCode: ErrorCodes = ErrorCodes.OccupancyAttachmentFailed + + override val detachmentErrorCode: ErrorCodes = ErrorCodes.OccupancyDetachmentFailed override fun subscribe(listener: Occupancy.Listener): Subscription { TODO("Not yet implemented") @@ -72,8 +78,4 @@ internal class DefaultOccupancy( override suspend fun get(): OccupancyEvent { TODO("Not yet implemented") } - - override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription { - TODO("Not yet implemented") - } } diff --git a/chat-android/src/main/java/com/ably/chat/Presence.kt b/chat-android/src/main/java/com/ably/chat/Presence.kt index 2973e48c..c095fc4c 100644 --- a/chat-android/src/main/java/com/ably/chat/Presence.kt +++ b/chat-android/src/main/java/com/ably/chat/Presence.kt @@ -3,8 +3,9 @@ package com.ably.chat import android.text.PrecomputedText.Params -import io.ably.lib.realtime.Channel +import io.ably.lib.types.ErrorInfo import io.ably.lib.types.PresenceMessage +import io.ably.lib.realtime.Channel as AblyRealtimeChannel typealias PresenceData = Any @@ -19,7 +20,7 @@ interface Presence : EmitsDiscontinuities { * Get the underlying Ably realtime channel used for presence in this chat room. * @returns The realtime channel. */ - val channel: Channel + val channel: AblyRealtimeChannel /** * Method to get list of the current online users and returns the latest presence messages associated to it. @@ -131,10 +132,15 @@ data class PresenceEvent( internal class DefaultPresence( private val messages: Messages, -) : Presence { +) : Presence, ContributesToRoomLifecycleImpl(), ResolvedContributor { - override val channel: Channel - get() = messages.channel + override val channel = messages.channel + + override val contributor: ContributesToRoomLifecycle = this + + override val attachmentErrorCode: ErrorCodes = ErrorCodes.PresenceAttachmentFailed + + override val detachmentErrorCode: ErrorCodes = ErrorCodes.PresenceDetachmentFailed override suspend fun get(params: List): List { TODO("Not yet implemented") @@ -159,8 +165,4 @@ internal class DefaultPresence( override fun subscribe(listener: Presence.Listener): Subscription { TODO("Not yet implemented") } - - override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription { - TODO("Not yet implemented") - } } diff --git a/chat-android/src/main/java/com/ably/chat/Room.kt b/chat-android/src/main/java/com/ably/chat/Room.kt index 0ff81d96..4c601630 100644 --- a/chat-android/src/main/java/com/ably/chat/Room.kt +++ b/chat-android/src/main/java/com/ably/chat/Room.kt @@ -2,6 +2,11 @@ package com.ably.chat +import io.ably.lib.util.Log.LogHandler +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers + /** * Represents a chat room. */ @@ -89,44 +94,71 @@ internal class DefaultRoom( override val options: RoomOptions, realtimeClient: RealtimeClient, chatApi: ChatApi, + val logger: LogHandler?, ) : Room { - private val _messages = DefaultMessages( + private val _logger = logger + override val status = DefaultStatus(logger) + + /** + * RoomScope is a crucial part of the Room lifecycle. It manages sequential and atomic operations. + * Parallelism is intentionally limited to 1 to ensure that only one coroutine runs at a time, + * preventing concurrency issues. Every operation within Room must be performed through this scope. + */ + private val roomScope = + CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName(roomId)) + + override val messages = DefaultMessages( roomId = roomId, realtimeChannels = realtimeClient.channels, chatApi = chatApi, ) - override val messages: Messages = _messages - - override val presence: Presence = DefaultPresence( + override val presence = DefaultPresence( messages = messages, ) - override val reactions: RoomReactions = DefaultRoomReactions( + override val typing = DefaultTyping( roomId = roomId, - clientId = realtimeClient.auth.clientId, - realtimeChannels = realtimeClient.channels, + realtimeClient = realtimeClient, ) - override val typing: Typing = DefaultTyping( + override val reactions = DefaultRoomReactions( roomId = roomId, - realtimeClient = realtimeClient, + clientId = realtimeClient.auth.clientId, + realtimeChannels = realtimeClient.channels, ) - override val occupancy: Occupancy = DefaultOccupancy( + override val occupancy = DefaultOccupancy( messages = messages, ) - override val status: RoomStatus - get() { - TODO("Not yet implemented") - } + private var _lifecycleManager: RoomLifecycleManager? = null + + init { + /** + * TODO + * Initialize features based on provided RoomOptions. + * By default, only messages feature should be initialized. + * Currently, all features are initialized by default. + */ + val features = listOf(messages, presence, typing, reactions, occupancy) + _lifecycleManager = RoomLifecycleManager(roomScope, status, features, _logger) + /** + * TODO + * Make sure previous release op. for same was a success. + * Make sure channels were removed using realtime.channels.release(contributor.channel.name); + * Once this is a success, set room to initialized, if not set it to failed and throw error. + * Note that impl. can change based on recent proposed changes to chat-room-lifecycle DR. + */ + this.status.setStatus(RoomLifecycle.Initialized) + } override suspend fun attach() { - messages.channel.attachCoroutine() - typing.channel.attachCoroutine() - reactions.channel.attachCoroutine() + if (_lifecycleManager == null) { + // TODO - wait for room to be initialized inside init + } + _lifecycleManager?.attach() } override suspend fun detach() { @@ -136,6 +168,6 @@ internal class DefaultRoom( } fun release() { - _messages.release() + messages.release() } } diff --git a/chat-android/src/main/java/com/ably/chat/RoomLifecycleManager.kt b/chat-android/src/main/java/com/ably/chat/RoomLifecycleManager.kt new file mode 100644 index 00000000..4d4a0d03 --- /dev/null +++ b/chat-android/src/main/java/com/ably/chat/RoomLifecycleManager.kt @@ -0,0 +1,477 @@ +package com.ably.chat + +import io.ably.lib.realtime.ChannelState +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo +import io.ably.lib.util.Log.LogHandler +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.suspendCancellableCoroutine +import io.ably.lib.realtime.Channel as AblyRealtimeChannel + +/** + * An interface for features that contribute to the room status. + */ +interface ContributesToRoomLifecycle : EmitsDiscontinuities, HandlesDiscontinuity { + /** + * Gets the channel on which the feature operates. This promise is never + * rejected except in the case where room initialization is canceled. + */ + override val channel: AblyRealtimeChannel + + /** + * Gets the ErrorInfo code that should be used when the feature fails to attach. + * @returns The error that should be used when the feature fails to attach. + */ + val attachmentErrorCode: ErrorCodes + + /** + * Gets the ErrorInfo code that should be used when the feature fails to detach. + * @returns The error that should be used when the feature fails to detach. + */ + val detachmentErrorCode: ErrorCodes +} + +abstract class ContributesToRoomLifecycleImpl : ContributesToRoomLifecycle { + + private val discontinuityEmitter = DiscontinuityEmitter() + + override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription { + discontinuityEmitter.on(listener) + return Subscription { + discontinuityEmitter.off(listener) + } + } + + override fun discontinuityDetected(reason: ErrorInfo?) { + discontinuityEmitter.emit("discontinuity", reason) + } +} + +/** + * This interface represents a feature that contributes to the room lifecycle and + * exposes its channel directly. Objects of this type are created by awaiting the + * channel promises of all the {@link ContributesToRoomLifecycle} objects. + * + * @internal + */ +interface ResolvedContributor { + val channel: AblyRealtimeChannel + val contributor: ContributesToRoomLifecycle +} + +/** + * The order of precedence for lifecycle operations, passed to PriorityQueueExecutor which allows + * us to ensure that internal operations take precedence over user-driven operations. + */ +enum class LifecycleOperationPrecedence(val priority: Int) { + Internal(1), + Release(2), + AttachOrDetach(3), +} + +/** + * A map of contributors to pending discontinuity events. + */ +typealias DiscontinuityEventMap = MutableMap + +/** + * An internal interface that represents the result of a room attachment operation. + */ +interface RoomAttachmentResult : NewRoomStatus { + val failedFeature: ResolvedContributor? + val exception: AblyException +} + +class DefaultRoomAttachmentResult : RoomAttachmentResult { + internal var statusField: RoomLifecycle = RoomLifecycle.Attached + override val status: RoomLifecycle + get() = statusField + + internal var failedFeatureField: ResolvedContributor? = null + override val failedFeature: ResolvedContributor? + get() = failedFeatureField + + internal var errorField: ErrorInfo? = null + override val error: ErrorInfo? + get() = errorField + + internal var throwable: Throwable? = null + + override val exception: AblyException + get() { + val errorInfo = errorField + ?: ErrorInfo("unknown error in attach", ErrorCodes.RoomLifecycleError.errorCode, HttpStatusCodes.InternalServerError) + throwable?.let { + return AblyException.fromErrorInfo(throwable, errorInfo) + } + return AblyException.fromErrorInfo(errorInfo) + } +} + +/** + * An implementation of the `Status` interface. + * @internal + */ +class RoomLifecycleManager +(private val roomScope: CoroutineScope, status: DefaultStatus, contributors: List, logger: LogHandler? = null) { + + /** + * The status of the room. + */ + private var _status: DefaultStatus = status + + /** + * The features that contribute to the room status. + */ + private var _contributors: List = contributors + + /** + * Logger for RoomLifeCycleManager + */ + private val _logger: LogHandler? = logger + + /** + * AtomicCoroutineScope makes sure all operations are atomic and run with given priority. + * See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information. + * Spec: CHA-RL7 + */ + private val atomicCoroutineScope = AtomicCoroutineScope(roomScope) + + /** + * This flag indicates whether some sort of controlled operation is in progress (e.g. attaching, detaching, releasing). + * + * It is used to prevent the room status from being changed by individual channel state changes and ignore + * underlying channel events until we reach a consistent state. + */ + private var _operationInProgress = false + + /** + * A map of pending discontinuity events. + * + * When a discontinuity happens due to a failed resume, we don't want to surface that until the room is consistently + * attached again. This map allows us to queue up discontinuity events until we're ready to process them. + */ + private val _pendingDiscontinuityEvents: DiscontinuityEventMap = mutableMapOf() + + /** + * A map of contributors to whether their first attach has completed. + * + * Used to control whether we should trigger discontinuity events. + */ + private val _firstAttachesCompleted = mutableMapOf() + + /** + * Retry duration in milliseconds, used by internal doRetry and runDownChannelsOnFailedAttach methods + */ + private val _retryDurationInMs: Long = 250 + + init { + if (_status.current != RoomLifecycle.Attached) { + _operationInProgress = true + } + // TODO - [CHA-RL4] set up room monitoring here + } + + /** + * Clears all transient detach timeouts - used when some event supersedes the transient detach such + * as a failed channel or suspension. + */ + private fun clearAllTransientDetachTimeouts() { + // This will be implemented as a part of channel detach + } + + /** + * Given some contributor that has entered a suspended state: + * + * - Wind down any other channels + * - Wait for our contributor to recover + * - Attach everything else + * + * Repeat until either of the following happens: + * + * - Our contributor reattaches and we can attach everything else (repeat with the next contributor to break if necessary) + * - The room enters a failed state + * + * @param contributor The contributor that has entered a suspended state. + * @returns Returns when the room is attached, or the room enters a failed state. + */ + @SuppressWarnings("CognitiveComplexMethod") + private suspend fun doRetry(contributor: ResolvedContributor) { + // Handle the channel wind-down for other channels + var result = kotlin.runCatching { doChannelWindDown(contributor) } + while (result.isFailure) { + // If in doing the wind down, we've entered failed state, then it's game over anyway + if (this._status.current === RoomLifecycle.Failed) { + error("room is in a failed state") + } + delay(_retryDurationInMs) + result = kotlin.runCatching { doChannelWindDown(contributor) } + } + + // A helper that allows us to retry the attach operation + val doAttachWithRetry: suspend () -> Unit = { + coroutineScope { + _status.setStatus(RoomLifecycle.Attaching) + val attachmentResult = doAttach() + + // If we're in failed, then we should wind down all the channels, eventually - but we're done here + if (attachmentResult.status === RoomLifecycle.Failed) { + atomicCoroutineScope.async(LifecycleOperationPrecedence.Internal.priority) { + runDownChannelsOnFailedAttach() + } + return@coroutineScope + } + + // If we're in suspended, then we should wait for the channel to reattach, but wait for it to do so + if (attachmentResult.status === RoomLifecycle.Suspended) { + val failedFeature = attachmentResult.failedFeature + if (failedFeature == null) { + AblyException.fromErrorInfo( + ErrorInfo( + "no failed feature in doRetry", + ErrorCodes.RoomLifecycleError.errorCode, + HttpStatusCodes.InternalServerError, + ), + ) + } + // No need to catch errors, rather they should propagate to caller method + return@coroutineScope doRetry(failedFeature as ResolvedContributor) + } + // We attached, huzzah! + } + } + + // If given suspended contributor channel has reattached, then we can retry the attach + if (contributor.channel.state == ChannelState.attached) { + return doAttachWithRetry() + } + + // Otherwise, wait for our suspended contributor channel to re-attach and try again + try { + listenToChannelAttachOrFailure(contributor) + // Attach successful + return doAttachWithRetry() + } catch (ex: AblyException) { + // Channel attach failed + _status.setStatus(RoomLifecycle.Failed, ex.errorInfo) + throw ex + } + } + + private suspend fun listenToChannelAttachOrFailure(contributor: ResolvedContributor) = suspendCancellableCoroutine { continuation -> + contributor.channel.once(ChannelState.attached) { + continuation.resume(Unit) + } + contributor.channel.once(ChannelState.failed) { + val exception = AblyException.fromErrorInfo( + it.reason + ?: ErrorInfo("unknown error in _doRetry", ErrorCodes.RoomLifecycleError.errorCode, HttpStatusCodes.InternalServerError), + ) + continuation.resumeWithException(exception) + } + } + + /** + * Try to attach all the channels in a room. + * + * If the operation succeeds, the room enters the attached state and this promise resolves. + * If a channel enters the suspended state, then we reject, but we will retry after a short delay as is the case + * in the core SDK. + * If a channel enters the failed state, we reject and then begin to wind down the other channels. + */ + @SuppressWarnings("ThrowsCount") + internal suspend fun attach() { + val deferredAttach = atomicCoroutineScope.async(LifecycleOperationPrecedence.AttachOrDetach.priority) { + when (_status.current) { + RoomLifecycle.Attached -> return@async + RoomLifecycle.Releasing -> + throw AblyException.fromErrorInfo( + ErrorInfo( + "unable to attach room; room is releasing", + HttpStatusCodes.InternalServerError, + ErrorCodes.RoomIsReleasing.errorCode, + ), + ) + RoomLifecycle.Released -> + throw AblyException.fromErrorInfo( + ErrorInfo( + "unable to attach room; room is released", + HttpStatusCodes.InternalServerError, + ErrorCodes.RoomIsReleased.errorCode, + ), + ) + else -> {} + } + + // At this point, we force the room status to be attaching + clearAllTransientDetachTimeouts() + _operationInProgress = true + _status.setStatus(RoomLifecycle.Attaching) + + val attachResult = doAttach() + + // If we're in a failed state, then we should wind down all the channels, eventually + if (attachResult.status === RoomLifecycle.Failed) { + atomicCoroutineScope.async(LifecycleOperationPrecedence.Internal.priority) { + runDownChannelsOnFailedAttach() + } + throw attachResult.exception + } + + // If we're in suspended, then this attach should fail, but we'll retry after a short delay async + if (attachResult.status === RoomLifecycle.Suspended) { + if (attachResult.failedFeature == null) { + AblyException.fromErrorInfo( + ErrorInfo( + "no failed feature in attach", + ErrorCodes.RoomLifecycleError.errorCode, + HttpStatusCodes.InternalServerError, + ), + ) + } + attachResult.failedFeature?.let { + atomicCoroutineScope.async(LifecycleOperationPrecedence.Internal.priority) { + doRetry(it) + } + } + throw attachResult.exception + } + + // We attached, finally! + } + + deferredAttach.await() + } + + /** + * + * Attaches each feature channel with rollback on channel attach failure. + * This method is re-usable and can be called as a part of internal room operations. + * + */ + private suspend fun doAttach(): RoomAttachmentResult { + val attachResult = DefaultRoomAttachmentResult() + for (feature in _contributors) { + try { + feature.channel.attachCoroutine() + _firstAttachesCompleted[feature] = true + } catch (ex: Throwable) { + attachResult.throwable = ex + attachResult.failedFeatureField = feature + attachResult.errorField = ErrorInfo( + "failed to attach feature", + feature.contributor.attachmentErrorCode.errorCode, + HttpStatusCodes.InternalServerError, + ) + + // The current feature should be in one of two states, it will be either suspended or failed + // If it's in suspended, we wind down the other channels and wait for the reattach + // If it's failed, we can fail the entire room + when (feature.channel.state) { + ChannelState.suspended -> attachResult.statusField = RoomLifecycle.Suspended + ChannelState.failed -> attachResult.statusField = RoomLifecycle.Failed + else -> { + attachResult.statusField = RoomLifecycle.Failed + attachResult.errorField = ErrorInfo( + "unexpected channel state in doAttach ${feature.channel.state}", + ErrorCodes.RoomLifecycleError.errorCode, + HttpStatusCodes.InternalServerError, + ) + } + } + + // Regardless of whether we're suspended or failed, run-down the other channels + // The wind-down procedure will take Precedence over any user-driven actions + _status.setStatus(attachResult) + return attachResult + } + } + + // We successfully attached all the channels - set our status to attached, start listening changes in channel status + this._status.setStatus(attachResult) + this._operationInProgress = false + + // Iterate the pending discontinuity events and trigger them + for ((contributor, error) in _pendingDiscontinuityEvents) { + contributor.contributor.discontinuityDetected(error) + } + _pendingDiscontinuityEvents.clear() + return attachResult + } + + /** + * If we've failed to attach, then we're in the failed state and all that is left to do is to detach all the channels. + * + * @returns Returns only when all channels are detached. Doesn't throw exception. + */ + private suspend fun runDownChannelsOnFailedAttach() { + // At this point, we have control over the channel lifecycle, so we can hold onto it until things are resolved + // Keep trying to detach the channels until they're all detached. + var channelWindDown = kotlin.runCatching { doChannelWindDown() } + while (channelWindDown.isFailure) { + // Something went wrong during the wind down. After a short delay, to give others a turn, we should run down + // again until we reach a suitable conclusion. + delay(_retryDurationInMs) + channelWindDown = kotlin.runCatching { doChannelWindDown() } + } + } + + /** + * Detach all features except the one exception provided. + * If the room is in a failed state, then all channels should either reach the failed state or be detached. + * + * @param except The contributor to exclude from the detachment. + * @returns Success/Failure when all channels are detached or at least one of them fails. + */ + @SuppressWarnings("CognitiveComplexMethod", "ComplexCondition") + private suspend fun doChannelWindDown(except: ResolvedContributor? = null) = coroutineScope { + _contributors.map { contributor: ResolvedContributor -> + async { + // If its the contributor we want to wait for a conclusion on, then we should not detach it + // Unless we're in a failed state, in which case we should detach it + if (contributor === except && _status.current !== RoomLifecycle.Failed) { + return@async + } + // If the room's already in the failed state, or it's releasing, we should not detach a failed channel + if (( + _status.current === RoomLifecycle.Failed || + _status.current === RoomLifecycle.Releasing || + _status.current === RoomLifecycle.Released + ) && + contributor.channel.state === ChannelState.failed + ) { + return@async + } + + try { + contributor.channel.detachCoroutine() + } catch (throwable: Throwable) { + // If the contributor is in a failed state and we're not ignoring failed states, we should fail the room + if ( + contributor.channel.state === ChannelState.failed && + _status.current !== RoomLifecycle.Failed && + _status.current !== RoomLifecycle.Releasing && + _status.current !== RoomLifecycle.Released + ) { + val contributorError = ErrorInfo( + "failed to detach feature", + contributor.contributor.detachmentErrorCode.errorCode, + HttpStatusCodes.InternalServerError, + ) + _status.setStatus(RoomLifecycle.Failed, contributorError) + throw AblyException.fromErrorInfo(throwable, contributorError) + } + + // We throw an error so that the promise rejects + throw AblyException.fromErrorInfo(throwable, ErrorInfo("detach failure, retry", -1, -1)) + } + } + }.awaitAll() + } +} diff --git a/chat-android/src/main/java/com/ably/chat/RoomReactions.kt b/chat-android/src/main/java/com/ably/chat/RoomReactions.kt index 3fab956d..59a10d93 100644 --- a/chat-android/src/main/java/com/ably/chat/RoomReactions.kt +++ b/chat-android/src/main/java/com/ably/chat/RoomReactions.kt @@ -4,10 +4,10 @@ package com.ably.chat import com.google.gson.JsonObject import io.ably.lib.realtime.AblyRealtime -import io.ably.lib.realtime.Channel import io.ably.lib.types.AblyException import io.ably.lib.types.ErrorInfo import io.ably.lib.types.MessageExtras +import io.ably.lib.realtime.Channel as AblyRealtimeChannel /** * This interface is used to interact with room-level reactions in a chat room: subscribing to reactions and sending them. @@ -21,7 +21,7 @@ interface RoomReactions : EmitsDiscontinuities { * * @returns The Ably realtime channel instance. */ - val channel: Channel + val channel: AblyRealtimeChannel /** * Send a reaction to the room including some metadata. @@ -107,11 +107,17 @@ internal class DefaultRoomReactions( roomId: String, private val clientId: String, realtimeChannels: AblyRealtime.Channels, -) : RoomReactions { - // (CHA-ER1) +) : RoomReactions, ContributesToRoomLifecycleImpl(), ResolvedContributor { + private val roomReactionsChannelName = "$roomId::\$chat::\$reactions" - override val channel: Channel = realtimeChannels.get(roomReactionsChannelName, ChatChannelOptions()) + override val channel: AblyRealtimeChannel = realtimeChannels.get(roomReactionsChannelName, ChatChannelOptions()) + + override val contributor: ContributesToRoomLifecycle = this + + override val attachmentErrorCode: ErrorCodes = ErrorCodes.ReactionsAttachmentFailed + + override val detachmentErrorCode: ErrorCodes = ErrorCodes.ReactionsDetachmentFailed // (CHA-ER3) Ephemeral room reactions are sent to Ably via the Realtime connection via a send method. // (CHA-ER3a) Reactions are sent on the channel using a message in a particular format - see spec for format. @@ -136,7 +142,7 @@ internal class DefaultRoomReactions( override fun subscribe(listener: RoomReactions.Listener): Subscription { val messageListener = PubSubMessageListener { val pubSubMessage = it ?: throw AblyException.fromErrorInfo( - ErrorInfo("Got empty pubsub channel message", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), + ErrorInfo("Got empty pubsub channel message", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest.errorCode), ) val data = pubSubMessage.data as? JsonObject ?: throw AblyException.fromErrorInfo( ErrorInfo("Unrecognized Pub/Sub channel's message for `roomReaction` event", HttpStatusCodes.InternalServerError), @@ -154,8 +160,4 @@ internal class DefaultRoomReactions( channel.subscribe(RoomReactionEventType.Reaction.eventName, messageListener) return Subscription { channel.unsubscribe(RoomReactionEventType.Reaction.eventName, messageListener) } } - - override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription { - TODO("Not yet implemented") - } } diff --git a/chat-android/src/main/java/com/ably/chat/RoomStatus.kt b/chat-android/src/main/java/com/ably/chat/RoomStatus.kt index c08846a8..25823cd7 100644 --- a/chat-android/src/main/java/com/ably/chat/RoomStatus.kt +++ b/chat-android/src/main/java/com/ably/chat/RoomStatus.kt @@ -1,6 +1,9 @@ package com.ably.chat import io.ably.lib.types.ErrorInfo +import io.ably.lib.util.EventEmitter +import io.ably.lib.util.Log +import io.ably.lib.util.Log.LogHandler /** * Represents the status of a Room. @@ -23,7 +26,7 @@ interface RoomStatus { * @param listener The function to call when the status changes. * @returns An object that can be used to unregister the listener. */ - fun on(listener: Listener): Subscription + fun onChange(listener: Listener): Subscription /** * An interface for listening to changes for the room status @@ -35,6 +38,42 @@ interface RoomStatus { */ fun roomStatusChanged(change: RoomStatusChange) } + + /** + * Removes all listeners that were added by the `onChange` method. + */ + fun offAll() +} + +/** + * A new room status that can be set. + */ +interface NewRoomStatus { + /** + * The new status of the room. + */ + val status: RoomLifecycle + + /** + * An error that provides a reason why the room has + * entered the new status, if applicable. + */ + val error: ErrorInfo? +} + +interface InternalRoomStatus : RoomStatus { + /** + * Registers a listener that will be called once when the room status changes. + * @param listener The function to call when the status changes. + */ + fun onChangeOnce(listener: RoomStatus.Listener) + + /** + * Sets the status of the room. + * + * @param params The new status of the room. + */ + fun setStatus(params: NewRoomStatus) } /** @@ -42,6 +81,11 @@ interface RoomStatus { * The different states that a room can be in throughout its lifecycle. */ enum class RoomLifecycle(val stateName: String) { + /** + * The library is currently initializing the room. + */ + Initializing("initializing"), + /** * (CHA-RS1a) * A temporary state for when the library is first initialized. @@ -118,3 +162,61 @@ data class RoomStatusChange( */ val error: ErrorInfo? = null, ) + +class RoomStatusEventEmitter : EventEmitter() { + + override fun apply(listener: RoomStatus.Listener?, event: RoomLifecycle?, vararg args: Any?) { + try { + listener?.roomStatusChanged(args[0] as RoomStatusChange) + } catch (t: Throwable) { + Log.e("RoomEventEmitter", "Unexpected exception calling Room Status Listener", t) + } + } +} + +class DefaultStatus(private val logger: LogHandler?) : InternalRoomStatus { + + private val _logger = logger + + private var _state = RoomLifecycle.Initializing + override val current: RoomLifecycle + get() = _state + + private var _error: ErrorInfo? = null + override val error: ErrorInfo? + get() = _error + + private val externalEmitter = RoomStatusEventEmitter() + private val internalEmitter = RoomStatusEventEmitter() + + override fun onChange(listener: RoomStatus.Listener): Subscription { + externalEmitter.on(listener) + return Subscription { + externalEmitter.off(listener) + } + } + + override fun offAll() { + externalEmitter.off() + } + + override fun onChangeOnce(listener: RoomStatus.Listener) { + internalEmitter.once(listener) + } + + override fun setStatus(params: NewRoomStatus) { + val change = RoomStatusChange(params.status, current, params.error) + _state = change.current + _error = change.error + internalEmitter.emit(change.current, change) + externalEmitter.emit(change.current, change) + } + + fun setStatus(status: RoomLifecycle, error: ErrorInfo? = null) { + val newStatus = object : NewRoomStatus { + override val status: RoomLifecycle = status + override val error: ErrorInfo? = error + } + this.setStatus(newStatus) + } +} diff --git a/chat-android/src/main/java/com/ably/chat/Rooms.kt b/chat-android/src/main/java/com/ably/chat/Rooms.kt index 31c0c49a..0d99e047 100644 --- a/chat-android/src/main/java/com/ably/chat/Rooms.kt +++ b/chat-android/src/main/java/com/ably/chat/Rooms.kt @@ -57,12 +57,13 @@ internal class DefaultRooms( options = options, realtimeClient = realtimeClient, chatApi = chatApi, + logger = clientOptions.logHandler, ) } if (room.options != options) { throw AblyException.fromErrorInfo( - ErrorInfo("Room already exists with different options", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), + ErrorInfo("Room already exists with different options", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest.errorCode), ) } diff --git a/chat-android/src/main/java/com/ably/chat/Timeserial.kt b/chat-android/src/main/java/com/ably/chat/Timeserial.kt index 99b161dc..7ade1f89 100644 --- a/chat-android/src/main/java/com/ably/chat/Timeserial.kt +++ b/chat-android/src/main/java/com/ably/chat/Timeserial.kt @@ -49,7 +49,7 @@ data class Timeserial( fun parse(timeserial: String): Timeserial { val matched = """(\w+)@(\d+)-(\d+)(?::(\d+))?""".toRegex().matchEntire(timeserial) ?: throw AblyException.fromErrorInfo( - ErrorInfo("invalid timeserial", HttpStatusCodes.InternalServerError, ErrorCodes.InternalError), + ErrorInfo("invalid timeserial", HttpStatusCodes.InternalServerError, ErrorCodes.InternalError.errorCode), ) val (seriesId, timestamp, counter, index) = matched.destructured diff --git a/chat-android/src/main/java/com/ably/chat/Typing.kt b/chat-android/src/main/java/com/ably/chat/Typing.kt index 6add59f8..9950adc4 100644 --- a/chat-android/src/main/java/com/ably/chat/Typing.kt +++ b/chat-android/src/main/java/com/ably/chat/Typing.kt @@ -2,7 +2,7 @@ package com.ably.chat -import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.Channel as AblyRealtimeChannel /** * base retry interval, we double it each time @@ -30,7 +30,7 @@ interface Typing : EmitsDiscontinuities { * Get the name of the realtime channel underpinning typing events. * @returns The name of the realtime channel. */ - val channel: Channel + val channel: AblyRealtimeChannel /** * Subscribe a given listener to all typing events from users in the chat room. @@ -78,11 +78,17 @@ data class TypingEvent(val currentlyTyping: Set) internal class DefaultTyping( roomId: String, private val realtimeClient: RealtimeClient, -) : Typing { +) : Typing, ContributesToRoomLifecycleImpl(), ResolvedContributor { + private val typingIndicatorsChannelName = "$roomId::\$chat::\$typingIndicators" - override val channel: Channel - get() = realtimeClient.channels.get(typingIndicatorsChannelName, ChatChannelOptions()) + override val channel = realtimeClient.channels.get(typingIndicatorsChannelName, ChatChannelOptions()) + + override val contributor: ContributesToRoomLifecycle = this + + override val attachmentErrorCode: ErrorCodes = ErrorCodes.TypingAttachmentFailed + + override val detachmentErrorCode: ErrorCodes = ErrorCodes.TypingDetachmentFailed override fun subscribe(listener: Typing.Listener): Subscription { TODO("Not yet implemented") @@ -99,8 +105,4 @@ internal class DefaultTyping( override suspend fun stop() { TODO("Not yet implemented") } - - override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription { - TODO("Not yet implemented") - } } diff --git a/chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt b/chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt new file mode 100644 index 00000000..c2f71b80 --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt @@ -0,0 +1,286 @@ +package com.ably.chat + +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo +import java.util.concurrent.LinkedBlockingQueue +import kotlin.time.DurationUnit +import kotlin.time.toDuration +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withContext +import org.hamcrest.CoreMatchers.containsString +import org.junit.Assert +import org.junit.Test + +class AtomicCoroutineScopeTest { + + @Test + fun `should perform given operation`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope() + val deferredResult = atomicCoroutineScope.async { + delay(3000) + return@async "Operation Success!" + } + assertWaiter { !atomicCoroutineScope.finishedProcessing } + val result = deferredResult.await() + assertWaiter { atomicCoroutineScope.finishedProcessing } + Assert.assertEquals("Operation Success!", result) + } + + @Test + fun `should capture failure of the given operation and continue performing other operation`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope() + val deferredResult1 = atomicCoroutineScope.async { + delay(2000) + throw AblyException.fromErrorInfo(ErrorInfo("Error performing operation", 400)) + } + val deferredResult2 = atomicCoroutineScope.async { + delay(2000) + return@async "Operation Success!" + } + assertWaiter { !atomicCoroutineScope.finishedProcessing } + + val ex = Assert.assertThrows(AblyException::class.java) { + runBlocking { + deferredResult1.await() + } + } + Assert.assertEquals("Error performing operation", ex.errorInfo.message) + + val result2 = deferredResult2.await() + assertWaiter { atomicCoroutineScope.finishedProcessing } + Assert.assertEquals("Operation Success!", result2) + } + + @Test + fun `should perform mutually exclusive operations`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope() + val deferredResults = mutableListOf>() + var operationInProgress = false + var counter = 0 + + repeat(10) { + val result = atomicCoroutineScope.async { + if (operationInProgress) { + error("Can't perform operation when other operation is going on") + } + operationInProgress = true + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + val returnValue = counter++ + operationInProgress = false + return@async returnValue + } + deferredResults.add(result) + } + assertWaiter { !atomicCoroutineScope.finishedProcessing } + + val results = deferredResults.awaitAll() + assertWaiter { atomicCoroutineScope.finishedProcessing } + + repeat(10) { + Assert.assertEquals(it, results[it]) + } + } + + @Test + fun `Concurrently perform mutually exclusive operations`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope() + val deferredResults = LinkedBlockingQueue>() + + var operationInProgress = false + var counter = 0 + val countedValues = mutableListOf() + + // Concurrently schedule 100000 jobs from multiple threads + withContext(Dispatchers.IO) { + repeat(1_00_000) { + launch { + val result = atomicCoroutineScope.async { + if (operationInProgress) { + error("Can't perform operation when other operation is going on") + } + operationInProgress = true + countedValues.add(counter++) + operationInProgress = false + } + deferredResults.add(result) + } + } + } + + assertWaiter { deferredResults.size == 1_00_000 } + + deferredResults.awaitAll() + assertWaiter { atomicCoroutineScope.finishedProcessing } + Assert.assertEquals((0..99_999).toList(), countedValues) + } + + @Test + fun `should perform mutually exclusive operations with custom room scope`() = runTest { + val roomScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName("roomId")) + val atomicCoroutineScope = AtomicCoroutineScope(roomScope) + val deferredResults = mutableListOf>() + + val contexts = mutableListOf() + val contextNames = mutableListOf() + + var operationInProgress = false + var counter = 0 + + repeat(10) { + val result = atomicCoroutineScope.async { + if (operationInProgress) { + error("Can't perform operation when other operation is going on") + } + operationInProgress = true + contexts.add(coroutineContext.toString()) + contextNames.add(coroutineContext[CoroutineName]!!.name) + + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + val returnValue = counter++ + operationInProgress = false + return@async returnValue + } + deferredResults.add(result) + } + assertWaiter { !atomicCoroutineScope.finishedProcessing } + + val results = deferredResults.awaitAll() + repeat(10) { + Assert.assertEquals(it, results[it]) + Assert.assertEquals("roomId", contextNames[it]) + Assert.assertThat(contexts[it], containsString("Dispatchers.Default.limitedParallelism(1)")) + } + assertWaiter { atomicCoroutineScope.finishedProcessing } + } + + @Test + fun `should perform mutually exclusive operations with given priority`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope() + val deferredResults = mutableListOf>() + var operationInProgress = false + var counter = 0 + val contexts = mutableListOf() + + // This will start internal operation + deferredResults.add( + atomicCoroutineScope.async { + delay(1000) + return@async 99 + }, + ) + delay(100) + + // Add more jobs, will be processed based on priority + repeat(10) { + val result = atomicCoroutineScope.async(10 - it) { + if (operationInProgress) { + error("Can't perform operation when other operation is going on") + } + operationInProgress = true + contexts.add(this.coroutineContext.toString()) + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + val returnValue = counter++ + operationInProgress = false + return@async returnValue + } + deferredResults.add(result) + } + + val results = deferredResults.awaitAll() + val expectedResults = listOf(99, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0) + repeat(10) { + Assert.assertEquals(expectedResults[it], results[it]) + Assert.assertThat(contexts[it], containsString("Dispatchers.Default")) + } + assertWaiter { atomicCoroutineScope.finishedProcessing } + } + + @Test + fun `Concurrently execute mutually exclusive operations with given priority`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope() + val deferredResults = LinkedBlockingQueue>() + + var operationInProgress = false + val processedValues = mutableListOf() + +// This will start first internal operation + deferredResults.add( + atomicCoroutineScope.async { + delay(1000) + processedValues.add(1000) + return@async + }, + ) + + // Add more jobs, will be processed based on priority + // Concurrently schedule 1000 jobs with incremental priority from multiple threads + withContext(Dispatchers.IO) { + repeat(1000) { + launch { + val result = atomicCoroutineScope.async(1000 - it) { + if (operationInProgress) { + error("Can't perform operation when other operation is going on") + } + operationInProgress = true + processedValues.add(it) + operationInProgress = false + } + deferredResults.add(result) + } + } + } + + deferredResults.awaitAll() + val expectedResults = (1000 downTo 0).toList() + repeat(1001) { + Assert.assertEquals(expectedResults[it], processedValues[it]) + } + assertWaiter { atomicCoroutineScope.finishedProcessing } + } + + @Test + fun `should cancel current+pending operations once scope is cancelled and continue performing new operations`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope() + val results = mutableListOf>() + repeat(10) { + results.add( + atomicCoroutineScope.async { + delay(10_000) + }, + ) + } + assertWaiter { !atomicCoroutineScope.finishedProcessing } + Assert.assertEquals(9, atomicCoroutineScope.pendingJobCount) + + // Cancelling scope should cancel current job and other queued jobs + atomicCoroutineScope.cancel("scope cancelled externally") + assertWaiter { atomicCoroutineScope.finishedProcessing } + Assert.assertEquals(0, atomicCoroutineScope.pendingJobCount) + + for (result in results) { + val result1 = kotlin.runCatching { result.await() } + Assert.assertTrue(result1.isFailure) + Assert.assertEquals("scope cancelled externally", result1.exceptionOrNull()!!.message) + } + + // Should process new job + val deferredResult3 = atomicCoroutineScope.async { + delay(200) + return@async "Operation Success!" + } + assertWaiter { !atomicCoroutineScope.finishedProcessing } + + val result3 = deferredResult3.await() + assertWaiter { atomicCoroutineScope.finishedProcessing } + Assert.assertEquals("Operation Success!", result3) + } +} diff --git a/chat-android/src/test/java/com/ably/chat/TestUtils.kt b/chat-android/src/test/java/com/ably/chat/TestUtils.kt index e493418b..1e202e08 100644 --- a/chat-android/src/test/java/com/ably/chat/TestUtils.kt +++ b/chat-android/src/test/java/com/ably/chat/TestUtils.kt @@ -4,6 +4,10 @@ import com.google.gson.JsonElement import io.ably.lib.types.AsyncHttpPaginatedResponse import io.mockk.every import io.mockk.mockk +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.withContext +import kotlinx.coroutines.withTimeout fun buildAsyncHttpPaginatedResponse(items: List): AsyncHttpPaginatedResponse { val response = mockk() @@ -49,3 +53,14 @@ fun mockOccupancyApiResponse(realtimeClientMock: RealtimeClient, response: JsonE ) } } + +suspend fun assertWaiter(timeoutInMs: Long = 10_000, block: () -> Boolean) { + withContext(Dispatchers.Default) { + withTimeout(timeoutInMs) { + do { + val success = block() + delay(100) + } while (!success) + } + } +} diff --git a/detekt.yml b/detekt.yml index 95cd7e46..de7664d5 100644 --- a/detekt.yml +++ b/detekt.yml @@ -99,7 +99,7 @@ complexity: - 'with' LabeledExpression: active: true - ignoredLabels: [ ] + ignoredLabels: [ 'async', 'coroutineScope' ] LargeClass: active: true threshold: 600 @@ -952,7 +952,7 @@ style: UnderscoresInNumericLiterals: active: true acceptableLength: 4 - allowNonStandardGrouping: false + allowNonStandardGrouping: true UnnecessaryAbstractClass: active: false UnnecessaryAnnotationUseSiteTarget: @@ -990,7 +990,7 @@ style: - 'Preview' UnusedPrivateProperty: active: true - allowedNames: '_|ignored|expected|serialVersionUID' + allowedNames: '_|ignored|expected|serialVersionUID|_logger' UseAnyOrNoneInsteadOfFind: active: false UseArrayLiteralsInAnnotations: diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 35bce2df..0c361124 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -3,7 +3,7 @@ [versions] ably-chat = "0.0.1" -ably = "1.2.43" +ably = "1.2.44" junit = "4.13.2" agp = "8.5.2" detekt = "1.23.6" @@ -18,7 +18,7 @@ activity-compose = "1.9.1" compose-bom = "2024.06.00" gson = "2.11.0" mockk = "1.13.12" -coroutine = "1.8.1" +coroutine = "1.9.0" build-config = "5.4.0" [libraries]