spring-cloud-stream(4) Error Handling, Retry, DLQ (with RabbitMQ)

Error Handling

  • spring-cloud-stream 에서 에러를 처리하기 위한 메커니즘을 제공한다. 하지만 각 binder 별로 세부적인 기능 차이가 있기 때문에 binder 가 변경된다면, 에러 처리 부분은 확인이 필요하다. 여기서는 RabbitMQ 를 기준으로 정리해보려고 한다.
    • Handler(Function) 이 예외를 던지면, binder 로 예외를 전파한다. 그러면 binder 는 messaging system 으로 에러를 전달한다. 세부적인 기능은 messaging system 의 기능에 따라 다르지만 메시지를 drop, re-queue, DLQ 로 전송할 수 있다. Rabbit 과 Kafka 같은 binder 는 이 개념들을 모두 지원한다. 하지만 다른 binder 의 경우 지원하지 않을 수도 있다. 그래서 error-handling 옵션은 각각의 binder documentation 을 확인해야 한다.
  • spring cloud stream 은 Annotation Handler 과 명령형 Function, Reactive Function 방법이 있다. 명령형 Function 과 Handler 는 Spring Retry 를 사용하고, Reactive Function 은 reactive API 의 retryBackoff 를 사용한다.

Drop Failed Messages

  • 추가적인 system-level 의 설정을 추가하지 않으면 messaging system 은 실패한 메시지를 drop 한다. 메시지가 유실되지 않으려면 복구 메커니즘이 필요!

DLQ - Dead Letter Queue

  • Failed Messages 를 특정한 destination 으로 다시 전송하는 것.

  • DLQ 의 메시지를 재처리, 모니터링, 보정(수정)

  • RabbitMQ 에서 DLQ 사용 방법

    spring:
      cloud:
        stream:
          bindings:
            message-in-0:
              binder: rabbit
              destination: message
              group: messageConsumers
          rabbit:
            bindings:
              message-in-0:
                consumer:
                  auto-bind-dlq: true
    
    • auto-bind-dlq 를 true 로 설정하면 message.messageConsumers.dlq 로 RabbitMQ queue 가 추가된다. DLQ 설정이 완료되면, 실패한 메시지는 기존 데이터와 함께 오류 메시지를 포함하여 DLQ 로 전송된다.

      • Message 의 x-exception-stacktrace Header 에 error message 가 있다.

        delivery_mode:	2
        headers:
        x-original-exchange:
        x-exception-message:	has an error
        x-original-routingKey:	input.myGroup
        x-exception-stacktrace:	org.springframework.messaging.MessageHandlingException: nested exception is
              org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
              headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
              deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
              amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
              at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
              at. . . . .
        Payload {"name”:"Bob"}
        

Re-queue Failed Messages

  • 레빗이나 카프카 등.. 메시지 재처리를 위해 내부적으로 RetryTemplate 을 이용한다.

  • 디폴트 설정을 사용하면 최대 3 번 requeue 한다. max-attempts 를 넘은 메시지는 Drop.

  • max-attempt property 를 1 로 설정하면, 내부적인 재처리 로직은 disable 되면서 requeue-rejected property 도 true 로 된다. 1이 아닌 수를 넣게되면 최대 n 번 만큼 재처리하는 loop 를 돌게 된다.

    • 두 가지 properties
    spring:
      cloud:
        stream:
          bindings:
            message-in-0:
              consumer:
                max-attempts: 1
          rabbit:
            bindings:
              message-in-0:
                consumer:
                  requeue-rejected: true
    

Retry Template and retryBackoff

  • retry configuration
  • 두 가지 다른 메커니즘
    • imperative → RetryTemplate → Spring Retry
    • reactive → retryBackoff
maxAttempts
  • 메시지 재시도 횟수
  • Default : 3
backOffInitialInterval
  • retry 간격
  • Default : 1000ms
    • 너무 짧은 간격으로 재시도하는 것도 상황에 따라서 좋지 않을 수 있을 것 같다. 예를들어 디비나 내부 호출하는 서버간 네트워크 지연으로 메시지 처리가 실패하였다면, 트래픽이 너무 몰리지 않도록 backOffIntiailInterval 을 통해 어느정도 처리를 지연시키는 것도 좋을 것 같다.
backOffMaxInterval
  • backOffInitialInterval 옵션의 최대 값
  • Default : 10000ms
backOffMultiplier
  • 재시도 마다 interval * multiplier 만큼 재시도 시간 간격이 증가
  • Default : 2.0
defaultRetryable
  • retryableExceptions 에 정의되지 않은 exception 이 listener 에서 발생하면 retry 할지 결정하는 옵션
  • Default : true
retryableExceptions
  • 특정 exception (and subclasses) 에 대한 retry 여부를 결정
    • 비즈니스 로직에 따라 메시지 처리를 drop 하기 위한 exception 을 정의할 때 사용하면 좋을 것 같다.
  • map 으로 정의
    • key - Throwable class name
    • value - boolean
  • spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
  • Default : empty

Custom Retry

  • 몇 가지 Properties 를 제공하지만, 복잡한 요구사항을 구현하기 위해 커스텀이 필요한 경우 RetryTemplate 을 직접 구현하여 제공하는 방법도 있다. Application configuration 에서 RetryTemplate 을 bean 으로 설정하면 된다.

  • 기존의 RetryTemplate 과 conflict 를 피하기 위해 binder 에 사용하려는 RetryTemplate 에 @Bean 이 아닌 @StreamRetryTemplate 을 붙여야 한다. (내부적으로 qualified @Bean)

    @StreamRetryTemplate
    public RetryTemplate myRetryTemplate() {
    	return new RetryTemplate();
    }
    

    image-20200927113053754

  • binding 별로 세부적으로 Retry 정책을 달리하고 싶다면, property 를 설정할 수도 있다.

    • spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>

참고