feat: 컨슈머 설정 클래스 구현

- 컨슈머 관련 설정 + 메트릭 측정을 위한 MeterBinder 빈
- KafkaConsumer는 Thread-Safe하지 않기에 빈 등록 X
This commit is contained in:
이상진 2025-06-27 14:37:05 +09:00
parent c891e26b8a
commit 54328767b9

View File

@ -0,0 +1,88 @@
package com.sangdol.consumer.infrastructure.kafka.consumer;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.binder.MeterBinder;
import lombok.RequiredArgsConstructor;
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(value = KafkaProperties.class)
public class ConsumerConfiguration {
private final KafkaProperties kafkaProperties;
public KafkaConsumer<String, TestRecord> kafkaConsumer() {
KafkaConsumer<String, TestRecord> consumer = new KafkaConsumer<>(consumerProperties());
consumer.subscribe(kafkaProperties.getConsumer().topics(), new RebalanceListener(consumer));
return consumer;
}
@Bean
public MeterBinder elapsedTime(ConsumerStats consumerStats) {
String consumerProtocol = kafkaProperties.getConsumer().protocol();
return registry -> Gauge.builder("elapsed.time", consumerStats, ConsumerStats::averageTime)
.tag("protocol", consumerProtocol)
.register(registry);
}
@Bean
public ConsumerStats consumerStats() {
Integer workerThreads = kafkaProperties.getConsumer().workerThreads();
return new ConsumerStats(workerThreads);
}
@Bean
public ConsumerPool consumerPool() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("consumer-pool-%d")
.build();
int workerThreads = kafkaProperties.getConsumer().workerThreads();
return new ConsumerPool(
Executors.newFixedThreadPool(workerThreads, threadFactory),
workerThreads
);
}
private Properties consumerProperties() {
KafkaProperties.Consumer consumer = kafkaProperties.getConsumer();
KafkaProperties.BootStrapServers bootStrapServers = kafkaProperties.getBootStrapServers();
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootStrapServers.getAddressByProtocol(consumer.protocol()));
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.groupId());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumer.keyDeserializer());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumer.valueDeserializer());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumer.autoOffsetReset());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.enableAutoCommit());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.maxPollRecords());
applyRemainPropsBySslEnabled(consumer.isSslEnabled(), properties);
return properties;
}
private void applyRemainPropsBySslEnabled(boolean isSslEnabled, Properties properties) {
if (isSslEnabled) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
properties.putAll(kafkaProperties.getSsl().toPropsMap());
}
}
}