test: 테스트를 위한 토픽 관리용 AdminClient 및 관련 처리 로직 추가

This commit is contained in:
이상진 2025-06-27 14:48:31 +09:00
parent ebd003b79e
commit ad56fa909a
2 changed files with 105 additions and 0 deletions

View File

@ -0,0 +1,85 @@
package com.sangdol.consumer.test;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestAdminClient implements AutoCloseable {
private final AdminClient adminClient;
public TestAdminClient(String bootStrapServers) {
this.adminClient = adminClient(bootStrapServers);
}
private AdminClient adminClient(String bootStrapServers) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
return AdminClient.create(props);
}
public void createTopicsIfNotExists(List<String> topics) {
Set<String> currentTopics = getCurrentTopics();
if (currentTopics.containsAll(topics)) {
log.info("[TestAdminClient] Topic Already Exists ");
return;
}
int partitions = 3;
short replicationFactor = 3;
Map<String, String> topicConfigs = Map.of(
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"
);
List<NewTopic> newTopics = topics.stream()
.map(topic -> new NewTopic(topic, partitions, replicationFactor).configs(topicConfigs))
.toList();
try {
adminClient.createTopics(newTopics)
.all()
.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void deleteTopicsIfExists() {
try {
Set<String> topics = getCurrentTopics();
if (topics.isEmpty()) {
log.info("[TestAdminClient] no exist topics");
return;
}
adminClient.deleteTopics(topics)
.all()
.get();
log.info("[ConsumerWorker] Successfully delete {} topics( {} )", topics.size(), String.join(", ", topics));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private Set<String> getCurrentTopics() {
try {
return adminClient.listTopics().names().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws Exception {
adminClient.close();
}
}

View File

@ -0,0 +1,20 @@
package com.sangdol.consumer.test;
import org.junit.jupiter.api.Test;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestUtils {
private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092";
@Test
void deleteAllExistTopics() {
try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) {
adminClient.deleteTopicsIfExists();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}