[#3] 컨슈머를 제외한 나머지 애플리케이션 코드 마이그레이션 #4
@ -0,0 +1,70 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.producer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.retry.annotation.Backoff;
|
||||
import org.springframework.retry.annotation.Recover;
|
||||
import org.springframework.retry.annotation.Retryable;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;
|
||||
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class DLTProducer {
|
||||
|
||||
private final KafkaProducer<String, String> dltProducer;
|
||||
private final KafkaProperties.DeadLetterTopic deadLetterTopic;
|
||||
private final JsonObjectMapper jsonObjectMapper;
|
||||
|
||||
// 실제 환경에서는 DB, 파일 시스템 등 외부에 저장 필요
|
||||
private final List<FailedDltMessage> failed = new ArrayList<>();
|
||||
|
||||
@Retryable(
|
||||
retryFor = {Exception.class},
|
||||
maxAttempts = 5,
|
||||
backoff = @Backoff(delay = 500, multiplier = 2),
|
||||
listeners = {"dltRetryListener"}
|
||||
)
|
||||
public void sendToDeadLetterTopic(String key, Object value, Exception exception) throws Exception {
|
||||
String valueJson = jsonObjectMapper.serialize(value);
|
||||
String message = jsonObjectMapper.serialize(new DltMessage(exception.getMessage(), valueJson));
|
||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(deadLetterTopic.name(), key, message);
|
||||
|
||||
dltProducer.send(producerRecord).get();
|
||||
log.info("[DLTProducer] Succeed to send. key : {}, value: {}", key, valueJson);
|
||||
}
|
||||
|
||||
@Recover
|
||||
public void recover(Exception last, String key, Object value, Exception origin) {
|
||||
log.error("[DLTProducer] Failed to produce DLT message for key: {}, value: {} cause: {}", key, value, last.getMessage());
|
||||
DltMessage dltMessage = new DltMessage(origin.getMessage(), jsonObjectMapper.serialize(value));
|
||||
failed.add(new FailedDltMessage(key, dltMessage));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
List<FailedDltMessage> getFailed() {
|
||||
return failed;
|
||||
}
|
||||
|
||||
record DltMessage(
|
||||
String exception,
|
||||
String message
|
||||
) {
|
||||
}
|
||||
|
||||
record FailedDltMessage(
|
||||
String key,
|
||||
DltMessage message
|
||||
) {
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,26 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.producer;
|
||||
|
||||
import org.springframework.retry.RetryCallback;
|
||||
import org.springframework.retry.RetryContext;
|
||||
import org.springframework.retry.RetryListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Component("dltRetryListener")
|
||||
public class DLTRetryListener implements RetryListener {
|
||||
|
||||
@Override
|
||||
public <T, E extends Throwable> void onError(
|
||||
RetryContext context,
|
||||
RetryCallback<T, E> callback,
|
||||
Throwable throwable
|
||||
) {
|
||||
int retryCount = context.getRetryCount();
|
||||
log.warn("[DLTProducer] Retry to send DLT message. Retry count: {}, Cause: {}", retryCount,
|
||||
throwable.getMessage());
|
||||
|
||||
RetryListener.super.onError(context, callback, throwable);
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user