From c891e26b8a3b3894c6fe49c5db015a4fdb0a3ef7 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:36:10 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EC=BB=A8=EC=8A=88=EB=A8=B8=20=EA=B8=B0?= =?UTF-8?q?=EB=8A=A5=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 3개의 워커스레드 생성 및 실행 - 스프링의 PreDestroy를 이용한 shutdown 로직 구현 --- .../kafka/consumer/MultiWorkerConsumer.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/MultiWorkerConsumer.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/MultiWorkerConsumer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/MultiWorkerConsumer.java new file mode 100644 index 0000000..d0e432a --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/MultiWorkerConsumer.java @@ -0,0 +1,76 @@ +package com.sangdol.consumer.infrastructure.kafka.consumer; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer; + +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +public class MultiWorkerConsumer { + + private final ConsumerConfiguration consumerConfiguration; + private final DLTProducer dltProducer; + private final ConsumerStats consumerStats; + private final ExecutorService executorService; + private final Integer workerThreads; + private final List workers; + + public MultiWorkerConsumer( + ConsumerConfiguration consumerConfiguration, + DLTProducer dltProducer, + ConsumerStats consumerStats, + ConsumerPool consumerPool + ) { + this.consumerConfiguration = consumerConfiguration; + this.dltProducer = dltProducer; + this.consumerStats = consumerStats; + this.executorService = consumerPool.executorService(); + this.workerThreads = consumerPool.poolSize(); + this.workers = new ArrayList<>(); + } + + @EventListener(ApplicationReadyEvent.class) + public void initialize() { + log.info("[MultiWorkerConsumer] initialize"); + + for (int i = 0; i < workerThreads; i++) { + KafkaConsumer consumer = consumerConfiguration.kafkaConsumer(); + RecordProcessor recordProcessor = new RecordProcessor(dltProducer, consumerStats); + ConsumerWorker consumerWorker = new ConsumerWorker(consumer, recordProcessor); + workers.add(consumerWorker); + executorService.execute(consumerWorker); + } + } + + @PreDestroy + public void shutdown() { + log.info("[MultiWorkerConsumer] Shutting down consumers.."); + workers.forEach(ConsumerWorker::shutdown); + executorService.shutdown(); + + try { + if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) { + log.warn("[MultiWorkerConsumer] Executor service did not terminated during 30 seconds. shutdown now."); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + log.warn("[MultiWorkerConsumer] Interrupted while waiting for executors to terminate. shutdown now."); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + + log.info("[MultiWorkerConsumer] Shutdown complete."); + } +}