Skip to content

Commit

Permalink
코루틴 스터디 모듈
Browse files Browse the repository at this point in the history
  • Loading branch information
murjune committed Sep 11, 2024
1 parent 3dd4826 commit e564b8e
Show file tree
Hide file tree
Showing 46 changed files with 3,091 additions and 47 deletions.
10 changes: 10 additions & 0 deletions coroutine/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Introduction

Kotlin Coroutine/Flow 의 stlib 기본 문법에 대해 학습하는 모듈입니다.
- 학습테스트를 통해 학습하고, 정리합니다.
- main() 함수를 통해 학습하던 코드를 test 코드로 마이그레이션 중입니다.
- Test Tool: Junit5, Kotest
- 코루틴에 대해 학습한 내용 중 일부 아래 블로그에 정리하고 있습니다.
https://velog.io/@murjune/series/Coroutine


Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.murjune.practice.callbackflow

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.withTimeout

// 가상의 센서 API 콜백
interface SensorCallback {
fun onSensorDataReceived(data: FloatArray)
fun onSensorError(error: Throwable)
}

// 센서 매니저: 콜백을 통해 지속적으로 센서 데이터를 제공하는 가상 API
class SensorManager {
private var callback: SensorCallback? = null

// 센서 데이터 수신을 시작하는 메소드
fun startSensorUpdates(callback: SensorCallback) {
this.callback = callback
// 가상의 센서 데이터 전달 시뮬레이션
for (i in 1..10) {
callback.onSensorDataReceived(floatArrayOf(randomFloat(), randomFloat(), randomFloat()))
}
}

// 센서 업데이트를 중지하는 메소드
fun stopSensorUpdates() {
callback = null
}

private fun randomFloat() = (1..10).random().toFloat() + (0..9).random().toFloat() / 10
}

class SensorRepository {
private val sensorManager = SensorManager()

// 센서 데이터를 Flow로 변환하는 메소드
fun sensorDataStream(): Flow<FloatArray> = callbackFlow {
val sensorCallback = object : SensorCallback {
override fun onSensorDataReceived(data: FloatArray) {
trySend(data).isSuccess // 수신된 센서 데이터를 Flow로 전송
}

override fun onSensorError(error: Throwable) {
close(error) // 에러 발생 시 Flow 종료
}
}

// 센서 업데이트 시작
sensorManager.startSensorUpdates(sensorCallback)

// Flow가 종료되거나 취소되면 센서 업데이트 중지
awaitClose {
sensorManager.stopSensorUpdates()
}
}
}

suspend fun main() {
withTimeout(1000) {
SensorRepository().sensorDataStream().collect { data ->
println("Received sensor data: ${data.joinToString(", ")}")
}
}
}
50 changes: 50 additions & 0 deletions coroutine/src/main/kotlin/com/murjune/practice/channel/Actor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.murjune.practice.channel

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

