본문 바로가기
java & kotlin

[RxKotlin] 1. 기본 개념 (Observable 과 Observer)

by algosketch 2022. 2. 11.

RxXX 는 ReactiveX 라이브러리의 각 언어별 라이브러리 이름이다. 나는 RxKotlin 을 Intelij 에서 연습하고 있다. 아래와 같이 RxKotlin 과 Coroutine 의존성을 추가하고 있다.

implementation("io.reactivex.rxjava2:rxkotlin:2.4.0")
implementation('org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.0')

 

RxKotlin 에 관련한 내용은 시리즈로 작성할 예정이다. RxKotlin 에 대한 글은 그래도 조금 있는 편이라 생각하지만 러닝커브가 있는 편이라 양질의 글을 찾아보기 어렵다. 영어로 공부하거나, 여러 한글자료를 읽어보고 종합시켜서 이해하거나 책을 읽어야 한다. 이 시리즈는 영어 자료나 책을 읽기는 부담되는 사람들을 위한 글이다. 책도 사실 Kotlin 으로 작성된 최신 책은 거의 없다. 다만 이 글을 읽기 전에 코틀린과 코루틴에 대해 어느정도 알고 있어야 읽기 쉬울 것이다.

 

1.  Reactive Programming 패러다임

기존의 프로그래밍 패러다임은 pull 방식이다. 다음 코드 예시를 보자.

CoroutineScope(Dispatchers.IO).launch {
    val response = getResponse()
    ...
}

/*
suspend fun getResponse() = withContext(Dispatchers.IO) {
    ...
}
*/

response 에 결과를 받아와서 이 값을 처리하는 방식으로 처리하는 게 일반적인 방법이다. 

Reactive Programming 에서는 데이터가 발생하는 source 가 있고 source 를 구독하는 observer 가 있다. source 가 observer 로 데이터를 push 하는 방식으로 프로그래밍 한다. 다음 코드 예시를 보자.

val list = listOf(2, 1, 3, 4, 5, 6)
val observable = list.toObservable()
val observer = object : Observer<Any> {
    override fun onSubscribe(d: Disposable) {
        println("new subscription")
    }
    override fun onNext(t: Any) {
        println("on next called! : $t")
    }
    override fun onError(e: Throwable) {
        println("error!")
    }
    override fun onComplete() {
        println("on complete")
    }
}
observable.subscribe(observer)

여기서 데이터가 발생하는 source 의 타입은 observable 이고 데이터를 사용하는 객체는 observer 이다. observer 는 onNext 를 포함한 여러 메서드를 오버라이드 하고, 데이터가 발생할 때마다 onNext 를 호출하여 데이터를 처리한다.

 

2. Observable 과 Observer

ReactiveX 에서는 Observable 과 Observer 두 가지 개념이 있다. 나머지는 이 두가지에서 파생된 것이다.

  • Observable : 데이터를 발생시킨다. Flowable, Subject, Single 도 마찬가지로 데이터를 생산하는 역할을 한다.
  • Observer : Observable 을 구독하여 데이터를 소비한다. Subject, Subscriber 도 마찬가지로 데이터를 소비하는 역할을 한다.

Subject 는 두 군데 모두 포함되는 것은 오타가 아니라 실제로 두 가지 역할을 모두 할 수 있기 때문이다. 이번 글에서는 가장 기본이 되는 Observable 과 Observer 에 대해서 알아본다. 참고로 생산자와 소비자를 각각 upstream 과 downstream 이라고도 부른다.

 

3. Observable

Observable 은 데이터를 생산하는 역할을 한다. 크게 나누면 두 가지 방법으로 Observable 객체를 만들 수 있다. 리스트를 Observable 로 바꾸는 방법과 Observable 클래스의 팩토리 메서드를 이용하는 방법이다. 우선 리스트를 이용하여 Observable 객체를 만들어 보자.

val list = listOf(2, 1, 3, 4, 5, 6)
val observable: Observable<Int> = list.toObservable()

