From c3bf5e008e4b083da5ef00d601f32c0ac0713e6e Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 18 Oct 2024 19:49:45 +0530 Subject: [PATCH] Refactored TaskScheduler, renamed to AtomicCoroutineScope which can only be initialized from static method --- .../com/ably/chat/AtomicCoroutineScope.kt | 84 +++++++++++++++++++ .../main/java/com/ably/chat/TaskScheduler.kt | 55 ------------ ...lerTest.kt => AtomicCoroutineScopeTest.kt} | 34 ++++---- 3 files changed, 98 insertions(+), 75 deletions(-) create mode 100644 chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt delete mode 100644 chat-android/src/main/java/com/ably/chat/TaskScheduler.kt rename chat-android/src/test/java/com/ably/chat/{TaskSchedulerTest.kt => AtomicCoroutineScopeTest.kt} (60%) diff --git a/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt b/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt new file mode 100644 index 00000000..b53d00c0 --- /dev/null +++ b/chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt @@ -0,0 +1,84 @@ +package com.ably.chat + +import java.util.PriorityQueue +import java.util.concurrent.Executors +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExecutorCoroutineDispatcher +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.async +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch + +/** + * Each ChatRoomLifecycleManager is supposed to have it's own AtomicCoroutineScope. + * AtomicCoroutineScope makes sure all operations are atomic and run with given priority. + * Uses single threaded dispatcher to avoid thread synchronization issues. + */ +class AtomicCoroutineScope private constructor(private val scope: CoroutineScope) { + + private class Job( + private val priority: Int, + val coroutineBlock: suspend CoroutineScope.() -> Any, + val deferredResult: CompletableDeferred, + ) : + Comparable { + override fun compareTo(other: Job): Int = this.priority.compareTo(other.priority) + } + + private var isRunning = false + private val jobs: PriorityQueue = PriorityQueue() + + /** + * @param priority Defines priority for the operation execution. + * @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope. + */ + suspend fun async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred { + val deferredResult = CompletableDeferred() + scope.launch { + jobs.add(Job(priority, coroutineBlock, deferredResult)) + if (!isRunning) { + isRunning = true + while (jobs.isNotEmpty()) { + val job = jobs.poll() + job?.let { + try { + it.deferredResult.complete(scope.async(block = it.coroutineBlock).await()) + } catch (t: Throwable) { + it.deferredResult.completeExceptionally(t) + } + } + } + isRunning = false + } + } + + @Suppress("UNCHECKED_CAST") + return deferredResult as CompletableDeferred + } + + /** + * Cancels all jobs along along with it's children. + * This includes cancelling queued jobs and current retry timers. + */ + fun cancel(message: String, cause: Throwable? = null) { + jobs.clear() + scope.cancel(message, cause) + } + + companion object { + private var _singleThreadedDispatcher : ExecutorCoroutineDispatcher? = null + + fun create(): AtomicCoroutineScope { + if (_singleThreadedDispatcher == null) { + _singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(); + } + return AtomicCoroutineScope(CoroutineScope(singleThreadedDispatcher)) + } + + val singleThreadedDispatcher: ExecutorCoroutineDispatcher + get() { + return _singleThreadedDispatcher?: error("Call SingleThreadedExecutor.create() method to initialize SingleThreadedDispatcher") + } + } +} diff --git a/chat-android/src/main/java/com/ably/chat/TaskScheduler.kt b/chat-android/src/main/java/com/ably/chat/TaskScheduler.kt deleted file mode 100644 index 5ce286f5..00000000 --- a/chat-android/src/main/java/com/ably/chat/TaskScheduler.kt +++ /dev/null @@ -1,55 +0,0 @@ -package com.ably.chat - -import java.util.PriorityQueue -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.async -import kotlinx.coroutines.launch - -private class Task( - private val priority: Int, - val coroutineBlock: suspend CoroutineScope.() -> Any, - val deferredResult: CompletableDeferred, -) : - Comparable { - override fun compareTo(other: Task): Int = this.priority.compareTo(other.priority) -} - -/** - * TaskScheduler schedules given coroutine operation mutually exclusive. - * @property scope Uses single threaded dispatcher to avoid thread synchronization issues. - */ -class TaskScheduler(private val scope: CoroutineScope) { - private var isRunning = false - private val tasks: PriorityQueue = PriorityQueue() - - /** - * @param priority Defines priority for the operation execution. - * Default=0, means operation will be performed immediately after ongoing operation. - * This can also be set to negative number if operation needs higher priority than existing ones. - * @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope. - */ - suspend fun schedule(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred { - val deferredResult = CompletableDeferred() - scope.launch { - tasks.add(Task(priority, coroutineBlock, deferredResult)) - if (!isRunning) { - isRunning = true - while (tasks.isNotEmpty()) { - val task = tasks.poll() - task?.let { - try { - it.deferredResult.complete(scope.async(block = it.coroutineBlock).await()) - } catch (t: Throwable) { - it.deferredResult.completeExceptionally(t) - } - } - } - isRunning = false - } - } - - @Suppress("UNCHECKED_CAST") - return deferredResult as CompletableDeferred - } -} diff --git a/chat-android/src/test/java/com/ably/chat/TaskSchedulerTest.kt b/chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt similarity index 60% rename from chat-android/src/test/java/com/ably/chat/TaskSchedulerTest.kt rename to chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt index 80f71c30..8016f356 100644 --- a/chat-android/src/test/java/com/ably/chat/TaskSchedulerTest.kt +++ b/chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt @@ -2,12 +2,9 @@ package com.ably.chat import io.ably.lib.types.AblyException import io.ably.lib.types.ErrorInfo -import java.util.concurrent.Executors import kotlin.time.DurationUnit import kotlin.time.toDuration -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred -import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.awaitAll import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking @@ -15,46 +12,43 @@ import kotlinx.coroutines.test.runTest import org.junit.Assert import org.junit.Test -class TaskSchedulerTest { +class AtomicCoroutineScopeTest { @Test fun `should perform given operation`() = runTest { - val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - val taskScheduler = TaskScheduler(CoroutineScope(singleThreadedDispatcher)) - val taskResult = taskScheduler.schedule { + val atomicCoroutineScope = AtomicCoroutineScope.create() + val deferredResult = atomicCoroutineScope.async { delay(3000) - return@schedule "Operation Success!" + return@async "Operation Success!" } - val result = taskResult.await() + val result = deferredResult.await() Assert.assertEquals("Operation Success!", result) } @Test fun `should capture failure of the given operation`() = runTest { - val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - val taskScheduler = TaskScheduler(CoroutineScope(singleThreadedDispatcher)) - val taskResult = taskScheduler.schedule { + val atomicCoroutineScope = AtomicCoroutineScope.create() + val deferredResult = atomicCoroutineScope.async { delay(2000) throw AblyException.fromErrorInfo(ErrorInfo("Error performing operation", 400)) } Assert.assertThrows("Error performing operation", AblyException::class.java) { runBlocking { - taskResult.await() + deferredResult.await() } } } @Test fun `should perform mutually exclusive operations with given priority`() = runTest { - val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - val taskScheduler = TaskScheduler(CoroutineScope(singleThreadedDispatcher)) - val taskResults = mutableListOf>() + val atomicCoroutineScope = AtomicCoroutineScope.create() + val deferredResults = mutableListOf>() var operationInProgress = false var counter = 0 val threadIds = mutableSetOf() repeat(20) { - val result = taskScheduler.schedule(it) { + val result = atomicCoroutineScope.async(it) { threadIds.add(Thread.currentThread().id) if (operationInProgress) { error("Can't perform operation when other operation is going on") @@ -63,12 +57,12 @@ class TaskSchedulerTest { delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) operationInProgress = false val returnValue = counter++ - return@schedule returnValue + return@async returnValue } - taskResults.add(result) + deferredResults.add(result) } - val results = taskResults.awaitAll() + val results = deferredResults.awaitAll() repeat(20) { Assert.assertEquals(it, results[it]) }