返回

从LongAccumulator源码解析看Java原子操作的演进

后端

引言

在并发编程中,原子操作是保证共享数据一致性的重要手段。Java从JDK5开始引入原子操作类,包括AtomicLong、AtomicInteger等,这些类提供了对基本数据类型的原子操作支持。在JDK6中,又引入了LongAdder类,它提供了对Long类型变量的更有效的原子操作支持。在JDK8中,又引入了LongAccumulator类,它提供了对Long类型变量的更全面的原子操作支持。

LongAccumulator源码解析

LongAccumulator类位于java.util.concurrent.atomic包下,它是一个累加器类,可以对Long类型变量进行原子操作。LongAccumulator的源码如下:

public class LongAccumulator extends Striped64 implements Serializable {

    private static final long serialVersionUID = 7249069246763182397L;

    // 初始值
    private final long initialValue;

    // 累加器数组
    private final AtomicLong[] cells;

    // 构造函数
    public LongAccumulator(long initialValue, LongBinaryOperator function, int cells) {
        super(cells);
        this.initialValue = initialValue;
        this.cells = new AtomicLong[cells];
        for (int i = 0; i < cells; ++i)
            cells[i] = new AtomicLong(initialValue);
    }

    // 原子累加方法
    public long accumulate(long x) {
        int[] hc;
        AtomicLong[] r;
        int m;
        boolean wasUncontended;
        long v;
        int i;
        ThreadLocalRandom rnd = threadLocalRandom();
        if ((hc = baseHashControl) != null ||
                (hc = setBaseHashControl(i = rnd.nextInt())) != null) {
            m = hc.length - 1;
            if (m >= 0) {
                i &= m;
                wasUncontended = true;
                r = cells;
            }
        } else if ((r = cells) != null)
            m = r.length - 1;
        else
            throw new IllegalStateException("No cells");
        if (m < 0)
            longAccumulate(x, hc, bs, r, m, rnd);
        else if (wasUncontended) {
            return lngAdd(v = r[i].get(), x, r, i);
        } else
            return longAccumulate(x, hc, bs, r, m, rnd);
    }

    // 原子获取值方法
    public long get() {
        AtomicLong[] r;
        long sum = initialValue;
        if ((r = cells) != null) {
            for (AtomicLong a : r)
                sum += a.get();
        }
        return sum;
    }

    // 原子重置值方法
    public void reset() {
        AtomicLong[] r;
        if ((r = cells) != null)
            for (AtomicLong a : r)
                a.set(initialValue);
    }

    // 原子获取并设置值方法
    public long getAndSet(long newValue) {
        AtomicLong[] r;
        long sum = initialValue;
        if ((r = cells) != null) {
            for (AtomicLong a : r)
                sum += a.getAndSet(initialValue);
        }
        return sum;
    }

    // 原子更新值方法
    public long updateAndGet(LongUnaryOperator updateFunction) {
        long prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsLong(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    // 原子获取并更新值方法
    public long getAndUpdate(LongUnaryOperator updateFunction) {
        long prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsLong(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    // 原子累加和方法
    public long accumulateAndGet(long x) {
        long prev, next;
        do {
            prev = get();
            next = prev + x;
        } while (!compareAndSet(prev, next));
        return next;
    }

    // 原子累加并获取值方法
    public long getAndAccumulate(long x) {
        long prev, next;
        do {
            prev = get();
            next = prev + x;
        } while (!compareAndSet(prev, next));
        return prev;
    }

    // 原子自增方法
    public long increment() {
        return lngAdd(get(), 1L, cells, bs);
    }

    // 原子自减方法
    public long decrement() {
        return lngAdd(get(), -1L, cells, bs);
    }

    // 原子添加方法
    public LongAdder longAdder() {
        return new LongAdder() {
            @Override
            public void add(long x) {
                accumulate(x);
            }

            @Override
            public long sum() {
                return get();
            }

            @Override
            public void reset() {
                LongAccumulator.this.reset();
            }
        };
    }

    // 原子比较并设置值方法
    public boolean compareAndSet(long expect, long update) {
        return lngCAS(cells, bs, expect, update);
    }

    // 原子累加方法,内部方法
    private final long longAccumulate(long x, int[] hc, Bulkable bs,
                                    AtomicLong[] r, int m, ThreadLocalRandom rnd) {
        long v;
        int i = m & rnd.nextInt();
        if ((v = r[i].get()) != x) {
            if (bs != null)
                v = bs.longAccumulate(v, x, hc, bs);
            else if (hc != null)
                v = lngAccumulate(v, x, hc, bs, m, rnd);
            else
                v = lngAdd(v, x, r, i);
        }
        return v;