From 77ee1dff8c61e65f33fb083038d3a1eafba51c29 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 15:07:15 +0900 Subject: [PATCH] =?UTF-8?q?refactor:=20ConsumerWorker=EC=97=90=EC=84=9C?= =?UTF-8?q?=EC=9D=98=20=EB=A9=94=EC=84=9C=EB=93=9C=20=EB=B0=8F=20=EB=B3=80?= =?UTF-8?q?=EC=88=98=20=EB=B6=84=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - finally 구문에서 실행되는 종료 로직을 closeAfterManualCommit() 으로 분리 - consumer.commitAsync에 들어가는 OffsetCommitCallback을 변수로 분리 --- .../kafka/consumer/ConsumerWorker.java | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java index 84b7492..13f7472 100644 --- a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java @@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; @@ -47,19 +48,15 @@ public class ConsumerWorker implements Runnable { } catch (Exception e) { log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName()); } finally { - try { - consumer.commitSync(); - log.info("[ConsumerWorker] Final offset committed. Close consumer.. "); - } catch (Exception e) { - log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}", - Thread.currentThread().getName()); - } finally { - consumer.close(); - log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName()); - } + closeAfterManualCommit(); } } + public void shutdown() { + running.set(false); + consumer.wakeup(); + } + private void commitAsync(ConsumerRecords records) { Map offsets = records.partitions().stream() .collect(Collectors.toMap(tp -> tp, tp -> { @@ -69,7 +66,7 @@ public class ConsumerWorker implements Runnable { return new OffsetAndMetadata(lastOffset + 1); })); - consumer.commitAsync(offsets, (o, exception) -> { + OffsetCommitCallback callback = (o, exception) -> { if (exception != null) { log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o, records.partitions(), exception); @@ -77,11 +74,21 @@ public class ConsumerWorker implements Runnable { log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o, records.partitions()); } - }); + }; + + consumer.commitAsync(offsets, callback); } - public void shutdown() { - running.set(false); - consumer.wakeup(); + private void closeAfterManualCommit() { + try { + consumer.commitSync(); + log.info("[ConsumerWorker] Final offset committed. Close consumer.. "); + } catch (Exception e) { + log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}", + Thread.currentThread().getName()); + } finally { + consumer.close(); + log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName()); + } } }