//
//  Observable+Extensions.swift
//  Rx
//
//  Created by Krunoslav Zaher on 2/21/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//

import Foundation

extension ObservableType {
    /**
    Attaches a single handler that receives every event produced by the observable sequence.

    - parameter on: Closure called with each event emitted by the sequence.
    - returns: Disposable that can be used to cancel the subscription.
    */
    @warn_unused_result(message="http://git.io/rxs.ud")
    public func subscribe(on: (event: Event<E>) -> Void)
        -> Disposable {
        return subscribeSafe(AnonymousObserver<E> { event in
            on(event: event)
        })
    }

    /**
    Attaches optional per-event handlers (element, error, completion) together with an optional disposed handler.

    - parameter onNext: Closure called with each element of the sequence.
    - parameter onError: Closure called when the sequence terminates with an error.
    - parameter onCompleted: Closure called when the sequence completes gracefully.
    - parameter onDisposed: Closure called on any kind of termination of the sequence (graceful completion,
        error, or cancellation by disposing the subscription).
    - returns: Disposable that can be used to cancel the subscription.
    */
    @warn_unused_result(message="http://git.io/rxs.ud")
    public func subscribe(onNext onNext: (E -> Void)? = nil, onError: (ErrorType -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {

        // Wrap the optional disposed handler; fall back to the shared no-op disposable.
        let disposedAction: Disposable
        if let disposed = onDisposed {
            disposedAction = AnonymousDisposable(disposed)
        }
        else {
            disposedAction = NopDisposable.instance
        }

        let observer = AnonymousObserver<E> { event in
            switch event {
            case .Next(let element):
                onNext?(element)
                // Elements do not terminate the sequence, so the disposed action must not run.
                return
            case .Error(let error):
                onError?(error)
            case .Completed:
                onCompleted?()
            }

            // Reached only for terminal events, after the matching handler has been invoked.
            disposedAction.dispose()
        }

        // Subscribe first, then pair the subscription with the disposed action.
        let subscription = subscribeSafe(observer)
        return BinaryDisposable(subscription, disposedAction)
    }

    /**
    Attaches a handler that is invoked only for elements of the observable sequence.

    - parameter onNext: Closure called with each element of the sequence.
    - returns: Disposable that can be used to cancel the subscription.
    */
    @warn_unused_result(message="http://git.io/rxs.ud")
    public func subscribeNext(onNext: (E) -> Void)
        -> Disposable {
        let elementObserver = AnonymousObserver<E> { event in
            // Error and completion events are ignored.
            guard case .Next(let element) = event else {
                return
            }
            onNext(element)
        }
        return subscribeSafe(elementObserver)
    }

    /**
    Attaches a handler that is invoked only when the observable sequence terminates with an error.

    - parameter onError: Closure called with the error that terminated the sequence.
    - returns: Disposable that can be used to cancel the subscription.
    */
    @warn_unused_result(message="http://git.io/rxs.ud")
    public func subscribeError(onError: (ErrorType) -> Void)
        -> Disposable {
        let errorObserver = AnonymousObserver<E> { event in
            // Elements and graceful completion are ignored.
            guard case .Error(let failure) = event else {
                return
            }
            onError(failure)
        }
        return subscribeSafe(errorObserver)
    }

    /**
    Attaches a handler that is invoked only when the observable sequence completes gracefully.

    - parameter onCompleted: Closure called when the sequence completes.
    - returns: Disposable that can be used to cancel the subscription.
    */
    @warn_unused_result(message="http://git.io/rxs.ud")
    public func subscribeCompleted(onCompleted: () -> Void)
        -> Disposable {
        let completionObserver = AnonymousObserver<E> { event in
            // Elements and errors are ignored.
            guard case .Completed = event else {
                return
            }
            onCompleted()
        }
        return subscribeSafe(completionObserver)
    }
}

public extension ObservableType {
    /**
    Single entry point that all internal subscribe calls are routed through.

    Converts the receiver with `asObservable()` and subscribes the given observer to the result.

    - parameter observer: Observer to subscribe to the sequence.
    - returns: Disposable that can be used to cancel the subscription.
    */
    @warn_unused_result(message="http://git.io/rxs.ud")
    func subscribeSafe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
        let source = asObservable()
        return source.subscribe(observer)
    }
}
// AddRef.swift:43 sink.disposable = StableCompositeDisposable.create(releaseDisposable, _source.subscribeSafe(sink))
// ElementAt.swift:76 sink.disposable = _source.subscribeSafe(sink)
// Observable+Extensions.swift:24 return self.subscribeSafe(observer)
// Observable+Extensions.swift:63 self.subscribeSafe(observer),
// Observable+Extensions.swift:82 return self.subscribeSafe(observer)
// Observable+Extensions.swift:99 return self.subscribeSafe(observer)
// Observable+Extensions.swift:116 return self.subscribeSafe(observer)
// RefCount.swift:25 let subscription = _parent._source.subscribeSafe(self)
// Timeout.swift:36 original.disposable = _parent._source.subscribeSafe(self)
// Timeout.swift:92 self._subscription.disposable = self._parent._other.subscribeSafe(self.forwarder())
// Window.swift:45 _groupDisposable.addDisposable(_parent._source.subscribeSafe(self))