Develop/Reactor+Coroutine

[Coroutine] 코루틴 빌더, 코루틴 컨텍스트

연로그 2024. 6. 23. 18:47
반응형

코루틴 스터디를 하며 정리중인 시리즈 글

  1. 왜 코루틴을 써야할까?
  2. [Coroutine] 코루틴 빌더, 코루틴 컨텍스트
  3. [Coroutine] 구조화된 동시성
  4. [Coroutine] 코루틴 스코프 함수

 


🔅 코루틴 빌더

 

코틀린 코루틴은 코루틴을 일시 중단시킨다. 이런 방식을 통해 스레드를 블로킹할 때보다 훨씬 가볍게 사용할 수 있다. 그런데 이 코루틴 중단은 중단 함수 내에서만 일어날 수 있다. 그리고 중단함수는 중단함수 내에서만 호출되어야 한다. 중단함수가 최초로 시작되는 지점은 어디일까? 바로 코루틴 빌더다.

 

🔸 코루틴 빌더

  • 중단 함수가 시작되는 지점
  • 모든 코루틴 빌더는 자신의 Job을 생성 (Job에 대한 설명은 하단에 있다.)
  • launch, async, runBlocking 등이 있다.

 

🔸 launch

  • 별개의 코루틴을 만들어 작업 실행
  • CoroutineScope 인터페이스의 확장함수

 

별개의 코루틴을 만들어 작업을 실행할 때는 주의할 점이 있다. 아래 예제를 살펴보면 "launch2"가 출력되기도 전에 프로그램이 종료된 것을 확인할 수 있다. launch는 코루틴을 별개로 실행하므로 작업이 병렬적으로 실행된다. launch 내의 작업을 끝마치고 싶다면 작업 중간에 프로그램이 끝나지 않도록 처리해야 한다.

suspend fun main() {
    GlobalScope.launch {
        println("launch1")
    }
    GlobalScope.launch {
        delay(1000L)
        println("launch2")
    }
    println("main method")
}

 

중간에 프로그램이 끝나지 않기 위해서는 main() 내에 delay를 추가한다던가, 하나의 스코프로 감싸는 등 여러 방법이 존재한다. (스코프는 scope, 범위라는 뜻이다. 지금은 코루틴이 동작하는 범위 정도로 이해하고 넘어가고, 이후에 더 자세히 학습해 보자.)

// coroutineScope를 이용해 하나의 스코프 내에서 동작하도록 만들었다.
suspend fun main() = coroutineScope {
    launch {
        println("launch1")
    }
    launch {
        delay(1000L)
        println("launch2")
    }
    println("main method")
}

 

 

🔸 async

  • launch와 유사한 특성을 가짐
  • 반환값이 Deferred<T>이며, await()을 통해 값 수신이 가능
suspend fun main() = coroutineScope {
    val value1 = async {
        delay(1000L)
        "A"
    }
    val value2 = async {
        delay(500L)
        "B"
    }
    val value3 = async {
        delay(700L)
        "C"
    }
    println(value1.await())
    println(value2.await())
    println(value3.await())
}

 

여러 작업의 결과를 수신해야 하는 경우, awaitAll()을 이용하면 코루틴이 비동기 작업들을 더 효과적으로 수행시킨다. 위의 예제 같은 경우에는 value1의 결과를 수신할 때까지 기다렸다가 출력, value2의 결과를 수신할 때까지 기다렸다가 출력, value3의 결과를 수신할 때까지 기다렸다가 출력 순차적으로 진행된다. awaitAll()을 이용하면 작업이 병렬로 실행되며, 결괏값을 조회할 때의 순서는 보장된다. 또한 여러 작업 중 하나라도 예외가 발생하면 다른 작업들을 취소 처리한다.

suspend fun main() = coroutineScope {
    val value1 = async {
        delay(1000L)
        "A"
    }
    val value2 = async {
        delay(500L)
        "B"
    }
    val value3 = async {
        delay(700L)
        "C"
    }
    awaitAll(value1, value2, value3).forEach { println(it) }
}

 

 

