[#3] 컨슈머를 제외한 나머지 애플리케이션 코드 마이그레이션 #4
@ -0,0 +1,8 @@
|
|||||||
|
package com.sangdol.consumer.domain;
|
||||||
|
|
||||||
|
public record TestRecord(
|
||||||
|
Integer userId,
|
||||||
|
String topic,
|
||||||
|
String timeStamp
|
||||||
|
) {
|
||||||
|
}
|
||||||
@ -0,0 +1,50 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.common.serialize;
|
||||||
|
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class JsonObjectMapper {
|
||||||
|
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
|
public JsonObjectMapper() {
|
||||||
|
this.objectMapper = new ObjectMapper()
|
||||||
|
.registerModule(new JavaTimeModule())
|
||||||
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> T deserialize(String json, Class<T> clazz) {
|
||||||
|
try {
|
||||||
|
return objectMapper.readValue(json, clazz);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", json, clazz, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> T deserialize(Object object, Class<T> clazz) {
|
||||||
|
try {
|
||||||
|
return objectMapper.convertValue(object, clazz);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.info("[JsonObjectMapper] Exception Occurred while deserializing data={} to class = {}", object, clazz,
|
||||||
|
e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String serialize(Object object) {
|
||||||
|
try {
|
||||||
|
return objectMapper.writeValueAsString(object);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.info("[JsonObjectMapper] Exception Occurred while serializing object = {}", object, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,26 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.common.serialize;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.serialization.Deserializer;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
|
||||||
|
public class TestRecordDeserializer implements Deserializer<TestRecord> {
|
||||||
|
|
||||||
|
private final JsonObjectMapper jsonObjectMapper;
|
||||||
|
|
||||||
|
public TestRecordDeserializer() {
|
||||||
|
this.jsonObjectMapper = new JsonObjectMapper();
|
||||||
|
}
|
||||||
|
|
||||||
|
// null-return이 아닌 DLT 전송 등의 예외 처리 필요.
|
||||||
|
@Override
|
||||||
|
public TestRecord deserialize(String topic, byte[] data) {
|
||||||
|
if (data == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return jsonObjectMapper.deserialize(new String(data, StandardCharsets.UTF_8), TestRecord.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,25 @@
|
|||||||
|
package com.sangdol.consumer.infrastructure.kafka.common.serialize;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.serialization.Serializer;
|
||||||
|
|
||||||
|
import com.sangdol.consumer.domain.TestRecord;
|
||||||
|
|
||||||
|
public class TestRecordSerializer implements Serializer<TestRecord> {
|
||||||
|
|
||||||
|
private final JsonObjectMapper jsonObjectMapper;
|
||||||
|
|
||||||
|
public TestRecordSerializer() {
|
||||||
|
this.jsonObjectMapper = new JsonObjectMapper();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] serialize(String topic, TestRecord data) {
|
||||||
|
if (data == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return jsonObjectMapper.serialize(data).getBytes(StandardCharsets.UTF_8);
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user