Compare commits

...

10 Commits

Author SHA1 Message Date
ebd003b79e Merge pull request '[#5] Consumer code migration' (#6) from feature/#5 into main
Reviewed-on: #6
2025-06-27 05:43:39 +00:00
8ff8584a31 refactor: mock the consumer in DLTProducerTest to avoid unnecessary consumer connections
- Tests must pass even without a running Kafka cluster.
2025-06-27 14:38:29 +09:00
54328767b9 feat: implement the consumer configuration class
- Consumer settings plus a MeterBinder bean for metric collection
- KafkaConsumer is not thread-safe, so it is not registered as a bean
2025-06-27 14:37:05 +09:00
c891e26b8a feat: implement the consumer
- Creates and runs three worker threads
- Shutdown logic via Spring's PreDestroy
2025-06-27 14:36:10 +09:00
bad7b5725e feat: implement the consumer worker thread
- Asynchronous offset commits, exception handling, and shutdown support
2025-06-27 14:35:10 +09:00
e4b86e6f21 feat: implement consumer record processing
- Logic for metric collection (ref: ce20f85c)
- No business logic for processing the records themselves yet.
2025-06-27 14:34:13 +09:00
69bd95b981 feat: handle consumer rebalances
- For now, only logging and a manual commit
2025-06-27 14:32:11 +09:00
26f39596f0 feat: class holding a thread pool sized by workerThreads in application.yml, along with that size
- Usable both where the thread pool itself is needed and where only the thread count is needed.
2025-06-27 14:31:19 +09:00
ce20f85c04 feat: ConsumerStats class used to measure transfer-time metrics over Plaintext and Ssl 2025-06-27 14:29:46 +09:00
3e9e8ebe2a feat: script to run a Kafka cluster for verifying correct execution locally 2025-06-27 14:27:55 +09:00
9 changed files with 387 additions and 0 deletions

run-kafka-cluster.sh Executable file

@@ -0,0 +1,3 @@
#!/bin/bash
# Recreate the local Kafka cluster: tear down any running containers, then start fresh in detached mode.
docker compose -f docker/docker-compose-kafka-cluster.yml down
docker compose -f docker/docker-compose-kafka-cluster.yml up -d

ConsumerConfiguration.java

@@ -0,0 +1,88 @@
package com.sangdol.consumer.infrastructure.kafka.consumer;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.binder.MeterBinder;
import lombok.RequiredArgsConstructor;

@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(value = KafkaProperties.class)
public class ConsumerConfiguration {

    private final KafkaProperties kafkaProperties;

    // Intentionally not a @Bean: KafkaConsumer is not thread-safe, so every
    // worker thread gets its own instance through this factory method.
    public KafkaConsumer<String, TestRecord> kafkaConsumer() {
        KafkaConsumer<String, TestRecord> consumer = new KafkaConsumer<>(consumerProperties());
        consumer.subscribe(kafkaProperties.getConsumer().topics(), new RebalanceListener(consumer));
        return consumer;
    }

    @Bean
    public MeterBinder elapsedTime(ConsumerStats consumerStats) {
        String consumerProtocol = kafkaProperties.getConsumer().protocol();
        return registry -> Gauge.builder("elapsed.time", consumerStats, ConsumerStats::averageTime)
            .tag("protocol", consumerProtocol)
            .register(registry);
    }

    @Bean
    public ConsumerStats consumerStats() {
        Integer workerThreads = kafkaProperties.getConsumer().workerThreads();
        return new ConsumerStats(workerThreads);
    }

    @Bean
    public ConsumerPool consumerPool() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("consumer-pool-%d")
            .build();
        int workerThreads = kafkaProperties.getConsumer().workerThreads();
        return new ConsumerPool(
            Executors.newFixedThreadPool(workerThreads, threadFactory),
            workerThreads
        );
    }

    private Properties consumerProperties() {
        KafkaProperties.Consumer consumer = kafkaProperties.getConsumer();
        KafkaProperties.BootStrapServers bootStrapServers = kafkaProperties.getBootStrapServers();

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootStrapServers.getAddressByProtocol(consumer.protocol()));
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.groupId());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumer.keyDeserializer());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumer.valueDeserializer());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumer.autoOffsetReset());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.enableAutoCommit());
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.maxPollRecords());
        applyRemainPropsBySslEnabled(consumer.isSslEnabled(), properties);
        return properties;
    }

    private void applyRemainPropsBySslEnabled(boolean isSslEnabled, Properties properties) {
        if (isSslEnabled) {
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
            properties.putAll(kafkaProperties.getSsl().toPropsMap());
        }
    }
}
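For orientation, a minimal sketch of the application.yml shape this class appears to bind through KafkaProperties. Every key below is inferred from the accessor calls above (protocol(), topics(), groupId(), workerThreads(), and so on) and is an assumption, not the project's actual file:

# Hypothetical shape only: keys inferred from KafkaProperties accessors, not taken from the repo.
kafka:
  boot-strap-servers:
    plaintext: localhost:9092
    ssl: localhost:9093
  consumer:
    protocol: PLAINTEXT
    topics:
      - test-topic
    group-id: test-consumer-group
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # actual project deserializes to TestRecord
    auto-offset-reset: earliest
    enable-auto-commit: false
    max-poll-records: 500
    worker-threads: 3
  ssl:
    # keystore/truststore entries consumed by KafkaProperties.getSsl().toPropsMap()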

ConsumerPool.java

@@ -0,0 +1,6 @@
package com.sangdol.consumer.infrastructure.kafka.consumer;

import java.util.concurrent.ExecutorService;

// Carries the worker thread pool together with its size, so call sites that
// need only the thread count do not have to re-read the configuration.
public record ConsumerPool(ExecutorService executorService, int poolSize) {
}

ConsumerStats.java

@@ -0,0 +1,30 @@
package com.sangdol.consumer.infrastructure.kafka.consumer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class ConsumerStats {

    private final int workerThreads;
    private final Map<String, Long> threadElapsedTime;

    public ConsumerStats(int workerThreads) {
        this.workerThreads = workerThreads;
        this.threadElapsedTime = new ConcurrentHashMap<>();
    }

    // Each worker records its latest elapsed time under its own thread name.
    public void update(long elapsedTime) {
        threadElapsedTime.put(Thread.currentThread().getName(), elapsedTime);
    }

    // Returns 0 until every worker thread has reported at least once.
    public long averageTime() {
        if (threadElapsedTime.size() != workerThreads) {
            return 0L;
        }
        return threadElapsedTime.values().stream()
            .collect(Collectors.averagingLong(Long::longValue))
            .longValue();
    }
}
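A minimal sketch of the gating behavior, assuming it sits in the same package as ConsumerStats (the class and thread names are illustrative): samples are keyed by thread name, so averageTime() stays at 0 until every worker thread has reported once.

// Illustrative only: averageTime() reports 0 until all workerThreads distinct
// threads have recorded a sample.
public class ConsumerStatsSketch {
    public static void main(String[] args) throws InterruptedException {
        ConsumerStats stats = new ConsumerStats(2);

        Thread first = new Thread(() -> stats.update(100L), "worker-0");
        first.start();
        first.join();
        System.out.println(stats.averageTime()); // 0: only one of two threads has reported

        Thread second = new Thread(() -> stats.update(300L), "worker-1");
        second.start();
        second.join();
        System.out.println(stats.averageTime()); // 200: average over both threads
    }
}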

ConsumerWorker.java

@@ -0,0 +1,87 @@
package com.sangdol.consumer.infrastructure.kafka.consumer;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import com.sangdol.consumer.domain.TestRecord;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class ConsumerWorker implements Runnable {

    private final KafkaConsumer<String, TestRecord> consumer;
    private final RecordProcessor recordProcessor;
    private final AtomicBoolean running = new AtomicBoolean(true);

    @Override
    public void run() {
        try {
            log.info("[ConsumerWorker] started. thread: {}", Thread.currentThread().getName());
            while (running.get()) {
                ConsumerRecords<String, TestRecord> records = consumer.poll(Duration.ofMillis(1000));
                if (records.isEmpty()) {
                    recordProcessor.processIfEmptyRecords();
                    continue;
                }
                log.debug("[ConsumerWorker] received {} records. thread: {}", records.count(),
                    Thread.currentThread().getName());
                recordProcessor.process(records);
                commitAsync(records);
            }
        } catch (WakeupException e) {
            // Expected on shutdown: wakeup() interrupts the blocking poll().
            log.info("[ConsumerWorker] WakeupException. thread: {}", Thread.currentThread().getName());
        } catch (Exception e) {
            log.error("[ConsumerWorker] Unexpected exception. thread: {}",
                Thread.currentThread().getName(), e);
        } finally {
            try {
                consumer.commitSync();
                log.info("[ConsumerWorker] Final offsets committed. Closing consumer. thread: {}",
                    Thread.currentThread().getName());
            } catch (Exception e) {
                log.error("[ConsumerWorker] Failed to commit final offsets during shutdown. thread: {}",
                    Thread.currentThread().getName(), e);
            } finally {
                consumer.close();
                log.info("[ConsumerWorker] Closed consumer. thread: {}", Thread.currentThread().getName());
            }
        }
    }

    private void commitAsync(ConsumerRecords<String, TestRecord> records) {
        // Commit lastOffset + 1 per partition: the position of the next record to read.
        Map<TopicPartition, OffsetAndMetadata> offsets = records.partitions().stream()
            .collect(Collectors.toMap(tp -> tp, tp -> {
                List<ConsumerRecord<String, TestRecord>> partitionRecords = records.records(tp);
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                return new OffsetAndMetadata(lastOffset + 1);
            }));
        consumer.commitAsync(offsets, (committedOffsets, exception) -> {
            if (exception != null) {
                log.error("[ConsumerWorker] Failed to commit offsets {} for partitions {}",
                    committedOffsets, records.partitions(), exception);
            } else {
                log.debug("[ConsumerWorker] Committed offsets {} for partitions {}",
                    committedOffsets, records.partitions());
            }
        });
    }

    public void shutdown() {
        running.set(false);
        consumer.wakeup();
    }
}
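The +1 in commitAsync follows the usual Kafka convention: the committed offset is the position of the next record to read, not the last one processed. A runnable sketch using the kafka-clients MockConsumer (the class name and topic below are illustrative, not part of the project):

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class CommitPositionSketch {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("demo", 0);
        consumer.assign(List.of(tp));
        consumer.updateBeginningOffsets(Map.of(tp, 0L));

        // Feed three records at offsets 0, 1, 2.
        for (long offset = 0; offset < 3; offset++) {
            consumer.addRecord(new ConsumerRecord<>("demo", 0, offset, "key", "value"));
        }

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 2

        // Commit lastOffset + 1 (= 3): the offset of the next record to consume.
        consumer.commitSync(Map.of(tp, new OffsetAndMetadata(lastOffset + 1)));
        System.out.println(consumer.committed(Set.of(tp)));
    }
}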

MultiWorkerConsumer.java

@@ -0,0 +1,76 @@
package com.sangdol.consumer.infrastructure.kafka.consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;

import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class MultiWorkerConsumer {

    private final ConsumerConfiguration consumerConfiguration;
    private final DLTProducer dltProducer;
    private final ConsumerStats consumerStats;
    private final ExecutorService executorService;
    private final Integer workerThreads;
    private final List<ConsumerWorker> workers;

    public MultiWorkerConsumer(
        ConsumerConfiguration consumerConfiguration,
        DLTProducer dltProducer,
        ConsumerStats consumerStats,
        ConsumerPool consumerPool
    ) {
        this.consumerConfiguration = consumerConfiguration;
        this.dltProducer = dltProducer;
        this.consumerStats = consumerStats;
        this.executorService = consumerPool.executorService();
        this.workerThreads = consumerPool.poolSize();
        this.workers = new ArrayList<>();
    }

    @EventListener(ApplicationReadyEvent.class)
    public void initialize() {
        log.info("[MultiWorkerConsumer] initialize");
        for (int i = 0; i < workerThreads; i++) {
            // One consumer and one processor per worker: KafkaConsumer is not thread-safe.
            KafkaConsumer<String, TestRecord> consumer = consumerConfiguration.kafkaConsumer();
            RecordProcessor recordProcessor = new RecordProcessor(dltProducer, consumerStats);
            ConsumerWorker consumerWorker = new ConsumerWorker(consumer, recordProcessor);
            workers.add(consumerWorker);
            executorService.execute(consumerWorker);
        }
    }

    @PreDestroy
    public void shutdown() {
        log.info("[MultiWorkerConsumer] Shutting down consumers..");
        workers.forEach(ConsumerWorker::shutdown);
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                log.warn("[MultiWorkerConsumer] Executor service did not terminate within 30 seconds. Shutting down now.");
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            log.warn("[MultiWorkerConsumer] Interrupted while waiting for executors to terminate. Shutting down now.");
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
        log.info("[MultiWorkerConsumer] Shutdown complete.");
    }
}
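Note the shutdown ordering here: shutdown() on each worker flips its running flag and calls consumer.wakeup(), which makes the blocking poll() throw WakeupException; the worker then falls through to its finally block, commits the final offsets synchronously, and closes its own consumer, all within the executor's 30-second grace period.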

RebalanceListener.java

@@ -0,0 +1,35 @@
package com.sangdol.consumer.infrastructure.kafka.consumer;

import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import com.sangdol.consumer.domain.TestRecord;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class RebalanceListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, TestRecord> consumer;

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.warn("[RebalanceListener] Partitions revoked: {}", partitions);
        try {
            consumer.commitSync();
            log.info("[RebalanceListener] Offsets committed before partitions were revoked: {}", partitions);
        } catch (Exception e) {
            log.error("[RebalanceListener] Failed to commit before partitions were revoked", e);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("[RebalanceListener] Partitions assigned: {}", partitions);
    }
}
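The synchronous commit in onPartitionsRevoked flushes the offsets of records already processed before partition ownership moves to another consumer, so the new owner does not reprocess them. Since offsets are otherwise committed asynchronously in ConsumerWorker, this is the one spot where a blocking commit is worth the latency.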

RecordProcessor.java

@@ -0,0 +1,58 @@
package com.sangdol.consumer.infrastructure.kafka.consumer;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class RecordProcessor {

    private static final int POLL_COMPLETION_THRESHOLD = 5;

    private final DLTProducer dltProducer;
    private final ConsumerStats consumerStats;
    private final AtomicInteger emptyPollResultCount = new AtomicInteger(0);

    private long initialRecordReceivedTime = 0;

    public void process(ConsumerRecords<String, TestRecord> records) throws Exception {
        initializeTime();
        log.debug("[RecordProcessor] process {} records. thread: {}", records.count(),
            Thread.currentThread().getName());
        for (ConsumerRecord<String, TestRecord> record : records) {
            try {
                // No per-record business logic yet; add here when needed.
            } catch (Exception e) {
                log.error("[RecordProcessor] failed to consume record, sending to DLT. offset: {}",
                    record.offset(), e);
                dltProducer.sendToDeadLetterTopic(record.key(), record.value(), e);
            }
        }
    }

    // Once records have started arriving, the fifth empty poll marks the batch
    // as complete, and the elapsed time is reported to ConsumerStats.
    public void processIfEmptyRecords() {
        if (initialRecordReceivedTime != 0) {
            emptyPollResultCount.incrementAndGet();
        }
        if (emptyPollResultCount.get() == POLL_COMPLETION_THRESHOLD) {
            long elapsedTime = System.currentTimeMillis() - initialRecordReceivedTime;
            consumerStats.update(elapsedTime);
            initialRecordReceivedTime = 0;
            emptyPollResultCount.set(0);
        }
    }

    private void initializeTime() {
        if (initialRecordReceivedTime == 0) {
            initialRecordReceivedTime = System.currentTimeMillis();
        }
    }
}
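A minimal standalone sketch of the measurement window (the class and method names are hypothetical; it mirrors the logic above with POLL_COMPLETION_THRESHOLD = 5): the clock starts at the first non-empty poll, and the elapsed time is recorded once five empty polls have followed.

// Illustrative re-implementation of RecordProcessor's timing window.
public class ElapsedWindowSketch {
    static final int THRESHOLD = 5; // mirrors POLL_COMPLETION_THRESHOLD

    private long start = 0;
    private int emptyPolls = 0;

    void onRecords(long nowMillis) {
        if (start == 0) {
            start = nowMillis; // first batch starts the clock
        }
    }

    Long onEmptyPoll(long nowMillis) {
        if (start == 0) {
            return null; // nothing in flight yet
        }
        if (++emptyPolls < THRESHOLD) {
            return null;
        }
        long elapsed = nowMillis - start; // window closes on the fifth empty poll
        start = 0;
        emptyPolls = 0;
        return elapsed;
    }

    public static void main(String[] args) {
        ElapsedWindowSketch window = new ElapsedWindowSketch();
        window.onRecords(1_000L);                       // first records arrive at t=1s
        for (int i = 0; i < 4; i++) {
            window.onEmptyPoll(6_000L);                 // four empty polls: window still open
        }
        System.out.println(window.onEmptyPoll(6_000L)); // prints 5000: window closed
    }
}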

DLTProducerTest.java

@@ -17,6 +17,7 @@ import org.springframework.test.context.bean.override.mockito.MockitoBean;
import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;
import com.sangdol.consumer.infrastructure.kafka.consumer.MultiWorkerConsumer;
/**
 * Test for verifying Spring Retryable behavior through its log output
@@ -24,6 +25,9 @@ import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapp
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class DLTProducerTest {

    @MockitoBean
    private MultiWorkerConsumer multiWorkerConsumer;

    @MockitoBean
    private KafkaProducer<String, String> kafkaProducer;