[스프링 인 액션] 8장 RabbitMQ : 비동기 메시지 전송하기

    RabbitMQ 메시지 전송하기

    💻 실습 : https://github.com/cusbert/spring-in-action-5th

    🎯 이 장에서 배우는 내용

    • 비동기 메시지 전송
    • JMS, RabbitMQ, 카프카 Kafka를 사용해서 메시지 전송하기
    • 브로커에서 메시지 가져오기
    • 메시지 리스닝하기

    8.2 RabbitMQ 와 AMQP 사용하기

    AMQP(Advanced Message Queueing Protocol)의 가장 중요한 구현이라 할 수 있는 RabbitMQ 는 JMS 보다 더 진보된 메시지 라우팅 전략을 제공한다. JMS 메시지가 수신자가 가져갈 메시지 도착지 이름을 주소로 사용하는 반면, AMQP 메시지는 수신자가 리스닝하는 큐와 비리된 거래소(exchange) 이름과 라우팅 키를 주소로 사용한다.

    RabbitMQ exchange 로 전송되는 메시지는 라우팅 키와 바인등을 기반으로 하나 이상의 큐로 전달 된다.

    RabbitMQ 브로커 구조

    Producers -> RabbitMQ Broker [Exchange -- Binding --> Queue] -> Consumers

    exchange 의 종류

    • Default : 브로커가 자동으로 생성하는 특별한 거래소. 메시지의 routing key와 이름이 같은 큐로 메시지를 전달 한다.
    • Direct : 바인딩 키가 메시지의 routing key와 같은 큐에 메시지를 전달한다.
    • Topic : 바인딩 키(와일드카드를 포함하는)가 메시지의 routing key와 일치하는 하나 이상의 큐에 메시지를 전달한다.
    • Fanout : 바인딩 키나 라우팅 키에 상관 없이 모든 연결된 큐에 메세지를 전달한다.
    • Header : 토픽 exchange와 유사하며, 라우팅 키 대신 메시지 헤더 값을 기반으로 한다.
    • Dead letter: 전달 불가능한 즉, 어떤 exchage-큐 바인딩과도 일치하지 않는 모든 메시지를 보관하는 잡동사니

    거래소의 가장 간단한 형태는 default 와 Fanout 이며 이는 JMS 의 큐 및 토픽과 거의 일치 한다.

    메시지는 routing key 를 갖고 거래소로 전달되고 큐에서 읽혀져 소비된다. 메시지는 바인딩 정의를 기반으로 거래소로부터 큐로 전달 된다.

    8.2.1 RabbitMQ를 셋업

    Docker 로 RabbitMQ 설치

    docker run -d --hostname my-rabbit --name some-rabbit \
    -p 5672:5672 -p 8090:15672 \
    -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=secret \
    rabbitmq:3-management

    RabbitMQ 의존성 추가

    pom.xml 에 스프링부트 AMQP 스타터 의존성 빌드 추가

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    aplication.yml 에 설정 추가

    spring
        rabbitmq:
            host: rabbit.tacocloud.com
            port: 5672
            username: admin
            password: secret
    • RabbitMQ 브로커의 위치와 인증 정보를 구성하는 속성
    속성 설명
    spring.rabbitmq.addresses 쉼표로 구분된 리스트 형태의 RabbitMQ 브로커 주소
    spring.rabbitmq.host 브로커의 호스트 (defalt는 localhost)
    spring.rabbitmq.port 브로커 포트 (defalt는 5672)
    spring.rabbitmq.username 브로커 사용자 (선택 속성)
    spring.rabbitmq.password 브로커 사용자 암호 (선택 속성)

    8.2.2 RabbitTemplate을 사용해서 메시지 전송하기

    RabbitTemplate 의 메서드를 이용하요 메시지를 전송 할 수 있다.

    // Send raw messages
    void send(Message message) throws AmqpException;
    void send(String routingKey, Message message) throws AmqpException;
    void send(String exchange, String routingKey, Message message)
                throws AmqpException;
    // Send messages converted from objects
    // 객체로부터 변환된 메시지 전송
    void convertAndSend(Object message) throws AmqpException;
    void convertAndSend(String routingKey, Object message)
                                throws AmqpException;
    void convertAndSend(String exchange, String routingKey, Object message)  
                                throws AmqpException;
    // Send messages converted from objects with post-processing
    // 객체로부터 변환되고 후처리 되는 메시지 전송
    void convertAndSend(Object message, MessagePostProcessor mPP)
                            throws AmqpException;
    void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
                            throws AmqpException;
    void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
                            throws AmqpException;

    1. RabbitTemplate.send() 로 메시지 전송

    rabbitTemplate.send() 로 메시지를 전송한다.

    • 기본 exchange : "" , RabbitMQ 브로커가 자동으로 생성하는 기본 exchange 와 일치한다.
    • 기본 라우팅 키 : "", 이 경우 거래소와 바인딩에 따라 전달된다.

    이 기본값은 spring.rabbitmq.template.exchangespring.rabbitmq.template.routing-key 설정을 변경해서 변경 할 수 있다.

    spring:
        rabbitmq:
            template:
            exchange: tacocloud.orders
            routing-key: kitchens.central

    이 경우 exchange 를 지정하지 않은 모든 메시지는 이름이 tacocloud.orders 인 거래소로 자동 전송된다.
    만일 send()convertAndSend() 를 호출할 때 라우팅 키도 지정되지 않으면 해당 메시지는 kitchens.central로 자동 지정된다.

    send() 로 메시지 전송하기

    @Service
    public class RabbitOrderMessagingService implements OrderMessagingService {
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        public RabbitOrderMessagingService(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
        public void sendOrder(Order order) {
            MessageConverter converter = rabbitTemplate.getMessageConverter();
            MessageProperties props = new MessageProperties();
            Message message = converter.toMessage(order, props);
            rabbitTemplate.send("tacocloud.order", message);
        }
    }

    2. rabbitTemplate.convertAndSend() 로 메시지 변환 후 전송하기

    convertAndSend() 로 메시지 전송하기

    public void sendOrder(Order order) {
        rabbitTemplate.convertAndSend("tacocloud.order", order);
    }
    메시지 변환기 구성하기

    기본적으로 메시지 변환은 SimpleMessageConverter 로 수행된다
    또한 스프링은 RabbitTemplate에서 사용할 수 있는 여러 개의 메시지 변환기를 제공한다.

    • Jackson2JsonMessageConverter
    • MarshallingMessageConverter
    • SerializerMessageConverter
    • SimpleMessageConverter
    • ContentTypeDelegatingMessageConverter
    • MessagingMessageConverter

    다음과 같이 Bean 으로 등록하면 스프링 부트 자동-구성에서 이 Bean을 찾아 기본 메시지 변환기 대신 이 Bean을 RabbitTemplate으로 주입한다.

    @Bean
    public MessageConverter messageConverter() {
         new Jackson2JsonMessageConverter();
    }

    메시지 속성 설정하기

    다음과 같이 헤더를 설정하는 코드를 추가 할 수 있다.
    MessageProperties를 직접 사용하는 것이 아니라 MessagePostProcessor에서 해야 한다.

    public void sendOrder(Order order) {
        System.out.println("### Send Order");
        rabbit.convertAndSend("tacocloud.order.queue", order,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        MessageProperties props = message.getMessageProperties();
                        props.setHeader("X_ORDER_SOURCE", "WEB");
                        return message;
                    }
                });
    }

    8.2.3 RabbitMQ로부터 메시지 수신하기

    RabbitMQ 의 경우에도 다음 두가지 방법이 있다

    • RabbitTemplate을 사용해서 큐로부터 메시지를 가지고 온다. (Pull Model)
    • @RabbitListener 가 지정된 메서드로 메시지가 푸쉬된다. (Push Model)

    1. RabbitTemplate (Pull model) 로 메시지 수신하기

    RabbitTemplate 으로 메시지 수신하기

    // Receive messages
    Message receive() throws AmqpException;
    Message receive(String queueName) throws AmqpException;
    Message receive(long timeoutMillis) throws AmqpException;
    Message receive(String queueName, long timeoutMillis) throws AmqpException;
    // Receive objects converted from messages
    // 메시지로부터 변환된 객체를 수신한다.
    Object receiveAndConvert() throws AmqpException;
    Object receiveAndConvert(String queueName) throws AmqpException;
    Object receiveAndConvert(long timeoutMillis) throws AmqpException;
    Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
    // Receive type-safe objects converted from messages
    // 메시지로부터 변환된 Type-safe 객체를 수신한다.
    <T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
    <T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
    <T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
    <T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type)
                            throws AmqpException;

    receive()receiveAndConvert()send()sendAndConvert()와 대칭된다.
    receive() : 큐로부터 원시 Message 객체를 수신한다.
    receiveAndConvert(): 메시지를 수신한 뒤 메시지 변환기를 사용하여 수신 메시즈를 도메인 객체로 변환 후 반환한다.

    그러나 매개변수에서 차이가 있다. 우선 수신 메서드는 exchange 나 라우팅 키를 매개변수로 가지지 않는다. 왜냐면 exchange 와 라우팅 키는 메시지를 큐로 전달하는데 사용되지만 일단 메시지가 큐에 들어가면 다음 메시지 도착지는 큐로부터 메시지를 소비하는 consumer 이기 때문이다. 따라서 이제 큐만 신경쓰면 된다.

    @Service
    public class RabbitOrderReceiver implements OrderReceiver {
    
        private RabbitTemplate rabbit;
    
        public RabbitOrderReceiver(RabbitTemplate rabbit) {
            this.rabbit = rabbit;
        }
    
        public Order receiveOrder() {
            return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
        } 
    }

    2. 메시지 리스너 (Push model) 을 이용해서 RabbitMQ 메시지 처리하기

    @RabbitListener 어노태이션을 사용하면 메시지가 큐에 도착할 때 메서드가 자동 호출 되도록 할 수 있다.

    @Component
    public class OrderListener {
    
        private KitchenUI ui;
    
        @Autowired
        public OrderListener(KitchenUI ui) {
        this.ui = ui;
        }
    
        @RabbitListener(queues = "tacocloud.order.queue")
        public void receiveOrder(Order order) {
        ui.displayOrder(order);
        }
    }

    📌 요약

    • 애플리케이션 간 비동기 메시지 큐를 이용한 통신 방식은 간접 계층을 제공하므로 애플리케이션 간의 결합도는 낮추면서 확장성은 높인다
    • 스프링은 JMS, RabbitMQ 또는 아파치 카프카를 사용해서 비동기 메시지을 지원한다.
    • template-based clients (JmsTemplate, RabbitTemplate, or KafkaTemplate)을 사용해서 메시지 브로커를 통한 메시지 전송을 할 수 있다.
    • 메시지 수신 애플리케이션은 같은 템플릿 기반의 클라이언트들을 사용해서 Pull 모델 형태의 메시지 consume 할 수 있다.
    • message listener 어노테이션 (@JmsListener, @RabbitListener, or @KafkaListener) 를 Bean 메서드에 지정하면 푸쉬모델 형태로 컨슈머에게 메시지가 전송될 수 있다.

    참고

    반응형

    댓글

    Designed by JB FACTORY