🔸 runBlocking

  • launch, async와는 달리 스레드 블로킹이 필요할 때 사용
  • 새 코루틴을 생성하고 실행 및 완료할 때까지 스레드를 블로킹함
suspend fun main() {
    println(Thread.currentThread())

    runBlocking {
        delay(1000L)
        println(Thread.currentThread())
        println(coroutineContext)
    }

    runBlocking {
        delay(1000L)
        println(Thread.currentThread())
        println(coroutineContext)
    }
}

 

 


🌊 Job

 

모든 코루틴 빌더는 Job을 반환한다. async가 반환하는 Deferred 역시 Job을 구현한 클래스이다. 이 Job은 코루틴의 생명주기를 관리하는데 필요하다.

 

🔹 Job은 작업의 현재 상태를 확인할 수 있다.

Job 인터페이스를 살펴보면 아래와 같은 메서드를 확인할 수 있다.

  • isActive: 이 작업이 활성 상태인가?
  • isCompleted: 이 작업이 완료된 상태인가?
  • isCancelled: 이 작업이 취소된 상태인가?

 

위 세 가지 메서드를 이용해 현재 작업이 어떤 상태인지 확인할 수 있다. 예를 들어 Job이 막 생성되었다면 New 상태에 있다고 표현한다. 이때 각 메서드를 호출하면 isActive = false, isCompleted = false, isCancelled = false가 반환된다. 

                                          wait children
    +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
    | New | -----> | Active | ---------> | Completing  | -------> | Completed |
    +-----+        +--------+            +-------------+          +-----------+
                     |  cancel / fail       |
                     |     +----------------+
                     |     |
                     V     V
                 +------------+                           finish  +-----------+
                 | Cancelling | --------------------------------> | Cancelled |
                 +------------+                                   +-----------+
// 출처: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/
State isActive isCompleted isCancelled
New false false false
Active true false false
Completing true false false
Completed false true false
Cancelling false false true
Cancelled false true true

 

 

🔹 Job은 서로를 참조할 수 있는 부모-자식 관계를 가졌다.

모든 코루틴은 자신만의 Job을 생성한다. 인자로 주어졌거나 부모 코루틴으로부터 잡을 받아오면 이는 새로 생성되는 잡의 부모로 사용된다. 코드를 통해 확인해보자.

fun main(): Unit = runBlocking {
    val parentJob = coroutineContext[Job] ?: error("no Job in coroutineContext")

    launch {
        println("childJob -> ${coroutineContext[Job]}")
        // 부모 Job은 모든 자식 Job들을 참조할 수 있는 children라는 프로퍼티를 가진다.
        println("find childJob from parentJob -> ${parentJob.children.first()}")
    }
}

 

참고로 인자로 새로운 Job을 넘기는 경우, 구조적 동시성이 깨질 수 있다. (구조적 동시성이 뭔지 모른다면 '왜 코루틴을 써야할까?' 글에 간략히 설명해 두었으니 참고하길 바란다.)

suspend fun main(): Unit = runBlocking {
    println("main start")

    val newJob = Job()
    launch(newJob) {
        println("start launch")
        throw RuntimeException("error")
    }

    delay(1000L)
    println("main end")
}

인자로 newJob을 넘긴 경우
인자로 newJob을 넘기지 않은 경우

 

🔹 Job을 이용하면 취소 처리가 가능하다.

Job의 부모-자식 관계를 이용하면 부모 잡에서 모든 자식이 작업을 완료할 때까지 기다리게 만들 수 있다. (부모 잡의 children 프로퍼티를 이용하면 모든 자식을 참조할 수 있다.) 이런 점들을 활용해 코루틴 스코프 내에서 취소나 예외 처리의 구현이 가능하게 된다.

 

