Skip to content

Commit

Permalink
Simplified naming conventions for PriorityExecutor, renamed to Atomic…
Browse files Browse the repository at this point in the history
…Executor
  • Loading branch information
sacOO7 committed Oct 17, 2024
1 parent b70db24 commit 0596e88
Showing 1 changed file with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,37 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

private class OrderedCoroutine(
private class Task(
private val priority: Int,
val suspendedFn: suspend CoroutineScope.() -> Any,
val coroutineBlock: suspend CoroutineScope.() -> Any,
val resultChannel: Channel<Result<Any>>)
: Comparable<OrderedCoroutine> {
override fun compareTo(other: OrderedCoroutine): Int {
: Comparable<Task> {
override fun compareTo(other: Task): Int {
return other.priority - this.priority
}
}

/**
* Aim of PriorityExecutor is to execute given coroutine operation mutually exclusive.
* Aim of AtomicExecutor is to execute given coroutine operation mutually exclusive.
* @property scope Uses single threaded dispatcher to avoid thread synchronization issues.
*/
class PriorityExecutor(private val scope: CoroutineScope) {
class AtomicExecutor(private val scope: CoroutineScope) {
private var isRunning = false
private val priorityQueue: PriorityQueue<OrderedCoroutine> = PriorityQueue()
private val priorityQueue: PriorityQueue<Task> = PriorityQueue()

suspend fun <T : Any>execute(priority:Int, suspendedFun: suspend CoroutineScope.() -> T) : Channel<Result<T>> {
suspend fun <T : Any>execute(priority:Int, coroutineBlock: suspend CoroutineScope.() -> T) : Channel<Result<T>> {
// Size of resultChannel is set to 1 to keep while loop running.
// i.e. If other end doesn't call receive on the channel, loop will be blocked.
// i.e. If caller doesn't explicitly receive on the channel, loop will be blocked.
val resultChannel = Channel<Result<Any>>(1)
priorityQueue.add(OrderedCoroutine(priority, suspendedFun, resultChannel))
priorityQueue.add(Task(priority, coroutineBlock, resultChannel))

scope.launch {
if (!isRunning) {
isRunning = true
while (priorityQueue.size > 0) {
val runningCoroutine = priorityQueue.remove()
val result = kotlin.runCatching { scope.async(block = runningCoroutine.suspendedFn).await() }
runningCoroutine.resultChannel.send(result)
val task = priorityQueue.remove()
val result = kotlin.runCatching { scope.async(block = task.coroutineBlock).await() }
task.resultChannel.send(result)
}
isRunning = false
}
Expand Down

0 comments on commit 0596e88

Please sign in to comment.