diff --git a/src/test/java/com/sangdol/consumer/test/TestAdminClient.java b/src/test/java/com/sangdol/consumer/test/TestAdminClient.java new file mode 100644 index 0000000..a74de92 --- /dev/null +++ b/src/test/java/com/sangdol/consumer/test/TestAdminClient.java @@ -0,0 +1,85 @@ +package com.sangdol.consumer.test; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.config.TopicConfig; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TestAdminClient implements AutoCloseable { + + private final AdminClient adminClient; + + public TestAdminClient(String bootStrapServers) { + this.adminClient = adminClient(bootStrapServers); + } + + private AdminClient adminClient(String bootStrapServers) { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + return AdminClient.create(props); + } + + public void createTopicsIfNotExists(List topics) { + Set currentTopics = getCurrentTopics(); + if (currentTopics.containsAll(topics)) { + log.info("[TestAdminClient] Topic Already Exists "); + return; + } + + int partitions = 3; + short replicationFactor = 3; + Map topicConfigs = Map.of( + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2" + ); + + List newTopics = topics.stream() + .map(topic -> new NewTopic(topic, partitions, replicationFactor).configs(topicConfigs)) + .toList(); + + try { + adminClient.createTopics(newTopics) + .all() + .get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void deleteTopicsIfExists() { + try { + Set topics = getCurrentTopics(); + if (topics.isEmpty()) { + log.info("[TestAdminClient] no exist topics"); + return; + } + + adminClient.deleteTopics(topics) + .all() + .get(); + log.info("[ConsumerWorker] Successfully delete {} topics( {} )", topics.size(), String.join(", ", topics)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Set getCurrentTopics() { + try { + return adminClient.listTopics().names().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws Exception { + adminClient.close(); + } +} diff --git a/src/test/java/com/sangdol/consumer/test/TestUtils.java b/src/test/java/com/sangdol/consumer/test/TestUtils.java new file mode 100644 index 0000000..3e4f33d --- /dev/null +++ b/src/test/java/com/sangdol/consumer/test/TestUtils.java @@ -0,0 +1,20 @@ +package com.sangdol.consumer.test; + +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TestUtils { + + private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092"; + + @Test + void deleteAllExistTopics() { + try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) { + adminClient.deleteTopicsIfExists(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +}