官术网_书友最值得收藏!

2.2 顯式使用線程池實現(xiàn)異步編程

2.2.1 如何顯式使用線程池實現(xiàn)異步編程

在Java中我們可以使用線程池來實現(xiàn)線程復(fù)用,每當(dāng)我們需要執(zhí)行異步任務(wù)時,可以把任務(wù)投遞到線程池里進行異步執(zhí)行。我們可以修改上節(jié)的代碼,使用線程池來執(zhí)行異步任務(wù),修改后代碼如下:

            // 0自定義線程池
            private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().
        availableProcessors();
            private final static ThreadPoolExecutor POOL_EXECUTOR = new
        ThreadPoolExecutor(AVALIABLE_PROCESSORS,
                    AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new
        LinkedBlockingQueue<>(5),
                    new ThreadPoolExecutor.CallerRunsPolicy());

            public static void main(String[] args) throws InterruptedException,
        ExecutionException {

                long start = System.currentTimeMillis();

                // 1.開啟異步單元執(zhí)行任務(wù)A
                POOL_EXECUTOR.execute(() -> {
                    try {
                        doSomethingA();

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                  // 2.執(zhí)行任務(wù)B
                  doSomethingB();

                  // 3.同步等待線程A運行結(jié)束
                  System.out.println(System.currentTimeMillis() - start);

                  // 4.掛起當(dāng)前線程
                  Thread.currentThread().join();
              }

上面代碼0創(chuàng)建了一個線程池,這里我們設(shè)置線程池核心線程個數(shù)為當(dāng)前物理機的CPU核數(shù),最大線程個數(shù)為當(dāng)前物理機CPU核數(shù)的2倍;設(shè)置線程池阻塞隊列的大小為5;需要注意的是,我們將線程池的拒絕策略設(shè)置為CallerRunsPolicy,即當(dāng)線程池任務(wù)飽和,執(zhí)行拒絕策略時不會丟棄新的任務(wù),而是會使用調(diào)用線程來執(zhí)行;另外我們使用了命名的線程創(chuàng)建工廠,以便排查問題時可以方便追溯是哪個相關(guān)業(yè)務(wù)。

創(chuàng)建完線程池后,代碼1則把異步任務(wù)提交到了線程池內(nèi)運行,而不是直接開啟一個新線程來運行;這里使用線程池起到了復(fù)用線程的作用,避免了線程的頻繁創(chuàng)建與銷毀,另外對線程個數(shù)也起到了限制作用。

其實通過上面代碼我們可以進一步釋放main線程的負(fù)擔(dān),也就是可以把任務(wù)doSomethingB的執(zhí)行也提交到線程池內(nèi)進行異步執(zhí)行,代碼如下:

            // 0自定義線程池
            private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().
        availableProcessors();
            private final static ThreadPoolExecutor POOL_EXECUTOR = new
        ThreadPoolExecutor(AVALIABLE_PROCESSORS,
                    AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new
        LinkedBlockingQueue<>(5),
                    new ThreadPoolExecutor.CallerRunsPolicy());

            public static void main(String[] args) throws InterruptedException,
        ExecutionException {

                long start = System.currentTimeMillis();

                // 1.開啟異步單元執(zhí)行任務(wù)A
                POOL_EXECUTOR.execute(() -> {
                    try {
                            doSomethingA();

                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });

                    // 2.執(zhí)行任務(wù)B
                    POOL_EXECUTOR.execute(() -> {
                        try {
                            doSomethingB();

                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });

                    // 3.同步等待線程A運行結(jié)束
                    System.out.println(System.currentTimeMillis() - start);

                    // 4.掛起當(dāng)前線程
                    Thread.currentThread().join();
                }

如上面代碼所示,main函數(shù)所在線程只需要把兩個任務(wù)提交到線程池后就可以做自己的事情了,具體兩個任務(wù)是由線程池中的線程執(zhí)行。

上面演示了向線程池內(nèi)投遞異步任務(wù)并沒有返回值的情況,其實我們可以向線程池投遞一個Callable類型的異步任務(wù),并且獲取其執(zhí)行結(jié)果,代碼如下:

        public class AsyncThreadPoolExample {

            public static String doSomethingA() {

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("--- doSomethingA---");
                return "A Task Done";
            }
            // 0自定義線程池
            private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().
        availableProcessors();
            private final static ThreadPoolExecutor POOL_EXECUTOR = new
        ThreadPoolExecutor(AVALIABLE_PROCESSORS,
                    AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new
        LinkedBlockingQueue<>(5),
                    new NamedThreadFactory("ASYNC-POOL"), new ThreadPoolExecutor.
        CallerRunsPolicy());

            public static void main(String[] args) throws InterruptedException,
        ExecutionException {

                // 1.開啟異步單元執(zhí)行任務(wù)A
                Future<? > resultA = POOL_EXECUTOR.submit(() -> doSomethingA());

                // 2.同步等待執(zhí)行結(jié)果
                System.out.println(resultA.get());
            }
        }

如上面代碼所示,doSomethingA方法具有String類型的返回值,代碼0創(chuàng)建了一個線程池,在main方法中,代碼1使用lambda表達式將Callable類型的任務(wù)提交到線程池,提交后會馬上返回一個Future對象,代碼2在futureA上調(diào)用get()方法阻塞等待異步任務(wù)的執(zhí)行結(jié)果。

如上代碼確實可以在main函數(shù)所在線程獲取到異步任務(wù)的執(zhí)行結(jié)果,但是main線程必須以阻塞的代價來獲取結(jié)果,在異步任務(wù)執(zhí)行完畢前,main函數(shù)所在線程就不能做其他事情了,這顯然不是我們所需要的,具體怎么解決這個問題,下章我們會具體講解。

2.2.2 線程池ThreadPoolExecutor原理剖析

1.概述

線程池作為異步執(zhí)行的利器,我們有必要講解下其內(nèi)部實現(xiàn),以便讓大家對異步編程有更深入的理解。首先我們看下其類圖結(jié)構(gòu)圖,如圖2-1所示。

圖2-1 線程池類圖結(jié)構(gòu)

如圖2-1所示,成員變量ctl是Integer的原子變量,使用一個變量同時記錄線程池狀態(tài)和線程池中線程個數(shù),假設(shè)計算機硬件的Integer類型是32位二進制標(biāo)示,如下面代碼所示,其中高3位用來表示線程池狀態(tài),后面29位用來記錄線程池線程個數(shù):

        //用來標(biāo)記線程池狀態(tài)(高3位),線程個數(shù)(低29位)
        //默認(rèn)是RUNNING狀態(tài),線程個數(shù)為0
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

        //線程個數(shù)掩碼位數(shù),并不是所有平臺int類型是32位,所以準(zhǔn)確說是具體平臺下Integer的二進制位
        數(shù)-3后的剩余位數(shù)才是線程的個數(shù)
        private static final int COUNT_BITS = Integer.SIZE -3;

        //線程最大個數(shù)(低29位)00011111111111111111111111111111
        private static final int CAPACITY    = (1 << COUNT_BITS) -1;

線程池的主要狀態(tài)列舉如下:

        //(高3位):11100000000000000000000000000000
        private static final int RUNNING     = -1 << COUNT_BITS;

        //(高3位):00000000000000000000000000000000
        private static final int SHUTDOWN    =   0 << COUNT_BITS;

        //(高3位):00100000000000000000000000000000
        private static final int STOP         =   1 << COUNT_BITS;
        //(高3位):01000000000000000000000000000000
        private static final int TIDYING     =   2 << COUNT_BITS;

        //(高3位):01100000000000000000000000000000
        private static final int TERMINATED =   3 << COUNT_BITS;

線程池狀態(tài)含義:

● RUNNING:接收新任務(wù)并且處理阻塞隊列里的任務(wù)。

● SHUTDOWN:拒絕新任務(wù)但是處理阻塞隊列里的任務(wù)。

● STOP:拒絕新任務(wù)并且拋棄阻塞隊列里的任務(wù),同時中斷正在處理的任務(wù)。

● TIDYING:所有任務(wù)都執(zhí)行完(包含阻塞隊列里面任務(wù)),當(dāng)前線程池活動線程為0,將要調(diào)用terminated方法。

● TERMINATED:終止?fàn)顟B(tài)。terminated方法調(diào)用完成以后的狀態(tài)。

線程池狀態(tài)之間轉(zhuǎn)換路徑:

● RUNNING→SHUTDOWN:當(dāng)顯式調(diào)用shutdown()方法時,或者隱式調(diào)用了finalize(),它里面調(diào)用了shutdown()方法時。

● RUNNING或者SHUTDOWN→STOP:當(dāng)顯式調(diào)用shutdownNow()方法時。

● SHUTDOWN→TIDYING:當(dāng)線程池和任務(wù)隊列都為空時。

● STOP→TIDYING:當(dāng)線程池為空時。

● TIDYING→TERMINATED :當(dāng)terminated() hook方法執(zhí)行完成時。

線程池同時提供了一些方法用來獲取線程池的運行狀態(tài)和線程池中的線程個數(shù),代碼如下:

        // 獲取高三位 運行狀態(tài)
        private static int runStateOf(int c)      { return c & ~CAPACITY; }

        //獲取低29位 線程個數(shù)
        private static int workerCountOf(int c)   { return c & CAPACITY; }

        //計算ctl新值,線程狀態(tài) 與 線程個數(shù)
        private static int ctlOf(int rs, int wc) { return rs | wc; }

另外線程池是可配置的,使用者可以根據(jù)自己的需要對線程池的參數(shù)進行調(diào)整,如類圖中線程池提供了可供使用者配置的參數(shù):

● corePoolSize:線程池核心線程個數(shù)。

● workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊列,比如基于數(shù)組的有界Array-BlockingQueue、基于鏈表的無界LinkedBlockingQueue、最多只有一個元素的同步隊列SynchronousQueue、優(yōu)先級隊列PriorityBlockingQueue等。

● maximunPoolSize:線程池最大線程數(shù)量。

● threadFactory:創(chuàng)建線程的工廠類。

● defaultHandler:飽和策略,當(dāng)隊列滿了并且線程個數(shù)達到maximunPoolSize后采取的策略,比如AbortPolicy(拋出異常)、CallerRunsPolicy(使用調(diào)用者所在線程來運行任務(wù))、DiscardOldestPolicy(調(diào)用poll丟棄一個任務(wù),執(zhí)行當(dāng)前任務(wù))、DiscardPolicy(默默丟棄,不拋出異常)。

● keeyAliveTime:存活時間。如果當(dāng)前線程池中的線程數(shù)量比核心線程數(shù)量要多,并且是閑置狀態(tài)的話,這些閑置的線程能存活的最大時間。

前文圖2-1中變量mainLock是獨占鎖,用來控制新增Worker線程時的原子性,termination是該鎖對應(yīng)的條件隊列,在線程調(diào)用awaitTermination時用來存放阻塞的線程。

Worker繼承AQS和Runnable接口,是具體承載任務(wù)的對象。Worker繼承了AQS,實現(xiàn)了簡單不可重入獨占鎖,其中state=0標(biāo)示鎖未被獲取的狀態(tài),state=1標(biāo)示鎖已經(jīng)被獲取的狀態(tài),state = -1是創(chuàng)建Worker時默認(rèn)的狀態(tài)。創(chuàng)建時狀態(tài)設(shè)置為-1是為了避免該線程在運行runWorker()方法前被中斷,下面會具體講解到。其中變量firstTask記錄該工作線程執(zhí)行的第一個任務(wù),Thread是具體執(zhí)行任務(wù)的線程。

DefaultThreadFactory是線程工廠,newThread方法是對線程的一個修飾。其中,poolNumber是個靜態(tài)的原子變量,用來統(tǒng)計線程工廠的個數(shù),threadNumber用來記錄每個線程工廠創(chuàng)建了多少線程,這兩個值也作為線程池和線程的名稱的一部分。

ThreadPoolExecutor提供了一系列構(gòu)造函數(shù)讓我們創(chuàng)建線程池,比如:

        ThreadPoolExecutor(int corePoolSize, //核心線程個數(shù)
                            int maximumPoolSize, //最大線程個數(shù)
                            long keepAliveTime, //非核心不活躍線程最大存活時間
                            TimeUnit unit, //keepAliveTime的單位
                            BlockingQueue<Runnable> workQueue, //阻塞隊列類型
                            ThreadFactory threadFactory, //線程池創(chuàng)建工廠
                            RejectedExecutionHandler handler)//拒絕策略

則當(dāng)我們需要創(chuàng)建自己的線程池時,就可以顯式地新建一個該實例出來。

2.提交任務(wù)到線程池原理解析

ThreadPoolExecutor中提交任務(wù)到線程池的方法有下面幾種,如表2-1所示。

表2-1 提交任務(wù)到線程池的方法

首先我們看方法public void execute(Runnable command)提交任務(wù)到線程池的邏輯:

        public void execute(Runnable command) {

            //(1) 如果任務(wù)為null,則拋出NPE異常
            if (command == null)
                throw new NullPointerException();

            //(2)獲取當(dāng)前線程池的狀態(tài)+線程個數(shù)變量的組合值
            int c = ctl.get();

            //(3)當(dāng)前線程池線程個數(shù)是否小于corePoolSize,小于則開啟新線程運行
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
        //(4)如果線程池處于RUNNING狀態(tài),則添加任務(wù)到阻塞隊列
        if (isRunning(c) && workQueue.offer(command)) {

            //(4.1)二次檢查
            int recheck = ctl.get();
            //(4.2)如果當(dāng)前線程池狀態(tài)不是RUNNING則從隊列刪除任務(wù),并執(zhí)行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);

            //(4.3)如果當(dāng)前線程池線程為空,則添加一個線程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //(5)如果隊列滿了,則新增線程,新增失敗則執(zhí)行拒絕策略
        else if (! addWorker(command, false))
            reject(command);
    }

● 代碼3是指如果當(dāng)前線程池線程個數(shù)小于corePoolSize,則會在調(diào)用方法addWorker新增一個核心線程執(zhí)行該任務(wù)。

● 如果當(dāng)前線程池線程個數(shù)大于等于corePoolSize則執(zhí)行代碼4,如果當(dāng)前線程池處于RUNNING狀態(tài)則添加當(dāng)前任務(wù)到任務(wù)隊列。這里需要判斷線程池狀態(tài)是因為線程池有可能已經(jīng)處于非RUNNING狀態(tài),而非RUNNING狀態(tài)下是拋棄新任務(wù)的。

● 如果任務(wù)添加任務(wù)隊列成功,則執(zhí)行代碼4.2對線程池狀態(tài)進行二次校驗,這是因為添加任務(wù)到任務(wù)隊列后,執(zhí)行代碼4.2前線程池的狀態(tài)有可能已經(jīng)變化了,如果當(dāng)前線程池狀態(tài)不是RUNNING則把任務(wù)從任務(wù)隊列移除,移除后執(zhí)行拒絕策略;如果二次校驗通過,則執(zhí)行代碼4.3重新判斷當(dāng)前線程池里面是否還有線程,如果沒有則新增一個線程。

● 如果代碼4添加任務(wù)失敗,則說明任務(wù)隊列滿了,那么執(zhí)行代碼5嘗試調(diào)用addWorker方法新開啟線程來執(zhí)行該任務(wù);如果當(dāng)前線程池的線程個數(shù)大于maximumPoolSize則addWorker返回false,執(zhí)行配置的拒絕策略。

下面我們來看public Future<? > submit(Runnable task)方法提交任務(wù)的邏輯:

    public Future<? > submit(Runnable task) {
        // 6 NPE判斷
              if (task == null) throw new NullPointerException();
              // 7 包裝任務(wù)為FutureTask
              RunnableFuture<Void> ftask = newTaskFor(task, null);
              // 8 投遞到線程池執(zhí)行
              execute(ftask);
              // 9 返回ftask
              return ftask;
          }

代碼7調(diào)用newTaskFor方法對我們提交的Runnable類型任務(wù)進行包裝,包裝為RunnableFuture類型任務(wù),然后提交RunnableFuture任務(wù)到線程池后返回ftask對象。

下面我們來看newTaskFor的代碼邏輯:

        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }

由代碼可知,其內(nèi)部創(chuàng)建了一個FutureTask對象,構(gòu)造函數(shù)如下:

              public FutureTask(Runnable runnable, V result) {
                  //將runnable適配為Callable類型任務(wù),并且讓result作為執(zhí)行結(jié)果
                  this.callable = Executors.callable(runnable, result);
                  this.state = NEW;         // ensure visibility of callable
              }

上述代碼中的FutureTask會在運行時執(zhí)行給定的Runnable,并將在任務(wù)Runnable執(zhí)行完成后,把給定的結(jié)果value通過FutureTask的get方法返回。

下面我們看public Future submit(Runnable task, T result)方法的邏輯,其代碼如下:

        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }

由上述代碼可知,兩個參數(shù)的submit方法類似,不同在于該方法接收的是含有返回值的Callable類型的任務(wù),最終也是轉(zhuǎn)換為FutureTask后提交到線程池,并返回。

3.線程池中任務(wù)執(zhí)行原理解析

當(dāng)用戶線程提交任務(wù)到線程池后,在線程池沒有執(zhí)行拒絕策略的情況下,用戶線程會馬上返回,而提交的任務(wù)要么直接切換到線程池中的Worker線程來執(zhí)行,要么先放入線程池的阻塞隊列里面,稍后再由Worker線程來執(zhí)行。本節(jié)我們就看下具體執(zhí)行異步任務(wù)的Worker線程是如何工作的。首先我們看下Worker的構(gòu)造函數(shù):

        Worker(Runnable firstTask) {
            setState(-1); // 在調(diào)用runWorker前禁止中斷
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this); //創(chuàng)建一個線程
        }

在上述代碼中,Worker構(gòu)造函數(shù)內(nèi)首先設(shè)置Worker的運行狀態(tài)status為-1,是為了避免當(dāng)前Worker在調(diào)用runWorker方法前被中斷(當(dāng)其他線程調(diào)用了線程池的shutdownNow時,如果Worker狀態(tài)≥0則會中斷該線程)。在前面的小節(jié)中我們講到Worker繼承了AbstractQueuedSynchronizer類,實現(xiàn)了簡單不可重入獨占鎖,其中status=0標(biāo)示鎖未被獲取的狀態(tài),state=1標(biāo)示鎖已經(jīng)被獲取的狀態(tài),state = -1是創(chuàng)建Worker時默認(rèn)的狀態(tài)。然后把傳遞的任務(wù)firstTask保存起來,最后使用線程池中指定的線程池工廠創(chuàng)建一個線程作為該Worker對象的執(zhí)行線程。

由于Worker本身實現(xiàn)了Runnable方法,所以下面我們看其run方法內(nèi)是如何執(zhí)行任務(wù)的:

                  public void run() {
                      runWorker(this); //委托給runWorker方法
                  }

runWorker方法的代碼如下:

        final void runWorker(Worker w) {
                Thread wt = Thread.currentThread();
                Runnable task = w.firstTask;
                  w.firstTask = null;
                  w.unlock(); //(1)status設(shè)置為0,允許中斷
                  boolean completedAbruptly = true;
                  try {
                      //(2)
                      while (task ! = null || (task = getTask()) ! = null) {

                            //(2.1)
                          w.lock();
                          ...
                          try {
                              //(2.2)任務(wù)執(zhí)行前干一些事情
                              beforeExecute(wt, task);
                              Throwable thrown = null;
                              try {
                                  task.run(); //(2.3)執(zhí)行任務(wù)
                              } catch (RuntimeException x) {
                                  thrown = x; throw x;
                              } catch (Error x) {
                                  thrown = x; throw x;
                              } catch (Throwable x) {
                                  thrown = x; throw new Error(x);
                              } finally {
                                  //(2.4)任務(wù)執(zhí)行完畢后干一些事情
                                  afterExecute(task, thrown);
                              }
                          } finally {
                              task = null;
                              //(2.5)統(tǒng)計當(dāng)前Worker完成了多少個任務(wù)
                              w.completedTasks++;
                              w.unlock();
                          }
                      }
                      completedAbruptly = false;
                  } finally {
                      //(3)執(zhí)行清工作
                      processWorkerExit(w, completedAbruptly);
                  }
              }

如上代碼在運行runWorker的代碼1時會調(diào)用unlock方法,該方法把status變?yōu)榱?,所以這時候調(diào)用shutdownNow會中斷Worker線程。

如代碼2所示,如果當(dāng)前task==null或者調(diào)用getTask從任務(wù)隊列獲取的任務(wù)返回null,則跳轉(zhuǎn)到代碼3執(zhí)行清理工作,當(dāng)前Worker也就退出執(zhí)行了。如果task不為null則執(zhí)行代碼2.1獲取工作線程內(nèi)部持有的獨占鎖,然后執(zhí)行擴展接口代碼2.2,代碼2.3具體執(zhí)行任務(wù),代碼2.4在任務(wù)執(zhí)行完畢后做一些事情,代碼2.5統(tǒng)計當(dāng)前Worker完成了多少個任務(wù),并釋放鎖。

這里在執(zhí)行具體任務(wù)期間加鎖,是為了避免任務(wù)運行期間,其他線程調(diào)用了shutdown方法關(guān)閉線程池時中斷正在執(zhí)行任務(wù)的線程。

代碼3執(zhí)行清理任務(wù),其代碼如下:

        private void processWorkerExit(Worker w, boolean completedAbruptly) {
              ...

            //(3.1)統(tǒng)計整個線程池完成的任務(wù)個數(shù),并從工作集里面刪除當(dāng)前woker
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }

            //(3.2)嘗試設(shè)置線程池狀態(tài)為TERMINATED,如果當(dāng)前是shutdonw狀態(tài)并且工作隊列為空
            //或者當(dāng)前是stop狀態(tài)且當(dāng)前線程池里面沒有活動線程
            tryTerminate();

            //(3.3)如果當(dāng)前線程個數(shù)小于核心個數(shù),則增加
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
                if (! completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                addWorker(null, false);
            }
        }

