0001    //
0002    //  SerialDispatchQueueScheduler.swift
0003    //  Rx
0004    //
0005    //  Created by Krunoslav Zaher on 2/8/15.
0006    //  Copyright © 2015 Krunoslav Zaher. All rights reserved.
0007    //
0008    
0009    import Foundation
0010    
/**
Abstracts the work that needs to be performed on a specific `dispatch_queue_t`. It makes sure
that even if a concurrent dispatch queue is passed, it is transformed into a serial one.

It is extremely important that this scheduler is serial, because
certain operators perform optimizations that rely on that property.

Because there is no way of detecting whether the passed dispatch queue is serial or
concurrent, for every queue that is passed in, the worst case (concurrent)
is assumed, and an internal serial proxy dispatch queue is created.

This scheduler can also be used with an internal serial queue alone.

In case some customization needs to be made on it before usage,
the internal serial queue can be customized using the `serialQueueConfiguration`
callback.
*/
public class SerialDispatchQueueScheduler: SchedulerType {
    public typealias TimeInterval = NSTimeInterval
    public typealias Time = NSDate

    // Queue on which all scheduled work and all timer handlers are run.
    private let _serialQueue : dispatch_queue_t

    /**
    - returns: Current time.
    */
    public var now : NSDate {
        get {
            return NSDate()
        }
    }

    // Leeway for scheduling timers.
    // Not read by the timer setup in this class, which passes a literal `0` leeway.
    private var _leeway: Int64 = 0

    /**
    Designated initializer. Stores `serialQueue` as-is; no serial proxy queue is created here,
    so the queue is used directly for all scheduled work.

    - parameter serialQueue: Dispatch queue used to perform work.
    */
    init(serialQueue: dispatch_queue_t) {
        _serialQueue = serialQueue
    }

