0001    //
0002    //  Observable+Time.swift
0003    //  Rx
0004    //
0005    //  Created by Krunoslav Zaher on 3/22/15.
0006    //  Copyright © 2015 Krunoslav Zaher. All rights reserved.
0007    //
0008    
0009    import Foundation
0010    
0011    // MARK: throttle
extension ObservableType {

    /**
    Suppresses any element of the observable sequence that is followed by another element before the given relative duration has elapsed. Throttling timers are run on the supplied scheduler.

    `throttle` and `debounce` are synonyms; both build the same `Throttle` operator.

    - seealso: [debounce operator on reactivex.io](http://reactivex.io/documentation/operators/debounce.html)

    - parameter dueTime: Throttling duration applied to each element.
    - parameter scheduler: Scheduler used to run the throttle timers and to send events on.
    - returns: The throttled sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func throttle(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> {
        let source = asObservable()
        return Throttle(source: source, dueTime: dueTime, scheduler: scheduler)
    }

    /**
    Suppresses any element of the observable sequence that is followed by another element before the given relative duration has elapsed. Throttling timers are run on the supplied scheduler.

    `throttle` and `debounce` are synonyms; both build the same `Throttle` operator.

    - seealso: [debounce operator on reactivex.io](http://reactivex.io/documentation/operators/debounce.html)

    - parameter dueTime: Throttling duration applied to each element.
    - parameter scheduler: Scheduler used to run the throttle timers and to send events on.
    - returns: The throttled sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func debounce(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> {
        let source = asObservable()
        return Throttle(source: source, dueTime: dueTime, scheduler: scheduler)
    }
}
0048    
0049    // MARK: sample
0050    
extension ObservableType {

    /**
    Samples the source observable sequence using a sampler observable sequence that produces sampling ticks.

    On every sampling tick, the latest element (if any) emitted by the source during the last sampling interval is forwarded to the resulting sequence.

    **If no new element arrived between two sampler ticks, nothing is sent to the resulting sequence.**

    - seealso: [sample operator on reactivex.io](http://reactivex.io/documentation/operators/sample.html)

    - parameter sampler: Sampling tick sequence.
    - returns: Sampled observable sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func sample<O: ObservableType>(sampler: O) -> Observable<E> {
        let source = asObservable()
        let ticks = sampler.asObservable()
        // `onlyNew: true` matches the documented "no new element, no emission" rule.
        return Sample(source: source, sampler: ticks, onlyNew: true)
    }
}
0071    
// MARK: interval

extension Observable where Element : SignedIntegerType {

    /**
    Creates an observable sequence that produces a value after each period. The specified scheduler is used to run the timers and to send out observer messages.

    The underlying timer is created with its initial due time equal to `period`.

    - seealso: [interval operator on reactivex.io](http://reactivex.io/documentation/operators/interval.html)

    - parameter period: Period for producing the values in the resulting sequence.
    - parameter scheduler: Scheduler to run the timer on.
    - returns: An observable sequence that produces a value after each period.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public static func interval(period: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> {
        return Timer(dueTime: period, period: period, scheduler: scheduler)
    }
}
0091    
0092    // MARK: timer
0093    
extension Observable where Element: SignedIntegerType {

    /**
    Creates an observable sequence that periodically produces a value once the specified initial relative due time has elapsed. Timers are run on the specified scheduler.

    - seealso: [timer operator on reactivex.io](http://reactivex.io/documentation/operators/timer.html)

    - parameter dueTime: Relative time at which to produce the first value.
    - parameter period: Period to produce subsequent values. Optional; defaults to `nil`.
    - parameter scheduler: Scheduler to run timers on.
    - returns: An observable sequence that produces a value after due time has elapsed and then each period.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public static func timer(dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType) -> Observable<E> {
        return Timer(dueTime: dueTime, period: period, scheduler: scheduler)
    }
}
0115    
0116    // MARK: take
0117    
extension ObservableType {

    /**
    Forwards the elements of the source sequence for the given duration, measured from the start of the sequence. Timers are run on the specified scheduler.

    - seealso: [take operator on reactivex.io](http://reactivex.io/documentation/operators/take.html)

    - parameter duration: Duration for taking elements from the start of the sequence.
    - parameter scheduler: Scheduler to run the timer on.
    - returns: An observable sequence with the elements taken during the specified duration from the start of the source sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func take(duration: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> {
        let source = asObservable()
        return TakeTime(source: source, duration: duration, scheduler: scheduler)
    }
}
0135    
0136    // MARK: skip
0137    
extension ObservableType {

    /**
    Drops the elements of the source sequence for the given duration, measured from the start of the sequence. Timers are run on the specified scheduler.

    - seealso: [skip operator on reactivex.io](http://reactivex.io/documentation/operators/skip.html)

    - parameter duration: Duration for skipping elements from the start of the sequence.
    - parameter scheduler: Scheduler to run the timer on.
    - returns: An observable sequence with the elements skipped during the specified duration from the start of the source sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func skip(duration: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> {
        let source = asObservable()
        return SkipTime(source: source, duration: duration, scheduler: scheduler)
    }
}
0155    
0156    // MARK: ignoreElements
0157    
extension ObservableType {

    /**
     Discards every element and completes (or errors) when the receiver completes (or errors). Implemented as a `filter` whose predicate always returns `false`.

     - seealso: [ignoreElements operator on reactivex.io](http://reactivex.io/documentation/operators/ignoreelements.html)

     - returns: An observable sequence that skips all elements of the source sequence.
     */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func ignoreElements() -> Observable<E> {
        // Reject every element; only the terminating event is left to pass through.
        return filter { _ in false }
    }
}
0175    
0176    // MARK: delaySubscription
0177    
extension ObservableType {

