From e5916eb3053699226b7c5b871ec4c4335c94ce16 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 16:08:45 +0900 Subject: [PATCH 1/2] =?UTF-8?q?refactor:=20'=EC=97=B0=EC=86=8D'=20?= =?UTF-8?q?=ED=8C=90=EB=8B=A8=EC=9D=84=20=EC=9C=84=ED=95=B4=20=EB=B9=84?= =?UTF-8?q?=EC=96=B4=EC=9E=88=EC=A7=80=20=EC=95=8A=EC=9D=80=20=EB=A0=88?= =?UTF-8?q?=EC=BD=94=EB=93=9C=EB=A5=BC=20=EC=9E=85=EB=A0=A5=EB=B0=9B?= =?UTF-8?q?=EB=8A=94=20=EA=B2=BD=EC=9A=B0=20count=EB=A5=BC=200=EC=9C=BC?= =?UTF-8?q?=EB=A1=9C=20=EC=84=A4=EC=A0=95=ED=95=98=EB=8F=84=EB=A1=9D=20?= =?UTF-8?q?=EA=B8=B0=EB=8A=A5=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../infrastructure/kafka/consumer/RecordProcessor.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java index bb4562a..42718ce 100644 --- a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java @@ -22,7 +22,7 @@ public class RecordProcessor { private long initialRecordReceivedTime = 0; public void process(ConsumerRecords records) throws Exception { - initializeTime(); + preProcess(); log.debug("[RecordProcessor] process {} records. thread: {}", records.count(), Thread.currentThread().getName()); @@ -50,9 +50,11 @@ public class RecordProcessor { } } - private void initializeTime() { + private void preProcess() { if (initialRecordReceivedTime == 0) { initialRecordReceivedTime = System.currentTimeMillis(); } + + emptyPollResultCount.set(0); } } -- 2.47.2 From e49841575deb2341c8e487da11e1fbe5b3005246 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 16:09:53 +0900 Subject: [PATCH 2/2] =?UTF-8?q?test:=20RecordProcessor=EC=97=90=EC=84=9C?= =?UTF-8?q?=20=EC=97=B0=EC=86=8D=EC=9D=B8=20=EA=B2=BD=EC=9A=B0=EC=97=90?= =?UTF-8?q?=EB=A7=8C=20=EB=8F=99=EC=9E=91=ED=95=98=EB=8A=94=EC=A7=80=20?= =?UTF-8?q?=EA=B2=80=EC=A6=9D.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 검증을 위해 ConsumerStats 에서 package-private Getter 추가 --- .../kafka/consumer/ConsumerStats.java | 7 ++ .../kafka/consumer/RecordProcessorTest.java | 90 +++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 src/test/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessorTest.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java index ac86c11..51365e5 100644 --- a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java @@ -4,6 +4,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; + public class ConsumerStats { private final int workerThreads; @@ -27,4 +29,9 @@ public class ConsumerStats { .collect(Collectors.averagingLong(Long::longValue)) .longValue(); } + + @VisibleForTesting + Map getThreadElapsedTime() { + return threadElapsedTime; + } } diff --git a/src/test/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessorTest.java b/src/test/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessorTest.java new file mode 100644 index 0000000..e027488 --- /dev/null +++ b/src/test/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessorTest.java @@ -0,0 +1,90 @@ +package com.sangdol.consumer.infrastructure.kafka.consumer; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer; + +class RecordProcessorTest { + + private static final ConsumerRecords RECORDS = new ConsumerRecords<>(Map.of( + new TopicPartition("test", 1), + List.of(new ConsumerRecord<>( + "test", + 1, + 2, + "key", + new TestRecord(123, "test", Instant.now().toString()) + )) + )); + + private static final ConsumerRecords EMPTY = ConsumerRecords.empty(); + + @DisplayName("5회 연속으로 빈 레코드를 받으면 poll 작업의 완료로 판단한다.") + @Test + void receiveEmptyRecordsMoreThanOrEqualThreshold() { + // given + ConsumerStats consumerStats = new ConsumerStats(3); + RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats); + + // 시간 초기화를 위해 레코드를 담아 1회 전송 + test(recordProcessor, RECORDS); + + // when + for (int i = 0; i < 5; i++) { + test(recordProcessor, EMPTY); + } + + Map result = consumerStats.getThreadElapsedTime(); + + assertThat(result).isNotEmpty(); + assertThat(result.containsKey(Thread.currentThread().getName())).isTrue(); + } + + @DisplayName("5회 연속으로 빈 레코드를 받지 않으면 poll 작업의 완료로 판단하지 않는다.") + @Test + void receiveEmptyRecordsLessThanThreshold() { + // given + ConsumerStats consumerStats = new ConsumerStats(3); + RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats); + + // 시간 초기화를 위해 레코드를 담아 1회 전송 + test(recordProcessor, RECORDS); + + // when + // 3회 빈 레코드 -> 이후 비어있지 않은 레코드 -> 빈 레코드 + for (int i = 0; i < 3; i++) { + test(recordProcessor, EMPTY); + } + test(recordProcessor, RECORDS); + test(recordProcessor, EMPTY); + + // then + Map result = consumerStats.getThreadElapsedTime(); + + assertThat(result).isEmpty(); + } + + private void test(RecordProcessor recordProcessor, ConsumerRecords records) { + if (records.isEmpty()) { + recordProcessor.processIfEmptyRecords(); + return; + } + try { + recordProcessor.process(records); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file -- 2.47.2