feat: Dead Letter Topic 프로듀서 설정 클래스

- KafkaProducer는 Thread-Safe이기에 Bean으로 등록하여 사용함.
This commit is contained in:
이상진 2025-06-27 14:09:57 +09:00
parent c6b21da23e
commit fcc5fcffe6

View File

@ -0,0 +1,39 @@
package com.sangdol.consumer.infrastructure.kafka.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;
import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;
import lombok.RequiredArgsConstructor;
@EnableRetry
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(value = KafkaProperties.class)
public class ProducerConfiguration {
@Bean(destroyMethod = "close")
public KafkaProducer<String, String> dltProducer(KafkaProperties kafkaProperties) {
return new KafkaProducer<>(producerProperties(kafkaProperties));
}
private Properties producerProperties(KafkaProperties properties) {
Properties props = new Properties();
KafkaProperties.Producer producer = properties.getProducer();
KafkaProperties.BootStrapServers bootStrapServers = properties.getBootStrapServers();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers.getAddressByProtocol(producer.protocol()));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producer.keySerializer());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producer.valueSerializer());
props.put(ProducerConfig.ACKS_CONFIG, producer.acks());
props.put(ProducerConfig.LINGER_MS_CONFIG, producer.lingerMs());
return props;
}
}