From 54328767b9b71ea3f2cc7c2e0087d940e2665221 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:37:05 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EC=BB=A8=EC=8A=88=EB=A8=B8=20=EC=84=A4?= =?UTF-8?q?=EC=A0=95=20=ED=81=B4=EB=9E=98=EC=8A=A4=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 컨슈머 관련 설정 + 메트릭 측정을 위한 MeterBinder 빈 - KafkaConsumer는 Thread-Safe하지 않기에 빈 등록 X --- .../kafka/consumer/ConsumerConfiguration.java | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerConfiguration.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerConfiguration.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerConfiguration.java new file mode 100644 index 0000000..e47c047 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerConfiguration.java @@ -0,0 +1,88 @@ +package com.sangdol.consumer.infrastructure.kafka.consumer; + +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.binder.MeterBinder; +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +@EnableConfigurationProperties(value = KafkaProperties.class) +public class ConsumerConfiguration { + + private final KafkaProperties kafkaProperties; + + public KafkaConsumer kafkaConsumer() { + KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties()); + consumer.subscribe(kafkaProperties.getConsumer().topics(), new RebalanceListener(consumer)); + + return consumer; + } + + @Bean + public MeterBinder elapsedTime(ConsumerStats consumerStats) { + String consumerProtocol = kafkaProperties.getConsumer().protocol(); + return registry -> Gauge.builder("elapsed.time", consumerStats, ConsumerStats::averageTime) + .tag("protocol", consumerProtocol) + .register(registry); + } + + @Bean + public ConsumerStats consumerStats() { + Integer workerThreads = kafkaProperties.getConsumer().workerThreads(); + return new ConsumerStats(workerThreads); + } + + @Bean + public ConsumerPool consumerPool() { + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("consumer-pool-%d") + .build(); + int workerThreads = kafkaProperties.getConsumer().workerThreads(); + + return new ConsumerPool( + Executors.newFixedThreadPool(workerThreads, threadFactory), + workerThreads + ); + } + + private Properties consumerProperties() { + KafkaProperties.Consumer consumer = kafkaProperties.getConsumer(); + KafkaProperties.BootStrapServers bootStrapServers = kafkaProperties.getBootStrapServers(); + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + bootStrapServers.getAddressByProtocol(consumer.protocol())); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.groupId()); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumer.keyDeserializer()); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumer.valueDeserializer()); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumer.autoOffsetReset()); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.enableAutoCommit()); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.maxPollRecords()); + + applyRemainPropsBySslEnabled(consumer.isSslEnabled(), properties); + + return properties; + } + + private void applyRemainPropsBySslEnabled(boolean isSslEnabled, Properties properties) { + if (isSslEnabled) { + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name); + properties.putAll(kafkaProperties.getSsl().toPropsMap()); + } + } +}