Develop/Reactor+Coroutines

[Coroutines] 코루틴과 공유 상태

연로그 2024. 7. 31. 23:43
반응형

코루틴 스터디를 하며 정리 중인 시리즈 글

  1. 왜 코루틴을 써야할까?
  2. [Coroutines] 코루틴 빌더, 코루틴 컨텍스트
  3. [Coroutines] 구조화된 동시성
  4. [Coroutines] 코루틴 스코프 함수
  5. [Coroutines] 디스패처
  6. [Coroutines] 코루틴과 공유상태

 


코루틴

 

코루틴의 이름에는 아래와 같은 뜻이 담겨있다.

  • co: 함께, 협력하는
  • routine: 특정한 일을 처리하기 위한 일련의 명령
  • coroutine: 함께 실행되는 루틴. 서로 간에 스레드 사용을 양보하며 함께 실행

co, routine의 뜻

 

코루틴은 항상 하나의 스레드 위에서 동작하지는 않는다. 코루틴 디스패처와 스레드풀의 연관 관계는 '[Coroutine] 디스패처' 포스트를 참고하길 바란다. 코루틴은 일시 중단이 일어나면 다른 코루틴에게 스레드를 양보한다는 점을 기억하자.

 

 


코루틴의 스레드 양보

 

코루틴을 스레드에 할당시켜 실행되도록 만드는 주체는 CoroutineDispatcher이다. 하지만 스레드를 양보하게 만드는 주체는 CoroutineDispatcher가 아니라 코루틴이다. 코루틴이 스레드를 양보할 때는 delay, join, await, yield 같은 양보를 위한 함수를 직접 호출해야 한다.

 

아래 간단한 예제를 작성해 보았다. 아래 코드에서 어떤 순서로 출력될지 예상해 보자.

suspend fun main() = runBlocking {
    val job = launch {
        println("start launch ${Thread.currentThread().name}")
        delay(1000)
        println("end launch  ${Thread.currentThread().name}")
    }
    println("before join  ${Thread.currentThread().name}")
    job.join()
    println("after join  ${Thread.currentThread().name}")
}

 

 

비동기로 실행하니까 start launch가 before join보다 먼저 실행되는 경우가 있지 않을까? 예상과는 달리 몇 번을 실행해도 결과는 같다. 그 이유는 스레드 양보가 일어나는 시점 때문이다. 위 코드는 runBlocking을 이용해 단일 스레드 환경에서 동작시키게 만들었다. 코루틴이 스레드를 양보할 때는 delay, join 등의 함수를 직접 호출해야 한다고 했다. 양보 가능한 함수가 호출될 때(job.join())까지 스레드 양보가 일어나지 못하기 때문에 before join이 항상 제일 먼저 출력되었다.

 

 


공유 상태로 인한 문제

 

멀티 스레드 환경에서 스레드 간에 데이터를 전달하거나 공유된 자원을 사용할 때 가변 변수를 사용하고는 한다. 가변 변수에 동시에 접근하는 경우 여러 가지 문제가 생길 수 있는데, 코루틴도 보통 멀티 스레드 환경에서 사용되기 때문에 같은 문제가 발생할 수 있다. 발생 가능한 문제는 크게 두 가지로 나눌 수 있다.

  • 메모리 가시성: CPU 캐시와 메인 메모리 간의 데이터 불일치 문제
  • 경쟁 상태: 서로 다른 스레드가 동시에 값을 읽고 변경하다 연산이 유실되는 문제

 

이를 해결하기 위한 방법으로는 여러 가지가 있다.

 

1. @Volatile 사용

@Volatile을 공유 변수에 달면 java의 volatile 키워드와 유사한 방식으로 동작한다. (관련 포스팅: [Java] volatile 키워드 (feat. Atomic & Concurrent)) 하지만 이 방법으로는 메모리 가시성 문제만 해결할 수 있고, 경쟁 상태 문제가 해결되지는 않는다.

@Volatile
var value = 0

 

2. 원자성 있는 데이터 구조 사용

AtomicXXX 같은 클래스를 사용할 수 있다. (AtomicXXX 클래스에 대한 설명은 [Java] 동시성 이슈 해결하기 포스팅으로 대체한다.) 다만, 이미 다른 스레드가 AtomicXXX의 연산을 실행 중이면, 연산이 완료될 때까지 스레드를 블로킹시킨다. 코루틴이 스레드를 블로킹하는 상황을 지양해야 한다는 점을 기억하자.

