feat: 컨슈머 레코드 처리 기능 구현

- 메트릭 측정(ref: ce20f85c)을 위한 로직
- 레코드 자체를 처리하는 비즈니스 로직은 없음.
This commit is contained in:
이상진 2025-06-27 14:34:13 +09:00
parent 69bd95b981
commit e4b86e6f21

View File

@ -0,0 +1,58 @@
package com.sangdol.consumer.infrastructure.kafka.consumer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class RecordProcessor {
private static final int POLL_COMPLETION_THRESHOLD = 5;
private final DLTProducer dltProducer;
private final ConsumerStats consumerStats;
private final AtomicInteger emptyPollResultCount = new AtomicInteger(0);
private long initialRecordReceivedTime = 0;
public void process(ConsumerRecords<String, TestRecord> records) throws Exception {
initializeTime();
log.debug("[RecordProcessor] process {} records. thread: {}", records.count(),
Thread.currentThread().getName());
for (ConsumerRecord<String, TestRecord> record : records) {
try {
// 별도의 로직은 없음. 필요시 추가
} catch (Exception e) {
log.error("[ConsumerWorker] failed to consume record, sending to DLT. offset: {}",
record.offset(), e);
dltProducer.sendToDeadLetterTopic(record.key(), record.value(), e);
}
}
}
public void processIfEmptyRecords() {
if (initialRecordReceivedTime != 0) {
emptyPollResultCount.incrementAndGet();
}
if (emptyPollResultCount.get() == POLL_COMPLETION_THRESHOLD) {
long elapsedTime = System.currentTimeMillis() - initialRecordReceivedTime;
consumerStats.update(elapsedTime);
initialRecordReceivedTime = 0;
emptyPollResultCount.set(0);
}
}
private void initializeTime() {
if (initialRecordReceivedTime == 0) {
initialRecordReceivedTime = System.currentTimeMillis();
}
}
}