返回
剖析Project Reactor中concat操作符的实现原理
后端
2024-01-21 07:55:26
Project Reactor是一个用于构建响应式系统的Java库,它提供了丰富的操作符来处理数据流。concat操作符是其中一个常用的操作符,它可以将多个Publisher按顺序拼接起来,并向下游转发这些Publisher发射出来的元素。本篇文章将通过分析concat操作符的源码,深入理解其实现原理。
源码分析
concat操作符的实现位于Reactor Core模块的FluxConcatIterable
类中。该类继承了Flux
类,并实现了concat
方法。concat
方法接收一个Publisher的Iterable作为参数,并返回一个新的Publisher,该Publisher将按顺序依次订阅参数中的Publisher,并向下游转发这些Publisher发射出来的元素。
public final class FluxConcatIterable<T> extends Flux<T> {
final Iterable<? extends Publisher<? extends T>> sources;
public FluxConcatIterable(Iterable<? extends Publisher<? extends T>> sources) {
this.sources = Objects.requireNonNull(sources, "sources");
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
ConcatSubscription<T> parent = new ConcatSubscription<>(actual, sources);
actual.onSubscribe(parent);
parent.subscribe();
}
static final class ConcatSubscription<T>
implements FuseableSubscription<T>, SubscriptionArbiter {
final CoreSubscriber<? super T> actual;
final Iterator<? extends Publisher<? extends T>> sources;
Subscription current;
volatile int wip = 1;
volatile boolean cancelled;
boolean done;
ConcatSubscription(CoreSubscriber<? super T> actual,
Iterable<? extends Publisher<? extends T>> sources) {
this.actual = actual;
this.sources = sources.iterator();
}
void subscribe() {
if (SubscriptionHelper.validate(current, wip, this)) {
current = SubscriptionHelper.cancelled();
subscribeNext();
}
}
void subscribeNext() {
if (!done && !cancelled) {
while (!cancelled) {
if (!sources.hasNext()) {
done = true;
current = SubscriptionHelper.CANCELLED;
actual.onComplete();
return;
}
Publisher<? extends T> p = sources.next();
if (p == null) {
onError(new NullPointerException("The publisher returned is null"));
return;
}
current = p.subscribe(new ConcatInnerSubscriber<T>(this));
return;
}
}
}
// ... (剩下的代码)
}
}
从源码中可以看出,concat操作符的实现主要包含以下几个步骤:
- 创建一个新的Publisher,该Publisher将按顺序依次订阅参数中的Publisher,并向下游转发这些Publisher发射出来的元素。
- 当新的Publisher订阅参数中的第一个Publisher时,会创建一个新的Subscription,该Subscription负责订阅和管理参数中的第一个Publisher。
- 当参数中的第一个Publisher完成或出错时,新的Publisher会取消订阅该Subscription,并创建一个新的Subscription,该Subscription负责订阅和管理参数中的第二个Publisher。
- 依此类推,直到参数中的所有Publisher都完成或出错,新的Publisher才会完成或出错。
使用示例
concat操作符的使用非常简单,只需要将多个Publisher作为参数传入即可。例如,以下代码将两个Flux实例按顺序拼接起来,并向下游转发这些Flux实例发射出来的元素:
Flux<String> flux1 = Flux.just("Hello");
Flux<String> flux2 = Flux.just("World");
Flux.concat(flux1, flux2)
.subscribe(System.out::println);
输出结果为:
Hello
World
总结
concat操作符是一个非常实用的操作符,它可以将多个Publisher按顺序拼接起来,并向下游转发这些Publisher发射出来的元素。在实际开发中,concat操作符经常用于合并来自不同来源的数据流。