官术网_书友最值得收藏!

4.2 初識KafkaAdminClient

一般情況下,我們都習慣使用kafka-topics.sh腳本來管理主題,但有些時候我們希望將主題管理類的功能集成到公司內部的系統中,打造集管理、監控、運維、告警為一體的生態平臺,那么就需要以程序調用API的方式去實現。本節主要介紹KafkaAdminClient的基本使用方式,以及采用這種調用API方式下的創建主題時的合法性驗證。

4.2.1 基本使用

代碼清單4-1中使用TopicCommand創建了一個主題,當然我們也可以用它來實現主題的刪除、修改、查看等操作,實質上與使用 kafka-config.sh 腳本的方式無異。這種方式與應用程序之間的交互性非常差,且不說它的編程模型類似于拼寫字符串,它本身調用的TopicCommand類的main()方法的返回值是一個void類,并不能提供給調用者有效的反饋信息。比如我們使用下面的方式來查看主題topic-create的詳細信息,如代碼清單4-2所示。

代碼清單4-2 查看主題

當調用 describeTopic()方法時,雖然我們可以在終端看到主題 topic-create 的詳細信息,但方法的調用者卻無法捕獲這個信息,因為返回值類型為void。對于方法的調用者而言,執行這個方法和不執行這個方法沒有什么區別。

在 Kafka 0.11.0.0 版本之前,我們可以通過 kafka-core 包(Kafka 服務端代碼)下的kafka.admin.AdminClient和kafka.admin.AdminUtils來實現部分Kafka的管理功能,但它們都已經過時了,在未來的版本中會被刪除。從 0.11.0.0 版本開始,Kafka 提供了另一個工具類org.apache.kafka.clients.admin.KafkaAdminClient來作為替代方案。KafkaAdminClient不僅可以用來管理broker、配置和ACL(Access Control List),還可以用來管理主題。

KafkaAdminClient繼承了org.apache.kafka.clients.admin.AdminClient抽象類,并提供了多種方法。篇幅限制,下面只列出與本章內容相關的一些方法。

· 創建主題:CreateTopicsResult createTopics(Collection<NewTopic>newTopics)。

· 刪除主題:DeleteTopicsResult deleteTopics(Collection<String>topics)。

· 列出所有可用的主題:ListTopicsResult listTopics()。

· 查看主題的信息:DescribeTopicsResult describeTopics(Collection<String>topicNames)。

· 查詢配置信息:DescribeConfigsResult describeConfigs(Collection<ConfigResource>resources)。

· 修改配置信息:AlterConfigsResult alterConfigs(Map<ConfigResource,Config>configs)。

· 增加分區:CreatePartitionsResult createPartitions(Map<String,NewPartitions>newPartitions)。

下面分別介紹這些方法的具體使用方式。首先分析如何使用KafkaAdminClient創建一個主題,下面的示例中創建了一個分區數為 4、副本因子為 1 的主題 topic-admin,如代碼清單 4-3所示。

代碼清單4-3 使用KafkaAdminClient創建一個主題

示例中第②行創建了一個KafkaAdminClient實例,實例中通過引入在第①行中建立的配置來連接 Kafka 集群。AdminClient.create()方法實際上調用的就是 KafkaAdminClient 中的createInternal方法構建的KafkaAdminClient實例,具體定義如下:

第③行中的 NewTopic 用來設定所要創建主題的具體信息,包含創建主題時需要的主題名稱、分區數和副本因子等。NewTopic中的成員變量如下所示。

同kafka-topics.sh腳本一樣,可以通過指定分區數和副本因子來創建一個主題,也可以通過指定區副本的具體分配方案來創建一個主題,比如將第③行替換為下面的內容:

也可以在創建主題時指定需要覆蓋的配置。比如覆蓋 cleanup.policy 配置,需要在第③和第④行之間加入如下代碼:

第④行是真正的創建主題的核心。KafkaAdminClient內部使用Kafka 的一套自定義二進制協議來實現諸如創建主題的管理功能。它主要的實現步驟如下:

(1)客戶端根據方法的調用創建相應的協議請求,比如創建主題的createTopics方法,其內部就是發送CreateTopicRequest請求。

(2)客戶端將請求發送至服務端。

(3)服務端處理相應的請求并返回響應,比如這個與CreateTopicRequest請求對應的就是CreateTopicResponse。

(4)客戶端接收相應的響應并進行解析處理。和協議相關的請求和相應的類基本都在org.apache.kafka.common.requests包下,AbstractRequest和AbstractResponse是這些請求和響應類的兩個基本父類。

有關Kafka的自定義協議的更多內容可以參閱6.1節。

第④行中的返回值是CreateTopicsResult類型,它的具體定義也很簡單,如代碼清單4-4所示。

