0001    //
0002    //  ReplaySubject.swift
0003    //  RxSwift
0004    //
0005    //  Created by Krunoslav Zaher on 4/14/15.
0006    //  Copyright © 2015 Krunoslav Zaher. All rights reserved.
0007    //
0008    
0009    import Foundation
0010    
0011    /**
0012    Represents an object that is both an observable sequence as well as an observer.
0013    
0014    Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
0015    */
0016    public class ReplaySubject
Observable+Binding.swift:93
        return self.multicast(ReplaySubject.create(bufferSize: bufferSize))
Observable+Binding.swift:108
        return self.multicast(ReplaySubject.createUnbounded())
ReplaySubject.swift:21
    public typealias SubjectObserverType = ReplaySubject<Element>
ReplaySubject.swift:57
    public static func create(bufferSize bufferSize: Int) -> ReplaySubject<Element> {
ReplaySubject.swift:71
	public static func createUnbounded() -> ReplaySubject<Element> {
ReplaySubject.swift:77
    : ReplaySubject<Element>
<Element
ReplaySubject.swift:17
    : Observable<Element>
ReplaySubject.swift:21
    public typealias SubjectObserverType = ReplaySubject<Element>
ReplaySubject.swift:23
    typealias DisposeKey = Bag<AnyObserver<Element>>.KeyType
ReplaySubject.swift:57
    public static func create(bufferSize bufferSize: Int) -> ReplaySubject<Element> {
ReplaySubject.swift:71
	public static func createUnbounded() -> ReplaySubject<Element> {
> 0017 : Observable<Element> 0018 , SubjectType 0019 , ObserverType 0020 , Disposable { 0021 public typealias SubjectObserverType
ReplaySubject.swift:41
    public func asObserver() -> SubjectObserverType {
= ReplaySubject<Element> 0022 0023 typealias DisposeKey
ReplaySubject.swift:25
    func unsubscribe(key: DisposeKey) {
ReplaySubject.swift:150
    func synchronizedUnsubscribe(disposeKey: DisposeKey) {
ReplaySubject.swift:155
    func _synchronized_unsubscribe(disposeKey: DisposeKey) {
= Bag<AnyObserver<Element>>.KeyType 0024 0025 func unsubscribe(key: DisposeKey) { 0026 abstractMethod() 0027 } 0028 0029 /** 0030 Notifies all subscribed observers about next event. 0031 0032 - parameter event: Event to send to the observers. 0033 */ 0034 public func on(event: Event<E>) { 0035 abstractMethod() 0036 } 0037 0038 /** 0039 Returns observer interface for subject. 0040 */ 0041 public func asObserver() -> SubjectObserverType { 0042 return self 0043 } 0044 0045 /** 0046 Unsubscribe all observers and release resources. 0047 */ 0048 public func dispose
ReplaySubject.swift:164
        super.dispose()
() { 0049 } 0050 0051 /** 0052 Creates new instance of `ReplaySubject` that replays at most `bufferSize` last elements of sequence. 0053 0054 - parameter bufferSize: Maximal number of elements to replay to observer after subscription. 0055 - returns: New instance of replay subject. 0056 */ 0057 public static func create
Observable+Binding.swift:93
        return self.multicast(ReplaySubject.create(bufferSize: bufferSize))
(bufferSize bufferSize: Int) -> ReplaySubject<Element> { 0058 if bufferSize == 1 { 0059 return ReplayOne() 0060 } 0061 else { 0062 return ReplayMany(bufferSize: bufferSize) 0063 } 0064 } 0065 0066 /** 0067 Creates a new instance of `ReplaySubject` that buffers all the elements of a sequence. 0068 To avoid filling up memory, developer needs to make sure that the use case will only ever store a 'reasonable' 0069 number of elements. 0070 */ 0071 public static func createUnbounded
Observable+Binding.swift:108
        return self.multicast(ReplaySubject.createUnbounded())
() -> ReplaySubject<Element> { 0072 return ReplayAll() 0073 } 0074 } 0075 0076 class ReplayBufferBase
ReplaySubject.swift:181
final class ReplayOne<Element> : ReplayBufferBase<Element> {
ReplaySubject.swift:208
class ReplayManyBase<Element> : ReplayBufferBase<Element> {
<Element
ReplaySubject.swift:77
    : ReplaySubject<Element>
ReplaySubject.swift:84
    private var _stoppedEvent = nil as Event<Element>?
ReplaySubject.swift:85
    private var _observers = Bag<AnyObserver<Element>>()
ReplaySubject.swift:91
    func addValueToBuffer(value: Element) {
ReplaySubject.swift:95
    func replayBuffer(observer: AnyObserver<Element>) {
ReplaySubject.swift:99
    override func on(event: Event<Element>) {
ReplaySubject.swift:126
    override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
> 0077 : ReplaySubject<Element> 0078 , SynchronizedUnsubscribeType { 0079 0080 private var _lock
ReplaySubject.swift:100
        _lock.lock(); defer { _lock.unlock() }
ReplaySubject.swift:100
        _lock.lock(); defer { _lock.unlock() }
ReplaySubject.swift:127
        _lock.lock(); defer { _lock.unlock() }
ReplaySubject.swift:127
        _lock.lock(); defer { _lock.unlock() }
ReplaySubject.swift:151
        _lock.lock(); defer { _lock.unlock() }
ReplaySubject.swift:151
        _lock.lock(); defer { _lock.unlock() }
ReplaySubject.swift:170
        _lock.lock(); defer { _lock.unlock() }
ReplaySubject.swift:170
        _lock.lock(); defer { _lock.unlock() }
= NSRecursiveLock() 0081 0082 // state 0083 private var _disposed
ReplaySubject.swift:105
        if _disposed {
ReplaySubject.swift:132
        if _disposed {
ReplaySubject.swift:156
        if _disposed {
ReplaySubject.swift:175
        _disposed = true
= false 0084 private var _stoppedEvent
ReplaySubject.swift:109
        if _stoppedEvent != nil {
ReplaySubject.swift:119
            _stoppedEvent = event
ReplaySubject.swift:140
        if let stoppedEvent = _stoppedEvent {
ReplaySubject.swift:176
        _stoppedEvent = nil
= nil as Event<Element>? 0085 private var _observers
ReplaySubject.swift:117
            _observers.on(event)
ReplaySubject.swift:121
            _observers.on(event)
ReplaySubject.swift:122
            _observers.removeAll()
ReplaySubject.swift:145
            let key = _observers.insert(AnyObserver)
ReplaySubject.swift:160
        _ = _observers.removeKey(disposeKey)
ReplaySubject.swift:177
        _observers.removeAll()
= Bag<AnyObserver<Element>>() 0086 0087 func trim
ReplaySubject.swift:116
            trim()
ReplaySubject.swift:120
            trim()
() { 0088 abstractMethod() 0089 } 0090 0091 func addValueToBuffer
ReplaySubject.swift:115
            addValueToBuffer(value)
(value: Element) { 0092 abstractMethod() 0093 } 0094 0095 func replayBuffer
ReplaySubject.swift:139
        replayBuffer(AnyObserver)
(observer: AnyObserver<Element>) { 0096 abstractMethod() 0097 } 0098 0099 override func on(event: Event<Element>) { 0100 _lock.lock(); defer { _lock.unlock() } 0101 _synchronized_on(event) 0102 } 0103 0104 func _synchronized_on
ReplaySubject.swift:101
        _synchronized_on(event)
(event: Event<E>) { 0105 if _disposed { 0106 return 0107 } 0108 0109 if _stoppedEvent != nil { 0110 return 0111 } 0112 0113 switch event { 0114 case .Next(let value): 0115 addValueToBuffer(value) 0116 trim() 0117 _observers.on(event) 0118 case .Error, .Completed: 0119 _stoppedEvent = event 0120 trim() 0121 _observers.on(event) 0122 _observers.removeAll() 0123 } 0124 } 0125 0126 override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable { 0127 _lock.lock(); defer { _lock.unlock() } 0128 return _synchronized_subscribe(observer) 0129 } 0130 0131 func _synchronized_subscribe
ReplaySubject.swift:128
        return _synchronized_subscribe(observer)
<O : ObserverType where O.E == E>(observer: O) -> Disposable { 0132 if _disposed { 0133 observer.on(.Error(RxError.Disposed(object: self))) 0134 return NopDisposable.instance 0135 } 0136 0137 let AnyObserver = observer.asObserver() 0138 0139 replayBuffer(AnyObserver) 0140 if let stoppedEvent = _stoppedEvent { 0141 observer.on(stoppedEvent) 0142 return NopDisposable.instance 0143 } 0144 else { 0145 let key = _observers.insert(AnyObserver) 0146 return SubscriptionDisposable(owner: self, key: key) 0147 } 0148 } 0149 0150 func synchronizedUnsubscribe(disposeKey: DisposeKey) { 0151 _lock.lock(); defer { _lock.unlock() } 0152 _synchronized_unsubscribe(disposeKey) 0153 } 0154 0155 func _synchronized_unsubscribe
ReplaySubject.swift:152
        _synchronized_unsubscribe(disposeKey)
(disposeKey: DisposeKey) { 0156 if _disposed { 0157 return 0158 } 0159 0160 _ = _observers.removeKey(disposeKey) 0161 } 0162 0163 override func dispose() { 0164 super.dispose() 0165 0166 synchronizedDispose() 0167 } 0168 0169 func synchronizedDispose
ReplaySubject.swift:166
        synchronizedDispose()
() { 0170 _lock.lock(); defer { _lock.unlock() } 0171 _synchronized_dispose() 0172 } 0173 0174 func _synchronized_dispose
ReplaySubject.swift:171
        _synchronized_dispose()
ReplaySubject.swift:203
        super._synchronized_dispose()
ReplaySubject.swift:226
        super._synchronized_dispose()
() { 0175 _disposed = true 0176 _stoppedEvent = nil 0177 _observers.removeAll() 0178 } 0179 } 0180 0181 final class ReplayOne
ReplaySubject.swift:59
            return ReplayOne()
<Element
ReplaySubject.swift:181
final class ReplayOne<Element> : ReplayBufferBase<Element> {
ReplaySubject.swift:182
    private var _value: Element?
ReplaySubject.swift:192
    override func addValueToBuffer(value: Element) {
ReplaySubject.swift:196
    override func replayBuffer(observer: AnyObserver<Element>) {
> : ReplayBufferBase<Element> { 0182 private var _value
ReplaySubject.swift:193
        _value = value
ReplaySubject.swift:197
        if let value = _value {
ReplaySubject.swift:204
        _value = nil
: Element? 0183 0184 override init
ReplaySubject.swift:59
            return ReplayOne()
() { 0185 super.init() 0186 } 0187 0188 override func trim() { 0189 0190 } 0191 0192 override func addValueToBuffer(value: Element) { 0193 _value = value 0194 } 0195 0196 override func replayBuffer(observer: AnyObserver<Element>) { 0197 if let value = _value { 0198 observer.on(.Next(value)) 0199 } 0200 } 0201 0202 override func _synchronized_dispose() { 0203 super._synchronized_dispose() 0204 _value = nil 0205 } 0206 } 0207 0208 class ReplayManyBase
ReplaySubject.swift:231
final class ReplayMany<Element> : ReplayManyBase<Element> {
ReplaySubject.swift:247
final class ReplayAll<Element> : ReplayManyBase<Element> {
<Element
ReplaySubject.swift:208
class ReplayManyBase<Element> : ReplayBufferBase<Element> {
ReplaySubject.swift:209
    private var _queue: Queue<Element>
ReplaySubject.swift:215
    override func addValueToBuffer(value: Element) {
> : ReplayBufferBase<Element> { 0209 private var _queue
ReplaySubject.swift:212
        _queue = Queue(capacity: queueSize + 1)
ReplaySubject.swift:216
        _queue.enqueue(value)
ReplaySubject.swift:220
        for item in _queue {
ReplaySubject.swift:227
        _queue = Queue(capacity: 0)
ReplaySubject.swift:241
        while _queue.count > _bufferSize {
ReplaySubject.swift:242
            _queue.dequeue()
: Queue<Element> 0210 0211 init
ReplaySubject.swift:237
        super.init(queueSize: bufferSize)
ReplaySubject.swift:249
        super.init(queueSize: 0)
(queueSize: Int) { 0212 _queue = Queue(capacity: queueSize + 1) 0213 } 0214 0215 override func addValueToBuffer(value: Element) { 0216 _queue.enqueue(value) 0217 } 0218 0219 override func replayBuffer(observer: AnyObserver<E>) { 0220 for item in _queue { 0221 observer.on(.Next(item)) 0222 } 0223 } 0224 0225 override func _synchronized_dispose() { 0226 super._synchronized_dispose() 0227 _queue = Queue(capacity: 0) 0228 } 0229 } 0230 0231 final class ReplayMany
ReplaySubject.swift:62
            return ReplayMany(bufferSize: bufferSize)
<Element
ReplaySubject.swift:231
final class ReplayMany<Element> : ReplayManyBase<Element> {
> : ReplayManyBase<Element> { 0232 private let _bufferSize
ReplaySubject.swift:235
        _bufferSize = bufferSize
ReplaySubject.swift:241
        while _queue.count > _bufferSize {
: Int 0233 0234 init
ReplaySubject.swift:62
            return ReplayMany(bufferSize: bufferSize)
(bufferSize: Int) { 0235 _bufferSize = bufferSize 0236 0237 super.init(queueSize: bufferSize) 0238 } 0239 0240 override func trim() { 0241 while _queue.count > _bufferSize { 0242 _queue.dequeue() 0243 } 0244 } 0245 } 0246 0247 final class ReplayAll
ReplaySubject.swift:72
		return ReplayAll()
<Element
ReplaySubject.swift:247
final class ReplayAll<Element> : ReplayManyBase<Element> {
> : ReplayManyBase<Element> { 0248 init
ReplaySubject.swift:72
		return ReplayAll()
() { 0249 super.init(queueSize: 0) 0250 } 0251 0252 override func trim() { 0253 0254 } 0255 }