//
//  ObserveOn.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 7/25/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//

import Foundation

/// Producer that re-delivers the events of `source` through `scheduler`.
///
/// Every call to `run` creates a dedicated `ObserveOnSink`, subscribes it to
/// `source`, and lets the sink hand events to the downstream observer from
/// work scheduled on `scheduler`.
class ObserveOn<E> : Producer<E> {
    /// Scheduler passed to each sink created by `run`.
    let scheduler: ImmediateSchedulerType
    /// Upstream sequence the sinks subscribe to.
    let source: Observable<E>

    init(source: Observable<E>, scheduler: ImmediateSchedulerType) {
        self.scheduler = scheduler
        self.source = source

#if TRACE_RESOURCES
        AtomicIncrement(&resourceCount)
#endif
    }

    /// Builds a sink around `observer`, subscribes the sink to `source`, and
    /// returns the sink itself as the disposable for this subscription.
    override func run<O : ObserverType where O.E == E>(observer: O) -> Disposable {
        let sink = ObserveOnSink(scheduler: scheduler, observer: observer)
        // The upstream subscription is stored on the sink so that disposing the
        // sink also disposes the subscription to `source`.
        sink._subscription.disposable = source.subscribe(sink)
        return sink
    }

#if TRACE_RESOURCES
    deinit {
        AtomicDecrement(&resourceCount)
    }
#endif
}

/// State of the delivery loop ("pump") owned by an `ObserveOnSink`.
enum ObserveOnState : Int32 {
    // pump is not running
    case Stopped = 0
    // pump is running
    case Running = 1
}

/// Observer that queues incoming events and forwards them to `_observer`
/// from recursive work scheduled on `_scheduler`.
class ObserveOnSink<O: ObserverType> : ObserverBase<O.E> {
    typealias E = O.E

    let _scheduler: ImmediateSchedulerType

    /// Guards `_state`, `_observer` and `_queue`.
    var _lock = SpinLock()

    // state
    var _state = ObserveOnState.Stopped
    var _observer: O?
    var _queue = Queue<Event<E>>(capacity: 10)

    /// Holds the disposable of the currently scheduled pump.
    let _scheduleDisposable = SerialDisposable()
    /// Holds the subscription to the upstream source; assigned by `ObserveOn.run`.
    let _subscription = SingleAssignmentDisposable()

    init(scheduler: ImmediateSchedulerType, observer: O) {
        _scheduler = scheduler
        _observer = observer
    }

    /// Enqueues `event` and, if the pump was stopped, marks it running and
    /// schedules `run` on `_scheduler`.
    override func onCore(event: Event<E>) {
        // Enqueue and the Stopped -> Running transition happen under one lock
        // acquisition, so only the caller that performs the transition schedules.
        let shouldStart = _lock.calculateLocked { () -> Bool in
            self._queue.enqueue(event)

            switch self._state {
            case .Stopped:
                self._state = .Running
                return true
            case .Running:
                return false
            }
        }

        if shouldStart {
            _scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
        }
    }

    /// One pump iteration: dequeues a single event, forwards it to the observer,
    /// and asks the scheduler to recurse while more events are queued.
    ///
    /// - parameter state: Unused; the pump carries no state between iterations.
    /// - parameter recurse: Callback that schedules the next iteration.
    func run(state: Void, recurse: Void -> Void) {
        // The event and the observer are read under the lock; the observer is
        // invoked after the locked closure has returned.
        let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<E>?, O?) in
            if self._queue.count > 0 {
                return (self._queue.dequeue(), self._observer)
            }
            else {
                self._state = .Stopped
                return (nil, self._observer)
            }
        }

        // Nothing was queued: the state is already `.Stopped`, so just leave.
        guard let nextEvent = nextEvent else {
            return
        }

        observer?.on(nextEvent)
        if nextEvent.isStopEvent {
            dispose()
        }

        let shouldContinue = _shouldContinue_synchronized()

        if shouldContinue {
            recurse()
        }
    }

    /// Returns `true` when more events are queued; otherwise marks the pump as
    /// stopped and returns `false`. The whole body runs with `_lock` held.
    func _shouldContinue_synchronized() -> Bool {
        _lock.lock(); defer { _lock.unlock() } // {
            if self._queue.count > 0 {
                return true
            }
            else {
                self._state = .Stopped
                return false
            }
        // }
    }

    /// Disposes the base observer, the upstream subscription and the scheduled
    /// pump, then clears the observer reference while holding `_lock`.
    override func dispose() {
        super.dispose()

        _subscription.dispose()
        _scheduleDisposable.dispose()

        _lock.lock(); defer { _lock.unlock() } // {
            _observer = nil
        // }
    }
}
                dispose()
() { 0123 super.dispose() 0124 0125 _subscription.dispose() 0126 _scheduleDisposable.dispose() 0127 0128 _lock.lock(); defer { _lock.unlock() } // { 0129 _observer = nil 0130 0131 // } 0132 } 0133 }