[#7] Add the remaining environment needed to run the test script, and run the tests #8

Merged
pricelees merged 4 commits from infra/#7 into main 2025-06-27 06:09:37 +00:00
5 changed files with 190 additions and 20 deletions

Dockerfile

@@ -2,10 +2,10 @@ FROM amazoncorretto:17-alpine-jdk
 COPY docker/jmx/jmx_prometheus_javaagent-0.3.1.jar /opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar
 COPY docker/jmx/client-metrics.yml /opt/jmx_exporter/client-metrics.yml
-COPY build/libs/consumer-ssl-0.0.1-SNAPSHOT.jar /app/consumer-ssl-0.0.1-SNAPSHOT.jar
+COPY build/libs/consumer-0.0.1-SNAPSHOT.jar /app/consumer-0.0.1-SNAPSHOT.jar
 COPY ssl /ssl
 EXPOSE 8080
 EXPOSE 9400
-ENTRYPOINT ["java", "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar=9400:/opt/jmx_exporter/client-metrics.yml", "-jar", "/app/consumer-ssl-0.0.1-SNAPSHOT.jar"]
+ENTRYPOINT ["java", "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar=9400:/opt/jmx_exporter/client-metrics.yml", "-jar", "/app/consumer-0.0.1-SNAPSHOT.jar"]

run.sh

@@ -47,12 +47,12 @@ docker compose -f docker/docker-compose-application.yml up -d
 echo "[4-1] 실행 후 30초 대기..."
 sleep 30
 echo "[4-1] 테스트 시작"
-./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.produceRecords"
+./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.produceRecords"
 echo "[4-1] 테스트 완료. SSL 모드 실행 전 2분 대기.."
 sleep 120
 echo "[4-1] 토픽 초기화.."
-./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.deleteAllExistTopics"
+./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.deleteAllExistTopics"
 echo " "
 echo "---------------------"
@@ -71,7 +71,7 @@ echo "[4-2] 실행 후 30초 대기..."
 sleep 30
 echo "[4-2] 테스트 시작"
-./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.produceRecords"
+./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.produceRecords"
 echo "[4-2] 이전과 동일하게 2분 대기 후 애플리케이션 종료.."
 sleep 120

ConsumerWorker.java

@@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
@@ -47,6 +48,38 @@ public class ConsumerWorker implements Runnable {
        } catch (Exception e) {
            log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName());
        } finally {
            closeAfterManualCommit();
        }
    }

    public void shutdown() {
        running.set(false);
        consumer.wakeup();
    }

    private void commitAsync(ConsumerRecords<String, TestRecord> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = records.partitions().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> {
                    List<ConsumerRecord<String, TestRecord>> partitionRecords = records.records(tp);
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    return new OffsetAndMetadata(lastOffset + 1);
                }));
        OffsetCommitCallback callback = (o, exception) -> {
            if (exception != null) {
                log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o,
                        records.partitions(), exception);
            } else {
                log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o,
                        records.partitions());
            }
        };
        consumer.commitAsync(offsets, callback);
    }

    private void closeAfterManualCommit() {
        try {
            consumer.commitSync();
            log.info("[ConsumerWorker] Final offset committed. Close consumer.. ");
@@ -59,29 +92,3 @@ public class ConsumerWorker implements Runnable {
            }
        }
    }

    private void commitAsync(ConsumerRecords<String, TestRecord> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = records.partitions().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> {
                    List<ConsumerRecord<String, TestRecord>> partitionRecords = records.records(tp);
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    return new OffsetAndMetadata(lastOffset + 1);
                }));
        consumer.commitAsync(offsets, (o, exception) -> {
            if (exception != null) {
                log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o,
                        records.partitions(), exception);
            } else {
                log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o,
                        records.partitions());
            }
        });
    }

    public void shutdown() {
        running.set(false);
        consumer.wakeup();
    }
}
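
Note for reviewers: the diff above only shows the tail of run() together with the commit helpers. The sketch below is a rough reconstruction of how the poll loop is assumed to drive commitAsync() and react to shutdown(); the poll timeout, the logging, and the processing step are illustrative and not part of this PR.

    // Illustrative sketch only - the actual run() body is outside this diff.
    // Assumes the ConsumerWorker fields used above (consumer, running) plus java.time.Duration.
    @Override
    public void run() {
        try {
            while (running.get()) {
                ConsumerRecords<String, TestRecord> records = consumer.poll(Duration.ofMillis(500));
                if (!records.isEmpty()) {
                    records.forEach(record -> log.debug("[ConsumerWorker] received {}", record.value()));
                    commitAsync(records); // async commit of lastOffset + 1 per partition
                }
            }
        } catch (WakeupException e) {
            // Thrown inside poll() after shutdown() calls consumer.wakeup() from another thread
            log.info("[ConsumerWorker] Wakeup. thread: {}", Thread.currentThread().getName());
        } catch (Exception e) {
            log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName());
        } finally {
            closeAfterManualCommit(); // final commitSync() followed by consumer.close()
        }
    }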

TestAdminClient.java

@@ -0,0 +1,85 @@
package com.sangdol.consumer.test;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestAdminClient implements AutoCloseable {

    private final AdminClient adminClient;

    public TestAdminClient(String bootStrapServers) {
        this.adminClient = adminClient(bootStrapServers);
    }

    private AdminClient adminClient(String bootStrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        return AdminClient.create(props);
    }

    public void createTopicsIfNotExists(List<String> topics) {
        Set<String> currentTopics = getCurrentTopics();
        if (currentTopics.containsAll(topics)) {
            log.info("[TestAdminClient] Topics already exist");
            return;
        }
        int partitions = 3;
        short replicationFactor = 3;
        Map<String, String> topicConfigs = Map.of(
                TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"
        );
        List<NewTopic> newTopics = topics.stream()
                .map(topic -> new NewTopic(topic, partitions, replicationFactor).configs(topicConfigs))
                .toList();
        try {
            adminClient.createTopics(newTopics)
                    .all()
                    .get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteTopicsIfExists() {
        try {
            Set<String> topics = getCurrentTopics();
            if (topics.isEmpty()) {
                log.info("[TestAdminClient] No topics exist");
                return;
            }
            adminClient.deleteTopics(topics)
                    .all()
                    .get();
            log.info("[TestAdminClient] Successfully deleted {} topics ( {} )", topics.size(), String.join(", ", topics));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Set<String> getCurrentTopics() {
        try {
            return adminClient.listTopics().names().get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        adminClient.close();
    }
}
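
For context, a short usage sketch of the new TestAdminClient helper (the bootstrap servers and topic names below are just the defaults from TestUtils; try-with-resources closes the underlying AdminClient):

    // Hypothetical usage example, not part of the PR.
    List<String> topics = List.of("topic-0", "topic-1", "topic-2");
    try (TestAdminClient adminClient = new TestAdminClient("localhost:29092,localhost:39092,localhost:49092")) {
        adminClient.createTopicsIfNotExists(topics);   // 3 partitions, replication factor 3, min.insync.replicas=2
        // ... produce and consume test records here ...
        adminClient.deleteTopicsIfExists();            // note: deletes every topic currently in the cluster
    } catch (Exception e) {
        throw new RuntimeException(e);
    }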

TestUtils.java

@@ -0,0 +1,78 @@
package com.sangdol.consumer.test;

import java.time.Instant;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordSerializer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestUtils {

    private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092";
    private static final int RECORD_PER_TOPIC = 1_000_000;

    private final List<String> initialTopics = List.of("topic-0", "topic-1", "topic-2");

    @Test
    void deleteAllExistTopics() {
        try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) {
            adminClient.deleteTopicsIfExists();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    void produceRecords() {
        try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) {
            adminClient.createTopicsIfNotExists(initialTopics);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        try (KafkaProducer<String, TestRecord> producer = kafkaProducer()) {
            for (String topic : initialTopics) {
                long start = System.currentTimeMillis();
                for (int i = 0; i < RECORD_PER_TOPIC; i++) {
                    int randomId = ThreadLocalRandom.current().nextInt(100_000, 1_000_000);
                    String timeStamp = Instant.now().toString();
                    TestRecord data = new TestRecord(randomId, topic, timeStamp);
                    ProducerRecord<String, TestRecord> producerRecord = new ProducerRecord<>(
                            topic,
                            String.valueOf(randomId),
                            data
                    );
                    producer.send(producerRecord);
                }
                long end = System.currentTimeMillis();
                log.info("[TestUtils] topic: {} finished in {} ms", topic, end - start);
            }
        }
    }

    private KafkaProducer<String, TestRecord> kafkaProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TestRecordSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
        return new KafkaProducer<>(props);
    }
}
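
TestUtils wires TestRecordSerializer as the producer's value serializer, but that class (and the TestRecord domain type) is not part of this diff. Below is a minimal sketch of what such a serializer could look like, assuming a Jackson-based JSON encoding; the actual implementation in the repository may differ.

    package com.sangdol.consumer.infrastructure.kafka.common.serialize;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;

    import com.sangdol.consumer.domain.TestRecord;

    // Hypothetical sketch: turns TestRecord values into JSON bytes for the producer above.
    public class TestRecordSerializer implements Serializer<TestRecord> {

        private final ObjectMapper objectMapper = new ObjectMapper();

        @Override
        public byte[] serialize(String topic, TestRecord data) {
            if (data == null) {
                return null;
            }
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Failed to serialize TestRecord for topic " + topic, e);
            }
        }
    }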