From ad56fa909afdddfc0411d3003c98a61e6bfea406 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:48:31 +0900 Subject: [PATCH 1/4] =?UTF-8?q?test:=20=ED=85=8C=EC=8A=A4=ED=8A=B8?= =?UTF-8?q?=EB=A5=BC=20=EC=9C=84=ED=95=9C=20=ED=86=A0=ED=94=BD=20=EA=B4=80?= =?UTF-8?q?=EB=A6=AC=EC=9A=A9=20AdminClient=20=EB=B0=8F=20=EA=B4=80?= =?UTF-8?q?=EB=A0=A8=20=EC=B2=98=EB=A6=AC=20=EB=A1=9C=EC=A7=81=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/test/TestAdminClient.java | 85 +++++++++++++++++++ .../com/sangdol/consumer/test/TestUtils.java | 20 +++++ 2 files changed, 105 insertions(+) create mode 100644 src/test/java/com/sangdol/consumer/test/TestAdminClient.java create mode 100644 src/test/java/com/sangdol/consumer/test/TestUtils.java diff --git a/src/test/java/com/sangdol/consumer/test/TestAdminClient.java b/src/test/java/com/sangdol/consumer/test/TestAdminClient.java new file mode 100644 index 0000000..a74de92 --- /dev/null +++ b/src/test/java/com/sangdol/consumer/test/TestAdminClient.java @@ -0,0 +1,85 @@ +package com.sangdol.consumer.test; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.config.TopicConfig; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TestAdminClient implements AutoCloseable { + + private final AdminClient adminClient; + + public TestAdminClient(String bootStrapServers) { + this.adminClient = adminClient(bootStrapServers); + } + + private AdminClient adminClient(String bootStrapServers) { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + return AdminClient.create(props); + } + + public void createTopicsIfNotExists(List topics) { + Set currentTopics = getCurrentTopics(); + if (currentTopics.containsAll(topics)) { + log.info("[TestAdminClient] Topic Already Exists "); + return; + } + + int partitions = 3; + short replicationFactor = 3; + Map topicConfigs = Map.of( + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2" + ); + + List newTopics = topics.stream() + .map(topic -> new NewTopic(topic, partitions, replicationFactor).configs(topicConfigs)) + .toList(); + + try { + adminClient.createTopics(newTopics) + .all() + .get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void deleteTopicsIfExists() { + try { + Set topics = getCurrentTopics(); + if (topics.isEmpty()) { + log.info("[TestAdminClient] no exist topics"); + return; + } + + adminClient.deleteTopics(topics) + .all() + .get(); + log.info("[ConsumerWorker] Successfully delete {} topics( {} )", topics.size(), String.join(", ", topics)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Set getCurrentTopics() { + try { + return adminClient.listTopics().names().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws Exception { + adminClient.close(); + } +} diff --git a/src/test/java/com/sangdol/consumer/test/TestUtils.java b/src/test/java/com/sangdol/consumer/test/TestUtils.java new file mode 100644 index 0000000..3e4f33d --- /dev/null +++ b/src/test/java/com/sangdol/consumer/test/TestUtils.java @@ -0,0 +1,20 @@ +package com.sangdol.consumer.test; + +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TestUtils { + + private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092"; + + @Test + void deleteAllExistTopics() { + try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) { + adminClient.deleteTopicsIfExists(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} From 1eb46b7921aca8482c84c4e959eeebe9cc7bdd4d Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:49:06 +0900 Subject: [PATCH 2/4] =?UTF-8?q?test:=20=ED=85=8C=EC=8A=A4=ED=8A=B8?= =?UTF-8?q?=EC=97=90=20=EC=82=AC=EC=9A=A9=EB=90=98=EB=8A=94=20=EB=A0=88?= =?UTF-8?q?=EC=BD=94=EB=93=9C=20=EC=B6=94=EA=B0=80=20=EA=B8=B0=EB=8A=A5=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/sangdol/consumer/test/TestUtils.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/test/java/com/sangdol/consumer/test/TestUtils.java b/src/test/java/com/sangdol/consumer/test/TestUtils.java index 3e4f33d..47d5749 100644 --- a/src/test/java/com/sangdol/consumer/test/TestUtils.java +++ b/src/test/java/com/sangdol/consumer/test/TestUtils.java @@ -1,13 +1,27 @@ package com.sangdol.consumer.test; +import java.time.Instant; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordSerializer; + import lombok.extern.slf4j.Slf4j; @Slf4j public class TestUtils { private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092"; + private static final int RECORD_PER_TOPIC = 1_000_000; + private final List initialTopics = List.of("topic-0", "topic-1", "topic-2"); @Test void deleteAllExistTopics() { @@ -17,4 +31,48 @@ public class TestUtils { throw new RuntimeException(e); } } + + @Test + void produceRecords() { + try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) { + adminClient.createTopicsIfNotExists(initialTopics); + } catch (Exception e) { + throw new RuntimeException(e); + } + + try (KafkaProducer producer = kafkaProducer()) { + for (String topic : initialTopics) { + long start = System.currentTimeMillis(); + + for (int i = 0; i < RECORD_PER_TOPIC; i++) { + int randomId = ThreadLocalRandom.current().nextInt(100_000, 1_000_000); + String timeStamp = Instant.now().toString(); + TestRecord data = new TestRecord(randomId, topic, timeStamp); + ProducerRecord producerRecord = new ProducerRecord<>( + topic, + String.valueOf(randomId), + data + ); + producer.send(producerRecord); + } + + long end = System.currentTimeMillis(); + + log.info("[TestUtils] topic: {} finished in {} ms", topic, end - start); + } + } + } + + private KafkaProducer kafkaProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TestRecordSerializer.class); + props.put(ProducerConfig.ACKS_CONFIG, "1"); + props.put(ProducerConfig.LINGER_MS_CONFIG, 20); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); + + return new KafkaProducer<>(props); + } } From c348ae6dbddaef2a1476f6982b59cf140e423e5d Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:52:33 +0900 Subject: [PATCH 3/4] =?UTF-8?q?fix:=20=ED=94=84=EB=A1=9C=EC=A0=9D=ED=8A=B8?= =?UTF-8?q?=EB=AA=85=20&=20=ED=8C=A8=ED=82=A4=EC=A7=80=EB=AA=85=20?= =?UTF-8?q?=EC=88=98=EC=A0=95=20=EB=B0=98=EC=98=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker/application/Dockerfile_Application | 4 ++-- run.sh | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docker/application/Dockerfile_Application b/docker/application/Dockerfile_Application index 2e3f186..74aa44e 100644 --- a/docker/application/Dockerfile_Application +++ b/docker/application/Dockerfile_Application @@ -2,10 +2,10 @@ FROM amazoncorretto:17-alpine-jdk COPY docker/jmx/jmx_prometheus_javaagent-0.3.1.jar /opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar COPY docker/jmx/client-metrics.yml /opt/jmx_exporter/client-metrics.yml -COPY build/libs/consumer-ssl-0.0.1-SNAPSHOT.jar /app/consumer-ssl-0.0.1-SNAPSHOT.jar +COPY build/libs/consumer-0.0.1-SNAPSHOT.jar /app/consumer-0.0.1-SNAPSHOT.jar COPY ssl /ssl EXPOSE 8080 EXPOSE 9400 -ENTRYPOINT ["java", "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar=9400:/opt/jmx_exporter/client-metrics.yml", "-jar", "/app/consumer-ssl-0.0.1-SNAPSHOT.jar"] \ No newline at end of file +ENTRYPOINT ["java", "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar=9400:/opt/jmx_exporter/client-metrics.yml", "-jar", "/app/consumer-0.0.1-SNAPSHOT.jar"] \ No newline at end of file diff --git a/run.sh b/run.sh index d2d2d9d..666bce7 100755 --- a/run.sh +++ b/run.sh @@ -47,12 +47,12 @@ docker compose -f docker/docker-compose-application.yml up -d echo "[4-1] 실행 후 30초 대기..." sleep 30 echo "[4-1] 테스트 시작" -./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.produceRecords" +./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.produceRecords" echo "[4-1] 테스트 완료. SSL 모드 실행 전 2분 대기.." sleep 120 echo "[4-1] 토픽 초기화.." -./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.deleteAllExistTopics" +./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.deleteAllExistTopics" echo " " echo "---------------------" @@ -71,7 +71,7 @@ echo "[4-2] 실행 후 30초 대기..." sleep 30 echo "[4-2] 테스트 시작" -./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.produceRecords" +./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.produceRecords" echo "[4-2] 이전과 동일하게 2분 대기 후 애플리케이션 종료.." sleep 120 From 77ee1dff8c61e65f33fb083038d3a1eafba51c29 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 15:07:15 +0900 Subject: [PATCH 4/4] =?UTF-8?q?refactor:=20ConsumerWorker=EC=97=90?= =?UTF-8?q?=EC=84=9C=EC=9D=98=20=EB=A9=94=EC=84=9C=EB=93=9C=20=EB=B0=8F=20?= =?UTF-8?q?=EB=B3=80=EC=88=98=20=EB=B6=84=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - finally 구문에서 실행되는 종료 로직을 closeAfterManualCommit() 으로 분리 - consumer.commitAsync에 들어가는 OffsetCommitCallback을 변수로 분리 --- .../kafka/consumer/ConsumerWorker.java | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java index 84b7492..13f7472 100644 --- a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerWorker.java @@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; @@ -47,19 +48,15 @@ public class ConsumerWorker implements Runnable { } catch (Exception e) { log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName()); } finally { - try { - consumer.commitSync(); - log.info("[ConsumerWorker] Final offset committed. Close consumer.. "); - } catch (Exception e) { - log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}", - Thread.currentThread().getName()); - } finally { - consumer.close(); - log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName()); - } + closeAfterManualCommit(); } } + public void shutdown() { + running.set(false); + consumer.wakeup(); + } + private void commitAsync(ConsumerRecords records) { Map offsets = records.partitions().stream() .collect(Collectors.toMap(tp -> tp, tp -> { @@ -69,7 +66,7 @@ public class ConsumerWorker implements Runnable { return new OffsetAndMetadata(lastOffset + 1); })); - consumer.commitAsync(offsets, (o, exception) -> { + OffsetCommitCallback callback = (o, exception) -> { if (exception != null) { log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o, records.partitions(), exception); @@ -77,11 +74,21 @@ public class ConsumerWorker implements Runnable { log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o, records.partitions()); } - }); + }; + + consumer.commitAsync(offsets, callback); } - public void shutdown() { - running.set(false); - consumer.wakeup(); + private void closeAfterManualCommit() { + try { + consumer.commitSync(); + log.info("[ConsumerWorker] Final offset committed. Close consumer.. "); + } catch (Exception e) { + log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}", + Thread.currentThread().getName()); + } finally { + consumer.close(); + log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName()); + } } }