0001    //
0002    //  BlockingObservable+Operators.swift
0003    //  Rx
0004    //
0005    //  Created by Krunoslav Zaher on 10/19/15.
0006    //  Copyright © 2015 Krunoslav Zaher. All rights reserved.
0007    //
0008    
0009    import Foundation
0010    #if !RX_NO_MODULE
0011        import RxSwift
0012    #endif
0013    
0014    extension BlockingObservable {
0015        /**
0016         Blocks current thread until sequence terminates.
0017    
0018         If sequence terminates with error, terminating error will be thrown.
0019    
0020         - returns: All elements of sequence.
0021         */
0022        public func toArray() throws -> [E] {
0023            var elements: [E] = Array<E>()
0024    
0025            var error: ErrorType?
0026    
0027            let lock = RunLoopLock()
0028    
0029            let d = SingleAssignmentDisposable()
0030    
0031            lock.dispatch {
0032                d.disposable = self.source.subscribe { e in
0033                    if d.disposed {
0034                        return
0035                    }
0036                    switch e {
0037                    case .Next(let element):
0038                        elements.append(element)
0039                    case .Error(let e):
0040                        error = e
0041                        d.dispose()
0042                        lock.stop()
0043                    case .Completed:
0044                        d.dispose()
0045                        lock.stop()
0046                    }
0047                }
0048            }
0049    
0050            lock.run()
0051    
0052            d.dispose()
0053    
0054            if let error = error {
0055                throw error
0056            }
0057    
0058            return elements
0059        }
0060    }
0061    
0062    extension BlockingObservable {
0063        /**
0064         Blocks current thread until sequence produces first element.
0065    
0066         If sequence terminates with error before producing first element, terminating error will be thrown.
0067    
0068         - returns: First element of sequence. If sequence is empty `nil` is returned.
0069         */
0070        public func first() throws -> E? {
0071            var element: E?
0072    
0073            var error: ErrorType?
0074    
0075            let d = SingleAssignmentDisposable()
0076    
0077            let lock = RunLoopLock()
0078    
0079            lock.dispatch {
0080                d.disposable = self.source.subscribe { e in
0081                    if d.disposed {
0082                        return
0083                    }
0084    
0085                    switch e {
0086                    case .Next(let e):
0087                        if element == nil {
0088                            element = e
0089                        }
0090                        break
0091                    case .Error(let e):
0092                        error = e
0093                    default:
0094                        break
0095                    }
0096    
0097                    d.dispose()
0098                    lock.stop()
0099                }
0100            }
0101    
0102            lock.run()
0103    
0104            d.dispose()
0105    
0106            if let error = error {
0107                throw error
0108            }
0109    
0110            return element
0111        }
0112    }
0113    
0114    extension BlockingObservable {
0115        /**
0116         Blocks current thread until sequence terminates.
0117    
0118         If sequence terminates with error, terminating error will be thrown.
0119    
0120         - returns: Last element in the sequence. If sequence is empty `nil` is returned.
0121         */
0122        public func last() throws -> E? {
0123            var element: E?
0124    
0125            var error: ErrorType?
0126    
0127            let d = SingleAssignmentDisposable()
0128    
0129            let lock = RunLoopLock()
0130    
0131            lock.dispatch {
0132                d.disposable = self.source.subscribe { e in
0133                    if d.disposed {
0134                        return
0135                    }
0136                    switch e {
0137                    case .Next(let e):
0138                        element = e
0139                        return
0140                    case .Error(let e):
0141                        error = e
0142                    default:
0143                        break
0144                    }
0145    
0146                    d.dispose()
0147                    lock.stop()
0148                }
0149            }
0150            
0151            lock.run()
0152            
0153            d.dispose()
0154            
0155            if let error = error {
0156                throw error
0157            }
0158            
0159            return element
0160        }
0161    }
0162    
0163    extension BlockingObservable {
0164        /**
0165         Blocks current thread until sequence terminates.
0166         
0167         If sequence terminates with error before producing first element, terminating error will be thrown.
0168         
0169         - returns: Returns the only element of an sequence, and reports an error if there is not exactly one element in the observable sequence.
0170         */
0171        public func single() throws -> E? {
0172            return try single { _ in true }
0173        }
0174    
0175        /**
0176         Blocks current thread until sequence terminates.
0177         
0178         If sequence terminates with error before producing first element, terminating error will be thrown.
0179         
0180         - parameter predicate: A function to test each source element for a condition.
0181         - returns: Returns the only element of an sequence that satisfies the condition in the predicate, and reports an error if there is not exactly one element in the sequence.
0182         */
0183        public func single
BlockingObservable+Operators.swift:172
        return try single { _ in true }
(predicate: (E) throws -> Bool) throws -> E? { 0184 var element: E? 0185 0186 var error: ErrorType? 0187 0188 let d = SingleAssignmentDisposable() 0189 0190 let lock = RunLoopLock() 0191 0192 lock.dispatch { 0193 d.disposable = self.source.subscribe { e in 0194 if d.disposed { 0195 return 0196 } 0197 switch e { 0198 case .Next(let e): 0199 do { 0200 if try !predicate(e) { 0201 return 0202 } 0203 if element == nil { 0204 element = e 0205 } else { 0206 throw RxError.MoreThanOneElement 0207 } 0208 } catch (let err) { 0209 error = err 0210 d.dispose() 0211 lock.stop() 0212 } 0213 return 0214 case .Error(let e): 0215 error = e 0216 case .Completed: 0217 if element == nil { 0218 error = RxError.NoElements 0219 } 0220 } 0221 0222 d.dispose() 0223 lock.stop() 0224 } 0225 } 0226 0227 lock.run() 0228 d.dispose() 0229 0230 if let error = error { 0231 throw error 0232 } 0233 0234 return element 0235 } 0236 } 0237