[#3] 컨슈머를 제외한 나머지 애플리케이션 코드 마이그레이션 #4
@ -0,0 +1,78 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.producer;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.*;
|
||||||
|
import static org.mockito.ArgumentMatchers.*;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||||
|
import org.junit.jupiter.api.DisplayName;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.test.context.bean.override.mockito.MockitoBean;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Spring Retryable 동작 및 로그 출력 확인용 테스트
|
||||||
|
*/
|
||||||
|
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||||
|
class DLTProducerTest {
|
||||||
|
|
||||||
|
@MockitoBean
|
||||||
|
private KafkaProducer<String, String> kafkaProducer;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DLTProducer dltProducer;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private JsonObjectMapper jsonObjectMapper;
|
||||||
|
|
||||||
|
@DisplayName("DLT 메시지 전송이 첫 번째 시도에서는 실패하고, 두 번째 시도에서 성공한다.")
|
||||||
|
@Test
|
||||||
|
void dltRetryTest_whenSucceedSecondTrial() throws Exception {
|
||||||
|
// given
|
||||||
|
String key = "key";
|
||||||
|
TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString());
|
||||||
|
RuntimeException origin = new RuntimeException("Intended Exception for Test");
|
||||||
|
Future<RecordMetadata> mockfuture = mock(Future.class);
|
||||||
|
when(kafkaProducer.send(any())).thenReturn(mockfuture);
|
||||||
|
when(mockfuture.get())
|
||||||
|
.thenThrow(origin)
|
||||||
|
.thenReturn(mock(RecordMetadata.class));
|
||||||
|
|
||||||
|
// when
|
||||||
|
assertThatCode(() -> dltProducer.sendToDeadLetterTopic(key, value, origin))
|
||||||
|
.doesNotThrowAnyException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@DisplayName("DLT 메시지 전송에 실패한다.")
|
||||||
|
@Test
|
||||||
|
void dltRetryTest_whenFailedAllTries() throws Exception {
|
||||||
|
// given
|
||||||
|
String key = "key";
|
||||||
|
TestRecord value = new TestRecord(1234152, "topic1", Instant.now().toString());
|
||||||
|
RuntimeException origin = new RuntimeException("Intended Exception for Test");
|
||||||
|
Future<RecordMetadata> mockfuture = mock(Future.class);
|
||||||
|
when(kafkaProducer.send(any())).thenReturn(mockfuture);
|
||||||
|
when(mockfuture.get()).thenThrow(origin);
|
||||||
|
|
||||||
|
// when
|
||||||
|
dltProducer.sendToDeadLetterTopic(key, value, origin);
|
||||||
|
|
||||||
|
// then
|
||||||
|
verify(kafkaProducer, times(5)).send(any());
|
||||||
|
|
||||||
|
assertThat(dltProducer.getFailed()).hasSize(1);
|
||||||
|
|
||||||
|
DLTProducer.FailedDltMessage failedDltMessage = dltProducer.getFailed().get(0);
|
||||||
|
assertThat(failedDltMessage.key()).isEqualTo(key);
|
||||||
|
assertThat(failedDltMessage.message().exception()).isEqualTo(origin.getMessage());
|
||||||
|
assertThat(failedDltMessage.message().message()).isEqualTo(jsonObjectMapper.serialize(value));
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user