diff --git a/src/main/java/com/sangdol/consumer/domain/TestRecord.java b/src/main/java/com/sangdol/consumer/domain/TestRecord.java new file mode 100644 index 0000000..e08bae4 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/domain/TestRecord.java @@ -0,0 +1,8 @@ +package com.sangdol.consumer.domain; + +public record TestRecord( + Integer userId, + String topic, + String timeStamp +) { +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java new file mode 100644 index 0000000..dd3e3e3 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/JsonObjectMapper.java @@ -0,0 +1,50 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +public class JsonObjectMapper { + + private final ObjectMapper objectMapper; + + public JsonObjectMapper() { + this.objectMapper = new ObjectMapper() + .registerModule(new JavaTimeModule()) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + public T deserialize(String json, Class clazz) { + try { + return objectMapper.readValue(json, clazz); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", json, clazz, e); + return null; + } + } + + public T deserialize(Object object, Class clazz) { + try { + return objectMapper.convertValue(object, clazz); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", object, clazz, + e); + return null; + } + } + + public String serialize(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (Exception e) { + log.info("[JsonObjectMapper] Exception Occurred while serializing object = {}", object, e); + return null; + } + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java new file mode 100644 index 0000000..c400cf4 --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordDeserializer.java @@ -0,0 +1,26 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.serialization.Deserializer; + +import com.sangdol.consumer.domain.TestRecord; + +public class TestRecordDeserializer implements Deserializer { + + private final JsonObjectMapper jsonObjectMapper; + + public TestRecordDeserializer() { + this.jsonObjectMapper = new JsonObjectMapper(); + } + + // null-return이 아닌 DLT 전송 등의 예외 처리 필요. + @Override + public TestRecord deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + + return jsonObjectMapper.deserialize(new String(data, StandardCharsets.UTF_8), TestRecord.class); + } +} diff --git a/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java new file mode 100644 index 0000000..05caa0a --- /dev/null +++ b/src/main/java/com/sangdol/consumer/infrastructure/kafka/common/serialize/TestRecordSerializer.java @@ -0,0 +1,25 @@ +package com.sangdol.consumer.infrastructure.kafka.common.serialize; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.serialization.Serializer; + +import com.sangdol.consumer.domain.TestRecord; + +public class TestRecordSerializer implements Serializer { + + private final JsonObjectMapper jsonObjectMapper; + + public TestRecordSerializer() { + this.jsonObjectMapper = new JsonObjectMapper(); + } + + @Override + public byte[] serialize(String topic, TestRecord data) { + if (data == null) { + return null; + } + + return jsonObjectMapper.serialize(data).getBytes(StandardCharsets.UTF_8); + } +}