From 1f60a0ff33385a76271c65c58e58522d5028ad5f Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:11:16 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Dead=20Letter=20Topic=20=ED=94=84?= =?UTF-8?q?=EB=A1=9C=EB=93=80=EC=84=9C=20=EA=B8=B0=EB=8A=A5=20=EA=B5=AC?= =?UTF-8?q?=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 최대 5회 재시도. 재시도 실패시 인메모리 컬렉션에 저장 --- .../kafka/producer/DLTProducer.java | 70 +++++++++++++++++++ .../kafka/producer/DLTRetryListener.java | 26 +++++++ 2 files changed, 96 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java new file mode 100644 index 0000000..59edd21 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java @@ -0,0 +1,70 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Recover; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Component; + +import com.google.common.annotations.VisibleForTesting; +import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties; +import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class DLTProducer { + + private final KafkaProducer dltProducer; + private final KafkaProperties.DeadLetterTopic deadLetterTopic; + private final JsonObjectMapper jsonObjectMapper; + + // 실제 환경에서는 DB, 파일 시스템 등 외부에 저장 필요 + private final List failed = new ArrayList<>(); + + @Retryable( + retryFor = {Exception.class}, + maxAttempts = 5, + backoff = @Backoff(delay = 500, multiplier = 2), + listeners = {"dltRetryListener"} + ) + public void sendToDeadLetterTopic(String key, Object value, Exception exception) throws Exception { + String valueJson = jsonObjectMapper.serialize(value); + String message = jsonObjectMapper.serialize(new DltMessage(exception.getMessage(), valueJson)); + ProducerRecord producerRecord = new ProducerRecord<>(deadLetterTopic.name(), key, message); + + dltProducer.send(producerRecord).get(); + log.info("[DLTProducer] Succeed to send. key : {}, value: {}", key, valueJson); + } + + @Recover + public void recover(Exception last, String key, Object value, Exception origin) { + log.error("[DLTProducer] Failed to produce DLT message for key: {}, value: {} cause: {}", key, value, last.getMessage()); + DltMessage dltMessage = new DltMessage(origin.getMessage(), jsonObjectMapper.serialize(value)); + failed.add(new FailedDltMessage(key, dltMessage)); + } + + @VisibleForTesting + List getFailed() { + return failed; + } + + record DltMessage( + String exception, + String message + ) { + } + + record FailedDltMessage( + String key, + DltMessage message + ) { + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java new file mode 100644 index 0000000..b10a543 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java @@ -0,0 +1,26 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component("dltRetryListener") +public class DLTRetryListener implements RetryListener { + + @Override + public void onError( + RetryContext context, + RetryCallback callback, + Throwable throwable + ) { + int retryCount = context.getRetryCount(); + log.warn("[DLTProducer] Retry to send DLT message. Retry count: {}, Cause: {}", retryCount, + throwable.getMessage()); + + RetryListener.super.onError(context, callback, throwable); + } +}