官术网_书友最值得收藏!

1.2 NIO,一本難念的經

我們知道,分布式系統的基礎是網絡。因此,網絡編程是分布式軟件工程師和架構師的必備技能之一,而且隨著當前大數據和實時計算技術的興起,高性能RPC架構與網絡編程技術再次成為焦點。不管是RPC領域的ZeroC Ice、Thrift,還是經典分布式框架Actor模型中的Akka,或者實時流領域的Storm、Spark、Flink,又或者開源分布式數據庫中的Mycat、VoltDB,這些高大上產品的底層通信技術都采用了NIO(非阻塞通信)通信技術。而Java領域里大名鼎鼎的NIO框架——Netty,則被眾多的開源項目或商業軟件所采用。

相對于它的老前輩BIO(阻塞通信)來說,NIO模型非常復雜,以至于我們難以精通它,難以編寫出一個沒有缺陷、高效且適應各種意外情況的穩定的NIO通信模塊。之所以會出現這樣的問題,是因為NIO編程不是單純的一個技術點,而是涵蓋了一系列相關技術、專業知識、編程經驗和編程技巧的復雜工程。

1.2.1 難懂的ByteBuffer

Java NIO拋棄了我們所熟悉的Stream、byte[]等數據結構,設計了一個全新的數據結構——ByteBuffer,ByteBuffer的主要使用場景是保存從Socket中讀取的輸入字節流并循環利用,以減少GC的壓力。Java NIO功能強大,但難以掌握。以經典的Echo服務器為例,其核心是讀入客戶端發來的數據,并且回寫給客戶端,這段代碼用ByteBuffer來實現,大致就是下面的邏輯:

如果我們能馬上發現在上述代碼中存在一個嚴重缺陷且無法正常工作,那么說明我們的確精通了ByteBuffer的用法。這段代碼的缺陷是在第6行之前少了一個byteBuffer.flip()調用。之所以ByteBuffer會設計這樣一個名稱奇怪的Method,是因為它與我們所熟悉的InputStream&OutStream分別操作輸入輸出流的傳統I/O設計方式不同,是“二合一”的設計方式。我們可以把ByteBuffer設想成內部擁有一個固定長度的Byte數組的對象,屬性capacity為數組的長度(不可變),position變量保存當前讀(或寫)的位置,limit變量為當前可讀或可寫的位置上限。當Byte被寫入ByteBuffer中時,position++,而0到position之間的字符就是已經寫入的字符。如果后面要讀取之前寫入的這些字符,則需要將position重置為0,limit則被設置為之前position的值,這個操作恰好就是flip要做的事情,這樣一來,position到limit之間的字符剛好是要讀的全部數據。

ByteBuffer有三種實現方式:第一種是堆內存儲數據的HeapByteBuffer;第二種是堆外存儲數據的DirectByteBuffer;第三種是文件映射(數據存儲到文件中)的MappedByteBuffer。HeapByteBuffer將數據保存在JVM堆內存中,我們知道64位JVM的堆內存在最大為32GB時內存利用率最高,一旦堆超過了32GB,就進入64位的世界里了,應用程序的可用堆空間就會減小。另外,過大的JVM堆內存也容易導致復雜的GC問題,因此最好的辦法是采用堆外內存,堆外內存的管理由程序員自己控制,類似于C語言的直接內存管理。DirectByteBuffer是采用堆外內存來存放數據的,因此在訪問性能提升的同時帶來了復雜的動態內存管理問題。而動態內存管理是一項高端編程技術,涵蓋了內存分配性能、內存回收、內存碎片化、內存利用率等一系列復雜問題。

在內存分配性能方面,我們通常會在Java里采用ThreadLocal對象來實現多線程本地化分配的思路,即每個線程都擁有一個ThreadLocal類型的ByteBufferPool,然后每個線程都管理各自的內存分配和回收問題,避免共享資源導致的競爭問題。Grizzy NIO框架中的ByteBufferThreadLocalPool,就采用了ThreadLocal結合ByteBuffer視圖的動態內存管理技術:

上面的代碼很簡單也很經典,可以分配任意大小的內存塊,但存在一個問題:它只能從Pool的當前位置持續往下分配空間,而中間被回收的內存塊是無法立即被分配的,因此內存利用率不高。另外,當后面分配的內存沒有被及時釋放時,會發生內存溢出,即使前面分配的內存早已釋放大半。其實上述問題可以通過一個環狀結構(Ring)來解決,即分配到頭以后,回頭重新繼續分配,但代碼會稍微復雜點。

