diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java new file mode 100644 index 0000000..ac86c11 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/consumer/ConsumerStats.java @@ -0,0 +1,30 @@ +package com.sangdol.consumer.infrastructure.kafka.consumer; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public class ConsumerStats { + + private final int workerThreads; + private final Map threadElapsedTime; + + public ConsumerStats(int workerThreads) { + this.workerThreads = workerThreads; + this.threadElapsedTime = new ConcurrentHashMap<>(); + } + + public void update(long elapsedTime) { + threadElapsedTime.put(Thread.currentThread().getName(), elapsedTime); + } + + public long averageTime() { + if (threadElapsedTime.size() != workerThreads) { + return 0L; + } + + return threadElapsedTime.values().stream() + .collect(Collectors.averagingLong(Long::longValue)) + .longValue(); + } +}