0001    //
0002    //  Window.swift
0003    //  Rx
0004    //
0005    //  Created by Junior B. on 29/10/15.
0006    //  Copyright © 2015 Krunoslav Zaher. All rights reserved.
0007    //
0008    
0009    import Foundation
0010    
/// Sink for `WindowTimeCount`: forwards the source elements to the observer
/// as a sequence of windows (each window is itself an `Observable<Element>`).
///
/// The current window is completed and replaced by a fresh one when either
/// `_parent._count` elements have been pushed into it, or the timer scheduled
/// for it (due after `_parent._timeSpan`) fires. `_windowId` identifies the
/// current window so that a timer scheduled for an earlier window can detect
/// that it is stale and do nothing.
class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Element>>
    : Sink<O>
    , ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias Parent = WindowTimeCount<Element>
    typealias E = Element

    private let _parent: Parent

    // Guards `_count`, `_windowId` and window replacement inside the timer callback.
    // NOTE(review): `synchronizedOn` presumably takes this same lock — confirm in SynchronizedOnType.
    let _lock = NSRecursiveLock()

    // Subject backing the window that is currently open.
    private var _subject = PublishSubject<Element>()
    // Number of elements pushed into the current window so far.
    private var _count = 0
    // Identifier of the current window; bumped every time a new window is opened.
    private var _windowId = 0

    // Holds the timer of the current window; assigning a new timer replaces the previous one.
    private let _timerD = SerialDisposable()
    private let _refCountDisposable: RefCountDisposable
    // Aggregates the timer disposable and the source subscription.
    private let _groupDisposable = CompositeDisposable()

    /// - parameter parent: Producer holding the source, time span, count and scheduler.
    /// - parameter observer: Observer that receives the windows.
    init(parent: Parent, observer: O) {
        _parent = parent

        _groupDisposable.addDisposable(_timerD)

        _refCountDisposable = RefCountDisposable(disposable: _groupDisposable)
        super.init(observer: observer)
    }

    /// Emits the first window, starts its timer and subscribes to the source.
    ///
    /// - returns: The ref-counted disposable wrapping the timer and the source subscription.
    func run() -> Disposable {
        // The first window is emitted before subscribing to the source.
        forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
        createTimer(_windowId)

        _groupDisposable.addDisposable(_parent._source.subscribeSafe(self))
        return _refCountDisposable
    }

    /// Completes the current window, replaces it with a new subject and
    /// emits the new window to the observer.
    func startNewWindowAndCompleteCurrentOne() {
        _subject.on(.Completed)
        _subject = PublishSubject<Element>()

        forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
    }

    /// Receives a source event and delegates to `synchronizedOn`.
    // NOTE(review): `synchronizedOn` is declared elsewhere; it is expected to end up
    // calling `_synchronized_on` — confirm.
    func on(event: Event<E>) {
        synchronizedOn(event)
    }

    /// Handles a single source event.
    ///
    /// - `.Next`: pushes the element into the current window and, once the window
    ///   holds `_parent._count` elements, rolls over to a new window.
    /// - `.Error` / `.Completed`: terminates the current window and the outer
    ///   sequence, then disposes the sink.
    func _synchronized_on(event: Event<E>) {
        var newWindow = false
        var newId = 0

        switch event {
        case .Next(let element):
            _subject.on(.Next(element))

            do {
                try incrementChecked(&_count)
            } catch let e {
                // Terminate the outer sequence as well as the current window, and stop
                // processing this event: previously the observer was never told about
                // the failure and execution fell through to the count check below.
                _subject.on(.Error(e))
                forwardOn(.Error(e))
                dispose()
                return
            }

            if _count == _parent._count {
                newWindow = true
                _count = 0
                // Wrapping add, consistent with the timer path in `createTimer`;
                // the id is only ever compared for equality.
                _windowId = _windowId &+ 1
                newId = _windowId
                startNewWindowAndCompleteCurrentOne()
            }

        case .Error(let error):
            _subject.on(.Error(error))
            forwardOn(.Error(error))
            dispose()
        case .Completed:
            _subject.on(.Completed)
            forwardOn(.Completed)
            dispose()
        }

        // The timer for the new window is created after the switch has finished.
        if newWindow {
            createTimer(newId)
        }
    }

    /// Schedules the timer that closes the window identified by `windowId`
    /// after `_parent._timeSpan`.
    ///
    /// Does nothing if the timer slot has already been disposed or if
    /// `windowId` is no longer the current window.
    ///
    /// - parameter windowId: Identifier of the window the timer belongs to.
    func createTimer(windowId: Int) {
        if _timerD.disposed {
            return
        }

        if _windowId != windowId {
            return
        }

        let nextTimer = SingleAssignmentDisposable()

        // Install the placeholder first, then fill it with the scheduled action.
        _timerD.disposable = nextTimer

        nextTimer.disposable = _parent._scheduler.scheduleRelative(windowId, dueTime: _parent._timeSpan) { previousWindowId in

            var newId = 0

            self._lock.performLocked {
                // The window was already rolled over by the element count: stale timer.
                if previousWindowId != self._windowId {
                    return
                }

                self._count = 0
                self._windowId = self._windowId &+ 1
                newId = self._windowId
                self.startNewWindowAndCompleteCurrentOne()
            }

            // When the timer was stale `newId` is still 0; `createTimer` returns early
            // for any id that does not match the current `_windowId`.
            self.createTimer(newId)

            return NopDisposable.instance
        }
    }
}

/// Producer that splits `source` into windows closed either after `count`
/// elements or after `timeSpan` has elapsed on `scheduler`; the windowing
/// itself is implemented by `WindowTimeCountSink`.
class WindowTimeCount<Element> : Producer<Observable<Element>> {

    private let _timeSpan: RxTimeInterval
    private let _count: Int
    private let _scheduler: SchedulerType
    private let _source: Observable<Element>

    /// - parameter source: Sequence whose elements are distributed into windows.
    /// - parameter timeSpan: Due time of the timer scheduled for each window.
    /// - parameter count: Number of elements after which a window is closed.
    /// - parameter scheduler: Scheduler used to run the window timers.
    init(source: Observable<Element>, timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) {
        _source = source
        _timeSpan = timeSpan
        _count = count
        _scheduler = scheduler
    }

    /// Creates a sink for `observer`, runs it and stores the resulting
    /// disposable on the sink.
    ///
    /// - returns: The sink, which serves as the subscription's disposable.
    override func run<O : ObserverType where O.E == Observable<Element>>(observer: O) -> Disposable {
        let sink = WindowTimeCountSink(parent: self, observer: observer)
        sink.disposable = sink.run()
        return sink
    }
}