0001
0009 import Foundation
0010
0011
/// Observer attached to one inner sequence of a concurrency-limited merge.
///
/// Events are routed through `synchronizedOn`, using the parent sink's lock as
/// this object's `_lock`.
/// NOTE(review): `synchronizedOn` is declared outside this file; it is presumed
/// to call `_synchronized_on` while holding `_lock` — confirm.
class MergeLimitedSinkIter<S: ObservableConvertibleType, O: ObserverType where S.E == O.E>
    : ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias E = O.E
    typealias DisposeKey = Bag<Disposable>.KeyType
    typealias Parent = MergeLimitedSink<S, O>

    /// Sink that owns the queue, the active counter and the downstream observer.
    private let _parent: Parent
    /// Key under which this iterator's subscription is stored in the parent's group.
    private let _disposeKey: DisposeKey

    /// The parent's lock, shared so that iterator and sink use one lock.
    var _lock: NSRecursiveLock {
        return _parent._lock
    }

    /// - parameter parent: Sink that created this iterator.
    /// - parameter disposeKey: Key of this iterator's subscription inside `parent._group`.
    init(parent: Parent, disposeKey: DisposeKey) {
        _parent = parent
        _disposeKey = disposeKey
    }

    func on(event: Event<E>) {
        synchronizedOn(event)
    }

    /// Forwards elements and errors to the parent; on completion either starts
    /// the next queued inner sequence or releases this iterator's slot.
    func _synchronized_on(event: Event<E>) {
        switch event {
        case .Next:
            _parent.forwardOn(event)
        case .Error:
            _parent.forwardOn(event)
            _parent.dispose()
        case .Completed:
            _parent._group.removeDisposable(_disposeKey)

            // A queued inner sequence takes over this iterator's slot, so the
            // active count is left untouched in that case.
            if let queued = _parent._queue.dequeue() {
                _parent.subscribe(queued, group: _parent._group)
                return
            }

            _parent._activeCount -= 1

            guard _parent._stopped && _parent._activeCount == 0 else {
                return
            }
            _parent.forwardOn(.Completed)
            _parent.dispose()
        }
    }
}
0060
/// Sink for a merge that subscribes to at most `maxConcurrent` inner sequences
/// at a time and queues the rest.
///
/// Source events are routed through `synchronizedOn`.
/// NOTE(review): `synchronizedOn` is declared outside this file; it is presumed
/// to call `_synchronized_on` while holding `_lock` — confirm.
class MergeLimitedSink<S: ObservableConvertibleType, O: ObserverType where S.E == O.E>
    : Sink<O>
    , ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias E = S
    typealias QueueType = Queue<S>

    /// Upper bound on simultaneously subscribed inner sequences.
    private let _maxConcurrent: Int

    let _lock = NSRecursiveLock()

    // state
    /// Set once the source sequence has completed.
    private var _stopped = false
    /// Number of inner sequences currently subscribed.
    private var _activeCount = 0
    /// Inner sequences waiting for a free slot.
    private var _queue = QueueType(capacity: 2)

    private let _sourceSubscription = SingleAssignmentDisposable()
    private let _group = CompositeDisposable()

    /// - parameter maxConcurrent: Maximum number of inner sequences subscribed at once.
    /// - parameter observer: Downstream observer that receives the merged events.
    init(maxConcurrent: Int, observer: O) {
        _maxConcurrent = maxConcurrent

        // `_sourceSubscription` is registered with `_group` in `run`; adding it
        // here as well stored the same disposable in the group twice.
        super.init(observer: observer)
    }

    /// Subscribes this sink to `source`.
    ///
    /// - parameter source: Sequence of inner sequences to merge.
    /// - returns: The group holding the source subscription and all inner subscriptions.
    func run(source: Observable<S>) -> Disposable {
        _group.addDisposable(_sourceSubscription)

        let disposable = source.subscribe(self)
        _sourceSubscription.disposable = disposable
        return _group
    }

    /// Subscribes a new iterator to `innerSource`, tracking the subscription in `group`.
    ///
    /// Nothing is subscribed when `group.addDisposable` returns no key
    /// (presumably because the group has already been disposed — confirm).
    func subscribe(innerSource: E, group: CompositeDisposable) {
        let subscription = SingleAssignmentDisposable()

        guard let key = group.addDisposable(subscription) else {
            return
        }

        let observer = MergeLimitedSinkIter(parent: self, disposeKey: key)
        subscription.disposable = innerSource.asObservable().subscribe(observer)
    }

    func on(event: Event<E>) {
        synchronizedOn(event)
    }

    /// Handles an event from the source sequence.
    func _synchronized_on(event: Event<E>) {
        switch event {
        case .Next(let value):
            if _activeCount < _maxConcurrent {
                // A slot is free: take it and subscribe right away.
                _activeCount += 1
                subscribe(value, group: _group)
            }
            else {
                // All slots are taken: a finishing iterator dequeues this later.
                _queue.enqueue(value)
            }
        case .Error(let error):
            forwardOn(.Error(error))
            dispose()
        case .Completed:
            if _activeCount == 0 {
                forwardOn(.Completed)
                dispose()
            }
            else {
                // Inner sequences are still running; only the source is released.
                _sourceSubscription.dispose()
            }

            _stopped = true
        }
    }
}
0145
/// Producer for a merge with a cap on concurrently subscribed inner sequences.
class MergeLimited<S: ObservableConvertibleType> : Producer<S.E> {
    /// Sequence of inner sequences to merge.
    private let _source: Observable<S>
    /// Limit handed to every `MergeLimitedSink` this producer creates.
    private let _maxConcurrent: Int

