Reactive Programming 4

Observable, Rx

지난시간엔 단일 데이터에 대해 latency 를 지원하는 Future, Promise 에 대해서 알아봤다. 이번에는 컬렉션에서 latency 를 지원하는 방법인 Observable 을 배워보자.

                  One           Many
                  
Synchronous     T/Try[T]     Iterable[T]
Asynchronous    Future[T]    Observable[T]

From Futures to Observables

Future 의 정의를 다시 보면,

trait Future[T] {
  def onComplete[U](f: Try[T] => U)
  (implicit ex: ExecutionContext): Unit
}

여기서 중요한 부분은 콜백 f 를 받아 Unit 을 돌려준다는 것이다.

(Try[T] => Unit) => Unit

이제 => 를 뒤집고, Unit() 로 표기해서 어떤 intuition 을 얻어보자.

(Try[T] => Unit) => Unit
Unit => (Unit => Try[T]) // reverse
() => (()=> Try[T])      // Unit -> ()

Try[T]                   // simplify

여기서 핵심은 () 는 사이드 이펙트를 위해 존재하므로 그 부분을 제거하면 타Future[T] 의 결과는 Try[T] 와 같다는 것이다.

Future[T] and Try[T] are dual

dualityCategory Theory 에 의하면

Every statement, theorem, or definition in category theory has a dual which is essentially obtained by “reversing all the arrows”.

처음 시작 부분에서 이런 테이블을 봤을텐데, 여기서도 duality 관계가 나타난다.

                  One           Many
                  
Synchronous     T/Try[T]     Iterable[T]
Asynchronous    Future[T]    Observable[T]

그리고 지난시간에 이메일을 발송하는 코드에서 onComplete 를 몇 번 호출하던 콜백이 받는 Try[T] 는 동일하다는 것을 봤다. 이것은 Future[T]Try[T] 를 돌려주고, 그 값이 일정하다는 사실을 말한다.

이렇게 생각할수도 있다. 콜백 f: Try[T] => Unit 을 넘기고 Try[T] 를 얻기 위해 asynchronous() 를 사용할 수 있고, Try[T] 를 얻기 전까지 ¸”럭되는 synchronous 를 이용할 수도 있다는 식으로

def asynchronous(): Future[T]
def synchronous(): Try[T]

Iterable

Observable[T] 를 보기전에 synchronous data streamIterable[T] 를 좀 살펴보자.

trait Iterable[T] { def iterator(): iterator[T] }
trait Iterator[T] { def hasNext: Boolean; def next(): T }

while (hasNext) next() 처럼 사용할 수 있다.

그리고 Iterable[T] 를 위한 flatMap 도 정의되어 있다. 따라서 Iterable 은 모나드다.

def flatMap[B](f: A => Iterable[B]): Iterable[B]

Iterator 를 이용해서 디스크에서 파일을 읽는 코드를 작성하면

def ReadLinesFromDisk(path: String): iterator[String] = {
  Source.fromFile(path).getLines()
}

val lines = ReadLinesFromDisk(path)

for (line <- lines) {
  ... DoWork(line) ... // latency
}

한 라인이 100K 로 어마어마하게 길다면 디스크를 읽기 전까지 기다려야할까? Future 처럼 비동°로 IO 연산을 수행하는 방법을 찾아보자. 이전과 좀 다른점은 지금은 컬렉션을 다루고 있다는 점이다.

이 문제를 해결하기 위해 컬렉션을 순회하는 trait 를 좀 살펴보자. 어떤 일을 해야 하는지 알아야하니까.

trait Iterable[T] {
  def iterator(): Iterator[T]
}

trait Iterator[T] {
  def hasNext: Boolean
  def next(): T
}

Iterable, Iterator 을 좀 간략화 하면,

  • Iterable() 를 인자로 받아 Iterator 를 돌려주고
  • Iterator() 를 인자로 받아 Try[Option[T]] 를 돌려준다

전체적으로 보면 () => (() => Try[Option[T]]) 다. 사이드이펙트로 예외를 돌려주거나, None 일수 있거나, 아니면 정상적인 값을 얻을 수 있다는 뜻이다.

