선생님, 개발을 잘하고 싶어요.

[RxJava] 주요 객체 설명, Observable, Observer, Disposable 본문

개발/소프트웨어 개발

[RxJava] 주요 객체 설명, Observable, Observer, Disposable

알고싶은 승민 2020. 3. 22. 23:04

서론

 

반응형 프로그래밍에 대한 블로그 포스팅을 보고 오시는 것을 추천드립니다. 

 

오늘은 이전 포스팅의 이론적 내용을 실제로 구현해 놓은 라이브러리, RxJava에 대해서 소개해 드리려고 합니다.

 

TL;DR

  • RxJava에서 데이터 스트림을 표현하는 객체는 Observable이다.
  • RxJava에서 데이터 스트림을 읽는 것을 구독이라고 한다.
  • onNext 에서 데이터 스트림에 흐르는 값을 처리한다.
  • onError 에서 에러를 처리한다.
  • 구독을 표현하는 객체는 Disposable이다.

Observable

 

ObserverPattern에서 데이터 변경을 알리는 Subject와 데이터 변경 시 발동하는 Observer에 대해서 말씀드렸습니다.

RxJava에서는 이 Subject를 Observable이라고 말합니다.

 

Observable은 하나의 값을 나타내는 것이 아니라 값들이 흐르는 데이터 스트림을 상징합니다. 다음 다이어 그램처럼 말이죠.

http://reactivex.io/documentation/operators/create.html

 

이 그림에서 아래의 가로로 긴 화살표가 Observable입니다.

그리고 빨간 원, 파란 원은 Observable에 흐르는 데이터를 의미합니다.

마지막의 세로로 짧은 막대기는 해당 Observable이 끝났다. 데이터를 더 이상 방출하지 않는다는 것을 의미합니다.

 

앞으로 이런 그림을 자주 보게 될 테니 기억해두세요.

val observable = Observable.just(1,2,3,4)

Observable은 just라는 함수를 사용하면 손쉽게 만들 수 있습니다. (다른 다양한 생성 연산이 있습니다만 다른 포스트에서 다루겠습니다.)

 

이 경우에는 이런 다이어그램이 생기는 거죠.

1,2,3,4가 순차적으로 들어오고 종료됩니다.

그러면 이렇게 흐르는 데이터를 어떻게 받아서 사용할까요? 빨리 다음으로 넘어가 봅시다.

Obesrver

 

데이터가 단순히 흐르는 것은 의미가 없습니다. 데이터의 변화에 유연하게 반응할 코드를 작성해야 의미가 있겠죠.

데이터 변화에 반응하는 인터페이스를 제공하는 녀석이 바로 Observer입니다.

 

우리의 데이터 흐름에 발생할 수 있는 경우의 수는 여러 가지가 있는데요.

  • 정상적으로 데이터가 변경될 경우 -> onNext
  • 데이터 변환 과정에서 오류가 나는 경우 -> onError
  • 데이터 흐름이 끝나는 경우 -> onComplete

각 경우는 각각 Observer 인터페이스의 onNext, onError, onComplete를 통해 정의됩니다.

 

ObserverPattern으로 잠시 이동해 보면

값이 변경될 때

  • Subject의 SetValue가 호출된다.
  • Subject를 구독하는 모든 Observer의 update함수를 호출한다.

RxJava로 말하면,

  • Observable에 onNext(value)를 호출하여 데이터를 변경시킨다.
  • 데이터가 정상적으로 변경되면 Observer의 onNext를 호출시킨다.
  • 데이터 변환 과정에서 에러가 발생하면 Observer의 onError를 호출시킨다.
  • 데이터 흐름이 종료되면 Observer의 onComplete를 호출시킨다.

가 되겠습니다. 그냥 이론이었던 ObserverPattern에 규격화된 가이드를 제시한 셈이죠.

Observable은 이렇게 등록된 Observer의 인터페이스를 호출 할 것입니다.

다이어그램은 질리시죠. 그래서 이제 Observable, Observer를 사용하는 예제를 보여드리겠습니다.

Hello RxJava

앞에서 본 것처럼, Observable.just를 사용해서 1,2,3,4가 흐를 데이터 스트림을 만들어 두었습니다.

Observer 인터페이스를 구현하는 익명 클래스를 만들었고, 단순히 로그만 찍도록 하였습니다.

observable의 메서드인 subscribe를 사용해서 observer를 구독시켰습니다.

짠, 결과

데이터를 구독하는 좀 더 스마트한 방법

사실, Observer를 익명 객체로 만드는 행동은 더 이상 추천하는 방식이 아닙니다. 오히려 RxJava 상위 버전과 호환 안 되는 구식 방법이죠. 

코틀린에선 행동 뭉치를 넘기기 위해 람다를 사용하는 것이 좋습니다. 

구독 시점에, 콜백을 람다로 넘긴다.

Disposable

 

앞서 살펴본 대로, 우리는. subscribe(Observer)를 이용해서 데이터 스트림을 구독했습니다.

그런데 그걸로 끝인가요?

특정 시점까지만 구독하고 싶은 경우는 어떤가요?

이를 해결하기 위해서 RxJava에서는. subsribe(Observer)를 호출하면 Disposable이라는 객체를 반환합니다.

 

프로그래머는 이 객체의 인터페이스인 dispose를 호출하여 구독을 취소하고 Observable로부터 데이터를 받지 않을 수 있습니다.

 

Subject

구독 취소를 설명하기에 앞서 좀 더 다이내믹하고 재밌는 Observable 객체를 사용해봅시다. 

지금까지 우리는 생성된 시점에 값이 결정된 Observable을 만들었죠. (just를 사용해서)

 

우리가 원하는 시점에 데이터 스트림에 값을 흘려주고 싶습니다. 그럴 때 사용할 수 있는 게 Subject입니다.

 

subject의 메서드인 onNext(value)를 사용해서 데이터 스트림에 값을 추가합니다.

위 코드는 다음과 같은 상황이라는 것이죠.

(onComplete가 호출되지 않았다는 것에 주의하세요. Subject는 임의의 타이밍에 값이 넘어올 수 있기 때문에 코드상 이 데이터 흐름이 끝났다고 확신할 수 없습니다.)

onComplete가 없다.

실제 실행 결과는 다음과 같습니다.

결과

구독 취소란?

이제 이 코드의 뉘앙스가 느껴지십니까?

구독을 취소하면, 더 이상 데이터 스트림으로 부터 값을 받아오지 않습니다.

결과

결론

Observable, Observer는 사실 더이상 RxJava 코드에서 찾아보기 힘든 원시 타입입니다. 하지만 기본적인 RxJava의 콘셉트를 이해하는 데는 더할 나위 없다고 생각합니다.

 

Observable, Observer, Disposable 이 세 가지 개념만 이해하고 있다면, 쉽게 RxJava의 세계로 빠져들 수 있습니다.

 

다음 포스팅에선 데이터 스트림에 흐르는 데이터에 장난질을 해보는 것을 포스팅해볼 예정입니다.

어떤 데이터는 무시하고, 데이터를 다른 포맷으로 변경하던가, 데이터를 다른 데이터 흐름으로 변경하는 작업을 할 수 있습니다.

또한 이러한 연산을 각자 다른 스레드에서 동작시키는 편리한 방법을 소개해 드리겠습니다 :)

Comments