    /// - parameter source: Sequence of inner sequences to merge.
    /// - parameter maxConcurrent: Maximum number of inner sequences subscribed at once.
    init(source: Observable<S>, maxConcurrent: Int) {
        _source = source
        _maxConcurrent = maxConcurrent
    }

    /// Creates a sink for `observer`, runs it against the source and returns it.
    override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
        let limitedSink = MergeLimitedSink<S, O>(maxConcurrent: _maxConcurrent, observer: observer)
        let subscription = limitedSink.run(_source)
        limitedSink.disposable = subscription
        return limitedSink
    }
}
0161
0162
/// Merge sink for sources whose elements are already the inner sequences:
/// the mapping step returns each element unchanged.
final class MergeBasicSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<S, S, O> {
    /// - parameter observer: Downstream observer that receives the merged events.
    override init(observer: O) {
        super.init(observer: observer)
    }

    /// Identity mapping.
    ///
    /// - returns: `element` itself.
    override func performMap(element: S) throws -> S {
        let innerSequence = element
        return innerSequence
    }
}
0173
0174
/// Merge sink that turns each source element into an inner sequence using a
/// throwing selector.
final class FlatMapSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
    typealias Selector = (SourceType) throws -> S

    /// Transformation applied to every source element.
    private let _selector: Selector

    /// - parameter selector: Maps a source element to an inner sequence.
    /// - parameter observer: Downstream observer that receives the merged events.
    init(selector: Selector, observer: O) {
        _selector = selector
        super.init(observer: observer)
    }

    /// Applies the selector to `element`.
    ///
    /// - throws: Whatever the selector throws.
    override func performMap(element: SourceType) throws -> S {
        let innerSequence = try _selector(element)
        return innerSequence
    }
}
0190
/// Merge sink whose selector receives an index alongside each source element.
final class FlatMapWithIndexSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
    typealias Selector = (SourceType, Int) throws -> S

    /// Counter advanced through `incrementChecked` on every mapped element.
    private var _index = 0
    /// Transformation applied to every source element and its index.
    private let _selector: Selector

    /// - parameter selector: Maps a source element and its index to an inner sequence.
    /// - parameter observer: Downstream observer that receives the merged events.
    init(selector: Selector, observer: O) {
        _selector = selector
        super.init(observer: observer)
    }

    /// Advances the counter and applies the selector to `element`.
    ///
    /// NOTE(review): `incrementChecked` is defined outside this file; which value
    /// it hands back (before or after the increment) should be confirmed there.
    ///
    /// - throws: Errors from `incrementChecked` or from the selector.
    override func performMap(element: SourceType) throws -> S {
        let index = try incrementChecked(&_index)
        return try _selector(element, index)
    }
}
0206
0207
/// Merge sink that only accepts a new source element while the group holds
/// exactly `MergeNoIterators` disposables; other elements are dropped by
/// `MergeSink.on`.
final class FlatMapFirstSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
    typealias Selector = (SourceType) throws -> S

    /// Transformation applied to every accepted source element.
    private let _selector: Selector

    /// `true` when the group's disposable count equals `MergeNoIterators`.
    override var subscribeNext: Bool {
        let trackedDisposables = _group.count
        return trackedDisposables == MergeNoIterators
    }

    /// - parameter selector: Maps a source element to an inner sequence.
    /// - parameter observer: Downstream observer that receives the merged events.
    init(selector: Selector, observer: O) {
        _selector = selector
        super.init(observer: observer)
    }

    /// Applies the selector to `element`.
    ///
    /// - throws: Whatever the selector throws.
    override func performMap(element: SourceType) throws -> S {
        let innerSequence = try _selector(element)
        return innerSequence
    }
}
0227
/// Value of `_group.count` that the merge sinks treat as "no inner iterators".
/// `MergeSink.run` adds the source subscription to the group, which accounts for the 1.
private let MergeNoIterators: Int = 1
0230
/// Observer attached to one inner sequence of an unbounded merge / flat map.
///
/// Everything sent to the parent's observer is forwarded while holding the
/// parent's lock.
class MergeSinkIter<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : ObserverType {
    typealias Parent = MergeSink<SourceType, S, O>
    typealias DisposeKey = CompositeDisposable.DisposeKey
    typealias E = O.E

