[리엑티브 생산자 Publisher  2]  Flowable & Observable  (feat. 배압전략)
Language/Reactive

[리엑티브 생산자 Publisher 2] Flowable & Observable (feat. 배압전략)

728x90
반응형

 생산자 Flowable vs Observable 비교

 

  Floawable Observable
Reactive Streams 인터페이스 구현 O 구현 X
데이터 처리 Subscripber 에서 처리 Observer 에서 처리
배압 기능 (데이터 갯수 제어) O X
  Subscription 으로 전달받는
데이터 갯수를 제어할 수 있음
배압기능이 없기때문에
데이터 갯수를 제어할 수 없음
구독 해지 Subscription로 해지 Disposeable으로 해지

 

 

 


Flowable 생산자 (배압 전략)


 

배압이란,

Flowable(생산자)에서 데이터를 통지하는 속도가

Subscriber(구독자)에서 통지된 데이터를 처리하는 속도보다 빠를 때

밸런스를 맞추기위해 데이터 통지량을 제어하는 기능을 말한다. 

 

 

 

 

데이터 통지속도와 데이터 처리 속도가 균형을 이루지 못할때

에러가 발생하는데 코드로 살펴보자

 

(사실 이것은 Flowable 배압전략 중 하나인 ERROR 전략 이다. )

Flowable.interval(1L, TimeUnit.MILLISECONDS)		// interval: 데이터 통지
        .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))	// doOnNext: interval함수에서 데이터를 통지할때 호출되는 콜백함수 인터벌 함수에서 어떻게 데이터가 통지하는제 두온넥스트 함수에서 확인할 수 잇다. 
        .observeOn(Schedulers.computation())		// observeOn: 데이터 처리하는 스레드를 분리하는 역할
        .subscribe(					// subscribe: 데이터 구독
                data -> {				// 람다식: 데이터 처리
                    Logger.log(LogType.PRINT, "# 소비자 처리 대기 중..");
                    TimeUtil.sleep(1000L);
                    Logger.log(LogType.ON_NEXT, data);
                },
                error -> Logger.log(LogType.ON_ERROR, error),
                () -> Logger.log(LogType.ON_COMPLETE)
        );

Thread.sleep(2000L);

interval 함수에서 데이터를 1밀리세컨즈마다 데이터 생성하고,

subscribe 함수에서 구독해서 데이터를 람다식으로 처리하는 속도는 1000밀리세컨즈(1초)마다 처리한다. 

즉, 데이터를 통지하는 속도가 데이터를 구독해서 처리하는 속도보다 천배 빠르다. 

 

이 소스코드를 실행시켜보자

 

 

코드 실행결과

생산자쪽에서 통지한 데이터는 RxComputationThreadPool-2 스레드에서 실행되고

소비자 쪽에서 데이터를 처리하는 부분은 RxComputationThreadPool-1 스레드에서 실행되고 있다. 

 

onOnNext() 함수를 통해 확인해보면 생산자는 쭉 계속 데이터를 통지하다가

127 데이터를 통지했을때

소비자쪽에서는 첫번째 데이터인 0을 이제야 처리를 마쳤다고 나온다. 

 

그리고 MissignBackpressureException 에러(연두색 줄)가 발생했다. 

 

화면에 잘린 에러문구를 다시 보면 다음과 같다.

 

 

 

사실 방금 예제는

배압전략(Backpressure Strategy) 중 그냥 에러를 발생시켜버리는 전략인 ERROR 전략 을 사용한 예제이다. 

그밖에 Flowable 생산자를 사용한 다른 배압전략 종류를 살펴보자.  

 

 

 

 

배압 전략 (Backpressure Strategy)

Rxjava에서는 Backpressure Strategy를 통해 Flowable이 통지 대기 중인 데이터를 어떻게 다룰지에 대한 배압 전략을 제공한다. 데이터를 통지하는 속도와 받은 데이터를 처리하는 속도 차이로 인해 발생되는 에러를 방지한다.

1. MISSING 전략
배압을 적용하지 않는다.
나중에 onBackpressureXXX( ) 로 배압 적용을 할 수 있다.