代碼3.1統(tǒng)計線程池完成任務(wù)個數(shù),可知在統(tǒng)計前加了全局鎖,把當(dāng)前工作線程中完成的任務(wù)累加到全局計數(shù)器,然后從工作集中刪除當(dāng)前Worker。

代碼3.2判斷如果當(dāng)前線程池狀態(tài)是shutdown狀態(tài)并且工作隊列為空,或者當(dāng)前是stop狀態(tài)并且當(dāng)前線程池里面沒有活動線程,則設(shè)置線程池狀態(tài)為TERMINATED。

代碼3.3判斷當(dāng)前線程中的線程個數(shù)是否小于核心線程個數(shù),如果是則新增一個線程。

4.關(guān)閉線程池原理解析

線程池中有兩種模式的線程池關(guān)閉方法,如表2-2所示。

表2-2 關(guān)閉線程池的方法

首先我們來看public void shutdown()方法的代碼邏輯:

        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //(1)權(quán)限檢查
                checkShutdownAccess();

                //(2)設(shè)置當(dāng)前線程池狀態(tài)為SHUTDOWN,如果已經(jīng)是SHUTDOWN則直接返回
                advanceRunState(SHUTDOWN);

                //(3)設(shè)置中斷標(biāo)志
                interruptIdleWorkers();
                onShutdown();
            } finally {
                  mainLock.unlock();
              }
              //(4)嘗試狀態(tài)變?yōu)門ERMINATED
              tryTerminate();
          }

