지난시간엔 단일 데이터에 대해 latency 를 지원하는 Future, Promise 에 대해서 알아봤다. 이번에는 컬렉션에서 latency 를 지원하는 방법인 Observable 을 배워보자.
One Many
Synchronous T/Try[T] Iterable[T]
Asynchronous Future[T] Observable[T]
Future
의 정의를 다시 보면,
trait Future[T] {
def onComplete[U](f: Try[T] => U)
(implicit ex: ExecutionContext): Unit
}
여기서 중요한 부분은 콜백 f
를 받아 Unit
을 돌려준다는 것이다.
(Try[T] => Unit) => Unit
이제 =>
를 뒤집고, Unit
을 ()
로 표기해서 어떤 intuition 을 얻어보자.
(Try[T] => Unit) => Unit
Unit => (Unit => Try[T]) // reverse
() => (()=> Try[T]) // Unit -> ()
Try[T] // simplify
여기서 핵심은 ()
는 사이드 이펙트를 위해 존재하므로 그 부분을 제거하면 타Future[T]
의 결과는 Try[T]
와 같다는 것이다.
Future[T]
andTry[T]
are dual
duality 란 Category Theory 에 의하면
Every statement, theorem, or definition in category theory has a dual which is essentially obtained by “reversing all the arrows”.
처음 시작 부분에서 이런 테이블을 봤을텐데, 여기서도 duality 관계가 나타난다.
One Many
Synchronous T/Try[T] Iterable[T]
Asynchronous Future[T] Observable[T]
그리고 지난시간에 이메일을 발송하는 코드에서 onComplete
를 몇 번 호출하던 콜백이 받는 Try[T]
는 동일하다는 것을 봤다. 이것은 Future[T]
가 Try[T]
를 돌려주고, 그 값이 일정하다는 사실을 말한다.
이렇게 생각할수도 있다. 콜백 f: Try[T] => Unit
을 넘기고 Try[T]
를 얻기 위해 asynchronous()
를 사용할 수 있고, Try[T]
를 얻기 전까지 ¸럭되는 synchronous
를 이용할 수도 있다는 식으로
def asynchronous(): Future[T]
def synchronous(): Try[T]
Observable[T]
를 보기전에 synchronous data stream 인 Iterable[T]
를 좀 살펴보자.
trait Iterable[T] { def iterator(): iterator[T] }
trait Iterator[T] { def hasNext: Boolean; def next(): T }
while (hasNext) next()
처럼 사용할 수 있다.
그리고 Iterable[T]
를 위한 flatMap
도 정의되어 있다. 따라서 Iterable
은 모나드다.
def flatMap[B](f: A => Iterable[B]): Iterable[B]
이 Iterator
를 이용해서 디스크에서 파일을 읽는 코드를 작성하면
def ReadLinesFromDisk(path: String): iterator[String] = {
Source.fromFile(path).getLines()
}
val lines = ReadLinesFromDisk(path)
for (line <- lines) {
... DoWork(line) ... // latency
}
한 라인이 100K
로 어마어마하게 길다면 디스크를 읽기 전까지 기다려야할까? Future
처럼 비동°로 IO 연산을 수행하는 방법을 찾아보자. 이전과 좀 다른점은 지금은 컬렉션을 다루고 있다는 점이다.
이 문제를 해결하기 위해 컬렉션을 순회하는 trait
를 좀 살펴보자. 어떤 일을 해야 하는지 알아야하니까.
trait Iterable[T] {
def iterator(): Iterator[T]
}
trait Iterator[T] {
def hasNext: Boolean
def next(): T
}
Iterable
, Iterator
을 좀 간략화 하면,
Iterable
는 ()
를 인자로 받아 Iterator
를 돌려주고Iterator
는 ()
를 인자로 받아 Try[Option[T]]
를 돌려준다전체적으로 보면 () => (() => Try[Option[T]])
다. 사이드이펙트로 예외를 돌려주거나, None
일수 있거나, 아니면 정상적인 값을 얻을 수 있다는 뜻이다.
타입이 좀 복잡한데, 아까 arrow =>
를 뒤집었던 방법을 다시 사용해서 간단히 만들어 보자. 강의에서는 dualization trick 이라 부른다.
뒤집은 후에는 Try[Option[T]]
를 ¶해해 보자. 예외를 주거나, 아무 값도 주지 않거나(끝나거나), 값을 주거나.
() => (() => Try[Option[T]])
// reverse
(Try[Option[T]] => Unit) => Unit)
// simplify
( T => Unit, // Value
Throwable => Unit, // Exception
() => Unit // Nothing, Terminate
) => Unit
즉, 비동기로 컬렉션을 순회하기 위해서는 이런 작업을 처리해줄 무언가가 필요하다. 스칼라에서는 Observable, Observer, Subscription
이 그 일을 담당한다.
trait Observable[T] {
def Subscribe(observer: Observer[T]): Subscription
}
trait Observer[T] {
def onNext(value: T): Unit
def onError(error: Throwable): Unit
def OnCompleted(): Unit
}
trait Subscription {
def unsubscribe(): Unit
}
즉, Observable
에 Try[Option[T]]
에 따라 할일을 지정해 놓은 Observer
를 세팅하고, Subscription
을 얻은 뒤 이후에 필요에 의해 중단해야 하면 unsubscribe
를 호출하는 방식이다. 이는 작업하 대상이 컬렉션이므로 Future
와는 달리, 하나의 값이 아니라 무한한 값들을 얻어올 수 있기 때문.
초반에, 이 테이블을 다시 보면 Iterable
과 Observable
이 dual 이다.
One Many
Synchronous T/Try[T] Iterable[T]
Asynchronous Future[T] Observable[T]
그리고 이 테이블에 의하면, Future
와 Observable
을 비교해보면 one 과 many 가 의미하는 바를 type 으로 이해할 수 있다.
Observable[T] = (Try[Option[T]] => Unit) => Unit
Future[T] = (Try[T] => Unit) => Unit
타입을 보면 Future
는 Option
부분이 없지만 Observable
은 있다. 즉 Observable
은 아무런 값도 없다는 사실을 의미하는 타입 Option
을 이용해 종료시점 을 알려줄 수 있기 때문에 multiple values 를 처리할 수 있다.
concurrency 측면에선 어떨까? 타입을 살펴보면
object Future {
def apply[T](body: => T)
(implicit executor: ExecutionContext): Future[T]
}
trait Observable[T] {
def observeOn(scheduler: Scheduler): Observable[T]
}
Observable
의 경우엔 하나의 ExecutionContext
가 아니라 여러개를 가져야 하기 때문에 Scheduler
를 이용한다. 이 부분은 나중에 더 자세히 살펴보자.
val ticks: Observable[Long] = Observable.interval(1 seconds)
val evens: Observable[Long] = ticks.filter(s => s % 2 == 0)
val bufs: Observable[Seq[Long]] = evens.buffer(2, 1)
val s = bufs.subscribe(b => println(b))
readLine()
s.unsubscribe
Observable
을 latency 를 지원하는 컬렉션으로 이해하면 쉽다. interval
을 이용해 간격을 지정하거나, 일반 컬렉션처럼 filter
도 사용할 수 있다. evens.buffer
는 그냥 버퍼링이라고 생각하면 된다. 버퍼 크기가 2인 것으로.
이런 것도 가능하다.
val xs = Observable.range(1, 10)
val ys = xs.map(x => x + 1)
xs
는 비동기 순회를 지원하는 컬렉션이다. filter
, map
, flatMap
, take
, zip
등 을 지원한다.
색깔의 순서를 봐야할 필요가 있다. Observable
은 비동기 연산이기 때문에 순서가 좀 달라질 수 있다. 구현에도 그런 부분이 나타나 있는데 아래의 코드에서 flatten
이 의미하는 바는 non-deterministic merge 다.
def flatMap(f: T => Observable[S]): Observable[S] = {
map(f).flatten()
}
다른 코드도 좀 보면
val xs: Observable[Int] = Observable(3, 2, 1)
val yss: Observable[Observable[Int]] =
xs.map(x => Observable.Interval(x seconds).map(_ => x).take(2))
val zs: Observalble[Int] = yss.flatten()
위 코드는 x
초 후에 x
2개를 뱉는 Oberservable
을 만든 후 flatten
을 사용해 껍데기를 벗긴다. List(List(1, 2), List(3)).flatten
하면 List(1, 2, 3)
이 되듯이 Observable[Observable].flatten
도 Observable
을 만든다고 생각하면 쉽다.
예외나, 종료 등 어떤 이유에서든지 먼저 끝나는 Observable
에 의해 merge
가 종료된다는 점에 주의하자.
val xs: Observable[Int] = Observable(3, 2, 1)
val yss: Observable[Observable[Int]] =
xs.map(x => Observable.Interval(x seconds).map(_ => x).take(2))
val zs: Observalble[Int] = yss.concat
여기서 재밌는점은 yss
의 첫번째 원소인
Observable.Interval(3 seconds).map(_ => x).take(2)
가 끝나기 전까지 다른 원소들이 버퍼링 되므로 주의해야 한다는 점이다. marble diagram 으로 보면
def usgs(): Observable[EarthQuake] = { ... }
class EarthQuake {
...
def magnitude: Double
def location: GeoCoordinate
}
object Magnitude extends Enumeration {
def apply(magnitude: Double): Magnitude = { ... }
type Magnitude = Value
val Micro, Minor, Light, Moderate, Strong, Major, Great = Value
}
val major = quakes.
map(q => (q.location, Magnitude(q.magnitude))).
filter { case (loc, mag) => mag => Major }
major.subscribe({ case (loc, mag) =>
println($"Magnitude ${ msg } quake at ${ loc }")
})
이런식으로 사용할 수 있다. 더 실제 동작하는 코드는 여기로. 조금 복잡하다.
위치를 GeoCoordinate
로 받기 때문에, 해당 위치의 나라를 돌려준다든지 등으로 개선할 수 있다.
def reverseGeocode(g: GeoCoordinate): Future[Country] = { ... }
이 함수를 구현하면
val withCountry: Observable[Observable[EarthQuake, Country)]] =
usgs().map(q => {
val country: Future[Country] = reverseGeocode(q.location)
Objservable(country.map(country => (quake, country)))
})
// val merged: Observable[(EarthQuake, Country)] =
// withCountry.flatten()
val merged: Observable[(EarthQuake, Country)] = withCountry.concat()
여기서 머징하기 위해 flatten
이나 concat
을 사용할 수 있는데, 언급했듯이 어떤걸 쓰느냐에 따라 순서가 달라질 수 있다. 아래 그¼은 각각 flatten
, concat
을 설명한다.
def groupBy[K](keySelector: T => K): Observable[(K, Observable[T])]
즉 T
를 받아 키 K
를 만들고, 이것에 따라 Observable
을 그룹짓는다. 이걸 응용하면 나라별로 지진을 취합하는 것이 가능하다.
val byCountry: Observable[(Country, Observable[(EarthQuake, Country)]] =
merged.groupBy( case (q, c) => c }
이제 runningAverage
란 함수가 있다고 해 보자. Observable[Double]
을 받아 업데이트 후 Observable[Double]
을 돌려주는 함수. 그러면 runningAveragePerCountry
는 어떻게 구현할까?
val byCountry: Observable[(Country, Observable[(EarthQuake, Country)]]
def runningAverage(s: Observable[Double]): Observable[Double] =
{ ... }
val runningAveragePerCountry: Observable[(Country, Observable[Double])] =
byCountry.map { case (country, cqs) =>
(country, runningAverage(cqs.map(_._1.magnitude))
}
지진 예제를 다시 가져오면, 더이상 관심 없을때 unsubscribe
를 호출할 수 있다.
val quakes: Observable[EarthQuake] = { ... }
val s: Subscription = quakes.Subscribe(...)
s.unsubscribe()
근데, 생각해보면 여러 곳에서 subscription 할 수 있다. UI °은 경우 그 수가 많을 것이다. 이 경우 unsubscribing 이 cancellation 을 의미하지 않는다. 왜냐하면 다른곳에서 subscribing 하고 있을 수 있기 때문이다.
타입을 좀 보면
trait Subscription {
def unsubscribe(): Unit
}
object Subscription {
def apply(unsubscribe: => Unit): Subscription
}
trait BooleanSubscription extends Subscription {
def isUnsubscribed: Boolean
}
trait CompositeSubscription extends BooleanSubscription {
def +=(s: Subscription): this.type
def -=(s: Subscription): this.type
}
trait MultipleAssignmentSubscription extends BooleanSubscription {
def subscription: Subscription
def subscription_=(that: Subscription): this.type
}
여기서 CompositeSubscription
은 컬렉션처럼 Subscription
을 추가하거나, 제거할 수 있고 unsubscribe
하면 나머지도 모두 취소 된다.
MultipleAssignmentSubscription
은 일종의 inner subscription 을 위한 프록시처럼 동작한다. 세팅하고, 교²´할 수 있지만, 항상 내부에는 동작하는 하나의 Subscription
이 있다.
import rx.lang.scala.subscriptions._
import rx.lang.scala.Subscription
val s = Subscription {
println("bye, bye")
}
s.unsubscribe()
s.unsubscribe() // buggy
이 경우 두번째 unsubscribe()
를 호출했을때 "bye, bye"
가 호출되지 않는다. 먼저 unsubscribe()
를 호출했기 때문이다.
직접 Subscription
을 구현할때는 다수의 스레드에서 저마다 unsubscribe()
를 호출할 수 있기 때문에 이 메소드는 idempotent 하게 구현되야 한다.
CompositeSubscription
을 이미 unsubscribe
했을땐, 새로운 Subscription
을 추가한다 하더라도 자동으로 unsubscribe
가 호출된다.
MultiAssignmentSubscription
의 경우에는 여러번 할당할 수 있으나, 단 하나의 Subscription
만 가리킨다. 따라서 다음 코드를 실행할 경우 b.unsubscribe
만 호출된다.
val a = Subscription { println("A") }
val b = Subscription { println("B") }
val m = MultiAssignmentSubscription()
multi.subscription = a
multi.subscription = b
multi.unsubscribe
CompositeSubscription
과 마찬가지로 이미 unsubscribe
되었다면, 할당되는 Subscription
도 자동으로 unsubscribe
된다.
CompositeSubscription
이나 MultiAssignment
를 연산을 공유하는 컨테이너라 볼 수 있겠는데, 그럼 여기서 내부의 것만 unsubscribe
하면 어떻게 될까? 당연히 외부의 MultiAssignment
나 Composite
는 알 길이 없으니 isUnsubscribe
는 false
가 된다.
자주 보게 될 타입부터 소개하면
object Observable {
def apply[T](s: Observer[T] => Subscription): Oberservable[T]
}
trait Observable[T] {
def subscribe(observer: Observer[T]): Subscription
}
trait Observer[T] {
def onNext(value: T): Unit
def onError(error: Throwable): Unit
def OnCompleted(): Unit
}
trait Subscription {
def unsubscribe(): Unit
}
아무런 알림도 못받는 Observable
을 만드는 never
와, onError
를 호출하는 apply
를 구현해 보자.
def never(): Observable[Nothing] = Observable[Nothing](observer => {
Subscription {}
})
def apply[T](error: Throwable): Observable[T] =
Observable[T](observer => {
observer.onError(error)
Subscription {}
}
이제 이 함수들을 이용해 다양한 함수를 구현해 보자.
object Observable {
def apply[T](s: Observer[T] => Subscription): Oberservable[T]
}
def switchWith(ss: T*): Observable[T] = {
Observer[T](observer => {
for(s <- ss) observer onNext(s)
subscribe(observer)
}
}
object Observable {
def apply[T](s: Observer[T] => Subscription): Oberservable[T]
}
def filter(p: T => Boolean): Observable[T] = {
Observable[T](observer => {
subscribe(
(t: T) => { if (p(t)) observer.onNext(t) },
(e: Throwable) => { observer.onError(e) },
() => { observer.onCompleted() }
)
})
}
object Observable {
def apply[T](s: Observer[T] => Subscription): Oberservable[T]
}
def map[S](f: T => S): Observable[S] = {
Observable[T](observer => {
subscribe(
(t: T) => { if (p(t)) observer.onNext(f(t)) },
(e: Throwable) => { observer.onError(e) },
() => { observer.onCompleted() }
)
})
}
그림을 잘 보면 input stream 으로 부터 값을 얻어 함수를 적용하고 output stream 으로 뱉는다. 구현도 마찬가지로 현재의 컨테이너인 Observable
로 부터 값을 얻었을때 함수를 적용하고 어떻게 넘겨줄지를 정의한다.
duality 관계인 Iterable
의 map
구현을 보면 더 명확히 알 수 있다.
def map[S](f: T => S): Iterable[S] = {
new Iterable[S] {
val it = this.iterator()
def iterator: Iterator[S] = new Iterator[S] {
def hasNext: Boolean = { it.hasNext }
def next(): S = { f(it.next()) }
}
}
}
Future[T]
를 얻어 Observable[T]
로 바꿔보자. T
를 List[T]
로 바꾸듯이. 그럴려면 Subject
를 알아야 하는데, 이건 지난시간에 배운 Promise
비슷한 역할을 한다.
import scala.concurrent.ExecutionContext.Implicits.global
def race[T](left: Future[T], right: Future[T]): Future[T] = {
val p = Promise[T]()
left onComplete { p.tryComplete(_) }
right onComplete { p.tryComplete(_) }
p.future
}
Promise
로 부터 Future
를 얻고, Future.onComplete
에 콜백을 넘기면, 완료되었을때 Promise.complete
에 의해 호출된다. Promise
는 Future
를 위한 대리자? 프록시쯤으로 볼 수 있다.
Observable
과 Subject
도 비슷한 관계다.
코드로 이해해 보자.
val channel = PublishSubject[Int]()
val a = channel.subscribe(x => println("a: " + x))
val b = channel.subscribe(x => println("b: " + x))
channel.onNext(42)
a.unsubscribe()
channel.onNext(4711)
channel.onComplete()
val c = channel.subscribe(x => println("c: " + x))
channel.onNext(13)
Subject
는 일종의 채널이라 보면 된다. 위 코드에서 흥미로운 점은 onComplete
(!
로 표시) 가 호출 된 뒤에 옵저버 c
를 Subject
에 추가했음에도 c
도 onComplete
가 호출된 것을 알고 있다는 사실이다.
val channel = ReplaySubject[Int]()
val a = channel.subscribe(x => println("a: " + x))
val b = channel.subscribe(x => println("b: " + x))
channel.onNext(42)
a.unsubscribe()
channel.onNext(4711)
channel.onComplete()
val c = channel.subscribe(x => println("c: " + x))
channel.onNext(13)
ReplaySubject
의 경우에는 c
에도 모든 데이터를 받는다. 이는 ReplaySubject
가 히스토리를 캐싱하고있기 때문이다.
다양한 종류의 Subject
를 그림으로 보면
object Observable {
def apply[T](f: Future[T]): Observable[T] = {
val as = AsyncSubject[T]()
f onComplete {
case Failure(e) => { as.onError(e) }
case Success(c) => { as.onNext(c); as.onCompleted() }
}
as
}
}
복잡하게 생각하지 말고 그냥 Promise
랑 비슷한 일을 한다고 이해면 쉽다.
지난 시간에 Future
가 Try
를 이용하는걸 봤다. Future[Try[T]
처럼. Notification
도 이와 비슷하다. Observable[Notification[T]]
처럼 사용한다.
abstract class Try[+T]
case class Success[T](elem: T) extends Try[T]
case class Failure(t: Throwable) extends Try[Nothing]
abstract class Notification[+T]
case class OnNext[T](elem: T) extends Notification[T]
case class OnError(t: Throwable) extends Notification[Nothing]
case object onCompleted extends Notification[Nothing]
def materialize: Observable[Notification[T]] = { ... }
차이라면, 종료 를 알려주는 onCompleted
가 있다는 것이다. materialize
는 Observable[T]
를 감싸 Observable[Notification[T]]
로 만든다.
권할만한 방법은 아니지만, 만약에, 만약에 블러킹이 필요하다면 이런식으로 코드를 작성할 수도 있다는 것 지난시간에 배웠다.
val f: Future[String] = { ... }
val text: String = Await.result(f, 10 seconds)
Observable
도 마찬가지다.
val xs: Observable[Long] = Observable.interval(1 seconds).take(5)
val ys: List[Long] = xs.toBlockingObservable.toList
println(ys)
// all Rx operators are non-blocking
val zs: Observable[Long] = xs.sum
val s: Long = zs.toBlockingObservable.single
Observable
내에 있는 값들을 계산하기 위해 reduce
를 사용할 수 있다. fold
와 비슷하달까
def reduce(f: (T, T) => T): Observable[T]
재밌는 사실은 리턴타입이 원소가 Observable
을 돌려주기 때문에 Future
와 비슷다는 것이다.
잘못된 구현을 먼저 보자.
def from[T](seq: Iterable[T]): Observable[T] =
Observable(o => {
seq.foreach(s => o.onNext(s)) // What if seq is infinite?
o.onCompleted // What if seq fails?
Subscription {}
})
이 구현의 문제점은, Iterable
이 무한하거나, 실패하면 어떻게 처리할지 전혀 고려하지 않았다는 것이다. 게다가 빈 Subscription
을 돌려주기 때문에, unsubscribe
할 수도 없다.
이 문제를 풀기 위해서는 scheduler 가 필요하다.
우선 돌아가는 코드를 만들기 전에 테스트 케이스부터 작성하자
// factory method
object Observable {
def apply[T](subscribe: Observer[T] => Subscription): Observable[T]
}
def from[T](seq: Iterable[T]): Observable[T] = { ... }
// infinite seq
def nats(): Iterable[Int] = new Iterable[Int] {
val i = -1
def iterator: Iterator[Int] = new Itertor[Int] {
def hasNext: Boolean = { true }
def next(): Int = { i += 1; i }
}
}
val infinite: Iterable[Int] = nats()
val subscription = from(infinite).subscribe(x => println(x))
subscription.unsubscribe()
만약 from
이 위에서 본 것처럼 구현되어 있다면 subscription.unsubscribe()
에 도달하지 못한다. 따라서 iteration 을 진행하는 것과는 다른 컨텍스트를 도입해 unsubscribe
를 호출해야 한다. 그래서 스케쥴러가 필요하다.
Future
에서는 ExecutionContext
가 있었지만, Observable
은 복수개의 컨텍스트를 조작해야 하므로 스케쥴러를 써야한다.
object Future[
def apply[T](body: => T)
(implicit executor: ExecutionContext): Future[T]
}
trait Observable[T] {
def observeOn(scheduler: Scheduler): Observable[T]
}
// Runnable == Java's Runnable
trait ExecutionContext {
def execute(runnable: Runnable): Unit
}
// '=> Unit' == Runnable
trait Scheduler {
def schedule(work: => Unit) Subsciption
}
// example
val scheduler = Scheduler.newThreadScheduler
val subscription = scheduler.schedule {
println("Hello World!")
}
Future
는 Runnable
을 취소할 수 있는 방법이 없지만, Scheduler
는 Subscription
을 리턴하기 때문에 취소할 수 있다. 그러나 일단 작업이 시작되면 취소할 수 있는 방법은 없다. 아래 예제를 보자
def from[T](seq: Iterable[T])
(implicit s: Scheduler): Observable[T] = {
Observable[T](o => {
s.schedule {
seq.foreach(x => observer.onNext(x))
observer.onCompleted()
}
}
}
onNext
가 호출되기 전, 아주 잠깐동안만 작업을 취소할 수 있는 기회가 있다. 다시 말해서, 이터레이션이 통채로 스케쥴링 되기 때문에 좀 별로라는 것이다. 매 이터레이션마다 취소할 기회가 있는 from
을 구현하고 싶다.
scheduler
의 다른 시그니쳐를 좀 보자.
trait Scheduler {
def schedule(work: => Unit): Subscription
def schedule(work: Scheduler => Subscription): Subscription
def schedule(work: (=> Unit) => Unit): Subscription
}
두번째 시그니쳐를 보자. schedule
함수가 하는 일이 Scheduler
를 받아 등록하고 Subscription
을 돌려주는 일이라면 그것 자체를 work
로 받고, 해당 work
에서 한번씩만 이터레이션 한다면 매 이터레이션에서 취소할 기회를 가질 수 있다.
이건 사실 세번째 시그니쳐와 동일한데 이유는 뒤에서 보겠다.
from
의 새로운 구현을 보면
def from[T](seq: Iterable[T])
(implicit) scheduler: Scheduler): Observable[T] = {
Observable[T](o => {
val it = seq.iterator()
scheduler.schedule(self => {
if (it.hasNext) { o.onNext(it.next()); self() }
else { o.onCompleted() }
}
}
}
으사양반 이게 무슨 개소리요!
조금 난해한데, it.hasNext
가 있어서 다음 이터레이션으로 넘어갈 수 있으면 self()
를 호출해 자기 자신을 스케쥴링다. 따라서 매 이터레이션마다 사용 가능한 Subscription
이 있으므로 취소할 수 있는 기회가 생긴다.
물론 Subscription
이 갱신되는데 어떻게 하나의 레퍼런스로 그게 가능하느냐 하는 질문이 나올 수 있는데, 우리는 이미 MultipleAssignmentSubscription
을 배웠다. schedule
함수의 내부를 보자.
def schedule(work: (=> Unit) => Unit): Subscription = {
val subs = new MultipleAssignmentSubscription()
schedule(scheduler => {
def loop(): Unit = {
subs.Subscription = scheduler.schedule {
work { loop() }
}
}
loop()
subs
})
subs
}
def from[T](seq: Iterable[T])
(implicit) scheduler: Scheduler): Observable[T] = {
Observable[T](o => {
val it = seq.iterator()
scheduler.schedule(self => {
if (it.hasNext) { o.onNext(it.next()); self() }
else { o.onCompleted() }
}
}
}
즉, self
가 바로 loop
다. 자기 자신을 스케쥴링하는 함수인데,
work -> loop -> work -> loop -> ...
을 반복하면서 더 이터레이션할 멤버가 없거나, unsubscribe
하기 전까지 재귀적으로 돈다.
돌려주는 값 없이 행위 그 자체만 보면, 스케쥴러 그 자체는 Observable[Unit]
에 대응된다.
object Observable {
def apply() (implicit s: Scheduler): Observable[Unit] = {
Observable(o => {
s.schedule(self => {
o.onNext(()); self
})
})
}
}
implicit val s = Scheduler.NewThreadScheduler
val ticks: Observable[Unit] = Observable()
이게 실제로 어떻게 동작하나 보면
object Observable {
def apply(s: Observer[T] => Subscription) = new Observable[T] {
def subscribe(o: Observer[T]): Subscription = { Magic(s(o)) }
}
}
val subs = Observable(o => F(o)).subscribe(observer)
// = conceptually
val subs = Magic(F(observer))
여기서 F
나 Magic
는 임의의 함수라 생각하면 된다. (그런게 있나보다 하자.)
이걸 왜 이야기하냐 하면 auto unsubscribe 가 가능하기 때문이다. 스케쥴링 하는 행위를 observable 로 변경할 수 있다면, 스케쥴링이 불가능할때 unsubscribe 하도록 만드는 것이다.
F
가 observer.onCompleted
나 observer.OnError
를 호출한다면, Magic
함수에 의해 자동으로 unsubscribe
가 호출된다. 이로인해 다음에 호출되는 onNext
는 아무런 영향도 미치지 않게 된다.
이럴 수 있는 이유는 Observable
을 생성하는 방식이 Rx Contract 을 만족하기 때문이다. (따라서 직접 Observable, Observer
를 만들지 말고 팩토리 메소드를 사용해야한다)
(onNext)*(onCompleted + onError)?
onNext
는 여러번 호출될 수 있으나 겹치지 않고, onCompleted
나 OnError
는 옵션이지만 (무한한 시퀀스가 존재하기때문) 호출된다면 둘 중 단 한개만, 단 한번 호출되야한다는 것이다. 아까 본 코드를 다시 나열해서 어떻게 그렇게 되나 살펴보자.
object Observable {
def apply() (implicit s: Scheduler): Observable[Unit] = {
Observable(o => {
s.schedule(self => {
o.onNext(()); self
})
})
}
}
def schedule(work: (=> Unit) => Unit): Subscription = {
val subs = new MultipleAssignmentSubscription()
schedule(scheduler => {
def loop(): Unit = {
subs.Subscription = scheduler.schedule {
work { loop() }
}
}
loop()
subs
})
subs
}
implicit val s = Scheduler.NewThreadScheduler
val ticks: Observable[Unit] = Observable()
ticks.subscribe(observer)
여기서 ticks.subscribe(observer)
를 계속 풀면
Observable({ o => scheduler.schedule {
self => o.onNext(()); self()
}}).subscribe(observer)
// unfold create
scheduler.schedule {
self => observer.onNext(()); self()
}
// unfold schedule
val m = new MultipleAssignmentSubscription()
schedule(scheduler => {
def loop(): Unit = {
m.Subscription = scheduler.schedule {
{ self => observer.onNext(()); self() }({ loop() })
}
}
loop()
m
})
// `self` is a continuation
val m = new MultipleAssignmentSubscription()
schedule(scheduler => {
def loop(): Unit = {
m.Subscription = scheduler.schedule {
{ observer.onNext(()); loop() }
}
}
loop()
m
})
// extract loop
val m = new MultipleAssignmentSubscription()
def loop(): Unit = {
m.Subscription = scheduler.schedule {
{ observer.onNext(()); loop() }
}
}
schedule(scheduler => {
loop()
m
})
// apply loop
schedule(scheduler => {
m.Subscription = scheduler.schedule {
{ observer.onNext(()); loop() }
}
m
})
즉 매 스케쥴링마다, subscription 을 갱신하고, 작업을 진행한뒤, 자기 자신을 다시 스케쥴링 한다.
이렇게 응용할 수 있다.
implicit val scheduler: Scheduler = Scheduler.NewThreadScheduler
def range(start, Int, count: Int):
(implicit s: Scheduler) Observable[Int] = {
Observable(o => {
var i = 0
Observable().subscribe(u => {
if (i < count) { o.onNext(start + i); i += 1 }
else { o.onCompleted() }
})
})
}
val xs = range(1, 10)
xs.subscribe(x => println(x))
println("range out")
즉 Observable()
은 일종의 무한히 반복되는 스케쥴러고 여기에 액션을 추가해 원하는 작업을 해낼 수 있다. 그리고 작업이 완료되면 자동으로 unsubscribe 를 수행한다. 이제 무한히 긴 스트림을 Observable
로도 다룰 수 있게 되었다.
(1) Reactive Programming by Martin Ordersky
(2) http://reactivex.io/
(3) https://github.com/iirvine