官术网_书友最值得收藏!

4.1 主題的管理

主題的管理包括創(chuàng)建主題、查看主題信息、修改主題和刪除主題等操作。可以通過 Kafka提供的 kafka-topics.sh 腳本來執(zhí)行這些操作,這個腳本位于$KAFKA_HOME/bin/目錄下,其核心代碼僅有一行,具體如下:

可以看到其實質(zhì)上是調(diào)用了kafka.admin.TopicCommand類來執(zhí)行主題管理的操作。

主題的管理并非只有使用 kafka-topics.sh 腳本這一種方式,我們還可以通過KafkaAdminClient 的方式實現(xiàn)(這種方式實質(zhì)上是通過發(fā)送 CreateTopicsRequest、DeleteTopicsRequest 等請求來實現(xiàn)的,對于 XXXRequest 系列的細節(jié)在 6.1 節(jié)中會有詳細的介紹),甚至我們還可以通過直接操縱日志文件和ZooKeeper節(jié)點來實現(xiàn)。下面按照創(chuàng)建主題、查看主題信息、修改主題、刪除主題的順序來介紹其中的操作細節(jié)。

4.1.1 創(chuàng)建主題

如果broker端配置參數(shù)auto.create.topics.enable設(shè)置為true(默認值就是true),那么當生產(chǎn)者向一個尚未創(chuàng)建的主題發(fā)送消息時,會自動創(chuàng)建一個分區(qū)數(shù)為num.partitions (默認值為1)、副本因子為default.replication.factor(默認值為1)的主題。除此之外,當一個消費者開始從未知主題中讀取消息時,或者當任意一個客戶端向未知主題發(fā)送元數(shù)據(jù)請求時,都會按照配置參數(shù)num.partitions和default.replication.factor的值來創(chuàng)建一個相應(yīng)的主題。很多時候,這種自動創(chuàng)建主題的行為都是非預(yù)期的。除非有特殊應(yīng)用需求,否則不建議將auto.create.topics.enable參數(shù)設(shè)置為true,這個參數(shù)會增加主題的管理與維護的難度。

更加推薦也更加通用的方式是通過kafka-topics.sh腳本來創(chuàng)建主題。在1.3節(jié)演示消息的生產(chǎn)與消費時就通過這種方式創(chuàng)建了一個分區(qū)數(shù)為4、副本因子為3的主題topic-demo。下面通過創(chuàng)建另一個主題topic-create來回顧一下這種創(chuàng)建主題的方式,示例如下:

上面的示例中創(chuàng)建了一個分區(qū)數(shù)為 4、副本因子為 2 的主題。示例中的環(huán)境是一個包含 3個broker節(jié)點的集群,每個節(jié)點的名稱和brokerId的對照關(guān)系如下:

在執(zhí)行完腳本之后,Kafka會在log.dir或log.dirs參數(shù)所配置的目錄下創(chuàng)建相應(yīng)的主題分區(qū),默認情況下這個目錄為/tmp/kafka-logs/。我們來查看一下node1節(jié)點中創(chuàng)建的主題分區(qū),參考如下:

可以看到 node1 節(jié)點中創(chuàng)建了 2 個文件夾 topic-create-0 和 topic-create-1,對應(yīng)主題topic-create的2個分區(qū)編號為0和1的分區(qū),命名方式可以概括為<topic>-<partition>。嚴謹?shù)卣f,其實<topic>-<partition>這類文件夾對應(yīng)的不是分區(qū),分區(qū)同主題一樣是一個邏輯的概念而沒有物理上的存在。并且這里我們也只是看到了2個分區(qū),而我們創(chuàng)建的是4個分區(qū),其余2個分區(qū)被分配到了node2和node3節(jié)點中,參考如下:

三個broker節(jié)點一共創(chuàng)建了8個文件夾,這個數(shù)字8實質(zhì)上是分區(qū)數(shù)4與副本因子2的乘積。每個副本(或者更確切地說應(yīng)該是日志,副本與日志一一對應(yīng))才真正對應(yīng)了一個命名形式如<topic>-<partition>的文件夾。

主題、分區(qū)、副本和 Log(日志)的關(guān)系如圖 4-1 所示,主題和分區(qū)都是提供給上層用戶的抽象,而在副本層面或更加確切地說是Log層面才有實際物理上的存在。同一個分區(qū)中的多個副本必須分布在不同的broker中,這樣才能提供有效的數(shù)據(jù)冗余。對于示例中的分區(qū)數(shù)為4、副本因子為2、broker數(shù)為3的情況下,按照2、3、3的分區(qū)副本個數(shù)分配給各個broker是最優(yōu)的選擇。再比如在分區(qū)數(shù)為3、副本因子為3,并且broker數(shù)同樣為3的情況下,分配3、3、3的分區(qū)副本個數(shù)給各個broker是最優(yōu)的選擇,也就是每個broker中都擁有所有分區(qū)的一個副本。

