From bad7b5725ec886062bf4bd521f2983faed0cc6bc Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:35:10 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EC=BB=A8=EC=8A=88=EB=A8=B8=20=EC=9B=8C?= =?UTF-8?q?=EC=BB=A4=20=EC=8A=A4=EB=A0=88=EB=93=9C=20=EA=B8=B0=EB=8A=A5=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 오프셋 비동기 커밋 & 예외 처리 & shutdown 기능 구현 --- .../kafka/consumer/ConsumerWorker.java | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java new file mode 100644 index 0000000..84b7492 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java @@ -0,0 +1,87 @@ +package com.sangdol.consumer.infrastructure.kafka.consumer; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; + +import com.sangdol.consumer.domain.TestRecord; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +public class ConsumerWorker implements Runnable { + + private final KafkaConsumer consumer; + private final RecordProcessor recordProcessor; + private final AtomicBoolean running = new AtomicBoolean(true); + + @Override + public void run() { + try { + log.info("[ConsumerWorker] started. thread: {}", Thread.currentThread().getName()); + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + + if (records.isEmpty()) { + recordProcessor.processIfEmptyRecords(); + continue; + } + + log.debug("[ConsumerWorker] received {} records. thread: {}", records.count(), Thread.currentThread().getName()); + recordProcessor.process(records); + commitAsync(records); + } + } catch (WakeupException e) { + log.info("[ConsumerWorker] Wakeup Exception. thread: {}", Thread.currentThread().getName()); + } catch (Exception e) { + log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName()); + } finally { + try { + consumer.commitSync(); + log.info("[ConsumerWorker] Final offset committed. Close consumer.. "); + } catch (Exception e) { + log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}", + Thread.currentThread().getName()); + } finally { + consumer.close(); + log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName()); + } + } + } + + private void commitAsync(ConsumerRecords records) { + Map offsets = records.partitions().stream() + .collect(Collectors.toMap(tp -> tp, tp -> { + List> partitionRecords = records.records(tp); + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + + return new OffsetAndMetadata(lastOffset + 1); + })); + + consumer.commitAsync(offsets, (o, exception) -> { + if (exception != null) { + log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o, + records.partitions(), exception); + } else { + log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o, + records.partitions()); + } + }); + } + + public void shutdown() { + running.set(false); + consumer.wakeup(); + } +}