返回

RxJava的subscribeOn与observeOn背后的实现原理解析

Android

引言
在前面的文章中,我们对RxJava的定义、订阅、事件产生和消费的过程进行了梳理,并对源码进行了解析。而在本文中,我们将重点关注RxJava的subscribeOn和observeOn这两个关键操作符的源码实现,以便更深入地理解RxJava的线程调度机制。

RxJava中的线程调度机制

RxJava中的线程调度机制是一个非常重要的概念,它允许我们控制事件的处理线程。通过使用subscribeOn和observeOn这两个操作符,我们可以指定事件产生和消费的线程。

subscribeOn

subscribeOn操作符可以指定事件产生的线程。当我们使用subscribeOn操作符时,它会创建一个新的Scheduler,并把事件源(Observable)和这个Scheduler关联起来。当事件产生时,Scheduler会把事件分发到指定的线程上进行处理。

observeOn

observeOn操作符可以指定事件消费的线程。当我们使用observeOn操作符时,它也会创建一个新的Scheduler,并把事件消费者(Observer)和这个Scheduler关联起来。当事件被消费时,Scheduler会把事件分发到指定的线程上进行处理。

源码解析

为了更好地理解subscribeOn和observeOn这两个操作符的实现原理,我们来看看它们的源码。

subscribeOn

subscribeOn操作符的源码位于io.reactivex.internal.operators.observable.ObservableSubscribeOn中。

public Observable<T> subscribeOn(Scheduler scheduler) {
    return create(new SubscribeOnObservable(this, scheduler));
}

从源码中可以看出,subscribeOn操作符会创建一个新的Observable,这个Observable的实现类是SubscribeOnObservable。SubscribeOnObservable中会保存源Observable和Scheduler。

public SubscribeOnObservable(Observable<T> source, Scheduler scheduler) {
    this.source = source;
    this.scheduler = scheduler;
}

当SubscribeOnObservable被订阅时,它会创建一个新的Subscriber,这个Subscriber的实现类是SubscribeOnSubscriber。

@Override
public void subscribeActual(Observer<? super T> observer) {
    SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, scheduler);
    source.subscribe(parent);
}

SubscribeOnSubscriber中会保存源Subscriber和Scheduler。

public SubscribeOnObserver(Observer<? super T> actual, Scheduler scheduler) {
    this.actual = actual;
    this.scheduler = scheduler;
}

当SubscribeOnSubscriber收到事件时,它会调用Scheduler的schedule方法,把事件分发到指定的线程上进行处理。

@Override
public void onNext(T t) {
    scheduler.schedule(new OnNextAction<>(actual, t));
}

observeOn

observeOn操作符的源码位于io.reactivex.internal.operators.observable.ObservableObserveOn中。

public Observable<T> observeOn(Scheduler scheduler) {
    return create(new ObserveOnObservable<>(this, scheduler));
}

从源码中可以看出,observeOn操作符会创建一个新的Observable,这个Observable的实现类是ObserveOnObservable。ObserveOnObservable中会保存源Observable和Scheduler。

public ObserveOnObservable(Observable<T> source, Scheduler scheduler) {
    this.source = source;
    this.scheduler = scheduler;
}

当ObserveOnObservable被订阅时,它会创建一个新的Subscriber,这个Subscriber的实现类是ObserveOnSubscriber。

@Override
public void subscribeActual(Observer<? super T> observer) {
    ObserveOnObserver<T> parent = new ObserveOnObserver<>(observer, scheduler);
    source.subscribe(parent);
}

ObserveOnSubscriber中会保存源Subscriber和Scheduler。

public ObserveOnObserver(Observer<? super T> actual, Scheduler scheduler) {
    this.actual = actual;
    this.scheduler = scheduler;
}

当ObserveOnSubscriber收到事件时,它会调用Scheduler的schedule方法,把事件分发到指定的线程上进行处理。

@Override
public void onNext(T t) {
    scheduler.schedule(new OnNextAction<>(actual, t));
}

总结

通过对源码的解析,我们了解到了subscribeOn和observeOn这两个操作符的实现原理。subscribeOn操作符可以指定事件产生的线程,而observeOn操作符可以指定事件消费的线程。通过使用这两个操作符,我们可以更好地控制事件的处理流程。