圖4-1 主題、分區(qū)、副本和Log之間的關(guān)系

我們不僅可以通過日志文件的根目錄來查看集群中各個broker的分區(qū)副本的分配情況,還可以通過ZooKeeper客戶端來獲取。當創(chuàng)建一個主題時會在ZooKeeper的/brokers/topics/目錄下創(chuàng)建一個同名的實節(jié)點,該節(jié)點中記錄了該主題的分區(qū)副本分配方案。示例如下:

示例數(shù)據(jù)中的"2":[1,2]表示分區(qū) 2 分配了 2 個副本,分別在 brokerId 為 1 和 2 的 broker節(jié)點中。

回顧一下1.3 節(jié)中提及的知識點:kafka-topics.sh腳本中的 zookeeper、partitions、replication-factor和topic這4個參數(shù)分別代表ZooKeeper連接地址、分區(qū)數(shù)、副本因子和主題名稱。另一個 create 參數(shù)表示的是創(chuàng)建主題的指令類型,在 kafka-topics.sh 腳本中對應(yīng)的還有l(wèi)ist、describe、alter和delete這4個同級別的指令類型,每個類型所需要的參數(shù)也不盡相同。

還可以通過describe指令類型來查看分區(qū)副本的分配細節(jié),示例如下:

示例中的Topic和Partition分別表示主題名稱和分區(qū)號。PartitionCount表示主題中分區(qū)的個數(shù),ReplicationFactor表示副本因子,而Configs表示創(chuàng)建或修改主題時指定的參數(shù)配置。Leader表示分區(qū)的leader副本所對應(yīng)的brokerId,Isr表示分區(qū)的ISR集合,Replicas表示分區(qū)的所有的副本分配情況,即AR集合,其中的數(shù)字都表示的是brokerId。

使用kafka-topics.sh腳本創(chuàng)建主題的指令格式歸納如下:

到目前為止,創(chuàng)建主題時的分區(qū)副本都是按照既定的內(nèi)部邏輯來進行分配的。kafka-topics.sh腳本中還提供了一個 replica-assignment 參數(shù)來手動指定分區(qū)副本的分配方案。replica-assignment參數(shù)的用法歸納如下:

這種方式根據(jù)分區(qū)號的數(shù)值大小按照從小到大的順序進行排列,分區(qū)與分區(qū)之間用逗號“,”隔開,分區(qū)內(nèi)多個副本用冒號“:”隔開。并且在使用replica-assignment參數(shù)創(chuàng)建主題時不需要原本必備的partitions和replication-factor這兩個參數(shù)。

我們可以通過replica-assignment參數(shù)來創(chuàng)建一個與主題topic-create相同的分配方案的主題topic-create-same和不同的分配方案的主題topic-create-diff,示例如下:

注意同一個分區(qū)內(nèi)的副本不能有重復(fù),比如指定了0:0,1:1這種,就會報出AdminCommand-FailedException異常,示例如下:

如果分區(qū)之間所指定的副本數(shù)不同,比如0:1,0,1:0這種,就會報出AdminOperationException異常,示例如下:

當然,類似0:1,,0:1,1:0這種企圖跳過一個分區(qū)的行為也是不被允許的,示例如下:

在創(chuàng)建主題時我們還可以通過config參數(shù)來設(shè)置所要創(chuàng)建主題的相關(guān)參數(shù),通過這個參數(shù)可以覆蓋原本的默認配置。在創(chuàng)建主題時可以同時設(shè)置多個參數(shù),具體的用法歸納如下:

下面的示例使用了config參數(shù)來創(chuàng)建一個主題topic-config:

示例中設(shè)置了 cleanup.policy 參數(shù)為 compact,以及 max.message.bytes 參數(shù)為10000,這兩個參數(shù)都是主題端的配置,我們再次通過 describe 指令來查看所創(chuàng)建的主題信息:

可以看到 Configs 一欄中包含了創(chuàng)建時所設(shè)置的參數(shù)。我們還可以通過 ZooKeeper 客戶端查看所設(shè)置的參數(shù),對應(yīng)的ZooKeeper節(jié)點為/config/topics/[topic],示例如下:

創(chuàng)建主題時對于主題名稱的命名方式也很有講究。首先是不能與已經(jīng)存在的主題同名,如果創(chuàng)建了同名的主題就會報錯。我們嘗試創(chuàng)建一個已經(jīng)存在的主題topic-create,示例如下:

通過上面的示例可以看出,在發(fā)生命名沖突時會報出TopicExistsException的異常信息。在kafka-topics.sh 腳本中還提供了一個 if-not-exists 參數(shù),如果在創(chuàng)建主題時帶上了這個參數(shù),那么在發(fā)生命名沖突時將不做任何處理(既不創(chuàng)建主題,也不報錯)。如果沒有發(fā)生命名沖突,那么和不帶if-not-exists參數(shù)的行為一樣正常創(chuàng)建主題。我們再次嘗試創(chuàng)建一個已經(jīng)存在的主題topic-create,示例如下:

