[#61] 커넥션 고갈 해결을 위한 로그인 이력 저장 비동기 처리 #62

Merged
pricelees merged 9 commits from refactor/#61 into main 2025-10-14 00:28:45 +00:00
2 changed files with 167 additions and 0 deletions
Showing only changes of commit 49e00d91e5 - Show all commits

View File

@ -0,0 +1,96 @@
package com.sangdol.roomescape.auth.business
import com.sangdol.common.persistence.IDGenerator
import com.sangdol.roomescape.auth.business.domain.LoginHistoryEvent
import com.sangdol.roomescape.auth.infrastructure.persistence.LoginHistoryEntity
import com.sangdol.roomescape.auth.infrastructure.persistence.LoginHistoryRepository
import com.sangdol.roomescape.auth.mapper.toEntity
import io.github.oshai.kotlinlogging.KLogger
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.annotation.PreDestroy
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.event.EventListener
import org.springframework.scheduling.annotation.Async
import org.springframework.scheduling.annotation.EnableAsync
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit
private val log: KLogger = KotlinLogging.logger {}
@Component
@EnableAsync
@EnableScheduling
class LoginHistoryEventListener(
private val idGenerator: IDGenerator,
private val loginHistoryRepository: LoginHistoryRepository,
private val queue: ConcurrentLinkedQueue<LoginHistoryEntity> = ConcurrentLinkedQueue()
) {
@Value(value = "\${spring.jpa.properties.hibernate.jdbc.batch_size:100}")
private var batchSize: Int = 0
@Async
@EventListener(classes = [LoginHistoryEvent::class])
fun onLoginCompleted(event: LoginHistoryEvent) {
log.info { "[onLoginCompleted] 로그인 이력 저장 이벤트 수신: id=${event.id}, type=${event.type}" }
queue.add(event.toEntity(idGenerator.create())).also {
log.info { "[onLoginCompleted] 로그인 이력 저장 이벤트 큐 저장 완료: id=${event.id}, type=${event.type}" }
}
if (queue.size >= batchSize) {
flush()
}
}
@Scheduled(fixedRate = 30, timeUnit = TimeUnit.SECONDS)
fun flushScheduled() {
log.info { "[flushScheduled] 큐에 저장된 로그인 이력 저장 시작: size=${queue.size}" }
if (queue.isEmpty()) {
log.info { "[flushScheduled] 큐에 있는 로그인 이력이 없음." }
return
}
flush()
log.info { "[flushScheduled] 큐에 저장된 로그인 이력 저장 완료: size=${queue.size}" }
}
@PreDestroy
fun flushAll() {
log.info { "[flushAll] 애플리케이션 종료. 큐에 있는 모든 이력 저장 시작: size=${queue.size}" }
while (!queue.isEmpty()) {
flush()
}
log.info { "[flushAll] 애플리케이션 종료. 큐에 있는 모든 이력 저장 완료: size=${queue.size}" }
}
private fun flush() {
log.info { "[flush] 큐에 저장된 로그인 이력 저장 시작: size=${queue.size}" }
if (queue.isEmpty()) {
log.info { "[flush] 큐에 있는 로그인 이력이 없음." }
return;
}
val batch = mutableListOf<LoginHistoryEntity>()
repeat(batchSize) {
val entity: LoginHistoryEntity? = queue.poll()
if (entity != null) {
batch.add(entity)
} else {
return@repeat
}
}
if (batch.isEmpty()) {
return
}
loginHistoryRepository.saveAll(batch).also {
log.info { "[flush] 큐에 저장된 로그인 이력 저장 완료: size=${batch.size}" }
}
}
}

View File

@ -0,0 +1,71 @@
package com.sangdol.roomescape.auth.business
import com.sangdol.roomescape.auth.business.domain.PrincipalType
import com.sangdol.roomescape.auth.infrastructure.persistence.LoginHistoryEntity
import com.sangdol.roomescape.supports.IDGenerator
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldBeEmpty
import io.kotest.matchers.collections.shouldNotContainAnyOf
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.withContext
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
class LoginHistoryEventListenerTest : FunSpec() {
val histories: ConcurrentLinkedQueue<LoginHistoryEntity> = ConcurrentLinkedQueue((1..1000).map {
LoginHistoryEntity(
id = IDGenerator.create(),
principalId = IDGenerator.create(),
principalType = PrincipalType.USER,
success = true,
ipAddress = "127.0.0.1",
userAgent = "UserAgent"
)
})
init {
test("ConcurrentLinkedQueue에서 데이터를 꺼내는 작업을 여러 스레드에서 동시 호출해도 중복 처리되지 않는다.") {
withContext(Dispatchers.Default) {
val latch = CountDownLatch(2)
val flushJob1 = async {
latch.countDown()
latch.await()
flush()
}
val flushJob2 = async {
latch.countDown()
latch.await()
flush()
}
val flushJob1Result = flushJob1.await()
val flushJob2Result = flushJob2.await()
flushJob2Result shouldNotContainAnyOf flushJob1Result
flushJob1Result shouldNotContainAnyOf flushJob2Result
(flushJob1Result.size + flushJob2Result.size) shouldBe 1000
histories.shouldBeEmpty()
}
}
}
private fun flush(batchSize: Int = 500): MutableList<LoginHistoryEntity> {
val batch = mutableListOf<LoginHistoryEntity>()
repeat(batchSize) {
val entity: LoginHistoryEntity? = histories.poll()
if (entity != null) {
batch.add(entity)
} else {
return@repeat
}
}
return batch
}
}