cancel 메서드를 활용하면 작업을 취소시킬 수 있다. 잡이 취소되면 상태가 'Cancelling'으로 바뀌고, 중단점에서 CancellationException 예외를 던진다. 또한 자신의 자식 잡도 취소시키지만, 부모 잡에는 영향 주지 않는다. 무사히 취소가 완료된다면 잡의 상태가 'Cancelled'로 바뀐다.

suspend fun main(): Unit = coroutineScope {
    val job = launch {
        println("start launch!")
        delay(10000L)
        println("end launch!")
    }

    job.cancel()
    job.join() // 취소 과정이 완료하기를 기다리며 호출하였다. job.cancelAndJoin()을 활용할 수도 있다.
    println(
        """Cancelled Successfully
        isActive -> ${job.isActive}
        isCancelled -> ${job.isCancelled}
        isCompleted -> ${job.isCompleted}
        """.trimMargin()
    )
}

 

참고로, 코루틴은 CancellationException이 발생해야 정상적으로 취소된 것으로 간주하고, 다른 예외가 발생하면 작업이 실패한 것으로 인지한다. 작업이 실패하면 부모에도 영향이 가는 등 의도하지 않은 동작이 실행될 수 있다. 예제 코드에서 중단점인 delay()에서 CancellationException 대신 다른 예외를 던져보자.

suspend fun main(): Unit = coroutineScope {
    val job = launch {
        println("start launch!")
        try {
            delay(10000L)
        } catch (e: CancellationException) {
            println("catch CancellationException")
            throw RuntimeException()
        }
        println("end launch!")
    }

    job.cancel()
    job.join() // 취소 과정이 완료하기를 기다리며 호출하였다. job.cancelAndJoin()을 활용할 수도 있다.
    println(
        """Cancelled Successfully
        isActive -> ${job.isActive}
        isCancelled -> ${job.isCancelled}
        isCompleted -> ${job.isCompleted}
        """.trimMargin()
    )
}

launch에서 예외가 발생했는데 부모(main메서드)까지 영향을 준다.

 

 

 


🔔 코루틴 컨텍스트

~ 이 아래의 예제 코드들은 org.jetbrains.kotlinx:kotlinx-coroutines-test 의존성을 추가하여 작성하였다. ~

 

 

코루틴 내부 코드를 보다 보면 CoroutineContext를 심심치 않게 볼 수 있다. 코루틴 빌더에서 첫 번째 인자로 넣기도 하고, CoroutineScope의 리시버 인자로 주어지기도 하고, Continuation의 필드로 사용하기도 한다. 대체 CoroutineContext가 뭘까?

 

🔸 CoroutineContext

CoroutineContext는 코루틴의 데이터를 저장하고 전달하는 방법으로 쓰인다. 기본적으로 컬렉션 Map과 비슷한 개념을 가졌다. CoroutineContext는 원소나 원소들의 집합을 나타내는 인터페이스인데, Job, CoroutineName 등과 같은 여러 Element 객체들이 인덱싱 된 집합이다. 모든 원소는 식별 가능한 유일 Key를 가졌으며, get() 메서드나 []을 이용해 조회할 수 있다. 

 

아래와 같은 원소들을 가질 수 있는데, 모두 CoroutineContext.Element를 구현하고 있다. 

  • CoroutineName: 코루틴의 이름
  • CoroutineDispatcher: 코루틴의 실행 환경 결정
  • Job: 코루틴의 생명주기 관리
  • CoroutineExceptionHandler: 코루틴 예외 발생 시 처리 방식 정의
@OptIn(ExperimentalStdlibApi::class)
@Test
fun CoroutineContext_구성요소() = runTest {
    val context = CoroutineName("myCoroutineName") + Dispatchers.IO + Job()
    println("CoroutineName -> ${context[CoroutineName.Key]}")
    println("CoroutineDispatcher -> ${context[CoroutineDispatcher.Key]}")
    println("CoroutineExceptionHandler -> ${context[CoroutineExceptionHandler.Key]}")
    println("Job -> ${context[Job.Key]}")
}

 

 

