diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java new file mode 100644 index 0000000..bb4562a --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java @@ -0,0 +1,58 @@ +package com.sangdol.consumer.infrastructure.kafka.consumer; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +public class RecordProcessor { + + private static final int POLL_COMPLETION_THRESHOLD = 5; + private final DLTProducer dltProducer; + private final ConsumerStats consumerStats; + private final AtomicInteger emptyPollResultCount = new AtomicInteger(0); + private long initialRecordReceivedTime = 0; + + public void process(ConsumerRecords records) throws Exception { + initializeTime(); + + log.debug("[RecordProcessor] process {} records. thread: {}", records.count(), + Thread.currentThread().getName()); + for (ConsumerRecord record : records) { + try { + // 별도의 로직은 없음. 필요시 추가 + } catch (Exception e) { + log.error("[ConsumerWorker] failed to consume record, sending to DLT. offset: {}", + record.offset(), e); + dltProducer.sendToDeadLetterTopic(record.key(), record.value(), e); + } + } + } + + public void processIfEmptyRecords() { + if (initialRecordReceivedTime != 0) { + emptyPollResultCount.incrementAndGet(); + } + + if (emptyPollResultCount.get() == POLL_COMPLETION_THRESHOLD) { + long elapsedTime = System.currentTimeMillis() - initialRecordReceivedTime; + consumerStats.update(elapsedTime); + initialRecordReceivedTime = 0; + emptyPollResultCount.set(0); + } + } + + private void initializeTime() { + if (initialRecordReceivedTime == 0) { + initialRecordReceivedTime = System.currentTimeMillis(); + } + } +}