Compare commits
No commits in common. "e49841575deb2341c8e487da11e1fbe5b3005246" and "ebd003b79edc9c644478c4dae0dcd3e3f9b1e0d8" have entirely different histories.
e49841575d
...
ebd003b79e
@ -4,8 +4,6 @@ import java.util.Map;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
public class ConsumerStats {
|
public class ConsumerStats {
|
||||||
|
|
||||||
private final int workerThreads;
|
private final int workerThreads;
|
||||||
@ -29,9 +27,4 @@ public class ConsumerStats {
|
|||||||
.collect(Collectors.averagingLong(Long::longValue))
|
.collect(Collectors.averagingLong(Long::longValue))
|
||||||
.longValue();
|
.longValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
Map<String, Long> getThreadElapsedTime() {
|
|
||||||
return threadElapsedTime;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,7 +22,7 @@ public class RecordProcessor {
|
|||||||
private long initialRecordReceivedTime = 0;
|
private long initialRecordReceivedTime = 0;
|
||||||
|
|
||||||
public void process(ConsumerRecords<String, TestRecord> records) throws Exception {
|
public void process(ConsumerRecords<String, TestRecord> records) throws Exception {
|
||||||
preProcess();
|
initializeTime();
|
||||||
|
|
||||||
log.debug("[RecordProcessor] process {} records. thread: {}", records.count(),
|
log.debug("[RecordProcessor] process {} records. thread: {}", records.count(),
|
||||||
Thread.currentThread().getName());
|
Thread.currentThread().getName());
|
||||||
@ -50,11 +50,9 @@ public class RecordProcessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void preProcess() {
|
private void initializeTime() {
|
||||||
if (initialRecordReceivedTime == 0) {
|
if (initialRecordReceivedTime == 0) {
|
||||||
initialRecordReceivedTime = System.currentTimeMillis();
|
initialRecordReceivedTime = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
emptyPollResultCount.set(0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,90 +0,0 @@
|
|||||||
package com.sangdol.consumer.infrastructure.kafka.consumer;
|
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.*;
|
|
||||||
import static org.mockito.Mockito.*;
|
|
||||||
|
|
||||||
import java.time.Instant;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
||||||
import org.apache.kafka.common.TopicPartition;
|
|
||||||
import org.junit.jupiter.api.DisplayName;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
|
|
||||||
import com.sangdol.consumer.domain.TestRecord;
|
|
||||||
import com.sangdol.consumer.infrastructure.kafka.producer.DLTProducer;
|
|
||||||
|
|
||||||
class RecordProcessorTest {
|
|
||||||
|
|
||||||
private static final ConsumerRecords<String, TestRecord> RECORDS = new ConsumerRecords<>(Map.of(
|
|
||||||
new TopicPartition("test", 1),
|
|
||||||
List.of(new ConsumerRecord<>(
|
|
||||||
"test",
|
|
||||||
1,
|
|
||||||
2,
|
|
||||||
"key",
|
|
||||||
new TestRecord(123, "test", Instant.now().toString())
|
|
||||||
))
|
|
||||||
));
|
|
||||||
|
|
||||||
private static final ConsumerRecords<String, TestRecord> EMPTY = ConsumerRecords.empty();
|
|
||||||
|
|
||||||
@DisplayName("5회 연속으로 빈 레코드를 받으면 poll 작업의 완료로 판단한다.")
|
|
||||||
@Test
|
|
||||||
void receiveEmptyRecordsMoreThanOrEqualThreshold() {
|
|
||||||
// given
|
|
||||||
ConsumerStats consumerStats = new ConsumerStats(3);
|
|
||||||
RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats);
|
|
||||||
|
|
||||||
// 시간 초기화를 위해 레코드를 담아 1회 전송
|
|
||||||
test(recordProcessor, RECORDS);
|
|
||||||
|
|
||||||
// when
|
|
||||||
for (int i = 0; i < 5; i++) {
|
|
||||||
test(recordProcessor, EMPTY);
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, Long> result = consumerStats.getThreadElapsedTime();
|
|
||||||
|
|
||||||
assertThat(result).isNotEmpty();
|
|
||||||
assertThat(result.containsKey(Thread.currentThread().getName())).isTrue();
|
|
||||||
}
|
|
||||||
|
|
||||||
@DisplayName("5회 연속으로 빈 레코드를 받지 않으면 poll 작업의 완료로 판단하지 않는다.")
|
|
||||||
@Test
|
|
||||||
void receiveEmptyRecordsLessThanThreshold() {
|
|
||||||
// given
|
|
||||||
ConsumerStats consumerStats = new ConsumerStats(3);
|
|
||||||
RecordProcessor recordProcessor = new RecordProcessor(mock(DLTProducer.class), consumerStats);
|
|
||||||
|
|
||||||
// 시간 초기화를 위해 레코드를 담아 1회 전송
|
|
||||||
test(recordProcessor, RECORDS);
|
|
||||||
|
|
||||||
// when
|
|
||||||
// 3회 빈 레코드 -> 이후 비어있지 않은 레코드 -> 빈 레코드
|
|
||||||
for (int i = 0; i < 3; i++) {
|
|
||||||
test(recordProcessor, EMPTY);
|
|
||||||
}
|
|
||||||
test(recordProcessor, RECORDS);
|
|
||||||
test(recordProcessor, EMPTY);
|
|
||||||
|
|
||||||
// then
|
|
||||||
Map<String, Long> result = consumerStats.getThreadElapsedTime();
|
|
||||||
|
|
||||||
assertThat(result).isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void test(RecordProcessor recordProcessor, ConsumerRecords<String, TestRecord> records) {
|
|
||||||
if (records.isEmpty()) {
|
|
||||||
recordProcessor.processIfEmptyRecords();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
recordProcessor.process(records);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user