var value = AtomicInteger(0)

suspend fun main() = coroutineScope {
    repeat(100000) {
        launch { value.getAndAdd(1) }
    }
    delay(100) // 연산이 완료되기를 대기하기 위해 추가
    println(value)
}

 

3. 싱글 스레드로 제한된 디스패처 생성

newSingleThreadContext(), limitedParallelism(1)을 통해 단일 스레드를 가진 디스패처를 생성할 수 있다. 공유 변수 변경 시 이 단일 스레드 디스패처를 사용하도록 로직에 적용하는 방법이 있다. 이 로직에 적용하는 방식에 따라 코스 그레인드 스레드 한정, 파인 그레인드 스레드 한정으로 나뉜다.

 

코스 그레인드 스레드 한정

  • coarse-grained thread confinement
  • 디스패처를 싱글 스레드로 제한한 withContext로 전체 함수를 래핑
  • 함수 전체에서 멀티스레딩의 이점을 누리지 못함
var value = 0
val singleDispatcher = newSingleThreadContext("singleThread")

suspend fun main() = coroutineScope {
    withContext(singleDispatcher) {
        repeat(100000) {
            launch {
                value++
            }
        }
    }
    delay(100) // 연산이 완료되기를 대기하기 위해 추가
    println(value)
}

 

파인 그레인드 스레드 한정

  • fine-grained thread confinement
  • 상태를 변경하는 구문들만 래핑
var value = 0
val singleDispatcher = newSingleThreadContext("singleThread")

suspend fun main() = coroutineScope {
    repeat(100000) {
        withContext(singleDispatcher) {
            value++
        }
    }
    delay(100) // 연산이 완료되기를 대기하기 위해 추가
    println(value)
}

 

4. Mutex 사용

공유 변수를 변경하는 부분을 임계 영역(Critical Section)으로 만들어 동시 접근이 불가능하도록 제한한다. Mutex 객체의 lock(), unlock() 메서드를 통해 임계 영역을 만들 수 있다. Mutex는 코루틴을 중단시키므로 스레드를 블로킹할 때보다 훨씬 리소스가 적게 든다. 참고로 unlock() 호출이 되어야 락을 해제할 수 있으므로 unlock() 호출을 잊으면 절대 안 된다.

var value = 0

suspend fun main() = coroutineScope {
    val mutex = Mutex()
    repeat(100000) {
        launch {
            mutex.lock()
            value++
            mutex.unlock()
        }
    }
    delay(100) // 연산이 완료되기를 대기하기 위해 추가
    println(value)
}

 

Mutex의 withLock 함수는 unlock 호출을 깜빡하는 실수를 방지하면서도 간편하게 사용할 수 있다.

var value = 0

suspend fun main() = coroutineScope {
    val mutex = Mutex()
    repeat(100000) {
        launch {
            mutex.withLock {
                value++
            }
        }
    }
    delay(100) // 연산이 완료되기를 대기하기 위해 추가
    println(value)
}

 

Mutex의 주의점은 코루틴이 중단되었을 때 뮤텍스를 풀 수 없다는 점이다. delay 함수를 이용한 예제 코드로 해당 현상을 확인할 수 있다.

suspend fun main() {
    val mutex = Mutex()

    val timeMillis = measureTimeMillis {
        coroutineScope {
            repeat(5) {
                launch {
                    mutex.withLock {
                        println("index: $it, thread: ${Thread.currentThread().name}")
                        delay(1000)
                        println("index: $it, thread: ${Thread.currentThread().name}")
                    }
                }
            }
        }
    }

    println(timeMillis)
}

 

동일한 뮤텍스를 이용해 중첩으로 감쌀 때도 문제가 된다. 아래 예제 코드를 실행시키면 데드락이 걸려 프로그램이 종료되지 않는다. 

suspend fun main() = coroutineScope {
    val mutex = Mutex()

    mutex.withLock {
        mutex.withLock { println("Hello World") }
    }
}

 

 

📌 ReentranLock vs Mutex

Mutex와 유사한 ReentranLock라는 클래스가 있다. 코루틴은 Mutex를 더 권장하고 있는데 이는 두 클래스의 동작 방식에서 차이가 나기 때문이다. ReentrantLock은 lock() 호출 시 이미 다른 스레드에서 락을 획득한 상태라면, 해당 락이 해제될 때까지 스레드를 블로킹하고 기다린다.

 


참고

반응형