通過上面的示例可以看出,在添加if-not-exists參數(shù)之后,并沒有像第一次創(chuàng)建主題時的那樣出現(xiàn)“Created topic "topic-create".”的提示信息。通過describe指令查看主題中的分區(qū)數(shù)和副本因子數(shù),還是同第一次創(chuàng)建時的一樣分別為 4 和 2,也并沒有被覆蓋,如此便證實了if-not-exists參數(shù)可以在發(fā)生命名沖突時不做任何處理。在實際應(yīng)用中,如果不想在創(chuàng)建主題的時候跳出TopicExistsException的異常信息,不妨試一下這個參數(shù)。

kafka-topics.sh腳本在創(chuàng)建主題時還會檢測是否包含“.”或“_”字符。為什么要檢測這兩個字符呢?因為在Kafka的內(nèi)部做埋點時會根據(jù)主題的名稱來命名metrics的名稱,并且會將點號“.”改成下畫線“_”。假設(shè)遇到一個名稱為“topic.1_2”的主題,還有一個名稱為“topic_1.2”的主題,那么最后的metrics的名稱都會為“topic_1_2”,這樣就發(fā)生了名稱沖突。舉例如下,首先創(chuàng)建一個以“topic.1_2”為名稱的主題,提示 WARNING 警告,之后再創(chuàng)建“topic.1_2”時發(fā)生InvalidTopicException異常。

注意要點:主題的命名同樣不推薦(雖然可以這樣做)使用雙下畫線“__”開頭,因為以雙下畫線開頭的主題一般看作Kafka的內(nèi)部主題,比如__consumer_offsets和__transaction_state。主題的名稱必須由大小寫字母、數(shù)字、點號“.”、連接線“-”、下畫線“_”組成,不能為空,不能只有點號“.”,也不能只有雙點號“..”,且長度不能超過249。

Kafka從0.10.x版本開始支持指定broker的機架信息(機架的名稱)。如果指定了機架信息,則在分區(qū)副本分配時會盡可能地讓分區(qū)副本分配到不同的機架上。指定機架信息是通過broker端參數(shù)broker.rack來配置的,比如配置當前broker所在的機架為“RACK1”:

如果一個集群中有部分broker指定了機架信息,并且其余的broker沒有指定機架信息,那么在執(zhí)行kafka-topics.sh腳本創(chuàng)建主題時會報出的AdminOperationException的異常,示例如下:

此時若要成功創(chuàng)建主題,要么將集群中的所有broker都加上機架信息或都去掉機架信息,要么使用disable-rack-aware參數(shù)來忽略機架信息,示例如下:

如果集群中的所有broker都有機架信息,那么也可以使用disable-rack-aware參數(shù)來忽略機架信息對分區(qū)副本的分配影響,有關(guān)分區(qū)副本的分配細節(jié)會在4.1.2節(jié)中做詳細介紹。

本節(jié)開頭就提及了 kafka-topics.sh 腳本實質(zhì)上是調(diào)用了 kafka.admin.TopicCommand 類,通過向 TopicCommand 類中傳入一些關(guān)鍵參數(shù)來實現(xiàn)主題的管理。我們也可以直接調(diào)用TopicCommand類中的main()函數(shù)來直接管理主題,比如這里創(chuàng)建一個分區(qū)數(shù)為1、副本因子為1的主題topic-create-api,如代碼清單4-1所示。

代碼清單4-1 使用TopicCommand創(chuàng)建主題

使用這種方式需要添加相應(yīng)的Maven依賴:

可以看到這種方式與使用kafka-topics.sh腳本的方式并無太大差別,可以使用這種方式集成到自動化管理系統(tǒng)中來創(chuàng)建相應(yīng)的主題。當然這種方式也可以適用于對主題的刪、改、查等操作的實現(xiàn),只需修改對應(yīng)的參數(shù)即可。不過更推薦使用4.2節(jié)中介紹的KafkaAdminClient來代替這種實現(xiàn)方式。

4.1.2 分區(qū)副本的分配

4.1.1節(jié)中多處提及了分區(qū)副本的分配,讀者對此或許有點迷惑,在生產(chǎn)者和消費者中也都有分區(qū)分配的概念。生產(chǎn)者的分區(qū)分配是指為每條消息指定其所要發(fā)往的分區(qū),消費者中的分區(qū)分配是指為消費者指定其可以消費消息的分區(qū),而這里的分區(qū)分配是指為集群制定創(chuàng)建主題時的分區(qū)副本分配方案,即在哪個broker中創(chuàng)建哪些分區(qū)的副本。

在創(chuàng)建主題時,如果使用了replica-assignment參數(shù),那么就按照指定的方案來進行分區(qū)副本的創(chuàng)建;如果沒有使用replica-assignment參數(shù),那么就需要按照內(nèi)部的邏輯來計算分配方案了。使用kafka-topics.sh腳本創(chuàng)建主題時的內(nèi)部分配邏輯按照機架信息劃分成兩種策略:未指定機架信息和指定機架信息。如果集群中所有的 broker 節(jié)點都沒有配置broker.rack參數(shù),或者使用disable-rack-aware參數(shù)來創(chuàng)建主題,那么采用的就是未指定機架信息的分配策略,否則采用的就是指定機架信息的分配策略。

