[#5] 컨슈머 코드 마이그레이션 #6
@ -0,0 +1,35 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.consumer;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
import com.sangdol.consumer.domain.TestRecord;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class RebalanceListener implements ConsumerRebalanceListener {
|
||||
|
||||
private final KafkaConsumer<String, TestRecord> consumer;
|
||||
|
||||
@Override
|
||||
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
||||
log.warn("[RebalanceListener] Partitions revoked for partitions {} ", partitions);
|
||||
try {
|
||||
consumer.commitSync();
|
||||
log.info("[RebalanceListener] Offsets committed before partition revoked. partition: {}", partitions);
|
||||
} catch (Exception e) {
|
||||
log.error("[RebalanceListener] Failed to commit before partition revoked", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
||||
log.info("[RebalanceListener] Partitions assigned for partitions {} ", partitions);
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user