2. ERROR 전략
통지된 데이터가  버퍼의 크기를 초과하면 MissingBackpressureException 에러를 통지한다.
즉, 소비자가 생산자의 통지 속도를 따라 잡지 못할 때 발생한다.

3. BUFFER 전략 : DROP_LATEST
버퍼가 가득 찬 시점에 버퍼내에서 가장 최근에 버퍼로 들어온 데이터를 DROP한다.
DROP 된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채운다.


4. BUFFER 전략 : DROP_OLDEST
버퍼가 가득 찬 시점에 버퍼내에서 가장 오래전에(먼저) 버퍼로 들어온 데이터를 DROP한다.
DROP 된 빈 자리에는 버퍼 밖에서 대기하던 데이터를 채운다.

5. DROP 전략
버퍼에 데이터가 모두 채워진 상태가 되면 이후에 생성되는 데이터를 버리고(DROP), 버퍼가 비워지는 시점에 DROP 되지 않은 데이터부터 다시 버퍼에 담는다.

6. LATEST 전략
버퍼에 데이터가 모두 채워진 상태가 되면 버퍼가 비워질 때까지 통지된 데이터는 버퍼 밖에서 대기하며 버퍼가 비워지는 시점에 가장 나중에(최근에) 통지된 데이터부터 버퍼에 담는다.

 

 

 

 

 

1. MISSING 전략 예제 코드

Flowable.interval(1L, TimeUnit.MILLISECONDS)
        .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
        .observeOn(Schedulers.computation())
        .subscribe(
                data -> {
                    Logger.log(LogType.PRINT, "# 소비자 처리 대기 중..");
                    TimeUtil.sleep(1000L);
                    Logger.log(LogType.ON_NEXT, data);
                },
                error -> Logger.log(LogType.ON_ERROR, error),
                () -> Logger.log(LogType.ON_COMPLETE)
        );

 

처음에는 배압을 적용하지 않고 나중에 배압을 적용하는 전략이다.

따라서 에러가 발생한다. 

 

 

 

2. ERROR 전략 예제 코드
위에서 이미 설명함

 


3. BUFFER 전략 : DROP_LATEST 예제코드

 

통지된 데이터로 채워진 버퍼의 데이터를 소비자가 모두 소비하면

버퍼 밖에서 대기중인 통지된 데이터 중에서  

가장 나중에(최근에) 통지된 데이터부터 다시 버퍼에 채운다.

 

버퍼 안에서 가장 최근에 들어온 데이터 11을 Drop 시킨다. 

소비자가 1을 아직도 처리하지 못한 상태인데 추가로 13이 들어오면 가장 최근에 들어온 12가 Drop 된다. 

 

 

 

System.out.println("# start : " +TimeUtil.getCurrentTimeFormatted());
Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .doOnNext(data -> Logger.log("#inverval doOnNext()", data))
        .onBackpressureBuffer(
                2,	// 버퍼 용량
                () -> Logger.log("overflow!"),
                BackpressureOverflowStrategy.DROP_LATEST)
        .doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data))
        .observeOn(Schedulers.computation(), false, 1) // 세번째 파리미터 1은 소비자쪽에서 요청하는 데이터 갯수
        .subscribe(
                data -> {
                    TimeUtil.sleep(1000L);
                    Logger.log(LogType.ON_NEXT, data);
                },
                error -> Logger.log(LogType.ON_ERROR, error)
        );

onBackpressureBuffer() 함수 마지막 파라미터에 BackpressureOverflowStrategy.DROP_LATEST 를 주어 Drop latest 전략을 사용했다. 

버퍼 용량이 2이고,

소비자는 한번에 1만큼의 데이터를 처리한다. ( = observeOn 함수를 통해 1만큼씩의 데이터를 요청함 )

 

실행결과는 다음과 같다. 

첫번째 오버플로우 발생시 2 drop 하고 3이 들어가게된다. 

 

따라서 소비자가 처리한 데이터를 보면

0, 1, 다음 2가 아닌 3 순서로 처리되는것을 확인할 수 있다. 

 


4. BUFFER 전략 : DROP_OLDEST 예제 코드

생산자쪽에서 데이터 통지 시점에 버퍼가 가득 차있으면 버퍼내에 있는 데이터 중에서