    /**
    Shifts the observable sequence in time by delaying the subscription for the given relative duration. Timers are run on the specified scheduler.

    - seealso: [delay operator on reactivex.io](http://reactivex.io/documentation/operators/delay.html)

    - parameter dueTime: Relative time shift of the subscription.
    - parameter scheduler: Scheduler to run the subscription delay timer on.
    - returns: Time-shifted sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func delaySubscription(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> {
        let source = asObservable()
        return DelaySubscription(source: source, dueTime: dueTime, scheduler: scheduler)
    }
}
0195    
0196    // MARK: buffer
0197    
extension ObservableType {

    /**
    Collects the elements of an observable sequence into buffers. A buffer is sent out as soon as it is full or a given amount of time has elapsed, whichever happens first. Timers are run on the specified scheduler.

    A real-world analogy: a ferry that leaves the dock either when all seats are taken or at the scheduled departure time, whichever event occurs first.

    - seealso: [buffer operator on reactivex.io](http://reactivex.io/documentation/operators/buffer.html)

    - parameter timeSpan: Maximum time length of a buffer.
    - parameter count: Maximum element count of a buffer.
    - parameter scheduler: Scheduler to run buffering timers on.
    - returns: An observable sequence of buffers.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func buffer(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<[E]> {
        let source = asObservable()
        return BufferTimeCount(source: source, timeSpan: timeSpan, count: count, scheduler: scheduler)
    }
}
0218    
0219    // MARK: window
0220    
extension ObservableType {

    /**
     Splits the elements of an observable sequence into windows. A window is completed as soon as it is full or a given amount of time has elapsed.

     - seealso: [window operator on reactivex.io](http://reactivex.io/documentation/operators/window.html)

     - parameter timeSpan: Maximum time length of a window.
     - parameter count: Maximum element count of a window.
     - parameter scheduler: Scheduler to run windowing timers on.
     - returns: An observable sequence of windows (instances of `Observable`).
     */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func window(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<Observable<E>> {
        let source = asObservable()
        return WindowTimeCount(source: source, timeSpan: timeSpan, count: count, scheduler: scheduler)
    }
}
0239    
0240    // MARK: timeout
0241    
extension ObservableType {

    /**
     Applies a timeout policy to each element of the observable sequence, using the specified scheduler to run timeout timers. If the next element isn't received within the specified timeout duration starting from its predecessor, an `RxError.Timeout` error is propagated to the observer.

     - seealso: [timeout operator on reactivex.io](http://reactivex.io/documentation/operators/timeout.html)

     - parameter dueTime: Maximum duration between values before a timeout occurs.
     - parameter scheduler: Scheduler to run the timeout timer on.
     - returns: An observable sequence that fails with `RxError.Timeout` in case of a timeout.
     */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func timeout(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> {
        let source = asObservable()
        // The fallback sequence is one that errors with `RxError.Timeout`.
        let failure: Observable<E> = Observable.error(RxError.Timeout)
        return Timeout(source: source, dueTime: dueTime, other: failure, scheduler: scheduler)
    }

    /**
     Applies a timeout policy to each element of the observable sequence, using the specified scheduler to run timeout timers. If the next element isn't received within the specified timeout duration starting from its predecessor, the other observable sequence is used to produce future messages from that point on.

     - seealso: [timeout operator on reactivex.io](http://reactivex.io/documentation/operators/timeout.html)

     - parameter dueTime: Maximum duration between values before a timeout occurs.
     - parameter other: Sequence to return in case of a timeout.
     - parameter scheduler: Scheduler to run the timeout timer on.
     - returns: The source sequence switching to the other sequence in case of a timeout.
     */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func timeout<O: ObservableConvertibleType where E == O.E>(dueTime: RxTimeInterval, other: O, scheduler: SchedulerType) -> Observable<E> {
        let source = asObservable()
        let fallback = other.asObservable()
        return Timeout(source: source, dueTime: dueTime, other: fallback, scheduler: scheduler)
    }
}
0275