0001    //
0002    //  Merge.swift
0003    //  Rx
0004    //
0005    //  Created by Krunoslav Zaher on 3/28/15.
0006    //  Copyright © 2015 Krunoslav Zaher. All rights reserved.
0007    //
0008    
0009    import Foundation
0010    
0011    // MARK: Limited concurrency version
0012    
0013    class MergeLimitedSinkIter
Merge.swift:102
            let observer = MergeLimitedSinkIter(parent: self, disposeKey: key)
<S
Merge.swift:19
    typealias Parent = MergeLimitedSink<S, O>
: ObservableConvertibleType, O
Merge.swift:17
    typealias E = O.E
Merge.swift:19
    typealias Parent = MergeLimitedSink<S, O>
: ObserverType where S.E == O.E> 0014 : ObserverType 0015 , LockOwnerType 0016 , SynchronizedOnType { 0017 typealias E
Merge.swift:33
    func on(event: Event<E>) {
Merge.swift:37
    func _synchronized_on(event: Event<E>) {
= O.E 0018 typealias DisposeKey
Merge.swift:22
    private let _disposeKey: DisposeKey
Merge.swift:28
    init(parent: Parent, disposeKey: DisposeKey) {
= Bag<Disposable>.KeyType 0019 typealias Parent
Merge.swift:21
    private let _parent: Parent
Merge.swift:28
    init(parent: Parent, disposeKey: DisposeKey) {
= MergeLimitedSink<S, O> 0020 0021 private let _parent
Merge.swift:25
        return _parent._lock
Merge.swift:29
        _parent = parent
Merge.swift:40
            _parent.forwardOn(event)
Merge.swift:42
            _parent.forwardOn(event)
Merge.swift:43
            _parent.dispose()
Merge.swift:45
            _parent._group.removeDisposable(_disposeKey)
Merge.swift:46
            if let next = _parent._queue.dequeue() {
Merge.swift:47
                _parent.subscribe(next, group: _parent._group)
Merge.swift:47
                _parent.subscribe(next, group: _parent._group)
Merge.swift:50
                _parent._activeCount = _parent._activeCount - 1
Merge.swift:50
                _parent._activeCount = _parent._activeCount - 1
Merge.swift:52
                if _parent._stopped && _parent._activeCount == 0 {
Merge.swift:52
                if _parent._stopped && _parent._activeCount == 0 {
Merge.swift:53
                    _parent.forwardOn(.Completed)
Merge.swift:54
                    _parent.dispose()
: Parent 0022 private let _disposeKey
Merge.swift:30
        _disposeKey = disposeKey
Merge.swift:45
            _parent._group.removeDisposable(_disposeKey)
: DisposeKey 0023 0024 var _lock: NSRecursiveLock { 0025 return _parent._lock 0026 } 0027 0028 init
Merge.swift:102
            let observer = MergeLimitedSinkIter(parent: self, disposeKey: key)
(parent: Parent, disposeKey: DisposeKey) { 0029 _parent = parent 0030 _disposeKey = disposeKey 0031 } 0032 0033 func on(event: Event<E>) { 0034 synchronizedOn(event) 0035 } 0036 0037 func _synchronized_on(event: Event<E>) { 0038 switch event { 0039 case .Next: 0040 _parent.forwardOn(event) 0041 case .Error: 0042 _parent.forwardOn(event) 0043 _parent.dispose() 0044 case .Completed: 0045 _parent._group.removeDisposable(_disposeKey) 0046 if let next = _parent._queue.dequeue() { 0047 _parent.subscribe(next, group: _parent._group) 0048 } 0049 else { 0050 _parent._activeCount = _parent._activeCount - 1 0051 0052 if _parent._stopped && _parent._activeCount == 0 { 0053 _parent.forwardOn(.Completed) 0054 _parent.dispose() 0055 } 0056 } 0057 } 0058 } 0059 } 0060 0061 class MergeLimitedSink
Merge.swift:19
    typealias Parent = MergeLimitedSink<S, O>
Merge.swift:156
        let sink = MergeLimitedSink<S, O>(maxConcurrent: _maxConcurrent, observer: observer)
<S
Merge.swift:66
    typealias E = S
Merge.swift:67
    typealias QueueType = Queue<S>
Merge.swift:88
    func run(source: Observable<S>) -> Disposable {
: ObservableConvertibleType, O
Merge.swift:62
    : Sink<O>
Merge.swift:81
    init(maxConcurrent: Int, observer: O) {
: ObserverType where S.E == O.E> 0062 : Sink<O> 0063 , ObserverType 0064 , LockOwnerType 0065 , SynchronizedOnType { 0066 typealias E
Merge.swift:96
    func subscribe(innerSource: E, group: CompositeDisposable) {
Merge.swift:109
    func on(event: Event<E>) {
Merge.swift:113
    func _synchronized_on(event: Event<E>) {
= S 0067 typealias QueueType
Merge.swift:76
    private var _queue = QueueType(capacity: 2)
= Queue<S> 0068 0069 private let _maxConcurrent
Merge.swift:82
        _maxConcurrent = maxConcurrent
Merge.swift:117
            if _activeCount < _maxConcurrent {
: Int 0070 0071 let _lock
Merge.swift:25
        return _parent._lock
= NSRecursiveLock() 0072 0073 // state 0074 private var _stopped
Merge.swift:52
                if _parent._stopped && _parent._activeCount == 0 {
Merge.swift:141
            _stopped = true
= false 0075 private var _activeCount
Merge.swift:50
                _parent._activeCount = _parent._activeCount - 1
Merge.swift:50
                _parent._activeCount = _parent._activeCount - 1
Merge.swift:52
                if _parent._stopped && _parent._activeCount == 0 {
Merge.swift:117
            if _activeCount < _maxConcurrent {
Merge.swift:118
                _activeCount += 1
Merge.swift:133
            if _activeCount == 0 {
= 0 0076 private var _queue
Merge.swift:46
            if let next = _parent._queue.dequeue() {
Merge.swift:122
                _queue.enqueue(value)
= QueueType(capacity: 2) 0077 0078 private let _sourceSubscription
Merge.swift:84
        _group.addDisposable(_sourceSubscription)
Merge.swift:89
        _group.addDisposable(_sourceSubscription)
Merge.swift:92
        _sourceSubscription.disposable = disposable
Merge.swift:138
                _sourceSubscription.dispose()
= SingleAssignmentDisposable() 0079 private let _group
Merge.swift:45
            _parent._group.removeDisposable(_disposeKey)
Merge.swift:47
                _parent.subscribe(next, group: _parent._group)
Merge.swift:84
        _group.addDisposable(_sourceSubscription)
Merge.swift:89
        _group.addDisposable(_sourceSubscription)
Merge.swift:93
        return _group
Merge.swift:127
                self.subscribe(value, group: _group)
= CompositeDisposable() 0080 0081 init(maxConcurrent: Int, observer: O) { 0082 _maxConcurrent = maxConcurrent 0083 0084 _group.addDisposable(_sourceSubscription) 0085 super.init(observer: observer) 0086 } 0087 0088 func run
Merge.swift:157
        sink.disposable = sink.run(_source)
(source: Observable<S>) -> Disposable { 0089 _group.addDisposable(_sourceSubscription) 0090 0091 let disposable = source.subscribe(self) 0092 _sourceSubscription.disposable = disposable 0093 return _group 0094 } 0095 0096 func subscribe
Merge.swift:47
                _parent.subscribe(next, group: _parent._group)
Merge.swift:127
                self.subscribe(value, group: _group)
(innerSource: E, group: CompositeDisposable) { 0097 let subscription = SingleAssignmentDisposable() 0098 0099 let key = group.addDisposable(subscription) 0100 0101 if let key = key { 0102 let observer = MergeLimitedSinkIter(parent: self, disposeKey: key) 0103 0104 let disposable = innerSource.asObservable().subscribe(observer) 0105 subscription.disposable = disposable 0106 } 0107 } 0108 0109 func on(event: Event<E>) { 0110 synchronizedOn(event) 0111 } 0112 0113 func _synchronized_on(event: Event<E>) { 0114 switch event { 0115 case .Next(let value): 0116 let subscribe: Bool 0117 if _activeCount < _maxConcurrent { 0118 _activeCount += 1 0119 subscribe = true 0120 } 0121 else { 0122 _queue.enqueue(value) 0123 subscribe = false 0124 } 0125 0126 if subscribe { 0127 self.subscribe(value, group: _group) 0128 } 0129 case .Error(let error): 0130 forwardOn(.Error(error)) 0131 dispose() 0132 case .Completed: 0133 if _activeCount == 0 { 0134 forwardOn(.Completed) 0135 dispose() 0136 } 0137 else { 0138 _sourceSubscription.dispose() 0139 } 0140 0141 _stopped = true 0142 } 0143 } 0144 } 0145 0146 class MergeLimited
Observable+Multiple.swift:173
        return MergeLimited(source: asObservable(), maxConcurrent: maxConcurrent)
<S
Merge.swift:146
class MergeLimited<S: ObservableConvertibleType> : Producer<S.E> {
Merge.swift:147
    private let _source: Observable<S>
Merge.swift:150
    init(source: Observable<S>, maxConcurrent: Int) {
Merge.swift:155
    override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
Merge.swift:156
        let sink = MergeLimitedSink<S, O>(maxConcurrent: _maxConcurrent, observer: observer)
: ObservableConvertibleType> : Producer<S.E> { 0147 private let _source
Merge.swift:151
        _source = source
Merge.swift:157
        sink.disposable = sink.run(_source)
: Observable<S> 0148 private let _maxConcurrent
Merge.swift:152
        _maxConcurrent = maxConcurrent
Merge.swift:156
        let sink = MergeLimitedSink<S, O>(maxConcurrent: _maxConcurrent, observer: observer)
: Int 0149 0150 init
Observable+Multiple.swift:173
        return MergeLimited(source: asObservable(), maxConcurrent: maxConcurrent)
(source: Observable<S>, maxConcurrent: Int) { 0151 _source = source 0152 _maxConcurrent = maxConcurrent 0153 } 0154 0155 override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable { 0156 let sink = MergeLimitedSink<S, O>(maxConcurrent: _maxConcurrent, observer: observer) 0157 sink.disposable = sink.run(_source) 0158 return sink 0159 } 0160 } 0161 0162 // MARK: Merge 0163 0164 final class MergeBasicSink
Merge.swift:419
        let sink = MergeBasicSink<S, O>(observer: observer)
<S
Merge.swift:164
final class MergeBasicSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<S, S, O> {
Merge.swift:164
final class MergeBasicSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<S, S, O> {
Merge.swift:169
    override func performMap(element: S) throws -> S {
Merge.swift:169
    override func performMap(element: S) throws -> S {
: ObservableConvertibleType, O
Merge.swift:164
final class MergeBasicSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<S, S, O> {
Merge.swift:165
    override init(observer: O) {
: ObserverType where O.E == S.E> : MergeSink<S, S, O> { 0165 override init(observer: O) { 0166 super.init(observer: observer) 0167 } 0168 0169 override func performMap(element: S) throws -> S { 0170 return element 0171 } 0172 } 0173 0174 // MARK: flatMap 0175 0176 final class FlatMapSink
Merge.swift:366
        let sink = FlatMapSink(selector: _selector, observer: observer)
<SourceType
Merge.swift:176
final class FlatMapSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:177
    typealias Selector = (SourceType) throws -> S
Merge.swift:186
    override func performMap(element: SourceType) throws -> S {
, S
Merge.swift:176
final class FlatMapSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:177
    typealias Selector = (SourceType) throws -> S
Merge.swift:186
    override func performMap(element: SourceType) throws -> S {
: ObservableConvertibleType, O
Merge.swift:176
final class FlatMapSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:181
    init(selector: Selector, observer: O) {
: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> { 0177 typealias Selector
Merge.swift:179
    private let _selector: Selector
Merge.swift:181
    init(selector: Selector, observer: O) {
= (SourceType) throws -> S 0178 0179 private let _selector
Merge.swift:182
        _selector = selector
Merge.swift:187
        return try _selector(element)
: Selector 0180 0181 init
Merge.swift:366
        let sink = FlatMapSink(selector: _selector, observer: observer)
(selector: Selector, observer: O) { 0182 _selector = selector 0183 super.init(observer: observer) 0184 } 0185 0186 override func performMap(element: SourceType) throws -> S { 0187 return try _selector(element) 0188 } 0189 } 0190 0191 final class FlatMapWithIndexSink
Merge.swift:385
        let sink = FlatMapWithIndexSink<SourceType, S, O>(selector: _selector, observer: observer)
<SourceType
Merge.swift:191
final class FlatMapWithIndexSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:192
    typealias Selector = (SourceType, Int) throws -> S
Merge.swift:202
    override func performMap(element: SourceType) throws -> S {
, S
Merge.swift:191
final class FlatMapWithIndexSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:192
    typealias Selector = (SourceType, Int) throws -> S
Merge.swift:202
    override func performMap(element: SourceType) throws -> S {
: ObservableConvertibleType, O
Merge.swift:191
final class FlatMapWithIndexSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:197
    init(selector: Selector, observer: O) {
: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> { 0192 typealias Selector
Merge.swift:195
    private let _selector: Selector
Merge.swift:197
    init(selector: Selector, observer: O) {
= (SourceType, Int) throws -> S 0193 0194 private var _index
Merge.swift:203
        return try _selector(element, try incrementChecked(&_index))
= 0 0195 private let _selector
Merge.swift:198
        _selector = selector
Merge.swift:203
        return try _selector(element, try incrementChecked(&_index))
: Selector 0196 0197 init(selector: Selector, observer: O) { 0198 _selector = selector 0199 super.init(observer: observer) 0200 } 0201 0202 override func performMap(element: SourceType) throws -> S { 0203 return try _selector(element, try incrementChecked(&_index)) 0204 } 0205 } 0206 0207 // MARK: FlatMapFirst 0208 0209 final class FlatMapFirstSink
Merge.swift:405
        let sink = FlatMapFirstSink<SourceType, S, O>(selector: _selector, observer: observer)
<SourceType
Merge.swift:209
final class FlatMapFirstSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:210
    typealias Selector = (SourceType) throws -> S
Merge.swift:223
    override func performMap(element: SourceType) throws -> S {
, S
Merge.swift:209
final class FlatMapFirstSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:210
    typealias Selector = (SourceType) throws -> S
Merge.swift:223
    override func performMap(element: SourceType) throws -> S {
: ObservableConvertibleType, O
Merge.swift:209
final class FlatMapFirstSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:218
    init(selector: Selector, observer: O) {
: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> { 0210 typealias Selector
Merge.swift:212
    private let _selector: Selector
Merge.swift:218
    init(selector: Selector, observer: O) {
= (SourceType) throws -> S 0211 0212 private let _selector
Merge.swift:219
        _selector = selector
Merge.swift:224
        return try _selector(element)
: Selector 0213 0214 override var subscribeNext: Bool { 0215 return _group.count == MergeNoIterators 0216 } 0217 0218 init(selector: Selector, observer: O) { 0219 _selector = selector 0220 super.init(observer: observer) 0221 } 0222 0223 override func performMap(element: SourceType) throws -> S { 0224 return try _selector(element) 0225 } 0226 } 0227 0228 // It's value is one because initial source subscription is always in CompositeDisposable 0229 private let MergeNoIterators
Merge.swift:215
        return _group.count == MergeNoIterators
Merge.swift:262
            if _parent._stopped && _parent._group.count == MergeNoIterators {
Merge.swift:321
                if _group.count == MergeNoIterators {
= 1 0230 0231 class MergeSinkIter
Merge.swift:335
            let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
<SourceType
Merge.swift:232
    typealias Parent = MergeSink<SourceType, S, O>
, S
Merge.swift:232
    typealias Parent = MergeSink<SourceType, S, O>
: ObservableConvertibleType, O
Merge.swift:232
    typealias Parent = MergeSink<SourceType, S, O>
Merge.swift:234
    typealias E = O.E
: ObserverType where O.E == S.E> : ObserverType { 0232 typealias Parent
Merge.swift:236
    private let _parent: Parent
Merge.swift:239
    init(parent: Parent, disposeKey: DisposeKey) {
= MergeSink<SourceType, S, O> 0233 typealias DisposeKey
Merge.swift:237
    private let _disposeKey: DisposeKey
Merge.swift:239
    init(parent: Parent, disposeKey: DisposeKey) {
= CompositeDisposable.DisposeKey 0234 typealias E
Merge.swift:244
    func on(event: Event<E>) {
= O.E 0235 0236 private let _parent
Merge.swift:240
        _parent = parent
Merge.swift:247
            _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:247
            _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:248
                _parent.forwardOn(.Next(value))
Merge.swift:251
            _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:251
            _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:252
                _parent.forwardOn(.Error(error))
Merge.swift:253
                _parent.dispose()
Merge.swift:256
            _parent._group.removeDisposable(_disposeKey)
Merge.swift:262
            if _parent._stopped && _parent._group.count == MergeNoIterators {
Merge.swift:262
            if _parent._stopped && _parent._group.count == MergeNoIterators {
Merge.swift:263
                _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:263
                _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:264
                    _parent.forwardOn(.Completed)
Merge.swift:265
                    _parent.dispose()
: Parent 0237 private let _disposeKey
Merge.swift:241
        _disposeKey = disposeKey
Merge.swift:256
            _parent._group.removeDisposable(_disposeKey)
: DisposeKey 0238 0239 init
Merge.swift:335
            let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
(parent: Parent, disposeKey: DisposeKey) { 0240 _parent = parent 0241 _disposeKey = disposeKey 0242 } 0243 0244 func on(event: Event<E>) { 0245 switch event { 0246 case .Next(let value): 0247 _parent._lock.lock(); defer { _parent._lock.unlock() } // lock { 0248 _parent.forwardOn(.Next(value)) 0249 // } 0250 case .Error(let error): 0251 _parent._lock.lock(); defer { _parent._lock.unlock() } // lock { 0252 _parent.forwardOn(.Error(error)) 0253 _parent.dispose() 0254 // } 0255 case .Completed: 0256 _parent._group.removeDisposable(_disposeKey) 0257 // If this has returned true that means that `Completed` should be sent. 0258 // In case there is a race who will sent first completed, 0259 // lock will sort it out. When first Completed message is sent 0260 // it will set observer to nil, and thus prevent further complete messages 0261 // to be sent, and thus preserving the sequence grammar. 0262 if _parent._stopped && _parent._group.count == MergeNoIterators { 0263 _parent._lock.lock(); defer { _parent._lock.unlock() } // lock { 0264 _parent.forwardOn(.Completed) 0265 _parent.dispose() 0266 // } 0267 } 0268 } 0269 } 0270 } 0271 0272 0273 class MergeSink
Merge.swift:164
final class MergeBasicSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<S, S, O> {
Merge.swift:176
final class FlatMapSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:191
final class FlatMapWithIndexSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:209
final class FlatMapFirstSink<SourceType, S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : MergeSink<SourceType, S, O> {
Merge.swift:232
    typealias Parent = MergeSink<SourceType, S, O>
<SourceType
Merge.swift:277
    typealias Element = SourceType
Merge.swift:295
    func performMap(element: SourceType) throws -> S {
Merge.swift:299
    func on(event: Event<SourceType>) {
Merge.swift:341
    func run(source: Observable<SourceType>) -> Disposable {
, S
Merge.swift:295
    func performMap(element: SourceType) throws -> S {
: ObservableConvertibleType, O
Merge.swift:274
    : Sink<O>
Merge.swift:276
    typealias ResultType = O.E
Merge.swift:291
    override init(observer: O) {
Merge.swift:332
    func subscribeInner(source: Observable<O.E>) {
: ObserverType where O.E == S.E> 0274 : Sink<O> 0275 , ObserverType { 0276 typealias ResultType = O.E 0277 typealias Element = SourceType 0278 0279 private let _lock
Merge.swift:247
            _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:247
            _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:251
            _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:251
            _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:263
                _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:263
                _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
Merge.swift:314
            _lock.lock(); defer { _lock.unlock() } // lock {
Merge.swift:314
            _lock.lock(); defer { _lock.unlock() } // lock {
Merge.swift:319
            _lock.lock(); defer { _lock.unlock() } // lock {
Merge.swift:319
            _lock.lock(); defer { _lock.unlock() } // lock {
= NSRecursiveLock() 0280 0281 private var subscribeNext
Merge.swift:302
            if !subscribeNext {
: Bool { 0282 return true 0283 } 0284 0285 // state 0286 private let _group
Merge.swift:215
        return _group.count == MergeNoIterators
Merge.swift:256
            _parent._group.removeDisposable(_disposeKey)
Merge.swift:262
            if _parent._stopped && _parent._group.count == MergeNoIterators {
Merge.swift:321
                if _group.count == MergeNoIterators {
Merge.swift:334
        if let disposeKey = _group.addDisposable(iterDisposable) {
Merge.swift:342
        _group.addDisposable(_sourceSubscription)
Merge.swift:347
        return _group
= CompositeDisposable() 0287 private let _sourceSubscription
Merge.swift:326
                    _sourceSubscription.dispose()
Merge.swift:342
        _group.addDisposable(_sourceSubscription)
Merge.swift:345
        _sourceSubscription.disposable = subscription
= SingleAssignmentDisposable() 0288 0289 private var _stopped
Merge.swift:262
            if _parent._stopped && _parent._group.count == MergeNoIterators {
Merge.swift:320
                _stopped = true
= false 0290 0291 override init
Merge.swift:166
        super.init(observer: observer)
Merge.swift:183
        super.init(observer: observer)
Merge.swift:199
        super.init(observer: observer)
Merge.swift:220
        super.init(observer: observer)
(observer: O) { 0292 super.init(observer: observer) 0293 } 0294 0295 func performMap
Merge.swift:306
                let value = try performMap(element)
(element: SourceType) throws -> S { 0296 abstractMethod() 0297 } 0298 0299 func on(event: Event<SourceType>) { 0300 switch event { 0301 case .Next(let element): 0302 if !subscribeNext { 0303 return 0304 } 0305 do { 0306 let value = try performMap(element) 0307 subscribeInner(value.asObservable()) 0308 } 0309 catch let e { 0310 forwardOn(.Error(e)) 0311 dispose() 0312 } 0313 case .Error(let error): 0314 _lock.lock(); defer { _lock.unlock() } // lock { 0315 forwardOn(.Error(error)) 0316 dispose() 0317 // } 0318 case .Completed: 0319 _lock.lock(); defer { _lock.unlock() } // lock { 0320 _stopped = true 0321 if _group.count == MergeNoIterators { 0322 forwardOn(.Completed) 0323 dispose() 0324 } 0325 else { 0326 _sourceSubscription.dispose() 0327 } 0328 //} 0329 } 0330 } 0331 0332 func subscribeInner
Merge.swift:307
                subscribeInner(value.asObservable())
(source: Observable<O.E>) { 0333 let iterDisposable = SingleAssignmentDisposable() 0334 if let disposeKey = _group.addDisposable(iterDisposable) { 0335 let iter = MergeSinkIter(parent: self, disposeKey: disposeKey) 0336 let subscription = source.subscribe(iter) 0337 iterDisposable.disposable = subscription 0338 } 0339 } 0340 0341 func run
Merge.swift:367
        sink.disposable = sink.run(_source)
Merge.swift:386
        sink.disposable = sink.run(_source)
Merge.swift:406
        sink.disposable = sink.run(_source)
Merge.swift:420
        sink.disposable = sink.run(_source)
(source: Observable<SourceType>) -> Disposable { 0342 _group.addDisposable(_sourceSubscription) 0343 0344 let subscription = source.subscribe(self) 0345 _sourceSubscription.disposable = subscription 0346 0347 return _group 0348 } 0349 } 0350 0351 // MARK: Producers 0352 0353 final class FlatMap
Observable+StandardSequenceOperators.swift:211
        return FlatMap(source: asObservable(), selector: selector)
<SourceType
Merge.swift:354
    typealias Selector = (SourceType) throws -> S
Merge.swift:356
    private let _source: Observable<SourceType>
Merge.swift:360
    init(source: Observable<SourceType>, selector: Selector) {
, S
Merge.swift:353
final class FlatMap<SourceType, S: ObservableConvertibleType>: Producer<S.E> {
Merge.swift:354
    typealias Selector = (SourceType) throws -> S
Merge.swift:365
    override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
: ObservableConvertibleType>: Producer<S.E> { 0354 typealias Selector
Merge.swift:358
    private let _selector: Selector
Merge.swift:360
    init(source: Observable<SourceType>, selector: Selector) {
= (SourceType) throws -> S 0355 0356 private let _source
Merge.swift:361
        _source = source
Merge.swift:367
        sink.disposable = sink.run(_source)
: Observable<SourceType> 0357 0358 private let _selector
Merge.swift:362
        _selector = selector
Merge.swift:366
        let sink = FlatMapSink(selector: _selector, observer: observer)
: Selector 0359 0360 init
Observable+StandardSequenceOperators.swift:211
        return FlatMap(source: asObservable(), selector: selector)
(source: Observable<SourceType>, selector: Selector) { 0361 _source = source 0362 _selector = selector 0363 } 0364 0365 override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable { 0366 let sink = FlatMapSink(selector: _selector, observer: observer) 0367 sink.disposable = sink.run(_source) 0368 return sink 0369 } 0370 } 0371 0372 final class FlatMapWithIndex
Observable+StandardSequenceOperators.swift:225
        return FlatMapWithIndex(source: asObservable(), selector: selector)
<SourceType
Merge.swift:373
    typealias Selector = (SourceType, Int) throws -> S
Merge.swift:375
    private let _source: Observable<SourceType>
Merge.swift:379
    init(source: Observable<SourceType>, selector: Selector) {
Merge.swift:385
        let sink = FlatMapWithIndexSink<SourceType, S, O>(selector: _selector, observer: observer)
, S
Merge.swift:372
final class FlatMapWithIndex<SourceType, S: ObservableConvertibleType>: Producer<S.E> {
Merge.swift:373
    typealias Selector = (SourceType, Int) throws -> S
Merge.swift:384
    override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
Merge.swift:385
        let sink = FlatMapWithIndexSink<SourceType, S, O>(selector: _selector, observer: observer)
: ObservableConvertibleType>: Producer<S.E> { 0373 typealias Selector
Merge.swift:377
    private let _selector: Selector
Merge.swift:379
    init(source: Observable<SourceType>, selector: Selector) {
= (SourceType, Int) throws -> S 0374 0375 private let _source
Merge.swift:380
        _source = source
Merge.swift:386
        sink.disposable = sink.run(_source)
: Observable<SourceType> 0376 0377 private let _selector
Merge.swift:381
        _selector = selector
Merge.swift:385
        let sink = FlatMapWithIndexSink<SourceType, S, O>(selector: _selector, observer: observer)
: Selector 0378 0379 init
Observable+StandardSequenceOperators.swift:225
        return FlatMapWithIndex(source: asObservable(), selector: selector)
(source: Observable<SourceType>, selector: Selector) { 0380 _source = source 0381 _selector = selector 0382 } 0383 0384 override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable { 0385 let sink = FlatMapWithIndexSink<SourceType, S, O>(selector: _selector, observer: observer) 0386 sink.disposable = sink.run(_source) 0387 return sink 0388 } 0389 0390 } 0391 0392 final class FlatMapFirst
Observable+StandardSequenceOperators.swift:245
        return FlatMapFirst(source: asObservable(), selector: selector)
<SourceType
Merge.swift:393
    typealias Selector = (SourceType) throws -> S
Merge.swift:395
    private let _source: Observable<SourceType>
Merge.swift:399
    init(source: Observable<SourceType>, selector: Selector) {
Merge.swift:405
        let sink = FlatMapFirstSink<SourceType, S, O>(selector: _selector, observer: observer)
, S
Merge.swift:392
final class FlatMapFirst<SourceType, S: ObservableConvertibleType>: Producer<S.E> {
Merge.swift:393
    typealias Selector = (SourceType) throws -> S
Merge.swift:404
    override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
Merge.swift:405
        let sink = FlatMapFirstSink<SourceType, S, O>(selector: _selector, observer: observer)
: ObservableConvertibleType>: Producer<S.E> { 0393 typealias Selector
Merge.swift:397
    private let _selector: Selector
Merge.swift:399
    init(source: Observable<SourceType>, selector: Selector) {
= (SourceType) throws -> S 0394 0395 private let _source
Merge.swift:400
        _source = source
Merge.swift:406
        sink.disposable = sink.run(_source)
: Observable<SourceType> 0396 0397 private let _selector
Merge.swift:401
        _selector = selector
Merge.swift:405
        let sink = FlatMapFirstSink<SourceType, S, O>(selector: _selector, observer: observer)
: Selector 0398 0399 init
Observable+StandardSequenceOperators.swift:245
        return FlatMapFirst(source: asObservable(), selector: selector)
(source: Observable<SourceType>, selector: Selector) { 0400 _source = source 0401 _selector = selector 0402 } 0403 0404 override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable { 0405 let sink = FlatMapFirstSink<SourceType, S, O>(selector: _selector, observer: observer) 0406 sink.disposable = sink.run(_source) 0407 return sink 0408 } 0409 } 0410 0411 final class Merge
Observable+Multiple.swift:159
        return Merge(source: asObservable())
<S
Merge.swift:411
final class Merge<S: ObservableConvertibleType> : Producer<S.E> {
Merge.swift:412
    private let _source: Observable<S>
Merge.swift:414
    init(source: Observable<S>) {
Merge.swift:418
    override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
Merge.swift:419
        let sink = MergeBasicSink<S, O>(observer: observer)
: ObservableConvertibleType> : Producer<S.E> { 0412 private let _source
Merge.swift:415
        _source = source
Merge.swift:420
        sink.disposable = sink.run(_source)
: Observable<S> 0413 0414 init
Observable+Multiple.swift:159
        return Merge(source: asObservable())
(source: Observable<S>) { 0415 _source = source 0416 } 0417 0418 override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable { 0419 let sink = MergeBasicSink<S, O>(observer: observer) 0420 sink.disposable = sink.run(_source) 0421 return sink 0422 } 0423 } 0424 0425