[스프링 인 액션] 8장 RabbitMQ : 비동기 메시지 전송하기
- 📕 Book/스프링 인 액션
- 2021. 7. 27.
RabbitMQ 메시지 전송하기
💻 실습 : https://github.com/cusbert/spring-in-action-5th
🎯 이 장에서 배우는 내용
- 비동기 메시지 전송
- JMS, RabbitMQ, 카프카 Kafka를 사용해서 메시지 전송하기
- 브로커에서 메시지 가져오기
- 메시지 리스닝하기
8.2 RabbitMQ 와 AMQP 사용하기
AMQP(Advanced Message Queueing Protocol)의 가장 중요한 구현이라 할 수 있는 RabbitMQ 는 JMS 보다 더 진보된 메시지 라우팅 전략을 제공한다. JMS 메시지가 수신자가 가져갈 메시지 도착지 이름을 주소로 사용하는 반면, AMQP 메시지는 수신자가 리스닝하는 큐와 비리된 거래소(exchange) 이름과 라우팅 키를 주소로 사용한다.
RabbitMQ exchange 로 전송되는 메시지는 라우팅 키와 바인등을 기반으로 하나 이상의 큐로 전달 된다.
Producers -> RabbitMQ Broker [Exchange -- Binding --> Queue] -> Consumers
exchange 의 종류
- Default : 브로커가 자동으로 생성하는 특별한 거래소. 메시지의 routing key와 이름이 같은 큐로 메시지를 전달 한다.
- Direct : 바인딩 키가 메시지의 routing key와 같은 큐에 메시지를 전달한다.
- Topic : 바인딩 키(와일드카드를 포함하는)가 메시지의 routing key와 일치하는 하나 이상의 큐에 메시지를 전달한다.
- Fanout : 바인딩 키나 라우팅 키에 상관 없이 모든 연결된 큐에 메세지를 전달한다.
- Header : 토픽 exchange와 유사하며, 라우팅 키 대신 메시지 헤더 값을 기반으로 한다.
- Dead letter: 전달 불가능한 즉, 어떤 exchage-큐 바인딩과도 일치하지 않는 모든 메시지를 보관하는 잡동사니
거래소의 가장 간단한 형태는 default 와 Fanout 이며 이는 JMS 의 큐 및 토픽과 거의 일치 한다.
메시지는 routing key 를 갖고 거래소로 전달되고 큐에서 읽혀져 소비된다. 메시지는 바인딩 정의를 기반으로 거래소로부터 큐로 전달 된다.
8.2.1 RabbitMQ를 셋업
Docker 로 RabbitMQ 설치
docker run -d --hostname my-rabbit --name some-rabbit \
-p 5672:5672 -p 8090:15672 \
-e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=secret \
rabbitmq:3-management
RabbitMQ 의존성 추가
pom.xml 에 스프링부트 AMQP 스타터 의존성 빌드 추가
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
aplication.yml 에 설정 추가
spring
rabbitmq:
host: rabbit.tacocloud.com
port: 5672
username: admin
password: secret
- RabbitMQ 브로커의 위치와 인증 정보를 구성하는 속성
속성 | 설명 |
---|---|
spring.rabbitmq.addresses | 쉼표로 구분된 리스트 형태의 RabbitMQ 브로커 주소 |
spring.rabbitmq.host | 브로커의 호스트 (defalt는 localhost) |
spring.rabbitmq.port | 브로커 포트 (defalt는 5672) |
spring.rabbitmq.username | 브로커 사용자 (선택 속성) |
spring.rabbitmq.password | 브로커 사용자 암호 (선택 속성) |
8.2.2 RabbitTemplate을 사용해서 메시지 전송하기
RabbitTemplate 의 메서드를 이용하요 메시지를 전송 할 수 있다.
// Send raw messages
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message)
throws AmqpException;
// Send messages converted from objects
// 객체로부터 변환된 메시지 전송
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message)
throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;
// Send messages converted from objects with post-processing
// 객체로부터 변환되고 후처리 되는 메시지 전송
void convertAndSend(Object message, MessagePostProcessor mPP)
throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
1. RabbitTemplate.send() 로 메시지 전송
rabbitTemplate.send()
로 메시지를 전송한다.
- 기본 exchange : "" , RabbitMQ 브로커가 자동으로 생성하는 기본 exchange 와 일치한다.
- 기본 라우팅 키 : "", 이 경우 거래소와 바인딩에 따라 전달된다.
이 기본값은 spring.rabbitmq.template.exchange
와 spring.rabbitmq.template.routing-key
설정을 변경해서 변경 할 수 있다.
spring:
rabbitmq:
template:
exchange: tacocloud.orders
routing-key: kitchens.central
이 경우 exchange 를 지정하지 않은 모든 메시지는 이름이 tacocloud.orders 인 거래소로 자동 전송된다.
만일 send()
나 convertAndSend()
를 호출할 때 라우팅 키도 지정되지 않으면 해당 메시지는 kitchens.central로 자동 지정된다.
send()
로 메시지 전송하기
@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private RabbitTemplate rabbitTemplate;
@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendOrder(Order order) {
MessageConverter converter = rabbitTemplate.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(order, props);
rabbitTemplate.send("tacocloud.order", message);
}
}
2. rabbitTemplate.convertAndSend() 로 메시지 변환 후 전송하기
convertAndSend()
로 메시지 전송하기
public void sendOrder(Order order) {
rabbitTemplate.convertAndSend("tacocloud.order", order);
}
메시지 변환기 구성하기
기본적으로 메시지 변환은 SimpleMessageConverter 로 수행된다
또한 스프링은 RabbitTemplate에서 사용할 수 있는 여러 개의 메시지 변환기를 제공한다.
- Jackson2JsonMessageConverter
- MarshallingMessageConverter
- SerializerMessageConverter
- SimpleMessageConverter
- ContentTypeDelegatingMessageConverter
- MessagingMessageConverter
다음과 같이 Bean 으로 등록하면 스프링 부트 자동-구성에서 이 Bean을 찾아 기본 메시지 변환기 대신 이 Bean을 RabbitTemplate으로 주입한다.
@Bean
public MessageConverter messageConverter() {
new Jackson2JsonMessageConverter();
}
메시지 속성 설정하기
다음과 같이 헤더를 설정하는 코드를 추가 할 수 있다.
MessageProperties를 직접 사용하는 것이 아니라 MessagePostProcessor에서 해야 한다.
public void sendOrder(Order order) {
System.out.println("### Send Order");
rabbit.convertAndSend("tacocloud.order.queue", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}
8.2.3 RabbitMQ로부터 메시지 수신하기
RabbitMQ 의 경우에도 다음 두가지 방법이 있다
- RabbitTemplate을 사용해서 큐로부터 메시지를 가지고 온다. (Pull Model)
- @RabbitListener 가 지정된 메서드로 메시지가 푸쉬된다. (Push Model)
1. RabbitTemplate (Pull model) 로 메시지 수신하기
RabbitTemplate 으로 메시지 수신하기
// Receive messages
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// Receive objects converted from messages
// 메시지로부터 변환된 객체를 수신한다.
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
// Receive type-safe objects converted from messages
// 메시지로부터 변환된 Type-safe 객체를 수신한다.
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type)
throws AmqpException;
receive()
와 receiveAndConvert()
는 send()
와 sendAndConvert()
와 대칭된다.receive()
: 큐로부터 원시 Message 객체를 수신한다.receiveAndConvert()
: 메시지를 수신한 뒤 메시지 변환기를 사용하여 수신 메시즈를 도메인 객체로 변환 후 반환한다.
그러나 매개변수에서 차이가 있다. 우선 수신 메서드는 exchange 나 라우팅 키를 매개변수로 가지지 않는다. 왜냐면 exchange 와 라우팅 키는 메시지를 큐로 전달하는데 사용되지만 일단 메시지가 큐에 들어가면 다음 메시지 도착지는 큐로부터 메시지를 소비하는 consumer 이기 때문이다. 따라서 이제 큐만 신경쓰면 된다.
@Service
public class RabbitOrderReceiver implements OrderReceiver {
private RabbitTemplate rabbit;
public RabbitOrderReceiver(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
public Order receiveOrder() {
return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
}
}
2. 메시지 리스너 (Push model) 을 이용해서 RabbitMQ 메시지 처리하기
@RabbitListener
어노태이션을 사용하면 메시지가 큐에 도착할 때 메서드가 자동 호출 되도록 할 수 있다.
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@RabbitListener(queues = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}
📌 요약
- 애플리케이션 간 비동기 메시지 큐를 이용한 통신 방식은 간접 계층을 제공하므로 애플리케이션 간의 결합도는 낮추면서 확장성은 높인다
- 스프링은 JMS, RabbitMQ 또는 아파치 카프카를 사용해서 비동기 메시지을 지원한다.
- template-based clients (JmsTemplate, RabbitTemplate, or KafkaTemplate)을 사용해서 메시지 브로커를 통한 메시지 전송을 할 수 있다.
- 메시지 수신 애플리케이션은 같은 템플릿 기반의 클라이언트들을 사용해서 Pull 모델 형태의 메시지 consume 할 수 있다.
- message listener 어노테이션 (
@JmsListener
,@RabbitListener
, or@KafkaListener
) 를 Bean 메서드에 지정하면 푸쉬모델 형태로 컨슈머에게 메시지가 전송될 수 있다.