diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java index ac86c11..51365e5 100644 --- a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java @@ -4,6 +4,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; + public class ConsumerStats { private final int workerThreads; @@ -27,4 +29,9 @@ public class ConsumerStats { .collect(Collectors.averagingLong(Long::longValue)) .longValue(); } + + @VisibleForTesting + Map getThreadElapsedTime() { + return threadElapsedTime; + } } diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java index bb4562a..42718ce 100644 --- a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java @@ -22,7 +22,7 @@ public class RecordProcessor { private long initialRecordReceivedTime = 0; public void process(ConsumerRecords records) throws Exception { - initializeTime(); + preProcess(); log.debug("[RecordProcessor] process {} records. thread: {}", records.count(), Thread.currentThread().getName()); @@ -50,9 +50,11 @@ public class RecordProcessor { } } - private void initializeTime() { + private void preProcess() { if (initialRecordReceivedTime == 0) { initialRecordReceivedTime = System.currentTimeMillis(); } + + emptyPollResultCount.set(0); } } diff --git a/src/test/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessorTest.java b/src/test/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessorTest.java new file mode 100644 index 0000000..e027488 --- /dev/null +++ b/src/test/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessorTest.java @@ -0,0 +1,90 @@ +package com.sangdol.consumer.infrastructure.kafka.consumer; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer; + +class RecordProcessorTest { + + private static final ConsumerRecords RECORDS = new ConsumerRecords<>(Map.of( + new TopicPartition("test", 1), + List.of(new ConsumerRecord<>( + "test", + 1, + 2, + "key", + new TestRecord(123, "test", Instant.now().toString()) + )) + )); + + private static final ConsumerRecords EMPTY = ConsumerRecords.empty(); + + @DisplayName("5회 연속으로 빈 레코드를 받으면 poll 작업의 완료로 판단한다.") + @Test + void receiveEmptyRecordsMoreThanOrEqualThreshold() { + // given + ConsumerStats consumerStats = new ConsumerStats(3); + RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats); + + // 시간 초기화를 위해 레코드를 담아 1회 전송 + test(recordProcessor, RECORDS); + + // when + for (int i = 0; i < 5; i++) { + test(recordProcessor, EMPTY); + } + + Map result = consumerStats.getThreadElapsedTime(); + + assertThat(result).isNotEmpty(); + assertThat(result.containsKey(Thread.currentThread().getName())).isTrue(); + } + + @DisplayName("5회 연속으로 빈 레코드를 받지 않으면 poll 작업의 완료로 판단하지 않는다.") + @Test + void receiveEmptyRecordsLessThanThreshold() { + // given + ConsumerStats consumerStats = new ConsumerStats(3); + RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats); + + // 시간 초기화를 위해 레코드를 담아 1회 전송 + test(recordProcessor, RECORDS); + + // when + // 3회 빈 레코드 -> 이후 비어있지 않은 레코드 -> 빈 레코드 + for (int i = 0; i < 3; i++) { + test(recordProcessor, EMPTY); + } + test(recordProcessor, RECORDS); + test(recordProcessor, EMPTY); + + // then + Map result = consumerStats.getThreadElapsedTime(); + + assertThat(result).isEmpty(); + } + + private void test(RecordProcessor recordProcessor, ConsumerRecords records) { + if (records.isEmpty()) { + recordProcessor.processIfEmptyRecords(); + return; + } + try { + recordProcessor.process(records); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file