diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java new file mode 100644 index 0000000..59edd21 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java @@ -0,0 +1,70 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Recover; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Component; + +import com.google.common.annotations.VisibleForTesting; +import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties; +import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class DLTProducer { + + private final KafkaProducer dltProducer; + private final KafkaProperties.DeadLetterTopic deadLetterTopic; + private final JsonObjectMapper jsonObjectMapper; + + // 실제 환경에서는 DB, 파일 시스템 등 외부에 저장 필요 + private final List failed = new ArrayList<>(); + + @Retryable( + retryFor = {Exception.class}, + maxAttempts = 5, + backoff = @Backoff(delay = 500, multiplier = 2), + listeners = {"dltRetryListener"} + ) + public void sendToDeadLetterTopic(String key, Object value, Exception exception) throws Exception { + String valueJson = jsonObjectMapper.serialize(value); + String message = jsonObjectMapper.serialize(new DltMessage(exception.getMessage(), valueJson)); + ProducerRecord producerRecord = new ProducerRecord<>(deadLetterTopic.name(), key, message); + + dltProducer.send(producerRecord).get(); + log.info("[DLTProducer] Succeed to send. key : {}, value: {}", key, valueJson); + } + + @Recover + public void recover(Exception last, String key, Object value, Exception origin) { + log.error("[DLTProducer] Failed to produce DLT message for key: {}, value: {} cause: {}", key, value, last.getMessage()); + DltMessage dltMessage = new DltMessage(origin.getMessage(), jsonObjectMapper.serialize(value)); + failed.add(new FailedDltMessage(key, dltMessage)); + } + + @VisibleForTesting + List getFailed() { + return failed; + } + + record DltMessage( + String exception, + String message + ) { + } + + record FailedDltMessage( + String key, + DltMessage message + ) { + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java new file mode 100644 index 0000000..b10a543 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java @@ -0,0 +1,26 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component("dltRetryListener") +public class DLTRetryListener implements RetryListener { + + @Override + public void onError( + RetryContext context, + RetryCallback callback, + Throwable throwable + ) { + int retryCount = context.getRetryCount(); + log.warn("[DLTProducer] Retry to send DLT message. Retry count: {}, Cause: {}", retryCount, + throwable.getMessage()); + + RetryListener.super.onError(context, callback, throwable); + } +}