首先看一下未指定機架信息的分配策略,具體的實現(xiàn)涉及代碼的邏輯細節(jié),未指定機架信息的分配策略比較容易理解,這里通過源碼來逐一進行分析。所對應(yīng)的具體實現(xiàn)為kafka.admin.AdminUtils.scala文件中的assignReplicasToBrokersRackUnaware()方法,該方法的內(nèi)容如下:

該方法參數(shù)列表中的fixedStartIndex和startPartitionId值是從上游的方法中調(diào)用傳下來的,都是-1,分別表示第一個副本分配的位置和起始分區(qū)編號。assignReplicasToBrokersRackUnaware ()方法的核心是遍歷每個分區(qū) partition,然后從 brokerArray (brokerId 的列表)中選取replicationFactor個brokerId分配給這個partition。

該方法首先創(chuàng)建一個可變的Map用來存放該方法將要返回的結(jié)果,即分區(qū)partition和分配副本的映射關(guān)系。由于fixedStartIndex為-1,所以startIndex是一個隨機數(shù),用來計算一個起始分配的brokerId,同時又因為startPartitionId為-1,所以currentPartitionId的值為0,可見默認情況下創(chuàng)建主題時總是從編號為0的分區(qū)依次輪詢進行分配。

nextReplicaShift表示下一次副本分配相對于前一次分配的位移量,從字面上理解有點繞口。舉個例子:假設(shè)集群中有3個broker節(jié)點,對應(yīng)于代碼中的brokerArray,創(chuàng)建的某個主題中有3個副本和6個分區(qū),那么首先從partitionId(partition的編號)為0的分區(qū)開始進行分配,假設(shè)第一次計算(由rand.nextInt(brokerArray.length)隨機產(chǎn)生)得到的nextReplicaShift值為1,第一次隨機產(chǎn)生的 startIndex 值為 2,那么 partitionId 為 0 的第一個副本的位置(這里指的是brokerArray的數(shù)組下標)firstReplicaIndex=(currentPartitionId+startIndex)%brokerArray.length=(0+2)%3=2,第二個副本的位置為replicaIndex(firstReplicaIndex,nextReplicaShift,j,brokerArray.length)=replicaIndex(2,nextReplicaShift+1,0,3)=?,這里引入了一個新的方法replicaIndex(),不過這個方法很簡單,具體如下:

繼續(xù)計算 replicaIndex(2,nextReplicaShift+1,0,3)=replicaIndex(2,2,0,3)=(2+(1+(2+0)%(3-1)))%3=0。繼續(xù)計算下一個副本的位置replicaIndex(2,2,1,3)=(2+(1+(2+1)%(3-1)))%3=1。所以partitionId為0的副本分配位置列表為[2,0,1],如果brokerArray正好是從0開始編號的,也正好是順序不間斷的,即brokerArray為[0,1,2],那么當前partitionId為0的副本分配策略為[2,0,1]。如果brokerId不是從0開始的,也不是順序的(有可能之前集群的其中幾個broker下線了),最終的brokerArray為[2,5,8],那么partitionId為0的分區(qū)的副本分配策略為[8,2,5]。為了便于說明問題,可以簡單假設(shè)brokerArray就是[0,1,2]。

同樣計算下一個分區(qū),即partitionId為1的副本分配策略。此時nextReplicaShift的值還是2,沒有滿足自增的條件。這個分區(qū)的 firstReplicaIndex=(1+2)%3=0。第二個副本的位置replicaIndex(0,2,0,3)=(0+(1+(2+0)%(3-1)))%3=1,第三個副本的位置replicaIndex(0,2,1,3)=2,最終partitionId為2的分區(qū)分配策略為[0,1,2]。

依次類推,更多的分配細節(jié)可以參考下面的示例,topic-test2的分區(qū)分配策略和上面陳述的一致:

我們無法預(yù)先獲知startIndex和nextReplicaShift的值,因為都是隨機產(chǎn)生的。startIndex和nextReplicaShift的值可以通過最終的分區(qū)分配方案來反推,比如上面的topic-test2,第一個分區(qū)(即partitionId=0的分區(qū))的第一個副本為2,那么可由2=(0+startIndex)%3推斷出startIndex為2。之所以startIndex選擇隨機產(chǎn)生,是因為這樣可以在多個主題的情況下盡可能地均勻分布分區(qū)副本,如果這里固定為一個特定值,那么每次的第一個副本都是在這個broker上,進而導(dǎo)致少數(shù)幾個broker所分配到的分區(qū)副本過多而其余broker分配到的分區(qū)副本過少,最終導(dǎo)致負載不均衡。尤其是某些主題的副本數(shù)和分區(qū)數(shù)都比較少,甚至都為1的情況下,所有的副本都落到了那個指定的broker上。與此同時,在分配時位移量nextReplicaShift也可以更好地使分區(qū)副本分配得更加均勻。

