From 49e00d91e54dd0b9886649907348c4b478df7361 Mon Sep 17 00:00:00 2001 From: pricelees Date: Sun, 12 Oct 2025 13:31:45 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EB=A1=9C=EA=B7=B8=EC=9D=B8=20=EC=9D=B4?= =?UTF-8?q?=EB=A0=A5=20=EC=A0=80=EC=9E=A5=20=EC=9D=B4=EB=B2=A4=ED=8A=B8=20?= =?UTF-8?q?=EC=B2=98=EB=A6=AC=20=EA=B8=B0=EB=8A=A5=20=EB=B0=8F=20Concurren?= =?UTF-8?q?tLinkedQueue=EC=9D=98=20=EB=8F=99=EC=8B=9C=EC=84=B1=20=EC=B2=98?= =?UTF-8?q?=EB=A6=AC=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../business/LoginHistoryEventListener.kt | 96 +++++++++++++++++++ .../business/LoginHistoryEventListenerTest.kt | 71 ++++++++++++++ 2 files changed, 167 insertions(+) create mode 100644 service/src/main/kotlin/com/sangdol/roomescape/auth/business/LoginHistoryEventListener.kt create mode 100644 service/src/test/kotlin/com/sangdol/roomescape/auth/business/LoginHistoryEventListenerTest.kt diff --git a/service/src/main/kotlin/com/sangdol/roomescape/auth/business/LoginHistoryEventListener.kt b/service/src/main/kotlin/com/sangdol/roomescape/auth/business/LoginHistoryEventListener.kt new file mode 100644 index 00000000..5a9f0ca1 --- /dev/null +++ b/service/src/main/kotlin/com/sangdol/roomescape/auth/business/LoginHistoryEventListener.kt @@ -0,0 +1,96 @@ +package com.sangdol.roomescape.auth.business + +import com.sangdol.common.persistence.IDGenerator +import com.sangdol.roomescape.auth.business.domain.LoginHistoryEvent +import com.sangdol.roomescape.auth.infrastructure.persistence.LoginHistoryEntity +import com.sangdol.roomescape.auth.infrastructure.persistence.LoginHistoryRepository +import com.sangdol.roomescape.auth.mapper.toEntity +import io.github.oshai.kotlinlogging.KLogger +import io.github.oshai.kotlinlogging.KotlinLogging +import jakarta.annotation.PreDestroy +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.event.EventListener +import org.springframework.scheduling.annotation.Async +import org.springframework.scheduling.annotation.EnableAsync +import org.springframework.scheduling.annotation.EnableScheduling +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Component +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.TimeUnit + +private val log: KLogger = KotlinLogging.logger {} + +@Component +@EnableAsync +@EnableScheduling +class LoginHistoryEventListener( + private val idGenerator: IDGenerator, + private val loginHistoryRepository: LoginHistoryRepository, + private val queue: ConcurrentLinkedQueue = ConcurrentLinkedQueue() +) { + + @Value(value = "\${spring.jpa.properties.hibernate.jdbc.batch_size:100}") + private var batchSize: Int = 0 + + @Async + @EventListener(classes = [LoginHistoryEvent::class]) + fun onLoginCompleted(event: LoginHistoryEvent) { + log.info { "[onLoginCompleted] 로그인 이력 저장 이벤트 수신: id=${event.id}, type=${event.type}" } + + queue.add(event.toEntity(idGenerator.create())).also { + log.info { "[onLoginCompleted] 로그인 이력 저장 이벤트 큐 저장 완료: id=${event.id}, type=${event.type}" } + } + + if (queue.size >= batchSize) { + flush() + } + } + + @Scheduled(fixedRate = 30, timeUnit = TimeUnit.SECONDS) + fun flushScheduled() { + log.info { "[flushScheduled] 큐에 저장된 로그인 이력 저장 시작: size=${queue.size}" } + + if (queue.isEmpty()) { + log.info { "[flushScheduled] 큐에 있는 로그인 이력이 없음." } + return + } + flush() + log.info { "[flushScheduled] 큐에 저장된 로그인 이력 저장 완료: size=${queue.size}" } + } + + @PreDestroy + fun flushAll() { + log.info { "[flushAll] 애플리케이션 종료. 큐에 있는 모든 이력 저장 시작: size=${queue.size}" } + while (!queue.isEmpty()) { + flush() + } + log.info { "[flushAll] 애플리케이션 종료. 큐에 있는 모든 이력 저장 완료: size=${queue.size}" } + } + + private fun flush() { + log.info { "[flush] 큐에 저장된 로그인 이력 저장 시작: size=${queue.size}" } + + if (queue.isEmpty()) { + log.info { "[flush] 큐에 있는 로그인 이력이 없음." } + return; + } + + val batch = mutableListOf() + repeat(batchSize) { + val entity: LoginHistoryEntity? = queue.poll() + + if (entity != null) { + batch.add(entity) + } else { + return@repeat + } + } + + if (batch.isEmpty()) { + return + } + loginHistoryRepository.saveAll(batch).also { + log.info { "[flush] 큐에 저장된 로그인 이력 저장 완료: size=${batch.size}" } + } + } +} diff --git a/service/src/test/kotlin/com/sangdol/roomescape/auth/business/LoginHistoryEventListenerTest.kt b/service/src/test/kotlin/com/sangdol/roomescape/auth/business/LoginHistoryEventListenerTest.kt new file mode 100644 index 00000000..55e993b0 --- /dev/null +++ b/service/src/test/kotlin/com/sangdol/roomescape/auth/business/LoginHistoryEventListenerTest.kt @@ -0,0 +1,71 @@ +package com.sangdol.roomescape.auth.business + +import com.sangdol.roomescape.auth.business.domain.PrincipalType +import com.sangdol.roomescape.auth.infrastructure.persistence.LoginHistoryEntity +import com.sangdol.roomescape.supports.IDGenerator +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldBeEmpty +import io.kotest.matchers.collections.shouldNotContainAnyOf +import io.kotest.matchers.shouldBe +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.withContext +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.CountDownLatch + +class LoginHistoryEventListenerTest : FunSpec() { + + val histories: ConcurrentLinkedQueue = ConcurrentLinkedQueue((1..1000).map { + LoginHistoryEntity( + id = IDGenerator.create(), + principalId = IDGenerator.create(), + principalType = PrincipalType.USER, + success = true, + ipAddress = "127.0.0.1", + userAgent = "UserAgent" + ) + }) + + init { + test("ConcurrentLinkedQueue에서 데이터를 꺼내는 작업을 여러 스레드에서 동시 호출해도 중복 처리되지 않는다.") { + withContext(Dispatchers.Default) { + val latch = CountDownLatch(2) + + val flushJob1 = async { + latch.countDown() + latch.await() + flush() + } + + val flushJob2 = async { + latch.countDown() + latch.await() + flush() + } + + val flushJob1Result = flushJob1.await() + val flushJob2Result = flushJob2.await() + + flushJob2Result shouldNotContainAnyOf flushJob1Result + flushJob1Result shouldNotContainAnyOf flushJob2Result + (flushJob1Result.size + flushJob2Result.size) shouldBe 1000 + histories.shouldBeEmpty() + } + } + } + + private fun flush(batchSize: Int = 500): MutableList { + val batch = mutableListOf() + repeat(batchSize) { + val entity: LoginHistoryEntity? = histories.poll() + + if (entity != null) { + batch.add(entity) + } else { + return@repeat + } + } + + return batch + } +}