diff --git a/src/main/java/com/sangdol/consumer/domain/TestRecord.java b/src/main/java/com/sangdol/consumer/domain/TestRecord.java new file mode 100644 index 0000000..e08bae4 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/domain/TestRecord.java @@ -0,0 +1,8 @@ +package com.sangdol.consumer.domain; + +public record TestRecord( + Integer userId, + String topic, + String timeStamp +) { +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/CommonConfiguration.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/CommonConfiguration.java new file mode 100644 index 0000000..3fb2e32 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/CommonConfiguration.java @@ -0,0 +1,18 @@ +package com.sangdol.consumer.infrastructure.kafka.common; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +@EnableConfigurationProperties(KafkaProperties.class) +public class CommonConfiguration { + + @Bean + public KafkaProperties.DeadLetterTopic deadLetterTopic(KafkaProperties kafkaProperties) { + return kafkaProperties.getDeadLetterTopic(); + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/KafkaProperties.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/KafkaProperties.java new file mode 100644 index 0000000..bfe1b47 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/KafkaProperties.java @@ -0,0 +1,111 @@ +package com.sangdol.consumer.infrastructure.kafka.common; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.SslConfigs; +import org.springframework.boot.context.properties.ConfigurationProperties; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +@ConfigurationProperties(prefix = "kafka") +public class KafkaProperties { + + private final DeadLetterTopic deadLetterTopic; + private final BootStrapServers bootStrapServers; + private final Consumer consumer; + private final Producer producer; + private final Ssl ssl; + + public record DeadLetterTopic( + String name + ) { + } + + public record BootStrapServers( + String runEnvironment, + ServerAddress docker, + ServerAddress local + ) { + public String getAddressByProtocol(String protocol) { + if (runEnvironment == null || runEnvironment.isBlank()) { + throw new IllegalArgumentException( + "[KafkaProperties.BootStrapServers] runEnvironment is null or black"); + } + if (runEnvironment.equalsIgnoreCase("local")) { + return local.getAddress(protocol); + } + if (runEnvironment.equalsIgnoreCase("docker")) { + return docker.getAddress(protocol); + } + throw new IllegalArgumentException( + "[KafkaProperties.BootStrapServers] unknown runEnvironment: " + runEnvironment); + } + } + + public record Consumer( + String groupId, + String keyDeserializer, + String valueDeserializer, + String protocol, + boolean enableAutoCommit, + String autoOffsetReset, + Integer maxPollRecords, + Integer workerThreads, + List topics + ) { + public boolean isSslEnabled() { + return protocol.equalsIgnoreCase("ssl"); + } + } + + public record Producer( + String keySerializer, + String valueSerializer, + String acks, + Integer lingerMs, + String protocol + ) { + } + + public record Ssl( + String truststoreLocation, + String truststorePassword, + String keystoreLocation, + String keystorePassword, + String keyPassword + ) { + public Map toPropsMap() { + Map props = new HashMap<>(); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword); + + return props; + } + } + + public record ServerAddress( + String plainText, + String ssl + ) { + public String getAddress(String protocol) { + if (protocol == null || protocol.isBlank()) { + throw new IllegalArgumentException("[KafkaProperties.ServerAddress] protocol is null or black"); + } + if (protocol.equalsIgnoreCase("plaintext")) { + return plainText; + } + if (protocol.equalsIgnoreCase("ssl")) { + return ssl; + } + throw new IllegalArgumentException("[KafkaProperties.ServerAddress] unknown protocol: " + protocol); + } + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java new file mode 100644 index 0000000..dd3e3e3 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java @@ -0,0 +1,50 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +public class JsonObjectMapper { + + private final ObjectMapper objectMapper; + + public JsonObjectMapper() { + this.objectMapper = new ObjectMapper() + .registerModule(new JavaTimeModule()) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + public T deserialize(String json, Class clazz) { + try { + return objectMapper.readValue(json, clazz); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", json, clazz, e); + return null; + } + } + + public T deserialize(Object object, Class clazz) { + try { + return objectMapper.convertValue(object, clazz); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", object, clazz, + e); + return null; + } + } + + public String serialize(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while serializing object = {}", object, e); + return null; + } + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java new file mode 100644 index 0000000..c400cf4 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java @@ -0,0 +1,26 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.serialization.Deserializer; + +import com.sangdol.consumer.domain.TestRecord; + +public class TestRecordDeserializer implements Deserializer { + + private final JsonObjectMapper jsonObjectMapper; + + public TestRecordDeserializer() { + this.jsonObjectMapper = new JsonObjectMapper(); + } + + // null-return이 아닌 DLT 전송 등의 예외 처리 필요. + @Override + public TestRecord deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + + return jsonObjectMapper.deserialize(new String(data, StandardCharsets.UTF_8), TestRecord.class); + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java new file mode 100644 index 0000000..05caa0a --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java @@ -0,0 +1,25 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.serialization.Serializer; + +import com.sangdol.consumer.domain.TestRecord; + +public class TestRecordSerializer implements Serializer { + + private final JsonObjectMapper jsonObjectMapper; + + public TestRecordSerializer() { + this.jsonObjectMapper = new JsonObjectMapper(); + } + + @Override + public byte[] serialize(String topic, TestRecord data) { + if (data == null) { + return null; + } + + return jsonObjectMapper.serialize(data).getBytes(StandardCharsets.UTF_8); + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java new file mode 100644 index 0000000..59edd21 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java @@ -0,0 +1,70 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Recover; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Component; + +import com.google.common.annotations.VisibleForTesting; +import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties; +import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class DLTProducer { + + private final KafkaProducer dltProducer; + private final KafkaProperties.DeadLetterTopic deadLetterTopic; + private final JsonObjectMapper jsonObjectMapper; + + // 실제 환경에서는 DB, 파일 시스템 등 외부에 저장 필요 + private final List failed = new ArrayList<>(); + + @Retryable( + retryFor = {Exception.class}, + maxAttempts = 5, + backoff = @Backoff(delay = 500, multiplier = 2), + listeners = {"dltRetryListener"} + ) + public void sendToDeadLetterTopic(String key, Object value, Exception exception) throws Exception { + String valueJson = jsonObjectMapper.serialize(value); + String message = jsonObjectMapper.serialize(new DltMessage(exception.getMessage(), valueJson)); + ProducerRecord producerRecord = new ProducerRecord<>(deadLetterTopic.name(), key, message); + + dltProducer.send(producerRecord).get(); + log.info("[DLTProducer] Succeed to send. key : {}, value: {}", key, valueJson); + } + + @Recover + public void recover(Exception last, String key, Object value, Exception origin) { + log.error("[DLTProducer] Failed to produce DLT message for key: {}, value: {} cause: {}", key, value, last.getMessage()); + DltMessage dltMessage = new DltMessage(origin.getMessage(), jsonObjectMapper.serialize(value)); + failed.add(new FailedDltMessage(key, dltMessage)); + } + + @VisibleForTesting + List getFailed() { + return failed; + } + + record DltMessage( + String exception, + String message + ) { + } + + record FailedDltMessage( + String key, + DltMessage message + ) { + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java new file mode 100644 index 0000000..b10a543 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java @@ -0,0 +1,26 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component("dltRetryListener") +public class DLTRetryListener implements RetryListener { + + @Override + public void onError( + RetryContext context, + RetryCallback callback, + Throwable throwable + ) { + int retryCount = context.getRetryCount(); + log.warn("[DLTProducer] Retry to send DLT message. Retry count: {}, Cause: {}", retryCount, + throwable.getMessage()); + + RetryListener.super.onError(context, callback, throwable); + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java new file mode 100644 index 0000000..7c6b3ac --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java @@ -0,0 +1,39 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.annotation.EnableRetry; + +import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties; + +import lombok.RequiredArgsConstructor; + +@EnableRetry +@Configuration +@RequiredArgsConstructor +@EnableConfigurationProperties(value = KafkaProperties.class) +public class ProducerConfiguration { + + @Bean(destroyMethod = "close") + public KafkaProducer dltProducer(KafkaProperties kafkaProperties) { + return new KafkaProducer<>(producerProperties(kafkaProperties)); + } + + private Properties producerProperties(KafkaProperties properties) { + Properties props = new Properties(); + KafkaProperties.Producer producer = properties.getProducer(); + KafkaProperties.BootStrapServers bootStrapServers = properties.getBootStrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers.getAddressByProtocol(producer.protocol())); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producer.keySerializer()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producer.valueSerializer()); + props.put(ProducerConfig.ACKS_CONFIG, producer.acks()); + props.put(ProducerConfig.LINGER_MS_CONFIG, producer.lingerMs()); + + return props; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 002c42d..a7de48b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,3 +1,68 @@ +server: + port: 8080 + spring: application: name: consumer + +logging: + level: + org.study.consumerssl.infrastructure.kafka.consumer: INFO + org.apache.kafka.common.network: INFO + +management: + info: + java: + enabled: true + os: + enabled: true + endpoints: + web: + exposure: + include: "health,info,prometheus" + endpoint: + health: + show-details: always + +kafka: + dead-letter-topic: + name: "test-dlt" + + bootstrap-servers: + run-environment: ${RUN_ENVIRONMENT:local} + local: + plain-text: "localhost:29092,localhost:39092,localhost:49092" + ssl: "localhost:29093,localhost:39093,localhost:49093" + docker: + plain-text: "broker-1:19092,broker-2:19092,broker-3:19092" + ssl: "broker-1:19093,broker-2:19093,broker-3:19093" + + producer: + keySerializer: "org.apache.kafka.common.serialization.StringSerializer" + valueSerializer: "org.apache.kafka.common.serialization.StringSerializer" + acks: "all" + linger-ms: 10 + protocol: "plaintext" + + consumer: + group-id: "sangdol-group" + keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer" + valueDeserializer: "com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordDeserializer" + protocol: ${CONSUMER_PROTOCOL:plaintext} + enable-auto-commit: false + auto-offset-reset: "earliest" + max-poll-records: 500 + worker-threads: 3 + topics: + - "topic-0" + - "topic-1" + - "topic-2" + + ssl: + truststore-location: ssl/kafka.broker.truststore.jks + truststore-password: abcd1234 + keystore-location: ssl/kafka.broker.keystore.jks + keystore-password: abcd1234 + key-password: abcd1234 + + diff --git a/src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java b/src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java new file mode 100644 index 0000000..2ffd6e6 --- /dev/null +++ b/src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java @@ -0,0 +1,78 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import java.time.Instant; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.bean.override.mockito.MockitoBean; + +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper; + +/** + * Spring Retryable 동작 및 로그 출력 확인용 테스트 + */ +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +class DLTProducerTest { + + @MockitoBean + private KafkaProducer kafkaProducer; + + @Autowired + private DLTProducer dltProducer; + + @Autowired + private JsonObjectMapper jsonObjectMapper; + + @DisplayName("DLT 메시지 전송이 첫 번째 시도에서는 실패하고, 두 번째 시도에서 성공한다.") + @Test + void dltRetryTest_whenSucceedSecondTrial() throws Exception { + // given + String key = "key"; + TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString()); + RuntimeException origin = new RuntimeException("Intended Exception for Test"); + Future mockfuture = mock(Future.class); + when(kafkaProducer.send(any())).thenReturn(mockfuture); + when(mockfuture.get()) + .thenThrow(origin) + .thenReturn(mock(RecordMetadata.class)); + + // when + assertThatCode(() -> dltProducer.sendToDeadLetterTopic(key, value, origin)) + .doesNotThrowAnyException(); + } + + @DisplayName("DLT 메시지 전송에 실패한다.") + @Test + void dltRetryTest_whenFailedAllTries() throws Exception { + // given + String key = "key"; + TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString()); + RuntimeException origin = new RuntimeException("Intended Exception for Test"); + Future mockfuture = mock(Future.class); + when(kafkaProducer.send(any())).thenReturn(mockfuture); + when(mockfuture.get()).thenThrow(origin); + + // when + dltProducer.sendToDeadLetterTopic(key, value, origin); + + // then + verify(kafkaProducer, times(5)).send(any()); + + assertThat(dltProducer.getFailed()).hasSize(1); + + DLTProducer.FailedDltMessage failedDltMessage = dltProducer.getFailed().get(0); + assertThat(failedDltMessage.key()).isEqualTo(key); + assertThat(failedDltMessage.message().exception()).isEqualTo(origin.getMessage()); + assertThat(failedDltMessage.message().message()).isEqualTo(jsonObjectMapper.serialize(value)); + } +} \ No newline at end of file