0001    //
0002    //  PublishSubject.swift
0003    //  Rx
0004    //
0005    //  Created by Krunoslav Zaher on 2/11/15.
0006    //  Copyright © 2015 Krunoslav Zaher. All rights reserved.
0007    //
0008    
0009    import Foundation
0010    
/**
Represents an object that is both an observable sequence as well as an observer.

Each notification is broadcast to all subscribed observers.
*/
final public class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    public typealias SubjectObserverType = PublishSubject<Element>

    typealias DisposeKey = Bag<AnyObserver<Element>>.KeyType

    // Taken by every public entry point before it touches the state below.
    // The lock is recursive, so a nested call on the thread that already
    // holds it does not deadlock. It is never reassigned, hence `let`.
    private let _lock = NSRecursiveLock()

    // state
    private var _disposed = false
    private var _observers = Bag<AnyObserver<Element>>()
    private var _stopped = false
    private var _stoppedEvent: Event<Element>? = nil

    /**
    Indicates whether the subject has been disposed.
    */
    public var disposed: Bool {
        return _disposed
    }

    /**
    Creates a subject.
    */
    public override init() {
        super.init()
    }

    /**
    Notifies all subscribed observers about next event.

    - parameter event: Event to send to the observers.
    */
    public func on(event: Event<Element>) {
        _lock.lock(); defer { _lock.unlock() }
        _synchronized_on(event)
    }

    /**
    Delivers `event` to the current observers. Must be called with `_lock` held.

    Events are ignored once the subject has been disposed or has already
    received a terminal (`.Completed` / `.Error`) event.

    - parameter event: Event to send to the observers.
    */
    func _synchronized_on(event: Event<E>) {
        // `dispose()` clears `_stoppedEvent`, so checking only
        // `_stoppedEvent == nil` would let a terminal event that arrives
        // after disposal be recorded again and later replayed to new
        // subscribers instead of the `Disposed` error. Gate on the flags.
        if _disposed || _stopped {
            return
        }

        switch event {
        case .Next:
            _observers.on(event)
        case .Completed, .Error:
            // Record the terminal event first so that any nested call made
            // while observers are being notified already sees the subject
            // as stopped.
            _stoppedEvent = event
            _stopped = true
            _observers.on(event)
            _observers.removeAll()
        }
    }

    /**
    Subscribes an observer to the subject.

    - parameter observer: Observer to subscribe to the subject.
    - returns: Disposable object that can be used to unsubscribe the observer from the subject.
    */
    public override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
        _lock.lock(); defer { _lock.unlock() }
        return _synchronized_subscribe(observer)
    }

    /**
    Registers `observer` with the subject. Must be called with `_lock` held.

    If the subject has already stopped, the stored terminal event is sent to
    the observer immediately; if it has been disposed, a `RxError.Disposed`
    error is sent instead. In both cases the observer is not registered.

    - parameter observer: Observer to subscribe to the subject.
    - returns: Disposable object that can be used to unsubscribe the observer from the subject.
    */
    func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
        if let stoppedEvent = _stoppedEvent {
            observer.on(stoppedEvent)
            return NopDisposable.instance
        }

        if _disposed {
            observer.on(.Error(RxError.Disposed(object: self)))
            return NopDisposable.instance
        }

        let key = _observers.insert(observer.asObserver())
        // NOTE(review): SubscriptionDisposable presumably calls back into
        // `synchronizedUnsubscribe` with this key when disposed - confirm.
        return SubscriptionDisposable(owner: self, key: key)
    }

    /**
    Removes the observer registered under `disposeKey`, taking the lock first.

    - parameter disposeKey: Key returned when the observer was inserted.
    */
    func synchronizedUnsubscribe(disposeKey: DisposeKey) {
        _lock.lock(); defer { _lock.unlock() }
        _synchronized_unsubscribe(disposeKey)
    }

    /**
    Removes the observer registered under `disposeKey`. Must be called with `_lock` held.

    - parameter disposeKey: Key returned when the observer was inserted.
    */
    func _synchronized_unsubscribe(disposeKey: DisposeKey) {
        // The removed observer (if any) is intentionally discarded.
        _ = _observers.removeKey(disposeKey)
    }

    /**
    Returns observer interface for subject.
    */
    public func asObserver() -> PublishSubject<Element> {
        return self
    }

    /**
    Unsubscribe all observers and release resources.
    */
    public func dispose() {
        _lock.lock(); defer { _lock.unlock() }
        _synchronized_dispose()
    }

    /**
    Marks the subject as disposed, drops all observers and forgets any stored
    terminal event. Must be called with `_lock` held.
    */
    final func _synchronized_dispose() {
        _disposed = true
        _observers.removeAll()
        _stoppedEvent = nil
    }
}