[#3] 컨슈머를 제외한 나머지 애플리케이션 코드 마이그레이션 #4
@ -0,0 +1,8 @@
|
|||||||
|
package com.sangdol.consumer.domain;
|
||||||
|
|
||||||
|
public record TestRecord(
|
||||||
|
Integer userId,
|
||||||
|
String topic,
|
||||||
|
String timeStamp
|
||||||
|
) {
|
||||||
|
}
|
||||||
@ -0,0 +1,18 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.common;
|
||||||
|
|
||||||
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@EnableConfigurationProperties(KafkaProperties.class)
|
||||||
|
public class CommonConfiguration {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaProperties.DeadLetterTopic deadLetterTopic(KafkaProperties kafkaProperties) {
|
||||||
|
return kafkaProperties.getDeadLetterTopic();
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,111 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.common;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.config.SslConfigs;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@ConfigurationProperties(prefix = "kafka")
|
||||||
|
public class KafkaProperties {
|
||||||
|
|
||||||
|
private final DeadLetterTopic deadLetterTopic;
|
||||||
|
private final BootStrapServers bootStrapServers;
|
||||||
|
private final Consumer consumer;
|
||||||
|
private final Producer producer;
|
||||||
|
private final Ssl ssl;
|
||||||
|
|
||||||
|
public record DeadLetterTopic(
|
||||||
|
String name
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public record BootStrapServers(
|
||||||
|
String runEnvironment,
|
||||||
|
ServerAddress docker,
|
||||||
|
ServerAddress local
|
||||||
|
) {
|
||||||
|
public String getAddressByProtocol(String protocol) {
|
||||||
|
if (runEnvironment == null || runEnvironment.isBlank()) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"[KafkaProperties.BootStrapServers] runEnvironment is null or black");
|
||||||
|
}
|
||||||
|
if (runEnvironment.equalsIgnoreCase("local")) {
|
||||||
|
return local.getAddress(protocol);
|
||||||
|
}
|
||||||
|
if (runEnvironment.equalsIgnoreCase("docker")) {
|
||||||
|
return docker.getAddress(protocol);
|
||||||
|
}
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"[KafkaProperties.BootStrapServers] unknown runEnvironment: " + runEnvironment);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public record Consumer(
|
||||||
|
String groupId,
|
||||||
|
String keyDeserializer,
|
||||||
|
String valueDeserializer,
|
||||||
|
String protocol,
|
||||||
|
boolean enableAutoCommit,
|
||||||
|
String autoOffsetReset,
|
||||||
|
Integer maxPollRecords,
|
||||||
|
Integer workerThreads,
|
||||||
|
List<String> topics
|
||||||
|
) {
|
||||||
|
public boolean isSslEnabled() {
|
||||||
|
return protocol.equalsIgnoreCase("ssl");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public record Producer(
|
||||||
|
String keySerializer,
|
||||||
|
String valueSerializer,
|
||||||
|
String acks,
|
||||||
|
Integer lingerMs,
|
||||||
|
String protocol
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public record Ssl(
|
||||||
|
String truststoreLocation,
|
||||||
|
String truststorePassword,
|
||||||
|
String keystoreLocation,
|
||||||
|
String keystorePassword,
|
||||||
|
String keyPassword
|
||||||
|
) {
|
||||||
|
public Map<Object, Object> toPropsMap() {
|
||||||
|
Map<Object, Object> props = new HashMap<>();
|
||||||
|
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation);
|
||||||
|
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
|
||||||
|
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation);
|
||||||
|
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
|
||||||
|
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
|
||||||
|
|
||||||
|
return props;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public record ServerAddress(
|
||||||
|
String plainText,
|
||||||
|
String ssl
|
||||||
|
) {
|
||||||
|
public String getAddress(String protocol) {
|
||||||
|
if (protocol == null || protocol.isBlank()) {
|
||||||
|
throw new IllegalArgumentException("[KafkaProperties.ServerAddress] protocol is null or black");
|
||||||
|
}
|
||||||
|
if (protocol.equalsIgnoreCase("plaintext")) {
|
||||||
|
return plainText;
|
||||||
|
}
|
||||||
|
if (protocol.equalsIgnoreCase("ssl")) {
|
||||||
|
return ssl;
|
||||||
|
}
|
||||||
|
throw new IllegalArgumentException("[KafkaProperties.ServerAddress] unknown protocol: " + protocol);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,50 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.common.serialize;
|
||||||
|
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class JsonObjectMapper {
|
||||||
|
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
|
public JsonObjectMapper() {
|
||||||
|
this.objectMapper = new ObjectMapper()
|
||||||
|
.registerModule(new JavaTimeModule())
|
||||||
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> T deserialize(String json, Class<T> clazz) {
|
||||||
|
try {
|
||||||
|
return objectMapper.readValue(json, clazz);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", json, clazz, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> T deserialize(Object object, Class<T> clazz) {
|
||||||
|
try {
|
||||||
|
return objectMapper.convertValue(object, clazz);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", object, clazz,
|
||||||
|
e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String serialize(Object object) {
|
||||||
|
try {
|
||||||
|
return objectMapper.writeValueAsString(object);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.info("[JsonObjectMapper] Exception Occurred while serializing object = {}", object, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,26 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.common.serialize;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.serialization.Deserializer;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
|
||||||
|
public class TestRecordDeserializer implements Deserializer<TestRecord> {
|
||||||
|
|
||||||
|
private final JsonObjectMapper jsonObjectMapper;
|
||||||
|
|
||||||
|
public TestRecordDeserializer() {
|
||||||
|
this.jsonObjectMapper = new JsonObjectMapper();
|
||||||
|
}
|
||||||
|
|
||||||
|
// null-return이 아닌 DLT 전송 등의 예외 처리 필요.
|
||||||
|
@Override
|
||||||
|
public TestRecord deserialize(String topic, byte[] data) {
|
||||||
|
if (data == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return jsonObjectMapper.deserialize(new String(data, StandardCharsets.UTF_8), TestRecord.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,25 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.common.serialize;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.serialization.Serializer;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
|
||||||
|
public class TestRecordSerializer implements Serializer<TestRecord> {
|
||||||
|
|
||||||
|
private final JsonObjectMapper jsonObjectMapper;
|
||||||
|
|
||||||
|
public TestRecordSerializer() {
|
||||||
|
this.jsonObjectMapper = new JsonObjectMapper();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] serialize(String topic, TestRecord data) {
|
||||||
|
if (data == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return jsonObjectMapper.serialize(data).getBytes(StandardCharsets.UTF_8);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,70 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.producer;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.springframework.retry.annotation.Backoff;
|
||||||
|
import org.springframework.retry.annotation.Recover;
|
||||||
|
import org.springframework.retry.annotation.Retryable;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;
|
||||||
|
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class DLTProducer {
|
||||||
|
|
||||||
|
private final KafkaProducer<String, String> dltProducer;
|
||||||
|
private final KafkaProperties.DeadLetterTopic deadLetterTopic;
|
||||||
|
private final JsonObjectMapper jsonObjectMapper;
|
||||||
|
|
||||||
|
// 실제 환경에서는 DB, 파일 시스템 등 외부에 저장 필요
|
||||||
|
private final List<FailedDltMessage> failed = new ArrayList<>();
|
||||||
|
|
||||||
|
@Retryable(
|
||||||
|
retryFor = {Exception.class},
|
||||||
|
maxAttempts = 5,
|
||||||
|
backoff = @Backoff(delay = 500, multiplier = 2),
|
||||||
|
listeners = {"dltRetryListener"}
|
||||||
|
)
|
||||||
|
public void sendToDeadLetterTopic(String key, Object value, Exception exception) throws Exception {
|
||||||
|
String valueJson = jsonObjectMapper.serialize(value);
|
||||||
|
String message = jsonObjectMapper.serialize(new DltMessage(exception.getMessage(), valueJson));
|
||||||
|
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(deadLetterTopic.name(), key, message);
|
||||||
|
|
||||||
|
dltProducer.send(producerRecord).get();
|
||||||
|
log.info("[DLTProducer] Succeed to send. key : {}, value: {}", key, valueJson);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Recover
|
||||||
|
public void recover(Exception last, String key, Object value, Exception origin) {
|
||||||
|
log.error("[DLTProducer] Failed to produce DLT message for key: {}, value: {} cause: {}", key, value, last.getMessage());
|
||||||
|
DltMessage dltMessage = new DltMessage(origin.getMessage(), jsonObjectMapper.serialize(value));
|
||||||
|
failed.add(new FailedDltMessage(key, dltMessage));
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
List<FailedDltMessage> getFailed() {
|
||||||
|
return failed;
|
||||||
|
}
|
||||||
|
|
||||||
|
record DltMessage(
|
||||||
|
String exception,
|
||||||
|
String message
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
||||||
|
record FailedDltMessage(
|
||||||
|
String key,
|
||||||
|
DltMessage message
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,26 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.producer;
|
||||||
|
|
||||||
|
import org.springframework.retry.RetryCallback;
|
||||||
|
import org.springframework.retry.RetryContext;
|
||||||
|
import org.springframework.retry.RetryListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component("dltRetryListener")
|
||||||
|
public class DLTRetryListener implements RetryListener {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T, E extends Throwable> void onError(
|
||||||
|
RetryContext context,
|
||||||
|
RetryCallback<T, E> callback,
|
||||||
|
Throwable throwable
|
||||||
|
) {
|
||||||
|
int retryCount = context.getRetryCount();
|
||||||
|
log.warn("[DLTProducer] Retry to send DLT message. Retry count: {}, Cause: {}", retryCount,
|
||||||
|
throwable.getMessage());
|
||||||
|
|
||||||
|
RetryListener.super.onError(context, callback, throwable);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,39 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.producer;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.retry.annotation.EnableRetry;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
|
@EnableRetry
|
||||||
|
@Configuration
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@EnableConfigurationProperties(value = KafkaProperties.class)
|
||||||
|
public class ProducerConfiguration {
|
||||||
|
|
||||||
|
@Bean(destroyMethod = "close")
|
||||||
|
public KafkaProducer<String, String> dltProducer(KafkaProperties kafkaProperties) {
|
||||||
|
return new KafkaProducer<>(producerProperties(kafkaProperties));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Properties producerProperties(KafkaProperties properties) {
|
||||||
|
Properties props = new Properties();
|
||||||
|
KafkaProperties.Producer producer = properties.getProducer();
|
||||||
|
KafkaProperties.BootStrapServers bootStrapServers = properties.getBootStrapServers();
|
||||||
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers.getAddressByProtocol(producer.protocol()));
|
||||||
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producer.keySerializer());
|
||||||
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producer.valueSerializer());
|
||||||
|
props.put(ProducerConfig.ACKS_CONFIG, producer.acks());
|
||||||
|
props.put(ProducerConfig.LINGER_MS_CONFIG, producer.lingerMs());
|
||||||
|
|
||||||
|
return props;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,3 +1,68 @@
|
|||||||
|
server:
|
||||||
|
port: 8080
|
||||||
|
|
||||||
spring:
|
spring:
|
||||||
application:
|
application:
|
||||||
name: consumer
|
name: consumer
|
||||||
|
|
||||||
|
logging:
|
||||||
|
level:
|
||||||
|
org.study.consumerssl.infrastructure.kafka.consumer: INFO
|
||||||
|
org.apache.kafka.common.network: INFO
|
||||||
|
|
||||||
|
management:
|
||||||
|
info:
|
||||||
|
java:
|
||||||
|
enabled: true
|
||||||
|
os:
|
||||||
|
enabled: true
|
||||||
|
endpoints:
|
||||||
|
web:
|
||||||
|
exposure:
|
||||||
|
include: "health,info,prometheus"
|
||||||
|
endpoint:
|
||||||
|
health:
|
||||||
|
show-details: always
|
||||||
|
|
||||||
|
kafka:
|
||||||
|
dead-letter-topic:
|
||||||
|
name: "test-dlt"
|
||||||
|
|
||||||
|
bootstrap-servers:
|
||||||
|
run-environment: ${RUN_ENVIRONMENT:local}
|
||||||
|
local:
|
||||||
|
plain-text: "localhost:29092,localhost:39092,localhost:49092"
|
||||||
|
ssl: "localhost:29093,localhost:39093,localhost:49093"
|
||||||
|
docker:
|
||||||
|
plain-text: "broker-1:19092,broker-2:19092,broker-3:19092"
|
||||||
|
ssl: "broker-1:19093,broker-2:19093,broker-3:19093"
|
||||||
|
|
||||||
|
producer:
|
||||||
|
keySerializer: "org.apache.kafka.common.serialization.StringSerializer"
|
||||||
|
valueSerializer: "org.apache.kafka.common.serialization.StringSerializer"
|
||||||
|
acks: "all"
|
||||||
|
linger-ms: 10
|
||||||
|
protocol: "plaintext"
|
||||||
|
|
||||||
|
consumer:
|
||||||
|
group-id: "sangdol-group"
|
||||||
|
keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer"
|
||||||
|
valueDeserializer: "com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordDeserializer"
|
||||||
|
protocol: ${CONSUMER_PROTOCOL:plaintext}
|
||||||
|
enable-auto-commit: false
|
||||||
|
auto-offset-reset: "earliest"
|
||||||
|
max-poll-records: 500
|
||||||
|
worker-threads: 3
|
||||||
|
topics:
|
||||||
|
- "topic-0"
|
||||||
|
- "topic-1"
|
||||||
|
- "topic-2"
|
||||||
|
|
||||||
|
ssl:
|
||||||
|
truststore-location: ssl/kafka.broker.truststore.jks
|
||||||
|
truststore-password: abcd1234
|
||||||
|
keystore-location: ssl/kafka.broker.keystore.jks
|
||||||
|
keystore-password: abcd1234
|
||||||
|
key-password: abcd1234
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,78 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.producer;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.*;
|
||||||
|
import static org.mockito.ArgumentMatchers.*;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||||
|
import org.junit.jupiter.api.DisplayName;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.test.context.bean.override.mockito.MockitoBean;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Spring Retryable 동작 및 로그 출력 확인용 테스트
|
||||||
|
*/
|
||||||
|
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||||
|
class DLTProducerTest {
|
||||||
|
|
||||||
|
@MockitoBean
|
||||||
|
private KafkaProducer<String, String> kafkaProducer;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DLTProducer dltProducer;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private JsonObjectMapper jsonObjectMapper;
|
||||||
|
|
||||||
|
@DisplayName("DLT 메시지 전송이 첫 번째 시도에서는 실패하고, 두 번째 시도에서 성공한다.")
|
||||||
|
@Test
|
||||||
|
void dltRetryTest_whenSucceedSecondTrial() throws Exception {
|
||||||
|
// given
|
||||||
|
String key = "key";
|
||||||
|
TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString());
|
||||||
|
RuntimeException origin = new RuntimeException("Intended Exception for Test");
|
||||||
|
Future<RecordMetadata> mockfuture = mock(Future.class);
|
||||||
|
when(kafkaProducer.send(any())).thenReturn(mockfuture);
|
||||||
|
when(mockfuture.get())
|
||||||
|
.thenThrow(origin)
|
||||||
|
.thenReturn(mock(RecordMetadata.class));
|
||||||
|
|
||||||
|
// when
|
||||||
|
assertThatCode(() -> dltProducer.sendToDeadLetterTopic(key, value, origin))
|
||||||
|
.doesNotThrowAnyException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@DisplayName("DLT 메시지 전송에 실패한다.")
|
||||||
|
@Test
|
||||||
|
void dltRetryTest_whenFailedAllTries() throws Exception {
|
||||||
|
// given
|
||||||
|
String key = "key";
|
||||||
|
TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString());
|
||||||
|
RuntimeException origin = new RuntimeException("Intended Exception for Test");
|
||||||
|
Future<RecordMetadata> mockfuture = mock(Future.class);
|
||||||
|
when(kafkaProducer.send(any())).thenReturn(mockfuture);
|
||||||
|
when(mockfuture.get()).thenThrow(origin);
|
||||||
|
|
||||||
|
// when
|
||||||
|
dltProducer.sendToDeadLetterTopic(key, value, origin);
|
||||||
|
|
||||||
|
// then
|
||||||
|
verify(kafkaProducer, times(5)).send(any());
|
||||||
|
|
||||||
|
assertThat(dltProducer.getFailed()).hasSize(1);
|
||||||
|
|
||||||
|
DLTProducer.FailedDltMessage failedDltMessage = dltProducer.getFailed().get(0);
|
||||||
|
assertThat(failedDltMessage.key()).isEqualTo(key);
|
||||||
|
assertThat(failedDltMessage.message().exception()).isEqualTo(origin.getMessage());
|
||||||
|
assertThat(failedDltMessage.message().message()).isEqualTo(jsonObjectMapper.serialize(value));
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user