官术网_书友最值得收藏!

2.2 原理分析

在前面的章節(jié)中,我們已經(jīng)了解了KafkaProducer的具體使用方法,而本節(jié)的內(nèi)容主要是對(duì)Kafka 生產(chǎn)者客戶端的內(nèi)部原理進(jìn)行分析,通過了解生產(chǎn)者客戶端的整體脈絡(luò)可以讓我們更好地使用它,避免因?yàn)橐恍├斫馍系钠疃斐墒褂蒙系腻e(cuò)誤。

2.2.1 整體架構(gòu)

在2.1.4節(jié)的開頭介紹了消息在真正發(fā)往Kafka之前,有可能需要經(jīng)歷攔截器(Interceptor)、序列化器(Serializer)和分區(qū)器(Partitioner)等一系列的作用,那么在此之后又會(huì)發(fā)生什么呢?下面我們來看一下生產(chǎn)者客戶端的整體架構(gòu),如圖2-1所示。

圖2-1 生產(chǎn)者客戶端的整體架構(gòu)

整個(gè)生產(chǎn)者客戶端由兩個(gè)線程協(xié)調(diào)運(yùn)行,這兩個(gè)線程分別為主線程和Sender線程(發(fā)送線程)。在主線程中由KafkaProducer創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator,也稱為消息收集器)中。Sender 線程負(fù)責(zé)從RecordAccumulator中獲取消息并將其發(fā)送到Kafka中。

RecordAccumulator 主要用來緩存消息以便 Sender 線程可以批量發(fā)送,進(jìn)而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。RecordAccumulator 緩存的大小可以通過生產(chǎn)者客戶端參數(shù)buffer.memory 配置,默認(rèn)值為 33554432B,即 32MB。如果生產(chǎn)者發(fā)送消息的速度超過發(fā)送到服務(wù)器的速度,則會(huì)導(dǎo)致生產(chǎn)者空間不足,這個(gè)時(shí)候KafkaProducer的send()方法調(diào)用要么被阻塞,要么拋出異常,這個(gè)取決于參數(shù)max.block.ms的配置,此參數(shù)的默認(rèn)值為60000,即60秒。

主線程中發(fā)送過來的消息都會(huì)被追加到RecordAccumulator的某個(gè)雙端隊(duì)列(Deque)中,在 RecordAccumulator 的內(nèi)部為每個(gè)分區(qū)都維護(hù)了一個(gè)雙端隊(duì)列,隊(duì)列中的內(nèi)容就是ProducerBatch,即 Deque<ProducerBatch>。消息寫入緩存時(shí),追加到雙端隊(duì)列的尾部;Sender讀取消息時(shí),從雙端隊(duì)列的頭部讀取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多個(gè) ProducerRecord。通俗地說,ProducerRecord 是生產(chǎn)者中創(chuàng)建的消息,而ProducerBatch是指一個(gè)消息批次,ProducerRecord會(huì)被包含在ProducerBatch中,這樣可以使字節(jié)的使用更加緊湊。與此同時(shí),將較小的ProducerRecord拼湊成一個(gè)較大的ProducerBatch,也可以減少網(wǎng)絡(luò)請(qǐng)求的次數(shù)以提升整體的吞吐量。ProducerBatch和消息的具體格式有關(guān),更多的詳細(xì)內(nèi)容可以參考 5.2 節(jié)。如果生產(chǎn)者客戶端需要向很多分區(qū)發(fā)送消息,則可以將buffer.memory參數(shù)適當(dāng)調(diào)大以增加整體的吞吐量。