代碼1檢查如果設(shè)置了安全管理器,則看當(dāng)前調(diào)用shutdown命令的線程是否有關(guān)閉線程的權(quán)限,如果有權(quán)限則還要看調(diào)用線程是否有中斷工作線程的權(quán)限,如果沒有權(quán)限則拋出SecurityException或者NullPointerException異常。

代碼2的內(nèi)容如下,如果當(dāng)前狀態(tài)>=SHUTDOWN則直接返回,否則設(shè)置當(dāng)前狀態(tài)為SHUTDOWN:

        private void advanceRunState(int targetState) {
            for (; ; ) {
                int c = ctl.get();
                if (runStateAtLeast(c, targetState) ||
                    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                    break;
            }
        }

代碼3的內(nèi)容如下,設(shè)置所有空閑線程的中斷標(biāo)志,這里首先加了全局鎖,同時只有一個線程可以調(diào)用shutdown設(shè)置中斷標(biāo)志。然后嘗試獲取Worker本身的鎖,獲取成功則設(shè)置中斷標(biāo)識,由于正在執(zhí)行的任務(wù)已經(jīng)獲取了鎖,所以正在執(zhí)行的任務(wù)沒有被中斷。這里中斷的是阻塞到getTask()方法,企圖從隊列里獲取任務(wù)的線程,也就是空閑線程。

        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    //如果工作線程沒有被中斷,并且沒有正在運行則設(shè)置中斷
                    if (! t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
        } finally {
            w.unlock();
        }
    }
    if (onlyOne)
        break;
}
} finally {
mainLock.unlock();
}
}