相比較而言,指定機架信息的分配策略比未指定機架信息的分配策略要稍微復(fù)雜一些,但主體思想并沒相差很多,只是將機架信息作為附加的參考項。假設(shè)目前有3個機架rack1、rack2和rack3,Kafka集群中的9個broker點都部署在這3個機架之上,機架與broker節(jié)點的對照關(guān)系如下:

如果不考慮機架信息,那么對照assignReplicasToBrokersRackUnaware()方法里的brokerArray變量的值為[0,1,2,3,4,5 6,7,8]。指定基架信息的assignReplicasToBrokersRackAware()方法里的brokerArray的值在這里就會被轉(zhuǎn)換為[0,3,6,1,4,7,2,5,8],顯而易見,這是輪詢各個機架而產(chǎn)生的結(jié)果,如此新的brokerArray(確切地說是arrangedBrokerList)中包含了簡單的機架分配信息。之后的步驟也和assignReplicasToBrokersRackUnaware()方法類似,同樣包含startIndex、currentPartiionId、nextReplicaShift 的概念,循環(huán)為每一個分區(qū)分配副本。分配副本時,除了處理第一個副本,其余的也調(diào)用 replicaIndex()方法來獲得一個 broker,但這里和assignReplicasToBrokersRackUnaware()不同的是,這里不是簡單地將這個broker添加到當前分區(qū)的副本列表之中,還要經(jīng)過一層篩選,滿足以下任意一個條件的broker不能被添加到當前分區(qū)的副本列表之中:

· 如果此broker所在的機架中已經(jīng)存在一個broker擁有該分區(qū)的副本,并且還有其他的機架中沒有任何一個broker擁有該分區(qū)的副本。

· 如果此broker中已經(jīng)擁有該分區(qū)的副本,并且還有其他broker中沒有該分區(qū)的副本。

當創(chuàng)建一個主題時,無論通過kafka-topics.sh腳本,還是通過其他方式(比如4.2節(jié)中介紹的KafkaAdminClient)創(chuàng)建主題時,實質(zhì)上是在ZooKeeper中的/brokers/topics節(jié)點下創(chuàng)建與該主題對應(yīng)的子節(jié)點并寫入分區(qū)副本分配方案,并且在/config/topics/節(jié)點下創(chuàng)建與該主題對應(yīng)的子節(jié)點并寫入主題相關(guān)的配置信息(這個步驟可以省略不執(zhí)行)。而Kafka創(chuàng)建主題的實質(zhì)性動作是交由控制器異步去完成的,有關(guān)控制器的更多細節(jié)可以參考 6.4 節(jié)的相關(guān)內(nèi)容。

知道了 kafka-topics.sh 腳本的實質(zhì)之后,我們可以直接使用 ZooKeeper 的客戶端在/brokers/topics節(jié)點下創(chuàng)建相應(yīng)的主題節(jié)點并寫入預(yù)先設(shè)定好的分配方案,這樣就可以創(chuàng)建一個新的主題了。這種創(chuàng)建主題的方式還可以繞過一些原本使用kafka-topics.sh腳本創(chuàng)建主題時的一些限制,比如分區(qū)的序號可以不用從0開始連續(xù)累加了。首先我們通過ZooKeeper客戶端創(chuàng)建一個除了與主題topic-create名稱不同其余都相同的主題topic-create-zk,示例如下:

通過查看主題topic-create-zk的分配情況,可以看到與主題 topic-create 的信息沒有什么差別。

我們再創(chuàng)建一個另類的主題,分配情況和主題 topic-create 一樣,唯獨分區(qū)號已經(jīng)與主題topic-create-special大相徑庭,示例如下:

可以看到分區(qū)號為10、21、33和40,而通過單純地使用kafka-topics.sh腳本是無法實現(xiàn)的。不過這種方式也只是一些實戰(zhàn)方面上的技巧,筆者還是建議使用更加正統(tǒng)的kafka-topics.sh腳本或KafkaAdminClient來管理相應(yīng)的主題。

4.1.3 查看主題

4.1.1節(jié)中提及了kafka-topics.sh腳本有5種指令類型:create、list、describe、alter和delete。其中l(wèi)ist和describe指令可以用來方便地查看主題信息,在前面的內(nèi)容中我們已經(jīng)接觸過了describe指令的用法,本節(jié)會對其做更細致的講述。

通過list指令可以查看當前所有可用的主題,示例如下:

前面的章節(jié)我們都是通過 describe 指令來查看單個主題信息的,如果不使用--topic指定主題,則會展示出所有主題的詳細信息。--topic還支持指定多個主題,示例如下:

在使用 describe 指令查看主題信息時還可以額外指定 topics-with-overrides、under-replicated-partitions和unavailable-partitions這三個參數(shù)來增加一些附加功能。

