- RxJava는 Flow.Publisher를 구현하는 두 클래스를 제공
- io.reactive.Flowable: 역압력 기능을 제공
- io.reactivex.Observable: 역압력 기능을 제공하지 않는 기존 클래스
- 단순한 프로그램, 마우스 움직임 같은 사용자 인터페이스에 적합
- 이벤트 스트림에는 역압력을 적용하기 어렵기 때문 (마우스 움직임을 느리게 하거나 멈출 수 없음)
- RxJava는 천 개 이하의 요소를 가진 스트림이나, 마우스 & 터치 이벤트 등 역압력을 적용하기 힘든 GUI 이벤트, 자주 발생하지 않는 종류의 이벤트에 역압력을 적용하지 말 것을 권장함
- 여기서는 Obsevable 인터페이스를 사용하는 방법을 설명함
- 모든 subscriber는 request(Long.MAX_VALUE) 메서드를 이용해서 역압력 기능을 끌 수 있음.
- subscriber가 정해진 시간 안에 수신한 모든 이벤트를 처리할 수 있는 상황이 아니라면 역압력 기능을 끄지 않는 것이 좋음
1. Observable 만들고 사용하기
Observable, Flowable 클래스는 다양한 종류의 리액티브 스트림을 편리하게 만들 수 있도록 여러 팩토리 메서드 제공
Observable, Flowable은 Publisher를 구현하므로 팩토리 메서드는 리액티브 스트림을 만듦
just() 팩토리 메서드
Observable<String> strings = Observable.just("first", "second");
하나 이상의 요소를 이용해 방출하는 Observable로 변환
Observable의 구독자는 onNext("first"), onNext("second"), onComplete()의 순서로 메시지를 받음
interval() 팩토리 메서드
Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS);
onePerSec은 0에서 시작해 1초 간격으로 long 형식의 값을 무한으로 증가시키며 값을 방출함
RxJava
- Observable: Flow api의 Publisher 역할을 함
- Observer: Flow api의 Subscriber 역할을 함
- Subscription 대신 onSubscribe 메서드를 가짐
public interface Observer<T>{
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
Emitter
public interface Emitter<T>{
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
ObservableEmitter 인터페이스는 Emitter를 상속함
필요한 이벤트를 전송하는 역할을 함
예제
package observable_example;
import org.example.TempInfo;
import java.io.reactivex.*;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args){
Observable<TempInfo> observable = getTemperature("New York");
observable.blockingSubscribe(new TempObserver()); // observer 가입 -> 온도 출력하기
}
public static Observable<TempInfo> getTemperature(String town){
// Observable.create(@NonNull ObservableOnSubscriber<T>)
// ObservableOnSubscriber.subscribe(@NonNull ObservableEmitter<T>)
return Observable.create(emitter ->
Observable.interval(1, TimeUnit.SECONDS).subscribe(
i->{
if(!emitter.isDisposed()){ // observer가 폐기되지 않았으면 작업 수행
if(i>=5){
emitter.onComplete();
} else{
try{
emitter.onNext(TempInfo.fetch(town));
}catch(Exception e){
emitter.onError(e);
}
}
}
}
)
);
}
}
getTemperature 메서드: 1초마다 한 개의 온도를 방출하는 Observable 생성
package observable_example;
public class TempObserver implements Observer<TempInfo>{
@Override
public void onComplete(){
System.out.println("Done!");
}
@Override
public void onError(Throwable throwable){
System.out.println("Got problem: " + throwable.getMessage());
}
@Override
public void onSubscribe(Disposable disposable){
}
@Override
public void onNext(TempInfo tempInfo){
System.out.println(tempInfo);
}
}
TempObserver.java: 수신한 온도를 출력하는 Observer
2. Observable을 변환하고 합치기
RxJava는 스트림을 합치고, 만들고, 거르를 수 있음
마블 다이어그램: 수평선과 도형으로 표현한 리액티브 스트림
특수 기호: 에러나 완료신호
박스: 해당 연산이 요소를 어떻게 변화하거나 여러 스트림을 어떻게 합치는지 보여줌
merge: 두 개 이상의 Observable이 방출한 이벤트를 하나로 합침
map: Observable이 발행하는 요소를 반환함
public static Observable<TempInfo> getCelsiusTemperature(String town){
return getTemperature(town).map(tmp -> new TempInfo(tmp.getTown(), (tmp.getTemp()-32)*5/9));
}
Observable에 map을 이용해 화씨를 섭씨로 변환할 수 있음!
'백엔드 > 모던자바인액션' 카테고리의 다른 글
ch17.2.3 Flow 라이브러리 예제 (0) | 2023.04.13 |
---|---|
ch17. 리액티브 프로그래밍 (0) | 2023.04.12 |
ch16. CompletableFuture: 안정적 비동기 프로그래밍 (0) | 2023.04.04 |
ch15. CompletableFuture와 리액티브 프로그래밍 컨셉의 기초 (0) | 2023.03.23 |
[ch4] 스트림 소개 (0) | 2022.09.19 |
댓글