消息在網(wǎng)絡(luò)上都是以字節(jié)(Byte)的形式傳輸?shù)模诎l(fā)送之前需要?jiǎng)?chuàng)建一塊內(nèi)存區(qū)域來保存對(duì)應(yīng)的消息。在Kafka生產(chǎn)者客戶端中,通過java.io.ByteBuffer實(shí)現(xiàn)消息內(nèi)存的創(chuàng)建和釋放。不過頻繁的創(chuàng)建和釋放是比較耗費(fèi)資源的,在RecordAccumulator的內(nèi)部還有一個(gè)BufferPool,它主要用來實(shí)現(xiàn)ByteBuffer的復(fù)用,以實(shí)現(xiàn)緩存的高效利用。不過BufferPool只針對(duì)特定大小的ByteBuffer進(jìn)行管理,而其他大小的ByteBuffer不會(huì)緩存進(jìn)BufferPool中,這個(gè)特定的大小由batch.size參數(shù)來指定,默認(rèn)值為16384B,即16KB。我們可以適當(dāng)?shù)卣{(diào)大batch.size參數(shù)以便多緩存一些消息。

ProducerBatch的大小和batch.size參數(shù)也有著密切的關(guān)系。當(dāng)一條消息(ProducerRecord)流入RecordAccumulator時(shí),會(huì)先尋找與消息分區(qū)所對(duì)應(yīng)的雙端隊(duì)列(如果沒有則新建),再?gòu)倪@個(gè)雙端隊(duì)列的尾部獲取一個(gè) ProducerBatch(如果沒有則新建),查看 ProducerBatch 中是否還可以寫入這個(gè) ProducerRecord,如果可以則寫入,如果不可以則需要?jiǎng)?chuàng)建一個(gè)新的ProducerBatch。在新建ProducerBatch時(shí)評(píng)估這條消息的大小是否超過batch.size參數(shù)的大小,如果不超過,那么就以 batch.size 參數(shù)的大小來創(chuàng)建 ProducerBatch,這樣在使用完這段內(nèi)存區(qū)域之后,可以通過BufferPool 的管理來進(jìn)行復(fù)用;如果超過,那么就以評(píng)估的大小來創(chuàng)建ProducerBatch,這段內(nèi)存區(qū)域不會(huì)被復(fù)用。

Sender 從 RecordAccumulator 中獲取緩存的消息之后,會(huì)進(jìn)一步將原本<分區(qū),Deque<ProducerBatch>>的保存形式轉(zhuǎn)變成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群的broker節(jié)點(diǎn)。對(duì)于網(wǎng)絡(luò)連接來說,生產(chǎn)者客戶端是與具體的broker節(jié)點(diǎn)建立的連接,也就是向具體的 broker 節(jié)點(diǎn)發(fā)送消息,而并不關(guān)心消息屬于哪一個(gè)分區(qū);而對(duì)于 KafkaProducer的應(yīng)用邏輯而言,我們只關(guān)注向哪個(gè)分區(qū)中發(fā)送哪些消息,所以在這里需要做一個(gè)應(yīng)用邏輯層面到網(wǎng)絡(luò)I/O層面的轉(zhuǎn)換。

在轉(zhuǎn)換成<Node,List<ProducerBatch>>的形式之后,Sender 還會(huì)進(jìn)一步封裝成<Node,Request>的形式,這樣就可以將Request請(qǐng)求發(fā)往各個(gè)Node了,這里的Request是指Kafka的各種協(xié)議請(qǐng)求,對(duì)于消息發(fā)送而言就是指具體的 ProduceRequest,更多與 Kafka 協(xié)議有關(guān)的內(nèi)容可以參考6.1節(jié)。

請(qǐng)求在從Sender線程發(fā)往Kafka之前還會(huì)保存到InFlightRequests中,InFlightRequests保存對(duì)象的具體形式為 Map<NodeId,Deque<Request>>,它的主要作用是緩存了已經(jīng)發(fā)出去但還沒有收到響應(yīng)的請(qǐng)求(NodeId 是一個(gè) String 類型,表示節(jié)點(diǎn)的 id 編號(hào))。與此同時(shí),InFlightRequests還提供了許多管理類的方法,并且通過配置參數(shù)還可以限制每個(gè)連接(也就是客戶端與Node之間的連接)最多緩存的請(qǐng)求數(shù)。這個(gè)配置參數(shù)為max.in.flight.requests.per.connection,默認(rèn)值為 5,即每個(gè)連接最多只能緩存 5 個(gè)未響應(yīng)的請(qǐng)求,超過該數(shù)值之后就不能再向這個(gè)連接發(fā)送更多的請(qǐng)求了,除非有緩存的請(qǐng)求收到了響應(yīng)(Response)。通過比較Deque<Request>的size與這個(gè)參數(shù)的大小來判斷對(duì)應(yīng)的Node中是否已經(jīng)堆積了很多未響應(yīng)的消息,如果真是如此,那么說明這個(gè) Node 節(jié)點(diǎn)負(fù)載較大或網(wǎng)絡(luò)連接有問題,再繼續(xù)向其發(fā)送請(qǐng)求會(huì)增大請(qǐng)求超時(shí)的可能。

2.2.2 元數(shù)據(jù)的更新

