From ad56fa909afdddfc0411d3003c98a61e6bfea406 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:48:31 +0900 Subject: [PATCH] =?UTF-8?q?test:=20=ED=85=8C=EC=8A=A4=ED=8A=B8=EB=A5=BC=20?= =?UTF-8?q?=EC=9C=84=ED=95=9C=20=ED=86=A0=ED=94=BD=20=EA=B4=80=EB=A6=AC?= =?UTF-8?q?=EC=9A=A9=20AdminClient=20=EB=B0=8F=20=EA=B4=80=EB=A0=A8=20?= =?UTF-8?q?=EC=B2=98=EB=A6=AC=20=EB=A1=9C=EC=A7=81=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/test/TestAdminClient.java | 85 +++++++++++++++++++ .../com/sangdol/consumer/test/TestUtils.java | 20 +++++ 2 files changed, 105 insertions(+) create mode 100644 src/test/java/com/sangdol/consumer/test/TestAdminClient.java create mode 100644 src/test/java/com/sangdol/consumer/test/TestUtils.java diff --git a/src/test/java/com/sangdol/consumer/test/TestAdminClient.java b/src/test/java/com/sangdol/consumer/test/TestAdminClient.java new file mode 100644 index 0000000..a74de92 --- /dev/null +++ b/src/test/java/com/sangdol/consumer/test/TestAdminClient.java @@ -0,0 +1,85 @@ +package com.sangdol.consumer.test; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.config.TopicConfig; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TestAdminClient implements AutoCloseable { + + private final AdminClient adminClient; + + public TestAdminClient(String bootStrapServers) { + this.adminClient = adminClient(bootStrapServers); + } + + private AdminClient adminClient(String bootStrapServers) { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + return AdminClient.create(props); + } + + public void createTopicsIfNotExists(List topics) { + Set currentTopics = getCurrentTopics(); + if (currentTopics.containsAll(topics)) { + log.info("[TestAdminClient] Topic Already Exists "); + return; + } + + int partitions = 3; + short replicationFactor = 3; + Map topicConfigs = Map.of( + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2" + ); + + List newTopics = topics.stream() + .map(topic -> new NewTopic(topic, partitions, replicationFactor).configs(topicConfigs)) + .toList(); + + try { + adminClient.createTopics(newTopics) + .all() + .get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void deleteTopicsIfExists() { + try { + Set topics = getCurrentTopics(); + if (topics.isEmpty()) { + log.info("[TestAdminClient] no exist topics"); + return; + } + + adminClient.deleteTopics(topics) + .all() + .get(); + log.info("[ConsumerWorker] Successfully delete {} topics( {} )", topics.size(), String.join(", ", topics)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Set getCurrentTopics() { + try { + return adminClient.listTopics().names().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws Exception { + adminClient.close(); + } +} diff --git a/src/test/java/com/sangdol/consumer/test/TestUtils.java b/src/test/java/com/sangdol/consumer/test/TestUtils.java new file mode 100644 index 0000000..3e4f33d --- /dev/null +++ b/src/test/java/com/sangdol/consumer/test/TestUtils.java @@ -0,0 +1,20 @@ +package com.sangdol.consumer.test; + +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TestUtils { + + private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092"; + + @Test + void deleteAllExistTopics() { + try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) { + adminClient.deleteTopicsIfExists(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +}