Skip to content

Commit

Permalink
Removed use of TaskResult, used CompletableDeferred instead, updated …
Browse files Browse the repository at this point in the history
…tests

accordingly
  • Loading branch information
sacOO7 committed Oct 18, 2024
1 parent 4d1d501 commit 25759b0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 44 deletions.
42 changes: 14 additions & 28 deletions chat-android/src/main/java/com/ably/chat/TaskScheduler.kt
Original file line number Diff line number Diff line change
@@ -1,35 +1,18 @@
package com.ably.chat

import java.util.PriorityQueue
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

private class Task(
private val priority: Int,
val coroutineBlock: suspend CoroutineScope.() -> Any,
val result: TaskResult<Any>)
: Comparable<Task> {
override fun compareTo(other: Task): Int {
return this.priority.compareTo(other.priority)
}

suspend fun setResult(res: Result<Any>) {
result.channel.send(res)
}
}

class TaskResult<T> {
// Size of channel is set to 1. This is to avoid sender getting blocked
// because receiver doesn't call receive on the channel
internal val channel = Channel<Result<T>>(1)

suspend fun await(): Result<T> {
val result = channel.receive()
channel.close()
return result
}
val deferredResult: CompletableDeferred<Any>,
) :
Comparable<Task> {
override fun compareTo(other: Task): Int = this.priority.compareTo(other.priority)
}

/**
Expand All @@ -46,24 +29,27 @@ class TaskScheduler(private val scope: CoroutineScope) {
* This can also be set to negative number if operation needs higher priority than existing ones.
* @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope.
*/
suspend fun <T : Any>schedule(priority:Int = 0, coroutineBlock: suspend CoroutineScope.() -> T) : TaskResult<T> {
val taskResult = TaskResult<Any>()
suspend fun <T : Any>schedule(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
val deferredResult = CompletableDeferred<Any>()
scope.launch {
tasks.add(Task(priority, coroutineBlock, taskResult))
tasks.add(Task(priority, coroutineBlock, deferredResult))
if (!isRunning) {
isRunning = true
while (tasks.isNotEmpty()) {
val task = tasks.poll()
task?.let {
val result = kotlin.runCatching { scope.async(block = it.coroutineBlock).await() }
it.setResult(result)
try {
it.deferredResult.complete(scope.async(block = it.coroutineBlock).await())
} catch (t: Throwable) {
it.deferredResult.completeExceptionally(t)
}
}
}
isRunning = false
}
}

@Suppress("UNCHECKED_CAST")
return taskResult as TaskResult<T>
return deferredResult as CompletableDeferred<T>
}
}
32 changes: 16 additions & 16 deletions chat-android/src/test/java/com/ably/chat/TaskSchedulerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import java.util.concurrent.Executors
import kotlin.time.DurationUnit
import kotlin.time.toDuration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.Assert
import org.junit.Test
Expand All @@ -23,9 +26,7 @@ class TaskSchedulerTest {
return@schedule "Operation Success!"
}
val result = taskResult.await()
Assert.assertTrue(result.isSuccess)
Assert.assertFalse(result.isFailure)
Assert.assertEquals("Operation Success!", result.getOrNull())
Assert.assertEquals("Operation Success!", result)
}

@Test
Expand All @@ -36,18 +37,18 @@ class TaskSchedulerTest {
delay(2000)
throw AblyException.fromErrorInfo(ErrorInfo("Error performing operation", 400))
}
val result = taskResult.await()
Assert.assertFalse(result.isSuccess)
Assert.assertTrue(result.isFailure)
val exception = result.exceptionOrNull() as AblyException
Assert.assertEquals("Error performing operation", exception.errorInfo.message)
Assert.assertThrows("Error performing operation", AblyException::class.java) {
runBlocking {
taskResult.await()
}
}
}

@Test
fun `should perform mutually exclusive operations with given priority`() = runTest {
val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val taskScheduler = TaskScheduler(CoroutineScope(singleThreadedDispatcher))
val taskResults = mutableListOf<TaskResult<Int>>()
val taskResults = mutableListOf<Deferred<Int>>()
var operationInProgress = false
var counter = 0
val threadIds = mutableSetOf<Long>()
Expand All @@ -56,23 +57,22 @@ class TaskSchedulerTest {
val result = taskScheduler.schedule(it) {
threadIds.add(Thread.currentThread().id)
if (operationInProgress) {
throw IllegalStateException("Can't perform operation when other operation is going on")
error("Can't perform operation when other operation is going on")
}
operationInProgress = true
delay((200..800).random().toDuration(DurationUnit.MILLISECONDS))
operationInProgress = false
return@schedule counter++
val returnValue = counter++
return@schedule returnValue
}
taskResults.add(result)
}

val results = taskResults.map { it.await() }
val results = taskResults.awaitAll()
repeat(20) {
Assert.assertTrue(results[it].isSuccess)
Assert.assertEquals(it, results[it].getOrNull())
Assert.assertEquals(it, results[it])
}

// Scheduler should run all async operations under single thread
Assert.assertEquals(1, threadIds.size);
Assert.assertEquals(1, threadIds.size)
}
}

0 comments on commit 25759b0

Please sign in to comment.