Develop/Reactor+Coroutine

[Reactor] 서버 느리게 만드는 API 개발하기 (feat: block)

연로그 2024. 5. 6. 18:09
반응형

1. 엉망진창 시작 🤯

 

Reactor에 대해서 'WebFlux는 비동기 프로그래밍할 때 쓴다', 'Mono는 단일 요소, Flux는 여러 요소' 이 두 가지 외에는 아무것도 모르는 상태에서 로직을 작성해야 했다. WebFlux의 기본 문법을 공부하는 중이긴 했지만, 학습 속도보다 업무 진행이 훨씬 급했다. 무작정 코드를 작성하니 어떻게든 내가 원하는 결과가 만들어졌다. 베타 서버 데이터를 이용한 테스트도 완료했다. 성공 케이스, 실패 케이스, 에러 케이스 모두 예상한대로 돌아갔다. 며칠 뒤, 퇴근했다가 갑자기 베타 서버가 이상하다는 슬랙이 온다.😭

 

 


2. 문제의 코드 😵

 

자세한 비즈니스 로직을 생략하기 위해 조금 억지스러운 예제를 가져왔다. 상품을 조회할 때, 상품을 판매하는 판매사명이 "test"이라면 조회할 수 없다. 판매사 id는 상품 데이터 내에 저장되어 있다. 만약 동기식으로 짰다면 아래와 같이 짰을 것이다.

fun getProductDetailResponse(productId: String): ProductDetailResponse? {
    val product = productRepository.findById(productId)
    val seller = sellerRepository.findById(product.sellerId)
    return if (seller.name != "test") product else null
}

 

위 코드를 WebFlux를 이용해 아래와 같이 작성하였다. block()을 이용해 동기식으로 값을 가져오는게 마음에 걸리긴 했지만, 원하는 응답값이 잘 조회되었다.

fun getProductDetailResponse(productId: String): Mono<ProductDetailResponse> =
    productRepository.findById(productId)
        .filter { isSalableSeller(it.sellerId) }

// 판매 가능한 판매사인지 판단하는 로직
private fun isSalableSeller(sellerId: Long): Mono<Boolean> =
    sellerRepository.findById(sellerId)
        .map { name != "test" }
        .block() ?: false

 

위 로직이 포함된 API를 베타 서버에 배포하고나서 이상하게 서버가 느려진 것 같았다. 로그를 확인해 보니 어떤 API들은 타임아웃을 뱉었다. 불안한 마음에 재배포도 해보고, 팀원에게 조언을 구하는 등 몇 가지 노력을 해봤지만 특별한 소득은 없었다. 베타 서버에 다른 변경사항들이 동시에 배포되기도 했고, 내 작업에는 전혀 영향이 없는 API에서도 문제가 발생하였고, 언제는 잘 됐다가 언제는 또 잘 안됐다가 등 현상 재현까지 불가능해서 결국 일시적인 현상이라고 결론지었다. 운영 배포까지 시간적 여유가 있으니, 해당 문제가 다시 발견되면 그때 가서 확인해야겠다 생각하고 넘겼다.

 

 


3. 빠르게 해결하기 😎

 

퇴근하고 누워있는데 갑자기 슬랙에서 호출되었다. 불안한 마음과 함께 확인해 보니 역시나 내 코드가 문제가 되었다. Reactor에 익숙한 분이 block() 호출하는 부분이 문제가 되는 것 같다, 일반적인 로직에서 block()은 잘 사용하지 않는다고 이야기해 주셨다.

 

block이란?

공식 문서를 직역해 보자면 Mono를 subscribe 하고 다음 신호가 올 때까지 무기한 차단한다. 값을 반환하거나, Mono가 비어있으면 null을 반환한다. Mono 에러가 발생하는 경우, 원래의 예외가 발생한다. (checked Exception인 경우 RuntimeException으로 래핑 된다.)

Subscribe to this Mono and block indefinitely until a next signal is received. Returns that value, or null if the Mono completes empty. In case the Mono errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).   ㅡ project reactor 문서

