Compare commits

...

2 Commits

3 changed files with 101 additions and 2 deletions

View File

@ -4,6 +4,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
public class ConsumerStats {
private final int workerThreads;
@ -27,4 +29,9 @@ public class ConsumerStats {
.collect(Collectors.averagingLong(Long::longValue))
.longValue();
}
@VisibleForTesting
Map<String, Long> getThreadElapsedTime() {
return threadElapsedTime;
}
}

View File

@ -22,7 +22,7 @@ public class RecordProcessor {
private long initialRecordReceivedTime = 0;
public void process(ConsumerRecords<String, TestRecord> records) throws Exception {
initializeTime();
preProcess();
log.debug("[RecordProcessor] process {} records. thread: {}", records.count(),
Thread.currentThread().getName());
@ -50,9 +50,11 @@ public class RecordProcessor {
}
}
private void initializeTime() {
private void preProcess() {
if (initialRecordReceivedTime == 0) {
initialRecordReceivedTime = System.currentTimeMillis();
}
emptyPollResultCount.set(0);
}
}

View File

@ -0,0 +1,90 @@
package com.sangdol.consumer.infrastructure.kafka.consumer;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import com.sangdol.consumer.domain.TestRecord;
import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;
class RecordProcessorTest {
private static final ConsumerRecords<String, TestRecord> RECORDS = new ConsumerRecords<>(Map.of(
new TopicPartition("test", 1),
List.of(new ConsumerRecord<>(
"test",
1,
2,
"key",
new TestRecord(123, "test", Instant.now().toString())
))
));
private static final ConsumerRecords<String, TestRecord> EMPTY = ConsumerRecords.empty();
@DisplayName("5회 연속으로 빈 레코드를 받으면 poll 작업의 완료로 판단한다.")
@Test
void receiveEmptyRecordsMoreThanOrEqualThreshold() {
// given
ConsumerStats consumerStats = new ConsumerStats(3);
RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats);
// 시간 초기화를 위해 레코드를 담아 1회 전송
test(recordProcessor, RECORDS);
// when
for (int i = 0; i < 5; i++) {
test(recordProcessor, EMPTY);
}
Map<String, Long> result = consumerStats.getThreadElapsedTime();
assertThat(result).isNotEmpty();
assertThat(result.containsKey(Thread.currentThread().getName())).isTrue();
}
@DisplayName("5회 연속으로 빈 레코드를 받지 않으면 poll 작업의 완료로 판단하지 않는다.")
@Test
void receiveEmptyRecordsLessThanThreshold() {
// given
ConsumerStats consumerStats = new ConsumerStats(3);
RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats);
// 시간 초기화를 위해 레코드를 담아 1회 전송
test(recordProcessor, RECORDS);
// when
// 3회 레코드 -> 이후 비어있지 않은 레코드 -> 레코드
for (int i = 0; i < 3; i++) {
test(recordProcessor, EMPTY);
}
test(recordProcessor, RECORDS);
test(recordProcessor, EMPTY);
// then
Map<String, Long> result = consumerStats.getThreadElapsedTime();
assertThat(result).isEmpty();
}
private void test(RecordProcessor recordProcessor, ConsumerRecords<String, TestRecord> records) {
if (records.isEmpty()) {
recordProcessor.processIfEmptyRecords();
return;
}
try {
recordProcessor.process(records);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}