From 3e9e8ebe2a927ec91c261c448a09995ad57bdef2 Mon Sep 17 00:00:00 2001
From: pricelees
Date: Fri, 27 Jun 2025 14:27:55 +0900
Subject: [PATCH 1/9] feat: script to run the Kafka cluster for verifying normal execution locally

---
 run-kafka-cluster.sh | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100755 run-kafka-cluster.sh

diff --git a/run-kafka-cluster.sh b/run-kafka-cluster.sh
new file mode 100755
index 0000000..53da095
--- /dev/null
+++ b/run-kafka-cluster.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+docker compose -f docker/docker-compose-kafka-cluster.yml down
+docker compose -f docker/docker-compose-kafka-cluster.yml up -d

From ce20f85c04bfff8e70b9fd701e6912af07bcd0b9 Mon Sep 17 00:00:00 2001
From: pricelees
Date: Fri, 27 Jun 2025 14:29:46 +0900
Subject: [PATCH 2/9] feat: ConsumerStats class for measuring transfer-time metrics over Plaintext and Ssl

---
 .../kafka/consumer/ConsumerStats.java         | 30 +++++++++++++++++++
 1 file changed, 30 insertions(+)
 create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java

diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java
new file mode 100644
index 0000000..ac86c11
--- /dev/null
+++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java
@@ -0,0 +1,30 @@
+package com.sangdol.consumer.infrastructure.kafka.consumer;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class ConsumerStats {
+
+    private final int workerThreads;
+    private final Map<String, Long> threadElapsedTime;
+
+    public ConsumerStats(int workerThreads) {
+        this.workerThreads = workerThreads;
+        this.threadElapsedTime = new ConcurrentHashMap<>();
+    }
+
+    public void update(long elapsedTime) {
+        threadElapsedTime.put(Thread.currentThread().getName(), elapsedTime);
+    }
+
+    public long averageTime() {
+        if (threadElapsedTime.size() != workerThreads) {
+            return 0L;
+        }
+
+        return threadElapsedTime.values().stream()
+            .collect(Collectors.averagingLong(Long::longValue))
+            .longValue();
+    }
+}
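A behavior note on ConsumerStats (illustration only, not part of the patch series): averageTime() deliberately reports 0 until every expected worker thread has called update(), so the gauge registered in PATCH 8/9 only exposes a value once all workers have finished a measurement round. A minimal sketch of that behavior, assuming a demo class placed in the same package as ConsumerStats; the thread names are hypothetical but must be distinct, since update() keys the map by Thread.currentThread().getName():

// Illustration only -- not part of the patches.
public class ConsumerStatsDemo {

    public static void main(String[] args) throws InterruptedException {
        ConsumerStats stats = new ConsumerStats(3); // expect 3 worker threads

        Thread t1 = new Thread(() -> stats.update(100L), "worker-1");
        Thread t2 = new Thread(() -> stats.update(200L), "worker-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(stats.averageTime()); // 0 -- only 2 of 3 threads have reported

        Thread t3 = new Thread(() -> stats.update(300L), "worker-3");
        t3.start();
        t3.join();
        System.out.println(stats.averageTime()); // 200 -- the average of 100, 200, 300
    }
}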
From 26f39596f06313a6332ca7b7c9f80ce6c773df52 Mon Sep 17 00:00:00 2001
From: pricelees
Date: Fri, 27 Jun 2025 14:31:19 +0900
Subject: [PATCH 3/9] feat: class holding a thread pool, and its size, matching the workerThreads value in application.yml

- Meant to be used both where the thread pool itself is needed and where
  only the thread count is needed.
---
 .../infrastructure/kafka/consumer/ConsumerPool.java | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerPool.java

diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerPool.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerPool.java
new file mode 100644
index 0000000..a9cb816
--- /dev/null
+++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerPool.java
@@ -0,0 +1,6 @@
+package com.sangdol.consumer.infrastructure.kafka.consumer;
+
+import java.util.concurrent.ExecutorService;
+
+public record ConsumerPool(ExecutorService executorService, int poolSize) {
+}

From 69bd95b981f41b9718207f0c1d1df5c52c9c13d4 Mon Sep 17 00:00:00 2001
From: pricelees
Date: Fri, 27 Jun 2025 14:32:11 +0900
Subject: [PATCH 4/9] feat: implement handling for consumer rebalances

- For now, only logging and a manual commit.
---
 .../kafka/consumer/RebalanceListener.java | 35 +++++++++++++++++++
 1 file changed, 35 insertions(+)
 create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RebalanceListener.java

diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RebalanceListener.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RebalanceListener.java
new file mode 100644
index 0000000..420580f
--- /dev/null
+++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RebalanceListener.java
@@ -0,0 +1,35 @@
+package com.sangdol.consumer.infrastructure.kafka.consumer;
+
+import java.util.Collection;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import com.sangdol.consumer.domain.TestRecord;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@RequiredArgsConstructor
+public class RebalanceListener implements ConsumerRebalanceListener {
+
+    private final KafkaConsumer<String, TestRecord> consumer;
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        log.warn("[RebalanceListener] Partitions revoked: {}", partitions);
+        try {
+            consumer.commitSync();
+            log.info("[RebalanceListener] Offsets committed before partitions were revoked: {}", partitions);
+        } catch (Exception e) {
+            log.error("[RebalanceListener] Failed to commit before partitions were revoked", e);
+        }
+    }
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        log.info("[RebalanceListener] Partitions assigned: {}", partitions);
+    }
+}
From e4b86e6f21cb7bdd5bca35e19a15d2b987a8bccf Mon Sep 17 00:00:00 2001
From: pricelees
Date: Fri, 27 Jun 2025 14:34:13 +0900
Subject: [PATCH 5/9] feat: implement consumer record processing

- Logic for metric measurement (ref: ce20f85c).
- No business logic that processes the records themselves.
---
 .../kafka/consumer/RecordProcessor.java | 58 +++++++++++++++++++
 1 file changed, 58 insertions(+)
 create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java

diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java
new file mode 100644
index 0000000..bb4562a
--- /dev/null
+++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/RecordProcessor.java
@@ -0,0 +1,58 @@
+package com.sangdol.consumer.infrastructure.kafka.consumer;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import com.sangdol.consumer.domain.TestRecord;
+import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@RequiredArgsConstructor
+public class RecordProcessor {
+
+    private static final int POLL_COMPLETION_THRESHOLD = 5;
+    private final DLTProducer dltProducer;
+    private final ConsumerStats consumerStats;
+    private final AtomicInteger emptyPollResultCount = new AtomicInteger(0);
+    private long initialRecordReceivedTime = 0;
+
+    public void process(ConsumerRecords<String, TestRecord> records) throws Exception {
+        initializeTime();
+
+        log.debug("[RecordProcessor] process {} records. thread: {}", records.count(),
+            Thread.currentThread().getName());
+        for (ConsumerRecord<String, TestRecord> record : records) {
+            try {
+                // No processing logic for now; add here if needed.
+            } catch (Exception e) {
+                log.error("[RecordProcessor] failed to consume record, sending to DLT. offset: {}",
+                    record.offset(), e);
+                dltProducer.sendToDeadLetterTopic(record.key(), record.value(), e);
+            }
+        }
+    }
+
+    public void processIfEmptyRecords() {
+        if (initialRecordReceivedTime != 0) {
+            emptyPollResultCount.incrementAndGet();
+        }
+
+        if (emptyPollResultCount.get() == POLL_COMPLETION_THRESHOLD) {
+            long elapsedTime = System.currentTimeMillis() - initialRecordReceivedTime;
+            consumerStats.update(elapsedTime);
+            initialRecordReceivedTime = 0;
+            emptyPollResultCount.set(0);
+        }
+    }
+
+    private void initializeTime() {
+        if (initialRecordReceivedTime == 0) {
+            initialRecordReceivedTime = System.currentTimeMillis();
+        }
+    }
+}
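How the measurement works, spelled out (a worked illustration, not part of the patches): initializeTime() stamps the first non-empty poll, and processIfEmptyRecords() counts empty polls seen while a measurement is in flight. With the 1000 ms poll timeout used in PATCH 6/9 and POLL_COMPLETION_THRESHOLD = 5, a worker that receives its first record at t = 0 and its last batch at t = 40 s, then sees five empty polls, reports roughly 45 s to ConsumerStats: the elapsed time from the first record to the fifth empty poll. Note that process() never resets the counter, so empty polls scattered between non-empty batches also count toward the threshold.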
From bad7b5725ec886062bf4bd521f2983faed0cc6bc Mon Sep 17 00:00:00 2001
From: pricelees
Date: Fri, 27 Jun 2025 14:35:10 +0900
Subject: [PATCH 6/9] feat: implement the consumer worker thread

- Asynchronous offset commits, exception handling, and shutdown.
---
 .../kafka/consumer/ConsumerWorker.java | 87 +++++++++++++++++++
 1 file changed, 87 insertions(+)
 create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java

diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java
new file mode 100644
index 0000000..84b7492
--- /dev/null
+++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java
@@ -0,0 +1,87 @@
+package com.sangdol.consumer.infrastructure.kafka.consumer;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import com.sangdol.consumer.domain.TestRecord;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@RequiredArgsConstructor
+public class ConsumerWorker implements Runnable {
+
+    private final KafkaConsumer<String, TestRecord> consumer;
+    private final RecordProcessor recordProcessor;
+    private final AtomicBoolean running = new AtomicBoolean(true);
+
+    @Override
+    public void run() {
+        try {
+            log.info("[ConsumerWorker] started. thread: {}", Thread.currentThread().getName());
+            while (running.get()) {
+                ConsumerRecords<String, TestRecord> records = consumer.poll(Duration.ofMillis(1000));
+
+                if (records.isEmpty()) {
+                    recordProcessor.processIfEmptyRecords();
+                    continue;
+                }
+
+                log.debug("[ConsumerWorker] received {} records. thread: {}", records.count(), Thread.currentThread().getName());
+                recordProcessor.process(records);
+                commitAsync(records);
+            }
+        } catch (WakeupException e) {
+            log.info("[ConsumerWorker] WakeupException. thread: {}", Thread.currentThread().getName());
+        } catch (Exception e) {
+            log.error("[ConsumerWorker] Unexpected exception. thread: {}", Thread.currentThread().getName(), e);
+        } finally {
+            try {
+                consumer.commitSync();
+                log.info("[ConsumerWorker] Final offsets committed. Closing consumer...");
+            } catch (Exception e) {
+                log.error("[ConsumerWorker] Failed to commit final offsets on shutdown. thread: {}",
+                    Thread.currentThread().getName(), e);
+            } finally {
+                consumer.close();
+                log.info("[ConsumerWorker] Closed consumer. thread: {}", Thread.currentThread().getName());
+            }
+        }
+    }
+
+    private void commitAsync(ConsumerRecords<String, TestRecord> records) {
+        // Commit the position after the last processed record of each partition.
+        Map<TopicPartition, OffsetAndMetadata> offsets = records.partitions().stream()
+            .collect(Collectors.toMap(tp -> tp, tp -> {
+                List<ConsumerRecord<String, TestRecord>> partitionRecords = records.records(tp);
+                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+
+                return new OffsetAndMetadata(lastOffset + 1);
+            }));
+
+        consumer.commitAsync(offsets, (committedOffsets, exception) -> {
+            if (exception != null) {
+                log.error("[ConsumerWorker] failed to commit offsets {} for partitions {}",
+                    committedOffsets, records.partitions(), exception);
+            } else {
+                log.debug("[ConsumerWorker] committed offsets {} for partitions {}",
+                    committedOffsets, records.partitions());
+            }
+        });
+    }
+
+    public void shutdown() {
+        running.set(false);
+        consumer.wakeup();
+    }
+}
From c891e26b8a3b3894c6fe49c5db015a4fdb0a3ef7 Mon Sep 17 00:00:00 2001
From: pricelees
Date: Fri, 27 Jun 2025 14:36:10 +0900
Subject: [PATCH 7/9] feat: implement the consumer

- Creates and runs three worker threads.
- Shutdown logic implemented with Spring's PreDestroy.
---
 .../kafka/consumer/MultiWorkerConsumer.java | 76 +++++++++++++++++++
 1 file changed, 76 insertions(+)
 create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/MultiWorkerConsumer.java

diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/MultiWorkerConsumer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/MultiWorkerConsumer.java
new file mode 100644
index 0000000..d0e432a
--- /dev/null
+++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/MultiWorkerConsumer.java
@@ -0,0 +1,76 @@
+package com.sangdol.consumer.infrastructure.kafka.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+import com.sangdol.consumer.domain.TestRecord;
+import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;
+
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Component
+public class MultiWorkerConsumer {
+
+    private final ConsumerConfiguration consumerConfiguration;
+    private final DLTProducer dltProducer;
+    private final ConsumerStats consumerStats;
+    private final ExecutorService executorService;
+    private final Integer workerThreads;
+    private final List<ConsumerWorker> workers;
+
+    public MultiWorkerConsumer(
+        ConsumerConfiguration consumerConfiguration,
+        DLTProducer dltProducer,
+        ConsumerStats consumerStats,
+        ConsumerPool consumerPool
+    ) {
+        this.consumerConfiguration = consumerConfiguration;
+        this.dltProducer = dltProducer;
+        this.consumerStats = consumerStats;
+        this.executorService = consumerPool.executorService();
+        this.workerThreads = consumerPool.poolSize();
+        this.workers = new ArrayList<>();
+    }
+
+    @EventListener(ApplicationReadyEvent.class)
+    public void initialize() {
+        log.info("[MultiWorkerConsumer] initialize");
+
+        for (int i = 0; i < workerThreads; i++) {
+            KafkaConsumer<String, TestRecord> consumer = consumerConfiguration.kafkaConsumer();
+            RecordProcessor recordProcessor = new RecordProcessor(dltProducer, consumerStats);
+            ConsumerWorker consumerWorker = new ConsumerWorker(consumer, recordProcessor);
+            workers.add(consumerWorker);
+            executorService.execute(consumerWorker);
+        }
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        log.info("[MultiWorkerConsumer] Shutting down consumers...");
+        workers.forEach(ConsumerWorker::shutdown);
+        executorService.shutdown();
+
+        try {
+            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
+                log.warn("[MultiWorkerConsumer] Executor service did not terminate within 30 seconds. Shutting down now.");
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            log.warn("[MultiWorkerConsumer] Interrupted while waiting for executors to terminate. Shutting down now.");
+            executorService.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+
+        log.info("[MultiWorkerConsumer] Shutdown complete.");
+    }
+}
From 54328767b9b71ea3f2cc7c2e0087d940e2665221 Mon Sep 17 00:00:00 2001
From: pricelees
Date: Fri, 27 Jun 2025 14:37:05 +0900
Subject: [PATCH 8/9] feat: implement the consumer configuration class

- Consumer-related settings, plus a MeterBinder bean for metric measurement.
- KafkaConsumer is not thread-safe, so it is not registered as a bean.
---
 .../kafka/consumer/ConsumerConfiguration.java | 88 +++++++++++++++++++
 1 file changed, 88 insertions(+)
 create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerConfiguration.java

diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerConfiguration.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerConfiguration.java
new file mode 100644
index 0000000..e47c047
--- /dev/null
+++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerConfiguration.java
@@ -0,0 +1,88 @@
+package com.sangdol.consumer.infrastructure.kafka.consumer;
+
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.sangdol.consumer.domain.TestRecord;
+import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.binder.MeterBinder;
+import lombok.RequiredArgsConstructor;
+
+@Configuration
+@RequiredArgsConstructor
+@EnableConfigurationProperties(value = KafkaProperties.class)
+public class ConsumerConfiguration {
+
+    private final KafkaProperties kafkaProperties;
+
+    public KafkaConsumer<String, TestRecord> kafkaConsumer() {
+        KafkaConsumer<String, TestRecord> consumer = new KafkaConsumer<>(consumerProperties());
+        consumer.subscribe(kafkaProperties.getConsumer().topics(), new RebalanceListener(consumer));
+
+        return consumer;
+    }
+
+    @Bean
+    public MeterBinder elapsedTime(ConsumerStats consumerStats) {
+        String consumerProtocol = kafkaProperties.getConsumer().protocol();
+        return registry -> Gauge.builder("elapsed.time", consumerStats, ConsumerStats::averageTime)
+            .tag("protocol", consumerProtocol)
+            .register(registry);
+    }
+
+    @Bean
+    public ConsumerStats consumerStats() {
+        Integer workerThreads = kafkaProperties.getConsumer().workerThreads();
+        return new ConsumerStats(workerThreads);
+    }
+
+    @Bean
+    public ConsumerPool consumerPool() {
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+            .setNameFormat("consumer-pool-%d")
+            .build();
+        int workerThreads = kafkaProperties.getConsumer().workerThreads();
+
+        return new ConsumerPool(
+            Executors.newFixedThreadPool(workerThreads, threadFactory),
+            workerThreads
+        );
+    }
+
+    private Properties consumerProperties() {
+        KafkaProperties.Consumer consumer = kafkaProperties.getConsumer();
+        KafkaProperties.BootStrapServers bootStrapServers = kafkaProperties.getBootStrapServers();
+        Properties properties = new Properties();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+            bootStrapServers.getAddressByProtocol(consumer.protocol()));
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.groupId());
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumer.keyDeserializer());
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumer.valueDeserializer());
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumer.autoOffsetReset());
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.enableAutoCommit());
+        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.maxPollRecords());
+
+        applyRemainPropsBySslEnabled(consumer.isSslEnabled(), properties);
+
+        return properties;
+    }
+
+    private void applyRemainPropsBySslEnabled(boolean isSslEnabled, Properties properties) {
+        if (isSslEnabled) {
+            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
+            properties.putAll(kafkaProperties.getSsl().toPropsMap());
+        }
+    }
+}
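A configuration sketch (illustration only, not part of the patch series): KafkaProperties and application.yml are not included in this series, so the shape below is inferred from the accessors used in PATCH 8/9 — getBootStrapServers().getAddressByProtocol(...), consumer.topics()/protocol()/groupId()/keyDeserializer()/valueDeserializer()/autoOffsetReset()/enableAutoCommit()/maxPollRecords()/workerThreads(), and ssl.toPropsMap(). The top-level prefix, every key name, and all values are assumptions, not the project's actual configuration:

kafka:                                # prefix is a guess; the real @ConfigurationProperties prefix is not shown
  boot-strap-servers:
    plaintext: localhost:9092         # hypothetical addresses
    ssl: localhost:9093
  consumer:
    topics:
      - test-topic                    # hypothetical topic name
    protocol: PLAINTEXT               # or SSL; selects the bootstrap address and the gauge's protocol tag
    group-id: test-consumer-group     # hypothetical
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # stand-in; the real TestRecord deserializer is not shown
    auto-offset-reset: earliest
    enable-auto-commit: false         # the workers commit manually (PATCH 6/9)
    max-poll-records: 500
    worker-threads: 3                 # three workers, per the PATCH 7/9 message
  ssl:
    # keystore/truststore settings, applied via toPropsMap() when SSL is enabled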
From 8ff8584a31f2ecb9cd2c1a5264e1823b259d8baf Mon Sep 17 00:00:00 2001
From: pricelees
Date: Fri, 27 Jun 2025 14:38:29 +0900
Subject: [PATCH 9/9] refactor: mock the consumer in DLTProducerTest to prevent unnecessary consumer connections

- Tests must keep working without a running Kafka cluster.
---
 .../infrastructure/kafka/producer/DLTProducerTest.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java b/src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java
index 2ffd6e6..94b4a04 100644
--- a/src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java
+++ b/src/test/java/com/sangdol/consumer/infrastructure/kafka/producer/DLTProducerTest.java
@@ -17,6 +17,7 @@ import org.springframework.test.context.bean.override.mockito.MockitoBean;
 
 import com.sangdol.consumer.domain.TestRecord;
 import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;
+import com.sangdol.consumer.infrastructure.kafka.consumer.MultiWorkerConsumer;
 
 /**
  * Test to verify Spring Retryable behavior and log output
@@ -24,6 +25,9 @@ import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapp
 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
 class DLTProducerTest {
 
+    @MockitoBean
+    private MultiWorkerConsumer multiWorkerConsumer;
+
     @MockitoBean
     private KafkaProducer kafkaProducer;