Netty則采用了另外一種思路。首先,Netty的作者認為JDK的ByteBuffer設計得并不好,其中ByteBuffer不能繼承,以及API難用、容易出錯是最大的兩個問題,于是他重新設計了一個接口ByteBuf來代替官方的ByteBuffer。如下所示是ByteBuf的設計示意圖,它通過分離讀寫的位置變量(reader index及writer index),簡單、有效地解決了ByteBuffer難懂的flip操作問題,這樣一來ByteBuf也可以實現同時讀與寫的功能了。

由于ByteBuf是一個接口,所以可以繼承與擴展,為了實現分配任意長度的Buffer,Netty設計了一個CompositeByteBuf實現類,它通過組合多個ByteBuf實例的方式巧妙實現了動態擴容能力,這種組合擴容的方式存在一個讀寫效率問題,即判斷當前的讀寫位置是否要移到下一個ByteBuf實例上。

Netty的ByteBuf實例還有一個很重要的特征,即記錄了被引用的次數,所有實例都繼承自AbstractReferenceCountedByteBuf。這點非常重要,因為我們在實現ByteBufPool時,需要確保ByteBuf被正確釋放和回收,由于官方的ByteBuffer缺乏這一特征,因此很容易因為使用不當導致內存泄漏或者內存訪問錯誤等嚴重Bug。

由于使用ByteBuffer時用得最多的是堆外DirectByteBuffer,因此一個功能齊全、高效的Buffer Pool對于NIO來說相當重要。官方JDK并沒有提供這樣的工具包,于是Netty的作者基于ByteBuf實現了一套可以在Netty之外單獨使用的Buffer Pool框架,如下圖所示。

MappedByteBuffer說得通俗一點就是Map把一個磁盤文件(整體或部分內容)映射到計算機虛擬內存的一塊區域,這樣就可以直接操作內存中的數據,無須每次都通過I/O從物理硬盤上讀取文件,所以在效率上有很大提升。要想真正理解MappedByteBuffer的原理和價值,就需要掌握操作系統內存、文件系統、內存頁與內存交換的基本知識。

如下圖所示,每個進程都有一個虛擬地址空間,也被稱為邏輯內存地址,其大小由該系統上的地址大小規定,比如32位Windows的單進程可尋址空間是4GB,虛擬地址空間也使用分頁機制,即我們所說的內存頁面。當一個程序嘗試使用虛擬地址訪問內存時,操作系統連同硬件會將該分頁的虛擬地址映射到某個具體的物理位置,這個位置可以是物理內存、頁面文件(Page File是Windows的說法,對應Linux下的swap)或文件系統中的一個普通文件。盡管每個進程都有自己的地址空間,但程序通常無法使用所有這些空間,因為地址空間被劃分為內核空間和用戶空間。大部分操作系統都將每個進程地址空間的一部分映射到一個通用的內核內存區域。被映射來供內核使用的地址空間部分被稱為內核空間,其余部分被稱為用戶空間,可供用戶的應用程序使用。

MappedByteBuffer使用mmap系統調用來實現文件的內存映射,如下圖中的過程1所示。此外,內存映射的過程只是在邏輯上被放入內存中,具體到代碼,就是建立并初始化了相關的數據結構(struct address_space),并沒有實際的數據復制,文件沒有被載入內存,所以建立內存映射的效率很高。僅在此文件的內容要被訪問時,才會觸發操作系統加載內存頁,這個過程可能涉及物理內存不足時內存交換的問題,即過程4。

通過上面的原理分析,我們就不難理解JDK中關于MappedByteBuffer的一些方法的作用了。

● fore():當緩沖區是READ_WRITE模式時,此方法對緩沖區內容的修改強行寫入文件。

● load():將緩沖區的內容載入內存,并返回該緩沖區的引用。

● isLoaded():如果緩沖區的內容在物理內存中,則返回真,否則返回假。

MappedByteBuffer的主要使用場景有如下兩個。

● 基于文件共享的高性能進程間通信(IPC)。

● 大文件高性能讀寫訪問。

