문제
Spring Kafka를 이용해서 아웃박스 패턴을 간단히 만들어보고 있었다. 도커로 카프카를 띄우고, 프로듀서가 메시지를 잘 전송하는 것을 확인했는데 컨슈머가 브로커와 연결을 못하고 있었다.
상황
도커 컴포즈로 카프카 실행하기
services:
kafka:
image: bitnami/kafka:3.7.1
container_name: kafka
ports:
- "19092:19092"
environment:
- KAFKA_KRAFT_CLUSTER_ID=A616ADF4-FA94-440F-BE52-CC89ED7EC507
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:19092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:19092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- kafka_data:/bitnami/kafka
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8081:8080"
depends_on:
- kafka
environment:
- DYNAMIC_CONFIG_ENABLED=true
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
- KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXT
volumes:
kafka_data:
도커 컴포즈는 위와 같다. 사실 설정을 잘 한건지 아직도 모르겠다. 계속 공부하는데 어려워..ㅠ
일단 간략하게 설명하자면, KAFKA_CFG_ADVERTISED_LISTENERS는 advertised.listeners 설정으로, 클라이언트에게 브로커의 접속 주소를 알려주는 설정이다. 그리고 EXTERNAL://localhost:19092는 내부 브로커 간이 아닌, 외부 클라이언트는 여기로 접속하라는 의미이다.
따라서 클라이언트는 이를 통해 localhost:19092로 요청을 보내게 된다. 즉, 같은 로컬이면 상관 없겠지만, 다른 네트워크라면 못 찾게 되니 실제 서버를 띄울 때는 도메인을 입력해야 한다.
또 나는 도커를 사용했고, 컨테이너의 포트를 19092:19092로 외부-내부를 연결해뒀으니, 로컬 컴퓨터에서 19092로 요청을 보내면 이 카프카 컨테이너로 요청이 가고, 컨테이너는 이를 내부의 19092 포트로 포워딩한다.
그리고 KAFKA_CFG_LISTENERS=EXTERNAL://:19092 설정은 카프카 브로커가 실제로 수신할 주소이다. 이때 :19092는 0.0.0.0:19092를 줄인 것으로, 모든 IP에 대해서 포트만 19092 이기만 하면 받겠다는 뜻.
스프링 카프카로 컨슈머 설정하기
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); // 브로커 접속 주소
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-group"); // 컨슈머 그룹
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 키는 문자열
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 값은 문자열
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 수동 커밋
return new DefaultKafkaConsumerFactory<>(props);
}
// 여기 !!!
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> paymentRefundListenerContainerFactory(
KafkaTemplate<Object, Object> kafkaDlqTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 수동 커밋 설정
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
// 에러 핸들러 + 재시도 5회 + DLQ 설정
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
kafkaDlqTemplate,
(record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition())
);
// 지수 백오프 설정: 초기 1초, 최대 5번 (4회 재시도 + 마지막 1회)
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(4); // 4회 재시도
backOff.setInitialInterval(1000L); // 1초
backOff.setMultiplier(2.0); // x2씩 증가 → 1s, 2s, 4s, 8s
backOff.setMaxInterval(10000L); // 최대 10초
factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer, backOff));
return factory;
}
@Bean
public KafkaTemplate<Object, Object> kafkaDlqTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
}
코드를 전부 다 볼 필요는 없다.
2번째 빈에 여기 !!! 라고 적어둔 곳의 paymentRefundListenerContainerFactory라는 메서드명만 보면 된다.
스프링은 빈 등록 시 메서드명을 기준으로 빈이름이 등록된다. 그리고 빈 조회 시 타입을 우선으로 찾고, 같은 타입의 빈이 여러 개면 빈 이름으로 찾는다.
문제는, 스프링 카프카가 제공하는 @KafkaListener 어노테이션은 따로 속성으로 containerFactory 속성에 팩토리명을 지정하지 않으면 기본값으로 "kafkaListenerContainerFactory"를 조회한다.
INFO [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-payment-group-1, groupId=payment-group] Node -1 disconnected.
WARN [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-payment-group-1, groupId=payment-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
WARN [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-payment-group-1, groupId=payment-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

이런 로그가 뜬다.
스프링 카프카의 자동 설정
나는 localhost:9092를 설정해준 적이 없다. 하지만 여기로 요청하는 이유는 스프링 카프카가 kafkaListenerContainerFactory가 없으면 스프링이 자체적으로 만들어주기 때문이다.
package org.springframework.boot.autoconfigure.kafka;
// ...
@AutoConfiguration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
@ImportRuntimeHints(KafkaAutoConfiguration.KafkaRuntimeHints.class)
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
KafkaAutoConfiguration(KafkaProperties properties) {
this.properties = properties;
}
// ...
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
map.from(this.properties.getTemplate().isObservationEnabled()).to(kafkaTemplate::setObservationEnabled);
return kafkaTemplate;
}
@Bean
@ConditionalOnMissingBean(ProducerListener.class)
public LoggingProducerListener<Object, Object> kafkaProducerListener() {
return new LoggingProducerListener<>();
}
// 여기 !!!
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(KafkaConnectionDetails connectionDetails,
ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
Map<String, Object> properties = this.properties.buildConsumerProperties();
applyKafkaConnectionDetailsForConsumer(properties, connectionDetails);
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(properties);
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}
KafkaAutoConfiguration 클래스는 별도의 ConsumerFactory가 없으면 KafkaConsumerFactory 빈으로 등록해준다.
@ConfigurationProperties("spring.kafka")
public class KafkaProperties {
/**
* List of host:port pairs to use for establishing the initial connections to the
* Kafka cluster. Applies to all components unless overridden.
*/
private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
// ...
}
이때 기본 설정은 KafkaProperties를 기반으로 하는데, spring.kafka.*를 기반으로 하고, 부트스트랩 서버의 기본값은 localhost:9092이다. 이래서 요청을 여기로 보냈던 것.
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
ObjectProvider<ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>> kafkaContainerCustomizer) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
kafkaContainerCustomizer.ifAvailable(factory::setContainerCustomizer);
return factory;
}
그리고 KafkaAutoConfiguration은 KafkaAnnotationDrivenConfiguration를 임포트 하고 있는데, 이 클래스 내에서는 kafkaContainerFactory를 기반으로 kafkaListenerContainerFactory를 빈 등록하는 코드가 있다.
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
/**
* The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>
* If not specified, the default container factory is used, if any. If a SpEL
* expression is provided ({@code #{...}}), the expression can either evaluate to a
* container factory instance or a bean name.
* @return the container factory bean name.
*/
String containerFactory() default "";
// ...
}