代碼4嘗試將線程池的狀態(tài)變?yōu)門ERMINATED, tryTerminate的代碼如下:

        final void tryTerminate() {
                for (; ; ) {
                    ...
                    int c = ctl.get();
                    ...

                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {//設(shè)置當(dāng)前線程池狀態(tài)為TIDYING
                        if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                            try {
                                  terminated();
                            } finally {
                                  //設(shè)置當(dāng)前線程池狀態(tài)為TERMINATED
                                  ctl.set(ctlOf(TERMINATED, 0));
                                  //激活調(diào)用條件變量termination的await系列方法被阻塞的所有線程
                                  termination.signalAll();
                            }
                            return;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                }
            }

如上述代碼所示,首先使用CAS設(shè)置當(dāng)前線程池狀態(tài)為TIDYING,如果成功則執(zhí)行擴展接口terminated在線程池狀態(tài)變?yōu)門ERMINATED前做一些事情,然后設(shè)置當(dāng)前線程池狀態(tài)為TERMINATED,最后調(diào)用termination.signalAll()來激活調(diào)用線程池的awaitTermination系列方法被阻塞的所有線程。

下面我們來看public void shutdownNow()方法的代碼邏輯:

        public List<Runnable> shutdownNow() {

            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess(); //(5)權(quán)限檢查
                advanceRunState(STOP); //(6) 設(shè)置線程池狀態(tài)為stop
                interruptWorkers(); //(7)中斷所有線程
                tasks = drainQueue(); //(8)移動隊列任務(wù)到tasks
            } finally {
                mainLock.unlock();
            }
            //(9)終止?fàn)顟B(tài)
            tryTerminate();
            return tasks;
        }

