Reviewed-on: #8
This commit is contained in:
commit 63d2f91713
Dockerfile
@@ -2,10 +2,10 @@ FROM amazoncorretto:17-alpine-jdk
 COPY docker/jmx/jmx_prometheus_javaagent-0.3.1.jar /opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar
 COPY docker/jmx/client-metrics.yml /opt/jmx_exporter/client-metrics.yml
-COPY build/libs/consumer-ssl-0.0.1-SNAPSHOT.jar /app/consumer-ssl-0.0.1-SNAPSHOT.jar
+COPY build/libs/consumer-0.0.1-SNAPSHOT.jar /app/consumer-0.0.1-SNAPSHOT.jar
 COPY ssl /ssl

 EXPOSE 8080
 EXPOSE 9400

-ENTRYPOINT ["java", "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar=9400:/opt/jmx_exporter/client-metrics.yml", "-jar", "/app/consumer-ssl-0.0.1-SNAPSHOT.jar"]
+ENTRYPOINT ["java", "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.3.1.jar=9400:/opt/jmx_exporter/client-metrics.yml", "-jar", "/app/consumer-0.0.1-SNAPSHOT.jar"]
run.sh (6 changed lines)
@@ -47,12 +47,12 @@ docker compose -f docker/docker-compose-application.yml up -d
 echo "[4-1] Waiting 30 seconds after startup..."
 sleep 30
 echo "[4-1] Starting test"
-./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.produceRecords"
+./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.produceRecords"
 echo "[4-1] Test complete. Waiting 2 minutes before the SSL-mode run.."
 sleep 120

 echo "[4-1] Resetting topics.."
-./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.deleteAllExistTopics"
+./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.deleteAllExistTopics"

 echo " "
 echo "---------------------"
@@ -71,7 +71,7 @@ echo "[4-2] Waiting 30 seconds after startup..."
 sleep 30

 echo "[4-2] Starting test"
-./gradlew clean test --tests "org.study.consumerssl.data.TestUtils.produceRecords"
+./gradlew clean test --tests "com.sangdol.consumer.test.TestUtils.produceRecords"

 echo "[4-2] As before, waiting 2 minutes and then shutting down the application.."
 sleep 120
ConsumerWorker.java
@@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;

@@ -47,6 +48,38 @@ public class ConsumerWorker implements Runnable {
         } catch (Exception e) {
             log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName());
         } finally {
+            closeAfterManualCommit();
+        }
+    }
+
+    public void shutdown() {
+        running.set(false);
+        consumer.wakeup();
+    }
+
+    private void commitAsync(ConsumerRecords<String, TestRecord> records) {
+        Map<TopicPartition, OffsetAndMetadata> offsets = records.partitions().stream()
+                .collect(Collectors.toMap(tp -> tp, tp -> {
+                    List<ConsumerRecord<String, TestRecord>> partitionRecords = records.records(tp);
+                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

+                    return new OffsetAndMetadata(lastOffset + 1);
+                }));
+
+        OffsetCommitCallback callback = (o, exception) -> {
+            if (exception != null) {
+                log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o,
+                        records.partitions(), exception);
+            } else {
+                log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o,
+                        records.partitions());
+            }
+        };
+
+        consumer.commitAsync(offsets, callback);
+    }
+
+    private void closeAfterManualCommit() {
         try {
             consumer.commitSync();
             log.info("[ConsumerWorker] Final offset committed. Close consumer..");
@@ -59,29 +92,3 @@ public class ConsumerWorker implements Runnable {
         }
     }
 }
-
-    private void commitAsync(ConsumerRecords<String, TestRecord> records) {
-        Map<TopicPartition, OffsetAndMetadata> offsets = records.partitions().stream()
-                .collect(Collectors.toMap(tp -> tp, tp -> {
-                    List<ConsumerRecord<String, TestRecord>> partitionRecords = records.records(tp);
-                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
-
-                    return new OffsetAndMetadata(lastOffset + 1);
-                }));
-
-        consumer.commitAsync(offsets, (o, exception) -> {
-            if (exception != null) {
-                log.error("[ConsumerWorker] failed to commit offsets for offset: {}, partition {}", o,
-                        records.partitions(), exception);
-            } else {
-                log.debug("[ConsumerWorker] committed offsets for offset: {}, partition {}", o,
-                        records.partitions());
-            }
-        });
-    }
-
-    public void shutdown() {
-        running.set(false);
-        consumer.wakeup();
-    }
-}
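For orientation: the new methods implement at-least-once consumption. commitAsync() runs after each polled batch, shutdown() flips the running flag and interrupts a blocked poll() via wakeup(), and closeAfterManualCommit() issues one final synchronous commit before closing. The run() loop itself is not part of this hunk, so the sketch below is an assumption about how the pieces are wired; the subscribed topics, poll timeout, and process() handler are placeholders:

// Sketch only: a plausible run() shape for this worker (not part of the diff).
@Override
public void run() {
    try {
        consumer.subscribe(List.of("topic-0", "topic-1", "topic-2")); // assumed topics
        while (running.get()) {
            ConsumerRecords<String, TestRecord> records = consumer.poll(Duration.ofMillis(500)); // assumed timeout
            if (!records.isEmpty()) {
                process(records);     // hypothetical per-batch handler
                commitAsync(records); // non-blocking commit of each partition's last offset + 1
            }
        }
    } catch (WakeupException e) {
        // Expected path on shutdown(): wakeup() aborts a blocked poll().
    } catch (Exception e) {
        log.info("[ConsumerWorker] Unexpected Exception. thread: {}", Thread.currentThread().getName());
    } finally {
        closeAfterManualCommit(); // final commitSync() + consumer close
    }
}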
src/test/java/com/sangdol/consumer/test/TestAdminClient.java (new file, 85 lines)
@@ -0,0 +1,85 @@
+package com.sangdol.consumer.test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.TopicConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TestAdminClient implements AutoCloseable {
+
+    private final AdminClient adminClient;
+
+    public TestAdminClient(String bootStrapServers) {
+        this.adminClient = adminClient(bootStrapServers);
+    }
+
+    private AdminClient adminClient(String bootStrapServers) {
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+        return AdminClient.create(props);
+    }
+
+    public void createTopicsIfNotExists(List<String> topics) {
+        Set<String> currentTopics = getCurrentTopics();
+        if (currentTopics.containsAll(topics)) {
+            log.info("[TestAdminClient] Topics already exist");
+            return;
+        }
+
+        int partitions = 3;
+        short replicationFactor = 3;
+        Map<String, String> topicConfigs = Map.of(
+            TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"
+        );
+
+        List<NewTopic> newTopics = topics.stream()
+            .map(topic -> new NewTopic(topic, partitions, replicationFactor).configs(topicConfigs))
+            .toList();
+
+        try {
+            adminClient.createTopics(newTopics)
+                .all()
+                .get();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void deleteTopicsIfExists() {
+        try {
+            Set<String> topics = getCurrentTopics();
+            if (topics.isEmpty()) {
+                log.info("[TestAdminClient] No existing topics");
+                return;
+            }
+
+            adminClient.deleteTopics(topics)
+                .all()
+                .get();
+            log.info("[TestAdminClient] Successfully deleted {} topics ( {} )", topics.size(), String.join(", ", topics));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Set<String> getCurrentTopics() {
+        try {
+            return adminClient.listTopics().names().get();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        adminClient.close();
+    }
+}
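TestAdminClient is AutoCloseable, so try-with-resources releases the underlying AdminClient deterministically. A usage sketch, with the bootstrap addresses and topic names taken from the TestUtils constants below:

// Usage sketch; values mirror the constants in TestUtils.
try (TestAdminClient admin = new TestAdminClient("localhost:29092,localhost:39092,localhost:49092")) {
    admin.createTopicsIfNotExists(List.of("topic-0", "topic-1", "topic-2"));
} catch (Exception e) {
    throw new RuntimeException(e);
}

The hard-coded replication factor of 3 fits the three-broker cluster implied by the bootstrap list. Note that min.insync.replicas=2 is only enforced against producers sending with acks=all, so the acks=1 producer in TestUtils is not constrained by it.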
src/test/java/com/sangdol/consumer/test/TestUtils.java (new file, 78 lines)
@@ -0,0 +1,78 @@
+package com.sangdol.consumer.test;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.Test;
+
+import com.sangdol.consumer.domain.TestRecord;
+import com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordSerializer;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TestUtils {
+
+    private static final String BOOTSTRAP_SERVERS = "localhost:29092,localhost:39092,localhost:49092";
+    private static final int RECORD_PER_TOPIC = 1_000_000;
+    private final List<String> initialTopics = List.of("topic-0", "topic-1", "topic-2");
+
+    @Test
+    void deleteAllExistTopics() {
+        try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) {
+            adminClient.deleteTopicsIfExists();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    void produceRecords() {
+        try (TestAdminClient adminClient = new TestAdminClient(BOOTSTRAP_SERVERS)) {
+            adminClient.createTopicsIfNotExists(initialTopics);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaProducer<String, TestRecord> producer = kafkaProducer()) {
+            for (String topic : initialTopics) {
+                long start = System.currentTimeMillis();
+
+                for (int i = 0; i < RECORD_PER_TOPIC; i++) {
+                    int randomId = ThreadLocalRandom.current().nextInt(100_000, 1_000_000);
+                    String timeStamp = Instant.now().toString();
+                    TestRecord data = new TestRecord(randomId, topic, timeStamp);
+                    ProducerRecord<String, TestRecord> producerRecord = new ProducerRecord<>(
+                        topic,
+                        String.valueOf(randomId),
+                        data
+                    );
+                    producer.send(producerRecord);
+                }
+
+                long end = System.currentTimeMillis();
+
+                log.info("[TestUtils] topic: {} finished in {} ms", topic, end - start);
+            }
+        }
+    }
+
+    private KafkaProducer<String, TestRecord> kafkaProducer() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TestRecordSerializer.class);
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
+
+        return new KafkaProducer<>(props);
+    }
+}
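TestUtils registers TestRecordSerializer as the producer's value serializer, but neither that class nor TestRecord appears in this diff. The following is a hypothetical sketch of their shape, inferred from the constructor call new TestRecord(randomId, topic, timeStamp) and the imports above; the field names and the Jackson-based implementation are assumptions:

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

// Assumed shape of com.sangdol.consumer.domain.TestRecord; field names are
// inferred from the constructor call in produceRecords and live in their own file.
record TestRecord(int id, String topic, String timeStamp) {}

// Hypothetical Jackson-based implementation matching the import
// com.sangdol.consumer.infrastructure.kafka.common.serialize.TestRecordSerializer.
public class TestRecordSerializer implements Serializer<TestRecord> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, TestRecord data) {
        if (data == null) {
            return null; // Kafka treats null as a null payload
        }
        try {
            return objectMapper.writeValueAsBytes(data); // TestRecord as JSON bytes
        } catch (Exception e) {
            throw new SerializationException("Failed to serialize TestRecord", e);
        }
    }
}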