线程与线程池
约 2619 字大约 9 分钟
2026-02-09
创建方式
继承 Thread 类
类重写run(),通过调用类的start()方法启动线程
class MyThread extends Thread {
@Override
public void run() {
System.out.println("通过继承 Thread 创建线程");
}
}
// 使用
new MyThread().start();CompletableFuture
Future接口:
- 取消任务
- 判断任务是否被取消
- 判断任务是否已经执行完成
- 获取任务的执行结果
CompletableFuture实现了Future和CompletionStage接口
任务设定
CompletableFuture 有两种运行异步任务的方法
supplyAsync(Supplier<U> supplier)runAsync(Runnable runnable)前者接收一个具有返回值的Supplier<U>,用于异步计算并返回结果;后者接收一个Runnable,用于异步执行只产生副作用的任务
中断与超时处理
CompletableFuture 的 cancel(true) 无法打断正在运行的线程。
- 原因:它与具体执行线程解耦,且为了防止污染线程池(如 ForkJoinPool)。
- 应对阻塞任务:
- 使用
orTimeout(long, TimeUnit)(Java 9+) 设置超时。 - 若任务陷入 IO 阻塞(如 Socket),需通过关闭底层连接(Socket.close())来强制唤醒,而非依赖
interrupt()。
- 使用
并行任务
allOf:接收多个CompletableFuture任务,当所有的任务完成后返回结果
CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
System.out.println("task1 completed");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
System.out.println("task2 completed");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture<Void> combined = CompletableFuture.allOf(task1, task2);
combined.join();输出结果:
task2 completed
task1 completed因为等待所有的结果完成,因此其返回的CompletableFuture类型为Void
anyOf:接收多个CompletableFuture任务,当任意一个任务完成后将其作为结果返回
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
System.out.println("task1 completed");
return 1;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
System.out.println("task2 completed");
return 2;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture<Object> combined = CompletableFuture.anyOf(task1, task2);
combined.join();
System.out.println(combined.get());输出:
task2 completed
2task2 先于 task1 完成,因此它被作为返回值赋予 combined,在combined.get()中将能够得到 task2 的返回值
实现 Runnable 接口
类实现Runnable接口,作为Thread构造方法创建新线程
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("通过实现 Runnable 创建线程");
}
}
// 使用
new Thread(new MyRunnable()).start();实现 Callable 接口 + FutureTask
实现Callable接口与Runnable类似,但其具有一个返回值 依靠FutureTask创建一个「任务」,其继承了[[#Future]],能够在将来获取异步结果
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "任务完成";
}
}
// 使用
Callable<String> callable = new MyCallable();
FutureTask<String> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
try {
String result = futureTask.get(); // 获取返回值
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}线程的中断与停止
Java 不推荐使用 stop() 强行终止线程(会导致数据不一致)。线程的停止是协作式的,主线程发出信号,子线程负责响应。
核心机制:interrupt()
调用 thread.interrupt() 并不会强制停止线程,而是设置中断标志位。
- 阻塞状态(Sleep/Wait/Join):
- 线程会被立即唤醒。
- 清除中断标志位。
- 抛出
InterruptedException。
- 运行状态(CPU 密集型/For 循环):
- 不会自动抛出异常,线程会继续运行。
- 必须由代码主动轮询标志位
Thread.currentThread().isInterrupted()。
通用响应模版(优雅停机)
结合了“异常捕获”与“状态检查”的模式,支持手动回滚(Rollback)。
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
// 1. 密集计算中:主动检查
// 2. 阻塞调用中:自动响应中断异常
doWork();
}
} catch (InterruptedException e) {
// 捕获中断信号
System.out.println("收到停止指令,准备退出");
// 如果 catch 后还需要继续检查状态,建议再次调用 Thread.currentThread().interrupt() 重置标志
} finally {
// 3. 兜底逻辑:资源释放、关闭连接、回滚事务
cleanUp();
}
}
### 使用线程池
通过线程池提交任务,由线程池管理和线程的复用和创建
```java
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交 Runnable 任务
executor.submit(() -> {
System.out.println("通过线程池执行任务");
});
// 提交 Callable 任务(有返回值)
Future<String> future = executor.submit(() -> "返回结果");
try {
System.out.println(future.get());
} catch (Exception e) {
e.printStackTrace();
}
// 关闭线程池
executor.shutdown();线程池
线程池状态
线程池是一个状态机,不同状态下存在不同的行为
| 状态名称 | 定义 | 允许的操作 |
|---|---|---|
| RUNNING | 正常运行状态 | 接受新任务 执行队列任务 |
| SHUTDOWN | 平缓优雅关闭状态,完成队列中的任务后退出 | 不接受新任务 执行队列任务 |
| STOP | 强制关闭状态,不接收任务、不执行任务、中断正在执行的任务 | 不接收新任务 不执行队列任务 |
| TIDYING | 整理状态,所有任务执行完成、工作线程数为 0,准备进入终止状态 | 过渡状态 |
| TERMINATED | 终止状态,线程池关闭,所有资源已释放 | 已关闭 |
- RUNING:线程池创建后的默认状态,状态标记位为 32 位,高 3 位为状态码,低 29 位为线程数
- 能够调用
excute()和submit()提交新任务 - 工作线程会主动从任务队列中取任务执行
- 能够调用
- SHUTDOWN:平缓关闭状态,状态值为 0
- 通过调用
threadPool.shutdown()使线程池状态由RUNNING到SHUTDOWN - 先修改状态,再中断所有的空闲线程,等待所有任务完成后关闭线程池
- 通过
isTerminated判断是否所有的任务执行完成
- 通过调用
- STOP:强制关闭状态,立即关闭,不关心是否存在未完成的任务
- 通过调用
threadPool.shutdownNow()方法,使线程池由RUNNING到STOP - 先修改状态,然后中断所有的工作线程,无论是否存在正在执行或未完成的任务,并将未执行的任务列表返回、清空任务列表
- 中断工作线程指的是向线程发送中断信息,需要具体的任务执行对中断异常的捕获并处理,否则线程将会继续执行任务
- 通过调用
- TIDYING:过渡状态,表明线程池满足「所有任务处理完成 + 工作线程数为 0」,TIDYING 的作用是检查是否满足关闭条件,满足后进入终止态,因此
TIDYING被称为「过渡态」- 当满足条件后,线程池由
SHUTDOWN或STOP到TIDYING- 从
SHUTDOWN来:队列任务执行完 + 工作线程数为 0 - 从
STOP来:队列已清空 + 工作线程数为 0
- 从
tryTerminate()检查是否满足条件,若满足条件后调用terminated()钩子函数
- 当满足条件后,线程池由
- TERMINATED:终止态,表明线程池完成所有的关闭动作
如何监控线程池的状态?
使用线程池自带方法
isRunnung():是否处于 RUNNING 状态isShutdown():是否处于 SHUTDOWN/STOP/TIDYING/TERMINTED 状态isTerminating():是否处于 TIDYING 状态isTerminated():是否处于 TERMINATED 状态
使用 Executors 工具类创建线程
固定大小线程池
使用无界队列(LinkedBlockingQueue),可能导致内存溢出
ExecutorService fixedPool = Executors.newFixedThreadPool(3);可缓存线程池
线程数可无限增长,空闲线程 60s 后回收
ExecutorService cachedPool = Executors.newCachedThreadPool();手动创建线程池
线程池主要有两种
ThreadPoolExecutor:通用的线程池,适用于任务之间无关联的情况ForkJoinPool:适用于父子任务之间存在关联的情况
ThreadPoolExecutor
使用ThreadPoolExecutor构造函数创建 初始线程数为 0,若当前线程小于核心线程数或阻塞队列已满且未达到最大线程数,则当新任务到来会创建新的线程,对于超出核心线程数的空闲线程,会保持keepAliveTime后回收,corePoolSize个线程 若线程池已关闭,提交的任务将会执行拒绝策略 若线程池未关闭,任务队列已满且线程数已达最大线程数,执行拒绝策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize:核心线程数
4, // maximumPoolSize:最大线程数
60L, // keepAliveTime:非核心线程空闲存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 任务队列(有界)
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);拒绝策略
- AbortPolicy:直接抛出异常
- CallerRunsPolicy:由提交任务的线程执行该任务
- DiscardPolicy:丢弃该任务,不抛出异常
- DiscardOldestPolicy:丢弃队列中最老的任务,添加新任务;若线程池已关闭,则等同于
DiscardPolicy
线程数设置
IO密集型(在 IO 繁忙期间尽量提高 CPU 利用率): CPU 数 * (1 + 平均等待时间/平均计算时间) CPU 密集型: CPU 核心数 + 1
ForkJoinPool
线程数初始固定,因此适合 CPU 密集型,若对于 IO 密集型可能会导致出现 CPU 资源的「饥饿」情况
通过forkJoinPool.invoke(ForkJoinTask<T> task)调用一个 ForkJoinTask 任务,任务类主要实现了其中的exec方法,通过在执行方法中创建新的子任务并调用fork()方法,将该子任务放入当前工作线程的队列中 线程池中的空闲线程会掠夺其他工作线程中的任务队列中的任务 线程队列的拥有者从队列尾部添加和取出任务,而「掠夺」线程从队列的头部获取任务,避免干扰 
在以下的代码中,工作线程将问题规模分为两半,一半通过fork()添加至队列中,另一半由自身进入递归 其他空闲线程通过掠夺队列中的任务,进行同样的规模分半操作执行任务
public class ForkJoinSumExample {
public static void main(String[] args) {
int[] array = new int[1_000_000];
Arrays.setAll(array, i -> i + 1); // 1, 2, 3, ..., 1000000
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
long start = System.currentTimeMillis();
int sum = pool.invoke(task);
long end = System.currentTimeMillis();
System.out.println("Sum: " + sum); // 应为 500000500000
System.out.println("Time: " + (end - start) + " ms");
pool.shutdown();
}
}并发相关类
Future
表示异步计算的结果,对于一些耗时的任务场景可以交给子线程去执行,主线程继续执行其他任务,直至需要先前的异步结果时再等待
get():阻塞当前进程,直至获取结果,这使得所在代码块变为同步cancel(boolean mayInterruptIfRunning):尝试取消任务。true(干预模式):若任务正在运行,向执行线程发送interrupt()信号(适用于可中断任务)。false(温和模式):若任务正在运行,允许其执行完毕;仅改变 Future 状态(适用于 IO 写操作或无法回滚的操作)。- 注意:
CompletableFuture和ForkJoinTask的实现中,cancel(true)不会中断正在执行的线程(为了保护线程池复用),它们仅改变任务状态。
isCancel():任务是否被取消,返回布尔isDone:任务是否已完成
#review
