Merge pull request '[#3] Migrate the remaining application code, excluding the consumer' (#4) from feature/#3 into main

Reviewed-on: #4
이상진 2025-06-27 05:20:06 +00:00
commit bc79ff7035
11 changed files with 516 additions and 0 deletions

src/main/java/com/sangdol/consumer/domain/TestRecord.java

@@ -0,0 +1,8 @@
package com.sangdol.consumer.domain;

public record TestRecord(
    Integer userId,
    String topic,
    String timeStamp
) {
}

src/main/java/com/sangdol/consumer/infrastructure/kafka/common/CommonConfiguration.java

@@ -0,0 +1,18 @@
package com.sangdol.consumer.infrastructure.kafka.common;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.RequiredArgsConstructor;

@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(KafkaProperties.class)
public class CommonConfiguration {

    @Bean
    public KafkaProperties.DeadLetterTopic deadLetterTopic(KafkaProperties kafkaProperties) {
        return kafkaProperties.getDeadLetterTopic();
    }
}
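
Exposing DeadLetterTopic as a standalone bean looks like a deliberate narrowing: DLTProducer (later in this diff) can inject just the dead-letter topic name instead of depending on the whole KafkaProperties object.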

src/main/java/com/sangdol/consumer/infrastructure/kafka/common/KafkaProperties.java

@@ -0,0 +1,111 @@
package com.sangdol.consumer.infrastructure.kafka.common;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.SslConfigs;
import org.springframework.boot.context.properties.ConfigurationProperties;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
@ConfigurationProperties(prefix = "kafka")
public class KafkaProperties {

    private final DeadLetterTopic deadLetterTopic;
    private final BootStrapServers bootStrapServers;
    private final Consumer consumer;
    private final Producer producer;
    private final Ssl ssl;

    public record DeadLetterTopic(
        String name
    ) {
    }

    public record BootStrapServers(
        String runEnvironment,
        ServerAddress docker,
        ServerAddress local
    ) {
        public String getAddressByProtocol(String protocol) {
            if (runEnvironment == null || runEnvironment.isBlank()) {
                throw new IllegalArgumentException(
                    "[KafkaProperties.BootStrapServers] runEnvironment is null or blank");
            }
            if (runEnvironment.equalsIgnoreCase("local")) {
                return local.getAddress(protocol);
            }
            if (runEnvironment.equalsIgnoreCase("docker")) {
                return docker.getAddress(protocol);
            }
            throw new IllegalArgumentException(
                "[KafkaProperties.BootStrapServers] unknown runEnvironment: " + runEnvironment);
        }
    }

    public record Consumer(
        String groupId,
        String keyDeserializer,
        String valueDeserializer,
        String protocol,
        boolean enableAutoCommit,
        String autoOffsetReset,
        Integer maxPollRecords,
        Integer workerThreads,
        List<String> topics
    ) {
        public boolean isSslEnabled() {
            return protocol.equalsIgnoreCase("ssl");
        }
    }

    public record Producer(
        String keySerializer,
        String valueSerializer,
        String acks,
        Integer lingerMs,
        String protocol
    ) {
    }

    public record Ssl(
        String truststoreLocation,
        String truststorePassword,
        String keystoreLocation,
        String keystorePassword,
        String keyPassword
    ) {
        public Map<Object, Object> toPropsMap() {
            Map<Object, Object> props = new HashMap<>();
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation);
            props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
            props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
            return props;
        }
    }

    public record ServerAddress(
        String plainText,
        String ssl
    ) {
        public String getAddress(String protocol) {
            if (protocol == null || protocol.isBlank()) {
                throw new IllegalArgumentException("[KafkaProperties.ServerAddress] protocol is null or blank");
            }
            if (protocol.equalsIgnoreCase("plaintext")) {
                return plainText;
            }
            if (protocol.equalsIgnoreCase("ssl")) {
                return ssl;
            }
            throw new IllegalArgumentException("[KafkaProperties.ServerAddress] unknown protocol: " + protocol);
        }
    }
}
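
A minimal sketch (illustrative only, not part of this change set) of how BootStrapServers routes a bootstrap address by run environment and protocol; the values mirror the application.yml later in this commit:

import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;

class BootstrapRoutingSketch {
    public static void main(String[] args) {
        // ServerAddress(plainText, ssl); values mirror application.yml in this commit.
        KafkaProperties.ServerAddress local = new KafkaProperties.ServerAddress(
            "localhost:29092,localhost:39092,localhost:49092",
            "localhost:29093,localhost:39093,localhost:49093");
        // BootStrapServers(runEnvironment, docker, local); no docker addresses needed here.
        KafkaProperties.BootStrapServers servers =
            new KafkaProperties.BootStrapServers("local", null, local);

        System.out.println(servers.getAddressByProtocol("plaintext")); // plain-text listeners
        System.out.println(servers.getAddressByProtocol("ssl"));       // SSL listeners
        // servers.getAddressByProtocol("sasl") would throw IllegalArgumentException
    }
}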

src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java

@@ -0,0 +1,50 @@
package com.sangdol.consumer.infrastructure.kafka.common.serialize;

import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class JsonObjectMapper {

    private final ObjectMapper objectMapper;

    public JsonObjectMapper() {
        this.objectMapper = new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    public <T> T deserialize(String json, Class<T> clazz) {
        try {
            return objectMapper.readValue(json, clazz);
        } catch (Exception e) {
            log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", json, clazz, e);
            return null;
        }
    }

    public <T> T deserialize(Object object, Class<T> clazz) {
        try {
            return objectMapper.convertValue(object, clazz);
        } catch (Exception e) {
            log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", object, clazz,
                e);
            return null;
        }
    }

    public String serialize(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (Exception e) {
            log.info("[JsonObjectMapper] Exception Occurred while serializing object = {}", object, e);
            return null;
        }
    }
}
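
A round-trip sketch (illustrative only) of the mapper with the TestRecord from this commit; note that both serialize and deserialize return null on failure rather than throwing, so callers must null-check:

import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;

class JsonRoundTripSketch {
    public static void main(String[] args) {
        JsonObjectMapper mapper = new JsonObjectMapper();
        TestRecord record = new TestRecord(1, "topic-0", "2025-06-27T05:20:06Z"); // sample values

        String json = mapper.serialize(record);
        // json is e.g. {"userId":1,"topic":"topic-0","timeStamp":"2025-06-27T05:20:06Z"}
        TestRecord back = mapper.deserialize(json, TestRecord.class);

        System.out.println(record.equals(back)); // true
    }
}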

src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java

@@ -0,0 +1,26 @@
package com.sangdol.consumer.infrastructure.kafka.common.serialize;

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Deserializer;

import com.sangdol.consumer.domain.TestRecord;

public class TestRecordDeserializer implements Deserializer<TestRecord> {

    private final JsonObjectMapper jsonObjectMapper;

    public TestRecordDeserializer() {
        this.jsonObjectMapper = new JsonObjectMapper();
    }

    // TODO: handle deserialization failures properly (e.g. send to the DLT) instead of returning null.
    @Override
    public TestRecord deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        return jsonObjectMapper.deserialize(new String(data, StandardCharsets.UTF_8), TestRecord.class);
    }
}
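
The consumer wiring itself is excluded from this PR (per its title); for context, a hypothetical sketch of how this deserializer would be registered with a raw Kafka consumer, using the class names that appear in the application.yml below:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.sangdol.consumer.domain.TestRecord;

class ConsumerWiringSketch {
    // Hypothetical helper; the real consumer configuration is not part of this commit.
    static KafkaConsumer<String, TestRecord> buildConsumer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sangdol-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(props);
    }
}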

src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java

@@ -0,0 +1,25 @@
package com.sangdol.consumer.infrastructure.kafka.common.serialize;

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Serializer;

import com.sangdol.consumer.domain.TestRecord;

public class TestRecordSerializer implements Serializer<TestRecord> {

    private final JsonObjectMapper jsonObjectMapper;

    public TestRecordSerializer() {
        this.jsonObjectMapper = new JsonObjectMapper();
    }

    @Override
    public byte[] serialize(String topic, TestRecord data) {
        if (data == null) {
            return null;
        }
        // JsonObjectMapper.serialize() returns null on failure; guard against an NPE here.
        String json = jsonObjectMapper.serialize(data);
        return json == null ? null : json.getBytes(StandardCharsets.UTF_8);
    }
}

src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducer.java

@@ -0,0 +1,70 @@
package com.sangdol.consumer.infrastructure.kafka.producer;

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;

import com.google.common.annotations.VisibleForTesting;
import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@RequiredArgsConstructor
public class DLTProducer {

    private final KafkaProducer<String, String> dltProducer;
    private final KafkaProperties.DeadLetterTopic deadLetterTopic;
    private final JsonObjectMapper jsonObjectMapper;

    // In a real environment, failed messages should be persisted externally (DB, file system, etc.).
    private final List<FailedDltMessage> failed = new ArrayList<>();

    @Retryable(
        retryFor = {Exception.class},
        maxAttempts = 5,
        backoff = @Backoff(delay = 500, multiplier = 2),
        listeners = {"dltRetryListener"}
    )
    public void sendToDeadLetterTopic(String key, Object value, Exception exception) throws Exception {
        String valueJson = jsonObjectMapper.serialize(value);
        String message = jsonObjectMapper.serialize(new DltMessage(exception.getMessage(), valueJson));
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(deadLetterTopic.name(), key, message);
        dltProducer.send(producerRecord).get();
        log.info("[DLTProducer] Succeeded to send. key: {}, value: {}", key, valueJson);
    }

    @Recover
    public void recover(Exception last, String key, Object value, Exception origin) {
        log.error("[DLTProducer] Failed to produce DLT message for key: {}, value: {}, cause: {}", key, value, last.getMessage());
        DltMessage dltMessage = new DltMessage(origin.getMessage(), jsonObjectMapper.serialize(value));
        failed.add(new FailedDltMessage(key, dltMessage));
    }

    @VisibleForTesting
    List<FailedDltMessage> getFailed() {
        return failed;
    }

    record DltMessage(
        String exception,
        String message
    ) {
    }

    record FailedDltMessage(
        String key,
        DltMessage message
    ) {
    }
}
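
How a consuming side would hand a poison record to this producer (hypothetical sketch; the consumer is excluded from this PR). Because @Retryable is proxy-based, the call must go through the Spring-injected DLTProducer bean:

import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;

class RecordHandlerSketch {
    private final DLTProducer dltProducer;

    RecordHandlerSketch(DLTProducer dltProducer) {
        this.dltProducer = dltProducer;
    }

    void handle(String key, TestRecord record) {
        try {
            process(record);
        } catch (Exception e) {
            try {
                // Retried up to 5 times with exponential backoff;
                // after exhaustion, recover() stores the message in the failed list.
                dltProducer.sendToDeadLetterTopic(key, record, e);
            } catch (Exception ignored) {
                // not expected in practice: @Recover absorbs the final failure
            }
        }
    }

    private void process(TestRecord record) {
        // placeholder for real record processing
    }
}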

src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTRetryListener.java

@@ -0,0 +1,26 @@
package com.sangdol.consumer.infrastructure.kafka.producer;

import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component("dltRetryListener")
public class DLTRetryListener implements RetryListener {

    @Override
    public <T, E extends Throwable> void onError(
        RetryContext context,
        RetryCallback<T, E> callback,
        Throwable throwable
    ) {
        int retryCount = context.getRetryCount();
        log.warn("[DLTRetryListener] Retrying DLT message send. Retry count: {}, cause: {}", retryCount,
            throwable.getMessage());
        RetryListener.super.onError(context, callback, throwable);
    }
}

src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java

@@ -0,0 +1,39 @@
package com.sangdol.consumer.infrastructure.kafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;

import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;

import lombok.RequiredArgsConstructor;

@EnableRetry
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(value = KafkaProperties.class)
public class ProducerConfiguration {

    @Bean(destroyMethod = "close")
    public KafkaProducer<String, String> dltProducer(KafkaProperties kafkaProperties) {
        return new KafkaProducer<>(producerProperties(kafkaProperties));
    }

    private Properties producerProperties(KafkaProperties properties) {
        Properties props = new Properties();
        KafkaProperties.Producer producer = properties.getProducer();
        KafkaProperties.BootStrapServers bootStrapServers = properties.getBootStrapServers();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers.getAddressByProtocol(producer.protocol()));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producer.keySerializer());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producer.valueSerializer());
        props.put(ProducerConfig.ACKS_CONFIG, producer.acks());
        props.put(ProducerConfig.LINGER_MS_CONFIG, producer.lingerMs());
        return props;
    }
}
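
Note that producerProperties() never applies the KafkaProperties.Ssl settings, even when producer.protocol is "ssl". One possible fix, sketched under the assumption that setting security.protocol plus the Ssl.toPropsMap() entries is sufficient for this broker setup:

// Sketch only (not part of this commit): call from producerProperties() before returning props.
// CommonClientConfigs is org.apache.kafka.clients.CommonClientConfigs.
private void applySslIfNeeded(Properties props, KafkaProperties properties) {
    if ("ssl".equalsIgnoreCase(properties.getProducer().protocol())) {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.putAll(properties.getSsl().toPropsMap());
    }
}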

src/main/resources/application.yml

@@ -1,3 +1,68 @@
server:
  port: 8080

spring:
  application:
    name: consumer

logging:
  level:
    com.sangdol.consumer.infrastructure.kafka.consumer: INFO
    org.apache.kafka.common.network: INFO

management:
  info:
    java:
      enabled: true
    os:
      enabled: true
  endpoints:
    web:
      exposure:
        include: "health,info,prometheus"
  endpoint:
    health:
      show-details: always

kafka:
  dead-letter-topic:
    name: "test-dlt"
  bootstrap-servers:
    run-environment: ${RUN_ENVIRONMENT:local}
    local:
      plain-text: "localhost:29092,localhost:39092,localhost:49092"
      ssl: "localhost:29093,localhost:39093,localhost:49093"
    docker:
      plain-text: "broker-1:19092,broker-2:19092,broker-3:19092"
      ssl: "broker-1:19093,broker-2:19093,broker-3:19093"
  producer:
    keySerializer: "org.apache.kafka.common.serialization.StringSerializer"
    valueSerializer: "org.apache.kafka.common.serialization.StringSerializer"
    acks: "all"
    linger-ms: 10
    protocol: "plaintext"
  consumer:
    group-id: "sangdol-group"
    keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer"
    valueDeserializer: "com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordDeserializer"
    protocol: ${CONSUMER_PROTOCOL:plaintext}
    enable-auto-commit: false
    auto-offset-reset: "earliest"
    max-poll-records: 500
    worker-threads: 3
    topics:
      - "topic-0"
      - "topic-1"
      - "topic-2"
  ssl:
    truststore-location: ssl/kafka.broker.truststore.jks
    truststore-password: abcd1234
    keystore-location: ssl/kafka.broker.keystore.jks
    keystore-password: abcd1234
    key-password: abcd1234
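
On the placeholders: ${RUN_ENVIRONMENT:local} and ${CONSUMER_PROTOCOL:plaintext} default to a local plain-text setup, so a containerized run would presumably export RUN_ENVIRONMENT=docker (and CONSUMER_PROTOCOL=ssl for the encrypted listeners); this reading follows from KafkaProperties.BootStrapServers above.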

src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java

@@ -0,0 +1,78 @@
package com.sangdol.consumer.infrastructure.kafka.producer;

import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.time.Instant;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.bean.override.mockito.MockitoBean;

import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;

/**
 * Tests for verifying the retry logging behavior of Spring Retry's {@code @Retryable}.
 */
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class DLTProducerTest {

    @MockitoBean
    private KafkaProducer<String, String> kafkaProducer;

    @Autowired
    private DLTProducer dltProducer;

    @Autowired
    private JsonObjectMapper jsonObjectMapper;

    @DisplayName("Sending a DLT message fails on the first attempt and succeeds on the second.")
    @Test
    void dltRetryTest_whenSucceedSecondTrial() throws Exception {
        // given
        String key = "key";
        TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString());
        RuntimeException origin = new RuntimeException("Intended Exception for Test");

        Future<RecordMetadata> mockFuture = mock(Future.class);
        when(kafkaProducer.send(any())).thenReturn(mockFuture);
        when(mockFuture.get())
            .thenThrow(origin)
            .thenReturn(mock(RecordMetadata.class));

        // when
        assertThatCode(() -> dltProducer.sendToDeadLetterTopic(key, value, origin))
            .doesNotThrowAnyException();
    }

    @DisplayName("Sending a DLT message fails on every retry attempt.")
    @Test
    void dltRetryTest_whenFailedAllTries() throws Exception {
        // given
        String key = "key";
        TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString());
        RuntimeException origin = new RuntimeException("Intended Exception for Test");

        Future<RecordMetadata> mockFuture = mock(Future.class);
        when(kafkaProducer.send(any())).thenReturn(mockFuture);
        when(mockFuture.get()).thenThrow(origin);

        // when
        dltProducer.sendToDeadLetterTopic(key, value, origin);

        // then
        verify(kafkaProducer, times(5)).send(any());
        assertThat(dltProducer.getFailed()).hasSize(1);
        DLTProducer.FailedDltMessage failedDltMessage = dltProducer.getFailed().get(0);
        assertThat(failedDltMessage.key()).isEqualTo(key);
        assertThat(failedDltMessage.message().exception()).isEqualTo(origin.getMessage());
        assertThat(failedDltMessage.message().message()).isEqualTo(jsonObjectMapper.serialize(value));
    }
}