[#5] 컨슈머 코드 마이그레이션 #6
3
run-kafka-cluster.sh
Executable file
3
run-kafka-cluster.sh
Executable file
@ -0,0 +1,3 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
docker compose -f docker/docker-compose-kafka-cluster.yml down
|
||||||
|
docker compose -f docker/docker-compose-kafka-cluster.yml up -d
|
||||||
@ -0,0 +1,88 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.consumer;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
import com.sangdol.consumer.infrastructure.kafka.common.KafkaProperties;
|
||||||
|
|
||||||
|
import io.micrometer.core.instrument.Gauge;
|
||||||
|
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@EnableConfigurationProperties(value = KafkaProperties.class)
|
||||||
|
public class ConsumerConfiguration {
|
||||||
|
|
||||||
|
private final KafkaProperties kafkaProperties;
|
||||||
|
|
||||||
|
public KafkaConsumer<String, TestRecord> kafkaConsumer() {
|
||||||
|
KafkaConsumer<String, TestRecord> consumer = new KafkaConsumer<>(consumerProperties());
|
||||||
|
consumer.subscribe(kafkaProperties.getConsumer().topics(), new RebalanceListener(consumer));
|
||||||
|
|
||||||
|
return consumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public MeterBinder elapsedTime(ConsumerStats consumerStats) {
|
||||||
|
String consumerProtocol = kafkaProperties.getConsumer().protocol();
|
||||||
|
return registry -> Gauge.builder("elapsed.time", consumerStats, ConsumerStats::averageTime)
|
||||||
|
.tag("protocol", consumerProtocol)
|
||||||
|
.register(registry);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConsumerStats consumerStats() {
|
||||||
|
Integer workerThreads = kafkaProperties.getConsumer().workerThreads();
|
||||||
|
return new ConsumerStats(workerThreads);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConsumerPool consumerPool() {
|
||||||
|
ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||||
|
.setNameFormat("consumer-pool-%d")
|
||||||
|
.build();
|
||||||
|
int workerThreads = kafkaProperties.getConsumer().workerThreads();
|
||||||
|
|
||||||
|
return new ConsumerPool(
|
||||||
|
Executors.newFixedThreadPool(workerThreads, threadFactory),
|
||||||
|
workerThreads
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Properties consumerProperties() {
|
||||||
|
KafkaProperties.Consumer consumer = kafkaProperties.getConsumer();
|
||||||
|
KafkaProperties.BootStrapServers bootStrapServers = kafkaProperties.getBootStrapServers();
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||||
|
bootStrapServers.getAddressByProtocol(consumer.protocol()));
|
||||||
|
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.groupId());
|
||||||
|
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumer.keyDeserializer());
|
||||||
|
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumer.valueDeserializer());
|
||||||
|
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumer.autoOffsetReset());
|
||||||
|
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.enableAutoCommit());
|
||||||
|
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.maxPollRecords());
|
||||||
|
|
||||||
|
applyRemainPropsBySslEnabled(consumer.isSslEnabled(), properties);
|
||||||
|
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void applyRemainPropsBySslEnabled(boolean isSslEnabled, Properties properties) {
|
||||||
|
if (isSslEnabled) {
|
||||||
|
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
|
||||||
|
properties.putAll(kafkaProperties.getSsl().toPropsMap());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,6 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.consumer;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
public record ConsumerPool(ExecutorService executorService, int poolSize) {
|
||||||
|
}
|
||||||
@ -0,0 +1,30 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.consumer;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class ConsumerStats {
|
||||||
|
|
||||||
|
private final int workerThreads;
|
||||||
|
private final Map<String, Long> threadElapsedTime;
|
||||||
|
|
||||||
|
public ConsumerStats(int workerThreads) {
|
||||||
|
this.workerThreads = workerThreads;
|
||||||
|
this.threadElapsedTime = new ConcurrentHashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void update(long elapsedTime) {
|
||||||
|
threadElapsedTime.put(Thread.currentThread().getName(), elapsedTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long averageTime() {
|
||||||
|
if (threadElapsedTime.size() != workerThreads) {
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
return threadElapsedTime.values().stream()
|
||||||
|
.collect(Collectors.averagingLong(Long::longValue))
|
||||||
|
.longValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,87 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.consumer;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.apache.kafka.common.errors.WakeupException;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class ConsumerWorker implements Runnable {
|
||||||
|
|
||||||
|
private final KafkaConsumer<String, TestRecord> consumer;
|
||||||
|
private final RecordProcessor recordProcessor;
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(true);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
log.info("[ConsumerWorker] started. thread: {}", Thread.currentThread().getName());
|
||||||
|
while (running.get()) {
|
||||||
|
ConsumerRecords<String, TestRecord> records = consumer.poll(Duration.ofMillis(1000));
|
||||||
|
|
||||||
|
if (records.isEmpty()) {
|
||||||
|
recordProcessor.processIfEmptyRecords();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.debug("[ConsumerWorker] received {} records. thread: {}", records.count(), Thread.currentThread().getName());
|
||||||
|
recordProcessor.process(records);
|
||||||
|
commitAsync(records);
|
||||||
|
}
|
||||||
|
} catch (WakeupException e) {
|
||||||
|
log.info("[ConsumerWorker] Wakeup Exception. thread: {}", Thread.currentThread().getName());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName());
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
consumer.commitSync();
|
||||||
|
log.info("[ConsumerWorker] Final offset committed. Close consumer.. ");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}",
|
||||||
|
Thread.currentThread().getName());
|
||||||
|
} finally {
|
||||||
|
consumer.close();
|
||||||
|
log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void commitAsync(ConsumerRecords<String, TestRecord> records) {
|
||||||
|
Map<TopicPartition, OffsetAndMetadata> offsets = records.partitions().stream()
|
||||||
|
.collect(Collectors.toMap(tp -> tp, tp -> {
|
||||||
|
List<ConsumerRecord<String, TestRecord>> partitionRecords = records.records(tp);
|
||||||
|
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
|
||||||
|
|
||||||
|
return new OffsetAndMetadata(lastOffset + 1);
|
||||||
|
}));
|
||||||
|
|
||||||
|
consumer.commitAsync(offsets, (o, exception) -> {
|
||||||
|
if (exception != null) {
|
||||||
|
log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o,
|
||||||
|
records.partitions(), exception);
|
||||||
|
} else {
|
||||||
|
log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o,
|
||||||
|
records.partitions());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdown() {
|
||||||
|
running.set(false);
|
||||||
|
consumer.wakeup();
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,76 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.consumer;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
|
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;
|
||||||
|
|
||||||
|
import jakarta.annotation.PreDestroy;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class MultiWorkerConsumer {
|
||||||
|
|
||||||
|
private final ConsumerConfiguration consumerConfiguration;
|
||||||
|
private final DLTProducer dltProducer;
|
||||||
|
private final ConsumerStats consumerStats;
|
||||||
|
private final ExecutorService executorService;
|
||||||
|
private final Integer workerThreads;
|
||||||
|
private final List<ConsumerWorker> workers;
|
||||||
|
|
||||||
|
public MultiWorkerConsumer(
|
||||||
|
ConsumerConfiguration consumerConfiguration,
|
||||||
|
DLTProducer dltProducer,
|
||||||
|
ConsumerStats consumerStats,
|
||||||
|
ConsumerPool consumerPool
|
||||||
|
) {
|
||||||
|
this.consumerConfiguration = consumerConfiguration;
|
||||||
|
this.dltProducer = dltProducer;
|
||||||
|
this.consumerStats = consumerStats;
|
||||||
|
this.executorService = consumerPool.executorService();
|
||||||
|
this.workerThreads = consumerPool.poolSize();
|
||||||
|
this.workers = new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventListener(ApplicationReadyEvent.class)
|
||||||
|
public void initialize() {
|
||||||
|
log.info("[MultiWorkerConsumer] initialize");
|
||||||
|
|
||||||
|
for (int i = 0; i < workerThreads; i++) {
|
||||||
|
KafkaConsumer<String, TestRecord> consumer = consumerConfiguration.kafkaConsumer();
|
||||||
|
RecordProcessor recordProcessor = new RecordProcessor(dltProducer, consumerStats);
|
||||||
|
ConsumerWorker consumerWorker = new ConsumerWorker(consumer, recordProcessor);
|
||||||
|
workers.add(consumerWorker);
|
||||||
|
executorService.execute(consumerWorker);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void shutdown() {
|
||||||
|
log.info("[MultiWorkerConsumer] Shutting down consumers..");
|
||||||
|
workers.forEach(ConsumerWorker::shutdown);
|
||||||
|
executorService.shutdown();
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
|
||||||
|
log.warn("[MultiWorkerConsumer] Executor service did not terminated during 30 seconds. shutdown now.");
|
||||||
|
executorService.shutdownNow();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.warn("[MultiWorkerConsumer] Interrupted while waiting for executors to terminate. shutdown now.");
|
||||||
|
executorService.shutdownNow();
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("[MultiWorkerConsumer] Shutdown complete.");
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,35 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.consumer;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
||||||
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class RebalanceListener implements ConsumerRebalanceListener {
|
||||||
|
|
||||||
|
private final KafkaConsumer<String, TestRecord> consumer;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
||||||
|
log.warn("[RebalanceListener] Partitions revoked for partitions {} ", partitions);
|
||||||
|
try {
|
||||||
|
consumer.commitSync();
|
||||||
|
log.info("[RebalanceListener] Offsets committed before partition revoked. partition: {}", partitions);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[RebalanceListener] Failed to commit before partition revoked", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
||||||
|
log.info("[RebalanceListener] Partitions assigned for partitions {} ", partitions);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,58 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.consumer;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class RecordProcessor {
|
||||||
|
|
||||||
|
private static final int POLL_COMPLETION_THRESHOLD = 5;
|
||||||
|
private final DLTProducer dltProducer;
|
||||||
|
private final ConsumerStats consumerStats;
|
||||||
|
private final AtomicInteger emptyPollResultCount = new AtomicInteger(0);
|
||||||
|
private long initialRecordReceivedTime = 0;
|
||||||
|
|
||||||
|
public void process(ConsumerRecords<String, TestRecord> records) throws Exception {
|
||||||
|
initializeTime();
|
||||||
|
|
||||||
|
log.debug("[RecordProcessor] process {} records. thread: {}", records.count(),
|
||||||
|
Thread.currentThread().getName());
|
||||||
|
for (ConsumerRecord<String, TestRecord> record : records) {
|
||||||
|
try {
|
||||||
|
// 별도의 로직은 없음. 필요시 추가
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[ConsumerWorker] failed to consume record, sending to DLT. offset: {}",
|
||||||
|
record.offset(), e);
|
||||||
|
dltProducer.sendToDeadLetterTopic(record.key(), record.value(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processIfEmptyRecords() {
|
||||||
|
if (initialRecordReceivedTime != 0) {
|
||||||
|
emptyPollResultCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (emptyPollResultCount.get() == POLL_COMPLETION_THRESHOLD) {
|
||||||
|
long elapsedTime = System.currentTimeMillis() - initialRecordReceivedTime;
|
||||||
|
consumerStats.update(elapsedTime);
|
||||||
|
initialRecordReceivedTime = 0;
|
||||||
|
emptyPollResultCount.set(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeTime() {
|
||||||
|
if (initialRecordReceivedTime == 0) {
|
||||||
|
initialRecordReceivedTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -17,6 +17,7 @@ import org.springframework.test.context.bean.override.mockito.MockitoBean;
|
|||||||
|
|
||||||
import com.sangdol.consumer.domain.TestRecord;
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;
|
import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapper;
|
||||||
|
import com.sangdol.consumer.infrastructure.kafka.consumer.MultiWorkerConsumer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Spring Retryable 동작 및 로그 출력 확인용 테스트
|
* Spring Retryable 동작 및 로그 출력 확인용 테스트
|
||||||
@ -24,6 +25,9 @@ import com.sangdol.consumer.infrastructure.kafka.common.serialize.JsonObjectMapp
|
|||||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||||
class DLTProducerTest {
|
class DLTProducerTest {
|
||||||
|
|
||||||
|
@MockitoBean
|
||||||
|
private MultiWorkerConsumer multiWorkerConsumer;
|
||||||
|
|
||||||
@MockitoBean
|
@MockitoBean
|
||||||
private KafkaProducer<String, String> kafkaProducer;
|
private KafkaProducer<String, String> kafkaProducer;
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user