线程池:
线程池,顾名思义存放线程的池子,因为频繁的创建和销毁线程池是一件非常损耗性能的事情,所以如果先定义一个池子里面放上一定量的线程,有需要的时候就去里面取,用完了再放里面,这样不仅能缩短创建销毁线程的时间,也能减轻服务器的压力。在jdk1.5中Doug Lea引入了Executor框架,把任务的提交和执行解耦,在代码层面,我们只需要提交任务, 不再需要再关心线程是如何执行。
Executors:
1.创建线程池
Executors是java线程池的工厂类,他可以帮助我们快速创建一个线程池,如Executors.newFixedThreadPool
方法可以生成一个拥有固定线程数的线程池。
初始化线程池的时候有5个参数,分别是 corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,还有两个默认参数 threadFactory, handler
corePoolSize:线程池初始核心线程数。初始化线程池的时候,池内是没有线程,只有在执行任务的时会创建线程。
maximumPoolSize:线程池允许存在的最大线程数。
keepAliveTime: 当线程数大于corePoolSize,线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用
unit:keepAliveTime 的时间单位
workQueue:缓存任务的的队列,一般采用LinkedBlockingQueue。
threadFactory:执行程序创建新线程时使用的工厂,一般采用默认值。
handler:超出线程范围和队列容量而使执行被阻塞时所使用的处理程序,一般采用默认值 AbortPolicy。
2.线程池种类
1.SingleThreadExecutor:corePoolSize和maximumPoolSize都是1,所以keepAliveTime失效,创建的是只有一个线程,如果线程异常结束,会立即创建一个新的线程,这会保证提交的任务会顺序执行
2.FixedThreadPool:固定大小的线程池corePoolSize=maximumPoolSize ,keepAliveTime=0,当线程池没有可执行任务时,不会释放线程。
由于FixedThreadPool的LinkedBlockQueue是无界的,所以maximumPoolSize参数是无效的,当线程数大于corePoolSize时,多余的线程都会进入LinkedBlockQueue等待
当线程完成任务后,会不断的从LinkedBlockQueue取任务来执行。
3.CachedThreadPool:maximumPoolSize=Integer.MAX_VALUE,这是一个很危险的参数,当线程池没有可执行的任务的时,会释放线程,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;所以使用这个线程池时,我们一定要注意系统的并发量,否则一不小心成千上万的线程创建出来,会非常的危险。
CachedThreadPool用的是SynchronousQueue,这是一个没有容量的队列。
由于corePoolSize=0,所以每次有任务都会进入到SynchronousQueue,但是SynchronousQueue的offer和take必须成对出现,所以大部分情况下,CachedThreadPool接受到任务会直接新建一个线程,由于maximumPoolSize=Integer.MAX_VALUE,所以基本等于无限制
4.ScheduledThreadPool:初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据
ThreadPoolExecutor:
1.状态常量定义
上述我们讲了四种线程池,除了ScheduledThreadPool,其余都是基于ThreadPoolExecutor实现的,现在来了解下ThreadPoolExecutor的内部实现
1.ctl是个非常灵活的变量,他的高三位表示线程的运行状态,低29位表示线程池中的线程数
2.RUNING 高三位是111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
3.SHUTDOWN 高三位000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
4.STOP 高三位是001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务
5.TIDYING 即高3位为010;
6.TERMINATED 即高3位为011;2.任务执行
当向线程池提交任务,线程池执行execute方法
1 public void execute(Runnable command) { 2 //判断空 3 if (command == null) 4 throw new NullPointerException(); 5 int c = ctl.get(); 6 //判断当前线程数和corePoolSize,如果小于corePoolSize 执行addWorker方法创建新的线程执行任务 7 if (workerCountOf(c) < corePoolSize) { 8 if (addWorker(command, true)) 9 return;10 c = ctl.get();11 }12 //如果线程池处于RUNNING状态,且成功的把提交的任务放入阻塞队列中13 if (isRunning(c) && workQueue.offer(command)) {14 int recheck = ctl.get();15 //再次检查状态,如果不是运行中,且成功从阻塞队列中把任务删除,16 //执行reject方法,否则的话执行addWorker创建新线程17 if (! isRunning(recheck) && remove(command))18 reject(command);19 else if (workerCountOf(recheck) == 0)20 addWorker(null, false);21 }22 //如果addWoker执行失败,则执行reject方法处理任务;23 else if (!addWorker(command, false))24 reject(command);25 }
我们可以看到execute方法中,通过addWorker创建新的线程,我们再看看addWorker里面发生了什么
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 int rs = runStateOf(c); 6 7 // Check if queue empty only if necessary. 8 //判断线程池状态,如果线程池的状态值大于或等于SHUTDOWN,直接返回false 9 if (rs >= SHUTDOWN &&10 ! (rs == SHUTDOWN &&11 firstTask == null &&12 ! workQueue.isEmpty()))13 return false;14 15 for (;;) {16 //获取当前线程池中的线程数17 int wc = workerCountOf(c);18 //如果线程数大于池的容量,直接返回false19 if (wc >= CAPACITY ||20 wc >= (core ? corePoolSize : maximumPoolSize))21 return false;22 //增加线程池中的线程计数,成功则跳出循环23 if (compareAndIncrementWorkerCount(c))24 break retry;25 c = ctl.get(); // Re-read ctl26 if (runStateOf(c) != rs)27 continue retry;28 // else CAS failed due to workerCount change; retry inner loop29 }30 }31 32 boolean workerStarted = false;33 boolean workerAdded = false;34 Worker w = null;35 try {36 //线程池的工作线程是通过Worker实现37 w = new Worker(firstTask);38 final Thread t = w.thread;39 if (t != null) {40 //加锁41 final ReentrantLock mainLock = this.mainLock;42 mainLock.lock();43 try {44 int rs = runStateOf(ctl.get());45 //判断当前线程状态46 if (rs < SHUTDOWN ||47 (rs == SHUTDOWN && firstTask == null)) {48 //???49 if (t.isAlive()) // precheck that t is startable50 throw new IllegalThreadStateException();51 //添加到集合里去52 workers.add(w);53 int s = workers.size();54 if (s > largestPoolSize)55 largestPoolSize = s;56 workerAdded = true;57 }58 } finally {59 mainLock.unlock();60 }61 //启动线程,这里调用的是runWorker方法62 if (workerAdded) {63 t.start();64 workerStarted = true;65 }66 }67 } finally {68 if (! workerStarted)69 addWorkerFailed(w);70 }71 return workerStarted;72 }
Work实现线程池中的线程
addworker方法中启动线程时,其实是执行了runWoeker方法,我们看下内部执行
1 final void runWorker(Worker w) { 2 Thread wt = Thread.currentThread(); 3 Runnable task = w.firstTask; 4 w.firstTask = null; 5 w.unlock(); // allow interrupts 6 boolean completedAbruptly = true; 7 try { 8 //从getTask获取要执行的任务,直到返回null。这里达到了线程复用的效果,让线程处理多个任务。 9 while (task != null || (task = getTask()) != null) {10 w.lock();11 // If pool is stopping, ensure thread is interrupted;12 // if not, ensure thread is not interrupted. This13 // requires a recheck in second case to deal with14 // shutdownNow race while clearing interrupt15 //保证了线程池在STOP状态下线程是中断的,非STOP状态下线程没有被中断16 if ((runStateAtLeast(ctl.get(), STOP) ||17 (Thread.interrupted() &&18 runStateAtLeast(ctl.get(), STOP))) &&19 !wt.isInterrupted())20 wt.interrupt();21 try {22 beforeExecute(wt, task);23 Throwable thrown = null;24 try {25 //执行线程26 task.run();27 } catch (RuntimeException x) {28 thrown = x; throw x;29 } catch (Error x) {30 thrown = x; throw x;31 } catch (Throwable x) {32 thrown = x; throw new Error(x);33 } finally {34 afterExecute(task, thrown);35 }36 } finally {37 task = null;38 w.completedTasks++;39 w.unlock();40 }41 }42 completedAbruptly = false;43 } finally {44 processWorkerExit(w, completedAbruptly);45 }46 }
这里面最关键的就是第9行的getTask方法,
这里截取了getTask的关键代码,