//
//  ReplaySubject.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 4/14/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//

import Foundation

/**
Represents an object that is both an observable sequence as well as an observer.

Each notification is broadcast to all subscribed and future observers, subject to buffer trimming policies.
*/
public class ReplaySubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , Disposable {
    public typealias SubjectObserverType = ReplaySubject<Element>

    typealias DisposeKey = Bag<AnyObserver<Element>>.KeyType

    /**
    Abstract hook; the base implementation only calls `abstractMethod()`.

    - parameter key: Key identifying the subscription to remove.
    */
    func unsubscribe(key: DisposeKey) {
        abstractMethod()
    }

    /**
    Notifies all subscribed observers about next event.

    The base implementation only calls `abstractMethod()`; `ReplayBufferBase` overrides it.

    - parameter event: Event to send to the observers.
    */
    public func on(event: Event<E>) {
        abstractMethod()
    }

    /**
    Returns observer interface for subject.

    - returns: The subject itself.
    */
    public func asObserver() -> SubjectObserverType {
        return self
    }

    /**
    Unsubscribe all observers and release resources.

    The base implementation is empty; `ReplayBufferBase` overrides it to do the actual work.
    */
    public func dispose() {
    }

    /**
    Creates new instance of `ReplaySubject` that replays at most `bufferSize` last elements of sequence.

    A `bufferSize` of exactly 1 yields a `ReplayOne`; any other value yields a `ReplayMany`.

    - parameter bufferSize: Maximal number of elements to replay to observer after subscription.
    - returns: New instance of replay subject.
    */
    public static func create(bufferSize bufferSize: Int) -> ReplaySubject<Element> {
        if bufferSize == 1 {
            return ReplayOne()
        }
        else {
            return ReplayMany(bufferSize: bufferSize)
        }
    }

    /**
    Creates a new instance of `ReplaySubject` that buffers all the elements of a sequence.
    To avoid filling up memory, developer needs to make sure that the use case will only ever store a 'reasonable'
    number of elements.

    - returns: New instance of replay subject backed by `ReplayAll`.
    */
    public static func createUnbounded() -> ReplaySubject<Element> {
        return ReplayAll()
    }
}

/**
Shared implementation for the concrete replay subjects in this file.

Owns the lock, the disposed / stopped state and the bag of observers. Subclasses supply the buffer
by overriding `trim`, `addValueToBuffer` and `replayBuffer`.
*/
class ReplayBufferBase<Element>
    : ReplaySubject<Element>
    , SynchronizedUnsubscribeType {

    // Taken by `on`, `subscribe`, `synchronizedUnsubscribe` and `synchronizedDispose` before touching state.
    // NOTE(review): presumably recursive so that observer callbacks invoked while the lock is held can
    // call back into the subject on the same thread — confirm.
    private var _lock = NSRecursiveLock()

    // state
    private var _disposed = false
    private var _stoppedEvent = nil as Event<Element>?
    private var _observers = Bag<AnyObserver<Element>>()

    /**
    Abstract hook called after every event is recorded; subclasses drop elements exceeding their policy.
    */
    func trim() {
        abstractMethod()
    }

    /**
    Abstract hook that stores `value` in the subclass buffer.

    - parameter value: Element carried by a `.Next` event.
    */
    func addValueToBuffer(value: Element) {
        abstractMethod()
    }

    /**
    Abstract hook that sends the buffered elements to `observer`.

    - parameter observer: Observer that is being subscribed.
    */
    func replayBuffer(observer: AnyObserver<Element>) {
        abstractMethod()
    }

    /**
    Takes the lock and forwards the event to `_synchronized_on`.

    - parameter event: Event to send to the observers.
    */
    override func on(event: Event<Element>) {
        _lock.lock(); defer { _lock.unlock() }
        _synchronized_on(event)
    }

    /**
    Records and dispatches `event`. Called with the lock held by `on`.

    Events are ignored once the subject is disposed or after a terminal event has been stored.

    - parameter event: Event to record and forward.
    */
    func _synchronized_on(event: Event<E>) {
        if _disposed {
            return
        }

        if _stoppedEvent != nil {
            return
        }

        switch event {
        case .Next(let value):
            // Buffer first, then trim, then notify.
            addValueToBuffer(value)
            trim()
            _observers.on(event)
        case .Error, .Completed:
            // Remember the terminal event so later subscribers receive it after the replayed elements.
            _stoppedEvent = event
            trim()
            _observers.on(event)
            _observers.removeAll()
        }
    }

    /**
    Takes the lock and forwards to `_synchronized_subscribe`.

    - parameter observer: Observer to subscribe.
    - returns: Disposable produced by `_synchronized_subscribe`.
    */
    override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
        _lock.lock(); defer { _lock.unlock() }
        return _synchronized_subscribe(observer)
    }

    /**
    Replays the buffer to `observer` and registers it for future events. Called with the lock held by `subscribe`.

    - parameter observer: Observer to subscribe.
    - returns: `NopDisposable.instance` when the subject is disposed or already stopped,
      otherwise a `SubscriptionDisposable` bound to the observer's key in the bag.
    */
    func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
        if _disposed {
            observer.on(.Error(RxError.Disposed(object: self)))
            return NopDisposable.instance
        }

        // Lower-case name: the previous spelling `AnyObserver` shadowed the `AnyObserver` type.
        let anyObserver = observer.asObserver()

        replayBuffer(anyObserver)
        if let stoppedEvent = _stoppedEvent {
            // Sequence already terminated: deliver the terminal event and do not register the observer.
            observer.on(stoppedEvent)
            return NopDisposable.instance
        }
        else {
            let key = _observers.insert(anyObserver)
            return SubscriptionDisposable(owner: self, key: key)
        }
    }

    /**
    Takes the lock and forwards to `_synchronized_unsubscribe`.

    - parameter disposeKey: Key returned by the observer bag when the observer was inserted.
    */
    func synchronizedUnsubscribe(disposeKey: DisposeKey) {
        _lock.lock(); defer { _lock.unlock() }
        _synchronized_unsubscribe(disposeKey)
    }

    /**
    Removes the observer stored under `disposeKey`. Does nothing once the subject is disposed.

    - parameter disposeKey: Key returned by the observer bag when the observer was inserted.
    */
    func _synchronized_unsubscribe(disposeKey: DisposeKey) {
        if _disposed {
            return
        }

        _ = _observers.removeKey(disposeKey)
    }

    /**
    Calls the superclass `dispose()` and then clears this subject's state under the lock.
    */
    override func dispose() {
        super.dispose()

        synchronizedDispose()
    }

    /**
    Takes the lock and forwards to `_synchronized_dispose`.
    */
    func synchronizedDispose() {
        _lock.lock(); defer { _lock.unlock() }
        _synchronized_dispose()
    }

    /**
    Marks the subject as disposed, forgets the stored terminal event and removes all observers.
    Subclasses override this to also release their buffer.
    */
    func _synchronized_dispose() {
        _disposed = true
        _stoppedEvent = nil
        _observers.removeAll()
    }
}

