Skip to content

Commit

Permalink
Implemented priority executor to execute operations mutually exclusive
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Oct 17, 2024
1 parent ba0f81e commit b70db24
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions chat-android/src/main/java/com/ably/chat/PriorityExecutor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.ably.chat

import java.util.PriorityQueue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

private class OrderedCoroutine(
private val priority: Int,
val suspendedFn: suspend CoroutineScope.() -> Any,
val resultChannel: Channel<Result<Any>>)
: Comparable<OrderedCoroutine> {
override fun compareTo(other: OrderedCoroutine): Int {
return other.priority - this.priority
}
}

/**
* Aim of PriorityExecutor is to execute given coroutine operation mutually exclusive.
* @property scope Uses single threaded dispatcher to avoid thread synchronization issues.
*/
class PriorityExecutor(private val scope: CoroutineScope) {
private var isRunning = false
private val priorityQueue: PriorityQueue<OrderedCoroutine> = PriorityQueue()

suspend fun <T : Any>execute(priority:Int, suspendedFun: suspend CoroutineScope.() -> T) : Channel<Result<T>> {
// Size of resultChannel is set to 1 to keep while loop running.
// i.e. If other end doesn't call receive on the channel, loop will be blocked.
val resultChannel = Channel<Result<Any>>(1)
priorityQueue.add(OrderedCoroutine(priority, suspendedFun, resultChannel))

scope.launch {
if (!isRunning) {
isRunning = true
while (priorityQueue.size > 0) {
val runningCoroutine = priorityQueue.remove()
val result = kotlin.runCatching { scope.async(block = runningCoroutine.suspendedFn).await() }
runningCoroutine.resultChannel.send(result)
}
isRunning = false
}
}

@Suppress("UNCHECKED_CAST")
return resultChannel as Channel<Result<T>>
}
}

0 comments on commit b70db24

Please sign in to comment.