이미지 출처: projectReactor 문서

 

block() 부분이 문제 되는 것을 알았으니, 해당 코드를 제거하기만 하면 됐다. 처음 코드를 작성했을 때와는 달리 수정 작업을 할 때쯤에는 몇 가지 연산자의 기본 사용법 정도는 익힌 상태였다. 동기 부분을 제거하고 비동기로 처리할 수 있게끔 코드를 수정해 주었다. (filter 대신 filterWhen 활용)

fun getProductDetailResponse(productId: String): Mono<ProductDetailResponse> =
    // 상품 조회
    productRepository.findById(productId)
        .filterWhen { isSalableSeller(it.sellerId) }

// 판매 가능한 판매사인지 판단하는 로직
private fun isSalableSeller(sellerId: Long): Mono<Boolean> =
    sellerRepository.findById(sellerId)
        .map { name != "test" }
        .defaultIfEmpty(false)

 

 


4. 서버가 느려진 원인 찾기 🤔

 

문제 해결은 되었는데 block() 하나 때문에 서버가 이상해진다는 게 이해가 잘 안 됐다. WebFlux에서 동기식 코드를 작성할 바에 WebFlux를 아예 안 쓰는 게 낫다~식의 이야기를 듣기는 했지만 그래도 겨우 이거 하나 때문에 서버 전체가 이상해진다고? 그런 위험한 메서드가 존재해도 되는 건가? 그래서 이 문제를 조금 더 파고들기로 해본다. 서버가 느려진 원인이 block()이 맞는지 확인해보자.

 

block()이 원인이라는 가설이 있으니 현상 재현도 쉬웠다. 로컬에서 애플리케이션을 띄우고, 문제가 되는 API를 연속적으로 여러 번 호출하면 응답이 눈에 띄게 느려졌다. log()를 추가해 어떤 순서로 동작하고 있는지 살펴보았다.

fun getProductDetailResponse(productId: String): Mono<ProductDetailResponse> =
    productRepository.findById(productId)
        .filter { isSalableSeller(it.sellerId) }
        .log()   // 로그 추가

private fun isSalableSeller(sellerId: Long): Mono<Boolean> =
    sellerRepository.findById(sellerId)
        .map { name != "test" }
        .log()   // 로그 추가
        .block() ?: false

 

 

1. 한 번만 호출했을 때

onSubscribe -> request -> onNext -> onComplete까지 정상적으로 동작한다.

[ctor-http-nio-2] reactor.Mono.Map.1         : onSubscribe(FluxMap.MapConditionalSubscriber)
[ctor-http-nio-2] reactor.Mono.Map.1         : request(unbounded)
[xecutorLoop-3-2] reactor.Mono.PeekTerminal.2: | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
[xecutorLoop-3-2] reactor.Mono.PeekTerminal.2: | request(unbounded)
[xecutorLoop-3-4] reactor.Mono.PeekTerminal.2: | onNext(true)
[xecutorLoop-3-4] reactor.Mono.PeekTerminal.2: | onComplete()
[xecutorLoop-3-2] reactor.Mono.Map.1         : onNext([ProductDetailResponse(...)])
[xecutorLoop-3-2] reactor.Mono.Map.1         : onComplete()

 

2. 여러 번 호출했을 때

처음에는 한 번만 호출했을 때처럼 정상 동작했다. 한데 어느 시점부터 응답이 느려지더니 갑자기 모두 cancel 된 것을 확인할 수 있다.

