[스프링 인 액션] 8장 Kafka : 비동기 메시지 전송하기

    Kafka 로 메시지 전송하기

    💻 실습 : https://github.com/cusbert/spring-in-action-5th

    🎯 이 장에서 배우는 내용

    • 비동기 메시지 전송
    • JMS, RabbitMQ, 카프카 Kafka를 사용해서 메시지 전송하기
    • 브로커에서 메시지 가져오기
    • 메시지 리스닝하기

    8.3 카프카 사용하기

    카프카는 확장성을 제공하는 cluster 로 실행되도록 설계되었다.
    클러스터의 모든 카프카 인스턴스에 걸쳐 토픽(topic) 을 파티션(Partition) 으로 분할하여 메시지를 관리한다.
    RabbitMQ 가 exchange 와 큐를 사용해서 메시지를 처리하는 반면 카프카는 토픽만 사용한다.

    카프카의 토픽은 클러스터의 모든 브로커에 걸쳐 복제된다. (replicated)
    클러스터의 각 노드는 하나 이상의 토픽에 대한 리더(leader) 로 동작하며, 토픽 데이터를 관리하고 클러스터의 다른 노드로 데이터를 복제한다.
    각 토픽은 여러개의 파티션으로 분할 될 수 있다. 이 경우 클러스터의 각 노드는 한 토픽의 하나 이상의 파티션(토픽 전체가 아닌) 의 리더가 된다.

    카프카 클러스터는 여러 개의 브로커로 구성되며 각 브로커는 토픽의 파티션 리더로 동작한다.

    8.3.0 카프카 설치하기

    docoker 로 카프카 설치
    https://www.baeldung.com/ops/kafka-docker-setup

    8.3.1 카프카 사용을 위해 스프링 설정하기

    pom.xml 에 카프카 의존성 추가

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    KaftaTemplate 은 기본적으로 localhost에서 실행되면서 9092 포트를 리스닝 하는 카프카 브로커를 사용한다. 실무 환경으로 이양할 때는 다른 호스트와 포트로 구성해야한다.

    • spring.kafka.bootstrap-servers : 카프카 클러스터로의 초기 연결에 사용되는 복수개의 카프카 서버들의 위치 설정

    aplication.yml 에 설정 추가

    spring:
        kafka:
        bootstrap-servers:
        - kafka.tacocloud.com:9092
        - kafka.tacocloud.com:9093
        - kafka.tacocloud.com:9094

    8.3.2 KafkaTemplate을 사용해서 메시지 전송하기

    ListenableFuture<SendResult<K, V>> send(String topic, V data);
    ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
    ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
    ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
    ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
    ListenableFuture<SendResult<K, V>> send(Message<?> message);
    ListenableFuture<SendResult<K, V>> sendDefault(V data);
    ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
    ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
    ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

    카프카에서 메시지를 전송할 때는 메시지가 전송되는 방법을 알려주는 다음 매개변수를 지정할 수 있다.

    • 메시지가 전송될 토픽 (Required send() )
    • payload(Required, 메시지에 적재된 순수한 데이터)
    • 토픽 데이터를 쓰는 파티션(Optional)
    • 레코드 전송(Optional)
    • 타입스탬프(Optional,defaults to System.currentTimeMillis())

    topic 과 payload 는 가장 중요한 매개변수들이다. 파티션과 키는 send() 와 sendDefault()에 매개변수로 제공되는 추가 정보일 뿐이다.

    KafkaTemplate 를 이용해서 주문 데이터 전송하기

    @Service
    public class KafkaOrderMessagingService implements OrderMessagingService {
        private KafkaTemplate<String, Order> kafkaTemplate;
    
        @Autowired
        public KafkaOrderMessagingService(
                KafkaTemplate<String, Order> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
    
        @Override
        public void sendOrder(Order order) {
            kafkaTemplate.send("tacocloud.orders.topic", order);
        }
    }

    spring.kafka.template.default-topic을 지정 후 sendDefault() 로 데이터 전송하기
    이 때는 토픽 이름을 인자로 지정하지 않아도 된다.

    spring:
        kafka:
            template:
                default-topic: tacocloud.orders.topic
    @Override
    public void sendOrder(Order order) {
        kafkaTemplate.sendDefault(order);
    }

    8.3.3 카프카 리스너 작성하기

    kafkaTemplate 은 메시지를 수신하는 메서드를 제공하지 않는다. 이는 JmsTemplate 이나 RabbitTemplate 과 다른 점이다. 따라서 스프링을 사용해서 카프카 토픽의 메시지를 가져오는 유일한 방법은 메시지 리스너를 작성하는 것이다.

    카프카의 메시지 리스너는 @KafkaListener 어노테이션이 지정된 메서드에 지정된다.

    @KafkaListener 를 사용해서 데이터 수신하기

    @Component
    @Slf4j
    public class OrderListener {
    
        private KitchenUI kitchenUI;
    
        @Autowired
        public OrderListener(KitchenUI kitchenUI) {
            this.kitchenUI = kitchenUI;
        }
    
        @KafkaListener(topics = "tacocloud.orders.topic")
        public void handle(Order order) {
            kitchenUI.displayOrder(order);
        }
    
        // record 부가정보 받기
        @KafkaListener(topics="tacocloud.orders.topic")
        public void handle(Order order, ConsumerRecord<String, Order> record) {
    
            log.info("Received from partition {} with timestamp {}",
                    record.partition(), record.timestamp());
    
            kitchenUI.displayOrder(order);
        }
    
        // Message 부가정보 받기
        @KafkaListener(topics = "tacocloud.orders.topic")
        public void handle(Order order, Message<Order> message) {
    
            MessageHeaders headers = message.getHeaders();
            log.info("Received from partition {} with timestamp {}",
                    headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
                    headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
    
            kitchenUI.displayOrder(order);
        }
    }
    • @KafkaListener(topics="tacocloud.orders.topic") : tacocloud.orders.topic 이라는 이름의 topic 에 메시지가 도착할 때 자동으로 호출된다.
    • payload 인 Order 객체를 handle() 메소드의 인자로 받는다
    • 추가적인 메타데이터가 필요하면 ConsumerRecord 나 Message 객체도 받을 수 있다.

    메시지 페이로드는 ConsumerRecord.value()Message.getPayload()를 사용해서도 받을 수 있다. 즉 handle()의 매개 변수로 직접 Order 객체를 요청하는 대신 ConsumerRecord 나 Message 객체를 통해 Order 객체를 요청할 수 있다.

    📌 요약

    • 애플리케이션 간 비동기 메시지 큐를 이용한 통신 방식은 간접 계층을 제공하므로 애플리케이션 간의 결합도는 낮추면서 확장성은 높인다
    • 스프링은 JMS, RabbitMQ 또는 아파치 카프카를 사용해서 비동기 메시지을 지원한다.
    • template-based clients (JmsTemplate, RabbitTemplate, or KafkaTemplate)을 사용해서 메시지 브로커를 통한 메시지 전송을 할 수 있다.
    • 메시지 수신 애플리케이션은 같은 템플릿 기반의 클라이언트들을 사용해서 Pull 모델 형태의 메시지 consume 할 수 있다.
    • message listener 어노테이션 (@JmsListener, @RabbitListener, or @KafkaListener) 를 Bean 메서드에 지정하면 푸쉬모델 형태로 컨슈머에게 메시지가 전송될 수 있다.

    참고

    반응형

    댓글

    Designed by JB FACTORY