返回

剖析Project Reactor中concat操作符的实现原理

后端

Project Reactor是一个用于构建响应式系统的Java库,它提供了丰富的操作符来处理数据流。concat操作符是其中一个常用的操作符,它可以将多个Publisher按顺序拼接起来,并向下游转发这些Publisher发射出来的元素。本篇文章将通过分析concat操作符的源码,深入理解其实现原理。

源码分析

concat操作符的实现位于Reactor Core模块的FluxConcatIterable类中。该类继承了Flux类,并实现了concat方法。concat方法接收一个Publisher的Iterable作为参数,并返回一个新的Publisher,该Publisher将按顺序依次订阅参数中的Publisher,并向下游转发这些Publisher发射出来的元素。

public final class FluxConcatIterable<T> extends Flux<T> {

    final Iterable<? extends Publisher<? extends T>> sources;

    public FluxConcatIterable(Iterable<? extends Publisher<? extends T>> sources) {
        this.sources = Objects.requireNonNull(sources, "sources");
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        ConcatSubscription<T> parent = new ConcatSubscription<>(actual, sources);
        actual.onSubscribe(parent);
        parent.subscribe();
    }

    static final class ConcatSubscription<T>
            implements FuseableSubscription<T>, SubscriptionArbiter {

        final CoreSubscriber<? super T> actual;

        final Iterator<? extends Publisher<? extends T>> sources;

        Subscription current;

        volatile int wip = 1;

        volatile boolean cancelled;

        boolean done;

        ConcatSubscription(CoreSubscriber<? super T> actual,
                           Iterable<? extends Publisher<? extends T>> sources) {
            this.actual = actual;
            this.sources = sources.iterator();
        }

        void subscribe() {
            if (SubscriptionHelper.validate(current, wip, this)) {
                current = SubscriptionHelper.cancelled();
                subscribeNext();
            }
        }

        void subscribeNext() {
            if (!done && !cancelled) {
                while (!cancelled) {
                    if (!sources.hasNext()) {
                        done = true;
                        current = SubscriptionHelper.CANCELLED;
                        actual.onComplete();
                        return;
                    }
                    Publisher<? extends T> p = sources.next();
                    if (p == null) {
                        onError(new NullPointerException("The publisher returned is null"));
                        return;
                    }
                    current = p.subscribe(new ConcatInnerSubscriber<T>(this));
                    return;
                }
            }
        }

        // ... (剩下的代码)
    }
}

从源码中可以看出,concat操作符的实现主要包含以下几个步骤:

  1. 创建一个新的Publisher,该Publisher将按顺序依次订阅参数中的Publisher,并向下游转发这些Publisher发射出来的元素。
  2. 当新的Publisher订阅参数中的第一个Publisher时,会创建一个新的Subscription,该Subscription负责订阅和管理参数中的第一个Publisher。
  3. 当参数中的第一个Publisher完成或出错时,新的Publisher会取消订阅该Subscription,并创建一个新的Subscription,该Subscription负责订阅和管理参数中的第二个Publisher。
  4. 依此类推,直到参数中的所有Publisher都完成或出错,新的Publisher才会完成或出错。

使用示例

concat操作符的使用非常简单,只需要将多个Publisher作为参数传入即可。例如,以下代码将两个Flux实例按顺序拼接起来,并向下游转发这些Flux实例发射出来的元素:

Flux<String> flux1 = Flux.just("Hello");
Flux<String> flux2 = Flux.just("World");

Flux.concat(flux1, flux2)
    .subscribe(System.out::println);

输出结果为:

Hello
World

总结

concat操作符是一个非常实用的操作符,它可以将多个Publisher按顺序拼接起来,并向下游转发这些Publisher发射出来的元素。在实际开发中,concat操作符经常用于合并来自不同来源的数据流。