From ef8c526fec157e21fff15082a2981d793de5b831 Mon Sep 17 00:00:00 2001 From: pricelees Date: Fri, 27 Jun 2025 13:57:15 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=ED=85=8C=EC=8A=A4=ED=8A=B8=EC=9A=A9=20?= =?UTF-8?q?=EB=A0=88=EC=BD=94=EB=93=9C=20=ED=81=B4=EB=9E=98=EC=8A=A4?= =?UTF-8?q?=EC=99=80=20=EC=A7=81=EB=A0=AC=ED=99=94,=20=EC=97=AD=EC=A7=81?= =?UTF-8?q?=EB=A0=AC=ED=99=94=20=ED=81=B4=EB=9E=98=EC=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sangdol/consumer/domain/TestRecord.java | 8 +++ .../common/serialize/JsonObjectMapper.java | 50 +++++++++++++++++++ .../serialize/TestRecordDeserializer.java | 26 ++++++++++ .../serialize/TestRecordSerializer.java | 25 ++++++++++ 4 files changed, 109 insertions(+) create mode 100644 src/main/java/com/sangdol/consumer/domain/TestRecord.java create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java create mode 100644 src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java diff --git a/src/main/java/com/sangdol/consumer/domain/TestRecord.java b/src/main/java/com/sangdol/consumer/domain/TestRecord.java new file mode 100644 index 0000000..e08bae4 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/domain/TestRecord.java @@ -0,0 +1,8 @@ +package com.sangdol.consumer.domain; + +public record TestRecord( + Integer userId, + String topic, + String timeStamp +) { +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java new file mode 100644 index 0000000..dd3e3e3 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java @@ -0,0 +1,50 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +public class JsonObjectMapper { + + private final ObjectMapper objectMapper; + + public JsonObjectMapper() { + this.objectMapper = new ObjectMapper() + .registerModule(new JavaTimeModule()) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + public T deserialize(String json, Class clazz) { + try { + return objectMapper.readValue(json, clazz); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", json, clazz, e); + return null; + } + } + + public T deserialize(Object object, Class clazz) { + try { + return objectMapper.convertValue(object, clazz); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", object, clazz, + e); + return null; + } + } + + public String serialize(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while serializing object = {}", object, e); + return null; + } + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java new file mode 100644 index 0000000..c400cf4 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java @@ -0,0 +1,26 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.serialization.Deserializer; + +import com.sangdol.consumer.domain.TestRecord; + +public class TestRecordDeserializer implements Deserializer { + + private final JsonObjectMapper jsonObjectMapper; + + public TestRecordDeserializer() { + this.jsonObjectMapper = new JsonObjectMapper(); + } + + // null-return이 아닌 DLT 전송 등의 예외 처리 필요. + @Override + public TestRecord deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + + return jsonObjectMapper.deserialize(new String(data, StandardCharsets.UTF_8), TestRecord.class); + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java new file mode 100644 index 0000000..05caa0a --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java @@ -0,0 +1,25 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.serialization.Serializer; + +import com.sangdol.consumer.domain.TestRecord; + +public class TestRecordSerializer implements Serializer { + + private final JsonObjectMapper jsonObjectMapper; + + public TestRecordSerializer() { + this.jsonObjectMapper = new JsonObjectMapper(); + } + + @Override + public byte[] serialize(String topic, TestRecord data) { + if (data == null) { + return null; + } + + return jsonObjectMapper.serialize(data).getBytes(StandardCharsets.UTF_8); + } +}