Reviewed-on: #10
This commit is contained in:
commit
6eac52c3b8
@ -4,6 +4,8 @@ import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
public class ConsumerStats {
|
||||
|
||||
private final int workerThreads;
|
||||
@ -27,4 +29,9 @@ public class ConsumerStats {
|
||||
.collect(Collectors.averagingLong(Long::longValue))
|
||||
.longValue();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<String, Long> getThreadElapsedTime() {
|
||||
return threadElapsedTime;
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,7 +22,7 @@ public class RecordProcessor {
|
||||
private long initialRecordReceivedTime = 0;
|
||||
|
||||
public void process(ConsumerRecords<String, TestRecord> records) throws Exception {
|
||||
initializeTime();
|
||||
preProcess();
|
||||
|
||||
log.debug("[RecordProcessor] process {} records. thread: {}", records.count(),
|
||||
Thread.currentThread().getName());
|
||||
@ -50,9 +50,11 @@ public class RecordProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeTime() {
|
||||
private void preProcess() {
|
||||
if (initialRecordReceivedTime == 0) {
|
||||
initialRecordReceivedTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
emptyPollResultCount.set(0);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,90 @@
|
||||
package com.sangdol.consumer.infrastructure.kafka.consumer;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import com.sangdol.consumer.domain.TestRecord;
|
||||
import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;
|
||||
|
||||
class RecordProcessorTest {
|
||||
|
||||
private static final ConsumerRecords<String, TestRecord> RECORDS = new ConsumerRecords<>(Map.of(
|
||||
new TopicPartition("test", 1),
|
||||
List.of(new ConsumerRecord<>(
|
||||
"test",
|
||||
1,
|
||||
2,
|
||||
"key",
|
||||
new TestRecord(123, "test", Instant.now().toString())
|
||||
))
|
||||
));
|
||||
|
||||
private static final ConsumerRecords<String, TestRecord> EMPTY = ConsumerRecords.empty();
|
||||
|
||||
@DisplayName("5회 연속으로 빈 레코드를 받으면 poll 작업의 완료로 판단한다.")
|
||||
@Test
|
||||
void receiveEmptyRecordsMoreThanOrEqualThreshold() {
|
||||
// given
|
||||
ConsumerStats consumerStats = new ConsumerStats(3);
|
||||
RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats);
|
||||
|
||||
// 시간 초기화를 위해 레코드를 담아 1회 전송
|
||||
test(recordProcessor, RECORDS);
|
||||
|
||||
// when
|
||||
for (int i = 0; i < 5; i++) {
|
||||
test(recordProcessor, EMPTY);
|
||||
}
|
||||
|
||||
Map<String, Long> result = consumerStats.getThreadElapsedTime();
|
||||
|
||||
assertThat(result).isNotEmpty();
|
||||
assertThat(result.containsKey(Thread.currentThread().getName())).isTrue();
|
||||
}
|
||||
|
||||
@DisplayName("5회 연속으로 빈 레코드를 받지 않으면 poll 작업의 완료로 판단하지 않는다.")
|
||||
@Test
|
||||
void receiveEmptyRecordsLessThanThreshold() {
|
||||
// given
|
||||
ConsumerStats consumerStats = new ConsumerStats(3);
|
||||
RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats);
|
||||
|
||||
// 시간 초기화를 위해 레코드를 담아 1회 전송
|
||||
test(recordProcessor, RECORDS);
|
||||
|
||||
// when
|
||||
// 3회 빈 레코드 -> 이후 비어있지 않은 레코드 -> 빈 레코드
|
||||
for (int i = 0; i < 3; i++) {
|
||||
test(recordProcessor, EMPTY);
|
||||
}
|
||||
test(recordProcessor, RECORDS);
|
||||
test(recordProcessor, EMPTY);
|
||||
|
||||
// then
|
||||
Map<String, Long> result = consumerStats.getThreadElapsedTime();
|
||||
|
||||
assertThat(result).isEmpty();
|
||||
}
|
||||
|
||||
private void test(RecordProcessor recordProcessor, ConsumerRecords<String, TestRecord> records) {
|
||||
if (records.isEmpty()) {
|
||||
recordProcessor.processIfEmptyRecords();
|
||||
return;
|
||||
}
|
||||
try {
|
||||
recordProcessor.process(records);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user