From fcc5fcffe6aef708fedb9f34deb2256aecc81c02 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:09:57 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Dead=20Letter=20Topic=20=ED=94=84?= =?UTF-8?q?=EB=A1=9C=EB=93=80=EC=84=9C=20=EC=84=A4=EC=A0=95=20=ED=81=B4?= =?UTF-8?q?=EB=9E=98=EC=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - KafkaProducer는 Thread-Safe이기에 Bean으로 등록하여 사용함. --- .../kafka/producer/ProducerConfiguration.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java new file mode 100644 index 0000000..7c6b3ac --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/producer/ProducerConfiguration.java @@ -0,0 +1,39 @@ +package com.sangdol.consumer.infrastructure.kafka.producer; + +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.annotation.EnableRetry; + +import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties; + +import lombok.RequiredArgsConstructor; + +@EnableRetry +@Configuration +@RequiredArgsConstructor +@EnableConfigurationProperties(value = KafkaProperties.class) +public class ProducerConfiguration { + + @Bean(destroyMethod = "close") + public KafkaProducer dltProducer(KafkaProperties kafkaProperties) { + return new KafkaProducer<>(producerProperties(kafkaProperties)); + } + + private Properties producerProperties(KafkaProperties properties) { + Properties props = new Properties(); + KafkaProperties.Producer producer = properties.getProducer(); + KafkaProperties.BootStrapServers bootStrapServers = properties.getBootStrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers.getAddressByProtocol(producer.protocol())); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producer.keySerializer()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producer.valueSerializer()); + props.put(ProducerConfig.ACKS_CONFIG, producer.acks()); + props.put(ProducerConfig.LINGER_MS_CONFIG, producer.lingerMs()); + + return props; + } +}