Skip to content

线程与线程池

约 2619 字大约 9 分钟

2026-02-09

创建方式

继承 Thread 类

类重写run(),通过调用类的start()方法启动线程

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("通过继承 Thread 创建线程");
    }
}

// 使用
new MyThread().start();

CompletableFuture

Future接口:

  • 取消任务
  • 判断任务是否被取消
  • 判断任务是否已经执行完成
  • 获取任务的执行结果 CompletableFuture实现了FutureCompletionStage接口

任务设定

CompletableFuture 有两种运行异步任务的方法

  • supplyAsync(Supplier<U> supplier)
  • runAsync(Runnable runnable) 前者接收一个具有返回值的Supplier<U>,用于异步计算并返回结果;后者接收一个Runnable,用于异步执行只产生副作用的任务

中断与超时处理

CompletableFuturecancel(true) 无法打断正在运行的线程。

  • 原因:它与具体执行线程解耦,且为了防止污染线程池(如 ForkJoinPool)。
  • 应对阻塞任务
    1. 使用 orTimeout(long, TimeUnit) (Java 9+) 设置超时。
    2. 若任务陷入 IO 阻塞(如 Socket),需通过关闭底层连接(Socket.close())来强制唤醒,而非依赖 interrupt()

并行任务

allOf:接收多个CompletableFuture任务,当所有的任务完成后返回结果

CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {  
    try {  
        Thread.sleep(2000);  
        System.out.println("task1 completed");  
    } catch (InterruptedException e) {  
        throw new RuntimeException(e);  
    }  
});  
CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {  
    try {  
        Thread.sleep(1000);  
        System.out.println("task2 completed");  
    } catch (InterruptedException e) {  
        throw new RuntimeException(e);  
    }  
});  
  
CompletableFuture<Void> combined = CompletableFuture.allOf(task1, task2);  
combined.join();

输出结果:

task2 completed
task1 completed

因为等待所有的结果完成,因此其返回的CompletableFuture类型为Void

anyOf:接收多个CompletableFuture任务,当任意一个任务完成后将其作为结果返回

CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {  
    try {  
        Thread.sleep(2000);  
        System.out.println("task1 completed");  
        return 1;  
    } catch (InterruptedException e) {  
        throw new RuntimeException(e);  
    }  
});  
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {  
    try {  
        Thread.sleep(1000);  
        System.out.println("task2 completed");  
        return 2;  
    } catch (InterruptedException e) {  
        throw new RuntimeException(e);  
    }  
});  
  
CompletableFuture<Object> combined = CompletableFuture.anyOf(task1, task2);  
combined.join();  
System.out.println(combined.get());

输出:

task2 completed
2

task2 先于 task1 完成,因此它被作为返回值赋予 combined,在combined.get()中将能够得到 task2 的返回值

实现 Runnable 接口

类实现Runnable接口,作为Thread构造方法创建新线程

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("通过实现 Runnable 创建线程");
    }
}

// 使用
new Thread(new MyRunnable()).start();

实现 Callable 接口 + FutureTask

实现Callable接口与Runnable类似,但其具有一个返回值 依靠FutureTask创建一个「任务」,其继承了[[#Future]],能够在将来获取异步结果

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        return "任务完成";
    }
}

// 使用
Callable<String> callable = new MyCallable();
FutureTask<String> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();

try {
    String result = futureTask.get(); // 获取返回值
    System.out.println(result);
} catch (Exception e) {
    e.printStackTrace();
}

线程的中断与停止

Java 不推荐使用 stop() 强行终止线程(会导致数据不一致)。线程的停止是协作式的,主线程发出信号,子线程负责响应。

核心机制:interrupt()

调用 thread.interrupt() 并不会强制停止线程,而是设置中断标志位。

  • 阻塞状态(Sleep/Wait/Join)
    • 线程会被立即唤醒。
    • 清除中断标志位。
    • 抛出 InterruptedException
  • 运行状态(CPU 密集型/For 循环)
    • 不会自动抛出异常,线程会继续运行。
    • 必须由代码主动轮询标志位 Thread.currentThread().isInterrupted()