增加topics-with-overrides參數(shù)可以找出所有包含覆蓋配置的主題,它只會列出包含了與集群不一樣配置的主題。注意使用topics-with-overrides參數(shù)時只顯示原本只使用describe指令的第一行信息,參考示例如下:

under-replicated-partitions和unavailable-partitions參數(shù)都可以找出有問題的分區(qū)。通過 under-replicated-partitions 參數(shù)可以找出所有包含失效副本的分區(qū)。包含失效副本的分區(qū)可能正在進行同步操作,也有可能同步發(fā)生異常,此時分區(qū)的ISR集合小于 AR 集合。對于通過該參數(shù)查詢到的分區(qū)要重點監(jiān)控,因為這很可能意味著集群中的某個broker已經(jīng)失效或同步效率降低等。有關(guān)失效副本的更多細節(jié)可以參閱8.1.1節(jié)。

舉個例子,參照主題topic-create的環(huán)境,我們將集群中的node2節(jié)點下線,之后再通過這個參數(shù)來查看topic-create的信息,參考如下:

我們再將node2節(jié)點恢復(fù),執(zhí)行同樣的命令,可以看到?jīng)]有任何信息顯示:

通過 unavailable-partitions 參數(shù)可以查看主題中沒有 leader 副本的分區(qū),這些分區(qū)已經(jīng)處于離線狀態(tài),對于外界的生產(chǎn)者和消費者來說處于不可用的狀態(tài)。

舉個例子,參考主題topic-create的環(huán)境,我們將集群中的node2和node3節(jié)點下線,之后再通過這個參數(shù)來查看topic-create的信息,參考如下:

我們再將node2和node3恢復(fù),執(zhí)行同樣的命令,可以看到?jīng)]有任何信息:

4.1.4 修改主題

當一個主題被創(chuàng)建之后,依然允許我們對其做一定的修改,比如修改分區(qū)個數(shù)、修改配置等,這個修改的功能就是由kafka-topics.sh腳本中的alter指令提供的。

我們首先來看如何增加主題的分區(qū)數(shù)。以前面的主題topic-config為例,當前分區(qū)數(shù)為1,修改為3,示例如下:

注意上面提示的告警信息:當主題中的消息包含key時(即key不為null),根據(jù)key計算分區(qū)的行為就會受到影響。當topic-config的分區(qū)數(shù)為1時,不管消息的key為何值,消息都會發(fā)往這一個分區(qū);當分區(qū)數(shù)增加到3時,就會根據(jù)消息的key來計算分區(qū)號,原本發(fā)往分區(qū)0的消息現(xiàn)在有可能會發(fā)往分區(qū)1或分區(qū)2。如此還會影響既定消息的順序,所以在增加分區(qū)數(shù)時一定要三思而后行。對于基于key計算的主題而言,建議在一開始就設(shè)置好分區(qū)數(shù)量,避免以后對其進行調(diào)整。

目前Kafka只支持增加分區(qū)數(shù)而不支持減少分區(qū)數(shù)。比如我們再將主題topic-config的分區(qū)數(shù)修改為1,就會報出InvalidPartitionException的異常,示例如下:

為什么不支持減少分區(qū)?

按照Kafka現(xiàn)有的代碼邏輯,此功能完全可以實現(xiàn),不過也會使代碼的復(fù)雜度急劇增大。實現(xiàn)此功能需要考慮的因素很多,比如刪除的分區(qū)中的消息該如何處理?如果隨著分區(qū)一起消失則消息的可靠性得不到保障;如果需要保留則又需要考慮如何保留。直接存儲到現(xiàn)有分區(qū)的尾部,消息的時間戳就不會遞增,如此對于Spark、Flink這類需要消息時間戳(事件時間)的組件將會受到影響;如果分散插入現(xiàn)有的分區(qū),那么在消息量很大的時候,內(nèi)部的數(shù)據(jù)復(fù)制會占用很大的資源,而且在復(fù)制期間,此主題的可用性又如何得到保障?與此同時,順序性問題、事務(wù)性問題,以及分區(qū)和副本的狀態(tài)機切換問題都是不得不面對的。反觀這個功能的收益點卻是很低的,如果真的需要實現(xiàn)此類功能,則完全可以重新創(chuàng)建一個分區(qū)數(shù)較小的主題,然后將現(xiàn)有主題中的消息按照既定的邏輯復(fù)制過去即可。

在創(chuàng)建主題時有一個if-not-exists參數(shù)來忽略一些異常,在這里也有對應(yīng)的參數(shù),如果所要修改的主題不存在,可以通過 if-exists 參數(shù)來忽略異常。下面修改一個不存在的主題topic-unknown的分區(qū),會報出錯誤信息“Topic topic-unknown does not exist”,示例如下:

