//
//  BlockingObservable+Operators.swift
//  Rx
//
//  Created by Krunoslav Zaher on 10/19/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//

import Foundation
#if !RX_NO_MODULE
    import RxSwift
#endif

extension BlockingObservable {
    /**
    Blocks current thread until sequence terminates.

    If sequence terminates with error, terminating error will be thrown.

    - returns: All elements of sequence.
    - throws: The error the sequence terminated with.
    */
    public func toArray() throws -> [E] {
        var elements = [E]()

        var error: ErrorType?

        let lock = RunLoopLock()

        let d = SingleAssignmentDisposable()

        lock.dispatch {
            d.disposable = self.source.subscribe { e in
                // Ignore events delivered after the subscription was disposed.
                if d.disposed {
                    return
                }

                switch e {
                case .Next(let element):
                    elements.append(element)
                case .Error(let e):
                    error = e
                    d.dispose()
                    lock.stop()
                case .Completed:
                    d.dispose()
                    lock.stop()
                }
            }
        }

        lock.run()

        d.dispose()

        if let error = error {
            throw error
        }

        return elements
    }
}

extension BlockingObservable {
    /**
    Blocks current thread until sequence produces first element.

    If sequence terminates with error before producing first element, terminating error will be thrown.

    - returns: First element of sequence. If sequence is empty `nil` is returned.
    - throws: The error the sequence terminated with, if it arrived before any element.
    */
    public func first() throws -> E? {
        var element: E?

        var error: ErrorType?

        let d = SingleAssignmentDisposable()

        let lock = RunLoopLock()

        lock.dispatch {
            d.disposable = self.source.subscribe { e in
                // Ignore events delivered after the subscription was disposed.
                if d.disposed {
                    return
                }

                switch e {
                case .Next(let e):
                    if element == nil {
                        element = e
                    }
                case .Error(let e):
                    error = e
                case .Completed:
                    // Empty sequence: `element` stays `nil`.
                    break
                }

                // Whatever the first event is, it ends the wait.
                d.dispose()
                lock.stop()
            }
        }

        lock.run()

        d.dispose()

        if let error = error {
            throw error
        }

        return element
    }
}

extension BlockingObservable {
    /**
    Blocks current thread until sequence terminates.

    If sequence terminates with error, terminating error will be thrown.

    - returns: Last element in the sequence. If sequence is empty `nil` is returned.
    - throws: The error the sequence terminated with.
    */
    public func last() throws -> E? {
        var element: E?

        var error: ErrorType?

        let d = SingleAssignmentDisposable()

        let lock = RunLoopLock()

        lock.dispatch {
            d.disposable = self.source.subscribe { e in
                // Ignore events delivered after the subscription was disposed.
                if d.disposed {
                    return
                }

                switch e {
                case .Next(let e):
                    // Remember the most recent element and keep waiting for termination.
                    element = e
                    return
                case .Error(let e):
                    error = e
                case .Completed:
                    break
                }

                // Only terminal events (`.Error`, `.Completed`) reach this point.
                d.dispose()
                lock.stop()
            }
        }

        lock.run()

        d.dispose()

        if let error = error {
            throw error
        }

        return element
    }
}

extension BlockingObservable {
    /**
    Blocks current thread until sequence terminates or produces a second element.

    If sequence terminates with error, terminating error will be thrown.

    Equivalent to calling `single(_:)` with a predicate that accepts every element.

    - returns: Returns the only element of a sequence, and reports an error if there is not exactly one element in the observable sequence.
    - throws: `RxError.NoElements` if the sequence completes without producing an element,
      `RxError.MoreThanOneElement` if it produces more than one, or the error the sequence terminated with.
    */
    public func single() throws -> E? {
        return try single { _ in true }
    }

    /**
    Blocks current thread until sequence terminates, produces a second element that satisfies `predicate`,
    or `predicate` throws.

    If sequence terminates with error, terminating error will be thrown.

    - parameter predicate: A function to test each source element for a condition.
    - returns: Returns the only element of a sequence that satisfies the condition in the predicate, and reports an error if there is not exactly one element in the sequence.
    - throws: `RxError.NoElements` if the sequence completes without an element satisfying `predicate`,
      `RxError.MoreThanOneElement` if more than one element satisfies it, any error thrown by `predicate`,
      or the error the sequence terminated with.
    */
    public func single(predicate: (E) throws -> Bool) throws -> E? {
        var element: E?

        var error: ErrorType?

        let d = SingleAssignmentDisposable()

        let lock = RunLoopLock()

        lock.dispatch {
            d.disposable = self.source.subscribe { e in
                // Ignore events delivered after the subscription was disposed.
                if d.disposed {
                    return
                }

                switch e {
                case .Next(let e):
                    do {
                        // Elements rejected by the predicate are skipped.
                        if try !predicate(e) {
                            return
                        }
                        if element == nil {
                            element = e
                        } else {
                            throw RxError.MoreThanOneElement
                        }
                    } catch (let err) {
                        // Predicate failure or a second match: record the error and end the wait early.
                        error = err
                        d.dispose()
                        lock.stop()
                    }
                    return
                case .Error(let e):
                    error = e
                case .Completed:
                    if element == nil {
                        error = RxError.NoElements
                    }
                }

                // Only terminal events (`.Error`, `.Completed`) reach this point.
                d.dispose()
                lock.stop()
            }
        }

        lock.run()
        d.dispose()

        if let error = error {
            throw error
        }

        return element
    }
}
BlockingObservable+Operators.swift:172 return try single { _ in true }