반응형
이 포스트 시리즈는 Reactive Programming은 토비의 스프링 저자 이일민님의 리액티브 프로그래밍 유튜브 강좌를 공부하며 정리한 내용입니다.
0. Tips
Stream 인터페이스에 iterate 라는 메소드가 있다.
이 메소드는 어떠한 데이터 스트림을 쉽게 만들어 낼 수 있는 메소드이다.
public List<Integer> createSampleIntegerList(int count) { return Stream.iterate(1, e -> e + 1).limit(count).collect(Collectors.toList()); }
위 코드는 1 ~ 10까지 정수를 담은 리스트를 만든다.
핵심은 iterate(시작값, 값의 변화 함수)
메소드인데, 위 예시에서 시작값은 1, 변화는 1씩 증가시킨다라는 뜻이다.
그리고 limit(10)
을 통해 10개만이라는 제한을 걸어주었다. (걸지 않으면 무한하게 증가하기 때문에 collect를 할 수 없다.)
종종 활용할 곳이 있을것 같아 기록해둔다.
1. Operators 직접 구현해보기
- Reactive Streams의 핵심 개념은 Publisher -> Data -> Subscriber의 흐름으로 데이터가 전달된다는 것이다.
- 아래에서
<-
방향으로의 흐름을 업스트림(Upstream)이라 하고->
방향으로의 흐름을 다운스트림(Downstream)이라 한다. - 그리고 데이터는 업스트림에서 다운스트림 방향으로 (
->
) 흘러간다.
Publisher -> Data -> Subscriber <- subscribe(Subscriber) -> onSubscribe(Subscription) -> onNext -> onNext -> ... -> onComplete
public class PubSub { public static void main(String[] args) { Publisher<Integer> pub = iterPub(Stream.iterate(1, e -> e + 1).limit(10).collect(Collectors.toList())); pub.subscribe(logSub()); } /** * Iterator의 데이터를 발생시키는 Publisher * @param iter 데이터 소스 * @return Publisher */ private static Publisher<Integer> iterPub(Iterable<Integer> iter) { return new Publisher<>() { @Override public void subscribe(Subscriber<? super Integer> subscriber) { subscriber.onSubscribe(new Subscription() { @Override public void request(long n) { // 이 예시 코드에선 Subscriber의 요청갯수인 'n' 인자에 대한 구현은 안되어있다. try { iter.forEach(subscriber::onNext); subscriber.onComplete(); } catch (Exception e) { subscriber.onError(e); } } @Override public void cancel() { // Subscriber가 어떠한 경우로든 Publisher에게 데이터를 그만보내라고 요청하는것 // 그에 대비해 Publisher는 이 메소드안에 적절한 내용을 구현해두어야함 // 일반적으로 flag를 두고 request쪽에서 데이터를 더 보내지 않도록 함 } }); } }; } /** * Publisher에서 수신한 데이터를 출력하는 Subscriber * @return Subscriber */ private static Subscriber<Integer> logSub() { return new Subscriber<>() { @Override public void onSubscribe(Subscription subscription) { System.out.println("onSubscribe"); subscription.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { System.out.println("onNext: " + integer); } @Override public void onError(Throwable throwable) { System.out.println("onError: " + throwable); } @Override public void onComplete() { System.out.println("onComplete"); } }; } }
- 여기에서, 데이터를 변환하는 과정을 넣어본다면 아래와 같이 구현할 수 있다.
- 마치 기존 Publisher를 한 겹 감싸고, 데이터를 전달하는 부분인 onNext 메소드에 특정 작업을 심은 새로운 Publisher를 생성하는 것이다.
- 아래
mapPub
함수에서 스트림의 흐름을 이해하기 쉽게 방향과 대조해보면- 인자로 전달된 publisher: 업스트림 (현재 스트림보다 상위에 있는 스트림)
- 새로 생성되어 return되는 Publisher: 중개 스트림 (현재 스트림)
- 이 안에서 구현하는 subscribe함수에 인자로 전달되는 subscriber: 다운스트림 (현재 스트림보다 하위에 있는 스트림)
- publisher.subscribe 안에 새로 생성하는 Subscriber: 중개 스트림 (현재 스트림)
- 업스트림 -> 다운스트림으로의 흐름에 무언가 중개하는 새로운 흐름이 끼어든 것이다.
public class PubSub { public static void main(String[] args) { Publisher<Integer> pub = iterPub(Stream.iterate(1, e -> e + 1).limit(10).collect(Collectors.toList())); Publisher<Integer> mapPub = mapPub(pub, e -> e * 10); // 기존 값에 10을 곱하는 함수 전달 mapPub.subscribe(logSub()); } private static Publisher<Integer> mapPub(Publisher<Integer> publisher, Function<Integer, Integer> mappingFunc) { return new Publisher<Integer>() { @Override public void subscribe(Subscriber<? super Integer> subscriber) { // 인자로 전달된 기존 publisher.subscribe의 인자에 새 Subscriber를 생성하여 넘겨준다. // 이 때, 새 Subscriber에 기존 Subscriber의 기능들을 연결시키고, onNext에 데이터를 가공하는 기능만 부가적으로 추가해준다. publisher.subscribe(new Subscriber<>() { @Override public void onSubscribe(Subscription subscription) { subscriber.onSubscribe(subscription); } @Override public void onNext(Integer integer) { // 이전 퍼블리셔에서 발생된 데이터에 'mappingFunc' 적용하여 전달 subscriber.onNext(mappingFunc.apply(integer)); } @Override public void onError(Throwable throwable) { subscriber.onError(throwable); } @Override public void onComplete() { subscriber.onComplete(); } }); } }; } }
- 만약 기존 스트림(Publisher) 내 모든 값을 합한 값을 발생시키는 Publisher를 구현한다면 아래와 같이 할 수 있다.
public class PubSub { public static void main(String[] args) { Publisher<Integer> pub = iterPub(Stream.iterate(1, e -> e + 1).limit(10).collect(Collectors.toList())); Publisher<Integer> sumPub = sumPub(pub); sumPub.subscribe(logSub()); } private static Publisher<Integer> sumPub(Publisher<Integer> publisher, Function<Integer, Integer> mappingFunc) { return new Publisher<Integer>() { @Override public void subscribe(Subscriber<? super Integer> subscriber) { publisher.subscribe(new Subscriber<>() { // 업스트림에서 전달되는 값을 합해둘 멤버변수 선언 private int sum = 0; @Override public void onSubscribe(Subscription subscription) { subscriber.onSubscribe(subscription); } @Override public void onNext(Integer integer) { // 업스트림에서 데이터가 전달되면 바로 전달하지 않고 값을 더해둠 this.sum += integer; } @Override public void onError(Throwable throwable) { subscriber.onError(throwable); } @Override public void onComplete() { // 업스트림에서 데이터 전달이 끝나면 더해둔 값을 다운스트림으로 보냄 subscriber.onNext(sum); // 그리고 현재 스트림은 끝났음을 통지 subscriber.onComplete(); } }); } }; } }
- 만약
mapPub
메소드에 제네릭을 적용하면 아래와 같이 작성할 수 있다. - T타입 데이터 흐름을 R타입 데이터 흐름으로 변경한다.
private static <T, R> Publisher<R> mapPub(Publisher<T> publisher, Function<T, R> mappingFunc) { return new Publisher<R>() { @Override public void subscribe(Subscriber<? super R> subscriber) { publisher.subscribe(new Subscriber<T>() { @Override public void onSubscribe(Subscription subscription) { subscriber.onSubscribe(subscription); } @Override public void onNext(T integer) { subscriber.onNext(mappingFunc.apply(integer)); } @Override public void onError(Throwable throwable) { subscriber.onError(throwable); } @Override public void onComplete() { subscriber.onComplete(); } }); } }; }
2. Reactor 맛보기
- Reactor는 reactive streams 구현체중 하나로 Spring Webflux 엔진으로도 사용된다.
- Reactive Streams 표준을 아주 쉽게 다룰수 있게 해두었다.
- 디펜던시
implementation 'io.projectreactor:reactor-core:3.4.9'
- Reactor를 사용한 심플한 예제를 하나 보자
- Flux는 Publisher의 구현체 중 하나이다. (데이터를 생성하는 역할이라는 뜻)
- create 메소드엔 데이터 생성하는 로직이 들어가는데, next, complete 등 마치 Publisher 의 subscribe 메소드를 구현할 때 하는 작업을 편리하게 할 수 있다.
- map 메소드와 같이 생성 -> 구독 사이에 여러 Operators를 체이닝 방식으로 쉽게 걸어줄 수 있다.
- 위에 Publisher, Subscriber 직접 생성하고 연결하는 코드들은 모두 내부에 감춰져있고 아래와 같은 간단한 코드로 하나의 흐름을 만들수있다.
Flux.<Integer>create(e -> { e.next(1); e.next(2); e.next(3); e.complete(); }) .map(e -> e * 10) .reduce(0, (a, b) -> a + b) .subscribe(s -> System.out.println(s));
반응형
'Back-End > Reactive Programming' 카테고리의 다른 글
Java / Servlet 비동기 기술 흐름 with Spring (0) | 2022.01.13 |
---|---|
Java CompletableFuture 개념 및 간단한 활용 사례 (0) | 2022.01.12 |
리액티브 프로그래밍 시리즈 3 - 스레드 스케쥴링 (Thread scheduling) (0) | 2021.08.25 |
리액티브 프로그래밍 시리즈 1 - Reactive Streams 개요 (0) | 2021.08.17 |