마지막으로 @KafkaListener는 containerFactory를 따로 지정하지 않으면 기본값으로 kafkaListenerContainerFactory를 사용한다.
해결
방법 1
아까 위의 빈 메서드명을 paymentRefundListenerContainerFactory에서 kafkaListenerContainerFactory로 변경해 주면 @KafkaListener가 해당 컨테이너 팩토리를 찾기 때문에 정상 동작한다.
방법 2
@KafkaListener(
// ...
containerFactory = "paymentRefundListenerContainerFactory"
)
이렇게 카프카 리스너에 컨테이너 팩토리를 명시적으로 빈 이름으로 작성해주면 된다.
'TIL' 카테고리의 다른 글
| TIL #132 : Arrays.fill의 얕은 복사로 인한 참조 공유 이슈 (0) | 2025.06.14 |
|---|---|
| TIL #130 : 컨트롤러 테스트에서 커스텀 UserDetails 인증 객체 사용하기 (0) | 2025.04.10 |
| TIL #129 : 구글 번역 → next-intl 전환으로 렌더링 속도 70% 개선 (2.34s → 0.71s) (0) | 2025.04.08 |
| TIL #128 : PNG 이미지를 WebP 확장자로 변환하여 95% 용량 절감 (0) | 2025.04.07 |
| TIL #127 : Axios Interceptor 도입으로 인증 공통화 및 141줄 절감 (0) | 2025.04.07 |