2.2.1節(jié)中提及的InFlightRequests還可以獲得leastLoadedNode,即所有Node中負(fù)載最小的那一個(gè)。這里的負(fù)載最小是通過每個(gè)Node在InFlightRequests中還未確認(rèn)的請(qǐng)求決定的,未確認(rèn)的請(qǐng)求越多則認(rèn)為負(fù)載越大。對(duì)于圖 2-2 中的 InFlightRequests 來說,圖中展示了三個(gè)節(jié)點(diǎn)Node0、Node1和Node2,很明顯Node1的負(fù)載最小。也就是說,Node1為當(dāng)前的leastLoadedNode。選擇leastLoadedNode發(fā)送請(qǐng)求可以使它能夠盡快發(fā)出,避免因網(wǎng)絡(luò)擁塞等異常而影響整體的進(jìn)度。leastLoadedNode的概念可以用于多個(gè)應(yīng)用場(chǎng)合,比如元數(shù)據(jù)請(qǐng)求、消費(fèi)者組播協(xié)議的交互。

圖2-2 判定leastLoadedNode

我們使用如下的方式創(chuàng)建了一條消息ProducerRecord:

我們只知道主題的名稱,對(duì)于其他一些必要的信息卻一無所知。KafkaProducer要將此消息追加到指定主題的某個(gè)分區(qū)所對(duì)應(yīng)的leader副本之前,首先需要知道主題的分區(qū)數(shù)量,然后經(jīng)過計(jì)算得出(或者直接指定)目標(biāo)分區(qū),之后KafkaProducer需要知道目標(biāo)分區(qū)的leader副本所在的broker 節(jié)點(diǎn)的地址、端口等信息才能建立連接,最終才能將消息發(fā)送到 Kafka,在這一過程中所需要的信息都屬于元數(shù)據(jù)信息。

在2.1.1節(jié)中我們了解了bootstrap.servers參數(shù)只需要配置部分broker節(jié)點(diǎn)的地址即可,不需要配置所有broker節(jié)點(diǎn)的地址,因?yàn)榭蛻舳丝梢宰约喊l(fā)現(xiàn)其他broker節(jié)點(diǎn)的地址,這一過程也屬于元數(shù)據(jù)相關(guān)的更新操作。與此同時(shí),分區(qū)數(shù)量及l(fā)eader副本的分布都會(huì)動(dòng)態(tài)地變化,客戶端也需要?jiǎng)討B(tài)地捕捉這些變化。

元數(shù)據(jù)是指Kafka集群的元數(shù)據(jù),這些元數(shù)據(jù)具體記錄了集群中有哪些主題,這些主題有哪些分區(qū),每個(gè)分區(qū)的leader副本分配在哪個(gè)節(jié)點(diǎn)上,follower副本分配在哪些節(jié)點(diǎn)上,哪些副本在AR、ISR等集合中,集群中有哪些節(jié)點(diǎn),控制器節(jié)點(diǎn)又是哪一個(gè)等信息。

當(dāng)客戶端中沒有需要使用的元數(shù)據(jù)信息時(shí),比如沒有指定的主題信息,或者超過metadata.max.age.ms 時(shí)間沒有更新元數(shù)據(jù)都會(huì)引起元數(shù)據(jù)的更新操作。客戶端參數(shù)metadata.max.age.ms的默認(rèn)值為300000,即5分鐘。元數(shù)據(jù)的更新操作是在客戶端內(nèi)部進(jìn)行的,對(duì)客戶端的外部使用者不可見。當(dāng)需要更新元數(shù)據(jù)時(shí),會(huì)先挑選出leastLoadedNode,然后向這個(gè)Node發(fā)送MetadataRequest請(qǐng)求來獲取具體的元數(shù)據(jù)信息。這個(gè)更新操作是由Sender線程發(fā)起的,在創(chuàng)建完MetadataRequest之后同樣會(huì)存入InFlightRequests,之后的步驟就和發(fā)送消息時(shí)的類似。元數(shù)據(jù)雖然由Sender線程負(fù)責(zé)更新,但是主線程也需要讀取這些信息,這里的數(shù)據(jù)同步通過synchronized和final關(guān)鍵字來保障。

主站蜘蛛池模板: 祁连县| 屯门区| 云梦县| 靖州| 宜阳县| 湄潭县| 精河县| 兴国县| 镇康县| 班玛县| 武定县| 宿松县| 孟州市| 宁明县| 黔江区| 临高县| 元阳县| 开阳县| 苍溪县| 禹城市| 石家庄市| 宜宾县| 灵丘县| 定日县| 平果县| 望江县| 慈利县| 中卫市| 万盛区| 德化县| 会理县| 清流县| 武山县| 乐亭县| 兖州市| 中超| 桐柏县| 临泽县| 平陆县| 莆田市| 微山县|