- Java異步編程實戰(zhàn)
- 翟陸續(xù)
- 3457字
- 2020-01-15 10:22:29
2.2 顯式使用線程池實現(xiàn)異步編程
2.2.1 如何顯式使用線程池實現(xiàn)異步編程
在Java中我們可以使用線程池來實現(xiàn)線程復(fù)用,每當(dāng)我們需要執(zhí)行異步任務(wù)時,可以把任務(wù)投遞到線程池里進行異步執(zhí)行。我們可以修改上節(jié)的代碼,使用線程池來執(zhí)行異步任務(wù),修改后代碼如下:
// 0自定義線程池 private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime(). availableProcessors(); private final static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(AVALIABLE_PROCESSORS, AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy()); public static void main(String[] args) throws InterruptedException, ExecutionException { long start = System.currentTimeMillis(); // 1.開啟異步單元執(zhí)行任務(wù)A POOL_EXECUTOR.execute(() -> { try { doSomethingA(); } catch (Exception e) { e.printStackTrace(); } }); // 2.執(zhí)行任務(wù)B doSomethingB(); // 3.同步等待線程A運行結(jié)束 System.out.println(System.currentTimeMillis() - start); // 4.掛起當(dāng)前線程 Thread.currentThread().join(); }
上面代碼0創(chuàng)建了一個線程池,這里我們設(shè)置線程池核心線程個數(shù)為當(dāng)前物理機的CPU核數(shù),最大線程個數(shù)為當(dāng)前物理機CPU核數(shù)的2倍;設(shè)置線程池阻塞隊列的大小為5;需要注意的是,我們將線程池的拒絕策略設(shè)置為CallerRunsPolicy,即當(dāng)線程池任務(wù)飽和,執(zhí)行拒絕策略時不會丟棄新的任務(wù),而是會使用調(diào)用線程來執(zhí)行;另外我們使用了命名的線程創(chuàng)建工廠,以便排查問題時可以方便追溯是哪個相關(guān)業(yè)務(wù)。
創(chuàng)建完線程池后,代碼1則把異步任務(wù)提交到了線程池內(nèi)運行,而不是直接開啟一個新線程來運行;這里使用線程池起到了復(fù)用線程的作用,避免了線程的頻繁創(chuàng)建與銷毀,另外對線程個數(shù)也起到了限制作用。
其實通過上面代碼我們可以進一步釋放main線程的負(fù)擔(dān),也就是可以把任務(wù)doSomethingB的執(zhí)行也提交到線程池內(nèi)進行異步執(zhí)行,代碼如下:
// 0自定義線程池 private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime(). availableProcessors(); private final static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(AVALIABLE_PROCESSORS, AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy()); public static void main(String[] args) throws InterruptedException, ExecutionException { long start = System.currentTimeMillis(); // 1.開啟異步單元執(zhí)行任務(wù)A POOL_EXECUTOR.execute(() -> { try { doSomethingA(); } catch (Exception e) { e.printStackTrace(); } }); // 2.執(zhí)行任務(wù)B POOL_EXECUTOR.execute(() -> { try { doSomethingB(); } catch (Exception e) { e.printStackTrace(); } }); // 3.同步等待線程A運行結(jié)束 System.out.println(System.currentTimeMillis() - start); // 4.掛起當(dāng)前線程 Thread.currentThread().join(); }
如上面代碼所示,main函數(shù)所在線程只需要把兩個任務(wù)提交到線程池后就可以做自己的事情了,具體兩個任務(wù)是由線程池中的線程執(zhí)行。
上面演示了向線程池內(nèi)投遞異步任務(wù)并沒有返回值的情況,其實我們可以向線程池投遞一個Callable類型的異步任務(wù),并且獲取其執(zhí)行結(jié)果,代碼如下:
public class AsyncThreadPoolExample { public static String doSomethingA() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("--- doSomethingA---"); return "A Task Done"; } // 0自定義線程池 private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime(). availableProcessors(); private final static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(AVALIABLE_PROCESSORS, AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(5), new NamedThreadFactory("ASYNC-POOL"), new ThreadPoolExecutor. CallerRunsPolicy()); public static void main(String[] args) throws InterruptedException, ExecutionException { // 1.開啟異步單元執(zhí)行任務(wù)A Future<? > resultA = POOL_EXECUTOR.submit(() -> doSomethingA()); // 2.同步等待執(zhí)行結(jié)果 System.out.println(resultA.get()); } }
如上面代碼所示,doSomethingA方法具有String類型的返回值,代碼0創(chuàng)建了一個線程池,在main方法中,代碼1使用lambda表達式將Callable類型的任務(wù)提交到線程池,提交后會馬上返回一個Future對象,代碼2在futureA上調(diào)用get()方法阻塞等待異步任務(wù)的執(zhí)行結(jié)果。
如上代碼確實可以在main函數(shù)所在線程獲取到異步任務(wù)的執(zhí)行結(jié)果,但是main線程必須以阻塞的代價來獲取結(jié)果,在異步任務(wù)執(zhí)行完畢前,main函數(shù)所在線程就不能做其他事情了,這顯然不是我們所需要的,具體怎么解決這個問題,下章我們會具體講解。
2.2.2 線程池ThreadPoolExecutor原理剖析
1.概述
線程池作為異步執(zhí)行的利器,我們有必要講解下其內(nèi)部實現(xiàn),以便讓大家對異步編程有更深入的理解。首先我們看下其類圖結(jié)構(gòu)圖,如圖2-1所示。