首先調(diào)用代碼5檢查權(quán)限,然后調(diào)用代碼6設(shè)置當(dāng)前線程池狀態(tài)為STOP,接著執(zhí)行代碼7中斷所有的工作線程,這里需要注意的是中斷所有線程,包含空閑線程和正在執(zhí)行任務(wù)的線程:

              private void interruptWorkers() {
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      for (Worker w : workers)
                          w.interruptIfStarted();
                  } finally {
                      mainLock.unlock();
                  }
              }

然后調(diào)用代碼8將當(dāng)前任務(wù)隊列的任務(wù)移動到tasks列表,代碼如下:

            private List<Runnable> drainQueue() {
              //8.1獲取任務(wù)隊列
              BlockingQueue<Runnable> q = workQueue;
              ArrayList<Runnable> taskList = new ArrayList<Runnable>();
              //8.2 從任務(wù)隊列移除任務(wù)到taskList列表
              q.drainTo(taskList);
              //8.3 如果q還不為空,則說明drainTo接口調(diào)用失效,則循環(huán)移除
              if (! q.isEmpty()) {
                  for (Runnable r : q.toArray(new Runnable[0])) {
                      if (q.remove(r))
                          taskList.add(r);
                  }
              }
              //8.4返回異常的任務(wù)列表
              return taskList;
          }

