From ef8c526fec157e21fff15082a2981d793de5b831 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 13:57:15 +0900 Subject: [PATCH 1/6] =?UTF-8?q?feat:=20=ED=85=8C=EC=8A=A4=ED=8A=B8?= =?UTF-8?q?=EC=9A=A9=20=EB=A0=88=EC=BD=94=EB=93=9C=20=ED=81=B4=EB=9E=98?= =?UTF-8?q?=EC=8A=A4=EC=99=80=20=EC=A7=81=EB=A0=AC=ED=99=94,=20=EC=97=AD?= =?UTF-8?q?=EC=A7=81=EB=A0=AC=ED=99=94=20=ED=81=B4=EB=9E=98=EC=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sangdol/consumer/domain/TestRecord.java | 8 +++ .../common/serialize/JsonObjectMapper.java | 50 +++++++++++++++++++ .../serialize/TestRecordDeserializer.java | 26 ++++++++++ .../serialize/TestRecordSerializer.java | 25 ++++++++++ 4 files changed, 109 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/domain/TestRecord.java create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java diff --git a/src/main/java/com/sangdol/consumer/domain/TestRecord.java b/src/main/java/com/sangdol/consumer/domain/TestRecord.java new file mode 100644 index 0000000..e08bae4 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/domain/TestRecord.java @@ -0,0 +1,8 @@ +package com.sangdol.consumer.domain; + +public record TestRecord( + Integer userId, + String topic, + String timeStamp +) { +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java new file mode 100644 index 0000000..dd3e3e3 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java @@ -0,0 +1,50 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +public class JsonObjectMapper { + + private final ObjectMapper objectMapper; + + public JsonObjectMapper() { + this.objectMapper = new ObjectMapper() + .registerModule(new JavaTimeModule()) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + public T deserialize(String json, Class clazz) { + try { + return objectMapper.readValue(json, clazz); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", json, clazz, e); + return null; + } + } + + public T deserialize(Object object, Class clazz) { + try { + return objectMapper.convertValue(object, clazz); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", object, clazz, + e); + return null; + } + } + + public String serialize(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while serializing object = {}", object, e); + return null; + } + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java new file mode 100644 index 0000000..c400cf4 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java @@ -0,0 +1,26 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.serialization.Deserializer; + +import com.sangdol.consumer.domain.TestRecord; + +public class TestRecordDeserializer implements Deserializer { + + private final JsonObjectMapper jsonObjectMapper; + + public TestRecordDeserializer() { + this.jsonObjectMapper = new JsonObjectMapper(); + } + + // null-return이 아닌 DLT 전송 등의 예외 처리 필요. + @Override + public TestRecord deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + + return jsonObjectMapper.deserialize(new String(data, StandardCharsets.UTF_8), TestRecord.class); + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java new file mode 100644 index 0000000..05caa0a --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java @@ -0,0 +1,25 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.serialization.Serializer; + +import com.sangdol.consumer.domain.TestRecord; + +public class TestRecordSerializer implements Serializer { + + private final JsonObjectMapper jsonObjectMapper; + + public TestRecordSerializer() { + this.jsonObjectMapper = new JsonObjectMapper(); + } + + @Override + public byte[] serialize(String topic, TestRecord data) { + if (data == null) { + return null; + } + + return jsonObjectMapper.serialize(data).getBytes(StandardCharsets.UTF_8); + } +} From b6e6484cb0fd5daf63e1994eb6f56a506ee82b57 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:02:14 +0900 Subject: [PATCH 2/6] =?UTF-8?q?feat:=20=EC=B9=B4=ED=94=84=EC=B9=B4=20?= =?UTF-8?q?=EC=84=A4=EC=A0=95=EA=B0=92=EC=9D=84=20=EB=8B=B4=EB=8A=94=20Kaf?= =?UTF-8?q?kaProperties=20=ED=81=B4=EB=9E=98=EC=8A=A4=20=EB=B0=8F=20Config?= =?UTF-8?q?urationProperties=20=EC=A0=81=EC=9A=A9=EC=9D=84=20=EC=9C=84?= =?UTF-8?q?=ED=95=9C=20application.yml=20=EC=84=A4=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/common/KafkaProperties.java | 111 ++++++++++++++++++ src/main/resources/application.yml | 65 ++++++++++ 2 files changed, 176 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/common/KafkaProperties.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/KafkaProperties.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/KafkaProperties.java new file mode 100644 index 0000000..bfe1b47 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/KafkaProperties.java @@ -0,0 +1,111 @@ +package com.sangdol.consumer.infrastructure.kafka.common; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.SslConfigs; +import org.springframework.boot.context.properties.ConfigurationProperties; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +@ConfigurationProperties(prefix = "kafka") +public class KafkaProperties { + + private final DeadLetterTopic deadLetterTopic; + private final BootStrapServers bootStrapServers; + private final Consumer consumer; + private final Producer producer; + private final Ssl ssl; + + public record DeadLetterTopic( + String name + ) { + } + + public record BootStrapServers( + String runEnvironment, + ServerAddress docker, + ServerAddress local + ) { + public String getAddressByProtocol(String protocol) { + if (runEnvironment == null || runEnvironment.isBlank()) { + throw new IllegalArgumentException( + "[KafkaProperties.BootStrapServers] runEnvironment is null or black"); + } + if (runEnvironment.equalsIgnoreCase("local")) { + return local.getAddress(protocol); + } + if (runEnvironment.equalsIgnoreCase("docker")) { + return docker.getAddress(protocol); + } + throw new IllegalArgumentException( + "[KafkaProperties.BootStrapServers] unknown runEnvironment: " + runEnvironment); + } + } + + public record Consumer( + String groupId, + String keyDeserializer, + String valueDeserializer, + String protocol, + boolean enableAutoCommit, + String autoOffsetReset, + Integer maxPollRecords, + Integer workerThreads, + List topics + ) { + public boolean isSslEnabled() { + return protocol.equalsIgnoreCase("ssl"); + } + } + + public record Producer( + String keySerializer, + String valueSerializer, + String acks, + Integer lingerMs, + String protocol + ) { + } + + public record Ssl( + String truststoreLocation, + String truststorePassword, + String keystoreLocation, + String keystorePassword, + String keyPassword + ) { + public Map toPropsMap() { + Map props = new HashMap<>(); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword); + + return props; + } + } + + public record ServerAddress( + String plainText, + String ssl + ) { + public String getAddress(String protocol) { + if (protocol == null || protocol.isBlank()) { + throw new IllegalArgumentException("[KafkaProperties.ServerAddress] protocol is null or black"); + } + if (protocol.equalsIgnoreCase("plaintext")) { + return plainText; + } + if (protocol.equalsIgnoreCase("ssl")) { + return ssl; + } + throw new IllegalArgumentException("[KafkaProperties.ServerAddress] unknown protocol: " + protocol); + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 002c42d..a7de48b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,3 +1,68 @@ +server: + port: 8080 + spring: application: name: consumer + +logging: + level: + org.study.consumerssl.infrastructure.kafka.consumer: INFO + org.apache.kafka.common.network: INFO + +management: + info: + java: + enabled: true + os: + enabled: true + endpoints: + web: + exposure: + include: "health,info,prometheus" + endpoint: + health: + show-details: always + +kafka: + dead-letter-topic: + name: "test-dlt" + + bootstrap-servers: + run-environment: ${RUN_ENVIRONMENT:local} + local: + plain-text: "localhost:29092,localhost:39092,localhost:49092" + ssl: "localhost:29093,localhost:39093,localhost:49093" + docker: + plain-text: "broker-1:19092,broker-2:19092,broker-3:19092" + ssl: "broker-1:19093,broker-2:19093,broker-3:19093" + + producer: + keySerializer: "org.apache.kafka.common.serialization.StringSerializer" + valueSerializer: "org.apache.kafka.common.serialization.StringSerializer" + acks: "all" + linger-ms: 10 + protocol: "plaintext" + + consumer: + group-id: "sangdol-group" + keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer" + valueDeserializer: "com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordDeserializer" + protocol: ${CONSUMER_PROTOCOL:plaintext} + enable-auto-commit: false + auto-offset-reset: "earliest" + max-poll-records: 500 + worker-threads: 3 + topics: + - "topic-0" + - "topic-1" + - "topic-2" + + ssl: + truststore-location: ssl/kafka.broker.truststore.jks + truststore-password: abcd1234 + keystore-location: ssl/kafka.broker.keystore.jks + keystore-password: abcd1234 + key-password: abcd1234 + + From c6b21da23e77785d9f7ee15f59d899f91059bdd6 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:03:26 +0900 Subject: [PATCH 3/6] =?UTF-8?q?feat:=20=EC=BB=A8=EC=8A=88=EB=A8=B8,=20?= =?UTF-8?q?=ED=94=84=EB=A1=9C=EB=93=80=EC=84=9C=EC=97=90=EC=84=9C=20?= =?UTF-8?q?=EB=AA=A8=EB=91=90=20=EC=93=B0=EC=9D=B4=EB=8A=94=20Configuratio?= =?UTF-8?q?n=20=ED=81=B4=EB=9E=98=EC=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 현재는 Dead Letter Topic 프로듀서 Bean만 등록 --- .../kafka/common/CommonConfiguration.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/common/CommonConfiguration.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/CommonConfiguration.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/CommonConfiguration.java new file mode 100644 index 0000000..3fb2e32 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/CommonConfiguration.java @@ -0,0 +1,18 @@ +package com.sangdol.consumer.infrastructure.kafka.common; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +@EnableConfigurationProperties(KafkaProperties.class) +public class CommonConfiguration { + + @Bean + public KafkaProperties.DeadLetterTopic deadLetterTopic(KafkaProperties kafkaProperties) { + return kafkaProperties.getDeadLetterTopic(); + } +} From fcc5fcffe6aef708fedb9f34deb2256aecc81c02 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:09:57 +0900 Subject: [PATCH 4/6] =?UTF-8?q?feat:=20Dead=20Letter=20Topic=20=ED=94=84?= =?UTF-8?q?=EB=A1=9C=EB=93=80=EC=84=9C=20=EC=84=A4=EC=A0=95=20=ED=81=B4?= =?UTF-8?q?=EB=9E=98=EC=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - KafkaProducer는 Thread-Safe이기에 Bean으로 등록하여 사용함. --- .../kafka/producer/ProducerConfiguration.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java new file mode 100644 index 0000000..7c6b3ac --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java @@ -0,0 +1,39 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.annotation.EnableRetry; + +import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties; + +import lombok.RequiredArgsConstructor; + +@EnableRetry +@Configuration +@RequiredArgsConstructor +@EnableConfigurationProperties(value = KafkaProperties.class) +public class ProducerConfiguration { + + @Bean(destroyMethod = "close") + public KafkaProducer dltProducer(KafkaProperties kafkaProperties) { + return new KafkaProducer<>(producerProperties(kafkaProperties)); + } + + private Properties producerProperties(KafkaProperties properties) { + Properties props = new Properties(); + KafkaProperties.Producer producer = properties.getProducer(); + KafkaProperties.BootStrapServers bootStrapServers = properties.getBootStrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers.getAddressByProtocol(producer.protocol())); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producer.keySerializer()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producer.valueSerializer()); + props.put(ProducerConfig.ACKS_CONFIG, producer.acks()); + props.put(ProducerConfig.LINGER_MS_CONFIG, producer.lingerMs()); + + return props; + } +} From 1f60a0ff33385a76271c65c58e58522d5028ad5f Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:11:16 +0900 Subject: [PATCH 5/6] =?UTF-8?q?feat:=20Dead=20Letter=20Topic=20=ED=94=84?= =?UTF-8?q?=EB=A1=9C=EB=93=80=EC=84=9C=20=EA=B8=B0=EB=8A=A5=20=EA=B5=AC?= =?UTF-8?q?=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 최대 5회 재시도. 재시도 실패시 인메모리 컬렉션에 저장 --- .../kafka/producer/DLTProducer.java | 70 +++++++++++++++++++ .../kafka/producer/DLTRetryListener.java | 26 +++++++ 2 files changed, 96 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java new file mode 100644 index 0000000..59edd21 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java @@ -0,0 +1,70 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Recover; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Component; + +import com.google.common.annotations.VisibleForTesting; +import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties; +import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class DLTProducer { + + private final KafkaProducer dltProducer; + private final KafkaProperties.DeadLetterTopic deadLetterTopic; + private final JsonObjectMapper jsonObjectMapper; + + // 실제 환경에서는 DB, 파일 시스템 등 외부에 저장 필요 + private final List failed = new ArrayList<>(); + + @Retryable( + retryFor = {Exception.class}, + maxAttempts = 5, + backoff = @Backoff(delay = 500, multiplier = 2), + listeners = {"dltRetryListener"} + ) + public void sendToDeadLetterTopic(String key, Object value, Exception exception) throws Exception { + String valueJson = jsonObjectMapper.serialize(value); + String message = jsonObjectMapper.serialize(new DltMessage(exception.getMessage(), valueJson)); + ProducerRecord producerRecord = new ProducerRecord<>(deadLetterTopic.name(), key, message); + + dltProducer.send(producerRecord).get(); + log.info("[DLTProducer] Succeed to send. key : {}, value: {}", key, valueJson); + } + + @Recover + public void recover(Exception last, String key, Object value, Exception origin) { + log.error("[DLTProducer] Failed to produce DLT message for key: {}, value: {} cause: {}", key, value, last.getMessage()); + DltMessage dltMessage = new DltMessage(origin.getMessage(), jsonObjectMapper.serialize(value)); + failed.add(new FailedDltMessage(key, dltMessage)); + } + + @VisibleForTesting + List getFailed() { + return failed; + } + + record DltMessage( + String exception, + String message + ) { + } + + record FailedDltMessage( + String key, + DltMessage message + ) { + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java new file mode 100644 index 0000000..b10a543 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java @@ -0,0 +1,26 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component("dltRetryListener") +public class DLTRetryListener implements RetryListener { + + @Override + public void onError( + RetryContext context, + RetryCallback callback, + Throwable throwable + ) { + int retryCount = context.getRetryCount(); + log.warn("[DLTProducer] Retry to send DLT message. Retry count: {}, Cause: {}", retryCount, + throwable.getMessage()); + + RetryListener.super.onError(context, callback, throwable); + } +} From b0456c18a26d1d51f80d0bf7941af0032a3ccb9c Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:13:33 +0900 Subject: [PATCH 6/6] =?UTF-8?q?test:=20Dead=20Letter=20Topic=20=ED=94=84?= =?UTF-8?q?=EB=A1=9C=EB=93=80=EC=84=9C=20=EC=9E=AC=EC=8B=9C=EB=8F=84=20?= =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ref: 1f60a0ff --- .../kafka/producer/DLTProducerTest.java | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java diff --git a/src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java b/src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java new file mode 100644 index 0000000..2ffd6e6 --- /dev/null +++ b/src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java @@ -0,0 +1,78 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import java.time.Instant; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.bean.override.mockito.MockitoBean; + +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper; + +/** + * Spring Retryable 동작 및 로그 출력 확인용 테스트 + */ +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +class DLTProducerTest { + + @MockitoBean + private KafkaProducer kafkaProducer; + + @Autowired + private DLTProducer dltProducer; + + @Autowired + private JsonObjectMapper jsonObjectMapper; + + @DisplayName("DLT 메시지 전송이 첫 번째 시도에서는 실패하고, 두 번째 시도에서 성공한다.") + @Test + void dltRetryTest_whenSucceedSecondTrial() throws Exception { + // given + String key = "key"; + TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString()); + RuntimeException origin = new RuntimeException("Intended Exception for Test"); + Future mockfuture = mock(Future.class); + when(kafkaProducer.send(any())).thenReturn(mockfuture); + when(mockfuture.get()) + .thenThrow(origin) + .thenReturn(mock(RecordMetadata.class)); + + // when + assertThatCode(() -> dltProducer.sendToDeadLetterTopic(key, value, origin)) + .doesNotThrowAnyException(); + } + + @DisplayName("DLT 메시지 전송에 실패한다.") + @Test + void dltRetryTest_whenFailedAllTries() throws Exception { + // given + String key = "key"; + TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString()); + RuntimeException origin = new RuntimeException("Intended Exception for Test"); + Future mockfuture = mock(Future.class); + when(kafkaProducer.send(any())).thenReturn(mockfuture); + when(mockfuture.get()).thenThrow(origin); + + // when + dltProducer.sendToDeadLetterTopic(key, value, origin); + + // then + verify(kafkaProducer, times(5)).send(any()); + + assertThat(dltProducer.getFailed()).hasSize(1); + + DLTProducer.FailedDltMessage failedDltMessage = dltProducer.getFailed().get(0); + assertThat(failedDltMessage.key()).isEqualTo(key); + assertThat(failedDltMessage.message().exception()).isEqualTo(origin.getMessage()); + assertThat(failedDltMessage.message().message()).isEqualTo(jsonObjectMapper.serialize(value)); + } +} \ No newline at end of file