가장 먼저(OLDEST) 버퍼 안에 들어온 데이터를 삭제하고 버퍼 밖에서 대기하는 데이터를 채운다.

 

버퍼가 가득찬 상태에서 12가 들어오게되면 버퍼안의 데이터 중 가장 처음 들어온 2가 Drop 되고

추가로 13이 들어오면 버퍼안의 데이터중 가장 처음 들어온 3이 Drop 된다.

 

코드로 살펴보자

System.out.println("# start : " +TimeUtil.getCurrentTimeFormatted());

Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .doOnNext(data -> Logger.log("#inverval doOnNext()", data))
        .onBackpressureBuffer(
                2,
                () -> Logger.log("overflow!"),
                BackpressureOverflowStrategy.DROP_OLDEST)
        .doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data))
        .observeOn(Schedulers.computation(), false, 1)
        .subscribe(
                data -> {
                    TimeUtil.sleep(1000L);
                    Logger.log(LogType.ON_NEXT, data);
                },
                error -> Logger.log(LogType.ON_ERROR, error)
        );

onBackpressureBuffer() 함수 마지막 파라미터에 BackpressureOverflowStrategy.DROP_OLDEST 를 주어 Drop oldest 전략을 사용했다.

 

버퍼 용량이 2이고, 소비자는 한번에 1만큼의 데이터를 요청한다. 

 

버퍼가 1, 2 가 들어있어 가득 찬 상태에서 생산자로부터 추가로 3이 통지되었을때, 

overflow 가 발생하고, 버퍼안의 데이터중 가장 처음 들어온 데이터이 1이  Drop 된다. 

 

두번째  overflow 발생은 5가 들어왔을때 3이 Drop 되고, 

세번째 overflow 발생은 6이 들어왔을때 4가 Drop 된다. 

 

그 결과 소비자쪽에서는 맨 처음 0이 처리되고

1은 Drop 되었기때문에 그 다음 2가 처리되고, 

그다음 3, 4 가 Drop 되었기때문에 5가 처리된것을 확인 할 수 있다. 


5. DROP 전략 예제 코드

버퍼가 가득차면 버퍼 바깥쪽에서

통지 대기중인 데이터들은 계속 파기(DROP)하고 

버퍼를 비운 시점에 Drop되지 않고 대기중인 데이터부터 버퍼에 담는다.

버퍼가 가득찼을때 추가로 통지되어 대기중인 데이터들(11, 12, 13 ...)은 

버퍼가 비워질때까지 파기된다.

이후 버퍼가 비워지면 Drop 되지 않고 대기중인 데이터부터 차례로 버퍼에 담기게 된다. 

 

 

 

Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .doOnNext(data -> Logger.log("#inverval doOnNext()", data))
        .onBackpressureDrop(
                dropData -> Logger.log(LogType.PRINT, dropData + " Drop!"))
        .observeOn(Schedulers.computation(), false, 1)
        .subscribe(
                data -> {
                    TimeUtil.sleep(1000L);
                    Logger.log(LogType.ON_NEXT, data);
                },
                error -> Logger.log(LogType.ON_ERROR, error)
        );

onBackpressureDrop() 함수를 통해 Drop 전략을 구현했다. 

버퍼의 크기는 1이고 

소비자쪽에서는 데이터를 1개씩 처리한다. 

 

 

interval 함수에서 맨 처음 통지한 0을 약 1초뒤에 소비자쪽에서 처리했음을 onNext 함수로 확인할수 있다.  

그 1초 사이에 intervbal 함수는 1을 추가로 통지하게되는데

그 1은 버퍼에 저장되기 된다. 

그 다음 추가로 2를 통지했는데 

이때 소비자쪽에서는 아직 0을 처리하지 못한 상태이기때문에 2는 Drop 되었다. 

그 다음 추가로 3을 발행했는데 

아직도 소비자쪽에서는 0 처리를 완료하지 못한 상태이기때문에 3도 Drop 되었다. 

 

그 다음 0이 드디어 처리완료되었고,

그 다음 통지된 4는 Drop 되지 않고 버퍼에 저장 된다. 

