Compare commits


4 Commits

Author SHA1 Message Date
77ee1dff8c refactor: Extract methods and variables in ConsumerWorker 2025-06-27 15:07:15 +09:00
- Move the shutdown logic executed in the finally block into closeAfterManualCommit()
- Extract the OffsetCommitCallback passed to consumer.commitAsync into a variable
c348ae6dbd fix: Apply the updated project and package names 2025-06-27 14:52:33 +09:00
1eb46b7921 test: Implement producing the records used in tests 2025-06-27 14:49:06 +09:00
ad56fa909a test: Add an AdminClient for managing test topics and related handling logic 2025-06-27 14:48:31 +09:00
5 changed files with 190 additions and 20 deletions

Dockerfile

@@ -2,10 +2,10 @@ FROM amazoncorretto:17-alpine-jdk
 COPY docker/jmx/jmx_prometheus_javaagent-0.3.1.jar /opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar
 COPY docker/jmx/client-metrics.yml /opt/jmx_exporter/client-metrics.yml
-COPY build/libs/consumer-ssl-0.0.1-SNAPSHOT.jar /app/consumer-ssl-0.0.1-SNAPSHOT.jar
+COPY build/libs/consumer-0.0.1-SNAPSHOT.jar /app/consumer-0.0.1-SNAPSHOT.jar
 COPY ssl /ssl
 EXPOSE 8080
 EXPOSE 9400
-ENTRYPOINT ["java", "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar=9400:/opt/jmx_exporter/client-metrics.yml", "-jar", "/app/consumer-ssl-0.0.1-SNAPSHOT.jar"]
+ENTRYPOINT ["java", "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar=9400:/opt/jmx_exporter/client-metrics.yml", "-jar", "/app/consumer-0.0.1-SNAPSHOT.jar"]

run.sh

@@ -47,12 +47,12 @@ docker compose -f docker/docker-compose-application.yml up -d
 echo "[4-1] Waiting 30 seconds after startup..."
 sleep 30
 echo "[4-1] Starting test"
-./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.produceRecords"
+./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.produceRecords"
 echo "[4-1] Test finished. Waiting 2 minutes before the SSL-mode run..."
 sleep 120
 echo "[4-1] Resetting topics..."
-./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.deleteAllExistTopics"
+./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.deleteAllExistTopics"
 echo " "
 echo "---------------------"
@@ -71,7 +71,7 @@ echo "[4-2] Waiting 30 seconds after startup..."
 sleep 30
 echo "[4-2] Starting test"
-./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.produceRecords"
+./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.produceRecords"
 echo "[4-2] Waiting 2 minutes as before, then shutting down the application..."
 sleep 120

ConsumerWorker.java

@@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
@@ -47,17 +48,13 @@ public class ConsumerWorker implements Runnable {
         } catch (Exception e) {
             log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName());
         } finally {
-            try {
-                consumer.commitSync();
-                log.info("[ConsumerWorker] Final offset committed. Close consumer.. ");
-            } catch (Exception e) {
-                log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}",
-                        Thread.currentThread().getName());
-            } finally {
-                consumer.close();
-                log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName());
-            }
+            closeAfterManualCommit();
         }
     }

+    public void shutdown() {
+        running.set(false);
+        consumer.wakeup();
+    }
+
     private void commitAsync(ConsumerRecords<String, TestRecord> records) {
@@ -69,7 +66,7 @@ public class ConsumerWorker implements Runnable {
                     return new OffsetAndMetadata(lastOffset + 1);
                 }));
-        consumer.commitAsync(offsets, (o, exception) -> {
+        OffsetCommitCallback callback = (o, exception) -> {
             if (exception != null) {
                 log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o,
                         records.partitions(), exception);
@@ -77,11 +74,21 @@ public class ConsumerWorker implements Runnable {
                 log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o,
                         records.partitions());
             }
-        });
+        };
+        consumer.commitAsync(offsets, callback);
     }

-    public void shutdown() {
-        running.set(false);
-        consumer.wakeup();
-    }
+    private void closeAfterManualCommit() {
+        try {
+            consumer.commitSync();
+            log.info("[ConsumerWorker] Final offset committed. Close consumer.. ");
+        } catch (Exception e) {
+            log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}",
+                    Thread.currentThread().getName());
+        } finally {
+            consumer.close();
+            log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName());
+        }
+    }
 }
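For context, the refactored worker follows the standard KafkaConsumer wakeup shutdown pattern: shutdown() flips the running flag and calls consumer.wakeup(), which makes a blocked poll() throw WakeupException; the thread then falls through to the finally block, where closeAfterManualCommit() commits the final offsets synchronously and closes the consumer. The poll loop itself is outside this diff, so the following sketch is an assumption about its shape, not the project's actual code:

// Sketch only: the poll loop is not part of this diff; subscription and record handling are assumed.
@Override
public void run() {
    try {
        while (running.get()) {
            ConsumerRecords<String, TestRecord> records = consumer.poll(Duration.ofSeconds(1));
            commitAsync(records); // async commit via the extracted OffsetCommitCallback
        }
    } catch (WakeupException e) {
        // Expected path: shutdown() called consumer.wakeup() from another thread.
        log.info("[ConsumerWorker] Wakeup for shutdown. thread: {}", Thread.currentThread().getName());
    } catch (Exception e) {
        log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName());
    } finally {
        closeAfterManualCommit(); // final commitSync() + consumer.close(), as shown above
    }
}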

TestAdminClient.java

@@ -0,0 +1,85 @@
package com.sangdol.consumer.test;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestAdminClient implements AutoCloseable {

    private final AdminClient adminClient;

    public TestAdminClient(String bootStrapServers) {
        this.adminClient = adminClient(bootStrapServers);
    }

    private AdminClient adminClient(String bootStrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        return AdminClient.create(props);
    }

    public void createTopicsIfNotExists(List<String> topics) {
        Set<String> currentTopics = getCurrentTopics();
        if (currentTopics.containsAll(topics)) {
            log.info("[TestAdminClient] Topics already exist");
            return;
        }
        int partitions = 3;
        short replicationFactor = 3;
        Map<String, String> topicConfigs = Map.of(
                TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"
        );
        List<NewTopic> newTopics = topics.stream()
                .map(topic -> new NewTopic(topic, partitions, replicationFactor).configs(topicConfigs))
                .toList();
        try {
            adminClient.createTopics(newTopics)
                    .all()
                    .get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteTopicsIfExists() {
        try {
            Set<String> topics = getCurrentTopics();
            if (topics.isEmpty()) {
                log.info("[TestAdminClient] no existing topics");
                return;
            }
            adminClient.deleteTopics(topics)
                    .all()
                    .get();
            log.info("[TestAdminClient] Successfully deleted {} topics ( {} )", topics.size(), String.join(", ", topics));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Set<String> getCurrentTopics() {
        try {
            return adminClient.listTopics().names().get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        adminClient.close();
    }
}
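TestAdminClient implements AutoCloseable, so the underlying AdminClient is released by try-with-resources. A minimal usage sketch, assuming the same bootstrap servers and topic names TestUtils uses below:

// Usage sketch; bootstrap servers and topic names match the constants in TestUtils below.
try (TestAdminClient adminClient = new TestAdminClient("localhost:29092,localhost:39092,localhost:49092")) {
    // Creates each topic with 3 partitions, replication factor 3, and min.insync.replicas=2,
    // skipping creation when all requested topics already exist.
    adminClient.createTopicsIfNotExists(List.of("topic-0", "topic-1", "topic-2"));
} catch (Exception e) {
    throw new RuntimeException(e);
}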

TestUtils.java

@@ -0,0 +1,78 @@
package com.sangdol.consumer.test;

import java.time.Instant;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordSerializer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestUtils {

    private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092";
    private static final int RECORD_PER_TOPIC = 1_000_000;

    private final List<String> initialTopics = List.of("topic-0", "topic-1", "topic-2");

    @Test
    void deleteAllExistTopics() {
        try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) {
            adminClient.deleteTopicsIfExists();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    void produceRecords() {
        try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) {
            adminClient.createTopicsIfNotExists(initialTopics);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        try (KafkaProducer<String, TestRecord> producer = kafkaProducer()) {
            for (String topic : initialTopics) {
                long start = System.currentTimeMillis();
                for (int i = 0; i < RECORD_PER_TOPIC; i++) {
                    int randomId = ThreadLocalRandom.current().nextInt(100_000, 1_000_000);
                    String timeStamp = Instant.now().toString();
                    TestRecord data = new TestRecord(randomId, topic, timeStamp);
                    ProducerRecord<String, TestRecord> producerRecord = new ProducerRecord<>(
                            topic,
                            String.valueOf(randomId),
                            data
                    );
                    producer.send(producerRecord);
                }
                long end = System.currentTimeMillis();
                log.info("[TestUtils] topic: {} finished in {} ms", topic, end - start);
            }
        }
    }

    private KafkaProducer<String, TestRecord> kafkaProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TestRecordSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
        return new KafkaProducer<>(props);
    }
}
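These two @Test methods are what run.sh invokes between application runs via ./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.produceRecords" and ...TestUtils.deleteAllExistTopics. One note on the producer loop above: send() is fire-and-forget here, so individual send failures are never surfaced; the try-with-resources close() is what flushes records still buffered by linger.ms=20 and batch.size=65536. If per-record failures should be logged, a send callback is one option; a sketch, not part of this changeset:

// Sketch: optional per-record error logging via the KafkaProducer.send(record, Callback) overload.
producer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        log.error("[TestUtils] failed to send record to topic {}", producerRecord.topic(), exception);
    }
});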