diff --git a/src/test/java/com/sangdol/consumer/test/TestUtils.java b/src/test/java/com/sangdol/consumer/test/TestUtils.java index 3e4f33d..47d5749 100644 --- a/src/test/java/com/sangdol/consumer/test/TestUtils.java +++ b/src/test/java/com/sangdol/consumer/test/TestUtils.java @@ -1,13 +1,27 @@ package com.sangdol.consumer.test; +import java.time.Instant; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; +import com.sangdol.consumer.domain.TestRecord; +import com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordSerializer; + import lombok.extern.slf4j.Slf4j; @Slf4j public class TestUtils { private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092"; + private static final int RECORD_PER_TOPIC = 1_000_000; + private final List initialTopics = List.of("topic-0", "topic-1", "topic-2"); @Test void deleteAllExistTopics() { @@ -17,4 +31,48 @@ public class TestUtils { throw new RuntimeException(e); } } + + @Test + void produceRecords() { + try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) { + adminClient.createTopicsIfNotExists(initialTopics); + } catch (Exception e) { + throw new RuntimeException(e); + } + + try (KafkaProducer producer = kafkaProducer()) { + for (String topic : initialTopics) { + long start = System.currentTimeMillis(); + + for (int i = 0; i < RECORD_PER_TOPIC; i++) { + int randomId = ThreadLocalRandom.current().nextInt(100_000, 1_000_000); + String timeStamp = Instant.now().toString(); + TestRecord data = new TestRecord(randomId, topic, timeStamp); + ProducerRecord producerRecord = new ProducerRecord<>( + topic, + String.valueOf(randomId), + data + ); + producer.send(producerRecord); + } + + long end = System.currentTimeMillis(); + + log.info("[TestUtils] topic: {} finished in {} ms", topic, end - start); + } + } + } + + private KafkaProducer kafkaProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TestRecordSerializer.class); + props.put(ProducerConfig.ACKS_CONFIG, "1"); + props.put(ProducerConfig.LINGER_MS_CONFIG, 20); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); + + return new KafkaProducer<>(props); + } }