//
//  Buffer.swift
//  Rx
//
//  Created by Krunoslav Zaher on 9/13/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//

import Foundation

/// Producer that groups elements of `source` into arrays.
///
/// A buffer is emitted when it holds exactly `count` elements or when a timer
/// of `timeSpan` scheduled on `scheduler` fires for the current window,
/// whichever happens first (see `BufferTimeCountSink`).
class BufferTimeCount<Element> : Producer<[Element]> {

    private let _timeSpan: RxTimeInterval
    private let _count: Int
    private let _scheduler: SchedulerType
    private let _source: Observable<Element>

    /// - parameter source: Sequence whose elements are buffered.
    /// - parameter timeSpan: Due time passed to the scheduler for each window timer.
    /// - parameter count: Buffer size that triggers an emission. The sink compares
    ///   with `==`, so a value that the buffer size never equals (e.g. `0`) leaves
    ///   only the timer to flush buffers.
    /// - parameter scheduler: Scheduler on which window timers are scheduled.
    init(source: Observable<Element>, timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) {
        _source = source
        _timeSpan = timeSpan
        _count = count
        _scheduler = scheduler
    }

    /// Creates a sink for `observer`, starts it, and returns the sink as the
    /// subscription's disposable.
    override func run<O : ObserverType where O.E == [Element]>(observer: O) -> Disposable {
        let sink = BufferTimeCountSink(parent: self, observer: observer)
        sink.disposable = sink.run()
        return sink
    }
}

/// Sink that accumulates source elements and forwards them downstream as arrays.
///
/// Windows are identified by `_windowID`; a timer callback only flushes the
/// buffer when the ID it was scheduled with still matches the current one, so
/// a timer belonging to a window that was already closed by count is ignored.
class BufferTimeCountSink<Element, O: ObserverType where O.E == [Element]>
    : Sink<O>
    , LockOwnerType
    , ObserverType
    , SynchronizedOnType {
    typealias Parent = BufferTimeCount<Element>
    typealias E = Element

    private let _parent: Parent

    // Recursive lock; the timer callback takes it explicitly via `performLocked`.
    let _lock = NSRecursiveLock()

    // state
    private let _timerD = SerialDisposable()
    private var _buffer = [Element]()
    private var _windowID = 0

    init(parent: Parent, observer: O) {
        _parent = parent
        super.init(observer: observer)
    }

    /// Schedules the timer for the initial window, then subscribes to the source.
    ///
    /// - returns: Disposable composed of the timer slot and the source subscription.
    func run() -> Disposable {
        createTimer(_windowID)
        return StableCompositeDisposable.create(_timerD, _parent._source.subscribe(self))
    }

    /// Closes the current window: advances the window ID, forwards the
    /// accumulated buffer downstream, and schedules a timer for the new window.
    func startNewWindowAndSendCurrentOne() {
        // `&+` wraps on overflow instead of trapping.
        _windowID = _windowID &+ 1
        let windowID = _windowID

        // Reset the buffer before forwarding so the emitted array is detached
        // from the sink's state.
        let buffer = _buffer
        _buffer = []
        forwardOn(.Next(buffer))

        // `createTimer` is a no-op if `_windowID` moved on in the meantime.
        createTimer(windowID)
    }

    /// Observer entry point; delegates to `synchronizedOn`.
    // NOTE(review): `synchronizedOn` is declared outside this file; presumably
    // it acquires `_lock` and calls `_synchronized_on` - confirm.
    func on(event: Event<E>) {
        synchronizedOn(event)
    }

    /// Handles a source event.
    ///
    /// - `.Next`: appends the element; flushes when the buffer size equals the
    ///   parent's `count`.
    /// - `.Error`: discards the buffer, forwards the error, disposes the sink.
    /// - `.Completed`: forwards the remaining buffer (even when empty), then
    ///   `.Completed`, and disposes the sink.
    func _synchronized_on(event: Event<E>) {
        switch event {
        case .Next(let element):
            _buffer.append(element)

            if _buffer.count == _parent._count {
                startNewWindowAndSendCurrentOne()
            }

        case .Error(let error):
            _buffer = []
            forwardOn(.Error(error))
            dispose()
        case .Completed:
            forwardOn(.Next(_buffer))
            forwardOn(.Completed)
            dispose()
        }
    }

    /// Schedules the timer that closes window `windowID` after the parent's
    /// `timeSpan`.
    ///
    /// Does nothing when the timer slot is already disposed or when `windowID`
    /// is no longer the current window.
    func createTimer(windowID: Int) {
        guard !_timerD.disposed else {
            return
        }

        guard _windowID == windowID else {
            return
        }

        let nextTimer = SingleAssignmentDisposable()

        // Install the placeholder in the serial slot before scheduling.
        // NOTE(review): presumably replacing the slot's content disposes the
        // previous window's timer - confirm against SerialDisposable.
        _timerD.disposable = nextTimer

        nextTimer.disposable = _parent._scheduler.scheduleRelative(windowID, dueTime: _parent._timeSpan) { previousWindowID in
            self._lock.performLocked {
                // Stale timer: its window has already been closed.
                guard previousWindowID == self._windowID else {
                    return
                }

                self.startNewWindowAndSendCurrentOne()
            }

            return NopDisposable.instance
        }
    }
}