Compare commits
2 Commits
ebd003b79e
...
e49841575d
| Author | SHA1 | Date | |
|---|---|---|---|
| e49841575d | |||
| e5916eb305 |
@ -4,6 +4,8 @@ import java.util.Map;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
public class ConsumerStats {
|
public class ConsumerStats {
|
||||||
|
|
||||||
private final int workerThreads;
|
private final int workerThreads;
|
||||||
@ -27,4 +29,9 @@ public class ConsumerStats {
|
|||||||
.collect(Collectors.averagingLong(Long::longValue))
|
.collect(Collectors.averagingLong(Long::longValue))
|
||||||
.longValue();
|
.longValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Map<String, Long> getThreadElapsedTime() {
|
||||||
|
return threadElapsedTime;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,7 +22,7 @@ public class RecordProcessor {
|
|||||||
private long initialRecordReceivedTime = 0;
|
private long initialRecordReceivedTime = 0;
|
||||||
|
|
||||||
public void process(ConsumerRecords<String, TestRecord> records) throws Exception {
|
public void process(ConsumerRecords<String, TestRecord> records) throws Exception {
|
||||||
initializeTime();
|
preProcess();
|
||||||
|
|
||||||
log.debug("[RecordProcessor] process {} records. thread: {}", records.count(),
|
log.debug("[RecordProcessor] process {} records. thread: {}", records.count(),
|
||||||
Thread.currentThread().getName());
|
Thread.currentThread().getName());
|
||||||
@ -50,9 +50,11 @@ public class RecordProcessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initializeTime() {
|
private void preProcess() {
|
||||||
if (initialRecordReceivedTime == 0) {
|
if (initialRecordReceivedTime == 0) {
|
||||||
initialRecordReceivedTime = System.currentTimeMillis();
|
initialRecordReceivedTime = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
emptyPollResultCount.set(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,90 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.consumer;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.*;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.junit.jupiter.api.DisplayName;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;
|
||||||
|
|
||||||
|
class RecordProcessorTest {
|
||||||
|
|
||||||
|
private static final ConsumerRecords<String, TestRecord> RECORDS = new ConsumerRecords<>(Map.of(
|
||||||
|
new TopicPartition("test", 1),
|
||||||
|
List.of(new ConsumerRecord<>(
|
||||||
|
"test",
|
||||||
|
1,
|
||||||
|
2,
|
||||||
|
"key",
|
||||||
|
new TestRecord(123, "test", Instant.now().toString())
|
||||||
|
))
|
||||||
|
));
|
||||||
|
|
||||||
|
private static final ConsumerRecords<String, TestRecord> EMPTY = ConsumerRecords.empty();
|
||||||
|
|
||||||
|
@DisplayName("5회 연속으로 빈 레코드를 받으면 poll 작업의 완료로 판단한다.")
|
||||||
|
@Test
|
||||||
|
void receiveEmptyRecordsMoreThanOrEqualThreshold() {
|
||||||
|
// given
|
||||||
|
ConsumerStats consumerStats = new ConsumerStats(3);
|
||||||
|
RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats);
|
||||||
|
|
||||||
|
// 시간 초기화를 위해 레코드를 담아 1회 전송
|
||||||
|
test(recordProcessor, RECORDS);
|
||||||
|
|
||||||
|
// when
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
test(recordProcessor, EMPTY);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Long> result = consumerStats.getThreadElapsedTime();
|
||||||
|
|
||||||
|
assertThat(result).isNotEmpty();
|
||||||
|
assertThat(result.containsKey(Thread.currentThread().getName())).isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@DisplayName("5회 연속으로 빈 레코드를 받지 않으면 poll 작업의 완료로 판단하지 않는다.")
|
||||||
|
@Test
|
||||||
|
void receiveEmptyRecordsLessThanThreshold() {
|
||||||
|
// given
|
||||||
|
ConsumerStats consumerStats = new ConsumerStats(3);
|
||||||
|
RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats);
|
||||||
|
|
||||||
|
// 시간 초기화를 위해 레코드를 담아 1회 전송
|
||||||
|
test(recordProcessor, RECORDS);
|
||||||
|
|
||||||
|
// when
|
||||||
|
// 3회 빈 레코드 -> 이후 비어있지 않은 레코드 -> 빈 레코드
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
test(recordProcessor, EMPTY);
|
||||||
|
}
|
||||||
|
test(recordProcessor, RECORDS);
|
||||||
|
test(recordProcessor, EMPTY);
|
||||||
|
|
||||||
|
// then
|
||||||
|
Map<String, Long> result = consumerStats.getThreadElapsedTime();
|
||||||
|
|
||||||
|
assertThat(result).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void test(RecordProcessor recordProcessor, ConsumerRecords<String, TestRecord> records) {
|
||||||
|
if (records.isEmpty()) {
|
||||||
|
recordProcessor.processIfEmptyRecords();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
recordProcessor.process(records);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user