타입이 좀 복잡한데, 아까 arrow => 를 뒤집었던 방법을 다시 사용해서 간단히 만들어 보자. 강의에서는 dualization trick 이라 부른다.

뒤집은 후에는 Try[Option[T]] 를 ¶„해해 보자. 예외를 주거나, 아무 값도 주지 않거나(끝나거나), 값을 주거나.

() => (() => Try[Option[T]])

// reverse
(Try[Option[T]] => Unit) => Unit)

// simplify
( T         => Unit, // Value
  Throwable => Unit, // Exception
  ()        => Unit  // Nothing, Terminate
) => Unit

즉, 비동기로 컬렉션을 순회하기 위해서는 이런 작업을 처리해줄 무언가가 필요하다. 스칼라에서는 Observable, Observer, Subscription 이 그 일을 담당한다.

trait Observable[T] {
  def Subscribe(observer: Observer[T]): Subscription
}

trait Observer[T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
  def OnCompleted(): Unit
}

trait Subscription {
  def unsubscribe(): Unit
}

즉, ObservableTry[Option[T]] 에 따라 할일을 지정해 놓은 Observer 를 세팅하고, Subscription 을 얻은 뒤 이후에 필요에 의해 중단해야 하면 unsubscribe 를 호출하는 방식이다. 이는 작업하Š” 대상이 컬렉션이므로 Future 와는 달리, 하나의 값이 아니라 무한한 값들을 얻어올 수 있기 때문.

Future vs Observable

초반에, 이 테이블을 다시 보면 IterableObservabledual 이다.

                  One           Many
                  
Synchronous     T/Try[T]     Iterable[T]
Asynchronous    Future[T]    Observable[T]

그리고 이 테이블에 의하면, FutureObservable 을 비교해보면 onemany 가 의미하는 바를 type 으로 이해할 수 있다.

Observable[T] = (Try[Option[T]] => Unit) => Unit
Future[T]     = (Try[T]         => Unit) => Unit

타입을 보면 FutureOption 부분이 없지만 Observable 은 있다. 즉 Observable 은 아무런 값도 없다는 사실을 의미하는 타입 Option 을 이용해 종료시점 을 알려줄 수 있기 때문에 multiple values 를 처리할 수 있다.

concurrency 측면에선 어떨까? 타입을 살펴보면

object Future {
  def apply[T](body: => T)
   (implicit executor: ExecutionContext): Future[T]
}

trait Observable[T] {
  def observeOn(scheduler: Scheduler): Observable[T]
}

Observable 의 경우엔 하나의 ExecutionContext 가 아니라 여러개를 가져야 하기 때문에 Scheduler 를 이용한다. 이 부분은 나중에 더 자세히 살펴보자.

Observable example

val ticks: Observable[Long] = Observable.interval(1 seconds)
val evens: Observable[Long] = ticks.filter(s => s % 2 == 0)
val bufs:  Observable[Seq[Long]] = evens.buffer(2, 1)
val s = bufs.subscribe(b => println(b))

readLine()
s.unsubscribe

Observablelatency 를 지원하는 컬렉션으로 이해하면 쉽다. interval 을 이용해 간격을 지정하거나, 일반 컬렉션처럼 filter 도 사용할 수 있다. evens.buffer 는 그냥 버퍼링이라고 생각하면 된다. 버퍼 크기가 2인 것으로.

이런 것도 가능하다.

val xs = Observable.range(1, 10)
val ys = xs.map(x => x + 1)

xs 는 비동기 순회를 지원하는 컬렉션이다. filter, map, flatMap, take, zip 등 을 지원한다.

flatMap

