From 69bd95b981f41b9718207f0c1d1df5c52c9c13d4 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:32:11 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EC=BB=A8=EC=8A=88=EB=A8=B8=20=EB=A6=AC?= =?UTF-8?q?=EB=B0=B8=EB=9F=B0=EC=8A=A4=20=EB=B0=9C=EC=83=9D=EC=8B=9C=20?= =?UTF-8?q?=EC=B2=98=EB=A6=AC=20=EA=B8=B0=EB=8A=A5=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 일단 로깅 및 Manual Commit만 진행 --- .../kafka/consumer/RebalanceListener.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RebalanceListener.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RebalanceListener.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RebalanceListener.java new file mode 100644 index 0000000..420580f --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RebalanceListener.java @@ -0,0 +1,35 @@ +package com.sangdol.consumer.infrastructure.kafka.consumer; + +import java.util.Collection; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +import com.sangdol.consumer.domain.TestRecord; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +public class RebalanceListener implements ConsumerRebalanceListener { + + private final KafkaConsumer consumer; + + @Override + public void onPartitionsRevoked(Collection partitions) { + log.warn("[RebalanceListener] Partitions revoked for partitions {} ", partitions); + try { + consumer.commitSync(); + log.info("[RebalanceListener] Offsets committed before partition revoked. partition: {}", partitions); + } catch (Exception e) { + log.error("[RebalanceListener] Failed to commit before partition revoked", e); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + log.info("[RebalanceListener] Partitions assigned for partitions {} ", partitions); + } +}