package com.sangdol.consumer.infrastructure.kafka.consumer;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import com.sangdol.consumer.domain.TestRecord;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Runnable worker that polls a {@link KafkaConsumer} in a loop, hands each
 * non-empty batch to a {@link RecordProcessor}, and commits the consumed
 * offsets asynchronously (manual commit, at-least-once semantics).
 *
 * <p>Shutdown protocol: {@link #shutdown()} flips the running flag and calls
 * {@link KafkaConsumer#wakeup()}, which makes a blocking {@code poll()} throw
 * {@link WakeupException}; the worker then commits synchronously one last time
 * and closes the consumer.
 *
 * <p>Thread-safety: the {@code KafkaConsumer} is confined to the thread running
 * {@code run()}; only {@code wakeup()} (thread-safe by contract) is invoked
 * from {@link #shutdown()}.
 */
@Slf4j
@RequiredArgsConstructor
public class ConsumerWorker implements Runnable {

    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000);

    private final KafkaConsumer<String, TestRecord> consumer;
    private final RecordProcessor recordProcessor;
    private final AtomicBoolean running = new AtomicBoolean(true);

    /**
     * Poll loop: poll → process → commit, until {@link #shutdown()} is called.
     * Always commits remaining offsets synchronously and closes the consumer
     * on exit, whatever the reason for leaving the loop.
     */
    @Override
    public void run() {
        try {
            log.info("[ConsumerWorker] started. thread: {}", Thread.currentThread().getName());
            while (running.get()) {
                ConsumerRecords<String, TestRecord> records = consumer.poll(POLL_TIMEOUT);

                if (records.isEmpty()) {
                    // Give the processor a chance to flush buffered work on idle polls.
                    recordProcessor.processIfEmptyRecords();
                    continue;
                }

                log.debug("[ConsumerWorker] received {} records. thread: {}", records.count(), Thread.currentThread().getName());
                recordProcessor.process(records);
                commitAsync(records);
            }
        } catch (WakeupException e) {
            // Expected path: shutdown() calls consumer.wakeup() to break a blocking poll().
            log.info("[ConsumerWorker] Wakeup Exception. thread: {}", Thread.currentThread().getName());
        } catch (Exception e) {
            // Fix: was log.info(...) without the throwable — the exception (and its
            // stack trace) was silently dropped. Log at ERROR with the cause attached.
            log.error("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName(), e);
        } finally {
            try {
                // Best-effort synchronous commit so already-processed records are not redelivered.
                consumer.commitSync();
                log.info("[ConsumerWorker] Final offset committed. Close consumer.. ");
            } catch (Exception e) {
                log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}",
                        Thread.currentThread().getName(), e);
            } finally {
                consumer.close();
                log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName());
            }
        }
    }

    /**
     * Asynchronously commits, for every partition in the batch, the offset one
     * past the last consumed record (the position the consumer should resume from).
     *
     * @param records the batch whose offsets are committed
     */
    private void commitAsync(ConsumerRecords<String, TestRecord> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = records.partitions().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> {
                    List<ConsumerRecord<String, TestRecord>> partitionRecords = records.records(tp);
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

                    // Commit lastOffset + 1: committed offset is the NEXT record to read.
                    return new OffsetAndMetadata(lastOffset + 1);
                }));

        consumer.commitAsync(offsets, (o, exception) -> {
            if (exception != null) {
                log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o,
                        records.partitions(), exception);
            } else {
                log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o,
                        records.partitions());
            }
        });
    }

    /**
     * Requests loop termination and wakes up a poll() blocked in {@link #run()}.
     * Safe to call from any thread.
     */
    public void shutdown() {
        running.set(false);
        consumer.wakeup();
    }
}