RxJava2.x zip 源码解析
2023-11-19 15:38:20
距离前两篇文章已经过去三个月之久了,终于补上第三篇了。第三篇预期就是针对某一个操作符的源码进行解析,选择了 Observable.zip 的原因一是司里这块用的比较多,再一个笔者觉得这个操作符十分强大,想去探索一番 zip 操作符是如何实现这样的骚操作,如果读者还不了解 zip 操作符的话,可以参考 RxJava2.x Observable.zip() 方法详细解析 一文。本文将从 zip 的基本原理、源码解析和一些实际应用场景几个方面展开,我们开始吧!
zip 操作符介绍
zip 操作符是 RxJava2.x 中的一个非常强大的操作符,它可以将多个 Observable 合并成一个 Observable,并按照一定的规则将这些 Observable 中的数据组合起来。zip 操作符有两种重载形式:
- zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper)
- zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
对于第一个重载形式,它将两个 Observable 合并成一个 Observable,并使用提供的 BiFunction 函数将这两个 Observable 中的数据组合起来。对于第二个重载形式,它将多个 Observable 合并成一个 Observable,并使用提供的 Function 函数将这些 Observable 中的数据组合起来。
zip 操作符源码解析
zip 操作符的源码位于 io.reactivex.Observable 类中,我们先来看一下第一个重载形式的源码:
public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper) {
return zip(Arrays.asList(source1, source2), Functions.toFunction(zipper));
}
从源码中可以看出,zip 操作符的第一个重载形式实际上是调用了第二个重载形式,并且将两个 Observable 和 BiFunction 函数包装成了一个 Iterable 和 Function。
我们再来看一下第二个重载形式的源码:
public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper) {
return Observable.create(new ZipObservable(sources, zipper));
}
从源码中可以看出,zip 操作符的第二个重载形式创建了一个新的 Observable,并将其包装成了一个 ZipObservable 对象。ZipObservable 对象实现了 ObservableSource 接口,因此它可以被订阅。
ZipObservable 对象的 subscribe 方法如下:
@Override
public void subscribe(Observer<? super R> observer) {
ZipCoordinator<T, R> zc = new ZipCoordinator<>(observer, zipper, sources.size());
for (ObservableSource<? extends T> o : sources) {
o.subscribe(zc.next());
}
}
从源码中可以看出,ZipObservable 对象的 subscribe 方法创建了一个 ZipCoordinator 对象,并将其包装成了一个 Observer 对象。ZipCoordinator 对象实现了 Observer 接口,因此它可以被 Observable 订阅。
ZipCoordinator 对象的 next 方法如下:
public void next(T value) {
synchronized (gate) {
values[index] = value;
ready++;
if (ready == sources.size()) {
@SuppressWarnings("unchecked")
R r = (R) zipper.apply(values);
downstream.onNext(r);
index = 0;
ready = 0;
Arrays.fill(values, null);
}
}
}
从源码中可以看出,ZipCoordinator 对象的 next 方法将接收到的数据存储在一个数组中,并记录已经接收到的数据数量。当已经接收到的数据数量等于 Observable 的数量时,它将调用 zipper 函数将这些数据组合起来,并将其发送给 downstream Observer 对象。
zip 操作符的实际应用场景
zip 操作符的实际应用场景非常广泛,这里列举几个常见的场景:
- 将多个 Observable 合并成一个 Observable,并按照一定的规则将这些 Observable 中的数据组合起来。
- 将一个 Observable 与一个静态数据源合并成一个 Observable,并按照一定的规则将这些数据组合起来。
- 将一个 Observable 与一个事件源合并成一个 Observable,并按照一定的规则将这些数据组合起来。
总结
zip 操作符是 RxJava2.x 中的一个非常强大的操作符,它可以将多个 Observable 合并成一个 Observable,并按照一定的规则将这些 Observable 中的数据组合起来。zip 操作符的实际应用场景非常广泛,在实际开发中经常会用到。希望这篇文章能够帮助读者更好地理解和使用 zip 操作符。