//
//  ObserveOn.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 7/25/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//

import Foundation

/// Producer that subscribes an `ObserveOnSink` to `source`, so that events are
/// forwarded to the observer through work scheduled on `scheduler`.
class ObserveOn<E> : Producer<E> {
    /// Scheduler handed to the sink; the sink schedules its delivery pump on it.
    let scheduler: ImmediateSchedulerType
    /// Upstream sequence whose events are forwarded.
    let source: Observable<E>

    /// - Parameter source: Upstream sequence to subscribe to.
    /// - Parameter scheduler: Scheduler used by the sink to run event delivery.
    init(source: Observable<E>, scheduler: ImmediateSchedulerType) {
        self.scheduler = scheduler
        self.source = source

#if TRACE_RESOURCES
        AtomicIncrement(&resourceCount)
#endif
    }

    /// Creates the sink, subscribes it to `source`, and returns the sink as the
    /// disposable for the whole subscription.
    override func run<O : ObserverType where O.E == E>(observer: O) -> Disposable {
        let sink = ObserveOnSink(scheduler: scheduler, observer: observer)
        // The upstream subscription is stored on the sink so that `sink.dispose()` tears it down.
        sink._subscription.disposable = source.subscribe(sink)
        return sink
    }

#if TRACE_RESOURCES
    deinit {
        AtomicDecrement(&resourceCount)
    }
#endif
}

/// Whether the sink's delivery pump is currently scheduled.
enum ObserveOnState : Int32 {
    // pump is not running
    case Stopped = 0
    // pump is running
    case Running = 1
}

/// Observer that buffers incoming events in a queue and delivers them to the
/// wrapped observer from a recursive action scheduled on `_scheduler`.
///
/// `_state`, `_observer` and `_queue` are only read or written while `_lock` is held.
class ObserveOnSink<O: ObserverType> : ObserverBase<O.E> {
    typealias E = O.E

    let _scheduler: ImmediateSchedulerType

    var _lock = SpinLock()

    // state
    var _state = ObserveOnState.Stopped
    var _observer: O?
    var _queue = Queue<Event<E>>(capacity: 10)

    /// Holds the disposable of the currently scheduled pump.
    let _scheduleDisposable = SerialDisposable()
    /// Holds the upstream subscription (assigned by `ObserveOn.run`).
    let _subscription = SingleAssignmentDisposable()

    init(scheduler: ImmediateSchedulerType, observer: O) {
        _scheduler = scheduler
        _observer = observer
    }

    /// Enqueues `event` and, if the pump was stopped, marks it running and schedules it.
    override func onCore(event: Event<E>) {
        let shouldStart = _lock.calculateLocked { () -> Bool in
            self._queue.enqueue(event)

            switch self._state {
            case .Stopped:
                self._state = .Running
                return true
            case .Running:
                return false
            }
        }

        // Scheduling happens outside the lock; only the caller that flipped
        // Stopped -> Running reaches this branch.
        if shouldStart {
            _scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
        }
    }

    /// One pump iteration: dequeues at most one event under the lock, delivers it
    /// outside the lock, then calls `recurse` if more events are queued.
    ///
    /// - Parameter state: Unused.
    /// - Parameter recurse: Callback supplied by `scheduleRecursive`; invoked to request another iteration.
    func run(state: Void, recurse: Void -> Void) {
        let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<E>?, O?) in
            if self._queue.count > 0 {
                return (self._queue.dequeue(), self._observer)
            }
            else {
                self._state = .Stopped
                return (nil, self._observer)
            }
        }

        if let nextEvent = nextEvent {
            // `observer` is nil once `dispose()` has cleared `_observer`; the event is then dropped.
            observer?.on(nextEvent)
            if nextEvent.isStopEvent {
                dispose()
            }
        }
        else {
            // Queue was empty; the state was already set to Stopped under the lock.
            return
        }

        let shouldContinue = _shouldContinue_synchronized()

        if shouldContinue {
            recurse()
        }
    }

    /// Under the lock: returns `true` if events remain queued, otherwise marks
    /// the pump as stopped and returns `false`.
    func _shouldContinue_synchronized() -> Bool {
        _lock.lock(); defer { _lock.unlock() } // {
            if self._queue.count > 0 {
                return true
            }
            else {
                self._state = .Stopped
                return false
            }
        // }
    }

    /// Disposes the upstream subscription and the scheduled pump, then releases
    /// the observer reference under the lock.
    override func dispose() {
        super.dispose()

        _subscription.dispose()
        _scheduleDisposable.dispose()

        _lock.lock(); defer { _lock.unlock() } // {
            _observer = nil

        // }
    }
}