0001    //
0002    //  ObserveOnSerialDispatchQueue.swift
0003    //  RxSwift
0004    //
0005    //  Created by Krunoslav Zaher on 5/31/15.
0006    //  Copyright © 2015 Krunoslav Zaher. All rights reserved.
0007    //
0008    
0009    import Foundation
0010    
0011    #if TRACE_RESOURCES
0012    /**
0013    Counts number of `SerialDispatchQueueObservables`.
0014    
0015    Purposed for unit tests.
0016    */
0017    public var numberOfSerialDispatchQueueObservables: AtomicInt = 0
0018    #endif
0019    
0020    class ObserveOnSerialDispatchQueueSink
ObserveOnSerialDispatchQueue.swift:26
    var cachedScheduleLambda: ((ObserveOnSerialDispatchQueueSink<O>, Event<E>) -> Disposable)!
ObserveOnSerialDispatchQueue.swift:70
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: scheduler, observer: observer)
<O
ObserveOnSerialDispatchQueue.swift:20
class ObserveOnSerialDispatchQueueSink<O: ObserverType> : ObserverBase<O.E> {
ObserveOnSerialDispatchQueue.swift:22
    let observer: O
ObserveOnSerialDispatchQueue.swift:26
    var cachedScheduleLambda: ((ObserveOnSerialDispatchQueueSink<O>, Event<E>) -> Disposable)!
ObserveOnSerialDispatchQueue.swift:28
    init(scheduler: SerialDispatchQueueScheduler, observer: O) {
: ObserverType> : ObserverBase<O.E> { 0021 let scheduler
ObserveOnSerialDispatchQueue.swift:29
        self.scheduler = scheduler
ObserveOnSerialDispatchQueue.swift:45
        self.scheduler.schedule((self, event), action: cachedScheduleLambda)
: SerialDispatchQueueScheduler 0022 let observer
ObserveOnSerialDispatchQueue.swift:30
        self.observer = observer
ObserveOnSerialDispatchQueue.swift:34
            sink.observer.on(event)
: O 0023 0024 let subscription
ObserveOnSerialDispatchQueue.swift:51
        subscription.dispose()
ObserveOnSerialDispatchQueue.swift:71
        sink.subscription.disposable = source.subscribe(sink)
= SingleAssignmentDisposable() 0025 0026 var cachedScheduleLambda
ObserveOnSerialDispatchQueue.swift:33
        cachedScheduleLambda = { sink, event in
ObserveOnSerialDispatchQueue.swift:45
        self.scheduler.schedule((self, event), action: cachedScheduleLambda)
: ((ObserveOnSerialDispatchQueueSink<O>, Event<E>) -> Disposable)! 0027 0028 init
ObserveOnSerialDispatchQueue.swift:70
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: scheduler, observer: observer)
(scheduler: SerialDispatchQueueScheduler, observer: O) { 0029 self.scheduler = scheduler 0030 self.observer = observer 0031 super.init() 0032 0033 cachedScheduleLambda = { sink, event in 0034 sink.observer.on(event) 0035 0036 if event.isStopEvent { 0037 sink.dispose() 0038 } 0039 0040 return NopDisposable.instance 0041 } 0042 } 0043 0044 override func onCore(event: Event<E>) { 0045 self.scheduler.schedule((self, event), action: cachedScheduleLambda) 0046 } 0047 0048 override func dispose
ObserveOnSerialDispatchQueue.swift:37
                sink.dispose()
() { 0049 super.dispose() 0050 0051 subscription.dispose() 0052 } 0053 } 0054 0055 class ObserveOnSerialDispatchQueue
Observable+Concurrency.swift:30
            return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
<E
ObserveOnSerialDispatchQueue.swift:55
class ObserveOnSerialDispatchQueue<E> : Producer<E> {
> : Producer<E> { 0056 let scheduler
ObserveOnSerialDispatchQueue.swift:60
        self.scheduler = scheduler
ObserveOnSerialDispatchQueue.swift:70
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: scheduler, observer: observer)
: SerialDispatchQueueScheduler 0057 let source
ObserveOnSerialDispatchQueue.swift:61
        self.source = source
ObserveOnSerialDispatchQueue.swift:71
        sink.subscription.disposable = source.subscribe(sink)
: Observable<E> 0058 0059 init
Observable+Concurrency.swift:30
            return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
(source: Observable<E>, scheduler: SerialDispatchQueueScheduler) { 0060 self.scheduler = scheduler 0061 self.source = source 0062 0063 #if TRACE_RESOURCES 0064 AtomicIncrement(&resourceCount) 0065 AtomicIncrement(&numberOfSerialDispatchQueueObservables) 0066 #endif 0067 } 0068 0069 override func run<O : ObserverType where O.E == E>(observer: O) -> Disposable { 0070 let sink = ObserveOnSerialDispatchQueueSink(scheduler: scheduler, observer: observer) 0071 sink.subscription.disposable = source.subscribe(sink) 0072 return sink 0073 } 0074 0075 #if TRACE_RESOURCES 0076 deinit { 0077 AtomicDecrement(&resourceCount) 0078 AtomicDecrement(&numberOfSerialDispatchQueueObservables) 0079 } 0080 #endif 0081 }