正因為上述兩個獨特的使用場景,MappedByteBuffer有很多高端應用,比如Kafka采用MappedByteBuffer來處理消息日志文件。分布式文件系統Tachyon也采用了MappedByteBuffer加速文件讀寫。高性能IPC通信技術在當前的大數據和實時計算方面越來越重要,原因很簡單:當前服務器的核心數越來越多,而且都支持NUMA技術,在這種情況下,單機上的多進程架構能最大地提升系統的整體吞吐量。于是,有人基于MappedByteBuffer實現了一個DEMO性質的高性能IPC通信實例,該實例采用內存映射文件來實現Java多進程間的數據通信,其原理圖如下所示。

其中,一個進程負責寫入數據到內存映射文件中,其他進程(不限于Java)則從此映射文件中讀取數據。經筆者測試,其性能極高,在筆者的筆記本計算機上可以達到每秒4000萬的傳輸速度,消息延時僅僅25ns。受此項目的啟發,筆者也發起了一個更為完善的Mycat-IPC開源框架,此項目的關鍵點在于用一個MappedByteBuffer模擬了N組環形隊列的數據結構,用來表示一個進程發送或者讀取的消息隊列。

如下所示是MappedByteBuffer的內存結構圖,內存起始位置記錄了當前定義的幾個RingQueue,隨后記錄了每個RingQueue的長度以確定其開始內存地址與結束內存地址,RingQueue類似于ByteBuffer的設計,有記錄讀寫內存位置的變量,而被放入隊列中的每個“消息”都有兩個字節的長度、消息體本身,以及下個消息的開始位置Flag(繼續當前位置還是已經掉頭、從頭開始)。筆者計劃未來將Mycat拆成多進程的架構,一個進程負責接收客戶端的Socket請求,然后把數據通過IPC框架分發給后面幾個獨立的進程去處理,處理完的響應再通過IPC回傳給Socket監聽進程,最終寫入客戶端。

MappedByteBuffer還有另外一個奇妙的特性,“零復制”傳輸數據,它的transferTo方法能節省一次緩沖區的復制過程,將其直接寫入另外一個Channel通道,如下圖所示。

Netty傳輸文件的邏輯就用到了transferTo這一特性,下面的代碼片段給出了真相:

1.2.2 晦澀的“非阻塞”

NIO里“非阻塞”(None Blocking)這個否定式的新名稱對于大多數程序員來說的確很難理解。在解釋“非阻塞”這個概念之前,讓我們先來惡補一下TCP/IP通信的基礎知識。

首先,對于TCP通信來說,每個TCP Socket在內核中都有一個發送緩沖區和一個接收緩沖區,TCP的全雙工工作模式及TCP的滑動窗口便依賴于這兩個獨立的Buffer及此Buffer的填充狀態。接收緩沖區把數據緩存入內核中,若應用進程一直沒有調用Socket的read方法進行讀取,則此數據會一直被緩存在接收緩沖區中。不管進程是否讀取Socket,對端發來的數據都會經由內核接收并且緩存到Socket的內核接收緩沖區中。read方法所做的工作,就是把內核接收緩沖區中的數據復制到應用層用戶的Buffer中。進程在調用Socket的send方法發送數據時,最簡單的情況(也是一般情況)是將數據從應用層用戶的Buffer中復制到Socket的內核發送緩沖區中,send方法便會在上層返回。換句話說,在send方法返回時,數據不一定會被發送到對端(與write方法寫文件有點類似),send方法僅僅是把應用層Buffer的數據復制到Socket的內核發送Buffer中。而對于UDP通信來說,每個UDP Socket都有一個接收緩沖區,沒有發送緩沖區,從概念上來說只要有數據就發送,不管對方是否可以正確接收,所以不緩沖,也不需要發送緩沖區。

其次,我們來說說TCP/IP的滑動窗口和流量控制機制。前面提到,Socket的接收緩沖區被TCP和UDP用來緩存在網絡上收到的數據,保存到應用進程讀取為止。對于TCP來說,如果應用進程一直沒有讀取,則在Buffer滿了之后發生的動作是:通知對端TCP中的窗口關閉,保證TCP套接口接收緩沖區不會溢出,保證TCP是可靠傳輸的,這便是滑動窗口的實現。因為對方不允許發出超過通告窗口大小的數據,所以如果對方無視窗口的大小發出了超過窗口大小的數據,則接收方TCP將丟棄它,這就是TCP的流量控制原理。對于UDP來說,當接收方的Socket接收緩沖區滿時,新來的數據報無法進入接收緩沖區,此數據報會被丟棄。UDP是沒有流量控制的,快的發送者可以很容易地淹沒慢的接收者,導致接收方的UDP丟棄數據報。