🔸 CoroutineContext의 원소 - CoroutineDispatcher

영단어 디스패처는 사람이나 차량 등을 필요한 곳에 보내는 것을 담당하는 사람이라는 뜻이다. 코루틴에서의 디스패처는 코루틴이 어떤 환경에서 실행될지를 결정한다.

dispatcher의 뜻

 

디스패처 특징
Dispatchers.Default - 기본적으로 설정되는 디스패처
- CPU 집약적인 연산 수행에 적절 (CPU 집약적인 연산: 대용량 데이터 정렬, 알고리즘 실행 등)
Dispatchers.IO - I/O 연산 수행에 적절 (I/O 연산: 파일 시스템 접근, 외부 API 호출 같은 네트워크 통신 등)
Dispatchers.Unconfined - 스레드를 바꾸지 않는 디스패처
- 일반적으로 코드에서 사용하면 안됨 (단위 테스트 등에서 연산의 순서를 쉽게 통제할 수 있으나 runTest를 이용하면 되므로 반드시 사용할 필요는 없다.)

 

java.util.concurrent.Executor는 asCoroutineDispatcher 확장함수를 통해 CoroutineDispatcher로 변환할 수 있다. 이를 이용해 정해진 수의 스레드 풀/스레드를 가진 디스패처를 만들 수 있지만, 이 경우 close 함수를 명시적으로 호출하여 메모리 누수를 방지해야 한다.

 

 

🔸 CoroutineContext의 원소 - CoroutineExceptionHandler

CoroutineExceptionHandler를 이용하면 예외 전파를 중단시키지는 않지만 예외가 발생하면 해야 할 일을 정의할 수 있다.

@Test
fun 에러_출력() = runTest {
    println("test start")

    val handler = CoroutineExceptionHandler { _, throwable ->
        println("start CoroutineExceptionHandler")
        println("에러 출력: $throwable")
        println("end CoroutineExceptionHandler")
    }

    val scope = CoroutineScope(SupervisorJob() + handler)
    scope.launch(handler) {
        println("start launch")
        throw Error("에러 발생")
    }

    delay(1000L)
    println("test end")
}

 

🔸 CoroutineContext는 + 연산이 가능하다.

여러 개의 CoroutineContext를 + 연산을 통해 합칠 수 있다. 여기서도 Map과 유사한 점이 존재하는데, 동일한 Key를 가진다면 새로운 원소가 기존 원소를 대체한다.

@Test
fun context_덮어씌우기() = runTest {
    println(Thread.currentThread().name)

    val context = newSingleThreadContext("myThreadContext") + CoroutineName("myCoroutineName")

    val newContext1 = CoroutineName("beforeCoroutineName") + context
    launch(context = newContext1) {
        println(Thread.currentThread().name)
    }

    val newContext2 = context + CoroutineName("afterCoroutineName")
    launch(context = newContext2) {
        println(Thread.currentThread().name)
    }
}

 

🔸 코루틴 빌더에서는 CoroutineContext를 인자로 넘길 수 있다.

CoroutineContext를 사용한 예로, 코루틴 빌더에서는 CoroutineContext를 인자로 넘길 수 있다.

@Test
fun launch_인자에_context_넘기기() = runTest {
    println(Thread.currentThread().name)

    val context = newSingleThreadContext("myThreadContext") + CoroutineName("myCoroutineName")
    launch(context = context) {
        println(Thread.currentThread().name)
    }
}

 

코루틴 빌더에 명시적으로 CoroutineContext를 넘기지 않는 경우 부모에서 자식으로 컨텍스트가 전달된다. 아래와 같은 식으로 어떤 데이터를 가졌는지 예측할 수 있다. (CoroutineContext를 합칠 때, 동일한 Key를 가진다면 새로운 원소가 기존 원소를 대체한다는 점을 기억하자.)

defaultContext + parentContext + childContext

 

 


참고

반응형