그 사이 5, 6, 7 도 차례로 Drop 되고

소비자는 버퍼에 저장된 4를 처리한다. 

 

다음 과정을 반복하고..

결과적으로 소비자는 0 4, 8 등을 차례로 처리하게된다. 

 

 


6. LATEST 전략 예제 코드

통지된 데이터로 채워진 버퍼의 데이터를 소비자가 모두 소비하면

버퍼 밖에서 대기중인 통지된 데이터 중에서

가장 나중에(최근에) 통지된 데이터부터 다시 버퍼에 채운다.

Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .doOnNext(data -> Logger.log("#inverval doOnNext()", data))
        .onBackpressureLatest()
        .observeOn(Schedulers.computation(), false, 1)
        .subscribe(
                data -> {
                    TimeUtil.sleep(1000L);
                    Logger.log(LogType.ON_NEXT, data);
                },
                error -> Logger.log(LogType.ON_ERROR, error)
        );

 

 

 

 

 

 


Observable vs Flowable 데이터 통지/구독/처리


Flowable로 데이터를 생성하는 예제코드

 

기본 형식으로 표현 해보았다. 

 

먼저 데이터를 생성해보자. 

Flowable<String> flowable =
        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                String[] datas = {"Hello", "RxJava!"};
                for(String data : datas) {
                    // 구독이 해지되면 처리 중단
                    if (emitter.isCancelled()) { return; }
                    // 데이터 통지
                    emitter.onNext(data);
                }
                // 데이터 통지 완료를 알린다
                emitter.onComplete();
            }
        }, BackpressureStrategy.BUFFER); // 구독자의 처리가 늦을 경우 데이터를 버퍼에 담아두는 설정.

Flowable 의 create 함수는

첫번째 파라미터로 FlowableOnSubscribe 인터페이스 구현객체,

두번째 파라미터로 배압전략(BackpressureStrategy.BUFFER)을 받는다.

 

FlowableOnSubscribe 의 구현 객체는 익명클래스로 구현되어있다. 

FlowableOnSubscribe 인터페이스subscribe() 함수라는 하나의 함수를 구현해야하는데

subscribe() 함수는 FlowableEmitter 라는 구현객체를 파라미터로 받는다. 

 FlowableEmitter 가 실제적으로 파라미터를 통지하는 역할을 한다. 

 

flowable.observeOn(Schedulers.computation())
        .subscribe(new Subscriber<String>() {
            // 데이터 개수 요청 및 구독을 취소하기 위한 Subscription 객체
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                    this.subscription = subscription;
                    this.subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String data) {
                Logger.log(LogType.ON_NEXT, data);
            }

            @Override
            public void onError(Throwable error) {
                Logger.log(LogType.ON_ERROR, error);
            }

            @Override
            public void onComplete() {
                Logger.log(LogType.ON_COMPLETE);
            }
        });

소비자 쪽에서 subscribe() 함수는 Subscriber 인터페이스 구현객체를 파라미터로 받고 있는데

Subscriber 인터페이스는 4가지 함수 onSubscribe(), onNext,(), onError(), onComplet() 를 구현해야한다. 

그중 onSubscribe() 함수는 Subscription 인터페이스 구현객체를 파라미터로 받는데

Subscription 객체를 통해 구독을 해지 cancel() 할 수 있고, 생산자쪽에 데이터 통지를 요청 request()할 수 있다. 

 

 

이제 소비자쪽에서 데이터를 구독하는것을 시작으로 데이터를 구독하고,

생산자쪽에서 데이터를 통지하고,

통지된 데이터를 소비자쪽에서 처리하는 동작방식을 좀더 자세히 살펴보자. 

 

소비자쪽에서 데이터를 보낼 준비가 완료되었음을 알리기위해 소비자쪽의 onSubscriobe() 가 호출된다. 

onSubscribe 함수의 requst함수가 생산자쪽에 데이터를 통지해달라고 요청하면 

생산자쪽의 subscribe() 함수가 호출된다. 

 

subscribe() 함수 내부에서  datas 문자열 배열이 loop를 돌면서

차례차례 emitter 객체의 onNext 함수를 통해 데이터를 통지한다. 

 

