Skip to content

Commit

Permalink
KTOR-8194 Fix SelectorManager dispatcher ignored on Native
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas-Vos committed Feb 13, 2025
1 parent bd652e4 commit 75459e6
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ internal class CIOEngine(
requestJob.join()
} finally {
selector.close()
selector.coroutineContext[Job]!!.join()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ internal actual class SelectorHelper {
}

@OptIn(ExperimentalForeignApi::class, InternalAPI::class)
private fun selectionLoop() {
private suspend fun selectionLoop() {
val completed = mutableSetOf<EventInfo>()
val watchSet = mutableSetOf<EventInfo>()
val closeSet = mutableSetOf<Int>()
Expand All @@ -84,6 +84,8 @@ internal actual class SelectorHelper {

maxDescriptor = max(maxDescriptor + 1, wakeupSignalEvent.descriptor + 1)

yield()

try {
selector_pselect(maxDescriptor + 1, readSet, writeSet, errorSet).check()
} catch (_: PosixException.BadFileDescriptorException) {
Expand Down Expand Up @@ -263,4 +265,8 @@ internal actual class SelectorHelper {
private fun isDescriptorValid(descriptor: Int): Boolean {
return fcntl(descriptor, F_GETFL) != -1 || errno != EBADF
}

private suspend inline fun dispatchIfNeeded() {
yield() // it will always redispatch it to the right thread
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class TcpSocketTestNix {
socket.close()
selector.close()

selector.coroutineContext[Job]?.join()
socket.awaitClosed()

val isDescriptorValid = fcntl(descriptor, F_GETFL) != -1 || errno != EBADF
check(!isDescriptorValid) { "Descriptor was not closed" }
Expand Down Expand Up @@ -59,7 +59,9 @@ class TcpSocketTestNix {
server.close()
selector.close()

selector.coroutineContext[Job]?.join()
serverConnection.awaitClosed()
clientConnection.awaitClosed()
server.awaitClosed()

val isServerDescriptorValid = fcntl(serverDescriptor, F_GETFL) != -1 || errno != EBADF
check(!isServerDescriptorValid) { "Server descriptor was not closed" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class UdpSocketTestNix {
socket.close()
selector.close()

selector.coroutineContext[Job]?.join()
socket.awaitClosed()

val isDescriptorValid = fcntl(descriptor, F_GETFL) != -1 || errno != EBADF
check(!isDescriptorValid) { "Descriptor was not closed" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import kotlin.coroutines.*

public actual fun SelectorManager(
dispatcher: CoroutineContext
): SelectorManager = WorkerSelectorManager()
): SelectorManager = WorkerSelectorManager(dispatcher)

public actual interface SelectorManager : CoroutineScope, Closeable {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@ import kotlinx.coroutines.*
import kotlinx.io.*
import kotlin.coroutines.*

@OptIn(ExperimentalCoroutinesApi::class)
internal class WorkerSelectorManager : SelectorManager {
@OptIn(DelicateCoroutinesApi::class)
private val selectorContext = newSingleThreadContext("WorkerSelectorManager")
private val job = Job()
override val coroutineContext: CoroutineContext = selectorContext + job
internal class WorkerSelectorManager(context: CoroutineContext) : SelectorManager {
override val coroutineContext: CoroutineContext = context + CoroutineName("selector")

private val selector = SelectorHelper()

Expand All @@ -39,7 +35,5 @@ internal class WorkerSelectorManager : SelectorManager {

override fun close() {
selector.requestTermination()
selectorContext.close()
job.cancel()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ internal actual class SelectorHelper {
}

@OptIn(ExperimentalForeignApi::class, InternalAPI::class)
private fun selectionLoop() {
private suspend fun selectionLoop() {
val completed = mutableSetOf<EventInfo>()
val watchSet = mutableSetOf<EventInfo>()
val closeSet = mutableSetOf<Int>()

try {
while (!interestQueue.isClosed) {
val wsaEvents = fillHandlersOrClose(watchSet, completed, closeSet)

yield()

val index = memScoped {
val length = wsaEvents.size + 1
val wsaEventsWithWake = allocArray<CPointerVarOf<COpaquePointer>>(length).apply {
Expand Down

0 comments on commit 75459e6

Please sign in to comment.