由上述代碼可知,調(diào)用線程池隊列的drainTo方法把隊列中的任務(wù)移除到taskList里,如果發(fā)現(xiàn)線程池隊列還不為空(比如DelayQueue或者其他類型的隊列drainTo可能移除元素失敗),則循環(huán)移除里面的元素,最后返回移除的任務(wù)列表。

5.線程池的拒絕策略解析

線程池是通過池化少量線程來提供線程復(fù)用的,當(dāng)調(diào)用線程向線程池中投遞大量任務(wù)后,線程池可能就處于飽和狀態(tài)了。所謂飽和狀態(tài)是指當(dāng)前線程池隊列已經(jīng)滿了,并且線程池中的線程已經(jīng)達到了最大線程個數(shù)。當(dāng)線程池處于飽和狀態(tài)時,再向線程池投遞任務(wù),而對于投遞的任務(wù)如何處理,是由線程池拒絕策略決定的。拒絕策略的執(zhí)行是在execute方法,大家可以返回前面章節(jié)查看。

線程池中提供了RejectedExecutionHandler接口,用來提供對線程池拒絕策略的抽象,其定義如下:

        public interface RejectedExecutionHandler {
            void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
        }

線程池中提供了一系列該接口的實現(xiàn)供我們使用,如表2-3所示。