[ctor-http-nio-3] reactor.Mono.Map.3         : onSubscribe(FluxMap.MapConditionalSubscriber)
[ctor-http-nio-3] reactor.Mono.Map.3         : request(unbounded)
[xecutorLoop-3-6] reactor.Mono.PeekTerminal.4: | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
[xecutorLoop-3-6] reactor.Mono.PeekTerminal.4: | request(unbounded)
[ctor-http-nio-4] reactor.Mono.Map.5         : onSubscribe(FluxMap.MapConditionalSubscriber)
[ctor-http-nio-4] reactor.Mono.Map.5         : request(unbounded)
[xecutorLoop-3-9] reactor.Mono.PeekTerminal.6: | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
[xecutorLoop-3-9] reactor.Mono.PeekTerminal.6: | request(unbounded)
[ctor-http-nio-5] reactor.Mono.Map.7         : onSubscribe(FluxMap.MapConditionalSubscriber)
[ctor-http-nio-5] reactor.Mono.Map.7         : request(unbounded)
[xecutorLoop-3-4] reactor.Mono.PeekTerminal.8: | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
[xecutorLoop-3-4] reactor.Mono.PeekTerminal.8: | request(unbounded)
[ctor-http-nio-6] reactor.Mono.Map.9         : onSubscribe(FluxMap.MapConditionalSubscriber)
[ctor-http-nio-6] reactor.Mono.Map.9         : request(unbounded)
[xecutorLoop-3-7] reactor.Mono.PeekTerminal.4: | onNext(true)
[xecutorLoop-3-7] reactor.Mono.PeekTerminal.4: | onComplete()
[xecutorLoop-3-6] reactor.Mono.Map.3         : onNext([ProductInformationRes(...)])
[xecutorLoop-3-6] reactor.Mono.Map.3         : onComplete()
// 생략 ...
[ctor-http-nio-4] reactor.Mono.Map.20        : cancel()
[ctor-http-nio-5] reactor.Mono.Map.21        : cancel()
[ctor-http-nio-6] reactor.Mono.Map.22        : cancel()

 

cancel()은 구독이 취소되었다는 의미다. 데이터 스트림 처리를 하다가 더 이상 아이템이 필요하지 않다고 판단되면 발생한다. 아이템이 필요하지 않은 이유는 필요한 처리가 모두 완료되었는데 아직 스트림이 활성화된 상태던가, 에러가 발생했다던가, 스레드 같은 자원이 고갈되었다 등 다양한 이유가 있다.

 

현재 상황에서는 에러 로그가 발생한 것도 아니고, 비정상적으로 API 응답 속도가 느려졌으니 자원 고갈 쪽이 의심되었다. 따라서 스레드 덤프를 확인해 보기로 한다.

📍 Thread Dump란?
- Java 프로세스의 모든 스레드 상태에 대한 스냅샷
- 애플리케이션이 느리게 동작하거나 로그만으로 분석이 어려울 때 등 이용 가능함
- 획득 방법: https://www.baeldung.com/java-thread-dump

 

스레드 덤프 전문은 보안 이슈로 생략한다. 스레드 덤프를 살펴보니 대기 상태(WAITING, TIMED_WAITING) 스레드가 많았다. 또한 아래와 같은 로그가 여러 개 있었다.

"server" #110 prio=5 os_prio=0 cpu=5.58ms elapsed=5298.47s tid=0x0000fffecd9069e0 nid=0xda5 waiting on condition [0x0000fffec19c4000]
  java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@17.0.8/Native Method) 
    - parking to wait for <0x00000007b47001f8> (a java.util.concurrent.CountDownLatch$Sync) 
    at java.util.concurrent.locks.LockSupport.park(java.base@17.0.8/LockSupport.java:211) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@17.0.8/AbstractQueuedSynchronizer.java:715) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@17.0.8/AbstractQueuedSynchronizer.java:1047) 
    at java.util.concurrent.CountDownLatch.await(java.base@17.0.8/CountDownLatch.java:230) 
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:90) 
    at reactor.core.publisher.Mono.block(Mono.java:1728) 
    at org.springframework.boot.web.embedded.netty.NettyWebServer$1.run(NettyWebServer.java:220)

 

stack trace 부분을 살펴보면 스레드에서 무슨 일이 일어나고 있는지 확인할 수 있다. Mono.block 과정에서 CountDownLatch를 사용하였고, 이로 인해 스레드가 WAITING 상태가 된 것을 확인할 수 있다.

 

