官术网_书友最值得收藏!

3.5 BlockingQueue

在多線程環境中,經常會用到“生產者-消費者”模式,負責生產的線程要把數據交給負責消費的線程,那么,自然需要一個數據共享容器,由生產者存入,消費者取出。這個容器就像是一個倉庫,生產出來的貨物堆積在里面,需要消費的時候再搬運出來,這個時候,就需要隊列(Queue)來實現該倉庫,一般而言,該隊列有兩種存取方式:

先進先出(FIFO,First In First Out):先插入的元素先取出,也就是按順序排隊;

后進先出(LIFO,Last In First Out):后插入的元素先取出,這是個棧結構(Stack),強調的是優先處理最新的物件。

設想這樣一個問題,如果生產的線程太積極,消費線程來不及處理,倉庫滿了,又或者消費線程太迅速,生產線程產能跟不上消費,那么要如何處理?

這就是生產者-消費者模型(Producer-Consumer)所解決的問題了。這個模型又稱為有界緩存模型,它主要包括了三個基本部分:

1)產品倉庫:用于存放產品;

2)生產者:負責生產產品,并把生產出來的產品存入倉庫;

3)消費者:消費倉庫里的產品。

這個模型的特性在于:倉庫里沒有產品的時候,消費者沒法繼續消費產品,只能等待新的產品產生;當倉庫裝滿之后,生產者沒有辦法存放產品,只能等待消費者消耗掉產品之后,才能繼續存放。

該特性應用在多線程環境中,可以表達為:生產者線程在倉庫裝滿之后會被阻塞,消費者線程則是在倉庫清空后阻塞。

在Java Concurrent包發布之前,該模型需要程序員自己維護阻塞隊列,但自己實現的隊列往往會在性能和安全性上有所缺陷,Java Concurrent包提供了BlockingQueue接口及其實現類來實現生產者-消費者模型。

java.util.concurrent.BlockingQueue是一個阻塞隊列接口。當BlockingQueue操作無法立即響應時,有四種處理方式:

1)拋出異常;

2)返回特定的值,根據操作不同,可能是null或者false中的一個;

3)無限期的阻塞當前線程,直到操作可以成功為止;

4)根據阻塞超時設置來進行阻塞;

BlockingQueue的核心方法和未響應處理方式的對應形式見表3-2。

表3-2 BlockingQueue的核心方法

BlockingQueue有很多實現類,圖3-16給出了部分常用的實現類。

3.5.1 ArrayBlockingQueue

ArrayBlockingQueue是基于數組實現的有界BlockingQueue,該隊列滿足先入先出(FIFO)的特性。它是一個典型的“有界緩存”,由一個固定大小的數組保存元素,一旦創建好以后,容量就不能改變了。

當隊列滿時,存數據的操作會被阻塞;隊列空時,取數據的操作會被阻塞。

除了數組以外,它還維護了兩個int變量,分別對應隊頭和隊尾的下標,隊頭存放的是入隊最早的元素,而隊尾則是入隊最晚的元素。

圖3-16 Queue類圖

下面給出一個使用ArrayBlockingQueue實現的生產者消費者模型的簡單示例代碼。

程序運行結果為:

這段代碼創建了一個生產者和兩個消費者,生產者每隔1s中就會生產一件商品并放入到隊列中,如果隊列滿了,那么生產者會一直等待,直到有消費者消費了商品后生產者才能把商品放入到隊列中。而消費者則每隔5s消費一件。

3.5.2 LinkedBlockingQueue

鏈表阻塞隊列,從命名可以看出它是基于鏈表實現的。同樣這也是個先入先出隊列(FIFO),隊頭是隊列里入隊時間最長的元素,隊尾則是入隊時間最短的。理論上它的吞吐量要超出數組阻塞隊列ArrayBlockingQueue。LinkedBlockQueue可以指定容量限制,在沒有指定的情況下,默認為Integer.MAX_VALUE。

與ArrayBlockingQueue相比,LinkedBlockingQueue的重入鎖被分成了兩份,分別對應存值和取值。這種實現方法被稱為雙鎖隊列算法,這樣做的好處在于,讀寫操作的lock操作是由兩個鎖來控制的,互不干涉,因此可以同時進行讀操作和寫操作,這也是LinkedBlockingQueue吞吐量超過ArrayBlockingQueue的主要原因。但是,使用兩個鎖要比一個鎖復雜很多,需要考慮各種死鎖的狀況。

LinkedBlockQueue的使用方式與ArrayBlockingQueue是相同的,示例代碼如下:

3.5.3 PriorityBlockingQueue

優先級阻塞隊列PriorityBlockQueue不是FIFO(先入先出)隊列,它要求使用者提供一個Comparetor比較器,或者隊列內部元素實現Comparable接口,隊頭元素會是整個隊列里的最小元素。

PriorityBlockQueue是用數組實現的最小堆結構,利用的原理是:在數組實現的完全二叉樹中,根結點的下標為子結點下標除以2。

PriorityBlockQueue是不定長的,會隨著數據的增長逐步擴容,其最大容量為Integer.MAX_VALUE-8。如果容量超出這個值,那么會產生OutOfMemoryError。

下面給出一個使用PriorityBlockQueue實現的“生產者-消費者”模型的代碼,生產者會把生產的產品放入隊列中,消費者會根據商品的優先級進行消費。

這個示例使用了一個生產者和消費者,也可以根據需求修改為多個生產者和消費者。

