官术网_书友最值得收藏!

1.14 并行流

流使得并行處理塊操作變得很容易。這個過程幾乎是自動的,但是需要遵守一些規(guī)則。首先,必須有一個并行流。可以用Collection.parallelStream()方法從任何集合中獲取一個并行流:

而且,parallel方法可以將任意的順序流轉(zhuǎn)換為并行流。

只要在終結(jié)方法執(zhí)行時,流處于并行模式,那么所有的中間流操作都將被并行化。

當(dāng)流操作并行運(yùn)行時,其目標(biāo)是要讓其返回結(jié)果與順序執(zhí)行時返回的結(jié)果相同。重要的是,這些操作可以以任意順序執(zhí)行。

下面的示例是一項你無法完成的任務(wù)。假設(shè)你想要對字符串流中的所有短單詞計數(shù):

這是一種非常非常糟糕的代碼。傳遞給forEach的函數(shù)會在多個并發(fā)線程中運(yùn)行,每個都會更新共享的數(shù)組。正如我們在卷Ⅰ第14章中所解釋的,這是一種經(jīng)典的競爭情況。如果多次運(yùn)行這個程序,你很可能就會發(fā)現(xiàn)每次運(yùn)行都會產(chǎn)生不同的計數(shù)值,而且每個都是錯的。

你的職責(zé)是要確保傳遞給并行流操作的任何函數(shù)都可以安全地并行執(zhí)行,達(dá)到這個目的的最佳方式是遠(yuǎn)離易變狀態(tài)。在本例中,如果用長度將字符串群組,然后分別對它們進(jìn)行計數(shù),那么就可以安全地并行化這項計算。

警告:傳遞給并行流操作的函數(shù)不應(yīng)該被堵塞。并行流使用fork-join池來操作流的各個部分。如果多個流操作被阻塞,那么池可能就無法做任何事情了。

默認(rèn)情況下,從有序集合(數(shù)組和列表)、范圍、生成器和迭代產(chǎn)生的流,或者通過調(diào)用Stream.sorted產(chǎn)生的流,都是有序的。它們的結(jié)果是按照原來元素的順序累積的,因此是完全可預(yù)知的。如果運(yùn)行相同的操作兩次,將會得到完全相同的結(jié)果。

排序并不排斥高效的并行處理。例如,當(dāng)計算stream.map(fun)時,流可以被劃分為n的部分,它們會被并行地處理。然后,結(jié)果將會按照順序重新組裝起來。

當(dāng)放棄排序需求時,有些操作可以被更有效地并行化。通過在流上調(diào)用unordered方法,就可以明確表示我們對排序不感興趣。Stream.distinct就是從這種方式中獲益的一種操作。在有序的流中,distinct會保留所有相同元素中的第一個,這對并行化是一種阻礙,因為處理每個部分的線程在其之前的所有部分都被處理完之前,并不知道應(yīng)該丟棄哪些元素。如果可以接受保留唯一元素中任意一個的做法,那么所有部分就可以并行地處理(使用共享的集來跟蹤重復(fù)元素)。

還可以通過放棄排序要求來提高limit方法的速度。如果只想從流中取出任意n個元素,而并不在意到底要獲取哪些,那么可以調(diào)用:

正如1.9節(jié)所討論的,合并映射表的代價很高昂。正是因為這個原因,Collectors.groupByConcurrent方法使用了共享的并發(fā)映射表。為了從并行化中獲益,映射表中值的順序不會與流中的順序相同。

當(dāng)然,如果使用獨(dú)立于排序的下游收集器,那么就不必在意了,例如:

警告:不要修改在執(zhí)行某項流操作后會將元素返回到流中的集合(即使這種修改是線程安全的)。記住,流并不會收集它們的數(shù)據(jù),數(shù)據(jù)總是在單獨(dú)的集合中。如果修改了這樣的集合,那么流操作的結(jié)果就是未定義的。JDK文檔對這項需求并未做出任何約束,并且對順序流和并行流都采用了這種處理方式。

更準(zhǔn)確地講,因為中間的流操作都是惰性的,所以直到執(zhí)行終結(jié)操作時才對集合進(jìn)行修改仍舊是可行的。例如,下面的操作盡管并不推薦,但是仍舊可以工作:

但是,下面的代碼是錯誤的:

為了讓并行流正常工作,需要滿足大量的條件:

·數(shù)據(jù)應(yīng)該在內(nèi)存中。必須等到數(shù)據(jù)到達(dá)是非常低效的。

·流應(yīng)該可以被高效地分成若干個子部分。由數(shù)組或平衡二叉樹支撐的流都可以工作得很好,但是Stream.iterate返回的結(jié)果不行。

·流操作的工作量應(yīng)該具有較大的規(guī)模。如果總工作負(fù)載并不是很大,那么搭建并行計算時所付出的代價就沒有什么意義。

·流操作不應(yīng)該被阻塞。

換句話說,不要將所有的流都轉(zhuǎn)換為并行流。只有在對已經(jīng)位于內(nèi)存中的數(shù)據(jù)執(zhí)行大量計算操作時,才應(yīng)該使用并行流。

程序清單1-8中的示例程序展示了如何操作并行流。

程序清單1-8 parallel/ParallelStreams.java

java.util.stream.BaseStream<T,S extends BaseStream<T,S>>8

·S parallel()

產(chǎn)生一個與當(dāng)前流中元素相同的并行流。

·S unordered()

產(chǎn)生一個與當(dāng)前流中元素相同的無序流。

java.util.Collection<E>1.2

·Stream<E>parallelStream()8

用當(dāng)前集合中的元素產(chǎn)生一個并行流。

在本章中,你學(xué)習(xí)到了如何運(yùn)用Java 8的流庫。下一章將討論另一個重要的主題:處理輸入和輸出。

主站蜘蛛池模板: 无为县| 景东| 桐乡市| 怀柔区| 三河市| 北安市| 鸡西市| 富宁县| 博湖县| 利津县| 铜陵市| 大冶市| 广昌县| 商洛市| 乐至县| 晋江市| 河曲县| 嵊泗县| 宣武区| 南通市| 乌拉特前旗| 通道| 容城县| 资中县| 大名县| 田东县| 开原市| 林州市| 施秉县| 汉沽区| 巴南区| 石屏县| 神池县| 阿荣旗| 彰化市| 绥芬河市| 虞城县| 内江市| 郁南县| 洛川县| 偏关县|