실제로 block() 메서드를 따라가다 보면 CountDownLatch가 호출되는 것을 확인할 수 있다.

Mono::block
BlockingSingleSubscriber::blockingGet
CountDownLatch::await

 

 


5. 이벤트 루프와 block() 🥸

 

위 과정을 통해 스레드가 block() 호출로 인해 대기 상태에 있는 것을 확인하였다. 하지만 여전히 block()이 스레드 몇 개 잡고 있다고 서버 전체가 느려지는 현상에 대해서는 이해할 수 없었다. 열심히 구글링 한 결과, 이를 이해하기 위해서는 '이벤트 루프'에 대한 이해가 필요했다.

 

현재 프로젝트에서 WebFlux와 Lettuce를 사용하고 있는데, 해당 프레임워크/라이브러리에서 Netty를 사용중이었다. 이벤트 루프는 이 Netty에서 중요하게 사용되는 개념 중 하나다.

📍 Netty란?
- 네트워크 애플리케이션을 쉽고 빠르게 개발할 수 있는 non-blocking I/O 클라이언트 서버 프레임워크
- 공식 홈페이지: https://netty.io/ 

 

💡 Netty의 이벤트 루프 관련 클래스

EventLoop

  • 직역: 등록된 Channel에 대한 모든 I/O 작업을 처리한다. 하나의 EventLoop 인스턴스는 보통 1개 이상의 Channel을 처리하지만, 이는 구현에 따라 달라질 수 있다.
  • 원문: Will handle all the I/O operations for a Channel once registered. One EventLoop instance will usually handle more than one Channel but this may depend on implementation details and internals.

 

Channel

  • 직역: 읽기, 쓰기, 연결, 바인딩과 같은 I/O 작업을 수행할 수 있는 네트워크 소켓/컴포넌트 연결점.
  • 원문: A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.

 

💡 이벤트 처리 과정

이벤트를 발행하고 수행하기까지의 과정을 굉장히 단순화했다. (설명을 생략한 과정이 많으며, 실제 동작은 훨씬 복잡하다.)

  1. 1개 이상의 Channel이 이벤트를 발행한다.
  2. 이벤트는 Queue에 쌓인다.
  3. EventLoop가 무한 루프를 돌며 Queue에서 이벤트를 가져와 처리한다.

 

 

EventLoop는 기본적으로 단일 스레드에서 실행된다. block() 호출이 발생하면 이 스레드가 블록 된다. 그러면 EventLoop가 관리하는 모든 Channel의 이벤트 처리가 지연된다. 이로 인해 서버가 느려질 수 있다. Thread Dump를 다시 살펴보았다. event loop 관련 스레드가 Mono.block()으로 인해 WAITING 상태임을 확인했다. 😬

"lettuce-eventExecutorLoop-3-4" #52 daemon prio=5 os_prio=0 cpu=397.71ms elapsed=5309.22s tid=0x0000ffff2c16e4a0 nid=0xd4d waiting on condition  [0x0000fffec67ce000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@17.0.8/Native Method)
    - parking to wait for  <0x00000007b4800098> (a java.util.concurrent.CountDownLatch$Sync)
    // (생략)
    at reactor.core.publisher.Mono.block(Mono.java:1728)
    at <block 코드를 가진 클래스>.<block 코드를 가진 메서드명>
    // (생략)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)

 

🔻 EventLoop 코드를 구경하는 방법

더보기

👉 EventLoop가 동작하는 코드를 구경하고 싶다면 run() 메서드가 구현된 부분을 확인하면 된다. 비동기 환경에서는 일반적으로 NioEventLoop나 EpollEventLoop를 사용한다.

 

만약 애플리케이션이 사용 중인 EventLoop가 궁금하다면 Thread Dump를 살펴보면 된다. 예를 들어 EpollEventLoop을 사용한다면 스레드명이 "reactor-http-epoll"나 "epollEventLoop" 등 epoll event loop라는 키워드가 존재한다.

 


참고

반응형