diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RebalanceListener.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RebalanceListener.java new file mode 100644 index 0000000..420580f --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RebalanceListener.java @@ -0,0 +1,35 @@ +package com.sangdol.consumer.infrastructure.kafka.consumer; + +import java.util.Collection; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +import com.sangdol.consumer.domain.TestRecord; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +public class RebalanceListener implements ConsumerRebalanceListener { + + private final KafkaConsumer consumer; + + @Override + public void onPartitionsRevoked(Collection partitions) { + log.warn("[RebalanceListener] Partitions revoked for partitions {} ", partitions); + try { + consumer.commitSync(); + log.info("[RebalanceListener] Offsets committed before partition revoked. partition: {}", partitions); + } catch (Exception e) { + log.error("[RebalanceListener] Failed to commit before partition revoked", e); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + log.info("[RebalanceListener] Partitions assigned for partitions {} ", partitions); + } +}