0001    //
0002    //  Observable+StandardSequenceOperators.swift
0003    //  Rx
0004    //
0005    //  Created by Krunoslav Zaher on 2/17/15.
0006    //  Copyright © 2015 Krunoslav Zaher. All rights reserved.
0007    //
0008    
0009    import Foundation
0010    
0011    // MARK: filter aka where
0012    
extension ObservableType {

    /**
    Filters the elements of an observable sequence based on a predicate.

    - seealso: [filter operator on reactivex.io](http://reactivex.io/documentation/operators/filter.html)

    - parameter predicate: A function to test each source element for a condition.
    - returns: An observable sequence that contains elements from the input sequence that satisfy the condition.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func filter(predicate: (E) throws -> Bool)
        -> Observable<E> {
        return Filter(source: asObservable(), predicate: predicate)
    }
}

// MARK: takeWhile

extension ObservableType {

    /**
    Returns elements from an observable sequence as long as a specified condition is true.

    - seealso: [takeWhile operator on reactivex.io](http://reactivex.io/documentation/operators/takewhile.html)

    - parameter predicate: A function to test each element for a condition.
    - returns: An observable sequence that contains the elements from the input sequence that occur before the element at which the test no longer passes.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func takeWhile(predicate: (E) throws -> Bool)
        -> Observable<E> {
        return TakeWhile(source: asObservable(), predicate: predicate)
    }

    /**
    Returns elements from an observable sequence as long as a specified condition is true.

    The element's index is used in the logic of the predicate function.

    - seealso: [takeWhile operator on reactivex.io](http://reactivex.io/documentation/operators/takewhile.html)

    - parameter predicate: A function to test each element for a condition; the second parameter of the function represents the index of the source element.
    - returns: An observable sequence that contains the elements from the input sequence that occur before the element at which the test no longer passes.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func takeWhileWithIndex(predicate: (E, Int) throws -> Bool)
        -> Observable<E> {
        return TakeWhile(source: asObservable(), predicate: predicate)
    }
}

// MARK: take

extension ObservableType {

    /**
    Returns a specified number of contiguous elements from the start of an observable sequence.

    - seealso: [take operator on reactivex.io](http://reactivex.io/documentation/operators/take.html)

    - parameter count: The number of elements to return.
    - returns: An observable sequence that contains the specified number of elements from the start of the input sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func take(count: Int)
        -> Observable<E> {
        // Taking zero elements never needs the source: hand back an empty sequence directly.
        if count == 0 {
            return Observable.empty()
        }
        else {
            return TakeCount(source: asObservable(), count: count)
        }
    }
}

// MARK: takeLast

extension ObservableType {

    /**
    Returns a specified number of contiguous elements from the end of an observable sequence.

    This operator accumulates a buffer long enough to store `count` elements. Upon completion of the source sequence, this buffer is drained on the result sequence. This causes the elements to be delayed.

    - seealso: [takeLast operator on reactivex.io](http://reactivex.io/documentation/operators/takelast.html)

    - parameter count: Number of elements to take from the end of the source sequence.
    - returns: An observable sequence containing the specified number of elements from the end of the source sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func takeLast(count: Int)
        -> Observable<E> {
        return TakeLast(source: asObservable(), count: count)
    }
}


// MARK: skip

extension ObservableType {

    /**
    Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.

    - seealso: [skip operator on reactivex.io](http://reactivex.io/documentation/operators/skip.html)

    - parameter count: The number of elements to skip before returning the remaining elements.
    - returns: An observable sequence that contains the elements that occur after the specified index in the input sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func skip(count: Int)
        -> Observable<E> {
        return SkipCount(source: asObservable(), count: count)
    }
}

// MARK: skipWhile

extension ObservableType {

    /**
    Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements.

    - seealso: [skipWhile operator on reactivex.io](http://reactivex.io/documentation/operators/skipwhile.html)

    - parameter predicate: A function to test each element for a condition.
    - returns: An observable sequence that contains the elements from the input sequence starting at the first element in the linear series that does not pass the test specified by predicate.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func skipWhile(predicate: (E) throws -> Bool) -> Observable<E> {
        return SkipWhile(source: asObservable(), predicate: predicate)
    }

    /**
    Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements.
    The element's index is used in the logic of the predicate function.

    - seealso: [skipWhile operator on reactivex.io](http://reactivex.io/documentation/operators/skipwhile.html)

    - parameter predicate: A function to test each element for a condition; the second parameter of the function represents the index of the source element.
    - returns: An observable sequence that contains the elements from the input sequence starting at the first element in the linear series that does not pass the test specified by predicate.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func skipWhileWithIndex(predicate: (E, Int) throws -> Bool) -> Observable<E> {
        return SkipWhile(source: asObservable(), predicate: predicate)
    }
}

// MARK: map aka select

extension ObservableType {

    /**
    Projects each element of an observable sequence into a new form.

    - seealso: [map operator on reactivex.io](http://reactivex.io/documentation/operators/map.html)

    - parameter selector: A transform function to apply to each source element.
    - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source.

    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func map<R>(selector: (E) throws -> R)
        -> Observable<R> {
        // Unlike the sibling operators, mapping is delegated to `composeMap` on the
        // underlying observable rather than constructing an operator type here.
        return asObservable().composeMap(selector)
    }

    /**
    Projects each element of an observable sequence into a new form by incorporating the element's index.

    - seealso: [map operator on reactivex.io](http://reactivex.io/documentation/operators/map.html)

    - parameter selector: A transform function to apply to each source element; the second parameter of the function represents the index of the source element.
    - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func mapWithIndex<R>(selector: (E, Int) throws -> R)
        -> Observable<R> {
        return MapWithIndex(source: asObservable(), selector: selector)
    }
}

// MARK: flatMap

extension ObservableType {

    /**
    Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.

    - seealso: [flatMap operator on reactivex.io](http://reactivex.io/documentation/operators/flatmap.html)

    - parameter selector: A transform function to apply to each element.
    - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func flatMap<O: ObservableConvertibleType>(selector: (E) throws -> O)
        -> Observable<O.E> {
        return FlatMap(source: asObservable(), selector: selector)
    }

    /**
    Projects each element of an observable sequence to an observable sequence by incorporating the element's index and merges the resulting observable sequences into one observable sequence.

    - seealso: [flatMap operator on reactivex.io](http://reactivex.io/documentation/operators/flatmap.html)

    - parameter selector: A transform function to apply to each element; the second parameter of the function represents the index of the source element.
    - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func flatMapWithIndex<O: ObservableConvertibleType>(selector: (E, Int) throws -> O)
        -> Observable<O.E> {
        return FlatMapWithIndex(source: asObservable(), selector: selector)
    }
}

// MARK: flatMapFirst

extension ObservableType {

    /**
    Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
    If an element is received while some projected observable sequence is still being merged, it is simply ignored.

    - seealso: [flatMapFirst operator on reactivex.io](http://reactivex.io/documentation/operators/flatmap.html)

    - parameter selector: A transform function to apply to an element that was observed while no observable is executing in parallel.
    - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func flatMapFirst<O: ObservableConvertibleType>(selector: (E) throws -> O)
        -> Observable<O.E> {
        return FlatMapFirst(source: asObservable(), selector: selector)
    }
}

// MARK: flatMapLatest

extension ObservableType {
    /**
    Projects each element of an observable sequence into a new sequence of observable sequences and then
    transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.

    It is a combination of `map` + `switchLatest` operator

    - seealso: [flatMapLatest operator on reactivex.io](http://reactivex.io/documentation/operators/flatmap.html)

    - parameter selector: A transform function to apply to each element.
    - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source producing an
        Observable of Observable sequences and that at any point in time produces the elements of the most recent inner observable sequence that has been received.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func flatMapLatest<O: ObservableConvertibleType>(selector: (E) throws -> O)
        -> Observable<O.E> {
        return FlatMapLatest(source: asObservable(), selector: selector)
    }
}

// MARK: elementAt

extension ObservableType {

    /**
    Returns a sequence emitting only item _n_ emitted by an Observable

    - seealso: [elementAt operator on reactivex.io](http://reactivex.io/documentation/operators/elementat.html)

    - parameter index: The index of the required item (starting from 0).
    - returns: An observable sequence that emits the desired item as its own sole emission.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func elementAt(index: Int)
        -> Observable<E> {
        return ElementAt(source: asObservable(), index: index, throwOnEmpty: true)
    }
}

// MARK: single

extension ObservableType {

    /**
    The single operator is similar to first, but throws a `RxError.NoElements` or `RxError.MoreThanOneElement`
    if the source Observable does not emit exactly one item before successfully completing.

    - seealso: [single operator on reactivex.io](http://reactivex.io/documentation/operators/first.html)

    - returns: An observable sequence that emits a single item or throws an exception if more (or none) of them are emitted.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func single()
        -> Observable<E> {
        return SingleAsync(source: asObservable())
    }

    /**
    The single operator is similar to first, but throws a `RxError.NoElements` or `RxError.MoreThanOneElement`
    if the source Observable does not emit exactly one item before successfully completing.

    - seealso: [single operator on reactivex.io](http://reactivex.io/documentation/operators/first.html)

    - parameter predicate: A function to test each source element for a condition.
    - returns: An observable sequence that emits a single item or throws an exception if more (or none) of them are emitted.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func single(predicate: (E) throws -> Bool)
        -> Observable<E> {
        return SingleAsync(source: asObservable(), predicate: predicate)
    }

}