[스프링 인 액션] 10장 Reactor: 리액터 개요
- 📕 Book/스프링 인 액션
- 2021. 8. 11.
10장 리액티브 프로그래밍 & 리액터
💻 실습 : https://github.com/cusbert/spring-in-action-5th
🎯 이 장에서 배우는 내용
- 리액티브 프로그래밍 Reactive Programming 이해하기
- 프로젝트 리액터 Project Reactor
- 리액티브 데이터 오퍼레이션 Operating on data reactively
왜 리액티브 프로그램을 쓰는가?
신문을 구독받는다고 가정해보자. 1년 치 구독료를 지불하였는데 배달이 오지 않고 1년 치 신문이 모두 준비되면 그제야 배달을 시작한다면 어떻게 될까? 당연히 말이 되지 않는다.
실제로는 기사가 최신일 때 독자가 읽을 수 있도록 출간 후 가능한 빨리 배달된다. 또한 독자가 기사를 읽는 동안 기자는 새로운 다음 기사를 작성한다. 이 모든 것은 병행으로 진행된다.
애플리케이션 코드를 개발할 때는 명령형(imperative)와 리액티브(reactive 반응형) 두 가지 형태의 코드를 작성할 수 있다.
1) 명령형 코드
순차적으로 연속 되는 작업
각 작업은 한번에 하나씩 그리고 이전 작업 다음에 실행된다.
데이터는 모아서 처리되고 이전 작업이 데이터 처리를 끝낸후에 다음 작업으로 넘어간다.
자바를 비롯해서 대부분의 언어는 동시 프로그래밍(concurrent programming)을 지원한다. 자바에선 스레드가 한 작업을 진행하는 동안 이 스레드에서 다른 스레드를 시작시키고 작업을 수행하게 할 수 있다. 그러나 스레드를 생성하는 것은 쉬워도 스레드는 언떤 이유로는 결국 차단된다. 또한 다중 스레드로 동시성을 관리하는 것은 쉽지 않다.
2) 리액티브 코드
병렬로 일련의 작업들이 진행된다.
각 작업은 부분 집합의 데이터를 처리할 수 있다.
처리가 끝난 데이터를 다음 작업에 넘겨주고 다른 부분 집합의 데이터로 계속 작업 할 수 있다.
10.1 리액티브 프로그래밍 이해하기
리액티브 프로그래밍이란 비동기 데이터 스트림을 사용하는 프로그래밍 패러다임이다.
리액티브 프로그래밍은 본질적으로 함수적이면서 선언적이다. 즉 순차적 작업이 아니라 데이터가 흘러가는 pipeline 이나 stream 을 포함한다. 이런 리액티브 스트림은 데이터 전체를 사용할 수 있을 때까지 기다리지 않고 사용 가능한 데이터가 있을 때마다 처리되므로 사실상 입력되는 데이터는 무한하다.
즉 동시에 여러 작업을 수행하여 더 큰 확장성을 얻게 해 준다.
ex)
명령형 프로그래밍 - 물풍선
리액티브 프로그래밍 - 정원용 호수
그러나 리액티브 프로그래밍이 만능은 아니다. 상황에 알맞게 쓰자.
10.1.1 리액티브 스트림 정의하기
리액티브 프로그래밍은 비동기이다. 즉 동시에 여러 작업을 수행하여 큰 확장성을 얻는다.
백 프레셔는 데이터를 소비하는 컨슈머가 처리할 수 있는 만큼만 전달 데이터를 제한하여 지나치게 빠른 데이터 소스로부터 데이터 전달 폭주를 방지한다.
리액티브 스트림의 인터페이스
- Publisher 발행자
- Subscriber 구독자
- Subscription 구독
- Processor 프로세서
Publisher
publisher 는 하나의 Subscription 당 하나의 Subscriber 에 발행(전송)하는 데이터를 생성한다.
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
Subscriber
Subscriber가 구독 신청되면 Publisher 로부터 이벤트를 수신할 수 있다.
public interface Subscriber<T> {
void onSubscribe(Subscription sub); // Subscriber가 구독할 첫 이벤트를 호출
void onNext(T item); // publisher가 전송하는 데이터가 Subscriber에게 전달
void onError(Throwable ex); // 에러 발생시 호출됨
void onComplete(); // publisher가 Subscriber 에게 작업 종료를 공지
}
Subscription
Subscriber 는 Subscription 객체를 통해서 구독을 관리할 수 있다.
public interface Subscription {
void request(long n); // 전송되는 데이터 요청, n은 백프레셔 즉 데이터 항목수
void cancel(); // 구독 취소 요청
}
Processor
Processor 인터페이스는 Subscriber와 Publisher 를 결합한 것이다.
public interface Processor<T, R>
extends Subscriber<T>, Publisher<R> {}
10.2 리액터 시작하기
리액터란
리액터란 리액티브 스트림을 구현하는 라이브러리이며 Flux 와 Mono 두가지 타입으로 스트림을 정의한다.
Reactor is the reactive library of choice for Spring WebFlux. It provides the Mono and Flux API types to work on data sequences of 0..1 (Mono) and 0..N (Flux)
명령형 vs 리액티브
1) 명령형 프로그래밍 모델
String name = "Craig";
String capitalName = name.toUpperCase();
String greeting = "Hello, " + capitalName + "!";
System.out.println(greeting);
각 줄의 코드가 같은 스레드에서 한 단씩 차례대로 실행된다.
2) 리액티브 프로그래밍 모델
Mono.just("Craig")
.map(n -> n.toUpperCase())
.map(cn -> "Hello, " + cn + "!")
.subscribe(System.out::println);
일련의 작업 단계를 기술하는 것이 아니라 데이터가 전달될 파이프 라인을 구성하는 것이다. 이 파이프라인을 통해 데이터가 전달되는 동안 어떤 형태로든 변경 또는 사용될 수 있다.
10.2.1 리액티브 플로우의 다이어그램
1) Flux
- 0, 1 또는 다수의 (무한일 수 있는) 데이터를 갖는 파이프라인
2) Mono
- 하나의 데이터 항목만 갖는 데이터셋
리액터 vs RxJava(ReactiveX)
Mono, Flux 가 Observable, Single 과 비슷하다.
10.2.2 리액터 의존성 추가하기
아래 의존성을 pom.xml 에 추가
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
테스트 모듈
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
10.3 리액티브 오퍼레이션 적용하기
Flux 와 Mono 는 리액터가 제공하는 가장 핵심적인 구성요소다. 그리고 Flux 와 Mono 가 제공하는 오퍼레이션들은 두 타입을 함께 결합하여 데이터가 전달될 수 있는 파이프라인을 생성한다.
1) 생성 creation 오퍼레이션just()
fromArray()
fromIterable()
fromStream()
interval()
range()
2) 조합 combination 오퍼레이션mergeWith()
zip()
3) 변환 transformation 오퍼레이션skip()
take()
filter()
distinct()
map()
flatMap()
4) 로직 logic 오퍼레이션all()
any()
10.3.1 리액티브 타입 생성하기
리액티브 타입을 사용할 때는 repository 나 service 로부터 Flux 나 Mono가 제공 되므로 우리의 리액티브 타입을 생성할 필요가 없다. 그러나 데이터를 발생(방출)하는 새로운 리액티브 publisher 를 생성해야 할 때가 있다.
Flux 나 Mono 를 생성하는 오퍼레이션을 알아보자.
just()
Flux 나 Mono의 just()
메서드를 사용하여 객체로부터 리액티브 타입을 생성할 수 있다.
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
fromArray()
배열로부터 Flux를 생성하려면 static 메서드인 fromArray()
를 호출하며, 이때 소스 배열을 인자로 전달한다.
@Test
public void createAFlux_fromArray() {
String[] fruits = new String[] {"Apple", "Orange", "Grape", "Banana", "Strawberry" };
Flux<String> fruitFlux = Flux.fromArray(fruits);
// Verify flux
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
fromIterable()
java.util.List, java.util.Set 또는 java.lang.Iterable 로부터 Flux를 생성해야 한다면 해당 컬렉션을 인자로 전달하여 fromIterable()
을 호출하면 된다.
@Test
public void createAFlux_fromIterable() {
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux<String> fruitFlux = Flux.fromIterable(fruitList);
}
fromStream()
Flux를 생성하는 소스로 자바 Stream 객체를 사용해야 한다면 fromStream()
을 호출하면 된다.
@Test
public void createAFlux_fromStream() {
Stream<String> fruitStream =
Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux<String> fruitFlux = Flux.fromStream(fruitStream);
}
range()
데이터 없이 매번 새 값으로 숫자를 방출하는 카운터 역할의 flux를 생성할 수 있다.
@Test
public void createAFlux_range() {
// 1 ~ 5 까지의 값을 포함하는 카운터 flux 생성
Flux<Integer> intervalFlux = Flux.range(1, 5);
// verify
StepVerifier.create(intervalFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();
}
interval()
interval()
는 시작 값과 종료 값 대신 값이 방출되는 시간 간격이나 주기를 설정한다.
@Test
public void createAFlux_interval() {
Flux<Long> intervalFlux =
Flux.interval(Duration.ofSeconds(1))
.take(5); // 첫번째 3개 항목으로 결과 제한
StepVerifier.create(intervalFlux)
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete();
}
10.3.2 리액티브 타입 조합하기
리액터의 Flux나 Mono 를 결합하거나 분할하는 오퍼레이션을 알아보자
mergerWith()
두 Flux 객체가 결합되면 하나의 Flux가 새로 생성된다. 그리고 mergedFlux를 StepVerifier가 구독할 때 데이터의 흐름이 시작되면서 두 소스 Flux 스트림을 번갈아 구독하게 된다.
mergedFlux로부터 방출되는 항목 순서는 두 Flux 로 방출되는 시간에 맞추어 결정된다. 여기서는 두 Flux 객체 모두 일정한 속도로 방출되게 설정되었으므로 번갈아 mergedFlux에 끼워진다.
@Test
public void mergeFluxes() {
Flux<String> characterFlux = Flux
.just("Tony", "Steve", "Thor")
.delayElements(Duration.ofMillis(500)); // 500ms 마다 데이터 방출
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250)) // foodFlux는 characterFlux 250ms 다음에 시작
.delayElements(Duration.ofMillis(500)); // 500ms 마다 데이터 방출
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux); // 결합
StepVerifier.create(mergedFlux)
.expectNext("Tony")
.expectNext("Lasagna")
.expectNext("Steve")
.expectNext("Lollipops")
.expectNext("Thor")
.expectNext("Apples")
.verifyComplete();
}
zip()
zip()
은 각 Flux 소스로부터 한 항목식 번갈아 가져와 새로운 Flux를 생성한다.
zippedFlux로 방출되는 각 항목은 zippedFlux(두 개의 다른 객체를 전달하는 컨테이너 객체)이며 각 소스 flux가 순서대로 방출하는 항목을 포함한다.zip()
을 사용하여 두 개의 입력 Flux 요소로부터 생성된 메시지를 포함하는 flux 를 생성할 수 있다.
@Test
public void zipFluxes() {
Flux<String> characterFlux = Flux
.just("Tony", "Steve", "Thor");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);
StepVerifier.create(zippedFlux)
.expectNextMatches(p ->
p.getT1().equals("Tony") &&
p.getT2().equals("Lasagna"))
.expectNextMatches(p ->
p.getT1().equals("Steve") &&
p.getT2().equals("Lollipops"))
.expectNextMatches(p ->
p.getT1().equals("Thor") &&
p.getT2().equals("Apples"))
.verifyComplete();
}
@Test
public void zipFluxesToObject() {
Flux<String> characterFlux = Flux
.just("Tony", "Steve", "Thor");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
StepVerifier.create(zippedFlux)
.expectNext("Tony eats Lasagna")
.expectNext("Steve eats Lollipops")
.expectNext("Thor eats Apples")
.verifyComplete();
}
first()
first()
오퍼레이션은 먼저 값을 방출하는 소스 flux를 선택하여 메시지로 발행한다.
느린 flux는 무시하고 빠른 flux(fastFlux) 값만 발행한다.
@Test
public void firstFlux() {
Flux<String> slowFlux = Flux
.just("tortoise", "snail", "sloth")
.delaySubscription(Duration.ofMillis(100));
Flux<String> fastFlux = Flux
.just("hare", "cheetah", "squirrel");
Flux<String> firstFlux = Flux
.first(slowFlux, fastFlux); // 느린 flux는 무시하고 빠른 flux만 발행
StepVerifier.create(firstFlux)
.expectNext("hare")
.expectNext("cheetah")
.expectNext("squirrel")
.verifyComplete();
}
10.3.3 리액티브 스트링 변환과 필터링
skip()
skip()
은 지정된 수의 메시지를 건너뛴 후에 나머지 메시지를 결과 Flux로 전달한다.skip()
은 지정된 시간이 경과할 때 까지 기다렸다가 결과 flux 로 메시지를 전달한다.
@Test
public void skipAFew() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.skip(3); // 처음 3개 항목을 띄어 넘음
StepVerifier.create(countFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}
@Test
public void skipAFewSeconds() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1)) // 항목 간 1초 동안 지연
.skip(Duration.ofSeconds(4)); // 4초 동안 기다렸다 방출
StepVerifier.create(countFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}
take()
take()
은 입력 flux로 부터 처음 지정된 수의 메시지만 전달하고 구독을 취소 시킨다.
@Test
public void take() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Acadia")
.take(3); // 3개만 방출
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}
@Test
public void takeForAwhile() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500)); //3.5 초 동안만 방출
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}
filter()
filter()
로 지정된 조건식에 일치되는 메시지만 결과 Flux가 수신하도록 입력 flux를 필터링 할 수 있다.
@Test
public void filter() {
// 공백 있는 메시지 필터링
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
.filter(np -> !np.contains(" "));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Zion")
.verifyComplete();
}
distinct()
distinct()
는 중복 메시지를 걸러낸다.
@Test
public void distinct() {
Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "dog", "bird", "anteater")
.distinct(); // 중복 필터링
StepVerifier.create(animalFlux)
.expectNext("dog", "cat", "bird", "anteater")
.verifyComplete();
}
map()
map()
은 입력 메시지의 변환을 수행하여 결과 스트림의 새로운 메시지로 발행한다.
@Test
public void map() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n -> {
// map()으로 하여 지정된 함수 수행
// 공백 기준으로 분리 하여 배열에 넣고 player 객체 생성
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
});
StepVerifier.create(playerFlux)
.expectNext(new Player("Michael", "Jordan"))
.expectNext(new Player("Scottie", "Pippen"))
.expectNext(new Player("Steve", "Kerr"))
.verifyComplete();
}
flatMap()
flatMap()
오퍼레이션은 수행 도중 생성되는 임시 Flux 를 사용하여 변환을 수행하므로 비동기 변환이 가능하다.subscribe()
: 리액티브 플로우를 구독 요청하고 실제로 구독subscribeOn()
: 구독이 동시적으로 처리
@Test
public void flatMap() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.flatMap(n -> Mono.just(n)
.map(p -> {
String[] split = p.split("\\s");
return new Player(split[0], split[1]);
})
.subscribeOn(Schedulers.parallel()) // 구독을 병렬 스레드로 수행
);
List<Player> playerList = Arrays.asList(
new Player("Michael", "Jordan"),
new Player("Scottie", "Pippen"),
new Player("Steve", "Kerr"));
StepVerifier.create(playerFlux)
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.verifyComplete();
}
buffer()
buffer()
는 지정된 최대 크기의 리스트(입력 Flux로부터 수집된)로 된 flux를 생성한다.
@Test
public void buffer() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3); // 각각 3개 미만을 포함하도록함
StepVerifier
.create(bufferedFlux)
.expectNext(Arrays.asList("apple", "orange", "banana"))
.expectNext(Arrays.asList("kiwi", "strawberry"))
.verifyComplete();
}
@Test
public void bufferAndFlatMap() throws Exception {
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry")
.buffer(3) // 각각 3개 미만을 포함하도록함
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
.log()
).subscribe();
}
collectionList()
collectionList()
는 입력 Flux가 방출한 모든 메시지를 갖는 List의 Mono를 생성한다.
@Test
public void collectList() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Mono<List<String>> fruitListMono = fruitFlux.collectList();
StepVerifier
.create(fruitListMono)
.expectNext(Arrays.asList(
"apple", "orange", "banana", "kiwi", "strawberry"))
.verifyComplete();
}
collectMap()
collectMap()
은 Map을 포함하는 Mono를 생성한다. 이때 입력 Flux가 방출한 메시지가 해당 Map의 항목으로 저장되며, 각 항목의 키는 입력 메시지의 특성에 따라 추출된다.
@Test
public void collectMap() {
// 동물 이름 방출
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
// animalFlux로부터 collectMap()dmfh 새로운 Mono animalMapMono 생성
// Map 키는 동물 이름의 첫번째 문자, value는 동물 이름
Mono<Map<Character, String>> animalMapMono =
animalFlux.collectMap(a -> a.charAt(0));
StepVerifier
.create(animalMapMono)
.expectNextMatches(map -> {
return
map.size() == 3 &&
map.get('a').equals("aardvark") &&
map.get('e').equals("eagle") &&
map.get('k').equals("kangaroo");
})
.verifyComplete();
}
10.3.4 리액티브 타입에 로직 오퍼레이션 수행하기
all, any()
all()
은 모든 메시지가 조건을 충족하는지 확인한다.any()
는 최소한 하나의 메시지가 조건을 충족하는지 검사한다.
@Test
public void all() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
// 모든 문자열이 a를 포함 하는가
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
// 모든 문자열이 k를 포함 하는가
Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
StepVerifier.create(hasKMono)
.expectNext(false)
.verifyComplete();
}
@Test
public void any() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
// a를 포함하는 문자열이 최소 1개 존재하는가?
Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
// z를 포함하는 문자열이 최소 1개 존재하는가?
Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
StepVerifier.create(hasZMono)
.expectNext(false)
.verifyComplete();
}
📌 요약
- 리액티브 프로그래밍에서는 데이터가 흘러가는 파이프라인을 생성한다.
- 리액티브 스트림은 Publisher, Subscriber, Subscription, Transformer의 4가지 타입을 생성한다.
- 프로젝트 리액터는 리액티브 스트림을 구현하며 수많은 오퍼레이션을 제공하는 Flux와 Mono 두 가지 타입으로 스트림을 정의한다.
- 스프링 5는 리액터를 사용해서 Reactive Controller, Repository, Rest Client 를 생성하고 다른 리액티브 프레임워크를 지원한다.