(http://reactivex.io/)

색깔의 순서를 봐야할 필요가 있다. Observable 은 비동기 연산이기 때문에 순서가 좀 달라질 수 있다. 구현에도 그런 부분이 나타나 있는데 아래의 코드에서 flatten 이 의미하는 바는 non-deterministic merge 다.

def flatMap(f: T => Observable[S]): Observable[S] = {
  map(f).flatten()
}

다른 코드도 좀 보면

val xs: Observable[Int] = Observable(3, 2, 1)

val yss: Observable[Observable[Int]] = 
  xs.map(x => Observable.Interval(x seconds).map(_ => x).take(2))
  
val zs: Observalble[Int] = yss.flatten()

(https://github.com/iirvine)

위 코드는 x 초 후에 x 2개를 뱉는 Oberservable 을 만든 후 flatten 을 사용해 껍데기를 벗긴다. List(List(1, 2), List(3)).flatten 하면 List(1, 2, 3) 이 되듯이 Observable[Observable].flattenObservable 을 만든다고 생각하면 쉽다.

Merge

(https://github.com/iirvine)

예외나, 종료 등 어떤 이유에서든지 먼저 끝나는 Observable 에 의해 merge 가 종료된다는 점에 주의하자.

Concat

val xs: Observable[Int] = Observable(3, 2, 1)

val yss: Observable[Observable[Int]] = 
  xs.map(x => Observable.Interval(x seconds).map(_ => x).take(2))
  
val zs: Observalble[Int] = yss.concat

(https://github.com/iirvine)

여기서 재밌는점은 yss 의 첫번째 원소인

Observable.Interval(3 seconds).map(_ => x).take(2)

가 끝나기 전까지 다른 원소들이 버퍼링 되므로 주의해야 한다는 점이다. marble diagram 으로 보면

(https://github.com/iirvine)

Earthquakes example

def usgs(): Observable[EarthQuake] = { ... }

class EarthQuake {
  ...
  def magnitude: Double
  def location: GeoCoordinate
}

object Magnitude extends Enumeration {
  def apply(magnitude: Double): Magnitude = { ... }
  type Magnitude = Value
  
  val Micro, Minor, Light, Moderate, Strong, Major, Great = Value
}

val major = quakes.
  map(q => (q.location, Magnitude(q.magnitude))).
  filter { case (loc, mag) => mag => Major }
  
major.subscribe({ case (loc, mag) => 
  println($"Magnitude ${ msg } quake at ${ loc }")
})

이런식으로 사용할 수 있다. 더 실제 동작하는 코드는 여기로. 조금 복잡하다.

위치를 GeoCoordinate 로 받기 때문에, 해당 위치의 나라를 돌려준다든지 등으로 개선할 수 있다.

def reverseGeocode(g: GeoCoordinate): Future[Country] = { ... }

이 함수를 구현하면

val withCountry: Observable[Observable[EarthQuake, Country)]] = 
  usgs().map(q => {
    val country: Future[Country] = reverseGeocode(q.location)
    Objservable(country.map(country => (quake, country)))
  })
  
// val merged: Observable[(EarthQuake, Country)] =
//  withCountry.flatten()
val merged: Observable[(EarthQuake, Country)] = withCountry.concat()

여기서 머징하기 위해 flatten 이나 concat 을 사용할 수 있는데, 언급했듯이 어떤걸 쓰느냐에 따라 순서가 달라질 수 있다. 아래 그¼은 각각 flatten, concat 을 설명한다.

(https://github.com/iirvine)

group by

(https://github.com/iirvine)

def groupBy[K](keySelector: T => K): Observable[(K, Observable[T])]

T 를 받아 키 K 를 만들고, 이것에 따라 Observable 을 그룹짓는다. 이걸 응용하면 나라별로 지진을 취합하는 것이 가능하다.

val byCountry: Observable[(Country, Observable[(EarthQuake, Country)]] = 
  merged.groupBy( case (q, c) => c }

(https://github.com/iirvine)

이제 runningAverage 란 함수가 있다고 해 보자. Observable[Double] 을 받아 업데이트 후 Observable[Double] 을 돌려주는 함수. 그러면 runningAveragePerCountry 는 어떻게 구현할까?

val byCountry: Observable[(Country, Observable[(EarthQuake, Country)]]
def runningAverage(s: Observable[Double]): Observable[Double] = 
  { ... }
val runningAveragePerCountry: Observable[(Country, Observable[Double])] = 
  byCountry.map { case (country, cqs) => 
    (country, runningAverage(cqs.map(_._1.magnitude))
  }

Subscription

지진 예제를 다시 가져오면, 더이상 관심 없을때 unsubscribe 를 호출할 수 있다.

val quakes: Observable[EarthQuake] = { ... }
val s: Subscription = quakes.Subscribe(...)

s.unsubscribe()

근데, 생각해보면 여러 곳에서 subscription 할 수 있다. UI °™은 경우 그 수가 많을 것이다. 이 경우 unsubscribingcancellation 을 의미하지 않는다. 왜냐하면 다른곳에서 subscribing 하고 있을 수 있기 때문이다.

타입을 좀 보면

trait Subscription {
  def unsubscribe(): Unit
}

object Subscription {
  def apply(unsubscribe: => Unit): Subscription
}

trait BooleanSubscription extends Subscription {
  def isUnsubscribed: Boolean
}

trait CompositeSubscription extends BooleanSubscription {
  def +=(s: Subscription): this.type
  def -=(s: Subscription): this.type
}

trait MultipleAssignmentSubscription extends BooleanSubscription {
  def subscription: Subscription
  def subscription_=(that: Subscription): this.type
}

여기서 CompositeSubscription 은 컬렉션처럼 Subscription 을 추가하거나, 제거할 수 있고 unsubscribe 하면 나머지도 모두 취소 된다.

MultipleAssignmentSubscription 은 일종의 inner subscription 을 위한 프록시처럼 동작한다. 세팅하고, 교²´할 수 있지만, 항상 내부에는 동작하는 하나의 Subscription 이 있다.

import rx.lang.scala.subscriptions._
import rx.lang.scala.Subscription

val s = Subscription {
  println("bye, bye")
}

s.unsubscribe()
s.unsubscribe() // buggy

이 경우 두번째 unsubscribe() 를 호출했을때 "bye, bye" 가 호출되지 않는다. 먼저 unsubscribe() 를 호출했기 때문이다.

직접 Subscription 을 구현할때는 다수의 스레드에서 저마다 unsubscribe() 를 호출할 수 있기 때문에 이 메소드는 idempotent 하게 구현되야 한다.

CompositeSubscription 을 이미 unsubscribe 했을땐, 새로운 Subscription 을 추가한다 하더라도 자동으로 unsubscribe 가 호출된다.

MultiAssignmentSubscription 의 경우에는 여러번 할당할 수 있으나, 단 하나의 Subscription 만 가리킨다. 따라서 다음 코드를 실행할 경우 b.unsubscribe 만 호출된다.

val a = Subscription { println("A") }
val b = Subscription { println("B") }
val m = MultiAssignmentSubscription()

multi.subscription = a
multi.subscription = b
multi.unsubscribe

CompositeSubscription 과 마찬가지로 이미 unsubscribe 되었다면, 할당되는 Subscription 도 자동으로 unsubscribe 된다.

CompositeSubscription 이나 MultiAssignment 를 연산을 공유하는 컨테이너라 볼 수 있겠는데, 그럼 여기서 내부의 것만 unsubscribe 하면 어떻게 될까? 당연히 외부의 MultiAssignmentComposite 는 알 길이 없으니 isUnsubscribefalse 가 된다.

Rx Stream

자주 보게 될 타입부터 소개하면

object Observable {
  def apply[T](s: Observer[T] => Subscription): Oberservable[T]
}

trait Observable[T] {
  def subscribe(observer: Observer[T]): Subscription
}

trait Observer[T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
  def OnCompleted(): Unit
}

trait Subscription {
  def unsubscribe(): Unit
}

아무런 알림도 못받는 Observable 을 만드는 never 와, onError 를 호출하는 apply 를 구현해 보자.

def never(): Observable[Nothing] = Observable[Nothing](observer => {
  Subscription {}
})

def apply[T](error: Throwable): Observable[T] = 
  Observable[T](observer => {
    observer.onError(error)
    Subscription {}
  }

이제 이 함수들을 이용해 다양한 함수를 구현해 보자.

startWith

object Observable {
  def apply[T](s: Observer[T] => Subscription): Oberservable[T]
}

def switchWith(ss: T*): Observable[T] = {
  Observer[T](observer => {
    for(s <- ss) observer onNext(s)
    subscribe(observer)
  }
}

filter

object Observable {
  def apply[T](s: Observer[T] => Subscription): Oberservable[T]
}

def filter(p: T => Boolean): Observable[T] = {
  Observable[T](observer => {
    subscribe(
      (t: T) => { if (p(t)) observer.onNext(t) },
      (e: Throwable) => { observer.onError(e) },
      () => { observer.onCompleted() }
    )
  })
}


map

object Observable {
  def apply[T](s: Observer[T] => Subscription): Oberservable[T]
}

def map[S](f: T => S): Observable[S] = {
  Observable[T](observer => {
    subscribe(
      (t: T) => { if (p(t)) observer.onNext(f(t)) },
      (e: Throwable) => { observer.onError(e) },
      () => { observer.onCompleted() }
    )
  })
}

그림을 잘 보면 input stream 으로 부터 값을 얻어 함수를 적용하고 output stream 으로 뱉는다. 구현도 마찬가지로 현재의 컨테이너인 Observable 로 부터 값을 얻었을때 함수를 적용하고 어떻게 넘겨줄지를 정의한다.

duality 관계인 Iterablemap 구현을 보면 더 명확히 알 수 있다.

def map[S](f: T => S): Iterable[S] = {
  new Iterable[S] {
    val it = this.iterator()
    def iterator: Iterator[S] = new Iterator[S] {
      def hasNext: Boolean = { it.hasNext }
      def next(): S = { f(it.next()) }
    }
  }
}

Future to Observable

Future[T] 를 얻어 Observable[T] 로 바꿔보자. TList[T] 로 바꾸듯이. 그럴려면 Subject 를 알아야 하는데, 이건 지난시간에 배운 Promise 비슷한 역할을 한다.

import scala.concurrent.ExecutionContext.Implicits.global

def race[T](left: Future[T], right: Future[T]): Future[T] = {  
  val p = Promise[T]()

  left  onComplete { p.tryComplete(_) }
  right onComplete { p.tryComplete(_) }

  p.future
}

Promise 로 부터 Future 를 얻고, Future.onComplete 에 콜백을 넘기면, 완료되었을때 Promise.complete 에 의해 호출된다. PromiseFuture 를 위한 대리자? 프록시쯤으로 볼 수 있다.

ObservableSubject 도 비슷한 관계다.

(https://github.com/iirvine)

코드로 이해해 보자.

val channel = PublishSubject[Int]()

val a = channel.subscribe(x => println("a: " + x))
val b = channel.subscribe(x => println("b: " + x))

channel.onNext(42)
a.unsubscribe()

channel.onNext(4711)
channel.onComplete()

val c = channel.subscribe(x => println("c: " + x))
channel.onNext(13)

(https://github.com/iirvine)

Subject 는 일종의 채널이라 보면 된다. 위 코드에서 흥미로운 점은 onComplete (! 로 표시) 가 호출 된 뒤에 옵저버 cSubject 에 추가했음에도 conComplete 가 호출된 것을 알고 있다는 사실이다.


val channel = ReplaySubject[Int]()

val a = channel.subscribe(x => println("a: " + x))
val b = channel.subscribe(x => println("b: " + x))

channel.onNext(42)
a.unsubscribe()

channel.onNext(4711)
channel.onComplete()

val c = channel.subscribe(x => println("c: " + x))
channel.onNext(13)

ReplaySubject 의 경우에는 c 에도 모든 데이터를 받는다. 이는 ReplaySubject 가 히스토리를 캐싱하고있기 때문이다.

다양한 종류의 Subject 를 그림으로 보면

(https://github.com/iirvine)

Converting Future to Observable

object Observable {
  def apply[T](f: Future[T]): Observable[T] = {
    val as = AsyncSubject[T]()
    
    f onComplete {
      case Failure(e) => { as.onError(e) }
      case Success(c) => { as.onNext(c); as.onCompleted() }
    }
    
    as
  }
}

복잡하게 생각하지 말고 그냥 Promise 랑 비슷한 일을 한다고 이해˜면 쉽다.

Notifications

지난 시간에 FutureTry 를 이용하는걸 봤다. Future[Try[T] 처럼. Notification 도 이와 비슷하다. Observable[Notification[T]] 처럼 사용한다.

abstract class Try[+T]
case class Success[T](elem: T) extends Try[T]
case class Failure(t: Throwable) extends Try[Nothing]

abstract class Notification[+T]
case class OnNext[T](elem: T) extends Notification[T]
case class OnError(t: Throwable) extends Notification[Nothing]
case object onCompleted extends Notification[Nothing]

def materialize: Observable[Notification[T]] = { ... }

차이라면, 종료 를 알려주는 onCompleted 가 있다는 것이다. materializeObservable[T] 를 감싸 Observable[Notification[T]] 로 만든다.

Blocking

권할만한 방법은 아니지만, 만약에, 만약에 블러킹이 필요하다면 이런식으로 코드를 작성할 수도 있다는 것„ 지난시간에 배웠다.

val f: Future[String] = { ... }
val text: String = Await.result(f, 10 seconds)

Observable 도 마찬가지다.

val xs: Observable[Long] = Observable.interval(1 seconds).take(5)
val ys: List[Long] = xs.toBlockingObservable.toList

println(ys)

// all Rx operators are non-blocking
val zs: Observable[Long] = xs.sum
val s: Long = zs.toBlockingObservable.single

Observable to Scalar Types

Observable 내에 있는 값들을 계산하기 위해 reduce 를 사용할 수 있다. fold 와 비슷하달까

def reduce(f: (T, T) => T): Observable[T]

재밌는 사실은 리턴타입이 원소가 Observable 을 돌려주기 때문에 Future 와 비슷˜다는 것이다.

Iterable to Observable

잘못된 구현을 먼저 보자.

def from[T](seq: Iterable[T]): Observable[T] = 
  Observable(o => {
    seq.foreach(s => o.onNext(s)) // What if seq is infinite?
    o.onCompleted // What if seq fails?
    Subscription {}
  })

이 구현의 문제점은, Iterable 이 무한하거나, 실패하면 어떻게 처리할지 전혀 고려하지 않았다는 것이다. 게다가 빈 Subscription 을 돌려주기 때문에, unsubscribe 할 수도 없다.

이 문제를 풀기 위해서는 scheduler 가 필요하다.

Scheduler

우선 돌아가는 코드를 만들기 전에 테스트 케이스부터 작성하자

// factory method
object Observable {
  def apply[T](subscribe: Observer[T] => Subscription): Observable[T]
}

def from[T](seq: Iterable[T]): Observable[T] = { ... }

// infinite seq
def nats(): Iterable[Int] = new Iterable[Int] {
  val i = -1
  def iterator: Iterator[Int] = new Itertor[Int] {
    def hasNext: Boolean = { true }
    def next(): Int = { i += 1; i }
  }
}

val infinite: Iterable[Int] = nats()
val subscription = from(infinite).subscribe(x => println(x))

subscription.unsubscribe()

만약 from 이 위에서 본 것처럼 구현되어 있다면 subscription.unsubscribe() 에 도달하지 못한다. 따라서 iteration 을 진행하는 것과는 다른 컨텍스트를 도입해 unsubscribe 를 호출해야 한다. 그래서 스케쥴러가 필요하다.

Future 에서는 ExecutionContext 가 있었지만, Observable 은 복수개의 컨텍스트를 조작해야 하므로 스케쥴러를 써야한다.

object Future[
  def apply[T](body: => T)
   (implicit executor: ExecutionContext): Future[T]
}

trait Observable[T] {
  def observeOn(scheduler: Scheduler): Observable[T]
}

// Runnable == Java's Runnable
trait ExecutionContext {
  def execute(runnable: Runnable): Unit
}

// '=> Unit' == Runnable
trait Scheduler {
  def schedule(work: => Unit) Subsciption
}

// example
val scheduler = Scheduler.newThreadScheduler
val subscription = scheduler.schedule {
  println("Hello World!")
}

FutureRunnable 을 취소할 수 있는 방법이 없지만, SchedulerSubscription 을 리턴하기 때문에 취소할 수 있다. 그러나 일단 작업이 시작되면 취소할 수 있는 방법은 없다. 아래 예제를 보자

def from[T](seq: Iterable[T])
    (implicit s: Scheduler): Observable[T] = {
   
  Observable[T](o => {
    s.schedule {
      seq.foreach(x => observer.onNext(x))
      observer.onCompleted()
    }
  }
}

onNext 가 호출되기 전, 아주 잠깐동안만 작업을 취소할 수 있는 기회가 있다. 다시 말해서, 이터레이션이 통채로 스케쥴링 되기 때문에 좀 별로라는 것이다. 매 이터레이션마다 취소할 기회가 있는 from 을 구현하고 싶다.

scheduler 의 다른 시그니쳐를 좀 보자.

trait Scheduler {
  def schedule(work: => Unit): Subscription
  
  def schedule(work: Scheduler => Subscription): Subscription
  
  def schedule(work: (=> Unit) => Unit): Subscription
}

두번째 시그니쳐를 보자. schedule 함수가 하는 일이 Scheduler 를 받아 등록하고 Subscription 을 돌려주는 일이라면 그것 자체를 work 로 받고, 해당 work 에서 한번씩만 이터레이션 한다면 매 이터레이션에서 취소할 기회를 가질 수 있다.

이건 사실 세번째 시그니쳐와 동일한데 이유는 뒤에서 보겠다.

from 의 새로운 구현을 보면

def from[T](seq: Iterable[T])
    (implicit) scheduler: Scheduler): Observable[T] = {
    
  Observable[T](o => {
    val it = seq.iterator()
    
    scheduler.schedule(self => {
      if (it.hasNext) { o.onNext(it.next()); self() }
      else { o.onCompleted() }
    }
  }
}

으사양반 이게 무슨 개소리요!

조금 난해한데, it.hasNext 가 있어서 다음 이터레이션으로 넘어갈 수 있으면 self() 를 호출해 자기 자신을 스케쥴링•œ다. 따라서 매 이터레이션마다 사용 가능한 Subscription 이 있으므로 취소할 수 있는 기회가 생긴다.

물론 Subscription 이 갱신되는데 어떻게 하나의 레퍼런스로 그게 가능하느냐 하는 질문이 나올 수 있는데, 우리는 이미 MultipleAssignmentSubscription 을 배웠다. schedule 함수의 내부를 보자.

def schedule(work: (=> Unit) => Unit): Subscription = {
  val subs = new MultipleAssignmentSubscription()
  
  schedule(scheduler => {
    def loop(): Unit = {
      subs.Subscription = scheduler.schedule {
        work { loop() }
      }
    }
    
    loop()
    subs
  })
  
  subs
}

def from[T](seq: Iterable[T])
    (implicit) scheduler: Scheduler): Observable[T] = {
    
  Observable[T](o => {
    val it = seq.iterator()
    
    scheduler.schedule(self => {
      if (it.hasNext) { o.onNext(it.next()); self() }
      else { o.onCompleted() }
    }
  }
}

즉, self 가 바로 loop 다. 자기 자신을 스케쥴링하는 함수인데, work -> loop -> work -> loop -> ... 을 반복하면서 더 이터레이션할 멤버가 없거나, unsubscribe 하기 전까지 재귀적으로 돈다.

Scheduler to Observable

돌려주는 값 없이 행위 그 자체만 보면, 스케쥴러 그 자체는 Observable[Unit] 에 대응된다.

object Observable {
  def apply() (implicit s: Scheduler): Observable[Unit] = {
    Observable(o => {
      s.schedule(self => {
        o.onNext(()); self
      })
    })
  }
}

implicit val s = Scheduler.NewThreadScheduler
val ticks: Observable[Unit] = Observable()

이게 실제로 어떻게 동작하나 보면

object Observable {
  def apply(s: Observer[T] => Subscription) = new Observable[T] {
    def subscribe(o: Observer[T]): Subscription = { Magic(s(o)) }
  }
}

val subs = Observable(o => F(o)).subscribe(observer)

// = conceptually
val subs = Magic(F(observer))

여기서 FMagic 는 임의의 함수라 생각하면 된다. (그런게 있나보다 하자.)

이걸 왜 이야기하냐 하면 auto unsubscribe 가 가능하기 때문이다. 스케쥴링 하는 행위를 observable 로 변경할 수 있다면, 스케쥴링이 불가능할때 unsubscribe 하도록 만드는 것이다.

Fobserver.onCompletedobserver.OnError 를 호출한다면, Magic 함수에 의해 자동으로 unsubscribe 가 호출된다. 이로인해 다음에 호출되는 onNext 는 아무런 영향도 미치지 않게 된다.

이럴 수 있는 이유는 Observable 을 생성하는 방식이 Rx Contract 을 만족하기 때문이다. (따라서 직접 Observable, Observer 를 만들지 말고 팩토리 메소드를 사용해야한다)

(onNext)*(onCompleted + onError)?

onNext 는 여러번 호출될 수 있으나 겹치지 않고, onCompletedOnError 는 옵션이지만 (무한한 시퀀스가 존재하기때문) 호출된다면 둘 중 단 한개만, 단 한번 호출되야한다는 것이다. 아까 본 코드를 다시 나열해서 어떻게 그렇게 되나 살펴보자.

object Observable {
  def apply() (implicit s: Scheduler): Observable[Unit] = {
    Observable(o => {
      s.schedule(self => {
        o.onNext(()); self
      })
    })
  }
}

def schedule(work: (=> Unit) => Unit): Subscription = {
  val subs = new MultipleAssignmentSubscription()
  
  schedule(scheduler => {
    def loop(): Unit = {
      subs.Subscription = scheduler.schedule {
        work { loop() }
      }
    }
    
    loop()
    subs
  })
  
  subs
}


implicit val s = Scheduler.NewThreadScheduler
val ticks: Observable[Unit] = Observable()

ticks.subscribe(observer)

여기서 ticks.subscribe(observer) 를 계속 풀면

Observable({ o => scheduler.schedule {
  self => o.onNext(()); self()
}}).subscribe(observer)

// unfold create
scheduler.schedule {
  self => observer.onNext(()); self()
}

// unfold schedule
val m = new MultipleAssignmentSubscription()

schedule(scheduler => {
  def loop(): Unit = {
    m.Subscription = scheduler.schedule {
      { self => observer.onNext(()); self() }({ loop() })
    }
  }
  
  loop()
  m
})

// `self` is a continuation
val m = new MultipleAssignmentSubscription()

schedule(scheduler => {
  def loop(): Unit = {
    m.Subscription = scheduler.schedule {
      { observer.onNext(()); loop() }
    }
  }
  
  loop()
  m
})

// extract loop
val m = new MultipleAssignmentSubscription()

def loop(): Unit = {
    m.Subscription = scheduler.schedule {
      { observer.onNext(()); loop() }
    }
}

schedule(scheduler => {
  loop()
  m
})

// apply loop
schedule(scheduler => {
  m.Subscription = scheduler.schedule {
      { observer.onNext(()); loop() }
  }
    
  m
})

즉 매 스케쥴링마다, subscription 을 갱신하고, 작업을 진행한뒤, 자기 자신을 다시 스케쥴링 한다.

Range

이렇게 응용할 수 있다.

implicit val scheduler: Scheduler = Scheduler.NewThreadScheduler

def range(start, Int, count: Int): 
  (implicit s: Scheduler) Observable[Int] = {
  
  Observable(o => {
    var i = 0
    Observable().subscribe(u => {
      if (i < count) { o.onNext(start + i); i += 1 }
      else { o.onCompleted() }
    })
  })
}

val xs = range(1, 10)
    xs.subscribe(x => println(x))
    println("range out")

Observable() 은 일종의 무한히 반복되는 스케쥴러고 여기에 액션을 추가해 원하는 작업을 해낼 수 있다. 그리고 작업이 완료되면 자동으로 unsubscribe 를 수행한다. 이제 무한히 긴 스트림을 Observable 로도 다룰 수 있게 되었다.

References

(1) Reactive Programming by Martin Ordersky
(2) http://reactivex.io/
(3) https://github.com/iirvine



comments powered by Disqus