    /// Sink that owns the lock, the group and the downstream observer.
    private let _parent: Parent
    /// Key under which this iterator's subscription is stored in the parent's group.
    private let _disposeKey: DisposeKey

    /// - parameter parent: Sink that created this iterator.
    /// - parameter disposeKey: Key of this iterator's subscription inside `parent._group`.
    init(parent: Parent, disposeKey: DisposeKey) {
        _parent = parent
        _disposeKey = disposeKey
    }

    /// Forwards inner elements and errors to the parent, and completes the
    /// parent once the source has stopped and this was the last inner sequence.
    func on(event: Event<E>) {
        let lock = _parent._lock

        switch event {
        case .Next:
            lock.lock(); defer { lock.unlock() }
            _parent.forwardOn(event)
        case .Error:
            lock.lock(); defer { lock.unlock() }
            _parent.forwardOn(event)
            _parent.dispose()
        case .Completed:
            // Removal and the completion check run before the lock is taken;
            // only forwarding and disposal happen under it.
            _parent._group.removeDisposable(_disposeKey)

            guard _parent._stopped && _parent._group.count == MergeNoIterators else {
                return
            }

            lock.lock(); defer { lock.unlock() }
            _parent.forwardOn(.Completed)
            _parent.dispose()
        }
    }
}
0271
0272
/// Base sink for merge / flat-map style operators.
///
/// Each accepted source element is mapped to an inner sequence by `performMap`
/// and subscribed through a `MergeSinkIter`; the inner events are forwarded to
/// the downstream observer. Everything sent to the observer is forwarded while
/// holding `_lock`.
class MergeSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E>
    : Sink<O>
    , ObserverType {
    typealias ResultType = O.E
    typealias Element = SourceType

    /// Guards forwarding to the observer; recursive, so re-locking on the same thread is safe.
    private let _lock = NSRecursiveLock()

    /// Whether the next source element should be mapped and subscribed.
    /// Always `true` here; `FlatMapFirstSink` overrides it.
    private var subscribeNext: Bool {
        return true
    }

    // state
    /// Holds the source subscription plus one disposable per running inner sequence.
    private let _group = CompositeDisposable()
    private let _sourceSubscription = SingleAssignmentDisposable()

    /// Set once the source sequence has completed.
    private var _stopped = false

    /// - parameter observer: Downstream observer that receives the merged events.
    override init(observer: O) {
        super.init(observer: observer)
    }

    /// Maps a source element to the inner sequence to merge. Subclasses must override.
    func performMap(element: SourceType) throws -> S {
        abstractMethod()
    }

    /// Handles an event from the source sequence.
    func on(event: Event<SourceType>) {
        switch event {
        case .Next(let element):
            if !subscribeNext {
                return
            }
            do {
                let value = try performMap(element)
                subscribeInner(value.asObservable())
            }
            catch let e {
                // Take the lock here as well: inner iterators forward under
                // `_lock`, so an unlocked error could race with their events.
                _lock.lock(); defer { _lock.unlock() }
                forwardOn(.Error(e))
                dispose()
            }
        case .Error(let error):
            _lock.lock(); defer { _lock.unlock() }
            forwardOn(.Error(error))
            dispose()
        case .Completed:
            _lock.lock(); defer { _lock.unlock() }
            _stopped = true
            if _group.count == MergeNoIterators {
                forwardOn(.Completed)
                dispose()
            }
            else {
                // Inner sequences are still running; only the source is released.
                _sourceSubscription.dispose()
            }
        }
    }

    /// Subscribes a new `MergeSinkIter` to `source`, tracking the subscription in `_group`.
    ///
    /// Nothing is subscribed when `_group.addDisposable` returns no key
    /// (presumably because the group has already been disposed — confirm).
    func subscribeInner(source: Observable<O.E>) {
        let iterDisposable = SingleAssignmentDisposable()
        if let disposeKey = _group.addDisposable(iterDisposable) {
            let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
            let subscription = source.subscribe(iter)
            iterDisposable.disposable = subscription
        }
    }

    /// Subscribes this sink to `source`.
    ///
    /// - parameter source: Sequence whose elements are mapped to inner sequences.
    /// - returns: The group holding the source subscription and all inner subscriptions.
    func run(source: Observable<SourceType>) -> Disposable {
        _group.addDisposable(_sourceSubscription)

        let subscription = source.subscribe(self)
        _sourceSubscription.disposable = subscription

        return _group
    }
}
0350
0351
/// Producer that maps every source element to an inner sequence with a
/// selector and merges the results through a `FlatMapSink`.
final class FlatMap<SourceType, S: ObservableConvertibleType>: Producer<S.E> {
    typealias Selector = (SourceType) throws -> S