表2-3 線程池提供的拒絕策略

首先我們看下AbortPolicy策略的代碼:

        public static class AbortPolicy implements RejectedExecutionHandler {
            public AbortPolicy() { }
            /**
              * 拋出RejectedExecutionException.
              *
              * @param r the runnable task requested to be executed
              * @param e the executor attempting to execute this task
              * @throws RejectedExecutionException always
              */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                      " rejected from " +
                                                      e.toString());
            }
        }

由上述代碼可知,該拒絕策略執(zhí)行時會直接向調(diào)用線程拋出RejectedExecutionException異常,并丟失提交的任務(wù)r。

然后我們看下CallerRunsPolicy策略的代碼:

        public static class CallerRunsPolicy implements RejectedExecutionHandler {

            public CallerRunsPolicy() { }

            /**
              * 使用調(diào)用線程執(zhí)行任務(wù)r
              *
              * @param r the runnable task requested to be executed
              * @param e the executor attempting to execute this task
              */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (! e.isShutdown()) {
                        r.run();
                    }
                }
            }

分析上述代碼,該拒絕策略執(zhí)行時,如果線程池沒有被關(guān)閉,則會直接使用調(diào)用線程執(zhí)行提交的任務(wù)r,否則默默丟棄該任務(wù)。

然后我們看下DiscardPolicy策略的代碼:

        public static class DiscardPolicy implements RejectedExecutionHandler {

            public DiscardPolicy() { }

            /**
              * 什么都不做,默默丟棄提交的任務(wù)
              *
              * @param r the runnable task requested to be executed
              * @param e the executor attempting to execute this task
              */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }

該拒絕策略執(zhí)行時,什么都不做,默默丟棄提交的任務(wù)。

最后我們看下DiscardOldestPolicy策略的代碼:

              public static class DiscardOldestPolicy implements RejectedExecutionHandler {

                  public DiscardOldestPolicy() { }

                  /**
                    * 丟棄線程池隊列里面最老的任務(wù),并把當(dāng)前任務(wù)提交到線程池
                    * @param r the runnable task requested to be executed
                    * @param e the executor attempting to execute this task
                    */
                  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                      if (! e.isShutdown()) {
                          e.getQueue().poll(); //移除隊首元素
                          e.execute(r); //提交任務(wù)r到線程池執(zhí)行
                      }
                  }
              }

該拒絕策略首先會丟棄線程池隊列里面最老的任務(wù),然后把當(dāng)前任務(wù)r提交到線程池。

主站蜘蛛池模板: 嵊州市| 哈尔滨市| 黑河市| 武陟县| 蒙阴县| 望谟县| 罗定市| 潢川县| 确山县| 和政县| 库尔勒市| 台安县| 阿巴嘎旗| 望谟县| 莎车县| 宁乡县| 获嘉县| 岑巩县| 资中县| 达尔| 马山县| 方正县| 海原县| 鹿邑县| 上虞市| 信丰县| 高青县| 渝北区| 哈密市| 镇原县| 上饶市| 武夷山市| 垫江县| 喀喇沁旗| 合作市| 金塔县| 海晏县| 泸溪县| 临沂市| 西畴县| 甘孜县|