From e4b86e6f21cb7bdd5bca35e19a15d2b987a8bccf Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:34:13 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EC=BB=A8=EC=8A=88=EB=A8=B8=20=EB=A0=88?= =?UTF-8?q?=EC=BD=94=EB=93=9C=20=EC=B2=98=EB=A6=AC=20=EA=B8=B0=EB=8A=A5=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 메트릭 측정(ref: ce20f85c)을 위한 로직 - 레코드 자체를 처리하는 비즈니스 로직은 없음. --- .../kafka/consumer/RecordProcessor.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java new file mode 100644 index 0000000..bb4562a --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java @@ -0,0 +1,58 @@ +package com.sangdol.consumer.infrastructure.kafka.consumer; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +public class RecordProcessor { + + private static final int POLL_COMPLETION_THRESHOLD = 5; + private final DLTProducer dltProducer; + private final ConsumerStats consumerStats; + private final AtomicInteger emptyPollResultCount = new AtomicInteger(0); + private long initialRecordReceivedTime = 0; + + public void process(ConsumerRecords records) throws Exception { + initializeTime(); + + log.debug("[RecordProcessor] process {} records. thread: {}", records.count(), + Thread.currentThread().getName()); + for (ConsumerRecord record : records) { + try { + // 별도의 로직은 없음. 필요시 추가 + } catch (Exception e) { + log.error("[ConsumerWorker] failed to consume record, sending to DLT. offset: {}", + record.offset(), e); + dltProducer.sendToDeadLetterTopic(record.key(), record.value(), e); + } + } + } + + public void processIfEmptyRecords() { + if (initialRecordReceivedTime != 0) { + emptyPollResultCount.incrementAndGet(); + } + + if (emptyPollResultCount.get() == POLL_COMPLETION_THRESHOLD) { + long elapsedTime = System.currentTimeMillis() - initialRecordReceivedTime; + consumerStats.update(elapsedTime); + initialRecordReceivedTime = 0; + emptyPollResultCount.set(0); + } + } + + private void initializeTime() { + if (initialRecordReceivedTime == 0) { + initialRecordReceivedTime = System.currentTimeMillis(); + } + } +}