17.3 리액티브 라이브러리 RxJava 사용하기

    • RxJava는 Flow.Publisher를 구현하는 두 클래스를 제공
      • io.reactive.Flowable: 역압력 기능을 제공
      • io.reactivex.Observable: 역압력 기능을 제공하지 않는 기존 클래스
        • 단순한 프로그램, 마우스 움직임 같은 사용자 인터페이스에 적합
        • 이벤트 스트림에는 역압력을 적용하기 어렵기 때문 (마우스 움직임을 느리게 하거나 멈출 수 없음)

     

    • RxJava는 천 개 이하의 요소를 가진 스트림이나, 마우스 & 터치 이벤트 등 역압력을 적용하기 힘든 GUI 이벤트, 자주 발생하지 않는 종류의 이벤트에 역압력을 적용하지 말 것을 권장함

     

    • 여기서는 Obsevable 인터페이스를 사용하는 방법을 설명함
    • 모든 subscriber는 request(Long.MAX_VALUE) 메서드를 이용해서 역압력 기능을 끌 수 있음.
      • subscriber가 정해진 시간 안에 수신한 모든 이벤트를 처리할 수 있는 상황이 아니라면 역압력 기능을 끄지 않는 것이 좋음

     

     

    1. Observable 만들고 사용하기

    Observable, Flowable 클래스는 다양한 종류의 리액티브 스트림을 편리하게 만들 수 있도록 여러 팩토리 메서드 제공

    Observable, Flowable은 Publisher를 구현하므로 팩토리 메서드는 리액티브 스트림을 만듦

     

    just() 팩토리 메서드

    Observable<String> strings = Observable.just("first", "second");

    하나 이상의 요소를 이용해 방출하는 Observable로 변환

    Observable의 구독자는 onNext("first"), onNext("second"), onComplete()의 순서로 메시지를 받음

     

     

    interval() 팩토리 메서드

    Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS);

    onePerSec은 0에서 시작해 1초 간격으로 long 형식의 값을 무한으로 증가시키며 값을 방출함

     

     

    RxJava

    • Observable: Flow api의 Publisher 역할을 함
    • Observer: Flow api의 Subscriber 역할을 함
      • Subscription 대신 onSubscribe 메서드를 가짐
    public interface Observer<T>{
    	void onSubscribe(Disposable d);
        void onNext(T t);
        void onError(Throwable t);
        void onComplete();
    }

     

    Emitter

    public interface Emitter<T>{
    	void onNext(T t);
        void onError(Throwable t);
        void onComplete();
    }

    ObservableEmitter 인터페이스는 Emitter를 상속함

    필요한 이벤트를 전송하는 역할을 함

     

     

    예제

    package observable_example;
    
    import org.example.TempInfo;
    
    import java.io.reactivex.*;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args){
            Observable<TempInfo> observable = getTemperature("New York");
            observable.blockingSubscribe(new TempObserver()); // observer 가입 -> 온도 출력하기
        }
    
        public static Observable<TempInfo> getTemperature(String town){
            // Observable.create(@NonNull ObservableOnSubscriber<T>)
            // ObservableOnSubscriber.subscribe(@NonNull ObservableEmitter<T>)
            return Observable.create(emitter ->
                        Observable.interval(1, TimeUnit.SECONDS).subscribe(
                                i->{
                                    if(!emitter.isDisposed()){ // observer가 폐기되지 않았으면 작업 수행
                                        if(i>=5){
                                            emitter.onComplete();
                                        } else{
                                            try{
                                                emitter.onNext(TempInfo.fetch(town));
                                            }catch(Exception e){
                                                emitter.onError(e);
                                            }
                                        }
                                    }
                                }
                        )
                    );
        }
    }

    getTemperature 메서드: 1초마다 한 개의 온도를 방출하는 Observable 생성

     

    package observable_example;
    
    public class TempObserver implements Observer<TempInfo>{
    
        @Override
        public void onComplete(){
            System.out.println("Done!");
        }
    
        @Override
        public void onError(Throwable throwable){
            System.out.println("Got problem: " + throwable.getMessage());
        }
    
        @Override
        public void onSubscribe(Disposable disposable){
    
        }
    
        @Override
        public void onNext(TempInfo tempInfo){
            System.out.println(tempInfo);
        }
    }

    TempObserver.java: 수신한 온도를 출력하는 Observer

     

     

    2. Observable을 변환하고 합치기

    RxJava는 스트림을 합치고, 만들고, 거르를 수 있음

    마블 다이어그램: 수평선과 도형으로 표현한 리액티브 스트림

    특수 기호: 에러나 완료신호

    박스: 해당 연산이 요소를 어떻게 변화하거나 여러 스트림을 어떻게 합치는지 보여줌

     

     

    merge: 두 개 이상의 Observable이 방출한 이벤트를 하나로 합침

     

    map: Observable이 발행하는 요소를 반환함

     

    public static Observable<TempInfo> getCelsiusTemperature(String town){
            return getTemperature(town).map(tmp -> new TempInfo(tmp.getTown(), (tmp.getTemp()-32)*5/9));
    }

    Observable에 map을 이용해 화씨를 섭씨로 변환할 수 있음!

    댓글