[#5] 컨슈머 코드 마이그레이션 #6

Merged
pricelees merged 9 commits from feature/#5 into main 2025-06-27 05:43:39 +00:00
Showing only changes of commit c891e26b8a - Show all commits

View File

@ -0,0 +1,76 @@
package com.sangdol.consumer.infrastructure.kafka.consumer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class MultiWorkerConsumer {
private final ConsumerConfiguration consumerConfiguration;
private final DLTProducer dltProducer;
private final ConsumerStats consumerStats;
private final ExecutorService executorService;
private final Integer workerThreads;
private final List<ConsumerWorker> workers;
public MultiWorkerConsumer(
ConsumerConfiguration consumerConfiguration,
DLTProducer dltProducer,
ConsumerStats consumerStats,
ConsumerPool consumerPool
) {
this.consumerConfiguration = consumerConfiguration;
this.dltProducer = dltProducer;
this.consumerStats = consumerStats;
this.executorService = consumerPool.executorService();
this.workerThreads = consumerPool.poolSize();
this.workers = new ArrayList<>();
}
@EventListener(ApplicationReadyEvent.class)
public void initialize() {
log.info("[MultiWorkerConsumer] initialize");
for (int i = 0; i < workerThreads; i++) {
KafkaConsumer<String, TestRecord> consumer = consumerConfiguration.kafkaConsumer();
RecordProcessor recordProcessor = new RecordProcessor(dltProducer, consumerStats);
ConsumerWorker consumerWorker = new ConsumerWorker(consumer, recordProcessor);
workers.add(consumerWorker);
executorService.execute(consumerWorker);
}
}
@PreDestroy
public void shutdown() {
log.info("[MultiWorkerConsumer] Shutting down consumers..");
workers.forEach(ConsumerWorker::shutdown);
executorService.shutdown();
try {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
log.warn("[MultiWorkerConsumer] Executor service did not terminated during 30 seconds. shutdown now.");
executorService.shutdownNow();
}
} catch (InterruptedException e) {
log.warn("[MultiWorkerConsumer] Interrupted while waiting for executors to terminate. shutdown now.");
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
log.info("[MultiWorkerConsumer] Shutdown complete.");
}
}