返回

RxSwift——Observable补充&&Timer

IOS

Observable可观察序列的继承链

Observable可观察序列的继承链大致如下:

  • ObservableBase<T>
  • Observable<T>
  • AnonymousObservable<T>
  • AnonymousObservableSink<T, TError>

AnonymousObservableSink

AnonymousObservableSink是AnonymousObservable的嵌套类型,它是一个观察者,负责将事件传递给订阅者。

class AnonymousObservableSink<T, TError>: ObserverBase<T>, IDisposable {
    override func on(event: Event<T, TError>) {
        switch event {
        case .next(let value):
            self.forwardOn(value)
        case .error(let error):
            self.forwardOnError(error)
        case .completed:
            self.forwardOnCompleted()
        }
    }

    func dispose() {
        self.isDisposed = true
    }

    // ... 其他方法
}

背压

背压是一个流控制机制,它允许订阅者控制它从Observable接收事件的速率。如果订阅者无法处理所有事件,它可以发出一个背压信号,这将导致Observable减慢事件的发射速率。

在RxSwift中,背压可以通过subscribe(onNext:onError:onCompleted:onSubscribe:)方法中的onSubscribe参数来实现。onSubscribe是一个回调函数,它会在订阅者订阅Observable时被调用。订阅者可以通过onSubscribe函数来发送一个背压信号,这将导致Observable减慢事件的发射速率。

let observable = Observable<Int>.interval(RxTimeInterval.milliseconds(10), scheduler: MainScheduler.instance)

observable.subscribe(onNext: { value in
    print(value)
}, onError: { error in
    print(error)
}, onCompleted: {
    print("Completed")
}, onSubscribe: { disposable in
    // 发送背压信号
    disposable.dispose()
})

Schedulers

Schedulers是RxSwift中的一个重要概念,它用于调度事件的执行。在RxSwift中,有两种类型的Schedulers:

  • Main Scheduler :这是应用程序的主线程上的Scheduler。
  • Concurrent Scheduler :这是一个并发线程上的Scheduler。

在RxSwift中,可以使用subscribeOnobserveOn操作符来指定事件的执行Scheduler。subscribeOn操作符指定Observable事件的订阅Scheduler,observeOn操作符指定Observable事件的观察Scheduler。

let observable = Observable<Int>.interval(RxTimeInterval.milliseconds(10), scheduler: MainScheduler.instance)

observable.subscribeOn(ConcurrentScheduler.instance)
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { value in
        print(value)
    }, onError: { error in
        print(error)
    }, onCompleted: {
        print("Completed")
    })

在上面的示例中,Observable事件的订阅Scheduler是ConcurrentScheduler.instance,Observable事件的观察Scheduler是MainScheduler.instance。这意味着Observable事件将在ConcurrentScheduler.instance线程上订阅,并在MainScheduler.instance线程上观察。