運行結果為:

3.5.4 ConcurrentLinkedQueue

ConcurrentLinkedQueue是一種非阻塞的線程安全隊列,與阻塞隊列LinkedBlockingQueue相對應。在之前的章節里有過介紹,LinkedBlockingQueue使用兩個ReentrantLock分別控制入隊和出隊以達到線程安全。

ConcurrentLinkedQueue同樣也是使用鏈表實現的FIFO隊列,但不同的是,它沒有使用任何鎖的機制,而是用CAS來實現的線程安全。下面以offer方法為例來介紹ConcurrentLinkedQueue是如何使用CAS實現的。

它是個單向鏈表,每個結點有一個當前結點的元素和下一個結點的指針,結點的定義如下

它采用先進先出的規則對結點進行排序,當添加一個元素的時候,它會添加到隊列的尾部(tail),當獲取一個元素時,它會返回隊列頭部(head)的元素。tail結點和head結點方便快速定位最后一個和第一個元素。

下面給出一個Node類中實現了CAS的方法;

offer方法的實現如下:

ConncurrentLinkedQueue的同步非阻塞算法使用循環+CAS來實現,這一類的源碼閱讀不能按照線性代碼執行的思維去考慮,而是應該用類似于狀態機的思路去理解。

只有把握以下原則,才能理解這種類型代碼的編程思路:

1)在確認達到執行目的前,循環不會終止;

2)非線程安全的全局變量要用局部變量引用以保證初始狀態;

3)由于全局變量可能被其他線程修改,在使用對應局部變量時,要驗證是否合法;

4)最終賦值要用CAS方法以保證原子性,避免線程發生不期望的修改。

理解了上面的思路后,來具體分析offer方法的循環體的實現原理。

變量含義:

1)p結點的期望值為最后一個結點;

2)newNode是新結點,期望添加到p結點之后;

3)q結點為p結點的后繼結點,可能為null,也可能因為多線程修改而不為空(指向新的結點);

4)t結點為代碼執行開始時的tail結點(成員變量),也可能因為多線程修改了tail結點,從而和tail結點不一致;

執行目的:

1)newNode作為新結點,需要插入到最后一個結點的next位置,如果它成為最后一個結點,那么把它設置為尾結點;

2)需要注意的是,多線程環境下,在多個插入同時進行時,不保證結點順序與執行順序的一致性,當然,這不影響執行成功。

狀態解析:

1)該插入算法,是以p結點的狀態判斷為核心的;

2)當p結點的下一個結點為null時,說明沒有后繼結點,此時執行p.casNext(null,newNode),如果失敗,那么說明其他線程在之前的瞬間修改了p.next,此時就需要從頭開始再找一次尾結點;如果成功,則執行目的達到,循環體可以結束了;

3)當p結點和q結點相等,這時鏈表發生了閉合(off),這是一個特殊情況,產生的原因有多種,但本質上是因為保證效率導致的意外情況,tail作為尾結點的引用可以在O(1)的時間復雜度內可以找到。但是,tail是可變的,所以其next可能指向它自身(比如重新設置casTail代碼可能還沒執行)。所以,如果t不是tail,那么使用tail重新計算,如果依然是tail,那么需要重置p為head,從頭開始遍歷鏈表,雖然復雜度為O(n),但是能保證以正確的方式找到隊尾;

4)如果以上情況都不滿足,那么判斷p是否還是隊尾,如果不是則設置為隊尾,否則p重新指向p.next,這里可能會產生疑惑,隊尾tail結點的next不應該是null嗎?

其實tail只是一個優化算法,不代表真正的隊尾,它有三種狀態:

1)初始化時,它是head;

2)奇數次插入時,它是隊尾;

3)偶數次插入時,它是隊尾的前一個結點;

由此可知,p==q一定發生在q!=null的時候。

4)這里需要特別注意下面代碼:

在q==null的時候,說明p應當為最后一個結點,如果p !=t,那么說明tail并不是尾結點,而是尾結點的前驅結點,此時需要重新設置tail為newNode,之后,tail會指向真正的尾結點。正是這句代碼導致了奇數次插入時tail是隊尾,偶數次是隊尾的前一個結點。

3.5.5 DelayQueue

DelayQueue是一種延遲隊列,它所管理的對象必須實現java.util.concurrent.Delayed接口,該接口提供了一個getDelay方法,用于獲取剩余的延遲時間,同時該接口繼承自Comparable,其compareTo的實現體一般用于比較延遲時間的大小。

DelayQueue是阻塞的優先級隊列。其線程安全由重入鎖ReentrantLock實現,而優先級特性則完全由內部組合的PriorityQueue來提供。

PriorityQueue內部使用“最小堆”實現的。下面給出一個DelayQueue的使用示例代碼:

運行結果為:

主站蜘蛛池模板: 丹东市| 昂仁县| 湟中县| 确山县| 运城市| 平山县| 江山市| 昆明市| 兴海县| 久治县| 安化县| 宁远县| 义乌市| 临城县| 乡城县| 万州区| 姚安县| 芦山县| 罗山县| 乌拉特中旗| 马边| 会同县| 正镶白旗| 丁青县| 宁国市| 南华县| 龙游县| 宜州市| 辽阳市| 阜宁县| 屏东市| 富阳市| 雷山县| 虎林市| 阳朔县| 大姚县| 元氏县| 宣城市| 广元市| 申扎县| 扶绥县|