2025-06-25 01:20:06 +09:00
2025-06-25 00:06:05 +09:00
2025-06-25 00:06:05 +09:00
2025-06-25 00:06:05 +09:00
2025-06-25 00:06:05 +09:00
2025-06-25 00:06:05 +09:00
2025-06-25 00:06:05 +09:00
2025-06-24 14:58:16 +00:00
2025-06-25 01:20:06 +09:00
2025-06-25 00:06:05 +09:00

Kafka-Consumer-Test

Kafka Selector: Consumer Test 의 실습에 사용한 코드입니다.

개요

배경

이전의 글인 Kafka Selector: NetworkReceive / Application Buffer for SSL 마무리 부분에서 언급한 다음의 내용을 테스트합니다.

  1. Kafka Client에서 SSL 프로토콜을 사용할 때는 애플리케이션 버퍼에서 데이터를 읽어오니, 평문 전송 대비 네트워크 I/O가 적게 나올수도 있다.
  2. 위 이유로, 네트워크 I/O 뿐만 아니라 데이터 처리 시간 등의 오버헤드가 생각보다 평문 전송과 대비하여 극적으로 크지는 않을수도 있다.

테스트 결과는 Kafka Selector에서 측정되는 메트릭을 기반으로 분석하며, Selector 메트릭 중 일부 메트릭이 '어떻게' 측정되는지 함께 알아봅니다.

테스트 과정

  1. 1개의 Controller, 3개의 Broker 구조의 KRaft 모드로 실행합니다.

  2. 3개의 워커 스레드를 사용하는 1개의 컨슈머 애플리케이션을 실행합니다. 컨슈머 애플리케이션은 Spring Kafka를 사용하지 않고, SpringBoot + Apache Kafka Client로 구현하였습니다.

    • 컨슈머의 읽기 작업 자체를 확인하기 위해 읽어온 레코드를 처리하는 별도의 로직은 없고, 레코드를 읽어온 뒤 비동기 커밋만 진행합니다.
    • 실패 방지용으로 간단한 Dead Letter Topic 프로듀서, 리밸런스 발생 / 컨슈머 종료 시 manual commit을 하는 간단한 방어 로직은 구현되어 있습니다.
  3. 3개의 토픽을 만들고, 각 토픽에 100만개의 레코드를 넣고 컨슈머의 처리 시간과 그 과정에서의 메트릭을 확인합니다.

    • 각 토픽에 들어가는 레코드는 수정할 수 있습니다. (방법은 아래에 작성합니다.)

환경

  • Java 17 + Springboot 3.5.3 + Apache Kafka Client 3.9.0 버전을 사용하며, 모든 애플리케이션은 Docker 컨테이너에서 실행됩니다.
  • 모든 카프카 설정(max-poll-records 등)은 src/main/resources/application.yml에서 확인하실 수 있으며, 수정 역시 가능합니다.
  • MacOS(Macbook M1 pro) 환경에서는 정상 동작함을 확인하였습니다.

실행 방법

사용 포트

테스트를 진행하기 전에, 아래의 포트가 사용중인지 확인이 필요합니다.

  1. SpringBoot에서는 8080(WAS), 9400(kafka jmx exporter) 포트를 사용합니다.
  2. Kafka에서는 7071, 7072, 7073, 29092, 29093, 39092, 39093, 49092, 49093 포트를 사용합니다.
  3. Grafana는 3000, Prometheus는 9090, Kafka UI는 8000 포트를 사용합니다.

실행 - 본문의 내용과 동일한 환경

아래의 자동화 스크립트(./run.sh)를 실행하여 본문의 내용과 동일한 테스트를 실행할 수 있습니다. 시간은 제 환경에서는 7분정도 소요되며, 스크립트 실행 완료 후 콘솔에 출력되는 대시보드 링크에서 메트릭을 확인할 수 있습니다.

git clone https://gitea.pricelees.me/pricelees/understanding-kafka-consumer.git
cd understanding-kafka-consumer

./run.sh 

테스트에 사용되는 레코드 수 수정

org.study.consumerssl.data.TestUtils 클래스의 아래 값을 수정하여 레코드 및 스레드 수를 조정할 수 있습니다. 제 환경에서는 300만개의 레코드를 추가할 때 최대 1분 정도의 시간이 소요됩니다.

public class TestUtils {
   // 토픽당 발행되는 레코드 수. 기본값은 100만개 입니다.
   private static final int RECORD_PER_TOPIC = 1_000_000;
   
   
   // 레코드 발행 시 사용되는 Producer 스레드 수. 기본값은 availableProcessors() * 2 입니다.
   @Test
   void produceRecords() {
         final int threadCount = Runtime.getRuntime().availableProcessors() * 2;
   }
}

출처 및 상세 내용

JMX Metrics

  1. Kafka Client 에서 사용하는 jmx 메트릭 설정 파일은 https://blog.voidmainvoid.net/476 의 내용과 동일합니다.
  2. Kafka Broker 에서 사용하는 jmx 메트릭 설정 파일은 https://github.com/oded-dd/prometheus-jmx-kafka/tree/master 의 내용과 동일합니다.

Grafana Dashboard

  1. 컨슈머 대시보드는 Redpanda Kafka Java Consumer 를 바탕으로 현재 환경에 맞게 수정하였으며, "Selector Metrics" 카테고리의 대시보드는 직접 구성하였습니다.
  2. 본문에서 다루지는 않을 예정이나, Springboot 대시보드는 Spring Boot 2.1 System Monitor, Kafka Producer 대시보드는 Redpanda Kafka Java Producer 를 바탕으로 현재 환경에서 동작하도록 일부 수정하였습니다.
Description
Kafka Selector를 기반으로 하는 Consumer 이해
Readme MIT 533 KiB
Languages
Java 87.3%
Shell 12.7%