feat: 카프카 설정값을 담는 KafkaProperties 클래스 및 ConfigurationProperties 적용을 위한 application.yml 설정
This commit is contained in:
parent
ef8c526fec
commit
b6e6484cb0
@ -0,0 +1,111 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.common;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
@ConfigurationProperties(prefix = "kafka")
|
||||
public class KafkaProperties {
|
||||
|
||||
private final DeadLetterTopic deadLetterTopic;
|
||||
private final BootStrapServers bootStrapServers;
|
||||
private final Consumer consumer;
|
||||
private final Producer producer;
|
||||
private final Ssl ssl;
|
||||
|
||||
public record DeadLetterTopic(
|
||||
String name
|
||||
) {
|
||||
}
|
||||
|
||||
public record BootStrapServers(
|
||||
String runEnvironment,
|
||||
ServerAddress docker,
|
||||
ServerAddress local
|
||||
) {
|
||||
public String getAddressByProtocol(String protocol) {
|
||||
if (runEnvironment == null || runEnvironment.isBlank()) {
|
||||
throw new IllegalArgumentException(
|
||||
"[KafkaProperties.BootStrapServers] runEnvironment is null or black");
|
||||
}
|
||||
if (runEnvironment.equalsIgnoreCase("local")) {
|
||||
return local.getAddress(protocol);
|
||||
}
|
||||
if (runEnvironment.equalsIgnoreCase("docker")) {
|
||||
return docker.getAddress(protocol);
|
||||
}
|
||||
throw new IllegalArgumentException(
|
||||
"[KafkaProperties.BootStrapServers] unknown runEnvironment: " + runEnvironment);
|
||||
}
|
||||
}
|
||||
|
||||
public record Consumer(
|
||||
String groupId,
|
||||
String keyDeserializer,
|
||||
String valueDeserializer,
|
||||
String protocol,
|
||||
boolean enableAutoCommit,
|
||||
String autoOffsetReset,
|
||||
Integer maxPollRecords,
|
||||
Integer workerThreads,
|
||||
List<String> topics
|
||||
) {
|
||||
public boolean isSslEnabled() {
|
||||
return protocol.equalsIgnoreCase("ssl");
|
||||
}
|
||||
}
|
||||
|
||||
public record Producer(
|
||||
String keySerializer,
|
||||
String valueSerializer,
|
||||
String acks,
|
||||
Integer lingerMs,
|
||||
String protocol
|
||||
) {
|
||||
}
|
||||
|
||||
public record Ssl(
|
||||
String truststoreLocation,
|
||||
String truststorePassword,
|
||||
String keystoreLocation,
|
||||
String keystorePassword,
|
||||
String keyPassword
|
||||
) {
|
||||
public Map<Object, Object> toPropsMap() {
|
||||
Map<Object, Object> props = new HashMap<>();
|
||||
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation);
|
||||
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
|
||||
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation);
|
||||
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
|
||||
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
|
||||
|
||||
return props;
|
||||
}
|
||||
}
|
||||
|
||||
public record ServerAddress(
|
||||
String plainText,
|
||||
String ssl
|
||||
) {
|
||||
public String getAddress(String protocol) {
|
||||
if (protocol == null || protocol.isBlank()) {
|
||||
throw new IllegalArgumentException("[KafkaProperties.ServerAddress] protocol is null or black");
|
||||
}
|
||||
if (protocol.equalsIgnoreCase("plaintext")) {
|
||||
return plainText;
|
||||
}
|
||||
if (protocol.equalsIgnoreCase("ssl")) {
|
||||
return ssl;
|
||||
}
|
||||
throw new IllegalArgumentException("[KafkaProperties.ServerAddress] unknown protocol: " + protocol);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,3 +1,68 @@
|
||||
server:
|
||||
port: 8080
|
||||
|
||||
spring:
|
||||
application:
|
||||
name: consumer
|
||||
|
||||
logging:
|
||||
level:
|
||||
org.study.consumerssl.infrastructure.kafka.consumer: INFO
|
||||
org.apache.kafka.common.network: INFO
|
||||
|
||||
management:
|
||||
info:
|
||||
java:
|
||||
enabled: true
|
||||
os:
|
||||
enabled: true
|
||||
endpoints:
|
||||
web:
|
||||
exposure:
|
||||
include: "health,info,prometheus"
|
||||
endpoint:
|
||||
health:
|
||||
show-details: always
|
||||
|
||||
kafka:
|
||||
dead-letter-topic:
|
||||
name: "test-dlt"
|
||||
|
||||
bootstrap-servers:
|
||||
run-environment: ${RUN_ENVIRONMENT:local}
|
||||
local:
|
||||
plain-text: "localhost:29092,localhost:39092,localhost:49092"
|
||||
ssl: "localhost:29093,localhost:39093,localhost:49093"
|
||||
docker:
|
||||
plain-text: "broker-1:19092,broker-2:19092,broker-3:19092"
|
||||
ssl: "broker-1:19093,broker-2:19093,broker-3:19093"
|
||||
|
||||
producer:
|
||||
keySerializer: "org.apache.kafka.common.serialization.StringSerializer"
|
||||
valueSerializer: "org.apache.kafka.common.serialization.StringSerializer"
|
||||
acks: "all"
|
||||
linger-ms: 10
|
||||
protocol: "plaintext"
|
||||
|
||||
consumer:
|
||||
group-id: "sangdol-group"
|
||||
keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer"
|
||||
valueDeserializer: "com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordDeserializer"
|
||||
protocol: ${CONSUMER_PROTOCOL:plaintext}
|
||||
enable-auto-commit: false
|
||||
auto-offset-reset: "earliest"
|
||||
max-poll-records: 500
|
||||
worker-threads: 3
|
||||
topics:
|
||||
- "topic-0"
|
||||
- "topic-1"
|
||||
- "topic-2"
|
||||
|
||||
ssl:
|
||||
truststore-location: ssl/kafka.broker.truststore.jks
|
||||
truststore-password: abcd1234
|
||||
keystore-location: ssl/kafka.broker.keystore.jks
|
||||
keystore-password: abcd1234
|
||||
key-password: abcd1234
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user