除了修改分區(qū)數(shù),我們還可以使用kafka-topics.sh腳本的alter指令來變更主題的配置。在創(chuàng)建主題的時候我們可以通過config參數(shù)來設(shè)置所要創(chuàng)建主題的相關(guān)參數(shù),通過這個參數(shù)可以覆蓋原本的默認配置。在創(chuàng)建完主題之后,我們還可以通過alter指令配合config參數(shù)增加或修改一些配置以覆蓋它們配置原有的值。

下面的示例中演示了將主題topic-config的max.message.bytes配置值從10000修改為20000,示例如下:

我們再次覆蓋主題topic-config的另一個配置segment.bytes(看上去相當于增加動作),示例如下:

我們可以通過delete-config參數(shù)來刪除之前覆蓋的配置,使其恢復(fù)原有的默認值。下面的示例將主題topic-config中所有修改過的3個配置都刪除:

注意到在變更(增、刪、改)配置的操作執(zhí)行之后都會提示一段告警信息,指明了使用kafka-topics.sh腳本的alter指令來變更主題配置的功能已經(jīng)過時(deprecated),將在未來的版本中刪除,并且推薦使用kafka-configs.sh腳本來實現(xiàn)相關(guān)功能。

4.1.5 配置管理

kafka-configs.sh 腳本是專門用來對配置進行操作的,這里的操作是指在運行狀態(tài)下修改原有的配置,如此可以達到動態(tài)變更的目的。kafka-configs.sh腳本包含變更配置alter和查看配置describe這兩種指令類型。同使用kafka-topics.sh腳本變更配置的原則一樣,增、刪、改的行為都可以看作變更操作,不過kafka-configs.sh腳本不僅可以支持操作主題相關(guān)的配置,還可以支持操作broker、用戶和客戶端這3個類型的配置。

kafka-configs.sh腳本使用entity-type參數(shù)來指定操作配置的類型,并且使用entity-name參數(shù)來指定操作配置的名稱。比如查看主題topic-config的配置可以按如下方式執(zhí)行:

--describe指定了查看配置的指令動作,--entity-type指定了查看配置的實體類型,--entity-name指定了查看配置的實體名稱。entity-type只可以配置4個值:topics、brokers、clients和users,entity-type與entity-name的對應(yīng)關(guān)系如表4-1所示。

表4-1 entity-type和entity-name的對應(yīng)關(guān)系

使用alter指令變更配置時,需要配合add-config和delete-config這兩個參數(shù)一起使用。add-config參數(shù)用來實現(xiàn)配置的增、改,即覆蓋原有的配置;delete-config參數(shù)用來實現(xiàn)配置的刪,即刪除被覆蓋的配置以恢復(fù)默認值。

下面的示例演示了 add-config 參數(shù)的用法,覆蓋了主題 topic-config 的兩個配置cleanup.policy和max.message.bytes(示例執(zhí)行之前主題topic-config無任何被覆蓋的配置):

上面示例中還使用了兩種方式來查看主題topic-config中配置信息,注意比較這兩者之間的差別。

使用delete-config參數(shù)刪除配置時,同add-config參數(shù)一樣支持多個配置的操作,多個配置之間用逗號“,”分隔,下面的示例中演示了如何刪除上面剛剛增加的主題配置:

使用kafka-configs.sh腳本來變更(alter)配置時,會在ZooKeeper中創(chuàng)建一個命名形式為/config/<entity-type>/<entity-name>的節(jié)點,并將變更的配置寫入這個節(jié)點,比如對于主題topic-config而言,對應(yīng)的節(jié)點名稱為/config/topics/topic-config,節(jié)點中的數(shù)據(jù)內(nèi)容為:

可以推導(dǎo)出節(jié)點內(nèi)容的數(shù)據(jù)格式為:

其中property-name代表屬性名,property-value代表屬性值。增加配置實際上是往節(jié)點內(nèi)容中添加屬性的鍵值對,修改配置是在節(jié)點內(nèi)容中修改相應(yīng)屬性的屬性值,刪除配置是刪除相應(yīng)的屬性鍵值對。

變更配置時還會在ZooKeeper中的/config/changes/節(jié)點下創(chuàng)建一個以“config_change_”為前綴的持久順序節(jié)點(PERSISTENT_SEQUENTIAL),節(jié)點命名形式可以歸納為/config/changes/config_change_<seqNo>。比如示例中的主題topic-config與此對應(yīng)的節(jié)點名稱和節(jié)點內(nèi)容如下:

seqNo是一個單調(diào)遞增的10位數(shù)字的字符串,不足位則用0補齊。

查看(describe)配置時,就是從/config/<entity-type>/<entity-name>節(jié)點中獲取相應(yīng)的數(shù)據(jù)內(nèi)容。如果使用 kafka-configs.sh 腳本查看配置信息時沒有指定entity-name參數(shù)的值,則會查看entity-type所對應(yīng)的所有配置信息。示例如下:

4.1.6 主題端參數(shù)

