diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerConfiguration.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerConfiguration.java new file mode 100644 index 0000000..e47c047 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerConfiguration.java @@ -0,0 +1,88 @@ +package com.sangdol.consumer.infrastructure.kafka.consumer; + +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.binder.MeterBinder; +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +@EnableConfigurationProperties(value = KafkaProperties.class) +public class ConsumerConfiguration { + + private final KafkaProperties kafkaProperties; + + public KafkaConsumer kafkaConsumer() { + KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties()); + consumer.subscribe(kafkaProperties.getConsumer().topics(), new RebalanceListener(consumer)); + + return consumer; + } + + @Bean + public MeterBinder elapsedTime(ConsumerStats consumerStats) { + String consumerProtocol = kafkaProperties.getConsumer().protocol(); + return registry -> Gauge.builder("elapsed.time", consumerStats, ConsumerStats::averageTime) + .tag("protocol", consumerProtocol) + .register(registry); + } + + @Bean + public ConsumerStats consumerStats() { + Integer workerThreads = kafkaProperties.getConsumer().workerThreads(); + return new ConsumerStats(workerThreads); + } + + @Bean + public ConsumerPool consumerPool() { + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("consumer-pool-%d") + .build(); + int workerThreads = kafkaProperties.getConsumer().workerThreads(); + + return new ConsumerPool( + Executors.newFixedThreadPool(workerThreads, threadFactory), + workerThreads + ); + } + + private Properties consumerProperties() { + KafkaProperties.Consumer consumer = kafkaProperties.getConsumer(); + KafkaProperties.BootStrapServers bootStrapServers = kafkaProperties.getBootStrapServers(); + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + bootStrapServers.getAddressByProtocol(consumer.protocol())); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.groupId()); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumer.keyDeserializer()); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumer.valueDeserializer()); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumer.autoOffsetReset()); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.enableAutoCommit()); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.maxPollRecords()); + + applyRemainPropsBySslEnabled(consumer.isSslEnabled(), properties); + + return properties; + } + + private void applyRemainPropsBySslEnabled(boolean isSslEnabled, Properties properties) { + if (isSslEnabled) { + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name); + properties.putAll(kafkaProperties.getSsl().toPropsMap()); + } + } +}