Compare commits
No commits in common. "b0456c18a26d1d51f80d0bf7941af0032a3ccb9c" and "c48e2cdf4a1a7940382a59ba1db1d30efcf478e1" have entirely different histories.
b0456c18a2
...
c48e2cdf4a
@ -1,8 +0,0 @@
|
||||
package com.sangdol.consumer.domain;
|
||||
|
||||
public record TestRecord(
|
||||
Integer userId,
|
||||
String topic,
|
||||
String timeStamp
|
||||
) {
|
||||
}
|
||||
@ -1,18 +0,0 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.common;
|
||||
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
@Configuration
|
||||
@RequiredArgsConstructor
|
||||
@EnableConfigurationProperties(KafkaProperties.class)
|
||||
public class CommonConfiguration {
|
||||
|
||||
@Bean
|
||||
public KafkaProperties.DeadLetterTopic deadLetterTopic(KafkaProperties kafkaProperties) {
|
||||
return kafkaProperties.getDeadLetterTopic();
|
||||
}
|
||||
}
|
||||
@ -1,111 +0,0 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.common;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
@ConfigurationProperties(prefix = "kafka")
|
||||
public class KafkaProperties {
|
||||
|
||||
private final DeadLetterTopic deadLetterTopic;
|
||||
private final BootStrapServers bootStrapServers;
|
||||
private final Consumer consumer;
|
||||
private final Producer producer;
|
||||
private final Ssl ssl;
|
||||
|
||||
public record DeadLetterTopic(
|
||||
String name
|
||||
) {
|
||||
}
|
||||
|
||||
public record BootStrapServers(
|
||||
String runEnvironment,
|
||||
ServerAddress docker,
|
||||
ServerAddress local
|
||||
) {
|
||||
public String getAddressByProtocol(String protocol) {
|
||||
if (runEnvironment == null || runEnvironment.isBlank()) {
|
||||
throw new IllegalArgumentException(
|
||||
"[KafkaProperties.BootStrapServers] runEnvironment is null or black");
|
||||
}
|
||||
if (runEnvironment.equalsIgnoreCase("local")) {
|
||||
return local.getAddress(protocol);
|
||||
}
|
||||
if (runEnvironment.equalsIgnoreCase("docker")) {
|
||||
return docker.getAddress(protocol);
|
||||
}
|
||||
throw new IllegalArgumentException(
|
||||
"[KafkaProperties.BootStrapServers] unknown runEnvironment: " + runEnvironment);
|
||||
}
|
||||
}
|
||||
|
||||
public record Consumer(
|
||||
String groupId,
|
||||
String keyDeserializer,
|
||||
String valueDeserializer,
|
||||
String protocol,
|
||||
boolean enableAutoCommit,
|
||||
String autoOffsetReset,
|
||||
Integer maxPollRecords,
|
||||
Integer workerThreads,
|
||||
List<String> topics
|
||||
) {
|
||||
public boolean isSslEnabled() {
|
||||
return protocol.equalsIgnoreCase("ssl");
|
||||
}
|
||||
}
|
||||
|
||||
public record Producer(
|
||||
String keySerializer,
|
||||
String valueSerializer,
|
||||
String acks,
|
||||
Integer lingerMs,
|
||||
String protocol
|
||||
) {
|
||||
}
|
||||
|
||||
public record Ssl(
|
||||
String truststoreLocation,
|
||||
String truststorePassword,
|
||||
String keystoreLocation,
|
||||
String keystorePassword,
|
||||
String keyPassword
|
||||
) {
|
||||
public Map<Object, Object> toPropsMap() {
|
||||
Map<Object, Object> props = new HashMap<>();
|
||||
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation);
|
||||
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
|
||||
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation);
|
||||
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
|
||||
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
|
||||
|
||||
return props;
|
||||
}
|
||||
}
|
||||
|
||||
public record ServerAddress(
|
||||
String plainText,
|
||||
String ssl
|
||||
) {
|
||||
public String getAddress(String protocol) {
|
||||
if (protocol == null || protocol.isBlank()) {
|
||||
throw new IllegalArgumentException("[KafkaProperties.ServerAddress] protocol is null or black");
|
||||
}
|
||||
if (protocol.equalsIgnoreCase("plaintext")) {
|
||||
return plainText;
|
||||
}
|
||||
if (protocol.equalsIgnoreCase("ssl")) {
|
||||
return ssl;
|
||||
}
|
||||
throw new IllegalArgumentException("[KafkaProperties.ServerAddress] unknown protocol: " + protocol);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,50 +0,0 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.common.serialize;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class JsonObjectMapper {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public JsonObjectMapper() {
|
||||
this.objectMapper = new ObjectMapper()
|
||||
.registerModule(new JavaTimeModule())
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
}
|
||||
|
||||
public <T> T deserialize(String json, Class<T> clazz) {
|
||||
try {
|
||||
return objectMapper.readValue(json, clazz);
|
||||
} catch (Exception e) {
|
||||
log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", json, clazz, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public <T> T deserialize(Object object, Class<T> clazz) {
|
||||
try {
|
||||
return objectMapper.convertValue(object, clazz);
|
||||
} catch (Exception e) {
|
||||
log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", object, clazz,
|
||||
e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public String serialize(Object object) {
|
||||
try {
|
||||
return objectMapper.writeValueAsString(object);
|
||||
} catch (Exception e) {
|
||||
log.info("[JsonObjectMapper] Exception Occurred while serializing object = {}", object, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,26 +0,0 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.common.serialize;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
|
||||
import com.sangdol.consumer.domain.TestRecord;
|
||||
|
||||
public class TestRecordDeserializer implements Deserializer<TestRecord> {
|
||||
|
||||
private final JsonObjectMapper jsonObjectMapper;
|
||||
|
||||
public TestRecordDeserializer() {
|
||||
this.jsonObjectMapper = new JsonObjectMapper();
|
||||
}
|
||||
|
||||
// null-return이 아닌 DLT 전송 등의 예외 처리 필요.
|
||||
@Override
|
||||
public TestRecord deserialize(String topic, byte[] data) {
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return jsonObjectMapper.deserialize(new String(data, StandardCharsets.UTF_8), TestRecord.class);
|
||||
}
|
||||
}
|
||||
@ -1,25 +0,0 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.common.serialize;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
|
||||
import com.sangdol.consumer.domain.TestRecord;
|
||||
|
||||
public class TestRecordSerializer implements Serializer<TestRecord> {
|
||||
|
||||
private final JsonObjectMapper jsonObjectMapper;
|
||||
|
||||
public TestRecordSerializer() {
|
||||
this.jsonObjectMapper = new JsonObjectMapper();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(String topic, TestRecord data) {
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return jsonObjectMapper.serialize(data).getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
||||
@ -1,70 +0,0 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.producer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.retry.annotation.Backoff;
|
||||
import org.springframework.retry.annotation.Recover;
|
||||
import org.springframework.retry.annotation.Retryable;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;
|
||||
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class DLTProducer {
|
||||
|
||||
private final KafkaProducer<String, String> dltProducer;
|
||||
private final KafkaProperties.DeadLetterTopic deadLetterTopic;
|
||||
private final JsonObjectMapper jsonObjectMapper;
|
||||
|
||||
// 실제 환경에서는 DB, 파일 시스템 등 외부에 저장 필요
|
||||
private final List<FailedDltMessage> failed = new ArrayList<>();
|
||||
|
||||
@Retryable(
|
||||
retryFor = {Exception.class},
|
||||
maxAttempts = 5,
|
||||
backoff = @Backoff(delay = 500, multiplier = 2),
|
||||
listeners = {"dltRetryListener"}
|
||||
)
|
||||
public void sendToDeadLetterTopic(String key, Object value, Exception exception) throws Exception {
|
||||
String valueJson = jsonObjectMapper.serialize(value);
|
||||
String message = jsonObjectMapper.serialize(new DltMessage(exception.getMessage(), valueJson));
|
||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(deadLetterTopic.name(), key, message);
|
||||
|
||||
dltProducer.send(producerRecord).get();
|
||||
log.info("[DLTProducer] Succeed to send. key : {}, value: {}", key, valueJson);
|
||||
}
|
||||
|
||||
@Recover
|
||||
public void recover(Exception last, String key, Object value, Exception origin) {
|
||||
log.error("[DLTProducer] Failed to produce DLT message for key: {}, value: {} cause: {}", key, value, last.getMessage());
|
||||
DltMessage dltMessage = new DltMessage(origin.getMessage(), jsonObjectMapper.serialize(value));
|
||||
failed.add(new FailedDltMessage(key, dltMessage));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
List<FailedDltMessage> getFailed() {
|
||||
return failed;
|
||||
}
|
||||
|
||||
record DltMessage(
|
||||
String exception,
|
||||
String message
|
||||
) {
|
||||
}
|
||||
|
||||
record FailedDltMessage(
|
||||
String key,
|
||||
DltMessage message
|
||||
) {
|
||||
}
|
||||
}
|
||||
@ -1,26 +0,0 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.producer;
|
||||
|
||||
import org.springframework.retry.RetryCallback;
|
||||
import org.springframework.retry.RetryContext;
|
||||
import org.springframework.retry.RetryListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Component("dltRetryListener")
|
||||
public class DLTRetryListener implements RetryListener {
|
||||
|
||||
@Override
|
||||
public <T, E extends Throwable> void onError(
|
||||
RetryContext context,
|
||||
RetryCallback<T, E> callback,
|
||||
Throwable throwable
|
||||
) {
|
||||
int retryCount = context.getRetryCount();
|
||||
log.warn("[DLTProducer] Retry to send DLT message. Retry count: {}, Cause: {}", retryCount,
|
||||
throwable.getMessage());
|
||||
|
||||
RetryListener.super.onError(context, callback, throwable);
|
||||
}
|
||||
}
|
||||
@ -1,39 +0,0 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.producer;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.retry.annotation.EnableRetry;
|
||||
|
||||
import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
@EnableRetry
|
||||
@Configuration
|
||||
@RequiredArgsConstructor
|
||||
@EnableConfigurationProperties(value = KafkaProperties.class)
|
||||
public class ProducerConfiguration {
|
||||
|
||||
@Bean(destroyMethod = "close")
|
||||
public KafkaProducer<String, String> dltProducer(KafkaProperties kafkaProperties) {
|
||||
return new KafkaProducer<>(producerProperties(kafkaProperties));
|
||||
}
|
||||
|
||||
private Properties producerProperties(KafkaProperties properties) {
|
||||
Properties props = new Properties();
|
||||
KafkaProperties.Producer producer = properties.getProducer();
|
||||
KafkaProperties.BootStrapServers bootStrapServers = properties.getBootStrapServers();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers.getAddressByProtocol(producer.protocol()));
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producer.keySerializer());
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producer.valueSerializer());
|
||||
props.put(ProducerConfig.ACKS_CONFIG, producer.acks());
|
||||
props.put(ProducerConfig.LINGER_MS_CONFIG, producer.lingerMs());
|
||||
|
||||
return props;
|
||||
}
|
||||
}
|
||||
@ -1,68 +1,3 @@
|
||||
server:
|
||||
port: 8080
|
||||
|
||||
spring:
|
||||
application:
|
||||
name: consumer
|
||||
|
||||
logging:
|
||||
level:
|
||||
org.study.consumerssl.infrastructure.kafka.consumer: INFO
|
||||
org.apache.kafka.common.network: INFO
|
||||
|
||||
management:
|
||||
info:
|
||||
java:
|
||||
enabled: true
|
||||
os:
|
||||
enabled: true
|
||||
endpoints:
|
||||
web:
|
||||
exposure:
|
||||
include: "health,info,prometheus"
|
||||
endpoint:
|
||||
health:
|
||||
show-details: always
|
||||
|
||||
kafka:
|
||||
dead-letter-topic:
|
||||
name: "test-dlt"
|
||||
|
||||
bootstrap-servers:
|
||||
run-environment: ${RUN_ENVIRONMENT:local}
|
||||
local:
|
||||
plain-text: "localhost:29092,localhost:39092,localhost:49092"
|
||||
ssl: "localhost:29093,localhost:39093,localhost:49093"
|
||||
docker:
|
||||
plain-text: "broker-1:19092,broker-2:19092,broker-3:19092"
|
||||
ssl: "broker-1:19093,broker-2:19093,broker-3:19093"
|
||||
|
||||
producer:
|
||||
keySerializer: "org.apache.kafka.common.serialization.StringSerializer"
|
||||
valueSerializer: "org.apache.kafka.common.serialization.StringSerializer"
|
||||
acks: "all"
|
||||
linger-ms: 10
|
||||
protocol: "plaintext"
|
||||
|
||||
consumer:
|
||||
group-id: "sangdol-group"
|
||||
keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer"
|
||||
valueDeserializer: "com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordDeserializer"
|
||||
protocol: ${CONSUMER_PROTOCOL:plaintext}
|
||||
enable-auto-commit: false
|
||||
auto-offset-reset: "earliest"
|
||||
max-poll-records: 500
|
||||
worker-threads: 3
|
||||
topics:
|
||||
- "topic-0"
|
||||
- "topic-1"
|
||||
- "topic-2"
|
||||
|
||||
ssl:
|
||||
truststore-location: ssl/kafka.broker.truststore.jks
|
||||
truststore-password: abcd1234
|
||||
keystore-location: ssl/kafka.broker.keystore.jks
|
||||
keystore-password: abcd1234
|
||||
key-password: abcd1234
|
||||
|
||||
|
||||
|
||||
@ -1,78 +0,0 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.producer;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
import static org.mockito.ArgumentMatchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.bean.override.mockito.MockitoBean;
|
||||
|
||||
import com.sangdol.consumer.domain.TestRecord;
|
||||
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;
|
||||
|
||||
/**
|
||||
* Spring Retryable 동작 및 로그 출력 확인용 테스트
|
||||
*/
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||
class DLTProducerTest {
|
||||
|
||||
@MockitoBean
|
||||
private KafkaProducer<String, String> kafkaProducer;
|
||||
|
||||
@Autowired
|
||||
private DLTProducer dltProducer;
|
||||
|
||||
@Autowired
|
||||
private JsonObjectMapper jsonObjectMapper;
|
||||
|
||||
@DisplayName("DLT 메시지 전송이 첫 번째 시도에서는 실패하고, 두 번째 시도에서 성공한다.")
|
||||
@Test
|
||||
void dltRetryTest_whenSucceedSecondTrial() throws Exception {
|
||||
// given
|
||||
String key = "key";
|
||||
TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString());
|
||||
RuntimeException origin = new RuntimeException("Intended Exception for Test");
|
||||
Future<RecordMetadata> mockfuture = mock(Future.class);
|
||||
when(kafkaProducer.send(any())).thenReturn(mockfuture);
|
||||
when(mockfuture.get())
|
||||
.thenThrow(origin)
|
||||
.thenReturn(mock(RecordMetadata.class));
|
||||
|
||||
// when
|
||||
assertThatCode(() -> dltProducer.sendToDeadLetterTopic(key, value, origin))
|
||||
.doesNotThrowAnyException();
|
||||
}
|
||||
|
||||
@DisplayName("DLT 메시지 전송에 실패한다.")
|
||||
@Test
|
||||
void dltRetryTest_whenFailedAllTries() throws Exception {
|
||||
// given
|
||||
String key = "key";
|
||||
TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString());
|
||||
RuntimeException origin = new RuntimeException("Intended Exception for Test");
|
||||
Future<RecordMetadata> mockfuture = mock(Future.class);
|
||||
when(kafkaProducer.send(any())).thenReturn(mockfuture);
|
||||
when(mockfuture.get()).thenThrow(origin);
|
||||
|
||||
// when
|
||||
dltProducer.sendToDeadLetterTopic(key, value, origin);
|
||||
|
||||
// then
|
||||
verify(kafkaProducer, times(5)).send(any());
|
||||
|
||||
assertThat(dltProducer.getFailed()).hasSize(1);
|
||||
|
||||
DLTProducer.FailedDltMessage failedDltMessage = dltProducer.getFailed().get(0);
|
||||
assertThat(failedDltMessage.key()).isEqualTo(key);
|
||||
assertThat(failedDltMessage.message().exception()).isEqualTo(origin.getMessage());
|
||||
assertThat(failedDltMessage.message().message()).isEqualTo(jsonObjectMapper.serialize(value));
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user