diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java index 84b7492..13f7472 100644 --- a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java @@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; @@ -47,19 +48,15 @@ public class ConsumerWorker implements Runnable { } catch (Exception e) { log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName()); } finally { - try { - consumer.commitSync(); - log.info("[ConsumerWorker] Final offset committed. Close consumer.. "); - } catch (Exception e) { - log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}", - Thread.currentThread().getName()); - } finally { - consumer.close(); - log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName()); - } + closeAfterManualCommit(); } } + public void shutdown() { + running.set(false); + consumer.wakeup(); + } + private void commitAsync(ConsumerRecords records) { Map offsets = records.partitions().stream() .collect(Collectors.toMap(tp -> tp, tp -> { @@ -69,7 +66,7 @@ public class ConsumerWorker implements Runnable { return new OffsetAndMetadata(lastOffset + 1); })); - consumer.commitAsync(offsets, (o, exception) -> { + OffsetCommitCallback callback = (o, exception) -> { if (exception != null) { log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o, records.partitions(), exception); @@ -77,11 +74,21 @@ public class ConsumerWorker implements Runnable { log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o, records.partitions()); } - }); + }; + + consumer.commitAsync(offsets, callback); } - public void shutdown() { - running.set(false); - consumer.wakeup(); + private void closeAfterManualCommit() { + try { + consumer.commitSync(); + log.info("[ConsumerWorker] Final offset committed. Close consumer.. "); + } catch (Exception e) { + log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}", + Thread.currentThread().getName()); + } finally { + consumer.close(); + log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName()); + } } }