0001 // 0002 // Observable+Time.swift 0003 // Rx 0004 // 0005 // Created by Krunoslav Zaher on 3/22/15. 0006 // Copyright © 2015 Krunoslav Zaher. All rights reserved. 0007 // 0008 0009 import Foundation 0010 0011 // MARK: throttle 0012 extension ObservableType { 0013 0014 /** 0015 Ignores elements from an observable sequence which are followed by another element within a specified relative time duration, using the specified scheduler to run throttling timers. 0016 0017 `throttle` and `debounce` are synonyms. 0018 0019 - seealso: [debounce operator on reactivex.io](http://reactivex.io/documentation/operators/debounce.html) 0020 0021 - parameter dueTime: Throttling duration for each element. 0022 - parameter scheduler: Scheduler to run the throttle timers and send events on. 0023 - returns: The throttled sequence. 0024 */ 0025 @warn_unused_result(message="http://git.io/rxs.uo") 0026 public func throttle(dueTime: RxTimeInterval, scheduler: SchedulerType) 0027 -> Observable<E> { 0028 return Throttle(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler) 0029 } 0030 0031 /** 0032 Ignores elements from an observable sequence which are followed by another element within a specified relative time duration, using the specified scheduler to run throttling timers. 0033 0034 `throttle` and `debounce` are synonyms. 0035 0036 - seealso: [debounce operator on reactivex.io](http://reactivex.io/documentation/operators/debounce.html) 0037 0038 - parameter dueTime: Throttling duration for each element. 0039 - parameter scheduler: Scheduler to run the throttle timers and send events on. 0040 - returns: The throttled sequence. 0041 */ 0042 @warn_unused_result(message="http://git.io/rxs.uo") 0043 public func debounce(dueTime: RxTimeInterval, scheduler: SchedulerType) 0044 -> Observable<E> { 0045 return Throttle(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler) 0046 } 0047 } 0048 0049 // MARK: sample 0050 0051 extension ObservableType { 0052 0053 /** 0054 Samples the source observable sequence using a samper observable sequence producing sampling ticks. 0055 0056 Upon each sampling tick, the latest element (if any) in the source sequence during the last sampling interval is sent to the resulting sequence. 0057 0058 **In case there were no new elements between sampler ticks, no element is sent to the resulting sequence.** 0059 0060 - seealso: [sample operator on reactivex.io](http://reactivex.io/documentation/operators/sample.html) 0061 0062 - parameter sampler: Sampling tick sequence. 0063 - returns: Sampled observable sequence. 0064 */ 0065 @warn_unused_result(message="http://git.io/rxs.uo") 0066 public func sample<O: ObservableType>(sampler: O) 0067 -> Observable<E> { 0068 return Sample(source: self.asObservable(), sampler: sampler.asObservable(), onlyNew: true) 0069 } 0070 } 0071 0072 extension Observable where Element : SignedIntegerType { 0073 /** 0074 Returns an observable sequence that produces a value after each period, using the specified scheduler to run timers and to send out observer messages. 0075 0076 - seealso: [interval operator on reactivex.io](http://reactivex.io/documentation/operators/interval.html) 0077 0078 - parameter period: Period for producing the values in the resulting sequence. 0079 - parameter scheduler: Scheduler to run the timer on. 0080 - returns: An observable sequence that produces a value after each period. 0081 */ 0082 @warn_unused_result(message="http://git.io/rxs.uo") 0083 public static func interval(period: RxTimeInterval, scheduler: SchedulerType) 0084 -> Observable<E> { 0085 return Timer(dueTime: period, 0086 period: period, 0087 scheduler: scheduler 0088 ) 0089 } 0090 } 0091 0092 // MARK: timer 0093 0094 extension Observable where Element: SignedIntegerType { 0095 /** 0096 Returns an observable sequence that periodically produces a value after the specified initial relative due time has elapsed, using the specified scheduler to run timers. 0097 0098 - seealso: [timer operator on reactivex.io](http://reactivex.io/documentation/operators/timer.html) 0099 0100 - parameter dueTime: Relative time at which to produce the first value. 0101 - parameter period: Period to produce subsequent values. 0102 - parameter scheduler: Scheduler to run timers on. 0103 - returns: An observable sequence that produces a value after due time has elapsed and then each period. 0104 */ 0105 @warn_unused_result(message="http://git.io/rxs.uo") 0106 public static func timer(dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType) 0107 -> Observable<E> { 0108 return Timer( 0109 dueTime: dueTime, 0110 period: period, 0111 scheduler: scheduler 0112 ) 0113 } 0114 } 0115 0116 // MARK: take 0117 0118 extension ObservableType { 0119 0120 /** 0121 Takes elements for the specified duration from the start of the observable source sequence, using the specified scheduler to run timers. 0122 0123 - seealso: [take operator on reactivex.io](http://reactivex.io/documentation/operators/take.html) 0124 0125 - parameter duration: Duration for taking elements from the start of the sequence. 0126 - parameter scheduler: Scheduler to run the timer on. 0127 - returns: An observable sequence with the elements taken during the specified duration from the start of the source sequence. 0128 */ 0129 @warn_unused_result(message="http://git.io/rxs.uo") 0130 public func take(duration: RxTimeInterval, scheduler: SchedulerType) 0131 -> Observable<E> { 0132 return TakeTime(source: self.asObservable(), duration: duration, scheduler: scheduler) 0133 } 0134 } 0135 0136 // MARK: skip 0137 0138 extension ObservableType { 0139 0140 /** 0141 Skips elements for the specified duration from the start of the observable source sequence, using the specified scheduler to run timers. 0142 0143 - seealso: [skip operator on reactivex.io](http://reactivex.io/documentation/operators/skip.html) 0144 0145 - parameter duration: Duration for skipping elements from the start of the sequence. 0146 - parameter scheduler: Scheduler to run the timer on. 0147 - returns: An observable sequence with the elements skipped during the specified duration from the start of the source sequence. 0148 */ 0149 @warn_unused_result(message="http://git.io/rxs.uo") 0150 public func skip(duration: RxTimeInterval, scheduler: SchedulerType) 0151 -> Observable<E> { 0152 return SkipTime(source: self.asObservable(), duration: duration, scheduler: scheduler) 0153 } 0154 } 0155 0156 // MARK: ignoreElements 0157 0158 extension ObservableType { 0159 0160 /** 0161 Skips elements and completes (or errors) when the receiver completes (or errors). Equivalent to filter that always returns false. 0162 0163 - seealso: [ignoreElements operator on reactivex.io](http://reactivex.io/documentation/operators/ignoreelements.html) 0164 0165 - returns: An observable sequence that skips all elements of the source sequence. 0166 */ 0167 @warn_unused_result(message="http://git.io/rxs.uo") 0168 public func ignoreElements() 0169 -> Observable<E> { 0170 return filter { _ -> Bool in 0171 return false 0172 } 0173 } 0174 } 0175 0176 // MARK: delaySubscription 0177 0178 extension ObservableType { 0179 0180 /** 0181 Time shifts the observable sequence by delaying the subscription with the specified relative time duration, using the specified scheduler to run timers. 0182 0183 - seealso: [delay operator on reactivex.io](http://reactivex.io/documentation/operators/delay.html) 0184 0185 - parameter dueTime: Relative time shift of the subscription. 0186 - parameter scheduler: Scheduler to run the subscription delay timer on. 0187 - returns: Time-shifted sequence. 0188 */ 0189 @warn_unused_result(message="http://git.io/rxs.uo") 0190 public func delaySubscription(dueTime: RxTimeInterval, scheduler: SchedulerType) 0191 -> Observable<E> { 0192 return DelaySubscription(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler) 0193 } 0194 } 0195 0196 // MARK: buffer 0197 0198 extension ObservableType { 0199 0200 /** 0201 Projects each element of an observable sequence into a buffer that's sent out when either it's full or a given amount of time has elapsed, using the specified scheduler to run timers. 0202 0203 A useful real-world analogy of this overload is the behavior of a ferry leaving the dock when all seats are taken, or at the scheduled time of departure, whichever event occurs first. 0204 0205 - seealso: [buffer operator on reactivex.io](http://reactivex.io/documentation/operators/buffer.html) 0206 0207 - parameter timeSpan: Maximum time length of a buffer. 0208 - parameter count: Maximum element count of a buffer. 0209 - parameter scheduler: Scheduler to run buffering timers on. 0210 - returns: An observable sequence of buffers. 0211 */ 0212 @warn_unused_result(message="http://git.io/rxs.uo") 0213 public func buffer(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) 0214 -> Observable<[E]> { 0215 return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) 0216 } 0217 } 0218 0219 // MARK: window 0220 0221 extension ObservableType { 0222 0223 /** 0224 Projects each element of an observable sequence into a window that is completed when either it’s full or a given amount of time has elapsed. 0225 0226 - seealso: [window operator on reactivex.io](http://reactivex.io/documentation/operators/window.html) 0227 0228 - parameter timeSpan: Maximum time length of a window. 0229 - parameter count: Maximum element count of a window. 0230 - parameter scheduler: Scheduler to run windowing timers on. 0231 - returns: An observable sequence of windows (instances of `Observable`). 0232 */ 0233 @warn_unused_result(message="http://git.io/rxs.uo") 0234 public func window(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) 0235 -> Observable<Observable<E>> { 0236 return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) 0237 } 0238 } 0239 0240 // MARK: timeout 0241 0242 extension ObservableType { 0243 0244 /** 0245 Applies a timeout policy for each element in the observable sequence. If the next element isn't received within the specified timeout duration starting from its predecessor, a TimeoutError is propagated to the observer. 0246 0247 - seealso: [timeout operator on reactivex.io](http://reactivex.io/documentation/operators/timeout.html) 0248 0249 - parameter dueTime: Maximum duration between values before a timeout occurs. 0250 - parameter scheduler: Scheduler to run the timeout timer on. 0251 - returns: An observable sequence with a TimeoutError in case of a timeout. 0252 */ 0253 @warn_unused_result(message="http://git.io/rxs.uo") 0254 public func timeout(dueTime: RxTimeInterval, scheduler: SchedulerType) 0255 -> Observable<E> { 0256 return Timeout(source: self.asObservable(), dueTime: dueTime, other: Observable.error(RxError.Timeout), scheduler: scheduler) 0257 } 0258 0259 /** 0260 Applies a timeout policy for each element in the observable sequence, using the specified scheduler to run timeout timers. If the next element isn't received within the specified timeout duration starting from its predecessor, the other observable sequence is used to produce future messages from that point on. 0261 0262 - seealso: [timeout operator on reactivex.io](http://reactivex.io/documentation/operators/timeout.html) 0263 0264 - parameter dueTime: Maximum duration between values before a timeout occurs. 0265 - parameter other: Sequence to return in case of a timeout. 0266 - parameter scheduler: Scheduler to run the timeout timer on. 0267 - returns: The source sequence switching to the other sequence in case of a timeout. 0268 */ 0269 @warn_unused_result(message="http://git.io/rxs.uo") 0270 public func timeout<O: ObservableConvertibleType where E == O.E>(dueTime: RxTimeInterval, other: O, scheduler: SchedulerType) 0271 -> Observable<E> { 0272 return Timeout(source: self.asObservable(), dueTime: dueTime, other: other.asObservable(), scheduler: scheduler) 0273 } 0274 } 0275