明白了Socket讀寫數據的底層原理,我們就容易理解傳統的“阻塞模式”了:對于讀取Socket數據的過程而言,如果接收緩沖區為空,則調用Socket的read方法的線程會阻塞,直到有數據進入接收緩沖區;對于寫數據到Socket中的線程而言,如果待發送的數據長度大于發送緩沖區的空余長度,則會被阻塞在write方法上,等待發送緩沖區的報文被發送到網絡上,然后繼續發送下一段數據,循環上述過程直到數據都被寫入發送緩沖區為止。

從上述過程來看,傳統的Socket阻塞模式直接導致每個Socket都必須綁定一個線程來操作數據,參與通信的任意一方如果處理數據的速度較慢,則都會直接拖累另一方,導致另一方的線程不得不浪費大量的時間在I/O等待上,所以,每個Socket都要綁定一個單獨的線程正是傳統Socket阻塞模式的根本“缺陷”。之所以這里加了“缺陷”兩個字,是因為這種模式在一些特定場合下效果是最好的,比如只有少量的TCP連接通信,雙方都非常快速地傳輸數據,此時這種模式的性能最高。

現在我們可以開始分析“非阻塞”模式了,它就是要解決I/O線程與Socket解耦的問題,因此,它引入了事件機制來達到解耦的目的。我們可以認為在NIO底層中存在一個I/O調度線程,它不斷掃描每個Socket的緩沖區,當發現寫入緩沖區為空(或者不滿)時,它會產生一個Socket可寫事件,此時程序就可以把數據寫入Socket中,如果一次寫不完,則等待下次可寫事件的通知;當發現在讀取緩沖區中有數據時,會產生一個Socket可讀事件,程序在收到這個通知事件時,就可以從Socket讀取數據了。

上述原理聽起來很簡單,但實際上有很多“坑”,如下所述。

● 收到可寫事件時,想要一次性地寫入全部數據,而不是將剩余數據放入Session中,等待下次可寫事件的到來。

● 在寫完數據并且沒有可寫數據時,若應答數據報文已經被全部發送給客戶端,則需要取消對可寫事件的“訂閱”,否則NIO調度線程總是報告Socket可寫事件,導致CPU使用率狂飆。因此,如果沒有數據可寫,就不要訂閱可寫事件。

● 如果來不及處理發送的數據,就需要暫時“取消訂閱”可讀事件,否則數據從Socket里讀取以后,下次還會很快發送過來,而來不及處理的數據被積壓到內存隊列中,導致內存溢出。

此外,在NIO中還有一個容易被忽略的高級問題,即業務數據處理邏輯是使用NIO調度線程來執行還是使用其他線程池里的線程來執行?關于這個問題,沒有絕對的答案,我們在Mycat的研發過程中經過大量測試和研究得出以下結論:

如果數據報文的處理邏輯比較簡單,不存在耗時和阻塞的情況,則可以直接用NIO調度線程來執行這段邏輯,避免線程上下文切換帶來的損耗;如果數據報文的處理邏輯比較復雜,耗時比較多,而且可能存在阻塞和執行時間不確定的情況,則建議將其放入線程池里去異步執行,防止I/O調度線程被阻塞。

如下所示是Mycat里相關設計的示意圖。

1.2.3 復雜的Reactor模型

Java NIO框架比較原始,目前主流的Java網絡程序都在其上設計實現了Reactor模型,隱藏了NIO底層的復雜細節,大大簡化了NIO編程,其原理和架構如下圖所示。Acceptor負責接收客戶端Socket發起的新建連接請求,并把該Socket綁定到一個Reactor線程上,于是這個Socket隨后的讀寫事件都交給此Reactor線程來處理。Reactor線程在讀取數據后,交給用戶程序中的具體Handler實現類來完成特定的業務邏輯處理。為了不影響Reactor線程,我們通常使用一個單獨的線程池來異步執行Handler的接口方法。

如果僅僅到此為止,則NIO里的Reactor模型還不算很復雜,但實際上,我們的服務器是多核心的,而且需要高速并發處理大量的客戶端連接,單線程的Reactor模型就滿足不了需求了,因此我們需要多線程的Reactor。一般原則是Reactor(線程)的數量與CPU核心數(邏輯CPU)保持一致,即每個CPU都執行一個Reactor線程,客戶端的Socket連接則被隨機均分到這些Reactor線程上去處理,如果有8000個連接,而CPU核心數為8,則每個CPU核心平均承擔1000個連接。