    /// Sequence whose elements are passed to the selector.
    private let _source: Observable<SourceType>

    /// Transformation handed to every sink this producer creates.
    private let _selector: Selector

    /// - parameter source: Sequence whose elements are mapped.
    /// - parameter selector: Maps a source element to an inner sequence.
    init(source: Observable<SourceType>, selector: Selector) {
        _source = source
        _selector = selector
    }

    /// Creates a sink for `observer`, runs it against the source and returns it.
    override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
        let flatMapSink = FlatMapSink(selector: _selector, observer: observer)
        let subscription = flatMapSink.run(_source)
        flatMapSink.disposable = subscription
        return flatMapSink
    }
}
0371
/// Producer that maps every source element, together with an index, to an
/// inner sequence and merges the results through a `FlatMapWithIndexSink`.
final class FlatMapWithIndex<SourceType, S: ObservableConvertibleType>: Producer<S.E> {
    typealias Selector = (SourceType, Int) throws -> S

    /// Sequence whose elements are passed to the selector.
    private let _source: Observable<SourceType>

    /// Transformation handed to every sink this producer creates.
    private let _selector: Selector

    /// - parameter source: Sequence whose elements are mapped.
    /// - parameter selector: Maps a source element and its index to an inner sequence.
    init(source: Observable<SourceType>, selector: Selector) {
        _source = source
        _selector = selector
    }

    /// Creates a sink for `observer`, runs it against the source and returns it.
    override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
        let indexedSink = FlatMapWithIndexSink<SourceType, S, O>(selector: _selector, observer: observer)
        let subscription = indexedSink.run(_source)
        indexedSink.disposable = subscription
        return indexedSink
    }

}
0391
/// Producer that maps source elements to inner sequences through a
/// `FlatMapFirstSink`.
final class FlatMapFirst<SourceType, S: ObservableConvertibleType>: Producer<S.E> {
    typealias Selector = (SourceType) throws -> S

    /// Sequence whose elements are passed to the selector.
    private let _source: Observable<SourceType>

    /// Transformation handed to every sink this producer creates.
    private let _selector: Selector

    /// - parameter source: Sequence whose elements are mapped.
    /// - parameter selector: Maps a source element to an inner sequence.
    init(source: Observable<SourceType>, selector: Selector) {
        _source = source
        _selector = selector
    }

    /// Creates a sink for `observer`, runs it against the source and returns it.
    override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
        let firstSink = FlatMapFirstSink<SourceType, S, O>(selector: _selector, observer: observer)
        let subscription = firstSink.run(_source)
        firstSink.disposable = subscription
        return firstSink
    }
}
0410
/// Producer that merges a sequence of inner sequences through a `MergeBasicSink`.
final class Merge<S: ObservableConvertibleType> : Producer<S.E> {
    /// Sequence of inner sequences to merge.
    private let _source: Observable<S>

    /// - parameter source: Sequence of inner sequences to merge.
    init(source: Observable<S>) {
        _source = source
    }

    /// Creates a sink for `observer`, runs it against the source and returns it.
    override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
        let basicSink = MergeBasicSink<S, O>(observer: observer)
        let subscription = basicSink.run(_source)
        basicSink.disposable = subscription
        return basicSink
    }
}
0424
0425