    /**
    Constructs new `SerialDispatchQueueScheduler` with internal serial queue named `internalSerialQueueName`.

    Additional dispatch queue properties can be set after dispatch queue is created using `serialQueueConfiguration`.

    - parameter internalSerialQueueName: Name of internal serial dispatch queue.
    - parameter serialQueueConfiguration: Additional configuration of internal serial dispatch queue.
    */
    public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((dispatch_queue_t) -> Void)? = nil) {
        let queue = dispatch_queue_create(internalSerialQueueName, DISPATCH_QUEUE_SERIAL)
        serialQueueConfiguration?(queue)
        self.init(serialQueue: queue)
    }

    /**
    Constructs new `SerialDispatchQueueScheduler` named `internalSerialQueueName` that wraps `queue`.

    - parameter queue: Possibly concurrent dispatch queue used to perform work.
    - parameter internalSerialQueueName: Name of internal serial dispatch queue proxy.
    */
    public convenience init(queue: dispatch_queue_t, internalSerialQueueName: String) {
        // Serial proxy that targets `queue`, so work submitted here is serialized
        // even when `queue` itself is concurrent.
        let serialQueue = dispatch_queue_create(internalSerialQueueName, DISPATCH_QUEUE_SERIAL)
        dispatch_set_target_queue(serialQueue, queue)
        self.init(serialQueue: serialQueue)
    }

    /**
    Constructs new `SerialDispatchQueueScheduler` that wraps one of the global concurrent dispatch queues.

    - parameter globalConcurrentQueueQOS: Identifier for global dispatch queue with specified quality of service class.
    - parameter internalSerialQueueName: Custom name for internal serial dispatch queue proxy.
    */
    @available(iOS 8, OSX 10.10, *)
    public convenience init(globalConcurrentQueueQOS: DispatchQueueSchedulerQOS, internalSerialQueueName: String = "rx.global_dispatch_queue.serial") {
        let priority = globalConcurrentQueueQOS.QOSClass
        self.init(queue: dispatch_get_global_queue(priority, UInt(0)), internalSerialQueueName: internalSerialQueueName)
    }

    /**
    Converts a time interval in seconds to a signed number of nanoseconds.

    The fractional nanosecond part is truncated. The `Int64` conversion traps when the
    product is not representable (NaN, infinite or out of range).

    - parameter timeInterval: Interval in seconds.
    - returns: Interval in nanoseconds.
    */
    class func convertTimeIntervalToDispatchInterval(timeInterval: NSTimeInterval) -> Int64 {
        return Int64(timeInterval * Double(NSEC_PER_SEC))
    }

    /**
    Converts a time interval relative to now into a `dispatch_time_t`.

    - parameter timeInterval: Interval in seconds, measured from `DISPATCH_TIME_NOW`.
    - returns: Dispatch time that lies `timeInterval` seconds from now.
    */
    class func convertTimeIntervalToDispatchTime(timeInterval: NSTimeInterval) -> dispatch_time_t {
        return dispatch_time(DISPATCH_TIME_NOW, convertTimeIntervalToDispatchInterval(timeInterval))
    }

    /**
    Schedules an action to be executed immediately.

    - parameter state: State passed to the action to be executed.
    - parameter action: Action to be executed.
    - returns: The disposable object used to cancel the scheduled action (best effort).
    */
    public final func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable {
        return self.scheduleInternal(state, action: action)
    }

    /**
    Enqueues `action` asynchronously on the serial queue.

    This is the non-final implementation behind `schedule`.

    - parameter state: State passed to the action to be executed.
    - parameter action: Action to be executed.
    - returns: The disposable object used to cancel the scheduled action (best effort).
    */
    func scheduleInternal<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()

        dispatch_async(_serialQueue) {
            // Disposed before the block got a chance to run: skip the action.
            if cancel.disposed {
                return
            }

            cancel.disposable = action(state)
        }

        return cancel
    }

    /**
    Schedules an action to be executed.

    - parameter state: State passed to the action to be executed.
    - parameter dueTime: Relative time after which to execute the action.
    - parameter action: Action to be executed.
    - returns: The disposable object used to cancel the scheduled action (best effort).
    */
    public final func scheduleRelative<StateType>(state: StateType, dueTime: NSTimeInterval, action: (StateType) -> Disposable) -> Disposable {
        let timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, _serialQueue)

        let dispatchInterval = MainScheduler.convertTimeIntervalToDispatchTime(dueTime)

        let compositeDisposable = CompositeDisposable()

        // `DISPATCH_TIME_FOREVER` as the repeat interval makes this a one-shot timer.
        dispatch_source_set_timer(timer, dispatchInterval, DISPATCH_TIME_FOREVER, 0)
        dispatch_source_set_event_handler(timer, {
            if compositeDisposable.disposed {
                return
            }
            compositeDisposable.addDisposable(action(state))

            // The timer has done its job. Cancel the source so that its event handler is
            // released; otherwise the source, this handler and `compositeDisposable` keep
            // each other alive until the caller disposes the returned disposable.
            // Cancelling again from the disposable below is harmless.
            dispatch_source_cancel(timer)
        })
        dispatch_resume(timer)

        compositeDisposable.addDisposable(AnonymousDisposable {
            dispatch_source_cancel(timer)
        })

        return compositeDisposable
    }

    /**
    Schedules a periodic piece of work.

    - parameter state: State passed to the action to be executed.
    - parameter startAfter: Period after which initial work should be run.
    - parameter period: Period for running the work periodically.
    - parameter action: Action to be executed.
    - returns: The disposable object used to cancel the scheduled action (best effort).
    */
    public func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> Disposable {
        let timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, _serialQueue)

        let initial = MainScheduler.convertTimeIntervalToDispatchTime(startAfter)
        let dispatchInterval = MainScheduler.convertTimeIntervalToDispatchInterval(period)

        // State threaded through successive invocations of `action`.
        var timerState = state

        // The timer interval is unsigned; a negative period is clamped to 0.
        let validDispatchInterval = dispatchInterval < 0 ? 0 : UInt64(dispatchInterval)

        dispatch_source_set_timer(timer, initial, validDispatchInterval, 0)
        let cancel = AnonymousDisposable {
            dispatch_source_cancel(timer)
        }
        dispatch_source_set_event_handler(timer, {
            if cancel.disposed {
                return
            }
            timerState = action(timerState)
        })
        dispatch_resume(timer)

        return cancel
    }
}