多線程Reactor模型可能帶來另外一個問題,即負載不均衡。雖然每個Reactor線程服務的Socket數量都是均衡的,但每個Socket的I/O事件可能是不均衡的,某些Socket的I/O事件可能大大多于其他Socket,從而導致某些Reactor線程負載更高,此時是否需要重新分配Socket到不同的Reactor線程呢?這的確是一個問題。因為如果要切換Socket到另外的Reactor線程,則意味著Socket相關的Connection對象、Session對象等必須是線程安全的,這本身就帶來一定的性能損耗。另外,我們需要對I/O事件做統計分析,啟動額外的定時線程在合適的時機完成Socket重新分配,這本身就是很復雜的事情。

由于Netty的代碼過于復雜,所以下面以Mycat NIO Framework為例,來說說應該怎樣設計一個基于多線程Reactor模式的高性能NIO框架。

如下圖所示,我們先要有一個基礎類NetSystem,它負責NIO框架中基礎參數與基礎組件的創建,其中常用的基礎參數如下。

● Socket緩存區的大小。

● TCP_NODELAY標記。

● Reactor的個數。

● ByteBuffer Pool的參數。

● 業務線程池的大小。

基礎組件如下。

● NameableExecutor:業務線程池。

● NIOAcceptor:負責接收客戶端的新建連接請求。

● NIOConnector:負責發起客戶端連接(NIO模式)。

考慮到不同的應用都需要創建自己的Connection實例來實現應用特定的網絡協議,而且在一個程序里可能會有幾種網絡協議,因此人們在框架里設計了Connection抽象類,采用的是工廠模式,即由不同的ConnectionFactory來創建不同的Connection實現類。不管是作為NIO Server還是作為NIO Client,應用程序都可以采用這套機制來實現自己的Connection。當收到Socket報文(及相關事件)時,框架會調用綁定在此Connection上的NIO Handler來處理報文,而Connection要發送的數據被放入一個WriteQueue隊列里,框架實現具體的無阻塞發送邏輯。

為了更好地使用有限的內存,Mycat NIO設計了一個“雙層”的ByteBuffer Pool模型,全局的ByteBuffer Pool被所有Connection共享,每個Reactor線程則都在本地保留了一份局部占用的ByteBuffer Pool——ThreadLocalBufferPool,我們可以設定80%的ByteBuffer被N個Reactor線程從全局Pool里取出并放到本地的ThreadLocalBufferPool里,這樣一來就可以避免過多的全局Pool的鎖搶占操作,提升NIO性能。

NIOAcceptor在收到客戶端發起的新連接事件后,會新建一個Connection對象,然后隨機找到一個NIOReactor,并把此Connection對象放入該NIOReactor的Register隊列中等待處理,NIOReactor會在下一次的Selector循環事件處理之前,先處理所有新的連接請求。下面兩段來自NIOReactor的代碼表明了這一邏輯過程:

NIOConnector屬于NIO客戶端框架的一部分,與NIOAcceptor類似,在需要發起一個NIO連接時,程序調用下面的方法將連接放入“待連接隊列”中并喚醒Selector:

隨后,NIOConnector的線程會先處理“待連接隊列”,發起真正的NIO連接并異步等待響應:

最后,在NIOConnector的線程Run方法里,對收到連接完成事件的Connection,回調應用的通知接口,應用在得知連接已經建立時,可以在接口里主動發數據或者請求讀數據:

主站蜘蛛池模板: 八宿县| 额尔古纳市| 锡林郭勒盟| 青浦区| 顺义区| 武城县| 昭通市| 邳州市| 昌宁县| 青河县| 湘潭市| 保亭| 舞阳县| 项城市| 南乐县| 栖霞市| 岳阳县| 海淀区| 资讯 | 武宣县| 阿合奇县| 富平县| 昌宁县| 长海县| 宝坻区| 浦城县| 福安市| 小金县| 阿拉尔市| 虹口区| 图木舒克市| 静安区| 佳木斯市| 利津县| 东乌珠穆沁旗| 保德县| 裕民县| 凤台县| 恩平市| 涟水县| 通山县|