diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/KafkaProperties.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/KafkaProperties.java new file mode 100644 index 0000000..bfe1b47 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/KafkaProperties.java @@ -0,0 +1,111 @@ +package com.sangdol.consumer.infrastructure.kafka.common; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.SslConfigs; +import org.springframework.boot.context.properties.ConfigurationProperties; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +@ConfigurationProperties(prefix = "kafka") +public class KafkaProperties { + + private final DeadLetterTopic deadLetterTopic; + private final BootStrapServers bootStrapServers; + private final Consumer consumer; + private final Producer producer; + private final Ssl ssl; + + public record DeadLetterTopic( + String name + ) { + } + + public record BootStrapServers( + String runEnvironment, + ServerAddress docker, + ServerAddress local + ) { + public String getAddressByProtocol(String protocol) { + if (runEnvironment == null || runEnvironment.isBlank()) { + throw new IllegalArgumentException( + "[KafkaProperties.BootStrapServers] runEnvironment is null or black"); + } + if (runEnvironment.equalsIgnoreCase("local")) { + return local.getAddress(protocol); + } + if (runEnvironment.equalsIgnoreCase("docker")) { + return docker.getAddress(protocol); + } + throw new IllegalArgumentException( + "[KafkaProperties.BootStrapServers] unknown runEnvironment: " + runEnvironment); + } + } + + public record Consumer( + String groupId, + String keyDeserializer, + String valueDeserializer, + String protocol, + boolean enableAutoCommit, + String autoOffsetReset, + Integer maxPollRecords, + Integer workerThreads, + List topics + ) { + public boolean isSslEnabled() { + return protocol.equalsIgnoreCase("ssl"); + } + } + + public record Producer( + String keySerializer, + String valueSerializer, + String acks, + Integer lingerMs, + String protocol + ) { + } + + public record Ssl( + String truststoreLocation, + String truststorePassword, + String keystoreLocation, + String keystorePassword, + String keyPassword + ) { + public Map toPropsMap() { + Map props = new HashMap<>(); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword); + + return props; + } + } + + public record ServerAddress( + String plainText, + String ssl + ) { + public String getAddress(String protocol) { + if (protocol == null || protocol.isBlank()) { + throw new IllegalArgumentException("[KafkaProperties.ServerAddress] protocol is null or black"); + } + if (protocol.equalsIgnoreCase("plaintext")) { + return plainText; + } + if (protocol.equalsIgnoreCase("ssl")) { + return ssl; + } + throw new IllegalArgumentException("[KafkaProperties.ServerAddress] unknown protocol: " + protocol); + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 002c42d..a7de48b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,3 +1,68 @@ +server: + port: 8080 + spring: application: name: consumer + +logging: + level: + org.study.consumerssl.infrastructure.kafka.consumer: INFO + org.apache.kafka.common.network: INFO + +management: + info: + java: + enabled: true + os: + enabled: true + endpoints: + web: + exposure: + include: "health,info,prometheus" + endpoint: + health: + show-details: always + +kafka: + dead-letter-topic: + name: "test-dlt" + + bootstrap-servers: + run-environment: ${RUN_ENVIRONMENT:local} + local: + plain-text: "localhost:29092,localhost:39092,localhost:49092" + ssl: "localhost:29093,localhost:39093,localhost:49093" + docker: + plain-text: "broker-1:19092,broker-2:19092,broker-3:19092" + ssl: "broker-1:19093,broker-2:19093,broker-3:19093" + + producer: + keySerializer: "org.apache.kafka.common.serialization.StringSerializer" + valueSerializer: "org.apache.kafka.common.serialization.StringSerializer" + acks: "all" + linger-ms: 10 + protocol: "plaintext" + + consumer: + group-id: "sangdol-group" + keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer" + valueDeserializer: "com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordDeserializer" + protocol: ${CONSUMER_PROTOCOL:plaintext} + enable-auto-commit: false + auto-offset-reset: "earliest" + max-poll-records: 500 + worker-threads: 3 + topics: + - "topic-0" + - "topic-1" + - "topic-2" + + ssl: + truststore-location: ssl/kafka.broker.truststore.jks + truststore-password: abcd1234 + keystore-location: ssl/kafka.broker.keystore.jks + keystore-password: abcd1234 + key-password: abcd1234 + +