From 1eb46b7921aca8482c84c4e959eeebe9cc7bdd4d Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 14:49:06 +0900 Subject: [PATCH] =?UTF-8?q?test:=20=ED=85=8C=EC=8A=A4=ED=8A=B8=EC=97=90=20?= =?UTF-8?q?=EC=82=AC=EC=9A=A9=EB=90=98=EB=8A=94=20=EB=A0=88=EC=BD=94?= =?UTF-8?q?=EB=93=9C=20=EC=B6=94=EA=B0=80=20=EA=B8=B0=EB=8A=A5=20=EA=B5=AC?= =?UTF-8?q?=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/sangdol/consumer/test/TestUtils.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/test/java/com/sangdol/consumer/test/TestUtils.java b/src/test/java/com/sangdol/consumer/test/TestUtils.java index 3e4f33d..47d5749 100644 --- a/src/test/java/com/sangdol/consumer/test/TestUtils.java +++ b/src/test/java/com/sangdol/consumer/test/TestUtils.java @@ -1,13 +1,27 @@ package com.sangdol.consumer.test; +import java.time.Instant; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordSerializer; + import lombok.extern.slf4j.Slf4j; @Slf4j public class TestUtils { private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092"; + private static final int RECORD_PER_TOPIC = 1_000_000; + private final List initialTopics = List.of("topic-0", "topic-1", "topic-2"); @Test void deleteAllExistTopics() { @@ -17,4 +31,48 @@ public class TestUtils { throw new RuntimeException(e); } } + + @Test + void produceRecords() { + try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) { + adminClient.createTopicsIfNotExists(initialTopics); + } catch (Exception e) { + throw new RuntimeException(e); + } + + try (KafkaProducer producer = kafkaProducer()) { + for (String topic : initialTopics) { + long start = System.currentTimeMillis(); + + for (int i = 0; i < RECORD_PER_TOPIC; i++) { + int randomId = ThreadLocalRandom.current().nextInt(100_000, 1_000_000); + String timeStamp = Instant.now().toString(); + TestRecord data = new TestRecord(randomId, topic, timeStamp); + ProducerRecord producerRecord = new ProducerRecord<>( + topic, + String.valueOf(randomId), + data + ); + producer.send(producerRecord); + } + + long end = System.currentTimeMillis(); + + log.info("[TestUtils] topic: {} finished in {} ms", topic, end - start); + } + } + } + + private KafkaProducer kafkaProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TestRecordSerializer.class); + props.put(ProducerConfig.ACKS_CONFIG, "1"); + props.put(ProducerConfig.LINGER_MS_CONFIG, 20); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); + + return new KafkaProducer<>(props); + } }