通用响应模版(优雅停机)

结合了“异常捕获”与“状态检查”的模式,支持手动回滚(Rollback)。

public void run() {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            // 1. 密集计算中:主动检查
            // 2. 阻塞调用中:自动响应中断异常
            doWork(); 
        }
    } catch (InterruptedException e) {
        // 捕获中断信号
        System.out.println("收到停止指令,准备退出");
        // 如果 catch 后还需要继续检查状态,建议再次调用 Thread.currentThread().interrupt() 重置标志
    } finally {
        // 3. 兜底逻辑:资源释放、关闭连接、回滚事务
        cleanUp();
    }
}

### 使用线程池
通过线程池提交任务,由线程池管理和线程的复用和创建
```java
ExecutorService executor = Executors.newFixedThreadPool(2);

// 提交 Runnable 任务
executor.submit(() -> {
    System.out.println("通过线程池执行任务");
});

// 提交 Callable 任务(有返回值)
Future<String> future = executor.submit(() -> "返回结果");
try {
    System.out.println(future.get());
} catch (Exception e) {
    e.printStackTrace();
}

// 关闭线程池
executor.shutdown();

线程池

线程池状态

线程池是一个状态机,不同状态下存在不同的行为

状态名称定义允许的操作
RUNNING正常运行状态接受新任务
执行队列任务
SHUTDOWN平缓优雅关闭状态,完成队列中的任务后退出不接受新任务
执行队列任务
STOP强制关闭状态,不接收任务、不执行任务、中断正在执行的任务不接收新任务
不执行队列任务
TIDYING整理状态,所有任务执行完成、工作线程数为 0,准备进入终止状态过渡状态
TERMINATED终止状态,线程池关闭,所有资源已释放已关闭
  • RUNING:线程池创建后的默认状态,状态标记位为 32 位,高 3 位为状态码,低 29 位为线程数
    • 能够调用excute()submit()提交新任务
    • 工作线程会主动从任务队列中取任务执行
  • SHUTDOWN:平缓关闭状态,状态值为 0
    • 通过调用threadPool.shutdown()使线程池状态由RUNNINGSHUTDOWN
    • 先修改状态,再中断所有的空闲线程,等待所有任务完成后关闭线程池
    • 通过isTerminated判断是否所有的任务执行完成
  • STOP:强制关闭状态,立即关闭,不关心是否存在未完成的任务
    • 通过调用threadPool.shutdownNow()方法,使线程池由RUNNINGSTOP
    • 先修改状态,然后中断所有的工作线程,无论是否存在正在执行或未完成的任务,并将未执行的任务列表返回、清空任务列表
      • 中断工作线程指的是向线程发送中断信息,需要具体的任务执行对中断异常的捕获并处理,否则线程将会继续执行任务
  • TIDYING:过渡状态,表明线程池满足「所有任务处理完成 + 工作线程数为 0」,TIDYING 的作用是检查是否满足关闭条件,满足后进入终止态,因此TIDYING被称为「过渡态」
    • 当满足条件后,线程池由SHUTDOWNSTOPTIDYING
      • SHUTDOWN来:队列任务执行完 + 工作线程数为 0
      • STOP来:队列已清空 + 工作线程数为 0
    • tryTerminate()检查是否满足条件,若满足条件后调用terminated()钩子函数
  • TERMINATED:终止态,表明线程池完成所有的关闭动作

如何监控线程池的状态?

使用线程池自带方法

  • isRunnung():是否处于 RUNNING 状态
  • isShutdown():是否处于 SHUTDOWN/STOP/TIDYING/TERMINTED 状态
  • isTerminating():是否处于 TIDYING 状态
  • isTerminated():是否处于 TERMINATED 状态

使用 Executors 工具类创建线程

固定大小线程池

使用无界队列(LinkedBlockingQueue),可能导致内存溢出

ExecutorService fixedPool = Executors.newFixedThreadPool(3);

可缓存线程池

线程数可无限增长,空闲线程 60s 后回收

ExecutorService cachedPool = Executors.newCachedThreadPool();

手动创建线程池

线程池主要有两种

  • ThreadPoolExecutor:通用的线程池,适用于任务之间无关联的情况
  • ForkJoinPool:适用于父子任务之间存在关联的情况

ThreadPoolExecutor

使用ThreadPoolExecutor构造函数创建 初始线程数为 0,若当前线程小于核心线程数阻塞队列已满且未达到最大线程数,则当新任务到来会创建新的线程,对于超出核心线程数的空闲线程,会保持keepAliveTime后回收,corePoolSize个线程 若线程池已关闭,提交的任务将会执行拒绝策略 若线程池未关闭,任务队列已满且线程数已达最大线程数,执行拒绝策略

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2,                    // corePoolSize:核心线程数
    4,                    // maximumPoolSize:最大线程数
    60L,                  // keepAliveTime:非核心线程空闲存活时间
    TimeUnit.SECONDS,     // 时间单位
    new LinkedBlockingQueue<>(100), // 任务队列(有界)
    Executors.defaultThreadFactory(), // 线程工厂
    new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

拒绝策略

  • AbortPolicy:直接抛出异常
  • CallerRunsPolicy:由提交任务的线程执行该任务
  • DiscardPolicy:丢弃该任务,不抛出异常
  • DiscardOldestPolicy:丢弃队列中最老的任务,添加新任务;若线程池已关闭,则等同于DiscardPolicy

线程数设置

IO密集型(在 IO 繁忙期间尽量提高 CPU 利用率): CPU 数 * (1 + 平均等待时间/平均计算时间) CPU 密集型: CPU 核心数 + 1

ForkJoinPool

线程数初始固定,因此适合 CPU 密集型,若对于 IO 密集型可能会导致出现 CPU 资源的「饥饿」情况

通过forkJoinPool.invoke(ForkJoinTask<T> task)调用一个 ForkJoinTask 任务,任务类主要实现了其中的exec方法,通过在执行方法中创建新的子任务并调用fork()方法,将该子任务放入当前工作线程的队列中 线程池中的空闲线程会掠夺其他工作线程中的任务队列中的任务 线程队列的拥有者从队列尾部添加和取出任务,而「掠夺」线程从队列的头部获取任务,避免干扰 image

在以下的代码中,工作线程将问题规模分为两半,一半通过fork()添加至队列中,另一半由自身进入递归 其他空闲线程通过掠夺队列中的任务,进行同样的规模分半操作执行任务

public class ForkJoinSumExample {
    public static void main(String[] args) {
        int[] array = new int[1_000_000];
        Arrays.setAll(array, i -> i + 1); // 1, 2, 3, ..., 1000000

        ForkJoinPool pool = new ForkJoinPool();
        SumTask task = new SumTask(array, 0, array.length);
        long start = System.currentTimeMillis();
        int sum = pool.invoke(task);
        long end = System.currentTimeMillis();

        System.out.println("Sum: " + sum); // 应为 500000500000
        System.out.println("Time: " + (end - start) + " ms");
        pool.shutdown();
    }
}

并发相关类

Future

表示异步计算的结果,对于一些耗时的任务场景可以交给子线程去执行,主线程继续执行其他任务,直至需要先前的异步结果时再等待

  • get():阻塞当前进程,直至获取结果,这使得所在代码块变为同步
  • cancel(boolean mayInterruptIfRunning):尝试取消任务。
    • true(干预模式):若任务正在运行,向执行线程发送 interrupt() 信号(适用于可中断任务)。
    • false(温和模式):若任务正在运行,允许其执行完毕;仅改变 Future 状态(适用于 IO 写操作或无法回滚的操作)。
    • 注意CompletableFutureForkJoinTask 的实现中,cancel(true) 不会中断正在执行的线程(为了保护线程池复用),它们仅改变任务状态。
  • isCancel():任务是否被取消,返回布尔
  • isDone:任务是否已完成

#review