본문 바로가기
android

RxJava 의 핵심 기술 - concat, merge, zip

by Gil 2020. 2. 27.
728x90

1. Concat

Concatenate : '사슬 같이 잇다' 라는 뜻

A Task와 B Task 가 있다면, A -> B 순으로 작업을 진행한다. 

val observer1: Observable<String> = Observable.create { emitter ->
    GlobalScope.launch {
        delay(1000)
        Log.i("test_log","obs 1")

        emitter.onNext("1")
        emitter.onComplete()
    }
}
val observer2: Observable<String> = Observable.create { emitter ->
    GlobalScope.launch {
        delay(1000)
        Log.i("test_log","obs 2")


        emitter.onNext("2")
        emitter.onComplete()
    }
}

observableList.add(observer1)
observableList.add(observer2)

Observable.concat(observableList)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { Log.i("test_log","out: ${it}")},
        { Log.i("test_log","onError") },
        { Log.i("test_log","onComplete")
})

출력결과: 

obs 1
out: 1
obs 2
out: 2
onComplete

 

참고로 Observebled.create 의 emitter 는 onNext(), onComplete(), onError() 함수를 호출할 수 있고,

Observable 의 subscribe() 함수에서 받아서 콜백 처리를 할 수 있다. 

onNext, onError, onComplete 순으로 메서드 작성

 

<다이어그램으로 설명>


2. merge

A Task와 B Task 가 있다면, 동시에 처리한다. 

val observer1: Observable<String> = Observable.create { emitter ->
    GlobalScope.launch {
        delay(1000)
        Log.i("test_log","obs 1")

        emitter.onNext("1")
        emitter.onComplete()
    }
}
val observer2: Observable<String> = Observable.create { emitter ->
    GlobalScope.launch {
        delay(1000)
        Log.i("test_log","obs 2")


        emitter.onNext("2")
        emitter.onComplete()
    }
}

observableList.add(observer1)
observableList.add(observer2)

Observable.merge(observableList)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { Log.i("test_log","out: ${it}")},
        { Log.i("test_log","onError") },
        { Log.i("test_log","onComplete")
})

출력 결과:

obs 1
obs 2
out: 1
out: 2
onComplete

or

obs 1
out: 1
obs 2
out: 2
onComplete


설명: 비동기라서 순서가 바뀔 수 있다.

 

<다이어그램으로 설명>

2가지 예시


3. zip

A Task와 B Task 가 있다면, 두개를 묶어서 결과를 처리할 수 있게 한다. 
(여러 API 통신 결과가 모두 완료되는 시점을 처리하기에 적합하다.)

val observer1: Observable<String> = Observable.create { emitter ->
    GlobalScope.launch {
        delay(1000)
        Log.i("test_log","obs 1")

        emitter.onNext("1")
        emitter.onComplete()
    }
}
val observer2: Observable<String> = Observable.create { emitter ->
    GlobalScope.launch {
        delay(2000)
        Log.i("test_log","obs 2")


        emitter.onNext("2")
        emitter.onComplete()
    }
}

observableList.add(observer1)
observableList.add(observer2)

Observable.zip(observableList, { (t1, t2) -> "$t1 + $t2" })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { Log.i("test_log","out: ${it}")},
        { Log.i("test_log","onError") },
        { Log.i("test_log","onComplete")
})

출력 결과: 

obs 1
obs 2
out: 1 + 2
onComplete

설명: obs1 출력 후, 1초후에 obs2 가 출력. 그와 동시에 나머지가 출력된다. 

 

참고로 zip 함수의 두번째 파라미터는 2개의 묶음을 조합해준다. 
(Observable.zip(observableList, { (t1, t2) -> "$t1 + $t2" }) 코드 참고)

 

<다이어그램으로 설명>

 

zip 함수가 2개 이상인 경우도 묶어서 처리를 할 수 있는지는 체크해서 업데이트 하겠습니다. (아마 되지않을까 싶다.)

 

 

출처
https://namget.tistory.com/entry/RxKotlinRxJava-merge-zip-%EC%95%8C%EC%95%84%EB%B3%B4%EA%B8%B0

https://vagabond95.github.io/2019/08/10/is-this-rxjava-2/

https://m.post.naver.com/viewer/postView.nhn?volumeNo=6721006&memberNo=34635212

 

'android' 카테고리의 다른 글

jitpack - Data Binding 분석하기  (0) 2020.03.13
Android 저장소 시스템  (5) 2020.03.05
RxJava 란  (0) 2020.02.04
How to draw Rect Overlay View?  (0) 2020.01.05
ViewPager를 가진 Dialog 구현 주의사항  (0) 2020.01.02