emitter 객체가 onNext 함수를 호출하게되면

소비자쪽의 subscribe 함수 내부에 구현되어있는 onNext 함수가 호출되어 여기에서 데이터를 처리하게 된다. 

 

마지막으로 onNext 를 모두 호출하게되면 데이터 통지가 완료됬음을 알리기 위해

생산자쪽에서는 emitter 객체의 onComplete() 함수를 호출하게 되고 이 함수가 호출되면

소비자쪽의  onComplete() 함수가 호출되어 데이터 통지가 완료가 된다. 

 

 

 

 

위 코드를 람다식으로 표현할 수 있다.

FlowableOnSubscribe 와 Subscriber 인터페이스를 람다 표현식으로 바꾸어 표현했다. 

더보기

람다 표현식

( 함수 입력 파라미터  ->  {함수 구현체}  )

Flowable<String> flowable =
        Flowable.create(emitter -> {
            String[] datas = {"Hello", "RxJava!"};
            for(String data : datas) {
                // 구독이 해지되면 처리 중단
                if (emitter.isCancelled()) { return; }
                // 데이터 발행
                emitter.onNext(data);
            }
            // 데이터 발행 완료를 알린다
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER);
flowable.observeOn(Schedulers.computation())
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data),
                error -> Logger.log(LogType.ON_ERROR, error),
                () -> Logger.log(LogType.ON_COMPLETE),
                subscription -> subscription.request(Long.MAX_VALUE));

 

 

실행결과는 다음과 같다. 

 

 

 

Observable로 데이터를 생성하는 예제코드

 

기본 형식으로 표현

// 생성자로 데이터 통지
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                String[] datas = {"Hello", "RxJava!"};
                for(String data : datas){
                    if(emitter.isDisposed()) {  return; }
                    emitter.onNext(data);
                }
                emitter.onComplete();
            }
        });

Flowable의 create 함수는 파라미터로 FlowableOnSubscribe 인터페이스 구현객체인 반면

Observable은 create 함수는 파라미터로 ObservableOnSubscribe 인터페이스 구현객체를 받는다. 

그리고 Observable은 배압 기능이 없기때문에 배압전략을 받는 두번째 파라미터가 없다. 

 

// 소비자로 데이터 처리
observable.observeOn(Schedulers.computation())	// observeOn(): 스케줄러 
        .subscribe(new Observer<String>() {	// subscribe() : 데이터 구독
        
    @Override
    public void onSubscribe(Disposable disposable) {  // 아무 처리도 하지 않음.    }

    @Override
    public void onNext(String data) {   Logger.log(LogType.ON_NEXT, data);    }

    @Override
    public void onError(Throwable error) {   Logger.log(LogType.ON_ERROR, error);    }

    @Override
    public void onComplete() {   Logger.log(LogType.ON_COMPLETE);    }
});

Flowable의 소비자 쪽에서 subscribe() 함수는 Subscriber 인터페이스 구현객체를 파라미터로

Observable의 소비자쪽에서 subscribe() 함수는 Observer 인터페이스 구현객체를 파라미터로 받는다. 

 

두 인터페이스가 클래스 명만 다를 뿐 하는 역할은 둘이 같다. 

 

단지 Observable 은 배압기능이 없기때문에

Observer의 onSubscribe 함수가 호출되면 아무 처리도 하지 않는다. 

(Flowable의 경우 requst함수가 생산자쪽에 데이터를 통지 받을 갯수를 요청함)

 

 

 

람다로 표현하면 다음과 같다. 

Observable<String> observable = Observable.create(emitter -> {
            String[] datas = {"Hello", "RxJava!"};
            for(String data : datas){
                if(emitter.isDisposed()) { return; }
                emitter.onNext(data);
            }
            emitter.onComplete();
        });

 

observable.observeOn(Schedulers.computation())
        .subscribe(
                data -> Logger.log(LogType.ON_NEXT, data),
                error -> Logger.log(LogType.ON_ERROR, error),
                () -> Logger.log(LogType.ON_COMPLETE),
        disposable -> {/**아무것도 하지 않는다.*/}
);

 

실행결과는 다음과 같다. 

 

728x90
반응형