diff --git a/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt b/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt new file mode 100644 index 00000000..b53d00c0 --- /dev/null +++ b/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt @@ -0,0 +1,84 @@ +package com.ably.chat + +import java.util.PriorityQueue +import java.util.concurrent.Executors +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExecutorCoroutineDispatcher +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.async +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch + +/** + * Each ChatRoomLifecycleManager is supposed to have it's own AtomicCoroutineScope. + * AtomicCoroutineScope makes sure all operations are atomic and run with given priority. + * Uses single threaded dispatcher to avoid thread synchronization issues. + */ +class AtomicCoroutineScope private constructor(private val scope: CoroutineScope) { + + private class Job( + private val priority: Int, + val coroutineBlock: suspend CoroutineScope.() -> Any, + val deferredResult: CompletableDeferred, + ) : + Comparable { + override fun compareTo(other: Job): Int = this.priority.compareTo(other.priority) + } + + private var isRunning = false + private val jobs: PriorityQueue = PriorityQueue() + + /** + * @param priority Defines priority for the operation execution. + * @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope. + */ + suspend fun async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred { + val deferredResult = CompletableDeferred() + scope.launch { + jobs.add(Job(priority, coroutineBlock, deferredResult)) + if (!isRunning) { + isRunning = true + while (jobs.isNotEmpty()) { + val job = jobs.poll() + job?.let { + try { + it.deferredResult.complete(scope.async(block = it.coroutineBlock).await()) + } catch (t: Throwable) { + it.deferredResult.completeExceptionally(t) + } + } + } + isRunning = false + } + } + + @Suppress("UNCHECKED_CAST") + return deferredResult as CompletableDeferred + } + + /** + * Cancels all jobs along along with it's children. + * This includes cancelling queued jobs and current retry timers. + */ + fun cancel(message: String, cause: Throwable? = null) { + jobs.clear() + scope.cancel(message, cause) + } + + companion object { + private var _singleThreadedDispatcher : ExecutorCoroutineDispatcher? = null + + fun create(): AtomicCoroutineScope { + if (_singleThreadedDispatcher == null) { + _singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(); + } + return AtomicCoroutineScope(CoroutineScope(singleThreadedDispatcher)) + } + + val singleThreadedDispatcher: ExecutorCoroutineDispatcher + get() { + return _singleThreadedDispatcher?: error("Call SingleThreadedExecutor.create() method to initialize SingleThreadedDispatcher") + } + } +} diff --git a/chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt b/chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt new file mode 100644 index 00000000..8016f356 --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt @@ -0,0 +1,72 @@ +package com.ably.chat + +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo +import kotlin.time.DurationUnit +import kotlin.time.toDuration +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import org.junit.Assert +import org.junit.Test + +class AtomicCoroutineScopeTest { + + @Test + fun `should perform given operation`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope.create() + val deferredResult = atomicCoroutineScope.async { + delay(3000) + return@async "Operation Success!" + } + val result = deferredResult.await() + Assert.assertEquals("Operation Success!", result) + } + + @Test + fun `should capture failure of the given operation`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope.create() + val deferredResult = atomicCoroutineScope.async { + delay(2000) + throw AblyException.fromErrorInfo(ErrorInfo("Error performing operation", 400)) + } + Assert.assertThrows("Error performing operation", AblyException::class.java) { + runBlocking { + deferredResult.await() + } + } + } + + @Test + fun `should perform mutually exclusive operations with given priority`() = runTest { + val atomicCoroutineScope = AtomicCoroutineScope.create() + val deferredResults = mutableListOf>() + var operationInProgress = false + var counter = 0 + val threadIds = mutableSetOf() + + repeat(20) { + val result = atomicCoroutineScope.async(it) { + threadIds.add(Thread.currentThread().id) + if (operationInProgress) { + error("Can't perform operation when other operation is going on") + } + operationInProgress = true + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + operationInProgress = false + val returnValue = counter++ + return@async returnValue + } + deferredResults.add(result) + } + + val results = deferredResults.awaitAll() + repeat(20) { + Assert.assertEquals(it, results[it]) + } + // Scheduler should run all async operations under single thread + Assert.assertEquals(1, threadIds.size) + } +}