圖2-1 線程池類圖結(jié)構(gòu)
如圖2-1所示,成員變量ctl是Integer的原子變量,使用一個變量同時記錄線程池狀態(tài)和線程池中線程個數(shù),假設(shè)計算機硬件的Integer類型是32位二進制標(biāo)示,如下面代碼所示,其中高3位用來表示線程池狀態(tài),后面29位用來記錄線程池線程個數(shù):
//用來標(biāo)記線程池狀態(tài)(高3位),線程個數(shù)(低29位) //默認(rèn)是RUNNING狀態(tài),線程個數(shù)為0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //線程個數(shù)掩碼位數(shù),并不是所有平臺int類型是32位,所以準(zhǔn)確說是具體平臺下Integer的二進制位 數(shù)-3后的剩余位數(shù)才是線程的個數(shù) private static final int COUNT_BITS = Integer.SIZE -3; //線程最大個數(shù)(低29位)00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) -1;
線程池的主要狀態(tài)列舉如下:
//(高3位):11100000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //(高3位):00000000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //(高3位):00100000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //(高3位):01000000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //(高3位):01100000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS;
線程池狀態(tài)含義:
● RUNNING:接收新任務(wù)并且處理阻塞隊列里的任務(wù)。
● SHUTDOWN:拒絕新任務(wù)但是處理阻塞隊列里的任務(wù)。
● STOP:拒絕新任務(wù)并且拋棄阻塞隊列里的任務(wù),同時中斷正在處理的任務(wù)。
● TIDYING:所有任務(wù)都執(zhí)行完(包含阻塞隊列里面任務(wù)),當(dāng)前線程池活動線程為0,將要調(diào)用terminated方法。
● TERMINATED:終止?fàn)顟B(tài)。terminated方法調(diào)用完成以后的狀態(tài)。
線程池狀態(tài)之間轉(zhuǎn)換路徑:
● RUNNING→SHUTDOWN:當(dāng)顯式調(diào)用shutdown()方法時,或者隱式調(diào)用了finalize(),它里面調(diào)用了shutdown()方法時。
● RUNNING或者SHUTDOWN→STOP:當(dāng)顯式調(diào)用shutdownNow()方法時。
● SHUTDOWN→TIDYING:當(dāng)線程池和任務(wù)隊列都為空時。
● STOP→TIDYING:當(dāng)線程池為空時。
● TIDYING→TERMINATED :當(dāng)terminated() hook方法執(zhí)行完成時。
線程池同時提供了一些方法用來獲取線程池的運行狀態(tài)和線程池中的線程個數(shù),代碼如下:
// 獲取高三位 運行狀態(tài) private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取低29位 線程個數(shù) private static int workerCountOf(int c) { return c & CAPACITY; } //計算ctl新值,線程狀態(tài) 與 線程個數(shù) private static int ctlOf(int rs, int wc) { return rs | wc; }
另外線程池是可配置的,使用者可以根據(jù)自己的需要對線程池的參數(shù)進行調(diào)整,如類圖中線程池提供了可供使用者配置的參數(shù):
● corePoolSize:線程池核心線程個數(shù)。
● workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊列,比如基于數(shù)組的有界Array-BlockingQueue、基于鏈表的無界LinkedBlockingQueue、最多只有一個元素的同步隊列SynchronousQueue、優(yōu)先級隊列PriorityBlockingQueue等。
● maximunPoolSize:線程池最大線程數(shù)量。
● threadFactory:創(chuàng)建線程的工廠類。
● defaultHandler:飽和策略,當(dāng)隊列滿了并且線程個數(shù)達到maximunPoolSize后采取的策略,比如AbortPolicy(拋出異常)、CallerRunsPolicy(使用調(diào)用者所在線程來運行任務(wù))、DiscardOldestPolicy(調(diào)用poll丟棄一個任務(wù),執(zhí)行當(dāng)前任務(wù))、DiscardPolicy(默默丟棄,不拋出異常)。
● keeyAliveTime:存活時間。如果當(dāng)前線程池中的線程數(shù)量比核心線程數(shù)量要多,并且是閑置狀態(tài)的話,這些閑置的線程能存活的最大時間。
前文圖2-1中變量mainLock是獨占鎖,用來控制新增Worker線程時的原子性,termination是該鎖對應(yīng)的條件隊列,在線程調(diào)用awaitTermination時用來存放阻塞的線程。
Worker繼承AQS和Runnable接口,是具體承載任務(wù)的對象。Worker繼承了AQS,實現(xiàn)了簡單不可重入獨占鎖,其中state=0標(biāo)示鎖未被獲取的狀態(tài),state=1標(biāo)示鎖已經(jīng)被獲取的狀態(tài),state = -1是創(chuàng)建Worker時默認(rèn)的狀態(tài)。創(chuàng)建時狀態(tài)設(shè)置為-1是為了避免該線程在運行runWorker()方法前被中斷,下面會具體講解到。其中變量firstTask記錄該工作線程執(zhí)行的第一個任務(wù),Thread是具體執(zhí)行任務(wù)的線程。
DefaultThreadFactory是線程工廠,newThread方法是對線程的一個修飾。其中,poolNumber是個靜態(tài)的原子變量,用來統(tǒng)計線程工廠的個數(shù),threadNumber用來記錄每個線程工廠創(chuàng)建了多少線程,這兩個值也作為線程池和線程的名稱的一部分。
ThreadPoolExecutor提供了一系列構(gòu)造函數(shù)讓我們創(chuàng)建線程池,比如:
ThreadPoolExecutor(int corePoolSize, //核心線程個數(shù) int maximumPoolSize, //最大線程個數(shù) long keepAliveTime, //非核心不活躍線程最大存活時間 TimeUnit unit, //keepAliveTime的單位 BlockingQueue<Runnable> workQueue, //阻塞隊列類型 ThreadFactory threadFactory, //線程池創(chuàng)建工廠 RejectedExecutionHandler handler)//拒絕策略
則當(dāng)我們需要創(chuàng)建自己的線程池時,就可以顯式地新建一個該實例出來。
2.提交任務(wù)到線程池原理解析
ThreadPoolExecutor中提交任務(wù)到線程池的方法有下面幾種,如表2-1所示。
表2-1 提交任務(wù)到線程池的方法

首先我們看方法public void execute(Runnable command)提交任務(wù)到線程池的邏輯:
public void execute(Runnable command) { //(1) 如果任務(wù)為null,則拋出NPE異常 if (command == null) throw new NullPointerException(); //(2)獲取當(dāng)前線程池的狀態(tài)+線程個數(shù)變量的組合值 int c = ctl.get(); //(3)當(dāng)前線程池線程個數(shù)是否小于corePoolSize,小于則開啟新線程運行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //(4)如果線程池處于RUNNING狀態(tài),則添加任務(wù)到阻塞隊列 if (isRunning(c) && workQueue.offer(command)) { //(4.1)二次檢查 int recheck = ctl.get(); //(4.2)如果當(dāng)前線程池狀態(tài)不是RUNNING則從隊列刪除任務(wù),并執(zhí)行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); //(4.3)如果當(dāng)前線程池線程為空,則添加一個線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //(5)如果隊列滿了,則新增線程,新增失敗則執(zhí)行拒絕策略 else if (! addWorker(command, false)) reject(command); }
● 代碼3是指如果當(dāng)前線程池線程個數(shù)小于corePoolSize,則會在調(diào)用方法addWorker新增一個核心線程執(zhí)行該任務(wù)。
● 如果當(dāng)前線程池線程個數(shù)大于等于corePoolSize則執(zhí)行代碼4,如果當(dāng)前線程池處于RUNNING狀態(tài)則添加當(dāng)前任務(wù)到任務(wù)隊列。這里需要判斷線程池狀態(tài)是因為線程池有可能已經(jīng)處于非RUNNING狀態(tài),而非RUNNING狀態(tài)下是拋棄新任務(wù)的。
● 如果任務(wù)添加任務(wù)隊列成功,則執(zhí)行代碼4.2對線程池狀態(tài)進行二次校驗,這是因為添加任務(wù)到任務(wù)隊列后,執(zhí)行代碼4.2前線程池的狀態(tài)有可能已經(jīng)變化了,如果當(dāng)前線程池狀態(tài)不是RUNNING則把任務(wù)從任務(wù)隊列移除,移除后執(zhí)行拒絕策略;如果二次校驗通過,則執(zhí)行代碼4.3重新判斷當(dāng)前線程池里面是否還有線程,如果沒有則新增一個線程。
● 如果代碼4添加任務(wù)失敗,則說明任務(wù)隊列滿了,那么執(zhí)行代碼5嘗試調(diào)用addWorker方法新開啟線程來執(zhí)行該任務(wù);如果當(dāng)前線程池的線程個數(shù)大于maximumPoolSize則addWorker返回false,執(zhí)行配置的拒絕策略。
下面我們來看public Future<? > submit(Runnable task)方法提交任務(wù)的邏輯:
public Future<? > submit(Runnable task) { // 6 NPE判斷 if (task == null) throw new NullPointerException(); // 7 包裝任務(wù)為FutureTask RunnableFuture<Void> ftask = newTaskFor(task, null); // 8 投遞到線程池執(zhí)行 execute(ftask); // 9 返回ftask return ftask; }
代碼7調(diào)用newTaskFor方法對我們提交的Runnable類型任務(wù)進行包裝,包裝為RunnableFuture類型任務(wù),然后提交RunnableFuture任務(wù)到線程池后返回ftask對象。
下面我們來看newTaskFor的代碼邏輯:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
由代碼可知,其內(nèi)部創(chuàng)建了一個FutureTask對象,構(gòu)造函數(shù)如下:
public FutureTask(Runnable runnable, V result) { //將runnable適配為Callable類型任務(wù),并且讓result作為執(zhí)行結(jié)果 this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
上述代碼中的FutureTask會在運行時執(zhí)行給定的Runnable,并將在任務(wù)Runnable執(zhí)行完成后,把給定的結(jié)果value通過FutureTask的get方法返回。
下面我們看public Future submit(Runnable task, T result)方法的邏輯,其代碼如下:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
由上述代碼可知,兩個參數(shù)的submit方法類似,不同在于該方法接收的是含有返回值的Callable類型的任務(wù),最終也是轉(zhuǎn)換為FutureTask后提交到線程池,并返回。
3.線程池中任務(wù)執(zhí)行原理解析
當(dāng)用戶線程提交任務(wù)到線程池后,在線程池沒有執(zhí)行拒絕策略的情況下,用戶線程會馬上返回,而提交的任務(wù)要么直接切換到線程池中的Worker線程來執(zhí)行,要么先放入線程池的阻塞隊列里面,稍后再由Worker線程來執(zhí)行。本節(jié)我們就看下具體執(zhí)行異步任務(wù)的Worker線程是如何工作的。首先我們看下Worker的構(gòu)造函數(shù):
Worker(Runnable firstTask) { setState(-1); // 在調(diào)用runWorker前禁止中斷 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); //創(chuàng)建一個線程 }
在上述代碼中,Worker構(gòu)造函數(shù)內(nèi)首先設(shè)置Worker的運行狀態(tài)status為-1,是為了避免當(dāng)前Worker在調(diào)用runWorker方法前被中斷(當(dāng)其他線程調(diào)用了線程池的shutdownNow時,如果Worker狀態(tài)≥0則會中斷該線程)。在前面的小節(jié)中我們講到Worker繼承了AbstractQueuedSynchronizer類,實現(xiàn)了簡單不可重入獨占鎖,其中status=0標(biāo)示鎖未被獲取的狀態(tài),state=1標(biāo)示鎖已經(jīng)被獲取的狀態(tài),state = -1是創(chuàng)建Worker時默認(rèn)的狀態(tài)。然后把傳遞的任務(wù)firstTask保存起來,最后使用線程池中指定的線程池工廠創(chuàng)建一個線程作為該Worker對象的執(zhí)行線程。
由于Worker本身實現(xiàn)了Runnable方法,所以下面我們看其run方法內(nèi)是如何執(zhí)行任務(wù)的:
public void run() { runWorker(this); //委托給runWorker方法 }
runWorker方法的代碼如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); //(1)status設(shè)置為0,允許中斷 boolean completedAbruptly = true; try { //(2) while (task ! = null || (task = getTask()) ! = null) { //(2.1) w.lock(); ... try { //(2.2)任務(wù)執(zhí)行前干一些事情 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); //(2.3)執(zhí)行任務(wù) } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //(2.4)任務(wù)執(zhí)行完畢后干一些事情 afterExecute(task, thrown); } } finally { task = null; //(2.5)統(tǒng)計當(dāng)前Worker完成了多少個任務(wù) w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //(3)執(zhí)行清工作 processWorkerExit(w, completedAbruptly); } }
如上代碼在運行runWorker的代碼1時會調(diào)用unlock方法,該方法把status變?yōu)榱?,所以這時候調(diào)用shutdownNow會中斷Worker線程。
如代碼2所示,如果當(dāng)前task==null或者調(diào)用getTask從任務(wù)隊列獲取的任務(wù)返回null,則跳轉(zhuǎn)到代碼3執(zhí)行清理工作,當(dāng)前Worker也就退出執(zhí)行了。如果task不為null則執(zhí)行代碼2.1獲取工作線程內(nèi)部持有的獨占鎖,然后執(zhí)行擴展接口代碼2.2,代碼2.3具體執(zhí)行任務(wù),代碼2.4在任務(wù)執(zhí)行完畢后做一些事情,代碼2.5統(tǒng)計當(dāng)前Worker完成了多少個任務(wù),并釋放鎖。
這里在執(zhí)行具體任務(wù)期間加鎖,是為了避免任務(wù)運行期間,其他線程調(diào)用了shutdown方法關(guān)閉線程池時中斷正在執(zhí)行任務(wù)的線程。
代碼3執(zhí)行清理任務(wù),其代碼如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { ... //(3.1)統(tǒng)計整個線程池完成的任務(wù)個數(shù),并從工作集里面刪除當(dāng)前woker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //(3.2)嘗試設(shè)置線程池狀態(tài)為TERMINATED,如果當(dāng)前是shutdonw狀態(tài)并且工作隊列為空 //或者當(dāng)前是stop狀態(tài)且當(dāng)前線程池里面沒有活動線程 tryTerminate(); //(3.3)如果當(dāng)前線程個數(shù)小于核心個數(shù),則增加 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (! completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
代碼3.1統(tǒng)計線程池完成任務(wù)個數(shù),可知在統(tǒng)計前加了全局鎖,把當(dāng)前工作線程中完成的任務(wù)累加到全局計數(shù)器,然后從工作集中刪除當(dāng)前Worker。
代碼3.2判斷如果當(dāng)前線程池狀態(tài)是shutdown狀態(tài)并且工作隊列為空,或者當(dāng)前是stop狀態(tài)并且當(dāng)前線程池里面沒有活動線程,則設(shè)置線程池狀態(tài)為TERMINATED。
代碼3.3判斷當(dāng)前線程中的線程個數(shù)是否小于核心線程個數(shù),如果是則新增一個線程。
4.關(guān)閉線程池原理解析
線程池中有兩種模式的線程池關(guān)閉方法,如表2-2所示。
表2-2 關(guān)閉線程池的方法

首先我們來看public void shutdown()方法的代碼邏輯:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //(1)權(quán)限檢查 checkShutdownAccess(); //(2)設(shè)置當(dāng)前線程池狀態(tài)為SHUTDOWN,如果已經(jīng)是SHUTDOWN則直接返回 advanceRunState(SHUTDOWN); //(3)設(shè)置中斷標(biāo)志 interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } //(4)嘗試狀態(tài)變?yōu)門ERMINATED tryTerminate(); }
代碼1檢查如果設(shè)置了安全管理器,則看當(dāng)前調(diào)用shutdown命令的線程是否有關(guān)閉線程的權(quán)限,如果有權(quán)限則還要看調(diào)用線程是否有中斷工作線程的權(quán)限,如果沒有權(quán)限則拋出SecurityException或者NullPointerException異常。
代碼2的內(nèi)容如下,如果當(dāng)前狀態(tài)>=SHUTDOWN則直接返回,否則設(shè)置當(dāng)前狀態(tài)為SHUTDOWN:
private void advanceRunState(int targetState) { for (; ; ) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
代碼3的內(nèi)容如下,設(shè)置所有空閑線程的中斷標(biāo)志,這里首先加了全局鎖,同時只有一個線程可以調(diào)用shutdown設(shè)置中斷標(biāo)志。然后嘗試獲取Worker本身的鎖,獲取成功則設(shè)置中斷標(biāo)識,由于正在執(zhí)行的任務(wù)已經(jīng)獲取了鎖,所以正在執(zhí)行的任務(wù)沒有被中斷。這里中斷的是阻塞到getTask()方法,企圖從隊列里獲取任務(wù)的線程,也就是空閑線程。
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; //如果工作線程沒有被中斷,并且沒有正在運行則設(shè)置中斷 if (! t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
代碼4嘗試將線程池的狀態(tài)變?yōu)門ERMINATED, tryTerminate的代碼如下:
final void tryTerminate() { for (; ; ) { ... int c = ctl.get(); ... final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {//設(shè)置當(dāng)前線程池狀態(tài)為TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { //設(shè)置當(dāng)前線程池狀態(tài)為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //激活調(diào)用條件變量termination的await系列方法被阻塞的所有線程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } } }
如上述代碼所示,首先使用CAS設(shè)置當(dāng)前線程池狀態(tài)為TIDYING,如果成功則執(zhí)行擴展接口terminated在線程池狀態(tài)變?yōu)門ERMINATED前做一些事情,然后設(shè)置當(dāng)前線程池狀態(tài)為TERMINATED,最后調(diào)用termination.signalAll()來激活調(diào)用線程池的awaitTermination系列方法被阻塞的所有線程。
下面我們來看public void shutdownNow()方法的代碼邏輯:
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //(5)權(quán)限檢查 advanceRunState(STOP); //(6) 設(shè)置線程池狀態(tài)為stop interruptWorkers(); //(7)中斷所有線程 tasks = drainQueue(); //(8)移動隊列任務(wù)到tasks } finally { mainLock.unlock(); } //(9)終止?fàn)顟B(tài) tryTerminate(); return tasks; }
首先調(diào)用代碼5檢查權(quán)限,然后調(diào)用代碼6設(shè)置當(dāng)前線程池狀態(tài)為STOP,接著執(zhí)行代碼7中斷所有的工作線程,這里需要注意的是中斷所有線程,包含空閑線程和正在執(zhí)行任務(wù)的線程:
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
然后調(diào)用代碼8將當(dāng)前任務(wù)隊列的任務(wù)移動到tasks列表,代碼如下:
private List<Runnable> drainQueue() { //8.1獲取任務(wù)隊列 BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); //8.2 從任務(wù)隊列移除任務(wù)到taskList列表 q.drainTo(taskList); //8.3 如果q還不為空,則說明drainTo接口調(diào)用失效,則循環(huán)移除 if (! q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } //8.4返回異常的任務(wù)列表 return taskList; }
由上述代碼可知,調(diào)用線程池隊列的drainTo方法把隊列中的任務(wù)移除到taskList里,如果發(fā)現(xiàn)線程池隊列還不為空(比如DelayQueue或者其他類型的隊列drainTo可能移除元素失敗),則循環(huán)移除里面的元素,最后返回移除的任務(wù)列表。
5.線程池的拒絕策略解析
線程池是通過池化少量線程來提供線程復(fù)用的,當(dāng)調(diào)用線程向線程池中投遞大量任務(wù)后,線程池可能就處于飽和狀態(tài)了。所謂飽和狀態(tài)是指當(dāng)前線程池隊列已經(jīng)滿了,并且線程池中的線程已經(jīng)達到了最大線程個數(shù)。當(dāng)線程池處于飽和狀態(tài)時,再向線程池投遞任務(wù),而對于投遞的任務(wù)如何處理,是由線程池拒絕策略決定的。拒絕策略的執(zhí)行是在execute方法,大家可以返回前面章節(jié)查看。
線程池中提供了RejectedExecutionHandler接口,用來提供對線程池拒絕策略的抽象,其定義如下:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
線程池中提供了一系列該接口的實現(xiàn)供我們使用,如表2-3所示。
表2-3 線程池提供的拒絕策略

首先我們看下AbortPolicy策略的代碼:
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } /** * 拋出RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
由上述代碼可知,該拒絕策略執(zhí)行時會直接向調(diào)用線程拋出RejectedExecutionException異常,并丟失提交的任務(wù)r。
然后我們看下CallerRunsPolicy策略的代碼:
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /** * 使用調(diào)用線程執(zhí)行任務(wù)r * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (! e.isShutdown()) { r.run(); } } }
分析上述代碼,該拒絕策略執(zhí)行時,如果線程池沒有被關(guān)閉,則會直接使用調(diào)用線程執(zhí)行提交的任務(wù)r,否則默默丟棄該任務(wù)。
然后我們看下DiscardPolicy策略的代碼:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * 什么都不做,默默丟棄提交的任務(wù) * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
該拒絕策略執(zhí)行時,什么都不做,默默丟棄提交的任務(wù)。
最后我們看下DiscardOldestPolicy策略的代碼:
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } /** * 丟棄線程池隊列里面最老的任務(wù),并把當(dāng)前任務(wù)提交到線程池 * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (! e.isShutdown()) { e.getQueue().poll(); //移除隊首元素 e.execute(r); //提交任務(wù)r到線程池執(zhí)行 } } }
該拒絕策略首先會丟棄線程池隊列里面最老的任務(wù),然后把當(dāng)前任務(wù)r提交到線程池。
- 深入淺出Electron:原理、工程與實踐
- Learn Swift by Building Applications
- 高級C/C++編譯技術(shù)(典藏版)
- 實戰(zhàn)Java高并發(fā)程序設(shè)計(第3版)
- C++對象模型詳解
- Visual Basic程序設(shè)計
- Java 從入門到項目實踐(超值版)
- 遠方:兩位持續(xù)創(chuàng)業(yè)者的點滴思考
- 貫通Tomcat開發(fā)
- 超簡單:用Python讓Excel飛起來(實戰(zhàn)150例)
- 實驗編程:PsychoPy從入門到精通
- Spring Boot學(xué)習(xí)指南:構(gòu)建云原生Java和Kotlin應(yīng)用程序
- Docker on Windows
- 輕松學(xué)Scratch 3.0 少兒編程(全彩)
- Real-time Web Application Development using Vert.x 2.0