深入探究!Java 多线程相互等待机制

前言

在刚开始学习Java编程时,我们的程序通常在main方法中按照自上而下的顺序逐行执行。随着不断深入的学习和实践经验的积累,特别是在实习期间,我接触到了Java多线程的应用领域。多线程技术可以充分挖掘单台服务器的计算潜力,依据服务器的负载情况和CPU核心数,灵活调整线程数量,从而显著提升程序的执行效率。

当时,我对多线程的理解还不够深入,主要是根据服务器的负载来调整线程数量。从实际应用效果来看,在线程之间无需交互的场景中,多线程技术的优势尤为突出。以处理Kafka数据为例,通过多线程并行处理,可以快速高效地完成数据的读取、解析和存储,简单易用且效果显著。

然而,当面对需要线程间紧密交互的场景时,情况就变得复杂。例如,当线程A、B和C需要共同协作完成一项任务时,A完成自身工作后,必须等待B和C全部完成才能继续下一步。如果自己实现这种复杂的线程协作逻辑,不仅耗时耗力,还容易出现死锁、数据不一致等多线程编程中的常见问题。

为了更好地应对这类场景,今天我将详细介绍两种强大的并发工具类:CountDownLatchCyclicBarrier。它们能够有效简化线程间的协作流程,提高程序的稳定性和可维护性,让我们在多线程编程中更加游刃有余。

CountDownLatch

CountDownLatch可以理解成 倒计时计数器,它可以让多个线程 等待某些任务完成后再继续执行。如果你以前遇到过“等所有子任务执行完再汇总结果”的问题。

使用

当我们使用new CountDownLatch(N)创建对象时,构造函数中的N作为计数器,通常等于需要完成任务的线程数量。这一设计使得CountDownLatch能够精确地跟踪和控制线程任务的完成情况。

对于开发者来说,使用CountDownLatch时主要需要关注以下两个关键方法:

  1. countDown():当某个任务完成后,调用此方法会使计数器N的值减少1。这类似于接力赛中每完成一棒交接就记录一次,帮助团队明确剩余任务的数量。通过反复调用countDown(),计数器的值逐渐减少,反映任务的完成进度。

  2. await():调用await()方法会使当前线程进入等待状态,直到计数器的值降为0。这就像在团队项目中,当一个成员完成自己的部分后,需要等待其他成员全部完成,才能继续推进项目。只有当所有任务都调用countDown()将计数器减至0时,等待中的线程才会被唤醒,继续执行后续的代码。

如果不想无限等待,可以指定超时时间:

latch.await(5, TimeUnit.SECONDS);

上面代码表示,如果 5 秒内 countDown() 没有执行完, await() 会自动超时并继续执行。

举个示例

这里举个例子,假设你是一个老师,考试结束后,你必须等所有学生交卷( countDown())后,才能收卷走人( await())。

我们先定义一个学生类Student:

class Student implements Runnable {
    private CountDownLatch latch;
    private int id;

    public Student(CountDownLatch latch, int id) {
        this.latch = latch;
        this.id = id;
    }

    @Override
    public void run() {
        System.out.println("学生 " + id + " 交卷");
        latch.countDown(); // 交卷后,计数器减 1
    }
}

在Student类中,通过构造参数传入CountDownLatch,并实现Runnable线程类的run(),通过调用 countDown() 完成N - 1来模拟学生的交卷动作。然后实现一个主类,主线程main就扮演老师的角色。

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int studentCount = 5;
        CountDownLatch latch = new CountDownLatch(studentCount);

        for (int i = 1; i <= studentCount; i++) {
            new Thread(new Student(latch, i)).start();
        }
        // 等待所有学生交卷(N变成0)
        latch.await();
        System.out.println("老师收卷,考试结束!");
    }
}

在代码中创建五个学生(线程),然后启动程序。我们知道,在main中的代码是顺序执行的,正常情况下,创建并启动线程之后,main函数是接着执行的,也就是说,我不管学生交卷与否,我直接收卷结束考试。

但是在上面main函数中,添加了一行代码 latch.await() ,也就意味着必须要等到CountDownLatch中的N变成0(即所有人交卷),才能继续执行,如图所示:

我们知道多线程的情况下,每个线程执行完成的顺序是没有规律的,但是不论谁先完成都要等待。

CyclicBarrier

在多线程并发工具中,除了CountDownLatch之外, CyclicBarrier 也具有这种”等待“功能,CyclicBarrier翻译为 循环屏障,但与CountDownLatch的使用上游不同之处:

