diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java new file mode 100644 index 0000000..7c6b3ac --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java @@ -0,0 +1,39 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.annotation.EnableRetry; + +import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties; + +import lombok.RequiredArgsConstructor; + +@EnableRetry +@Configuration +@RequiredArgsConstructor +@EnableConfigurationProperties(value = KafkaProperties.class) +public class ProducerConfiguration { + + @Bean(destroyMethod = "close") + public KafkaProducer dltProducer(KafkaProperties kafkaProperties) { + return new KafkaProducer<>(producerProperties(kafkaProperties)); + } + + private Properties producerProperties(KafkaProperties properties) { + Properties props = new Properties(); + KafkaProperties.Producer producer = properties.getProducer(); + KafkaProperties.BootStrapServers bootStrapServers = properties.getBootStrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers.getAddressByProtocol(producer.protocol())); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producer.keySerializer()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producer.valueSerializer()); + props.put(ProducerConfig.ACKS_CONFIG, producer.acks()); + props.put(ProducerConfig.LINGER_MS_CONFIG, producer.lingerMs()); + + return props; + } +}