與主題相關(guān)的所有配置參數(shù)在 broker 層面都有對應(yīng)參數(shù),比如主題端參數(shù) cleanup.policy對應(yīng)broker層面的log.cleanup.policy。如果沒有修改過主題的任何配置參數(shù),那么就會使用broker端的對應(yīng)參數(shù)作為其默認值??梢栽趧?chuàng)建主題時覆蓋相應(yīng)參數(shù)的默認值,也可以在創(chuàng)建完主題之后變更相應(yīng)參數(shù)的默認值。比如在創(chuàng)建主題的時候沒有指定cleanup.policy 參數(shù)的值,那么就使用 log.cleanup.policy 參數(shù)所配置的值作為cleanup.policy的值。

與主題相關(guān)的參數(shù)也有很多,由于篇幅限制,在前面的配置變更的示例中難以一一列出所有的參數(shù),但是從配置變更的角度而言,其操作方式都是一樣的。為了便于讀者查閱,表 4-2列出了主題端參數(shù)與broker端參數(shù)的對照關(guān)系。

表4-2 主題端參數(shù)與broker端參數(shù)的對照關(guān)系

續(xù)表

續(xù)表

4.1.7 刪除主題

如果確定不再使用一個主題,那么最好的方式是將其刪除,這樣可以釋放一些資源,比如磁盤、文件句柄等。kafka-topics.sh腳本中的delete指令就可以用來刪除主題,比如刪除一個主題topic-delete:

可以看到在執(zhí)行完刪除命令之后會有相關(guān)的提示信息,這個提示信息和broker端配置參數(shù)delete.topic.enable 有關(guān)。必須將delete.topic.enable參數(shù)配置為true才能夠刪除主題,這個參數(shù)的默認值就是true,如果配置為false,那么刪除主題的操作將會被忽略。在實際生產(chǎn)環(huán)境中,建議將這個參數(shù)的值設(shè)置為true。

如果要刪除的主題是 Kafka 的內(nèi)部主題,那么刪除時就會報錯。截至 Kafka 2.0.0,Kafka的內(nèi)部一共包含2個主題,分別為__consumer_offsets和__transaction_state。下面的示例中嘗試刪除內(nèi)部主題__consumer_offsets:

嘗試刪除一個不存在的主題也會報錯。比如下面的示例中嘗試刪除一個不存在的主題topic-unknown:

這里同alter指令一樣,也可以通過if-exists參數(shù)來忽略異常,參考如下:

使用kafka-topics.sh腳本刪除主題的行為本質(zhì)上只是在ZooKeeper中的/admin/delete_topics 路徑下創(chuàng)建一個與待刪除主題同名的節(jié)點,以此標記該主題為待刪除的狀態(tài)。與創(chuàng)建主題相同的是,真正刪除主題的動作也是由Kafka的控制器負責完成的。

了解這一原理之后,我們可以直接通過ZooKeeper的客戶端來刪除主題。下面示例中使用ZooKeeper客戶端zkCli.sh來刪除主題topic-delete:

我們還可以通過手動的方式來刪除主題。主題中的元數(shù)據(jù)存儲在 ZooKeeper 中的/brokers/topics 和/config/topics 路徑下,主題中的消息數(shù)據(jù)存儲在 log.dir 或log.dirs配置的路徑下,我們只需要手動刪除這些地方的內(nèi)容即可。下面的示例中演示了如何刪除主題topic-delete,總共分3個步驟,第一步和第二步的順序可以互換。

第一步,刪除ZooKeeper中的節(jié)點/config/topics/topic-delete。

第二步,刪除ZooKeeper中的節(jié)點/brokers/topics/topic-delete及其子節(jié)點。

第三步,刪除集群中所有與主題topic-delete有關(guān)的文件。

注意,刪除主題是一個不可逆的操作。一旦刪除之后,與其相關(guān)的所有消息數(shù)據(jù)會被全部刪除,所以在執(zhí)行這一操作的時候也要三思而后行。

介紹到這里,基本上kafka-topics.sh腳本的使用也就講完了,為了方便讀者查閱,表4-3中列出了所有kafka-topics.sh腳本中的參數(shù)。讀者也可以通過執(zhí)行無任何參數(shù)的kafka-topics.sh腳本,或者執(zhí)行kafka-topics.sh-help來查看幫助信息。

表4-3 kafka-topics.sh腳本中的參數(shù)

續(xù)表

主站蜘蛛池模板: 治多县| 康保县| 西畴县| 黔江区| 西宁市| 特克斯县| 兰溪市| 新乡县| 桂阳县| 上犹县| 长寿区| 峨眉山市| 修文县| 宁海县| 安化县| 盱眙县| 嘉定区| 甘肃省| 桐柏县| 万荣县| 积石山| 南漳县| 郎溪县| 类乌齐县| 成都市| 南丹县| 邵东县| 体育| 陆丰市| 涿鹿县| 五华县| 无棣县| 静宁县| 锡林浩特市| 维西| 常山县| 剑河县| 房山区| 宜丰县| 昌宁县| 青河县|