Skip to content

Commit

Permalink
Merge branch 'feature/roomlifecycle-attach-with-retry' into tests/roo…
Browse files Browse the repository at this point in the history
…mlifecycle-attach
  • Loading branch information
sacOO7 committed Oct 29, 2024
2 parents d990bd6 + d9094b5 commit 0a6a45e
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 42 deletions.
57 changes: 31 additions & 26 deletions chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package com.ably.chat

import java.util.PriorityQueue
import io.ably.annotation.Experimental
import java.util.concurrent.PriorityBlockingQueue
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch

/**
* AtomicCoroutineScope makes sure all operations are atomic and run with given priority.
* Each ChatRoomLifecycleManager is supposed to have it's own AtomicCoroutineScope.
* Uses limitedParallelism set to 1 to make sure coroutines under given scope do not run in parallel.
* AtomicCoroutineScope is a thread safe wrapper to run multiple operations mutually exclusive.
* All operations are atomic and run with given priority.
* Accepts scope as a constructor parameter to run operations under the given scope.
* See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information.
*/
class AtomicCoroutineScope {
class AtomicCoroutineScope(private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default)) {

private val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))
private val sequentialScope: CoroutineScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))

private class Job(
private val priority: Int,
Expand All @@ -33,29 +32,25 @@ class AtomicCoroutineScope {
}
}

private var isRunning = false
private var queueCounter = 0
private val jobs: PriorityQueue<Job> = PriorityQueue()
private val jobs: PriorityBlockingQueue<Job> = PriorityBlockingQueue() // Accessed from both sequentialScope and async method
private var isRunning = false // Only accessed from sequentialScope
private var queueCounter = 0 // Only accessed from synchronized async method

/**
* @param priority Defines priority for the operation execution.
* @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope.
*/
suspend fun <T : Any>async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
@Synchronized
fun <T : Any>async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
val deferredResult = CompletableDeferred<Any>()
jobs.add(Job(priority, coroutineBlock, deferredResult, queueCounter++))
sequentialScope.launch {
jobs.add(Job(priority, coroutineBlock, deferredResult, queueCounter++))
if (!isRunning) {
isRunning = true
while (jobs.isNotEmpty()) {
val job = jobs.poll()
job?.let {
try {
val result = sequentialScope.async(block = it.coroutineBlock).await()
it.deferredResult.complete(result)
} catch (t: Throwable) {
it.deferredResult.completeExceptionally(t)
}
safeExecute(it)
}
}
isRunning = false
Expand All @@ -66,12 +61,22 @@ class AtomicCoroutineScope {
return deferredResult as CompletableDeferred<T>
}

/**
* Cancels all jobs along with it's children.
* This includes cancelling queued jobs and current retry timers.
*/
fun cancel(message: String, cause: Throwable? = null) {
jobs.clear()
sequentialScope.cancel(message, cause)
private suspend fun safeExecute(job: Job) {
runCatching {
scope.launch {
try {
val result = job.coroutineBlock(this)
job.deferredResult.complete(result)
} catch (t: Throwable) {
job.deferredResult.completeExceptionally(t)
}
}.join()
}.onFailure {
job.deferredResult.completeExceptionally(it)
}
}

@Experimental
val finishedProcessing: Boolean
get() = jobs.isEmpty() && !isRunning
}
13 changes: 12 additions & 1 deletion chat-android/src/main/java/com/ably/chat/Room.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

package com.ably.chat
import io.ably.lib.util.Log.LogHandler
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers

/**
* Represents a chat room.
Expand Down Expand Up @@ -96,6 +99,14 @@ internal class DefaultRoom(
private val _logger = logger
override val status = DefaultStatus(logger)

/**
* RoomScope is a crucial part of the Room lifecycle. It manages sequential and atomic operations.
* Parallelism is intentionally limited to 1 to ensure that only one coroutine runs at a time,
* preventing concurrency issues. Every operation within Room must be performed through this scope.
*/
private val roomScope =
CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName(roomId))

override val messages = DefaultMessages(
roomId = roomId,
realtimeChannels = realtimeClient.channels,
Expand Down Expand Up @@ -130,7 +141,7 @@ internal class DefaultRoom(
* Currently, all features are initialized by default.
*/
val features = listOf(messages, presence, typing, reactions, occupancy)
_lifecycleManager = RoomLifecycleManager(status, features, _logger)
_lifecycleManager = RoomLifecycleManager(roomScope, status, features, _logger)
/**
* TODO
* Make sure previous release op. for same was a success.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.ably.lib.types.ErrorInfo
import io.ably.lib.util.Log.LogHandler
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
Expand Down Expand Up @@ -96,7 +97,7 @@ class DefaultRoomAttachmentResult : RoomAttachmentResult {
* @internal
*/
class RoomLifecycleManager
(status: DefaultStatus, contributors: List<ResolvedContributor>, logger: LogHandler? = null) {
(private val roomScope: CoroutineScope, status: DefaultStatus, contributors: List<ResolvedContributor>, logger: LogHandler? = null) {

/**
* The status of the room.
Expand All @@ -118,7 +119,7 @@ class RoomLifecycleManager
* See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information.
* Spec: CHA-RL7
*/
private val atomicCoroutineScope = AtomicCoroutineScope()
private val atomicCoroutineScope = AtomicCoroutineScope(roomScope)

/**
* This flag indicates whether some sort of controlled operation is in progress (e.g. attaching, detaching, releasing).
Expand Down
Loading

0 comments on commit 0a6a45e

Please sign in to comment.