Skip to content

Commit

Permalink
refactor: 코드 리팩터링
Browse files Browse the repository at this point in the history
- I/O 작업 `boundedElastic` 변경
- 예외 처리 통일
- `Mono` & `Flux` 구분
- Retry Backoff 추가
  • Loading branch information
kor-Chipmunk committed Mar 13, 2024
1 parent 30363aa commit d4c1ff2
Showing 1 changed file with 18 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.lalala.streaming.handler.Command.*
import io.github.oshai.kotlinlogging.KotlinLogging
import net.bramp.ffmpeg.FFmpeg
import net.bramp.ffmpeg.FFprobe
import org.reactivestreams.Publisher
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.core.ParameterizedTypeReference
import org.springframework.core.io.buffer.DataBuffer
Expand All @@ -27,12 +28,14 @@ import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.kotlin.core.publisher.toFlux
import reactor.kotlin.core.publisher.toMono
import reactor.util.retry.Retry
import java.io.File
import java.io.IOException
import java.net.URI
import java.nio.file.Files
import java.nio.file.Paths
import java.nio.file.StandardOpenOption
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.TimeUnit
Expand All @@ -48,8 +51,8 @@ inline fun <reified T> typeReference() = object : ParameterizedTypeReference<T>(
class StreamingHandler(
private val ffMpeg: FFmpeg,
private val ffProbe: FFprobe,
@Qualifier("musicClient") private val musicClient: WebClient.Builder,
@Qualifier("storageClient") private val storageClient: WebClient.Builder,
private val musicClient: WebClient.Builder,
private val storageClient: WebClient.Builder,
private val producer: KafkaProducer,
@Qualifier("preprocessingExecutor") private val executor: Executor,
) : WebSocketHandler {
Expand All @@ -58,18 +61,11 @@ class StreamingHandler(
override fun handle(session: WebSocketSession): Mono<Void> {
val messages = session.receive()
.doOnSubscribe { onConnect(session) }
.subscribeOn(Schedulers.parallel())
.subscribeOn(Schedulers.boundedElastic())
.map { it.payloadAsText }
.flatMap { handleTextMessage(session, it) }
.onErrorResume {
if (it is BusinessException) {
when (it.errorCode) {
ErrorCode.MUSIC_NOT_FOUND -> Flux.just(session.textMessage("failed music"))
ErrorCode.STORAGE_FILE_NOT_FOUND -> Flux.just(session.textMessage("failed storage"))
else -> Flux.just(session.textMessage("unknown error"))
}
}
Flux.just(session.textMessage("unknown error"))
Mono.just(session.textMessage("failed streaming"))
}

return session.send(messages)
Expand All @@ -94,7 +90,7 @@ class StreamingHandler(
.subscribe()
}

fun handleTextMessage(session: WebSocketSession, payload: String): Flux<WebSocketMessage> {
fun handleTextMessage(session: WebSocketSession, payload: String): Publisher<WebSocketMessage> {
logger.info { "${session.id} - ${payload}" }

val command = payload.split("/")[0]
Expand Down Expand Up @@ -125,20 +121,20 @@ class StreamingHandler(
.uri("/v1/api/musics/${musicId}")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatusCode::isError) {
Mono.error(BusinessException(ErrorCode.MUSIC_NOT_FOUND))
}
.bodyToMono(typeReference<BaseResponse<MusicDetailDTO>>())
.map {
it.data as MusicDetailDTO
}
.timeout(Duration.ofSeconds(1))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
.onErrorMap { BusinessException(ErrorCode.MUSIC_NOT_FOUND) }
}

fun sendPlayTime(session: WebSocketSession, musicId: String): Flux<WebSocketMessage> {
fun sendPlayTime(session: WebSocketSession, musicId: String): Mono<WebSocketMessage> {
// 음원 서버에서 파일 주소 조회
return getMusic(session, musicId)
.flatMapMany {
Flux.just(session.textMessage("${it.playTime}"))
.flatMap {
Mono.just(session.textMessage("${it.playTime}"))
}
}

Expand Down Expand Up @@ -221,15 +217,13 @@ class StreamingHandler(
.uri(filePath)
.accept(MediaType.APPLICATION_OCTET_STREAM)
.retrieve()
.onStatus(HttpStatusCode::isError) { _ ->
Mono.error(BusinessException(ErrorCode.STORAGE_FILE_NOT_FOUND))
}
.bodyToFlux(DataBuffer::class.java)
.timeout(Duration.ofSeconds(1))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
.onErrorMap { BusinessException(ErrorCode.STORAGE_FILE_NOT_FOUND) }

val outputPath = Paths.get(StreamingConstant.TEMP_FOLDER, "${session.id}.mp3")
return Mono.fromRunnable<Boolean> {
Files.deleteIfExists(outputPath)
}
return Files.deleteIfExists(outputPath).toMono()
.then(
DataBufferUtils.write(downloadFlux, outputPath, StandardOpenOption.CREATE)
)
Expand Down

0 comments on commit d4c1ff2

Please sign in to comment.