//
//  PublishSubject.swift
//  Rx
//
//  Created by Krunoslav Zaher on 2/11/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//

import Foundation

/**
Represents an object that is both an observable sequence as well as an observer.

Each notification is broadcasted to all subscribed observers.
*/
final public class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    public typealias SubjectObserverType = PublishSubject<Element>

    typealias DisposeKey = Bag<AnyObserver<Element>>.KeyType

    // Taken by every public entry point (`on`, `subscribe`, `synchronizedUnsubscribe`,
    // `dispose`) before it calls its `_synchronized_*` counterpart.
    // The reference is never reassigned, hence `let`.
    private let _lock = NSRecursiveLock()

    // state
    private var _disposed = false
    private var _observers = Bag<AnyObserver<Element>>()
    private var _stopped = false
    // First `.Completed` / `.Error` event that was received, if any.
    private var _stoppedEvent: Event<Element>?

    /**
    Indicates whether the subject has been disposed.
    */
    public var disposed: Bool {
        return _disposed
    }

    /**
    Creates a subject.
    */
    public override init() {
        super.init()
    }

    /**
    Notifies all subscribed observers about next event.

    - parameter event: Event to send to the observers.
    */
    public func on(event: Event<Element>) {
        _lock.lock(); defer { _lock.unlock() }
        _synchronized_on(event)
    }

    /**
    Dispatches `event` to the registered observers. Called by `on` while `_lock` is held.

    `.Next` events are dropped once the subject is disposed or stopped. Only the first
    terminal event (`.Completed` or `.Error`) is recorded and forwarded; after forwarding it,
    all observers are removed. Any further terminal event is ignored while one is recorded.

    - parameter event: Event to dispatch.
    */
    func _synchronized_on(event: Event<E>) {
        switch event {
        case .Next:
            if _disposed || _stopped {
                return
            }

            _observers.on(event)
        case .Completed, .Error:
            if _stoppedEvent == nil {
                _stoppedEvent = event
                _stopped = true
                _observers.on(event)
                _observers.removeAll()
            }
        }
    }

    /**
    Subscribes an observer to the subject.

    - parameter observer: Observer to subscribe to the subject.
    - returns: Disposable object that can be used to unsubscribe the observer from the subject.
    */
    public override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
        _lock.lock(); defer { _lock.unlock() }
        return _synchronized_subscribe(observer)
    }

    /**
    Registers `observer`. Called by `subscribe` while `_lock` is held.

    If a terminal event has been recorded, it is sent to the observer immediately.
    Otherwise, if the subject is disposed, the observer receives `RxError.Disposed`.
    In both cases nothing is registered and `NopDisposable.instance` is returned.

    - parameter observer: Observer to register.
    - returns: Disposable created from the key under which the observer was stored,
      or `NopDisposable.instance` when nothing was registered.
    */
    func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
        if let stoppedEvent = _stoppedEvent {
            observer.on(stoppedEvent)
            return NopDisposable.instance
        }

        if _disposed {
            observer.on(.Error(RxError.Disposed(object: self)))
            return NopDisposable.instance
        }

        let key = _observers.insert(observer.asObserver())
        return SubscriptionDisposable(owner: self, key: key)
    }

    /**
    Removes the observer registered under `disposeKey`, taking `_lock` first.

    - parameter disposeKey: Key returned by the observer bag when the observer was inserted.
    */
    func synchronizedUnsubscribe(disposeKey: DisposeKey) {
        _lock.lock(); defer { _lock.unlock() }
        _synchronized_unsubscribe(disposeKey)
    }

    /**
    Removes the observer registered under `disposeKey`. Called while `_lock` is held.

    - parameter disposeKey: Key of the observer to remove.
    */
    func _synchronized_unsubscribe(disposeKey: DisposeKey) {
        // The value returned by `removeKey` is intentionally discarded.
        _ = _observers.removeKey(disposeKey)
    }

    /**
    Returns observer interface for subject.
    */
    public func asObserver() -> PublishSubject<Element> {
        return self
    }

    /**
    Unsubscribe all observers and release resources.
    */
    public func dispose() {
        _lock.lock(); defer { _lock.unlock() }
        _synchronized_dispose()
    }

    /**
    Marks the subject as disposed, removes every observer and clears the recorded
    terminal event. Called by `dispose` while `_lock` is held.
    */
    final func _synchronized_dispose() {
        _disposed = true
        _observers.removeAll()
        // With no recorded terminal event, `_synchronized_subscribe` falls through
        // to its `_disposed` check for subsequent subscribers.
        _stoppedEvent = nil
    }
}