比较维度 CountDownLatch CyclicBarrier
功能 一次性计数器,允许一个或多个线程等待其他线程完成一组操作后再继续执行 使一组线程相互等待,直到所有线程都到达某个屏障点,然后一起继续执行,可重复使用
使用方式 在需要等待的线程中调用await()等待,在完成任务的线程中调用countDown()减少计数器 所有参与协作的线程都调用await(),当最后一个线程调用await()时,所有线程被释放
计数器重置 不能重置,计数器减到0后无法再次用于相同等待场景 可通过reset()方法重置,能多次用于线程同步
内部实现机制 基于AQS实现,利用AQS状态表示计数器值,通过CAS操作修改状态 基于ReentrantLock和Condition实现,利用重入锁保证线程安全,条件变量实现线程等待和唤醒
适用场景 多线程任务汇总,如多个线程读取不同数据源数据后汇总分析;服务启动准备,确保依赖服务和资源准备好后启动核心业务 多阶段任务同步,如分布式计算MapReduce过程中每轮任务同步;并发数据处理,多线程处理数据多轮迭代时的同步

简单来说:

  1. CyclicBarrier可复用
  2. CyclicBarrier是线程本身等待,CountDownLatch是主线程等待

使用

可能上面说的很难理解,我们先从用法开始看起:

  1. CyclicBarrier(N):设置需要等待的线程数
  2. await():线程到达屏障,N - 1并等待其他线程

从用法中可以看出,await 既能使计数器 -1,还能等待,那么CountDownLatch中,计数器-1是在线程中调用的,那么我们就在线程中调用await()。

举个示例

class Student implements Runnable {
    private CyclicBarrier cyclicBarrier;
    private int id;

    public Student(CyclicBarrier cyclicBarrier, int id) {
        this.cyclicBarrier = cyclicBarrier;
        this.id = id;
    }

    @SneakyThrows
    @Override
    public void run() {
        System.out.println("学生 " + id + " 交卷");
        cyclicBarrier.await();
    }
}

此时,我们在main线程中也调用await等待所有线程完成。

public static void main(String[] args) throws InterruptedException {
    int studentCount = 5;
    CyclicBarrier cyclicBarrier = new CyclicBarrier(studentCount + 1);

    for (int i = 1; i <= studentCount; i++) {
        new Thread(new Student(cyclicBarrier, i)).start();
    }

    cyclicBarrier.await();
    System.out.println("老师收卷,考试结束!");
}

与CountDownLatch不同的是,有5个学生,但是这里的N是6,因为老师也要等待所有学生完成之后,自己才能结束考试。

思考

同样一个交卷场景下,CountDownLatch用的就挺舒服,但是CyclicBarrier就比较别扭,因为你想,考试的情况下,明明只要老师等待所有人交卷结束考试即可,为什么学生之间还需要等待呢?

所以CyclicBarrier比较适合的场景是: 学生互相等待一起做一件事情。同样是考试,老师要求必须所有学生到齐了才能发卷考试,我们将上面run()中的“交卷”改为“到达教室”,然后实现main函数。

public static void main(String[] args) {
        int studentCount = 5;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(studentCount, () -> {
            System.out.println("开始考试");
        });

        for (int i = 1; i <= studentCount; i++) {
            new Thread(new Student(cyclicBarrier, i)).start();
        }
    }
}

在上面main中无需调用await等待。在CyclicBarrier我们除了传入N,还可以传入一个回调函数,当N为0时,会执行回调函数。所以CyclicBarrie更适合线程之间等待的场景,代码执行结果如下:

结语

在实际应用中,何时选用CountDownLatch,又何时使用CyclicBarrier呢?

  • 适用CountDownLatch的场景:当你需要等待一组线程全部执行完毕后,再去执行主任务时,CountDownLatch便是合适之选。例如,在一个复杂的系统启动流程中,多个服务需要依次启动,只有当所有这些服务都成功启动后,整个系统才能正常运行。此时,利用CountDownLatch就可以实现主线程等待所有服务启动线程完成任务,进而保障系统的有序启动。
  • 适用CyclicBarrier的场景:若需求是多个线程彼此相互等待,直到所有线程都准备就绪后,再一起执行后续操作,CyclicBarrier则更为匹配。以多人组队游戏为例,在游戏开始前,所有玩家都需要完成资源加载。只有当每一位玩家都加载完成,也就是所有线程(玩家加载线程)都到达了这个“加载完成”的屏障点时,游戏才能正式开始,这正是CyclicBarrier的典型应用场景。

二者的关键区别在于参与主体和等待关系。CountDownLatch的参与主体是线程和主线程,线程之间无需相互等待,主要是主线程等待线程组完成任务,执行任务的主体是主线程。而CyclicBarrier的参与主体仅为线程,线程之间需要相互等待,执行任务的主体是这些相互协作的线程本身 。

文章来源: https://study.disign.me/article/202508/10.java-multithreading-wait.md

发布时间: 2025-02-20

作者: 技术书栈编辑