test: implement functionality for adding records used in tests

이상진 2025-06-27 14:49:06 +09:00
parent ad56fa909a
commit 1eb46b7921


@@ -1,13 +1,27 @@
package com.sangdol.consumer.test;
import java.time.Instant;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordSerializer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestUtils {
private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092";
private static final int RECORD_PER_TOPIC = 1_000_000;
private final List<String> initialTopics = List.of("topic-0", "topic-1", "topic-2");
@Test
void deleteAllExistTopics() {
@@ -17,4 +31,48 @@ public class TestUtils {
throw new RuntimeException(e);
}
}
@Test
void produceRecords() {
try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) {
adminClient.createTopicsIfNotExists(initialTopics);
} catch (Exception e) {
throw new RuntimeException(e);
}
try (KafkaProducer<String, TestRecord> producer = kafkaProducer()) {
for (String topic : initialTopics) {
long start = System.currentTimeMillis();
for (int i = 0; i < RECORD_PER_TOPIC; i++) {
int randomId = ThreadLocalRandom.current().nextInt(100_000, 1_000_000);
String timeStamp = Instant.now().toString();
TestRecord data = new TestRecord(randomId, topic, timeStamp);
ProducerRecord<String, TestRecord> producerRecord = new ProducerRecord<>(
topic,
String.valueOf(randomId),
data
);
producer.send(producerRecord);
}
long end = System.currentTimeMillis();
log.info("[TestUtils] topic: {} finished in {} ms", topic, end - start);
}
}
}
private KafkaProducer<String, TestRecord> kafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TestRecordSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
return new KafkaProducer<>(props);
}
}
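
Note: produceRecords() relies on a TestAdminClient helper whose implementation is not part of this diff. The sketch below shows one way createTopicsIfNotExists could be built on Kafka's Admin API; the partition count and replication factor (3 and 3, matching the three brokers in BOOTSTRAP_SERVERS) are placeholder assumptions, not values taken from the commit.

package com.sangdol.consumer.test;

import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Hypothetical sketch of the TestAdminClient helper used by produceRecords().
public class TestAdminClient implements AutoCloseable {

    private final Admin admin;

    public TestAdminClient(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        this.admin = Admin.create(props);
    }

    // Creates the given topics, skipping any that already exist on the cluster.
    // Partition count and replication factor below are placeholder values.
    public void createTopicsIfNotExists(List<String> topics) throws Exception {
        Set<String> existing = admin.listTopics().names().get();
        List<NewTopic> newTopics = topics.stream()
                .filter(topic -> !existing.contains(topic))
                .map(topic -> new NewTopic(topic, 3, (short) 3))
                .collect(Collectors.toList());
        if (!newTopics.isEmpty()) {
            admin.createTopics(newTopics).all().get();
        }
    }

    @Override
    public void close() {
        admin.close();
    }
}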
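
The producer is configured with TestRecordSerializer as its value serializer, which is also outside this diff. A minimal sketch follows, assuming TestRecord is a plain POJO with getters and that values are written as JSON bytes via Jackson; the real implementation may use a different wire format.

package com.sangdol.consumer.infrastructure.kafka.common.serialize;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.sangdol.consumer.domain.TestRecord;

// Hypothetical sketch: serializes TestRecord values to UTF-8 JSON bytes with Jackson.
public class TestRecordSerializer implements Serializer<TestRecord> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, TestRecord data) {
        if (data == null) {
            return null;
        }
        try {
            // writeValueAsBytes emits the record's fields as a JSON object.
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Failed to serialize TestRecord", e);
        }
    }
}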