[스프링 인 액션] 8장 Kafka : 비동기 메시지 전송하기
- 📕 Book/스프링 인 액션
- 2021. 7. 31.
Kafka 로 메시지 전송하기
💻 실습 : https://github.com/cusbert/spring-in-action-5th
🎯 이 장에서 배우는 내용
- 비동기 메시지 전송
- JMS, RabbitMQ, 카프카 Kafka를 사용해서 메시지 전송하기
- 브로커에서 메시지 가져오기
- 메시지 리스닝하기
8.3 카프카 사용하기
카프카는 확장성을 제공하는 cluster 로 실행되도록 설계되었다.
클러스터의 모든 카프카 인스턴스에 걸쳐 토픽(topic) 을 파티션(Partition) 으로 분할하여 메시지를 관리한다.
RabbitMQ 가 exchange 와 큐를 사용해서 메시지를 처리하는 반면 카프카는 토픽만 사용한다.
카프카의 토픽은 클러스터의 모든 브로커에 걸쳐 복제된다. (replicated)
클러스터의 각 노드는 하나 이상의 토픽에 대한 리더(leader) 로 동작하며, 토픽 데이터를 관리하고 클러스터의 다른 노드로 데이터를 복제한다.
각 토픽은 여러개의 파티션으로 분할 될 수 있다. 이 경우 클러스터의 각 노드는 한 토픽의 하나 이상의 파티션(토픽 전체가 아닌) 의 리더가 된다.
카프카 클러스터는 여러 개의 브로커로 구성되며 각 브로커는 토픽의 파티션 리더로 동작한다.
8.3.0 카프카 설치하기
docoker 로 카프카 설치
https://www.baeldung.com/ops/kafka-docker-setup
8.3.1 카프카 사용을 위해 스프링 설정하기
pom.xml 에 카프카 의존성 추가
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
KaftaTemplate 은 기본적으로 localhost에서 실행되면서 9092 포트를 리스닝 하는 카프카 브로커를 사용한다. 실무 환경으로 이양할 때는 다른 호스트와 포트로 구성해야한다.
- spring.kafka.bootstrap-servers : 카프카 클러스터로의 초기 연결에 사용되는 복수개의 카프카 서버들의 위치 설정
aplication.yml 에 설정 추가
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
- kafka.tacocloud.com:9093
- kafka.tacocloud.com:9094
8.3.2 KafkaTemplate을 사용해서 메시지 전송하기
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
카프카에서 메시지를 전송할 때는 메시지가 전송되는 방법을 알려주는 다음 매개변수를 지정할 수 있다.
- 메시지가 전송될 토픽 (Required send() )
- payload(Required, 메시지에 적재된 순수한 데이터)
- 토픽 데이터를 쓰는 파티션(Optional)
- 레코드 전송(Optional)
- 타입스탬프(Optional,defaults to System.currentTimeMillis())
topic 과 payload 는 가장 중요한 매개변수들이다. 파티션과 키는 send() 와 sendDefault()에 매개변수로 제공되는 추가 정보일 뿐이다.
KafkaTemplate 를 이용해서 주문 데이터 전송하기
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
private KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(
KafkaTemplate<String, Order> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
}
spring.kafka.template.default-topic을 지정 후 sendDefault() 로 데이터 전송하기
이 때는 토픽 이름을 인자로 지정하지 않아도 된다.
spring:
kafka:
template:
default-topic: tacocloud.orders.topic
@Override
public void sendOrder(Order order) {
kafkaTemplate.sendDefault(order);
}
8.3.3 카프카 리스너 작성하기
kafkaTemplate 은 메시지를 수신하는 메서드를 제공하지 않는다. 이는 JmsTemplate 이나 RabbitTemplate 과 다른 점이다. 따라서 스프링을 사용해서 카프카 토픽의 메시지를 가져오는 유일한 방법은 메시지 리스너를 작성하는 것이다.
카프카의 메시지 리스너는 @KafkaListener
어노테이션이 지정된 메서드에 지정된다.
@KafkaListener
를 사용해서 데이터 수신하기
@Component
@Slf4j
public class OrderListener {
private KitchenUI kitchenUI;
@Autowired
public OrderListener(KitchenUI kitchenUI) {
this.kitchenUI = kitchenUI;
}
@KafkaListener(topics = "tacocloud.orders.topic")
public void handle(Order order) {
kitchenUI.displayOrder(order);
}
// record 부가정보 받기
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<String, Order> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
kitchenUI.displayOrder(order);
}
// Message 부가정보 받기
@KafkaListener(topics = "tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
kitchenUI.displayOrder(order);
}
}
@KafkaListener(topics="tacocloud.orders.topic")
: tacocloud.orders.topic 이라는 이름의 topic 에 메시지가 도착할 때 자동으로 호출된다.- payload 인 Order 객체를 handle() 메소드의 인자로 받는다
- 추가적인 메타데이터가 필요하면 ConsumerRecord 나 Message 객체도 받을 수 있다.
메시지 페이로드는 ConsumerRecord.value()
나 Message.getPayload()
를 사용해서도 받을 수 있다. 즉 handle()의 매개 변수로 직접 Order 객체를 요청하는 대신 ConsumerRecord 나 Message 객체를 통해 Order 객체를 요청할 수 있다.
📌 요약
- 애플리케이션 간 비동기 메시지 큐를 이용한 통신 방식은 간접 계층을 제공하므로 애플리케이션 간의 결합도는 낮추면서 확장성은 높인다
- 스프링은 JMS, RabbitMQ 또는 아파치 카프카를 사용해서 비동기 메시지을 지원한다.
- template-based clients (JmsTemplate, RabbitTemplate, or KafkaTemplate)을 사용해서 메시지 브로커를 통한 메시지 전송을 할 수 있다.
- 메시지 수신 애플리케이션은 같은 템플릿 기반의 클라이언트들을 사용해서 Pull 모델 형태의 메시지 consume 할 수 있다.
- message listener 어노테이션 (
@JmsListener
,@RabbitListener
, or@KafkaListener
) 를 Bean 메서드에 지정하면 푸쉬모델 형태로 컨슈머에게 메시지가 전송될 수 있다.