代碼清單4-4 CreateTopicsResult的具體內容

CreateTopicsResult 中的方法主要還是針對成員變量 futures 的操作,futures 的類型Map<String,KafkaFuture<Void>>中的key代表主題名稱,而KafkaFuture<Void>代表創建后的返回值類型。KafkaAdminClient中的createTopics()方法可以一次性創建多個主題。KafkaFuture是原本為了支持JDK8以下的版本而自定義實現的一個類,實現了Future接口,可以通過Future.get()方法來等待服務端的返回,參見代碼清單 4-3 中的第⑤行。在未來的版本中,會有計劃地將KafkaFuture替換為JDK8中引入的CompletableFuture。

雖然這里創建主題之后的返回值類型為Void,但并不代表所有操作的返回值類型都是Void,比如 KafkaAdminClient 中的 listTopics()方法的返回值為 ListTopicsResult 類型,這個ListTopicsResult類型內部的成員變量future的類型為KafkaFuture<Map<String,TopicListing>>,這里就包含了具體的返回信息。

在使用KafkaAdminClient之后記得要調用close()方法來釋放資源。

KafkaAdminClient中的deleteTopics()、listTopics()及describeTopics()方法都很簡單,讀者不妨自己實踐一下。下面講一講describeConfigs()和alterConfigs()這兩個方法。首先查看剛剛創建的主題topic-admin的具體配置信息,如代碼清單4-5所示。

代碼清單4-5 describeConfigs()方法的使用示例

最終的輸出結果不會只列出被覆蓋的配置信息,而是會列出主題中所有的配置信息。

alterConfigs()方法的使用方式也很簡單。下面的示例中將主題 topic-admin 的cleanup.policy參數修改為compact,只需將代碼清單4-5中的第①至第④行替換為下面的內容即可:

本章的最后將演示如何使用KafkaAdminClient的createPartitions()方法來增加一個主題的分區。下面的示例將主題topic-admin的分區從4增加到5,只需將代碼清單4-5中的第①至第④行替換為下面的內容即可:

本節主要講述如何使用KafkaAdminClient來管理主題,對于其他的功能介紹,以及如何改造擴展KafkaAdminClient的功能并沒有涉及,不過這些都會在6.4.2節和10.2節中進行擴充。

4.2.2 主題合法性驗證

一般情況下,Kafka 生產環境中的 auto.create.topics.enable 參數會被設置為false,即自動創建主題這條路會被堵住。kafka-topics.sh腳本創建的方式一般由運維人員操作,普通用戶無權過問。那么KafkaAdminClient就為普通用戶提供了一個“口子”,或者將其集成到公司內部的資源申請、審核系統中會更加方便。普通用戶在創建主題的時候,有可能由于誤操作或其他原因而創建了不符合運維規范的主題,比如命名不規范,副本因子數太低等,這些都會影響后期的系統運維。如果創建主題的操作封裝在資源申請、審核系統中,那么在前端就可以根據規則過濾不符合規范的申請操作。如果用戶用KafkaAdminClient或類似的工具創建了一個錯誤的主題,我們有什么辦法可以做相應的規范處理呢?

Kafka broker 端有一個這樣的參數:create.topic.policy.class.name,默認值為null,它提供了一個入口用來驗證主題創建的合法性。使用方式很簡單,只需要自定義實現org.apache.kafka.server.policy.CreateTopicPolicy 接口,比如下面示例中的 PolicyDemo。然后在broker 端的配置文件 config/server.properties 中配置參數 create.topic.policy.class.name的值為org.apache.kafka.server.policy.PolicyDemo,最后啟動服務。PolicyDemo的代碼參考代碼清單4-6,主要實現接口中的configure()、close()及validate()方法,configure()方法會在Kafka服務啟動的時候執行,validate()方法用來鑒定主題參數的合法性,其在創建主題時執行,close()方法在關閉Kafka服務時執行。

代碼清單4-6 主題合法性驗證示例

此時如果采用代碼清單4-3中的方式創建一個分區數為4、副本因子為1的主題,那么客戶端就出報出如下的錯誤:

相應的Kafka服務端的日志如下:

主站蜘蛛池模板: 海阳市| 乐安县| 德安县| 阿合奇县| 永安市| 元谋县| 若尔盖县| 大关县| 延津县| 如皋市| 连江县| 云霄县| 濮阳县| 汶上县| 昂仁县| 北辰区| 桦南县| 新巴尔虎左旗| 闽侯县| 长葛市| 延庆县| 淳化县| 桂东县| 梨树县| 南靖县| 双鸭山市| 承德县| 墨脱县| 宜兴市| 大余县| 彭泽县| 水城县| 铜川市| 化隆| 化德县| 杨浦区| 大埔区| 陆丰市| 翼城县| 小金县| 巍山|