RxSwift——Observable补充&&Timer
2023-12-22 21:12:59
Observable可观察序列的继承链
Observable可观察序列的继承链大致如下:
- ObservableBase<T>
- Observable<T>
- AnonymousObservable<T>
- AnonymousObservableSink<T, TError>
AnonymousObservableSink
AnonymousObservableSink是AnonymousObservable的嵌套类型,它是一个观察者,负责将事件传递给订阅者。
class AnonymousObservableSink<T, TError>: ObserverBase<T>, IDisposable {
override func on(event: Event<T, TError>) {
switch event {
case .next(let value):
self.forwardOn(value)
case .error(let error):
self.forwardOnError(error)
case .completed:
self.forwardOnCompleted()
}
}
func dispose() {
self.isDisposed = true
}
// ... 其他方法
}
背压
背压是一个流控制机制,它允许订阅者控制它从Observable接收事件的速率。如果订阅者无法处理所有事件,它可以发出一个背压信号,这将导致Observable减慢事件的发射速率。
在RxSwift中,背压可以通过subscribe(onNext:onError:onCompleted:onSubscribe:)
方法中的onSubscribe
参数来实现。onSubscribe
是一个回调函数,它会在订阅者订阅Observable时被调用。订阅者可以通过onSubscribe
函数来发送一个背压信号,这将导致Observable减慢事件的发射速率。
let observable = Observable<Int>.interval(RxTimeInterval.milliseconds(10), scheduler: MainScheduler.instance)
observable.subscribe(onNext: { value in
print(value)
}, onError: { error in
print(error)
}, onCompleted: {
print("Completed")
}, onSubscribe: { disposable in
// 发送背压信号
disposable.dispose()
})
Schedulers
Schedulers是RxSwift中的一个重要概念,它用于调度事件的执行。在RxSwift中,有两种类型的Schedulers:
- Main Scheduler :这是应用程序的主线程上的Scheduler。
- Concurrent Scheduler :这是一个并发线程上的Scheduler。
在RxSwift中,可以使用subscribeOn
和observeOn
操作符来指定事件的执行Scheduler。subscribeOn
操作符指定Observable事件的订阅Scheduler,observeOn
操作符指定Observable事件的观察Scheduler。
let observable = Observable<Int>.interval(RxTimeInterval.milliseconds(10), scheduler: MainScheduler.instance)
observable.subscribeOn(ConcurrentScheduler.instance)
.observeOn(MainScheduler.instance)
.subscribe(onNext: { value in
print(value)
}, onError: { error in
print(error)
}, onCompleted: {
print("Completed")
})
在上面的示例中,Observable事件的订阅Scheduler是ConcurrentScheduler.instance,Observable事件的观察Scheduler是MainScheduler.instance。这意味着Observable事件将在ConcurrentScheduler.instance线程上订阅,并在MainScheduler.instance线程上观察。