❶ java线程池
java常用的线程池有三种:
1.
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads)创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
参数:
nThreads - 池中的线程数
返回:
新创建的线程池
抛出:
IllegalArgumentException - 如果 nThreads <= 0
2.
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor()创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool(1) 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。
返回:
新创建的单线程 Executor
3.
newCachedThreadPool
public static ExecutorService newCachedThreadPool()创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用 ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。
返回:
新创建的线程池
❷ java 线程中的run()方法无返回值并且不停止,如何得到线程数据
可以让线程来实现Callable接口,实现call方法源,可以把你需要的东西return回去,然后如果是线程池的话可以用Future对象的get()方法来接收这个返回值。但是这样做线程池也只会同步执行一个线程。
❸ 线程池中空闲的线程处于什么状态
一:阻塞状态,线程并没有销毁,也没有得到CPU时间片执行;源码追踪:for (;;) {... workQueue.take();...}public E take()...{...while (count.get() == 0) { / /这里就是任务队列中的消息数量notEmpty.await();}...}public final void await()...{...LockSupport.park(this);...}继续往下:public static void park(Object blocker) {Thread t = Thread.currentThread();setBlocker(t, blocker);U.park(false, 0L);setBlocker(t, null);}private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();//线程调用该方法,线程将一直阻塞直到超时,或者是中断条件出现。 public native void park(boolean isAbsolute, long time);上面就是java11线程池中阻塞的源码追踪;二.对比object的wait()方法:@FastNativepublic final native void wait(long timeout, int nanos) throws InterruptedException;还有Thread的sleep() 方法:@FastNativeprivate static native void sleep(Object lock, long millis, int nanos)throws...;可见,线程池中使用的阻塞方式并不是Object中的wait(),也不是Thread.sleep() ;这3个方法最终实现都是通过c&c++实现的native方法.三.在<<Java虚拟机(第二版)>>中,对线程状态有以下介绍:12.4.3状态转换Java语言定义了5种线程状态,在任意一个时间点,一个线程只能有且只有其中的一种状态,这5种状态分别如下。1)新建(New):创建后尚未启动的线程处于这种状态。2)运行(Runable):Runable包括了操作系统线程状态中的Running和Ready,也就是处于此状态的线程有可能正在执行,也有可能正在等待着CPU为它分配执行时间。3)无限期等待(Waiting):处于这种状态的线程不会被分配CPU执行时间,它们要等待被其他线程显式地唤醒。以下方法会让线程陷入无限期的等待状态:●没有设置Timeout参数的Object.wait()方法。●没有设置Timeout参数的Thread.join()方法。●LockSupport.park()方法。4)限期等待(Timed Waiting):处于这种状态的线程也不会被分配CPU执行时间,不过无须等待被其他线程显式地唤醒,在一定时间之后它们会由系统自动唤醒。以下方法会让线程进入限期等待状态:●Thread.sleep()方法。●设置了Timeout参数的Object.wait()方法。●设置了Timeout参数的Thread.join()方法。●LockSupport.parkNanos()方法。●LockSupport.parkUntil()方法。5)阻塞(Blocked):线程被阻塞了,“阻塞状态”与“等待状态”的区别是:“阻塞状态”在等待着获取到一个排他锁,这个事件将在另外一个线程放弃这个锁的时候发生;而“等待状态”则是在等待一段时间,或者唤醒动作的发生。在程序等待进入同步区域的时候,线程将进入这种状态。结束(Terminated):已终止线程的线程状态,线程已经结束执行。❹ java,线程问题,怎么判断线程是否结束啊
通过Thread类中的isAlive()方法判断线程是否处于活动状态;
线程启动后,只要没有运行完毕,都会返专回true;
❺ java线程池怎么实现的
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
一个线程池包括以下四个基本组成部分:
1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:
假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。
代码实现中并没有实现任务接口,而是把Runnable对象加入到线程池管理器(ThreadPool),然后剩下的事情就由线程池管理器(ThreadPool)来完成了
packagemine.util.thread;
importjava.util.LinkedList;
importjava.util.List;
/**
*线程池类,线程管理器:创建线程,执行任务,销毁线程,获取线程基本信息
*/
publicfinalclassThreadPool{
//线程池中默认线程的个数为5
privatestaticintworker_num=5;
//工作线程
privateWorkThread[]workThrads;
//未处理的任务
_task=0;
//任务队列,作为一个缓冲,List线程不安全
privateList<Runnable>taskQueue=newLinkedList<Runnable>();
;
//创建具有默认线程个数的线程池
privateThreadPool(){
this(5);
}
//创建线程池,worker_num为线程池中工作线程的个数
privateThreadPool(intworker_num){
ThreadPool.worker_num=worker_num;
workThrads=newWorkThread[worker_num];
for(inti=0;i<worker_num;i++){
workThrads[i]=newWorkThread();
workThrads[i].start();//开启线程池中的线程
}
}
//单态模式,获得一个默认线程个数的线程池
(){
returngetThreadPool(ThreadPool.worker_num);
}
//单态模式,获得一个指定线程个数的线程池,worker_num(>0)为线程池中工作线程的个数
//worker_num<=0创建默认的工作线程个数
(intworker_num1){
if(worker_num1<=0)
worker_num1=ThreadPool.worker_num;
if(threadPool==null)
threadPool=newThreadPool(worker_num1);
returnthreadPool;
}
//执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
publicvoidexecute(Runnabletask){
synchronized(taskQueue){
taskQueue.add(task);
taskQueue.notify();
}
}
//批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
publicvoidexecute(Runnable[]task){
synchronized(taskQueue){
for(Runnablet:task)
taskQueue.add(t);
taskQueue.notify();
}
}
//批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
publicvoidexecute(List<Runnable>task){
synchronized(taskQueue){
for(Runnablet:task)
taskQueue.add(t);
taskQueue.notify();
}
}
//销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
publicvoiddestroy(){
while(!taskQueue.isEmpty()){//如果还有任务没执行完成,就先睡会吧
try{
Thread.sleep(10);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
//工作线程停止工作,且置为null
for(inti=0;i<worker_num;i++){
workThrads[i].stopWorker();
workThrads[i]=null;
}
threadPool=null;
taskQueue.clear();//清空任务队列
}
//返回工作线程的个数
publicintgetWorkThreadNumber(){
returnworker_num;
}
//返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成
(){
returnfinished_task;
}
//返回任务队列的长度,即还没处理的任务个数
publicintgetWaitTasknumber(){
returntaskQueue.size();
}
//覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数
@Override
publicStringtoString(){
return"WorkThreadnumber:"+worker_num+"finishedtasknumber:"
+finished_task+"waittasknumber:"+getWaitTasknumber();
}
/**
*内部类,工作线程
*/
{
//该工作线程是否有效,用于结束该工作线程
privatebooleanisRunning=true;
/*
*关键所在啊,如果任务队列不空,则取出任务执行,若任务队列空,则等待
*/
@Override
publicvoidrun(){
Runnabler=null;
while(isRunning){//注意,若线程无效则自然结束run方法,该线程就没用了
synchronized(taskQueue){
while(isRunning&&taskQueue.isEmpty()){//队列为空
try{
taskQueue.wait(20);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
if(!taskQueue.isEmpty())
r=taskQueue.remove(0);//取出任务
}
if(r!=null){
r.run();//执行任务
}
finished_task++;
r=null;
}
}
//停止工作,让该线程自然执行完run方法,自然结束
publicvoidstopWorker(){
isRunning=false;
}
}
}
❻ 什么是java线程池
找的资料,你看一下吧:
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
一个线程池包括以下四个基本组成部分:
1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:
假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。
❼ java 线程池 工作队列是如何工作的
使用线程池的好处
1、降低资源消耗
可以重复利用已创建的线程降低线程创建和销毁造成的消耗。
2、提高响应速度
当任务到达时,任务可以不需要等到线程创建就能立即执行。
3、提高线程的可管理性
线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控
线程池的工作原理
首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的
1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步
3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务
线程池饱和策略
这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:
AbortPolicy
为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
DiscardPolicy
直接抛弃,任务不执行,空方法
DiscardOldestPolicy
从队列里面抛弃head的一个任务,并再次execute 此task。
CallerRunsPolicy
在调用execute的线程里面执行此command,会阻塞入口
用户自定义拒绝策略(最常用)
实现RejectedExecutionHandler,并自己定义策略模式
下我们以ThreadPoolExecutor为例展示下线程池的工作流程图
3.jpg
关键方法源码分析
我们看看核心方法添加到线程池方法execute的源码如下:
// //Executes the given task sometime in the future. The task //may execute in a new thread or in an existing pooled thread. // // If the task cannot be submitted for execution, either because this // executor has been shutdown or because its capacity has been reached, // the task is handled by the current {@code RejectedExecutionHandler}. // // @param command the task to execute // @throws RejectedExecutionException at discretion of // {@code RejectedExecutionHandler}, if the task // cannot be accepted for execution // @throws NullPointerException if {@code command} is null // public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // // Proceed in 3 steps: // // 1. If fewer than corePoolSize threads are running, try to // start a new thread with the given command as its first // task. The call to addWorker atomically checks runState and // workerCount, and so prevents false alarms that would add // threads when it shouldn't, by returning false. // 翻译如下: // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程, // 如果能完成新线程创建exexute方法结束,成功提交任务 // 2. If a task can be successfully queued, then we still need // to double-check whether we should have added a thread // (because existing ones died since last checking) or that // the pool shut down since entry into this method. So we // recheck state and if necessary roll back the enqueuing if // stopped, or start a new thread if there are none. // 翻译如下: // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态 // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要 // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程; // 3. If we cannot queue task, then we try to add a new // thread. If it fails, we know we are shut down or saturated // and so reject the task. // 翻译如下: // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池 // 已经达到饱和状态,所以reject这个他任务 // int c = ctl.get(); // 工作线程数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize if (addWorker(command, true)) return; c = ctl.get(); } // 如果工作线程数大于等于核心线程数 // 线程的的状态未RUNNING并且队列notfull if (isRunning(c) && workQueue.offer(command)) { // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 移除成功,拒绝该非运行的任务 reject(command); else if (workerCountOf(recheck) == 0) // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。 // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务 addWorker(null, false); } // 如果队列满了或者是非运行的任务都拒绝执行 else if (!addWorker(command, false)) reject(command); }
❽ java线程池怎么实现
要想理解清楚java线程池实现原理,明白下面几个问题就可以了:
(1):线程池存在哪些状态,这些状态之间是如何进行切换的呢?
(2):线程池的种类有哪些?
(3):创建线程池需要哪些参数,这些参数的具体含义是什么?
(4):将任务添加到线程池之后运行流程?
(5):线程池是怎么做到重用线程的呢?
(6):线程池的关闭
首先回答第一个问题:线程池存在哪些状态;
查看ThreadPoolExecutor源码便知晓:
[java]view plain
//runStateisstoredinthehigh-orderbits
privatestaticfinalintRUNNING=-1<<COUNT_BITS;
privatestaticfinalintSHUTDOWN=0<<COUNT_BITS;
privatestaticfinalintSTOP=1<<COUNT_BITS;
privatestaticfinalintTIDYING=2<<COUNT_BITS;
=3<<COUNT_BITS;
存在5种状态:
<1>Running:可以接受新任务,同时也可以处理阻塞队列里面的任务;
<2>Shutdown:不可以接受新任务,但是可以处理阻塞队列里面的任务;
<3>Stop:不可以接受新任务,也不处理阻塞队列里面的任务,同时还中断正在处理的任务;
<4>Tidying:属于过渡阶段,在这个阶段表示所有的任务已经执行结束了,当前线程池中是不存在有效的线程的,并且将要调用terminated方法;
<5>Terminated:终止状态,这个状态是在调用完terminated方法之后所处的状态;
那么这5种状态之间是如何进行转换的呢?查看ThreadPoolExecutor源码里面的注释便可以知道啦:
[java]view plain
*RUNNING->SHUTDOWN
*Oninvocationofshutdown(),perhapsimplicitlyinfinalize()
*(RUNNINGorSHUTDOWN)->STOP
*OninvocationofshutdownNow()
*SHUTDOWN->TIDYING
*Whenbothqueueandpoolareempty
*STOP->TIDYING
*Whenpoolisempty
*TIDYING->TERMINATED
*Whentheterminated()hookmethodhascompleted
从上面可以看到,在调用shutdown方法的时候,线程池状态会从Running转换成Shutdown;在调用shutdownNow方法的时候,线程池状态会从Running/Shutdown转换成Stop;在阻塞队列为空同时线程池为空的情况下,线程池状态会从Shutdown转换成Tidying;在线程池为空的情况下,线程池状态会从Stop转换成Tidying;当调用terminated方法之后,线程池状态会从Tidying转换成Terminate;
在明白了线程池的各个状态以及状态之间是怎么进行切换之后,我们来看看第二个问题,线程池的种类:
(1):CachedThreadPool:缓存线程池,该类线程池中线程的数量是不确定的,理论上可以达到Integer.MAX_VALUE个,这种线程池中的线程都是非核心线程,既然是非核心线程,那么就存在超时淘汰机制了,当里面的某个线程空闲时间超过了设定的超时时间的话,就会回收掉该线程;
(2):FixedThreadPool:固定线程池,这类线程池中是只存在核心线程的,对于核心线程来说,如果我们不设置allowCoreThreadTimeOut属性的话是不存在超时淘汰机制的,这类线程池中的corePoolSize的大小是等于maximumPoolSize大小的,也就是说,如果线程池中的线程都处于活动状态的话,如果有新任务到来,他是不会开辟新的工作线程来处理这些任务的,只能将这些任务放到阻塞队列里面进行等到,直到有核心线程空闲为止;
(3):ScheledThreadPool:任务线程池,这种线程池中核心线程的数量是固定的,而对于非核心线程的数量是不限制的,同时对于非核心线程是存在超时淘汰机制的,主要适用于执行定时任务或者周期性任务的场景;
(4):SingleThreadPool:单一线程池,线程池里面只有一个线程,同时也不存在非核心线程,感觉像是FixedThreadPool的特殊版本,他主要用于确保任务在同一线程中的顺序执行,有点类似于进行同步吧;
接下来我们来看第三个问题,创建线程池需要哪些参数:
同样查看ThreadPoolExecutor源码,查看创建线程池的构造函数:
[java]view plain
publicThreadPoolExecutor(intcorePoolSize,
intmaximumPoolSize,
longkeepAliveTime,
TimeUnitunit,
BlockingQueue<Runnable>workQueue,
ThreadFactorythreadFactory,
)
不管你调用的是ThreadPoolExecutor的哪个构造函数,最终都会执行到这个构造函数的,这个构造函数有7个参数,正是由于对这7个参数值的赋值不同,造成生成不同类型的线程池,比如我们常见的CachedThreadPoolExecutor、FixedThreadPoolExecutor
SingleThreadPoolExecutor、ScheledThreadPoolExecutor,我们老看看这几个参数的具体含义:
<1>corePoolSize:线程池中核心线程的数量;当提交一个任务到线程池的时候,线程池会创建一个线程来执行执行任务,即使有其他空闲的线程存在,直到线程数达到corePoolSize时不再创建,这时候会把提交的新任务放入到阻塞队列中,如果调用了线程池的preStartAllCoreThreads方法,则会在创建线程池的时候初始化出来核心线程;
<2>maximumPoolSize:线程池允许创建的最大线程数;如果阻塞队列已经满了,同时已经创建的线程数小于最大线程数的话,那么会创建新的线程来处理阻塞队列中的任务;
<3>keepAliveTime:线程活动保持时间,指的是工作线程空闲之后继续存活的时间,默认情况下,这个参数只有线程数大于corePoolSize的时候才会起作用,即当线程池中的线程数目大于corePoolSize的时候,如果某一个线程的空闲时间达到keepAliveTime,那么这个线程是会被终止的,直到线程池中的线程数目不大于corePoolSize;如果调用allowCoreThreadTimeOut的话,在线程池中线程数量不大于corePoolSize的时候,keepAliveTime参数也可以起作用的,知道线程数目为0为止;
<4>unit:参数keepAliveTime的时间单位;
<5>workQueue:阻塞队列;用于存储等待执行的任务,有四种阻塞队列类型,ArrayBlockingQueue(基于数组的有界阻塞队列)、LinkedBlockingQueue(基于链表结构的阻塞队列)、SynchronousQueue(不存储元素的阻塞队列)、PriorityBlockingQueue(具有优先级的阻塞队列);
<6>threadFactory:用于创建线程的线程工厂;
<7>handler:当阻塞队列满了,且没有空闲线程的情况下,也就是说这个时候,线程池中的线程数目已经达到了最大线程数量,处于饱和状态,那么必须采取一种策略来处理新提交的任务,我们可以自己定义处理策略,也可以使用系统已经提供给我们的策略,先来看看系统为我们提供的4种策略,AbortPolicy(直接抛出异常)、CallerRunsPolicy(只有调用者所在的线程来运行任务)、DiscardOldestPolicy(丢弃阻塞队列中最近的一个任务,并执行当前任务)、Discard(直接丢弃);
接下来就是将任务添加到线程池之后的运行流程了;
我们可以调用submit或者execute方法,两者最大的区别在于,调用submit方法的话,我们可以传入一个实现Callable接口的对象,进而能在当前任务执行结束之后通过Future对象获得任务的返回值,submit内部实际上还是执行的execute方法;而调用execute方法的话,是不能获得任务执行结束之后的返回值的;此外,调用submit方法的话是可以抛出异常的,但是调用execute方法的话,异常在其内部得到了消化,也就是说异常在其内部得到了处理,不会向外传递的;
因为submit方法最终也是会执行execute方法的,因此我们只需要了解execute方法就可以了:
在execute方法内部会分三种情况来进行处理:
<1>:首先判断当前线程池中的线程数量是否小于corePoolSize,如果小于的话,则直接通过addWorker方法创建一个新的Worker对象来执行我们当前的任务;
<2>:如果说当前线程池中的线程数量大于corePoolSize的话,那么会尝试将当前任务添加到阻塞队列中,然后第二次检查线程池的状态,如果线程池不在Running状态的话,会将刚刚添加到阻塞队列中的任务移出,同时拒绝当前任务请求;如果第二次检查发现当前线程池处于Running状态的话,那么会查看当前线程池中的工作线程数量是否为0,如果为0的话,就会通过addWorker方法创建一个Worker对象出来处理阻塞队列中的任务;
<3>:如果原先线程池就不处于Running状态或者我们刚刚将当前任务添加到阻塞队列的时候出现错误的话,那么会去尝试通过addWorker创建新的Worker来处理当前任务,如果添加失败的话,则拒绝当前任务请求;
可以看到在上面的execute方法中,我们仅仅只是检查了当前线程池中的线程数量有没有超过corePoolSize的情况,那么当前线程池中的线程数量有没有超过maximumPoolSize是在哪里检测的呢?实际上是在addWorker方法里面了,我们可以看下addWorker里面的一段代码:
[java]view plain
if(wc>=CAPACITY||
wc>=(core?corePoolSize:maximumPoolSize))
returnfalse;
如果当前线程数量超过maximumPoolSize的话,直接就会调用return方法,返回false;
其实到这里我们很明显可以知道,一个线程池中线程的数量实际上就是这个线程池中Worker的数量,如果Worker的大小超过了corePoolSize,那么任务都在阻塞队列里面了,Worker是Java对我们任务的一个封装类,他的声明是酱紫的:
[java]view plain
privatefinalclassWorker
implementsRunnable
可以看到他实现了Runnable接口,他是在addWorker方法里面通过new Worker(firstTask)创建的,我们来看看他的构造函数就知道了:
[java]view plain
Worker(RunnablefirstTask){
setState(-1);//
this.firstTask=firstTask;
this.thread=getThreadFactory().newThread(this);
}
而这里的firstTask其实就是我们调用execute或者submit的时候传入的那个参数罢了,一般来说这些参数是实现Callable或者Runnable接口的;
在通过addWorker方法创建出来Worker对象之后,这个方法的最后会执行Worker内部thread属性的start方法,而这个thread属性实际上就是封装了Worker的Thread,执行他的start方法实际上执行的是Worker的run方法,因为Worker是实现了Runnable接口的,在run方法里面就会执行runWorker方法,而runWorker方法里面首先会判断当前我们传入的任务是否为空,不为空的话直接就会执行我们通过execute或者submit方法提交的任务啦,注意一点就是我们虽然会通过submit方法提交实现了Callable接口的对象,但是在调用submit方法的时候,其实是会将Callable对象封装成实现了Runnable接口对象的,不信我们看看submit方法源码是怎么实现的:
[java]view plain
public<T>Future<T>submit(Callable<T>task){
if(task==null)thrownewNullPointerException();
RunnableFuture<T>ftask=newTaskFor(task);
execute(ftask);
returnftask;
}
看到没有呢,实际上在你传入实现了Callable接口对象的时候,在submit方法里面是会将其封装成RunnableFuture对象的,而RunnableFuture接口是继承了Runnable接口的;那么说白了其实就是直接执行我们提交任务的run方法了;如果为空的话,则会通过getTask方法从阻塞队列里面拿出一个任务去执行;在任务执行结束之后继续从阻塞队列里面拿任务,直到getTask的返回值为空则退出runWorker内部循环,那么什么情况下getTask返回为空呢?查看getTask方法的源码注释可以知道:在Worker必须需要退出的情况下getTask会返回空,具体什么情况下Worker会退出呢?(1):当Worker的数量超过maximumPoolSize的时候;(2):当线程池状态为Stop的时候;(3):当线程池状态为Shutdown并且阻塞队列为空的时候;(4):使用等待超时时间从阻塞队列中拿数据,但是超时之后仍然没有拿到数据;
如果runWorker方法退出了它里面的循环,那么就说明当前阻塞队列里面是没有任务可以执行的了,你可以看到在runWorker方法内部的finally语句块中执行了processWorkerExit方法,用来对Worker对象进行回收操作,这个方法会传入一个参数表示需要删除的Worker对象;在进行Worker回收的时候会调用tryTerminate方法来尝试关闭线程池,在tryTerminate方法里面会检查是否有Worker在工作,检查线程池的状态,没问题的话就会将当前线程池的状态过渡到Tidying,之后调用terminated方法,将线程池状态更新到Terminated;
从上面的分析中,我们可以看出线程池运行的4个阶段:
(1):poolSize < corePoolSize,则直接创建新的线程(核心线程)来执行当前提交的任务;
(2):poolSize = corePoolSize,并且此时阻塞队列没有满,那么会将当前任务添加到阻塞队列中,如果此时存在工作线程(非核心线程)的话,那么会由工作线程来处理该阻塞队列中的任务,如果此时工作线程数量为0的话,那么会创建一个工作线程(非核心线程)出来;
(3):poolSize = corePoolSize,并且此时阻塞队列已经满了,那么会直接创建新的工作线程(非核心线程)来处理阻塞队列中的任务;
(4):poolSize = maximumPoolSize,并且此时阻塞队列也满了的话,那么会触发拒绝机制,具体决绝策略采用的是什么就要看我们创建ThreadPoolExecutor的时候传入的RejectExecutionHandler参数了;
接下来就是线程池是怎么做到重用线程的呢?
个人认为线程池里面重用线程的工作是在getTask里面实现的,在getTask里面是存在两个for死循环嵌套的,他会不断的从阻塞对列里面取出需要执行的任务,返回给我们的runWorker方法里面,而在runWorker方法里面只要getTask返回的任务不是空就会执行该任务的run方法来处理它,这样一直执行下去,直到getTask返回空为止,此时的情况就是阻塞队列里面没有任务了,这样一个线程处理完一个任务之后接着再处理阻塞队列中的另一个任务,当然在线程池中的不同线程是可以并发处理阻塞队列中的任务的,最后在阻塞队列内部不存在任务的时候会去判断是否需要回收Worker对象,其实Worker对象的个数就是线程池中线程的个数,至于什么情况才需要回收,上面已经说了,就是四种情况了;
最后就是线程池是怎样被关闭的呢?
涉及到线程池的关闭,需要用到两个方法,shutdown和shutdownNow,他们都是位于ThreadPoolExecutor里面的,对于shutdown的话,他会将线程池状态切换成Shutdown,此时是不会影响对阻塞队列中任务执行的,但是会拒绝执行新加进来的任务,同时会回收闲置的Worker;而shutdownNow方法会将线程池状态切换成Stop,此时既不会再去处理阻塞队列里面的任务,也不会去处理新加进来的任务,同时会回收所有Worker;
❾ Java线程池
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
❿ 超详细的线程池使用解析
Java 中线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。合理的使用线程池可以带来多个好处:
(1) 降低资源消耗 。通过重复利用已创建的线程降低线程在创建和销毁时造成的消耗。
(2) 提高响应速度 。当处理执行任务时,任务可以不需要等待线程的创建就能立刻执行。
(3) 提高线程的可管理性 。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
线程池的处理流程如上图所示
线程池中通过 ctl 字段来表示线程池中的当前状态,主池控制状态 ctl 是 AtomicInteger 类型,包装了两个概念字段:workerCount 和 runState,workerCount 表示有效线程数,runState 表示是否正在运行、正在关闭等状态。使用 ctl 字段表示两个概念,ctl 的前 3 位表示线程池状态,线程池中限制 workerCount 为(2^29 )-1(约 5 亿)个线程,而不是 (2^31)-1(20 亿)个线程。workerCount 是允许启动和不允许停止的工作程序的数量。该值可能与实际的活动线程数暂时不同,例如,当 ThreadFactory 在被询问时未能创建线程时,以及退出线程在终止前仍在执行记时。用户可见的池大小报告为工作集的当前大小。 runState 提供主要的生命周期控制,取值如下表所示:
runState 随着时间的推移而改变,在 awaitTermination() 方法中等待的线程将在状态达到 TERMINATED 时返回。状态的转换为:
RUNNING -> SHUTDOWN 在调用 shutdown() 时,可能隐含在 finalize() 中
(RUNNING 或 SHUTDOWN)-> STOP 在调用 shutdownNow() 时
SHUTDOWN -> TIDYING 当队列和线程池都为空时
STOP -> TIDYING 当线程池为空时
TIDYING -> TERMINATED 当 terminate() 方法完成时
开发人员如果需要在线程池变为 TIDYING 状态时进行相应的处理,可以通过重载 terminated() 函数来实现。
结合上图说明线程池 ThreadPoolExecutor 执行流程,使用 execute() 方法提交任务到线程池中执行时分为4种场景:
(1)线程池中运行的线程数量小于 corePoolSize,创建新线程来执行任务。
(2)线程池中运行线程数量不小于 corePoolSize,将任务加入到阻塞队列 BlockingQueue。
(3)如果无法将任务加入到阻塞队列(队列已满),创建新的线程来处理任务(这里需要获取全局锁)。
(4)当创建新的线程数量使线程池中当前运行线程数量超过 maximumPoolSize,线程池中拒绝任务,调用 RejectedExecutionHandler.rejectedExecution() 方法处理。
源码分析:
线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取工作队列里的任务来执行。
创建线程池之前,首先要知道创建线程池中的核心参数:
corePoolSize (核心线程数大小):当提交任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到需要执行的任务数大于核心线程数时就不再创建。
runnableTaskQueue (任务队列):用于保存等待执行任务的阻塞队列。一般选择以下几种:
ArrayBlockingQueue:基于数组的有界阻塞队列,按照 FIFO 原则对元素进行排序。
LinkedBlockingQueue:基于链表的阻塞队列,按照 FIFO 原则对元素进行排序。
SynchronousQueue:同步阻塞队列,也是不存储元素的阻塞队列。每一个插入操作必须要等到另一个 线程调用移除操作,否则插入操作一直处于阻塞状态。
PriorityBlockingQueue:优先阻塞队列,一个具有优先级的无限阻塞队列。
maximumPoolSize (最大线程数大小):线程池允许创建的最大线程数,当队列已满,并且线程池中的线程数小于最大线程数,则线程池会创建新的线程执行任务。当使用无界队列时,此参数无用。
RejectedExecutionHandler (拒绝策略):当任务队列和线程池都满了,说明线程池处于饱和状态,那么必须使用拒绝策略来处理新提交的任务。JDK 内置拒绝策略有以下 4 种:
AbortPolicy:直接抛出异常
CallerRunsPolicy:使用调用者所在的线程来执行任务
DiscardOldestPolicy:丢弃队列中最近的一个任务来执行当前任务
DiscardPolicy:直接丢弃不处理
可以根据应用场景来实现 RejectedExecutionHandler 接口自定义处理策略。
keepAliveTime (线程存活时间):线程池的工作线程空闲后,保持存活的时间。
TimeUnit (存活时间单位):可选单位DAYS(天)、HOURS(小时)、MINUTES(分钟)、MILLISECONDS(毫秒)、MICROSECONDS(微妙)、NANOSECONDS(纳秒)。
ThreadFactory (线程工厂):可以通过线程工厂给创建出来的线程设置有意义的名字。
创建线程池主要分为两大类,第一种是通过 Executors 工厂类创建线程池,第二种是自定义创建线程池。根据《阿里java开发手册》中的规范,线程池不允许使用 Executors 去创建,原因是规避资源耗尽的风险。
创建一个单线程化的线程池
创建固定线程数的线程池
以上两种创建线程池方式使用链表阻塞队列来存放任务,实际场景中可能会堆积大量请求导致 OOM
创建可缓存线程池
允许创建的线程数量最大为 Integer.MAX_VALUE,当创建大量线程时会导致 CPU 处于重负载状态和 OOM 的发生
向线程池提交任务可以使用两个方法,分别为 execute() 和 submit()。
execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。execute() 方法中传入的是 Runnable 类的实例。
submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get() 方法来获取返回值。get() 方法会阻塞当前线程直到任务完成,使用 get(long timeout, TimeUnit unit)方法会阻塞当前线程一段时间后立即返回,这时候可能任务没有执行完。
可以通过调用线程池的 shutdown() 或shutdownNow() 方法来关闭线程池。他们的原理是遍历线程池中的工作线程,然后逐个调用 interrupt() 方法来中断线程,所以无法响应中断任务可能永远无法终止。
shutdown() 和 shutdownNow() 方法的区别在于 shutdownNow 方法首先将线程池的状态设置为 STOP,然后尝试停止正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
线程池使用面临的核心的问题在于: 线程池的参数并不好配置 。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO 密集型和 CPU 密集型的任务运行起来的情况差异非常大,这导致业界并没有一些成熟的经验策略帮助开发人员参考。
(1)以任务型为参考的简单评估:
假设线程池大小的设置(N 为 CPU 的个数)
如果纯计算的任务,多线程并不能带来性能提升,因为 CPU 处理能力是稀缺的资源,相反导致较多的线程切换的花销,此时建议线程数为 CPU 数量或+1;----为什么+1?因为可以防止 N 个线程中有一个线程意外中断或者退出,CPU 不会空闲等待。
如果是 IO 密集型应用, 则线程池大小设置为 2N+1. 线程数 = CPU 核数 目标 CPU 利用率 (1 + 平均等待时间 / 平均工作时间)
(2)以任务数为参考的理想状态评估:
1)默认值
2)如何设置 * 需要根据相关值来决定 - tasks :每秒的任务数,假设为500~1000 - taskCost:每个任务花费时间,假设为0.1s - responsetime:系统允许容忍的最大响应时间,假设为1s
以上都为理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器 cpu load 已经满了,则需要通过升级硬件和优化代码,降低 taskCost 来处理。
(仅为简单的理想状态的评估,可作为线程池参数设置的一个参考)
与主业务无直接数据依赖的从业务可以使用异步线程池来处理,在项目初始化时创建线程池并交给将从业务中的任务提交给异步线程池执行能够缩短响应时间。
严禁在业务代码中起线程!!!
当任务需要按照指定顺序(FIFO, LIFO, 优先级)执行时,推荐创建使用单线程化的线程池。
本文章主要说明了线程池的执行原理和创建方式以及推荐线程池参数设置和一般使用场景。在开发中,开发人员需要根据业务来合理的创建和使用线程池达到降低资源消耗,提高响应速度的目的。
原文链接:https://juejin.cn/post/7067324722811240479