Skip to content

Commit

Permalink
KTOR-7518 Add ReadableByteChannel to ByteReadChannel interop util (#4358
Browse files Browse the repository at this point in the history
)
  • Loading branch information
bjhham authored Oct 4, 2024
1 parent e4ecb22 commit 8d3f7c7
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 0 deletions.
6 changes: 6 additions & 0 deletions ktor-io/api/ktor-io.api
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,12 @@ public final class io/ktor/utils/io/jvm/javaio/WritingKt {
public static synthetic fun copyTo$default (Lio/ktor/utils/io/ByteReadChannel;Ljava/io/OutputStream;JLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class io/ktor/utils/io/jvm/nio/ReadingKt {
public static final fun asSource (Ljava/nio/channels/ReadableByteChannel;)Lkotlinx/io/RawSource;
public static final fun toByteReadChannel (Ljava/nio/channels/ReadableByteChannel;Lkotlin/coroutines/CoroutineContext;)Lio/ktor/utils/io/ByteReadChannel;
public static synthetic fun toByteReadChannel$default (Ljava/nio/channels/ReadableByteChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/ktor/utils/io/ByteReadChannel;
}

public final class io/ktor/utils/io/jvm/nio/WriteSuspendSession {
public fun <init> (Lio/ktor/utils/io/ByteWriteChannel;)V
public final fun getChannel ()Lio/ktor/utils/io/ByteWriteChannel;
Expand Down
1 change: 1 addition & 0 deletions ktor-io/jvm/src/io/ktor/utils/io/jvm/javaio/Reading.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import kotlinx.io.EOFException
import kotlinx.io.IOException
import java.io.*
import java.nio.*
import java.nio.channels.*
import kotlin.coroutines.*

/**
Expand Down
71 changes: 71 additions & 0 deletions ktor-io/jvm/src/io/ktor/utils/io/jvm/nio/Reading.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.utils.io.jvm.nio

import io.ktor.utils.io.*
import io.ktor.utils.io.jvm.javaio.*
import kotlinx.coroutines.*
import kotlinx.io.*
import kotlinx.io.Buffer
import kotlinx.io.unsafe.*
import java.nio.*
import java.nio.channels.*
import kotlin.coroutines.*

/**
* Converts a [ReadableByteChannel] to a [ByteReadChannel], enabling asynchronous reading of bytes.
*
* @param context the [CoroutineContext] to execute the read operation. Defaults to [Dispatchers.IO].
* @return a [ByteReadChannel] for reading bytes asynchronously from the given [ReadableByteChannel].
*/
public fun ReadableByteChannel.toByteReadChannel(
context: CoroutineContext = Dispatchers.IO,
): ByteReadChannel = RawSourceChannel(asSource(), context)

/**
* Converts a [ReadableByteChannel] into a [RawSource].
*
* This extension function wraps the given [ReadableByteChannel] into a [RawSource],
* enabling efficient reading of bytes from the channel as a source of data.
*
* @return a [RawSource] representation of the [ReadableByteChannel].
*/
public fun ReadableByteChannel.asSource(): RawSource =
ReadableByteChannelSource(this)

/**
* A data source that reads from a [ReadableByteChannel].
*
* This class implements the [RawSource] interface, allowing for the reading
* of bytes from a [ReadableByteChannel] into a [Buffer].
*
* @property channel The [ReadableByteChannel] from which bytes are read.
*/
private open class ReadableByteChannelSource(
private val channel: ReadableByteChannel,
) : RawSource {
@OptIn(UnsafeIoApi::class)
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
if (byteCount <= 0L) return 0L

var readTotal: Int
val actualByteCount = minOf(byteCount, Int.MAX_VALUE.toLong()).toInt()

UnsafeBufferOperations.writeToTail(sink, 1) { data, pos, limit ->
val maxToCopy = minOf(actualByteCount, limit - pos)
val buffer = ByteBuffer.wrap(data, pos, maxToCopy)
readTotal = channel.read(buffer)
maxOf(readTotal, 0)
}

return readTotal.toLong()
}

override fun close() =
channel.close()

override fun toString(): String =
"ReadableByteChannelSource($channel)"
}
35 changes: 35 additions & 0 deletions ktor-io/jvm/test/io/ktor/utils/io/jvm/nio/ReadingTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.utils.io.jvm.nio

import io.ktor.utils.io.*
import kotlinx.coroutines.test.*
import java.io.*
import java.nio.channels.*
import java.nio.file.*
import kotlin.io.path.*
import kotlin.test.*

class ReadingTest {

@Test
fun readsFromByteChannel() = runTest {
val expected = "This is a test string"
val channel = Channels.newChannel(ByteArrayInputStream(expected.encodeToByteArray()))
val actual = channel.toByteReadChannel().readRemaining().readText()
assertEquals(expected, actual)
}

@Test
fun readsFromFileChannelAndCloses() = runTest {
val expected = "This is a test string"
val temp = Files.createTempFile("file", "txt")
temp.writeText(expected)
val channel = Files.newByteChannel(temp)
val actual = channel.toByteReadChannel().readRemaining().readText()
assertEquals(expected, actual)
assertFalse(channel.isOpen)
}
}

0 comments on commit 8d3f7c7

Please sign in to comment.