//
//  Observable+Binding.swift
//  Rx
//
//  Created by Krunoslav Zaher on 3/1/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//

import Foundation

// MARK: multicast

extension ObservableType {

    /**
    Multicasts the source sequence notifications through the specified subject to the resulting connectable observable.

    Upon connection of the connectable observable, the subject is subscribed to the source exactly once, and messages are forwarded to the observers registered with the connectable observable.

    For specializations with fixed subject types, see `publish` and `replay`.

    - seealso: [multicast operator on reactivex.io](http://reactivex.io/documentation/operators/publish.html)

    - parameter subject: Subject to push source elements into.
    - returns: A connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func multicast<S: SubjectType where S.SubjectObserverType.E == E>(subject: S)
        -> ConnectableObservable<S.E> {
        return ConnectableObservableAdapter(source: self.asObservable(), subject: subject)
    }

    /**
    Multicasts the source sequence notifications through an instantiated subject into all uses of the sequence within a selector function.

    Each subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the selector function's invocation.

    For specializations with fixed subject types, see `publish` and `replay`.

    - seealso: [multicast operator on reactivex.io](http://reactivex.io/documentation/operators/publish.html)

    - parameter subjectSelector: Factory function to create an intermediate subject through which the source sequence's elements will be multicast to the selector function.
    - parameter selector: Selector function which can use the multicasted source sequence subject to the policies enforced by the created subject.
    - returns: An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func multicast<S: SubjectType, R where S.SubjectObserverType.E == E>(subjectSelector: () throws -> S, selector: (Observable<S.E>) throws -> Observable<R>)
        -> Observable<R> {
        return Multicast(
            source: self.asObservable(),
            subjectSelector: subjectSelector,
            selector: selector
        )
    }
}

// MARK: publish

extension ObservableType {

    /**
    Returns a connectable observable sequence that shares a single subscription to the underlying sequence.

    This operator is a specialization of `multicast` using a `PublishSubject`.

    - seealso: [publish operator on reactivex.io](http://reactivex.io/documentation/operators/publish.html)

    - returns: A connectable observable sequence that shares a single subscription to the underlying sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func publish() -> ConnectableObservable<E> {
        return self.multicast(PublishSubject())
    }
}

// MARK: replay

extension ObservableType {

    /**
    Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying bufferSize elements.

    This operator is a specialization of `multicast` using a `ReplaySubject`.

    - seealso: [replay operator on reactivex.io](http://reactivex.io/documentation/operators/replay.html)

    - parameter bufferSize: Maximum element count of the replay buffer.
    - returns: A connectable observable sequence that shares a single subscription to the underlying sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func replay(bufferSize: Int)
        -> ConnectableObservable<E> {
        return self.multicast(ReplaySubject.create(bufferSize: bufferSize))
    }

    /**
    Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all elements.

    This operator is a specialization of `multicast` using a `ReplaySubject`.

    - seealso: [replay operator on reactivex.io](http://reactivex.io/documentation/operators/replay.html)

    - returns: A connectable observable sequence that shares a single subscription to the underlying sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func replayAll()
        -> ConnectableObservable<E> {
        return self.multicast(ReplaySubject.createUnbounded())
    }
}

// MARK: refcount

extension ConnectableObservableType {

    /**
    Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.

    - seealso: [refCount operator on reactivex.io](http://reactivex.io/documentation/operators/refCount.html)

    - returns: An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func refCount() -> Observable<E> {
        return RefCount(source: self)
    }
}

// MARK: share

extension ObservableType {

    /**
    Returns an observable sequence that shares a single subscription to the underlying sequence.

    This operator is a specialization of publish which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.

    - seealso: [share operator on reactivex.io](http://reactivex.io/documentation/operators/refcount.html)

    - returns: An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func share() -> Observable<E> {
        return self.publish().refCount()
    }
}

// MARK: shareReplay

extension ObservableType {

    /**
    Returns an observable sequence that shares a single subscription to the underlying sequence, and immediately upon subscription replays up to `bufferSize` buffered elements.

    This operator is a specialization of replay which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.

    - seealso: [shareReplay operator on reactivex.io](http://reactivex.io/documentation/operators/replay.html)

    - parameter bufferSize: Maximum element count of the replay buffer.
    - returns: An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func shareReplay(bufferSize: Int)
        -> Observable<E> {
        // A single-element buffer is routed to the dedicated `ShareReplay1`
        // implementation; every other size goes through `replay` + `refCount`.
        if bufferSize == 1 {
            return ShareReplay1(source: self.asObservable())
        }
        else {
            return self.replay(bufferSize).refCount()
        }
    }

    /**
    Returns an observable sequence that shares a single subscription to the underlying sequence, and immediately upon subscription replays the latest element in the buffer.

    This operator is a specialization of replay which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.

    Unlike `shareReplay(bufferSize: Int)`, this operator will clear the latest element from the replay buffer when the number of subscribers drops from one to zero. If the sequence
    completes or errors out, the replay buffer is also cleared.

    - seealso: [shareReplay operator on reactivex.io](http://reactivex.io/documentation/operators/replay.html)

    - returns: An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func shareReplayLatestWhileConnected()
        -> Observable<E> {
        return ShareReplay1WhileConnected(source: self.asObservable())
    }
}