Compare commits
5 Commits
ebd003b79e
...
63d2f91713
| Author | SHA1 | Date | |
|---|---|---|---|
| 63d2f91713 | |||
| 77ee1dff8c | |||
| c348ae6dbd | |||
| 1eb46b7921 | |||
| ad56fa909a |
@ -2,10 +2,10 @@ FROM amazoncorretto:17-alpine-jdk
|
||||
|
||||
COPY docker/jmx/jmx_prometheus_javaagent-0.3.1.jar /opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar
|
||||
COPY docker/jmx/client-metrics.yml /opt/jmx_exporter/client-metrics.yml
|
||||
COPY build/libs/consumer-ssl-0.0.1-SNAPSHOT.jar /app/consumer-ssl-0.0.1-SNAPSHOT.jar
|
||||
COPY build/libs/consumer-0.0.1-SNAPSHOT.jar /app/consumer-0.0.1-SNAPSHOT.jar
|
||||
COPY ssl /ssl
|
||||
|
||||
EXPOSE 8080
|
||||
EXPOSE 9400
|
||||
|
||||
ENTRYPOINT ["java", "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar=9400:/opt/jmx_exporter/client-metrics.yml", "-jar", "/app/consumer-ssl-0.0.1-SNAPSHOT.jar"]
|
||||
ENTRYPOINT ["java", "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar=9400:/opt/jmx_exporter/client-metrics.yml", "-jar", "/app/consumer-0.0.1-SNAPSHOT.jar"]
|
||||
6
run.sh
6
run.sh
@ -47,12 +47,12 @@ docker compose -f docker/docker-compose-application.yml up -d
|
||||
echo "[4-1] 실행 후 30초 대기..."
|
||||
sleep 30
|
||||
echo "[4-1] 테스트 시작"
|
||||
./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.produceRecords"
|
||||
./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.produceRecords"
|
||||
echo "[4-1] 테스트 완료. SSL 모드 실행 전 2분 대기.."
|
||||
sleep 120
|
||||
|
||||
echo "[4-1] 토픽 초기화.."
|
||||
./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.deleteAllExistTopics"
|
||||
./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.deleteAllExistTopics"
|
||||
|
||||
echo " "
|
||||
echo "---------------------"
|
||||
@ -71,7 +71,7 @@ echo "[4-2] 실행 후 30초 대기..."
|
||||
sleep 30
|
||||
|
||||
echo "[4-2] 테스트 시작"
|
||||
./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.produceRecords"
|
||||
./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.produceRecords"
|
||||
|
||||
echo "[4-2] 이전과 동일하게 2분 대기 후 애플리케이션 종료.."
|
||||
sleep 120
|
||||
|
||||
@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.WakeupException;
|
||||
|
||||
@ -47,19 +48,15 @@ public class ConsumerWorker implements Runnable {
|
||||
} catch (Exception e) {
|
||||
log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName());
|
||||
} finally {
|
||||
try {
|
||||
consumer.commitSync();
|
||||
log.info("[ConsumerWorker] Final offset committed. Close consumer.. ");
|
||||
} catch (Exception e) {
|
||||
log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}",
|
||||
Thread.currentThread().getName());
|
||||
} finally {
|
||||
consumer.close();
|
||||
log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName());
|
||||
}
|
||||
closeAfterManualCommit();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
running.set(false);
|
||||
consumer.wakeup();
|
||||
}
|
||||
|
||||
private void commitAsync(ConsumerRecords<String, TestRecord> records) {
|
||||
Map<TopicPartition, OffsetAndMetadata> offsets = records.partitions().stream()
|
||||
.collect(Collectors.toMap(tp -> tp, tp -> {
|
||||
@ -69,7 +66,7 @@ public class ConsumerWorker implements Runnable {
|
||||
return new OffsetAndMetadata(lastOffset + 1);
|
||||
}));
|
||||
|
||||
consumer.commitAsync(offsets, (o, exception) -> {
|
||||
OffsetCommitCallback callback = (o, exception) -> {
|
||||
if (exception != null) {
|
||||
log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o,
|
||||
records.partitions(), exception);
|
||||
@ -77,11 +74,21 @@ public class ConsumerWorker implements Runnable {
|
||||
log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o,
|
||||
records.partitions());
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
consumer.commitAsync(offsets, callback);
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
running.set(false);
|
||||
consumer.wakeup();
|
||||
private void closeAfterManualCommit() {
|
||||
try {
|
||||
consumer.commitSync();
|
||||
log.info("[ConsumerWorker] Final offset committed. Close consumer.. ");
|
||||
} catch (Exception e) {
|
||||
log.error("[ConsumerWorker] Failed to commit final offsets when shutdown. thread: {}",
|
||||
Thread.currentThread().getName());
|
||||
} finally {
|
||||
consumer.close();
|
||||
log.info("[ConsumerWorker] Closed consumer thread. thread: {}", Thread.currentThread().getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
85
src/test/java/com/sangdol/consumer/test/TestAdminClient.java
Normal file
85
src/test/java/com/sangdol/consumer/test/TestAdminClient.java
Normal file
@ -0,0 +1,85 @@
|
||||
package com.sangdol.consumer.test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.kafka.clients.admin.AdminClient;
|
||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.common.config.TopicConfig;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class TestAdminClient implements AutoCloseable {
|
||||
|
||||
private final AdminClient adminClient;
|
||||
|
||||
public TestAdminClient(String bootStrapServers) {
|
||||
this.adminClient = adminClient(bootStrapServers);
|
||||
}
|
||||
|
||||
private AdminClient adminClient(String bootStrapServers) {
|
||||
Properties props = new Properties();
|
||||
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
|
||||
return AdminClient.create(props);
|
||||
}
|
||||
|
||||
public void createTopicsIfNotExists(List<String> topics) {
|
||||
Set<String> currentTopics = getCurrentTopics();
|
||||
if (currentTopics.containsAll(topics)) {
|
||||
log.info("[TestAdminClient] Topic Already Exists ");
|
||||
return;
|
||||
}
|
||||
|
||||
int partitions = 3;
|
||||
short replicationFactor = 3;
|
||||
Map<String, String> topicConfigs = Map.of(
|
||||
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"
|
||||
);
|
||||
|
||||
List<NewTopic> newTopics = topics.stream()
|
||||
.map(topic -> new NewTopic(topic, partitions, replicationFactor).configs(topicConfigs))
|
||||
.toList();
|
||||
|
||||
try {
|
||||
adminClient.createTopics(newTopics)
|
||||
.all()
|
||||
.get();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void deleteTopicsIfExists() {
|
||||
try {
|
||||
Set<String> topics = getCurrentTopics();
|
||||
if (topics.isEmpty()) {
|
||||
log.info("[TestAdminClient] no exist topics");
|
||||
return;
|
||||
}
|
||||
|
||||
adminClient.deleteTopics(topics)
|
||||
.all()
|
||||
.get();
|
||||
log.info("[ConsumerWorker] Successfully delete {} topics( {} )", topics.size(), String.join(", ", topics));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private Set<String> getCurrentTopics() {
|
||||
try {
|
||||
return adminClient.listTopics().names().get();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
adminClient.close();
|
||||
}
|
||||
}
|
||||
78
src/test/java/com/sangdol/consumer/test/TestUtils.java
Normal file
78
src/test/java/com/sangdol/consumer/test/TestUtils.java
Normal file
@ -0,0 +1,78 @@
|
||||
package com.sangdol.consumer.test;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import com.sangdol.consumer.domain.TestRecord;
|
||||
import com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordSerializer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class TestUtils {
|
||||
|
||||
private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092";
|
||||
private static final int RECORD_PER_TOPIC = 1_000_000;
|
||||
private final List<String> initialTopics = List.of("topic-0", "topic-1", "topic-2");
|
||||
|
||||
@Test
|
||||
void deleteAllExistTopics() {
|
||||
try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) {
|
||||
adminClient.deleteTopicsIfExists();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void produceRecords() {
|
||||
try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) {
|
||||
adminClient.createTopicsIfNotExists(initialTopics);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
try (KafkaProducer<String, TestRecord> producer = kafkaProducer()) {
|
||||
for (String topic : initialTopics) {
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
for (int i = 0; i < RECORD_PER_TOPIC; i++) {
|
||||
int randomId = ThreadLocalRandom.current().nextInt(100_000, 1_000_000);
|
||||
String timeStamp = Instant.now().toString();
|
||||
TestRecord data = new TestRecord(randomId, topic, timeStamp);
|
||||
ProducerRecord<String, TestRecord> producerRecord = new ProducerRecord<>(
|
||||
topic,
|
||||
String.valueOf(randomId),
|
||||
data
|
||||
);
|
||||
producer.send(producerRecord);
|
||||
}
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
|
||||
log.info("[TestUtils] topic: {} finished in {} ms", topic, end - start);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private KafkaProducer<String, TestRecord> kafkaProducer() {
|
||||
Properties props = new Properties();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TestRecordSerializer.class);
|
||||
props.put(ProducerConfig.ACKS_CONFIG, "1");
|
||||
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
|
||||
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
|
||||
|
||||
return new KafkaProducer<>(props);
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user