문제 : 카프카 직렬화/역직렬화 처리 중 만난 에러들
하도 많이 만나서 일단 생각나는 것만 적고..
에러 1 : 객체로 요청 및 응답받고 싶으면 Value의 시리얼라이저를 Json으로 하기.
물론 문자열로도 가능하긴 한데, 요청/응답마다 ObjectMapper로 직렬화/역직렬화하기보다, 카프카에서도 JsonSerializer를 제공해 주는데 굳이 문자열로 쓸 필요는 없을 것 같다. 또 컴파일 때 요청 보낼 객체의 타입검사를 해주기도 하고.
import ex.application.order.message.DeliveryMessage;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, DeliveryMessage> producerFactory() { // 여기 제네릭 부분 !!!
Map<String, Object> configProperties = Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class, // 여기 !!!
);
return new DefaultKafkaProducerFactory<>(configProperties);
}
@Bean
public KafkaTemplate<String, DeliveryMessage> kafkaTemplate() { // 여기 제네릭 부분 !!!
return new KafkaTemplate<>(producerFactory());
}
}
에러 2 : 컨슈머 측은 역직렬화로 설정해주어야 한다.
사실 당연한 건데.. 컨슈머 측에서 시리얼라이저로 했었다.
import ex.application.product.message.DeliveryMessage;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, DeliveryMessage> consumerFactory() {
Map<String, Object> configProperties = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, // 여기 !!!
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class, // 여기 !!!
);
return new DefaultKafkaConsumerFactory<>(configProperties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DeliveryMessage> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, DeliveryMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG 속성의 값을 JsonDeserializer.class 로 해줬다.
ConsumerConfig 인 것도 몰랐고,, 복붙하느라,, Deserializer 인 줄도 몰랐다...
에러 3 : 프로듀서, 컨슈머 측에서 서로 다른 패키지 및 타입 정보를 맞춰줘야 한다.
맞다.. 난 이것도 모르는 감자다..
우선 위 코드로 보내면 컨슈머 측에서 에러난다.
// key
0059d5e2-dcfc-4cc7-853c-0207890c772f
// value
{
"orderId": "0059d5e2-dcfc-4cc7-853c-0207890c772f",
"paymentId": null,
"userId": "yun01",
"productId": 1,
"productQuantity": 10,
"payAmount": 100,
"errorType": null
}
// headers // 여기 !!!
{
"__TypeId__": "ex.application.order.message.DeliveryMessage"
}
일단 프로듀서 측에서는 DeliveryMessage 객체의 위치를 헤더에 담아서 보낸다.
하지만 컨슈머 측의 DeliveryMessage 객체의 위치는 ex.application.product.message.DeliveryMessage이다.
따라서 컨슈머는 ex.application.order.message.DeliveryMessage 에서 DeliveryMessage객체를 찾다가 없어서 에러를 뱉는다.
@Bean
public ProducerFactory<String, DeliveryMessage> producerFactory() {
Map<String, Object> configProperties = Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class,
JsonSerializer.ADD_TYPE_INFO_HEADERS, false // 여기!!!
);
return new DefaultKafkaProducerFactory<>(configProperties);
}
일단 나는 프로젝트마다 패키지명을 다르게 해 줄 생각이어서 프로듀서 측에서 타입 정보를 보내지 않도록 했다.
@Bean
public ConsumerFactory<String, DeliveryMessage> consumerFactory() {
Map<String, Object> configProperties = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class,
JsonDeserializer.USE_TYPE_INFO_HEADERS, false, // 여기 !!!
JsonDeserializer.VALUE_DEFAULT_TYPE, "ex.application.product.message.DeliveryMessage" // 여기 !!!
);
return new DefaultKafkaConsumerFactory<>(configProperties);
}
그다음 컨슈머 측에서는 어차피 필요 없는 타입 정보를 사용하지 않도록 하고, 컨슈머 프로젝트의 DeliveryMessage 객체의 위치를 지정해 주었다.
해결..!
최종 에러 없는 코드
프로듀서 측 코드
import ex.application.order.message.DeliveryMessage;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, DeliveryMessage> producerFactory() {
Map<String, Object> configProperties = Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class,
JsonSerializer.ADD_TYPE_INFO_HEADERS, false
);
return new DefaultKafkaProducerFactory<>(configProperties);
}
@Bean
public KafkaTemplate<String, DeliveryMessage> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
컨슈머 측 코드
import ex.application.product.message.DeliveryMessage;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, DeliveryMessage> consumerFactory() {
Map<String, Object> configProperties = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class,
JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
JsonDeserializer.VALUE_DEFAULT_TYPE, "ex.application.product.message.DeliveryMessage"
);
return new DefaultKafkaConsumerFactory<>(configProperties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DeliveryMessage> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, DeliveryMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
'TIL ✍️' 카테고리의 다른 글
TIL #111 : ID를 가진 JPA 엔티티 생성 시 SELECT 문이 나가는 문제 (2) | 2024.11.11 |
---|---|
24/10/04(금) 110번째 TIL : 컴포넌트 스캔 패키지 구조 문제 (0) | 2024.10.07 |
24/08/28(수) 108번째 TIL : EmbeddedId 식별자 값객체 (0) | 2024.09.09 |
24/08/27(화) 107번째 TIL : Embeddable 값객체 (0) | 2024.09.09 |
24/08/26(월) 106번째 TIL : mapStruct와 @Getter 사용 시 boolean 필드 매핑 (1) | 2024.09.01 |