欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > Java并发编程-线程池(三)

Java并发编程-线程池(三)

2025/5/16 22:55:44 来源:https://blog.csdn.net/gdr12/article/details/147971243  浏览:    关键词:Java并发编程-线程池(三)

文章目录

  • 线程池实现原理
    • addWorker(Runnable firstTask, boolean core)
      • 1. 状态检查:校验线程池是否允许添加线程
      • 2. 工作线程数调整:CAS保证并发安全
      • 3. 初始化变量
      • 4. 创建 `Worker` 对象并获取线程
      • 5. 加锁保证线程安全
      • 6. 启动工作线程
      • 7. 异常处理
      • 核心作用

线程池实现原理

接下来,我们进入 addWork 方法, 在创建线程前会获取全局锁:

addWorker(Runnable firstTask, boolean core)

/* Set containing all worker threads in pool. Accessed only when* holding mainLock.*/
private final HashSet<Worker> workers = new HashSet<Worker>();
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))/ 若状态为STOP/TIDYING/TERMINATED(rs >= SHUTDOWN),直接拒绝新增线程。例外情况:当状态为SHUTDOWN且满足以下条件时允许添加 非核心线程 处理残留任务:1. firstTask == null(线程不携带新任务)2. workQueue非空(队列中仍有待处理任务)*/return false;for (;;) {int wc = workerCountOf(c); // 工作线程数(低29位)if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))// 容量校验return false;if (compareAndIncrementWorkerCount(c))// CAS递增workerCountbreak retry; // 成功则退出整个retry循环c = ctl.get();  // 重新读取当前ctlif (runStateOf(c) != rs)continue retry; // 状态变化则重试外层循环// 若仅workerCount变化,继续内层循环重试CAS}}boolean workerStarted = false; //标记工作线程是否成功启动boolean workerAdded = false;//标记线程是否被成功添加到线程池Worker w = null;try {final ReentrantLock mainLock = this.mainLock;w = new Worker(firstTask);//通过线程工厂创建Worker对象(后面讲)final Thread t = w.thread;//获取其绑定的线程if (t != null) {mainLock.lock(); // 获取全局锁try {//重新检查线程池状态int c = ctl.get();int rs = runStateOf(c);if (rs < SHUTDOWN || //线程池处于RUNNING状态时,允许添加新线程(rs == SHUTDOWN && firstTask == null)) {//SHUTDOWN状态下仅允许添加无初始任务的线程(处理队列中的剩余任务)if (t.isAlive()) // 防止重复启动throw new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {//若成功添加Worker,则启动其线程(真正开始消费任务)t.start();workerStarted = true;}}} finally {if (! workerStarted)//失败回滚:若未成功启动(如线程池已关闭),调用addWorkerFailed回滚资源addWorkerFailed(w);}return workerStarted;
}
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w); //移除WorkerdecrementWorkerCount();//减少计数tryTerminate(); //尝试终止线程池} finally {mainLock.unlock();}
}

1. 状态检查:校验线程池是否允许添加线程

int c = ctl.get();
int rs = runStateOf(c); // 运行状态(高3位)
if (rs >= SHUTDOWN &&!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))return false;
  • 关键条件:

    • 若状态为STOP/TIDYING/TERMINATEDrs >= SHUTDOWN),**直接拒绝新增线程**。

    • 例外情况:当状态为SHUTDOWN且满足以下条件时允许添加**非核心线程**处理残留任务:

      • firstTask == null(线程不携带新任务)

      • workQueue非空(队列中仍有待处理任务),确保线程关闭时仍能处理队列残留任务

2. 工作线程数调整:CAS保证并发安全

for (;;) {int wc = workerCountOf(c); // 工作线程数(低29位)// 容量校验if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;// CAS递增workerCountif (compareAndIncrementWorkerCount(c))break retry; // 成功则退出整个retry循环// CAS失败后处理c = ctl.get(); // 重新读取当前ctlif (runStateOf(c) != rs)continue retry; // 状态变化则重试外层循环// 若仅workerCount变化,继续内层循环重试CAS
}
  • 双重校验逻辑:

    • 容量控制:通过参数core决定上限为核心/最大线程数,避免资源溢出。

    • CAS操作:通过compareAndIncrementWorkerCount原子性增加线程数,防止多线程竞争导致计数错误。

    • 失败处理:若检测到状态变化(如线程池关闭),重新进行外层状态检查。若仅线程数变化,则仅重试内层循环。

    • 双重循环:外层处理状态变化,内层处理线程数变更,分离关注点,提升并发效率。

3. 初始化变量

初始化 workerStartedworkerAddedfalse,表示工作线程未启动且未成功添加到工作线程集合。Worker 对象 w 的初始值为 null

4. 创建 Worker 对象并获取线程

w = new Worker(firstTask);
final Thread t = w.thread;

通过线程工厂创建 Worker 对象,并获取其绑定的线程 t。若线程工厂创建失败(如返回 null),后续逻辑直接跳转至第7步异常处理。

5. 加锁保证线程安全

获取 mainLock(全局锁),确保对线程池状态的检查和修改、workers 集合的操作是原子的:

6. 启动工作线程

if (workerAdded) {t.start();  // 启动线程workerStarted = true;
}

如果 Worker 成功添加到集合,则启动其绑定的线程。

7. 异常处理

finally {if (!workerStarted)addWorkerFailed(w); // 回滚失败的 Worker 操作
}
  • 若线程未启动(如锁获取后线程池已关闭),调用 addWorkerFailed 移除 Worker 并更新线程池状态。

核心作用

这段代码是线程池 ThreadPoolExecutor中添加工作线程的核心逻辑,它会创建Worker,Worker内置了一个线程(由线程工厂创建),这个线程会在Worker创建后启动,专门用于执行Worker的run()方法。另外还实现了:

  1. 线程安全的 Worker管理:通过 ReentrantLock 保证对共享变量(如 workers 集合)的原子操作。

  2. 状态双重检查:防止在加锁期间线程池状态发生变化。

  1. 异常回滚机制:确保部分失败的操作能被正确处理,保证线程池的稳定性。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词