返回

揭秘 RxJava observeOn 背后的秘密:将观察者引向正确的舞台

Android

从RxJava的灵魂深处透视observeOn

RxJava 是一个功能强大的反应式编程框架,它允许您以声明式的方式编写异步和基于事件的程序。在RxJava中,观察者是一个接口,它定义了如何处理被观察者发出的事件。调度程序是一个类,它负责将任务调度到不同的线程上执行。

observeOn 操作符允许您指定观察者将在哪个调度程序上观察被观察者。这意味着您可以控制事件处理的线程,从而可以优化应用程序的性能和可伸缩性。

observeOn的幕后机制:深入源代码

为了更深入地理解 observeOn 的工作原理,让我们来看看它的源代码。observeOn 操作符位于 rxjava 包中,它是一个静态方法,接受两个参数:被观察者和调度程序。

public static <T> Observable<T> observeOn(Observable<T> source, Scheduler scheduler) {
    return source.lift(new ObserveOnOperator<T>(scheduler));
}

ObserveOnOperator 是一个实现了 LiftOperator 接口的类。LiftOperator 接口定义了一个 lift 方法,该方法将一个操作符应用于被观察者,并返回一个新的被观察者。

public final class ObserveOnOperator<T> implements LiftOperator<T, T> {

    private final Scheduler scheduler;

    public ObserveOnOperator(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public ObservableOperator<T, T> call(Observable<T> source) {
        return new ObserveOnObservableOperator<T>(source, scheduler);
    }
}

ObserveOnObservableOperator 是一个实现了 ObservableOperator 接口的类。ObservableOperator 接口定义了 apply 方法,该方法将操作符应用于被观察者,并返回一个新的被观察者。

public final class ObserveOnObservableOperator<T> implements ObservableOperator<T, T> {

    private final Observable<T> source;

    private final Scheduler scheduler;

    public ObserveOnObservableOperator(Observable<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> subscriber) {
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(subscriber, scheduler);
        source.subscribe(parent);
        return parent;
    }
}

ObserveOnSubscriber 是一个实现了 Subscriber 接口的类。Subscriber 接口定义了 onNext、onError 和 onComplete 方法,用于处理被观察者发出的事件。

public final class ObserveOnSubscriber<T> extends Subscriber<T> {

    private final Subscriber<? super T> actual;

    private final Scheduler scheduler;

    private Queue<Object> queue;

    private boolean done;

    public ObserveOnSubscriber(Subscriber<? super T> actual, Scheduler scheduler) {
        this.actual = actual;
        this.scheduler = scheduler;
        this.queue = new ConcurrentLinkedQueue<>();
        this.done = false;
    }

    @Override
    public void onNext(T t) {
        queue.offer(NotificationLite.next(t));
        drain();
    }

    @Override
    public void onError(Throwable t) {
        done = true;
        queue.offer(NotificationLite.error(t));
        drain();
    }

    @Override
    public void onComplete() {
        done = true;
        queue.offer(NotificationLite.completed());
        drain();
    }

    private void drain() {
        scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                drainLoop();
            }
        });
    }

    private void drainLoop() {
        for (;;) {
            Object obj = queue.poll();
            if (obj == null) {
                if (done) {
                    actual.onComplete();
                }
                return;
            }

            @SuppressWarnings("unchecked")
            NotificationLite<T> nl = (NotificationLite<T>) obj;
            nl.accept(actual);
        }
    }
}

从源代码中可以看出,observeOn 操作符的工作原理是:

  1. 首先,它创建一个新的被观察者,这个新的被观察者将继承原被观察者的所有事件。
  2. 然后,它创建一个新的观察者,这个新的观察者将继承原观察者的所有方法。
  3. 最后,它将新的观察者订阅到新的被观察者上。

这样,当原被观察者发出事件时,这些事件将被发送到新的被观察者上,然后由新的观察者来处理这些事件。

observeOn的妙用:优化应用程序的性能和可伸缩性

observeOn 操作符可以用来优化应用程序的性能和可伸缩性。例如,您可以将耗时的操作调度到单独的线程上执行,这样就不会阻塞主线程。您还可以将事件处理调度到不同的线程上执行,这样可以提高应用程序的并发性。

举一反三:observeOn的实例

Observable.just(1, 2, 3)
        .observeOn(Schedulers.io())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {

            }
        });

在这个例子中,我们使用 observeOn 操作符将事件处理调度到 io 调度程序上。这样,耗时的操作就不会阻塞主线程。

结语

observeOn 操作符是一个非常强大的工具,它可以用来优化应用程序的性能和可伸缩性。如果您正在使用 RxJava,那么您应该熟悉 observeOn 操作符的用法。