@OptIn(ObsoleteCoroutinesApi::class)
suspend fun main() {
coroutineScope {
var count = 0
val actor = actor<Int>(capacity = 10) {
val receiveChannel: ReceiveChannel<Int> = channel
for (i in receiveChannel) {
count += i
}
println(count)
}
withContext(Dispatchers.Default) {
List(1000) {
launch {
delay(1)
actor.send(1)
}
}
coroutineContext[Job]?.children?.forEach { it.join() }
actor.close()
}
}
coroutineScope {
var count = 0
val channel = produce<Int>(capacity = 10) {
for (i in 1..1000) {
send(1)
}
}
launch(Dispatchers.Default) {
for (i in channel) {
count += i
}
println(count)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.murjune.practice.channelflow

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds

fun currentTime() = System.currentTimeMillis()
var startTime = 0L
suspend fun main() {
val api = FakeUserApi()
startTime = currentTime()
val user = usersFlow(api).first {
println("Collect $it ${(currentTime() - startTime).milliseconds}")
delay(1000)
it.name == "User 3"
}
println(user)
}

private fun usersFlow(api: UserApi): Flow<User> = flow {
var page = 0
do {
println("----Produce page $page----")
val users = api.users(page++)
println("$users - ${(currentTime() - startTime).milliseconds}")
emitAll(users.asFlow())
} while (users.isNotEmpty())
}

data class User(val name: String)

interface UserApi {
suspend fun users(pageNumber: Int): List<User>
}

class FakeUserApi : UserApi {
private val users = List(20) { User("User $it") }
private val pageSize: Int = 3
override suspend fun users(pageNumber: Int): List<User> {
delay(1000)
return users
.drop(pageNumber * pageSize)
.take(pageSize)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.murjune.practice.channelflow

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds

suspend fun main() {
val api = FakeUserApi()
startTime = currentTime()
val user = usersFlow(api).first {
println("Collect $it ${(currentTime() - startTime).milliseconds}")
delay(1000)
it.name == "User 3"
}
println(user)
}

private fun usersFlow(api: UserApi): Flow<User> = channelFlow {
var page = 0
do {
println("----Produce page $page----")
val users = api.users(page++)
println("$users - ${(currentTime() - startTime).milliseconds}")
users.forEach { send(it) }
} while (users.isNotEmpty())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.murjune.practice.coroutinescope

import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main() {
val scope = object : CoroutineScope {
override val coroutineContext: CoroutineContext = Dispatchers.Default
}

scope.launch {
delay(10)
println(coroutineContext[Job])
println(coroutineContext[Job]?.parent)
}
scope.cancel()
delay(100)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.murjune.practice.dispatcher

import com.murjune.practice.utils.launchWithName
import com.murjune.practice.utils.log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
import kotlin.time.measureTime

@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun ex_singleThreadpool1() = coroutineScope {
val dispatcher = newFixedThreadPoolContext(1, "multiple-thread")
val scope = CoroutineScope(coroutineContext + dispatcher)
scope.launch {
repeat(1000) {
print("-")
}
}
scope.launch {
repeat(1000) {
print("|")
}
}
// 결과는 ----------- 가 다 끝나고 |||||||||| 가 실행된다.
// 이유는 1번이 끝날때까지 2번 코루틴은 작업 대기 상태에 놓이기 때문이다.
}

@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun ex_singleThreadpool2() = coroutineScope {
measureTime {
val job = Job()
val dispatcher = newSingleThreadContext("single-thread")
val scope = CoroutineScope(dispatcher + job)
scope.launchWithName("coroutine-1") {
log("ccccc")
delay(1000)
log("ddddd")
}
scope.launchWithName("coroutine-2") {
log("ccccc")
delay(1000)
log("ddddd")
}
job.complete()
job.join()
}.also { log("실행 시간: $it") }
}

@OptIn(DelicateCoroutinesApi::class)
private suspend fun ex_multipleThreadpool() = coroutineScope {
measureTime {
val job = Job()
val dispatcher = newFixedThreadPoolContext(2, "multiple-thread")
val scope = CoroutineScope(dispatcher + job)
scope.launchWithName("coroutine-1") {
log("aaaaa")
delay(1000)
log("bbbbb")
}
scope.launchWithName("coroutine-2") {
log("ccccc")
delay(1000)
log("ddddd")
}
job.complete()
job.join()
}.also { log("실행 시간: $it") }
}

fun main() {
runBlocking {
val dispatcher = newFixedThreadPoolContext(1, "multiple-thread")
val scope = CoroutineScope(coroutineContext + dispatcher)
scope.launch {
repeat(1000) {
print("-")
}
}
scope.launch {
repeat(1000) {
print("|")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.murjune.practice.dispatcher

import com.murjune.practice.utils.log
import kotlinx.coroutines.*

fun main() {
runBlocking {
val threadNames = mutableSetOf<String>()
threadNames.add(Thread.currentThread().name)
withContext(Dispatchers.Default.limitedParallelism(5)) {
val jobs = List(1000) {
launch {
threadNames.add(Thread.currentThread().name)
delay(100)
}
}
jobs.joinAll()
}

val uniqueThreadsCount = threadNames.size
log(
"Unique Threads: $uniqueThreadsCount PROCESSOR ${
Runtime.getRuntime().availableProcessors()
} \nTHREAD NAME \n${threadNames.joinToString("\n")}"
)
}
}
36 changes: 36 additions & 0 deletions coroutine/src/main/kotlin/com/murjune/practice/exception/Ex.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.murjune.practice.exception

import kotlinx.coroutines.*

//val time = TimeSource.Monotonic
//fun mark() = time.markNow()
fun main() {
runBlocking {
// val s = mark()
// val li = List(3) {
// async {
// try {
// println("start - ${mark() - s}")
// val imageUrl =
// URL("https://velog.velcdn.com/images/murjune/profile/998edd4f-1357-4c8f-b7a4-3d6cd723c800/image.jpg")
// val connection: HttpURLConnection = imageUrl.openConnection() as HttpURLConnection
// connection.doInput = true
// connection.connect()
// val inputStream: InputStream = connection.inputStream
// ImageIO.read(inputStream)
// } catch (e: Exception) {
// e.printStackTrace()
// null
// } finally {
// println("finish - ${mark() - s}")
// }
// }
// }

CoroutineScope(Dispatchers.IO).launch {
error("코루틴 error ⚠️")
}
delay(2000)
println("얘가 실행안될 것 같죠? 실행됩니다~")
}
}
Loading

0 comments on commit e564b8e

Please sign in to comment.