/**
Replay subject that keeps only the most recent element.
*/
final class ReplayOne<Element> : ReplayBufferBase<Element> {
    private var _value: Element?

    override init() {
        super.init()
    }

    /**
    Nothing to trim: the single slot is overwritten by `addValueToBuffer`.
    */
    override func trim() {

    }

    /**
    Replaces the stored element with `value`.

    - parameter value: Latest element.
    */
    override func addValueToBuffer(value: Element) {
        _value = value
    }

    /**
    Sends the stored element to `observer`, if there is one.

    - parameter observer: Observer that is being subscribed.
    */
    override func replayBuffer(observer: AnyObserver<Element>) {
        if let value = _value {
            observer.on(.Next(value))
        }
    }

    /**
    Clears the base state and drops the stored element.
    */
    override func _synchronized_dispose() {
        super._synchronized_dispose()
        _value = nil
    }
}

/**
Queue-backed buffer shared by `ReplayMany` and `ReplayAll`. Subclasses provide the `trim` policy.
*/
class ReplayManyBase<Element> : ReplayBufferBase<Element> {
    private var _queue: Queue<Element>

    /**
    - parameter queueSize: Number of elements the subclass intends to retain.
      The queue is created with capacity `queueSize + 1`.
      NOTE(review): presumably the extra slot exists because an element is enqueued before `trim()` runs — confirm.
    */
    init(queueSize: Int) {
        _queue = Queue(capacity: queueSize + 1)
    }

    /**
    Appends `value` to the queue.

    - parameter value: Element to buffer.
    */
    override func addValueToBuffer(value: Element) {
        _queue.enqueue(value)
    }

    /**
    Sends every queued element to `observer` as a `.Next` event, in the queue's iteration order.

    - parameter observer: Observer that is being subscribed.
    */
    override func replayBuffer(observer: AnyObserver<E>) {
        for item in _queue {
            observer.on(.Next(item))
        }
    }

    /**
    Clears the base state and replaces the queue with a new one created with capacity 0.
    */
    override func _synchronized_dispose() {
        super._synchronized_dispose()
        _queue = Queue(capacity: 0)
    }
}

/**
Replay subject that retains at most `bufferSize` elements.
*/
final class ReplayMany<Element> : ReplayManyBase<Element> {
    private let _bufferSize: Int

    /**
    - parameter bufferSize: Maximal number of elements kept in the queue after trimming.
    */
    init(bufferSize: Int) {
        _bufferSize = bufferSize

        super.init(queueSize: bufferSize)
    }

    /**
    Dequeues elements until the queue holds no more than `_bufferSize` of them.
    */
    override func trim() {
        while _queue.count > _bufferSize {
            _queue.dequeue()
        }
    }
}

/**
Replay subject that never trims its buffer.
*/
final class ReplayAll<Element> : ReplayManyBase<Element> {
    init() {
        super.init(queueSize: 0)
    }

    /**
    Intentionally empty: every element is kept.
    */
    override func trim() {

    }
}