[리액티브 연산자 Operator 1]  생성, 필터링, 변환, 결합,
Language/Reactive

[리액티브 연산자 Operator 1] 생성, 필터링, 변환, 결합,

728x90
반응형

 리액티브 연산자

RxJava에서의 연산자는 메서드(함수)입니다. 

연산자를 이용하여 데이터를 생성하고 통지하는 Flowable이나 Observable 등의 생산자를 생성할 수 있습니다.
Flowable이나 Observable에서 통지한  데이터를 다양한 연산자를 사용하여 가공 처리하여 결괏값을 만들어 냅니다.

리액티브 연산자는 3-4백여개의 연산자가 있습니다. 

 

  1. Flowable/Observable 생성 연산자
  2. 통지된 데이터를 필터링 해주는 연산자
  3. 통지된 데이터를 변환 해주는 연산자
  4. 여러 개의 Flowable/Observable을 결합하는 연산자
  5. 에러 처리 연산자
  6. 유틸리티 연산자
  7. 조건과 불린 연산자
  8. 통지된 데이터를 집계 해주는 연산자 

 

 


각 연산자들을 마블 다이어그램으로 볼 수 있는 사이트

 

https://rxmarbles.com/ 

 

RxMarbles: Interactive diagrams of Rx Observables

 

rxmarbles.com

 


1. CREATION OBSERVABLES (Observable 생성)


create 연산자

interval 연산자
range 연산자
timer 연산자
defer 연산자
fromIterable 연산자
fromFuture 연산자


create( ) 연산자

Observable<String> observable = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Canada");
                emitter.onNext("USA");
                emitter.onNext("Korea");
                emitter.onComplete();
            }
        }
);



observable.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        // 아무것도 안함
    }
    @Override
    public void onNext(String data) {
        Logger.log(LogType.ON_NEXT, data);
    }
    @Override
    public void onError(Throwable error) {
        Logger.log(LogType.ON_ERROR, error);
    }
    @Override
    public void onComplete() {
        Logger.log(LogType.ON_COMPLETE);
    }
});

람다식으로 표현하면 다음과 같다. 

Observable.create(emitter -> {
    emitter.onNext("Canada");
    emitter.onNext("USA");
    emitter.onNext("Korea");
    emitter.onComplete();
}).subscribe(System.out::println);

결과

 


interval( ) 연산자

* initialDelay 파라미터 이용해서 최초 통지에 대한 대기 시간을 지정할 수 있다.
* 완료 없이 계속 통지한다.
* 호출한 스레드와는 별도의 스레드에서 실행된다.
* polling(어떤 특정 요청을 반복적으로 수행하는것)용도로 주로 사용.

System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());

// 0초 후에 시작해서 1초 간격으로 생성해라
Observable.interval(0L, 1000L, TimeUnit.MILLISECONDS)
        .map(num -> num + " count")
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

// main 스레드를 딜레이 시켜줘야 RxComputationThreadPooldml 결과를 볼 수 있다.
TimeUtil.sleep(3000);

 



range( ) 연산자

* 지정한 값(n) 부터 m 개의 숫자(Integer)를 통지한다.
* 절차형 프로그래밍방식의 for, while 문 등의 반복문을 대체할 수 있다.

// 0부터 시작해서 5개의 숫자를 반복해라!
Observable<Integer> source =
        Observable.range(0, 5);

source.subscribe(num -> Logger.log(LogType.ON_NEXT, num));

 


timer( ) 연산자

* 지정한 시간이 지나면 0(Long)을 통지한다.
* 0을 통지하고 onComplete( ) 이벤트가 발생하여 종료한다.
* 호출한 스레드와는 별도의 스레드에서 실행된다.
* 설정한 시간이 지난 후에 특정 동작을 수행하고자 할때 사용

Logger.log(LogType.PRINT, "# Start!");

Observable<String> observable =
        Observable.timer(2000, TimeUnit.MILLISECONDS)
                .map(count -> "Do work!");

observable.subscribe(data -> Logger.log(LogType.ON_NEXT, data));

TimeUtil.sleep(3000);

2초가 지난 후에 timer 함수에서 통지된 데이터를 map 함수를 통해 전달받는다
소비자는 timer에서 통지한 데이터 2초를 받는것이 아닌,
map 함수에서 string으로 변환한 Do work 문자열을 소비자쪽을 전달한다

 


defer( ) 연산자

* 구독이 발생할 때마다 즉, subscribe( )가 호출될 때마다 '새로운' Observable을 생성한다.
* 선언한 시점의 데이터를 통지하는 것이 아니라 호출 시점의 데이터를 통지한다.
* 실제 구독이 발생할 때 Observable을 새로 반환하여 새로운 Observable을 생성한다
* defer()를 활용하면 데이터 흐름의 생성을 지연하는 효과를 보여준다.
* 데이터 생성을 미루는 효과가 있기때문에 최신 데이터를 얻고자할 때 활용할 수 있다.

// defer()를 이용해 통지
Observable<LocalTime> observable = Observable.defer(() -> {
            LocalTime currentTime = LocalTime.now();
            return Observable.just(currentTime);
        });

// just()를 이용해 통지
Observable<LocalTime> observableJust = Observable.just(LocalTime.now());

observable.subscribe(time -> Logger.log(LogType.PRINT, " # defer() 구독1의 구독 시간: " + time));
observableJust.subscribe(time -> Logger.log(LogType.PRINT, " # just() 구독1의 구독 시간: " + time));

//  defer 는 선언한 시점의 데이터를 통지하는 것이 아니라 호출 시점의 데이터를 통지함
observable.subscribe(time -> Logger.log(LogType.PRINT, " # 3초후 defer() 구독2의 구독 시간: " + time));
observableJust.subscribe(time -> Logger.log(LogType.PRINT, " # 3초후 just() 구독2의 구독 시간: " + time));

 

 


