❶ java线程池怎么实现的
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
一个线程池包括以下四个基本组成部分:
1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:
假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。
代码实现中并没有实现任务接口,而是把Runnable对象加入到线程池管理器(ThreadPool),然后剩下的事情就由线程池管理器(ThreadPool)来完成了
packagemine.util.thread;
importjava.util.LinkedList;
importjava.util.List;
/**
*线程池类,线程管理器:创建线程,执行任务,销毁线程,获取线程基本信息
*/
publicfinalclassThreadPool{
//线程池中默认线程的个数为5
privatestaticintworker_num=5;
//工作线程
privateWorkThread[]workThrads;
//未处理的任务
_task=0;
//任务队列,作为一个缓冲,List线程不安全
privateList<Runnable>taskQueue=newLinkedList<Runnable>();
;
//创建具有默认线程个数的线程池
privateThreadPool(){
this(5);
}
//创建线程池,worker_num为线程池中工作线程的个数
privateThreadPool(intworker_num){
ThreadPool.worker_num=worker_num;
workThrads=newWorkThread[worker_num];
for(inti=0;i<worker_num;i++){
workThrads[i]=newWorkThread();
workThrads[i].start();//开启线程池中的线程
}
}
//单态模式,获得一个默认线程个数的线程池
(){
returngetThreadPool(ThreadPool.worker_num);
}
//单态模式,获得一个指定线程个数的线程池,worker_num(>0)为线程池中工作线程的个数
//worker_num<=0创建默认的工作线程个数
(intworker_num1){
if(worker_num1<=0)
worker_num1=ThreadPool.worker_num;
if(threadPool==null)
threadPool=newThreadPool(worker_num1);
returnthreadPool;
}
//执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
publicvoidexecute(Runnabletask){
synchronized(taskQueue){
taskQueue.add(task);
taskQueue.notify();
}
}
//批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
publicvoidexecute(Runnable[]task){
synchronized(taskQueue){
for(Runnablet:task)
taskQueue.add(t);
taskQueue.notify();
}
}
//批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
publicvoidexecute(List<Runnable>task){
synchronized(taskQueue){
for(Runnablet:task)
taskQueue.add(t);
taskQueue.notify();
}
}
//销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
publicvoiddestroy(){
while(!taskQueue.isEmpty()){//如果还有任务没执行完成,就先睡会吧
try{
Thread.sleep(10);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
//工作线程停止工作,且置为null
for(inti=0;i<worker_num;i++){
workThrads[i].stopWorker();
workThrads[i]=null;
}
threadPool=null;
taskQueue.clear();//清空任务队列
}
//返回工作线程的个数
publicintgetWorkThreadNumber(){
returnworker_num;
}
//返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成
(){
returnfinished_task;
}
//返回任务队列的长度,即还没处理的任务个数
publicintgetWaitTasknumber(){
returntaskQueue.size();
}
//覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数
@Override
publicStringtoString(){
return"WorkThreadnumber:"+worker_num+"finishedtasknumber:"
+finished_task+"waittasknumber:"+getWaitTasknumber();
}
/**
*内部类,工作线程
*/
{
//该工作线程是否有效,用于结束该工作线程
privatebooleanisRunning=true;
/*
*关键所在啊,如果任务队列不空,则取出任务执行,若任务队列空,则等待
*/
@Override
publicvoidrun(){
Runnabler=null;
while(isRunning){//注意,若线程无效则自然结束run方法,该线程就没用了
synchronized(taskQueue){
while(isRunning&&taskQueue.isEmpty()){//队列为空
try{
taskQueue.wait(20);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
if(!taskQueue.isEmpty())
r=taskQueue.remove(0);//取出任务
}
if(r!=null){
r.run();//执行任务
}
finished_task++;
r=null;
}
}
//停止工作,让该线程自然执行完run方法,自然结束
publicvoidstopWorker(){
isRunning=false;
}
}
}
❷ 由浅入深理解Java线程池及线程池的如何使用
重要的特征也就是最大程度利用线程.
首先,创建线程本身需要额外(相对于执行任务而必须的资源)的开销.
作业系统在每创建一个线程时,至少需要创建以下资源:
(1) 线程内核对象:用于对线程上下文的管理.
(2) 用户模式执行栈.
(3) 内核模式执行栈.
这些资源被线程占有后作业系统和用户都无法使用.
相反的过程,销毁线程需要回收资源,也需要一定开销.
其次,过多的线程将导致过度的切换.线程切换带来的性能更是不可估量.系统完成线程切换要经过以下过程:
(1) 从用户模式切换到内核模式.
(2) 将CPU寄存器的值保存到当前线程的内核对象中.
(3)打开一个自旋锁,根据调度策略决定下一个要执行的线程.释放自旋锁,如果要执行的线程不是同一进
程中的线程,还需要切换虚拟内存等进程环境.
(4) 将要执行的线程的内核对象的值写到CPU寄存器中.
(5) 切换到用户模式执行新线程的执行逻辑.
所以线程池的目的就是为了减少创建和切换线程的额外开销,利用已经的线程多次循环执行多个任务从而提
高系统的处理能力.
❸ java线程池(一) 简述线程池的几种使用方式
首先说明下java线程是如何实现线程重用的
1. 线程执行完一个Runnable的run()方法后,不会被杀死
2. 当线程被重用时,这个线程会进入新Runnable对象的run()方法12
java线程池由Executors提供的几种静态方法创建线程池。下面通过代码片段简单介绍下线程池的几种实现方式。后续会针对每个实现方式做详细的说明
newFixedThreadPool
创建一个固定大小的线程池
添加的任务达到线程池的容量之后开始加入任务队列开始线程重用总共开启线程个数跟指定容量相同。
@Test
public void newFixedThreadPool() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().build());
RunThread run1 = new RunThread("run 1");
executorService.execute(run1);
executorService.shutdown();
}12345678
newSingleThreadExecutor
仅支持单线程顺序处理任务
@Test
public void newSingleThreadExecutor() throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().build());
executorService.execute(new RunThread("run 1"));
executorService.execute(new RunThread("run 2"));
executorService.shutdown();
}123456789
newCachedThreadPool
这种情况跟第一种的方式类似,不同的是这种情况线程池容量上线是Integer.MAX_VALUE 并且线程池开启缓存60s
@Test
public void newCachedThreadPool() throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().build());
executorService.execute(new RunThread("run 1"));
executorService.execute(new RunThread("run 2"));
executorService.shutdown();
}123456789
newWorkStealingPool
支持给定的并行级别,并且可以使用多个队列来减少争用。
@Test
public void newWorkStealingPool() throws Exception {
ExecutorService executorService = Executors.newWorkStealingPool();
executorService = Executors.newWorkStealingPool(1);
RunThread run1 = new RunThread("run 1");
executorService.execute(run1);
executorService.shutdown();
}123456789
newScheledThreadPool
看到的现象和第一种相同,也是在线程池满之前是新建线程,然后开始进入任务队列,进行线程重用
支持定时周期执行任务(还没有看完)
@Test
public void newScheledThreadPool() throws Exception {
ExecutorService executorService = Executors.newScheledThreadPool(1);
executorService = Executors.newScheledThreadPool(1, new ThreadFactoryBuilder().build());
executorService.execute(new RunThread("run 1"));
executorService.execute(new RunThread("run 2"));
executorService.shutdown();
}
❹ 超详细的线程池使用解析
Java 中线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。合理的使用线程池可以带来多个好处:
(1) 降低资源消耗 。通过重复利用已创建的线程降低线程在创建和销毁时造成的消耗。
(2) 提高响应速度 。当处理执行任务时,任务可以不需要等待线程的创建就能立刻执行。
(3) 提高线程的可管理性 。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
线程池的处理流程如上图所示
线程池中通过 ctl 字段来表示线程池中的当前状态,主池控制状态 ctl 是 AtomicInteger 类型,包装了两个概念字段:workerCount 和 runState,workerCount 表示有效线程数,runState 表示是否正在运行、正在关闭等状态。使用 ctl 字段表示两个概念,ctl 的前 3 位表示线程池状态,线程池中限制 workerCount 为(2^29 )-1(约 5 亿)个线程,而不是 (2^31)-1(20 亿)个线程。workerCount 是允许启动和不允许停止的工作程序的数量。该值可能与实际的活动线程数暂时不同,例如,当 ThreadFactory 在被询问时未能创建线程时,以及退出线程在终止前仍在执行记时。用户可见的池大小报告为工作集的当前大小。 runState 提供主要的生命周期控制,取值如下表所示:
runState 随着时间的推移而改变,在 awaitTermination() 方法中等待的线程将在状态达到 TERMINATED 时返回。状态的转换为:
RUNNING -> SHUTDOWN 在调用 shutdown() 时,可能隐含在 finalize() 中
(RUNNING 或 SHUTDOWN)-> STOP 在调用 shutdownNow() 时
SHUTDOWN -> TIDYING 当队列和线程池都为空时
STOP -> TIDYING 当线程池为空时
TIDYING -> TERMINATED 当 terminate() 方法完成时
开发人员如果需要在线程池变为 TIDYING 状态时进行相应的处理,可以通过重载 terminated() 函数来实现。
结合上图说明线程池 ThreadPoolExecutor 执行流程,使用 execute() 方法提交任务到线程池中执行时分为4种场景:
(1)线程池中运行的线程数量小于 corePoolSize,创建新线程来执行任务。
(2)线程池中运行线程数量不小于 corePoolSize,将任务加入到阻塞队列 BlockingQueue。
(3)如果无法将任务加入到阻塞队列(队列已满),创建新的线程来处理任务(这里需要获取全局锁)。
(4)当创建新的线程数量使线程池中当前运行线程数量超过 maximumPoolSize,线程池中拒绝任务,调用 RejectedExecutionHandler.rejectedExecution() 方法处理。
源码分析:
线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取工作队列里的任务来执行。
创建线程池之前,首先要知道创建线程池中的核心参数:
corePoolSize (核心线程数大小):当提交任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到需要执行的任务数大于核心线程数时就不再创建。
runnableTaskQueue (任务队列):用于保存等待执行任务的阻塞队列。一般选择以下几种:
ArrayBlockingQueue:基于数组的有界阻塞队列,按照 FIFO 原则对元素进行排序。
LinkedBlockingQueue:基于链表的阻塞队列,按照 FIFO 原则对元素进行排序。
SynchronousQueue:同步阻塞队列,也是不存储元素的阻塞队列。每一个插入操作必须要等到另一个 线程调用移除操作,否则插入操作一直处于阻塞状态。
PriorityBlockingQueue:优先阻塞队列,一个具有优先级的无限阻塞队列。
maximumPoolSize (最大线程数大小):线程池允许创建的最大线程数,当队列已满,并且线程池中的线程数小于最大线程数,则线程池会创建新的线程执行任务。当使用无界队列时,此参数无用。
RejectedExecutionHandler (拒绝策略):当任务队列和线程池都满了,说明线程池处于饱和状态,那么必须使用拒绝策略来处理新提交的任务。JDK 内置拒绝策略有以下 4 种:
AbortPolicy:直接抛出异常
CallerRunsPolicy:使用调用者所在的线程来执行任务
DiscardOldestPolicy:丢弃队列中最近的一个任务来执行当前任务
DiscardPolicy:直接丢弃不处理
可以根据应用场景来实现 RejectedExecutionHandler 接口自定义处理策略。
keepAliveTime (线程存活时间):线程池的工作线程空闲后,保持存活的时间。
TimeUnit (存活时间单位):可选单位DAYS(天)、HOURS(小时)、MINUTES(分钟)、MILLISECONDS(毫秒)、MICROSECONDS(微妙)、NANOSECONDS(纳秒)。
ThreadFactory (线程工厂):可以通过线程工厂给创建出来的线程设置有意义的名字。
创建线程池主要分为两大类,第一种是通过 Executors 工厂类创建线程池,第二种是自定义创建线程池。根据《阿里java开发手册》中的规范,线程池不允许使用 Executors 去创建,原因是规避资源耗尽的风险。
创建一个单线程化的线程池
创建固定线程数的线程池
以上两种创建线程池方式使用链表阻塞队列来存放任务,实际场景中可能会堆积大量请求导致 OOM
创建可缓存线程池
允许创建的线程数量最大为 Integer.MAX_VALUE,当创建大量线程时会导致 CPU 处于重负载状态和 OOM 的发生
向线程池提交任务可以使用两个方法,分别为 execute() 和 submit()。
execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。execute() 方法中传入的是 Runnable 类的实例。
submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get() 方法来获取返回值。get() 方法会阻塞当前线程直到任务完成,使用 get(long timeout, TimeUnit unit)方法会阻塞当前线程一段时间后立即返回,这时候可能任务没有执行完。
可以通过调用线程池的 shutdown() 或shutdownNow() 方法来关闭线程池。他们的原理是遍历线程池中的工作线程,然后逐个调用 interrupt() 方法来中断线程,所以无法响应中断任务可能永远无法终止。
shutdown() 和 shutdownNow() 方法的区别在于 shutdownNow 方法首先将线程池的状态设置为 STOP,然后尝试停止正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
线程池使用面临的核心的问题在于: 线程池的参数并不好配置 。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO 密集型和 CPU 密集型的任务运行起来的情况差异非常大,这导致业界并没有一些成熟的经验策略帮助开发人员参考。
(1)以任务型为参考的简单评估:
假设线程池大小的设置(N 为 CPU 的个数)
如果纯计算的任务,多线程并不能带来性能提升,因为 CPU 处理能力是稀缺的资源,相反导致较多的线程切换的花销,此时建议线程数为 CPU 数量或+1;----为什么+1?因为可以防止 N 个线程中有一个线程意外中断或者退出,CPU 不会空闲等待。
如果是 IO 密集型应用, 则线程池大小设置为 2N+1. 线程数 = CPU 核数 目标 CPU 利用率 (1 + 平均等待时间 / 平均工作时间)
(2)以任务数为参考的理想状态评估:
1)默认值
2)如何设置 * 需要根据相关值来决定 - tasks :每秒的任务数,假设为500~1000 - taskCost:每个任务花费时间,假设为0.1s - responsetime:系统允许容忍的最大响应时间,假设为1s
以上都为理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器 cpu load 已经满了,则需要通过升级硬件和优化代码,降低 taskCost 来处理。
(仅为简单的理想状态的评估,可作为线程池参数设置的一个参考)
与主业务无直接数据依赖的从业务可以使用异步线程池来处理,在项目初始化时创建线程池并交给将从业务中的任务提交给异步线程池执行能够缩短响应时间。
严禁在业务代码中起线程!!!
当任务需要按照指定顺序(FIFO, LIFO, 优先级)执行时,推荐创建使用单线程化的线程池。
本文章主要说明了线程池的执行原理和创建方式以及推荐线程池参数设置和一般使用场景。在开发中,开发人员需要根据业务来合理的创建和使用线程池达到降低资源消耗,提高响应速度的目的。
原文链接:https://juejin.cn/post/7067324722811240479
❺ java 线程池 工作队列是如何工作的
使用线程池的好处
1、降低资源消耗
可以重复利用已创建的线程降低线程创建和销毁造成的消耗。
2、提高响应速度
当任务到达时,任务可以不需要等到线程创建就能立即执行。
3、提高线程的可管理性
线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控
线程池的工作原理
首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的
1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步
3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务
线程池饱和策略
这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:
AbortPolicy
为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
DiscardPolicy
直接抛弃,任务不执行,空方法
DiscardOldestPolicy
从队列里面抛弃head的一个任务,并再次execute 此task。
CallerRunsPolicy
在调用execute的线程里面执行此command,会阻塞入口
用户自定义拒绝策略(最常用)
实现RejectedExecutionHandler,并自己定义策略模式
下我们以ThreadPoolExecutor为例展示下线程池的工作流程图
3.jpg
关键方法源码分析
我们看看核心方法添加到线程池方法execute的源码如下:
// //Executes the given task sometime in the future. The task //may execute in a new thread or in an existing pooled thread. // // If the task cannot be submitted for execution, either because this // executor has been shutdown or because its capacity has been reached, // the task is handled by the current {@code RejectedExecutionHandler}. // // @param command the task to execute // @throws RejectedExecutionException at discretion of // {@code RejectedExecutionHandler}, if the task // cannot be accepted for execution // @throws NullPointerException if {@code command} is null // public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // // Proceed in 3 steps: // // 1. If fewer than corePoolSize threads are running, try to // start a new thread with the given command as its first // task. The call to addWorker atomically checks runState and // workerCount, and so prevents false alarms that would add // threads when it shouldn't, by returning false. // 翻译如下: // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程, // 如果能完成新线程创建exexute方法结束,成功提交任务 // 2. If a task can be successfully queued, then we still need // to double-check whether we should have added a thread // (because existing ones died since last checking) or that // the pool shut down since entry into this method. So we // recheck state and if necessary roll back the enqueuing if // stopped, or start a new thread if there are none. // 翻译如下: // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态 // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要 // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程; // 3. If we cannot queue task, then we try to add a new // thread. If it fails, we know we are shut down or saturated // and so reject the task. // 翻译如下: // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池 // 已经达到饱和状态,所以reject这个他任务 // int c = ctl.get(); // 工作线程数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize if (addWorker(command, true)) return; c = ctl.get(); } // 如果工作线程数大于等于核心线程数 // 线程的的状态未RUNNING并且队列notfull if (isRunning(c) && workQueue.offer(command)) { // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 移除成功,拒绝该非运行的任务 reject(command); else if (workerCountOf(recheck) == 0) // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。 // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务 addWorker(null, false); } // 如果队列满了或者是非运行的任务都拒绝执行 else if (!addWorker(command, false)) reject(command); }
❻ java 线程池是怎么处理执行线程的
java中线程池的监控可以检测到正在执行的线程数。
通过线程池提供的参数回进行监控。线程池里有一些答属性在监控线程池的时候可以使用
taskCount:线程池需要执行的任务数量。
completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不+ getActiveCount:获取活动的线程数。
通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。
❼ java线程组,线程池,线程队列分别是什么有什么区别
你好,我可以给你详细解释一下:
线程组表示一个线程的集合。此外,线程组也可以包含其他线程组。线程组构成一棵树,在树中,除了初始线程组外,每个线程组都有一个父线程组。
允许线程访问有关自己的线程组的信息,但是不允许它访问有关其线程组的父线程组或其他任何线程组的信息。
线程池:我们可以把并发执行的任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程。只要池里有空闲的线程,任务就会分配给一个线程执行。在线程池的内部,任务被插入一个阻塞队列(Blocking Queue ),线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。
线程池经常应用在多线程服务器上。每个通过网络到达服务器的连接都被包装成一个任务并且传递给线程池。线程池的线程会并发的处理连接上的请求。以后会再深入有关 Java 实现多线程服务器的细节。
线程队列:是指线程处于拥塞的时候形成的调度队列
排队有三种通用策略:
直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
有界队列。当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
❽ 合理使用线程池以及线程变量
背景
随着计算技术的不断发展,3纳米制程芯片已进入试产阶段,摩尔定律在现有工艺下逐渐面临巨大的物理瓶颈,通过多核处理器技术来提升服务器的性能成为提升算力的主要方向。
在服务器领域,基于java构建的后端服务器占据着领先地位,因此,掌握java并发编程技术,充分利用CPU的并发处理能力是一个开发人员必修的基本功,本文结合线程池源码和实践,简要介绍了线程池和线程变量的使用。
线程池概述
线程池是一种“池化”的线程使用模式,通过创建一定数量的线程,让这些线程处于就绪状态来提高系统响应速度,在线程使用完成后归还到线程池来达到重复利用的目标,从而降低系统资源的消耗。
总体来说,线程池有如下的优势:
线程池的使用
在java中,线程池的实现类是ThreadPoolExecutor,构造函数如下:
可以通过 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,handler)来创建一个线程池。
在构造函数中,corePoolSize为线程池核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程超时也会回收。
在构造函数中,maximumPoolSize为线程池所能容纳的最大线程数。
在构造函数中,keepAliveTime表示线程闲置超时时长。如果线程闲置时间超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。
在构造函数中,timeUnit表示线程闲置超时时长的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
在构造函数中,blockingQueue表示任务队列,线程池任务队列的常用实现类有:
在构造函数中,threadFactory表示线程工厂。用于指定为线程池创建新线程的方式,threadFactory可以设置线程名称、线程组、优先级等参数。如通过Google工具包可以设置线程池里的线程名:
在构造函数中,rejectedExecutionHandler表示拒绝策略。当达到最大线程数且队列任务已满时需要执行的拒绝策略,常见的拒绝策略如下:
ThreadPoolExecutor线程池有如下几种状态:
线程池提交一个任务时任务调度的主要步骤如下:
核心代码如下:
Tomcat 的整体架构包含连接器和容器两大部分,其中连接器负责与外部通信,容器负责内部逻辑处理。在连接器中:
Tomcat为了实现请求的快速响应,使用线程池来提高请求的处理能力。下面我们以HTTP非阻塞I/O为例对Tomcat线程池进行简要的分析。
在Tomcat中,通过AbstractEndpoint类提供底层的网络I/O的处理,若用户没有配置自定义公共线程池,则AbstractEndpoint通过createExecutor方法来创建Tomcat默认线程池。
核心部分代码如下:
其中,TaskQueue、ThreadPoolExecutor分别为Tomcat自定义任务队列、线程池实现。
Tomcat自定义线程池继承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计已经提交但尚未完成的任务数量(submittedCount),包括已经在队列中的任务和已经交给工作线程但还未开始执行的任务。
Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()方法,并实现对提交执行的任务进行submittedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异常后,会调用force()方法再次向TaskQueue中进行添加任务的尝试。如果添加失败,则submittedCount减一后,再抛出RejectedExecutionException。
在Tomcat中重新定义了一个阻塞队列TaskQueue,它继承于LinkedBlockingQueue。在Tomcat中,核心线程数默认值为10,最大线程数默认为200, 为了避免线程到达核心线程数后后续任务放入队列等待,Tomcat通过自定义任务队列TaskQueue重写offer方法实现了核心线程池数达到配置数后线程的创建。
具体地,从线程池任务调度机制实现可知,当offer方法返回false时,线程池将尝试创建新新线程,从而实现任务的快速响应。TaskQueue核心实现代码如下:
Tomcat中通过自定义任务线程TaskThread实现对每个线程创建时间的记录;使用静态内部类WrappingRunnable对Runnable进行包装,用于对StopPooledThreadException异常类型的处理。
Executors常用方法有以下几个:
Executors类看起来功能比较强大、用起来还比较方便,但存在如下弊端 :
使用线程时,可以直接调用 ThreadPoolExecutor 的构造函数来创建线程池,并根据业务实际场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。
使用局部线程池时,若任务执行完后没有执行shutdown()方法或有其他不当引用,极易造成系统资源耗尽。
在工程实践中,通常使用下述公式来计算核心线程数:
nThreads=(w+c)/c*n*u=(w/c+1)*n*u
其中,w为等待时间,c为计算时间,n为CPU核心数(通常可通过 Runtime.getRuntime().availableProcessors()方法获取),u为CPU目标利用率(取值区间为[0, 1]);在最大化CPU利用率的情况下,当处理的任务为计算密集型任务时,即等待时间w为0,此时核心线程数等于CPU核心数。
上述计算公式是理想情况下的建议核心线程数,而不同系统/应用在运行不同的任务时可能会有一定的差异,因此最佳线程数参数还需要根据任务的实际运行情况和压测表现进行微调。
为了更好地发现、分析和解决问题,建议在使用多线程时增加对异常的处理,异常处理通常有下述方案:
为了实现优雅停机的目标,我们应当先调用shutdown方法,调用这个方法也就意味着,这个线程池不会再接收任何新的任务,但是已经提交的任务还会继续执行。之后我们还应当调用awaitTermination方法,这个方法可以设定线程池在关闭之前的最大超时时间,如果在超时时间结束之前线程池能够正常关闭则会返回true,否则,超时会返回false。通常我们需要根据业务场景预估一个合理的超时时间,然后调用该方法。
如果awaitTermination方法返回false,但又希望尽可能在线程池关闭之后再做其他资源回收工作,可以考虑再调用一下shutdownNow方法,此时队列中所有尚未被处理的任务都会被丢弃,同时会设置线程池中每个线程的中断标志位。shutdownNow并不保证一定可以让正在运行的线程停止工作,除非提交给线程的任务能够正确响应中断。
ThreadLocal线程变量概述
ThreadLocal类提供了线程本地变量(thread-local variables),这些变量不同于普通的变量,访问线程本地变量的每个线程(通过其get或set方法)都有其自己的独立初始化的变量副本,因此ThreadLocal没有多线程竞争的问题,不需要单独进行加锁。
ThreadLocal的原理与实践
对于ThreadLocal而言,常用的方法有get/set/initialValue 3个方法。
众所周知,在java中SimpleDateFormat有线程安全问题,为了安全地使用SimpleDateFormat,除了1)创建SimpleDateFormat局部变量;和2)加同步锁 两种方案外,我们还可以使用3)ThreadLocal的方案:
Thread 内部维护了一个 ThreadLocal.ThreadLocalMap 实例(threadLocals),ThreadLocal 的操作都是围绕着 threadLocals 来操作的。
从JDK源码可见,ThreadLocalMap中的Entry是弱引用类型的,这就意味着如果这个ThreadLocal只被这个Entry引用,而没有被其他对象强引用时,就会在下一次GC的时候回收掉。
EagleEye(鹰眼)作为全链路监控系统在集团内部被广泛使用,traceId、rpcId、压测标等信息存储在EagleEye的ThreadLocal变量中,并在HSF/Dubbo服务调用间进行传递。EagleEye通过Filter将数据初始化到ThreadLocal中,部分相关代码如下:
在EagleEyeFilter中,通过EagleEyeRequestTracer.startTrace方法进行初始化,在前置入参转换后,通过startTrace重载方法将鹰眼上下文参数存入ThreadLocal中,相关代码如下:
EagleEyeFilter在finally代码块中,通过EagleEyeRequestTracer.endTrace方法结束调用链,通过clear方法将ThreadLocal中的数据进行清理,相关代码实现如下:
在某权益领取原有链路中,通过app打开一级页面后才能发起权益领取请求,请求经过淘系无线网关(Mtop)后到达服务端,服务端通过mtop sdk获取当前会话信息。
在XX项目中,对权益领取链路进行了升级改造,在一级页面请求时,通过服务端同时发起权益领取请求。具体地,服务端在处理一级页面请求时,同时通过调用hsf/bbo接口来进行权益领取,因此在发起rpc调用时需要携带用户当前会话信息,在服务提供端将会话信息进行提取并注入到mtop上下文,从而才能通过mtop sdk获取到会话id等信息。某开发同学在实现时,因ThreadLocal使用不当造成下述问题:
【问题1:权益领取失败分析】
在权益领取服务中,该应用构建了一套高效和线程安全的依赖注入框架,基于该框架的业务逻辑模块通常抽象为xxxMole形式,Mole间为网状依赖关系,框架会按依赖关系自动调用init方法(其中,被依赖的mole 的init方法先执行)。
在应用中,权益领取接口的主入口为CommonXXApplyMole类,CommonXXApplyMole依赖XXSessionMole。当请求来临时,会按依赖关系依次调用init方法,因此XXSessionMole的init方法会优先执行;而开发同学在CommonXXApplyMole类中的init方法中通过调用recoverMtopContext()方法来期望恢复mtop上下文,因recoverMtopContext()方法的调用时机过晚,从而导致XXSessionMole模块获取不到正确的会话id等信息而导致权益领取失败。
【问题2:脏数据分析】
权益领取服务在处理请求时,若当前线程曾经处理过权益领取请求,因ThreadLocal变量值未被清理,此时XXSessionMole通过mtop SDK获取会话信息时得到的是前一次请求的会话信息,从而造成脏数据。
【解决方案】
在依赖注入框架入口处AbstractGate#visit(或在XXSessionMole中)通过recoverMtopContext方法注入mtop上下文信息,并在入口方法的finally代码块清理当前请求的threadlocal变量值。
若使用强引用类型,则threadlocal的引用链为:Thread -> ThreadLocal.ThreadLocalMap -> Entry[] -> Entry -> key(threadLocal对象)和value;在这种场景下,只要这个线程还在运行(如线程池场景),若不调用remove方法,则该对象及关联的所有强引用对象都不会被垃圾回收器回收。
若使用static关键字进行修饰,则一个线程仅对应一个线程变量;否则,threadlocal语义变为perThread-perInstance,容易引发内存泄漏,如下述示例:
在上述main方法第22行debug,可见线程的threadLocals变量中有3个threadlocal实例。在工程实践中,使用threadlocal时通常期望一个线程只有一个threadlocal实例,因此,若不使用static修饰,期望的语义发生了变化,同时易引起内存泄漏。
如果不执行清理操作,则可能会出现:
建议使用try...finally 进行清理。
我们在使用ThreadLocal时,通常期望的语义是perThread,若不使用static进行修饰,则语义变为perThread-perInstance;在线程池场景下,若不用static进行修饰,创建的线程相关实例可能会达到 M * N个(其中M为线程数,N为对应类的实例数),易造成内存泄漏(https://errorprone.info/bugpattern/ThreadLocalUsage)。
在应用中,谨慎使用ThreadLocal.withInitial(Supplier<? extends S> supplier)这个工厂方法创建ThreadLocal对象,一旦不同线程的ThreadLocal使用了同一个Supplier对象,那么隔离也就无从谈起了,如:
总结
在java工程实践中,线程池和线程变量被广泛使用,因线程池和线程变量的不当使用经常造成安全生产事故,因此,正确使用线程池和线程变量是每一位开发人员必须修炼的基本功。本文从线程池和线程变量的使用出发,简要介绍了线程池和线程变量的原理和使用实践,各开发人员可结合最佳实践和实际应用场景,正确地使用线程和线程变量,构建出稳定、高效的java应用服务。