코틀린의 확장함수를 이용하여 쉽게 리스트를 통해 Observable 로 바꿀 수 있다. observable 의 타입은 안 적어도 되지만 이해를 돕기 위해 명시했다. 그 다음으로는 factory method 를 이용하는 방법이다. factory method 는 여러 가지가 있다. 그 중 두 가지만 알아보자.

val list = listOf(2, 1, 3, 4, 5, 6)
val observable: Observable<Int> = Observable.fromIterable(list)

이전 코드와 비슷하게 리스트를 이용하여 Observable 객체를 만들었으나 팩토리 메서드를 이용했다. 나머지 하나는 just 를 이용하는 방법이다.

val observable: Observable<Int> = Observable.just(3, 2, 1, 4, 5)

 

4. Observer

Observer 는 네 가지 메소드를 오버라이딩 해야 한다.

  • onSubscribe(d: Disposable) : 구독을 시작하는 시점에 발생한다. 인자를 통해 구독을 취소할 수 있다.
  • onNext(t: Any) : 데이터가 방출될 때마다 발생한다. 인자로 전달된 값이 각 아이템 하나이다.
  • onError(e: Throwable) : 데이터 발생 과정에서 발생한 에러를 처리한다. 구독이 종료된다.
  • onComplete : 데이터 발생이 끝나면 호출된다. onError 가 호출되면 onComplete 는 호출되지 않는다.

코드 예시를 보자.

val observer = object : Observer<Any> {
    override fun onSubscribe(d: Disposable) {
        println("new subscription")
        d.dispose()
    }
    override fun onNext(t: Any) {
        println("on next called! : $t")
    }
    override fun onError(e: Throwable) {
        println("error!")
    }
    override fun onComplete() {
        println("on complete")
    }
}

observer 는 Observer<Any> 를 구현한 익명 객체이다. 물론 <Any> 를 <Int> 로 바꿔도 되고, 익명객체가 아니어도 된다.

 

5. 코드 사용 예시

fun main(args: Array<String>) {
    val observable: Observable<Int> = getResponse()
    val observer: Observer<Int> = TestObserver()
    observable.subscribe(observer) // 이 시점부터 데이터 방출이 시작된다.
}

fun getResponse() : Observable<Int> = listOf(2, 1, 3, 4, 5, 6).toObservable()

class TestObserver : Observer<Int> {
    override fun onSubscribe(d: Disposable) {
        println("TestObserver.onSubscribe")
    }

    override fun onNext(data: Int) {
        println("data = [${data}]")
    }

    override fun onError(e: Throwable) {
        println("TestObserver.onError")
    }

    override fun onComplete() {
        println("TestObserver.onComplete")
    }
}

Observable 은 subscribe 가 호출되기 전까지 데이터 방출을 대기하고 있다가 subscribe 가 호출되는 순간 데이터를 방출한다.

※ observer 가 observable 을 구독하는 형태이지만, 프로그래밍은 observable.subscribe(observer) 형태로 해야 한다. 자세히 알아보려면 디자인 패턴 중 Observer 패턴에 대해 찾아보자. 간단히 설명하자면 observable 에서 데이터가 발생하면 observable 을 구독하는 객체에 알려줘야 한다. observer 에는 notify() 와 같은 메서드가 구현되어 있다. (이 예제에서는 onNext() 와 같은 메서드가 된다.) 따라서 observable 내부적으로 observer 리스트를 갖고 있고, observer.notify() 를 호출하는 형태가 된다.

실행 결과

 

fun main(args: Array<String>) {
    val observable: Observable<Int> = getResponse()
    observable.subscribe {
        println("received $it")
    }
}

fun getResponse() : Observable<Int> = listOf(2, 1, 3, 4, 5, 6).toObservable()

이 코드 예시처럼 작성하면 subscribe 의 인자로 Observer 대신 간단히 하나의 람다만 작성할 수 있는데, Observer 의 onNext 만 구현한 것처럼 동작한다.

실행결과

이번에 RxKotlin 에 대한 가장 기본적인 형태에 대해 알아보았다. 이 내용만 보고는 한계점이 떠오르거나 궁금한 점이 있을 수 있다. 다음 글부터는 각각에 대해서 더 깊은 내용을 다루면서 각각의 궁금증을 (아마도)해결할 예정이다.