fromIterable( ) 연산자

* Iterable 인터페이스를 구현한 클래스(ArrayList )를 파라미터로 받는다.
* Iterable에 담긴 데이터를 순서대로 통지한다.

List<String> countries = Arrays.asList("Korea", "Canada", "USA", "Italy");

Observable.fromIterable(countries)
        .subscribe(country -> Logger.log(LogType.ON_NEXT, country));

 


fromFuture( ) 연산자

* Future 인터페이스는 자바 5에서 비동기 처리를 위해 추가된 "동시성 API"이다.
* 시간이 오래 걸리는 작업은 Future를 반환하는 ExcutorService에게 맡기고
* 비동기로 다른 작업을 수행할 수 있다.
* Java 8에서는 CompletableFuture 클래스를 통해 구현이 간결해졌다.

Logger.log(LogType.PRINT, "# start time");

// 긴 처리 시간이 걸리는 작업
Future<Double> future = longTimeWork();

// 짧은 처리 시간이 걸리는 작업
shortTimeWork();

Observable.fromFuture(future)
        .subscribe(data -> Logger.log(LogType.PRINT, "# 긴 처리 시간 작업 결과 : " + data));

Logger.log(LogType.PRINT, "# end time");
}


public static CompletableFuture<Double> longTimeWork(){
return CompletableFuture.supplyAsync(() -> calculate());
}

private static Double calculate() {
Logger.log(LogType.PRINT, "# 긴 처리 시간이 걸리는 작업 중.........");
TimeUtil.sleep(6000L);
return 100000000000000000.0;
}

private static void shortTimeWork() {
TimeUtil.sleep(3000L);
Logger.log(LogType.PRINT, "# 짧은 처리 시간 작업 완료!");
}

 

 

 


2. FILTERING(필터링) 연산자


filter 연산자

 


distinct 연산자

 


take 연산자

 


takeUntil 연산자

(case 1)

파라미터로 지정한 조건이 true가 될 때까지 데이터를 계속 통지한다.
파리미터로 지정한 조건이 될 때까지 데이터를 계속 발행

 public static List<Car> carList =
            Arrays.asList(
                    new Car(CarMaker.CHEVROLET, "말리부", CarType.SEDAN, 23_000_000),
                    new Car(CarMaker.HYUNDAE, "쏘렌토", CarType.SUV, 33_000_000),
                    new Car(CarMaker.CHEVROLET, "트래버스", CarType.SUV, 50_000_000),
                    new Car(CarMaker.HYUNDAE, "팰리세이드", CarType.SEDAN, 28_000_000),
                    new Car(CarMaker.CHEVROLET, "트랙스", CarType.SUV, 18_000_000),
                    new Car(CarMaker.SSANGYOUNG, "티볼리", CarType.SUV, 23_000_000),
                    new Car(CarMaker.SAMSUNG, "SM6", CarType.SUV, 40_000_000),
                    new Car(CarMaker.SSANGYOUNG, "G4렉스턴", CarType.SUV, 43_000_000),
                    new Car(CarMaker.SAMSUNG, "SM5", CarType.SEDAN, 35_000_000)
            );

 

Observable.fromIterable(SampleData.carList)
        .takeUntil((Car car) -> car.getCarName().equals("트랙스"))
        .subscribe(car -> System.out.println(car.getCarName()));

TimeUtil.sleep(300L);

 

(case 2)

파라미터로 지정한 Observable이 최초 데이터를 통지할 때까지 데이터를 계속 통지한다.
파라미터로 받은 Flowable/Observable이 최초로 데이터를 발행할 때까지 계속 데이터를 발행
timer와 함께 사용하여 특정 시점이 되기 전까지 데이터를 발행하는데 활용하기 용이

// 5.5초
Observable.interval(1000L, TimeUnit.MILLISECONDS)
        .takeUntil(Observable.timer(5500L, TimeUnit.MILLISECONDS))
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

TimeUtil.sleep(5500L);

5.5초가 되기 전까지


skip 연산자

(case 1)

파라미터로 지정한 숫자만큼 데이터를 건너뛴 후 나머지 데이터를 통지한다.

Observable.range(1, 7)
        .skip(3)
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

 

(case 2)

파라미터로 지정한 시간 동안에는 데이터를 통지를 건너뛴 후 지정한 시간 이후,, 나머지 데이터를 통지한다.

 

Observable.interval(300L, TimeUnit.MILLISECONDS)
        .skip(1000L, TimeUnit.MILLISECONDS)
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

TimeUtil.sleep(3000L);

 


3. TRANSFORMATION(변환) 연산자


map 연산자

원본 Observable에서 통지하는 데이터를 원하는 값으로 변환 후 통지한다.
변환 전, 후 데이터 타입은 달라도 상관없다.
null을 반환하면 NullpointException이 발생하므로 null이 아닌 데이터 하나를 반드시 반환해야 한다.
Observable이 통지한 항목에 함수를 적용하여 통지된 값을 변환시킨다.

List<Integer> oddList = Arrays.asList(1, 3, 5, 7);
Observable.fromIterable(oddList)
        .map(num -> "1을 더한 결과: " + (num + 1))
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));


flatMap 연산자

 

 


concatMap 연산자

 

 


switchMap 연산자

 

 

 


4. COMBINATION(결합) 연산자


 

merge 연산자



concat 연산자



zip 연산자



combineLatest

 

 

 

 

참고

www.inflearn.com/course/%EC%9E%90%EB%B0%94-%EB%A6%AC%EC%95%A1%ED%8B%B0%EB%B8%8C%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D-1

https://github.com/ITVillage-Kevin/rxjava

728x90
반응형