返回

JUC 必备武器:熟练掌握 Callable 和 Future、JUC 三大辅助类、阻塞队列

后端

1. Callable 和 Future 接口

Callable 和 Future 接口是 Java 并发编程中的两个重要接口。Callable 接口允许您编写可中断的异步任务,而 Future 接口则允许您获取异步任务的结果。

Callable 接口类似于 Runnable 接口,但它有一个返回值。当您调用 Callable.call() 方法时,它将返回一个结果。您可以使用 Future 接口来获取 Callable.call() 方法的返回值。

Callable 和 Future 接口通常与 ExecutorService 一起使用。ExecutorService 可以管理一组线程,并可以将 Callable 任务提交给这些线程执行。当 Callable 任务执行完毕后,您可以使用 Future 接口来获取任务的结果。

2. JUC 三大辅助类

JUC 三大辅助类是指 CountDownLatch、Semaphore 和 CyclicBarrier。这些类可以帮助您处理并发编程中的常见问题。

CountDownLatch 可以帮助您等待一组任务全部执行完毕。当您调用 CountDownLatch.countDown() 方法时,计数器会减少 1。当计数器减少到 0 时,所有等待的线程都会被唤醒。

Semaphore 可以帮助您限制对共享资源的并发访问。当您调用 Semaphore.acquire() 方法时,如果信号量可用,则允许您获取该信号量。如果信号量不可用,则您需要等待,直到信号量可用。

CyclicBarrier 可以帮助您等待一组线程全部到达某个点。当您调用 CyclicBarrier.await() 方法时,如果所有线程都到达了该点,则所有线程都会被唤醒。如果只有部分线程到达了该点,则您需要等待,直到所有线程都到达该点。

3. 阻塞队列

阻塞队列是一种特殊的队列,它可以阻塞线程,直到队列中有元素可供消费。阻塞队列通常与生产者-消费者模式一起使用。

在生产者-消费者模式中,生产者线程将元素放入队列中,消费者线程从队列中取出元素。如果队列为空,消费者线程将被阻塞,直到生产者线程将元素放入队列中。

Java 并发包中提供了两种阻塞队列:ArrayBlockingQueue 和 LinkedBlockingQueue。ArrayBlockingQueue 是一个基于数组的阻塞队列,LinkedBlockingQueue 是一个基于链表的阻塞队列。

4. 示例代码

以下是一个使用 Callable 和 Future 接口的示例代码:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableFutureExample {

    public static void main(String[] args) throws Exception {
        // 创建一个线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        // 创建一个 Callable 任务
        Callable<Integer> task = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // 执行任务
                return 100;
            }
        };

        // 将任务提交给线程池
        Future<Integer> future = executorService.submit(task);

        // 获取任务结果
        Integer result = future.get();

        // 打印任务结果
        System.out.println(result);
    }
}

以下是一个使用 JUC 三大辅助类的示例代码:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.CyclicBarrier;

public class JUCExample {

    public static void main(String[] args) throws Exception {
        // 创建一个 CountDownLatch
        CountDownLatch countDownLatch = new CountDownLatch(10);

        // 创建一个 Semaphore
        Semaphore semaphore = new Semaphore(5);

        // 创建一个 CyclicBarrier
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10);

        // 创建 10 个线程
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    // 获取信号量
                    semaphore.acquire();

                    // 执行任务
                    Thread.sleep(1000);

                    // 释放信号量
                    semaphore.release();

                    // 等待其他线程到达集合点
                    cyclicBarrier.await();

                    // 计数器减 1
                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }

        // 等待所有线程执行完毕
        countDownLatch.await();

        // 打印结果
        System.out.println("所有任务执行完毕");
    }
}

以下是一个使用阻塞队列的示例代码:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {
        // 创建一个阻塞队列
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        // 创建生产者线程
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    // 将元素放入队列
                    queue.put(i);

                    // 打印生产者线程信息
                    System.out.println("生产者线程生产了元素:" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 创建消费者线程
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    // 从队列中取出元素
                    Integer element = queue.take();

                    // 打印消费者线程信息
                    System.out.println("消费者线程消费了元素:" + element);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 启动生产者线程和消费者线程
        producer.start();
        consumer.start();

        // 等待生产者线程和消费者线程执行完毕
        producer.join();
        consumer.join();
    }
}