官术网_书友最值得收藏!

1.3 生產與消費

由 1.1 節的內容可知,生產者將消息發送至 Kafka 的主題中,或者更加確切地說應該是主題的分區中,而消費者也是通過訂閱主題從而消費消息的。在演示生產與消費消息之前,需要創建一個主題作為消息的載體。

Kafka提供了許多實用的腳本工具,存放在$KAFKA_HOME的bin目錄下,其中與主題有關的就是 kafka-topics.sh 腳本,下面我們用它演示創建一個分區數為 4、副本因子為 3 的主題topic-demo,示例如下:

其中--zookeeper指定了Kafka所連接的ZooKeeper服務地址,--topic指定了所要創建主題的名稱,--replication-factor 指定了副本因子,--partitions 指定了分區個數,--create是創建主題的動作指令。

還可以通過--describe展示主題的更多具體信息,示例如下:

創建主題topic-demo之后我們再來檢測一下Kafka集群是否可以正常地發送和消費消息。$KAFKA_HOME/bin 目錄下還提供了兩個腳本 kafka-console-producer.sh 和 kafka-console-consumer.sh,通過控制臺收發消息。首先我們打開一個shell終端,通過kafka-console-consumer.sh腳本來訂閱主題topic-demo,示例如下:

其中--bootstrap-server指定了連接的Kafka集群地址,--topic指定了消費者訂閱的主題。目前主題topic-demo尚未有任何消息存入,所以此腳本還不能消費任何消息。

我們再打開一個shell終端,然后使用kafka-console-producer.sh腳本發送一條消息“Hello,Kafka!”至主題topic-demo,示例如下:

其中--broker-list指定了連接的Kafka集群地址,--topic指定了發送消息時的主題。示例中的第二行是通過人工鍵入的方式輸入的,按下回車鍵后會跳到第三行,即“>”字符處。此時原先執行 kafka-console-consumer.sh腳本的 shell終端中出現了剛剛輸入的消息“Hello,Kafka!”,示例如下:

讀者也可以通過輸入一些其他自定義的消息來熟悉消息的收發及這兩個腳本的用法。不過這兩個腳本一般用來做一些測試類的工作,在實際應用中,不會只是簡單地使用這兩個腳本來做復雜的與業務邏輯相關的消息生產與消費的工作,具體的工作還需要通過編程的手段來實施。下面就以Kafka自身提供的Java客戶端來演示消息的收發,與Kafka的Java客戶端相關的Maven依賴如下:

要往Kafka中寫入消息,首先要創建一個生產者客戶端實例并設置一些配置參數,然后構建消息的ProducerRecord對象,其中必須包含所要發往的主題及消息的消息體,進而再通過生產者客戶端實例將消息發出,最后可以通過 close()方法來關閉生產者客戶端實例并回收相應的資源。具體的示例如代碼清單1-1所示,與腳本演示時一樣,示例中僅發送一條內容為“Hello,Kafka!”的消息到主題topic-demo。

代碼清單1-1 生產者客戶端示例代碼

對應的消費消息也比較簡單,首先創建一個消費者客戶端實例并配置相應的參數,然后訂閱主題并消費即可,具體的示例代碼如代碼清單1-2所示。

代碼清單1-2 消費者客戶端示例代碼

通過這些示例,相信各位讀者對Kafka應該有了初步的認識。這僅僅是一個開始,要正確、靈活地運用好Kafka還需要對它進行深入探索,包括生產者和消費者客戶端的使用細節及原理、服務端的使用細節及原理、運維、監控等,每一個方面都等著讀者去一一攻破。

主站蜘蛛池模板: 济宁市| 阿拉善盟| 界首市| 克什克腾旗| 蒲城县| 鹰潭市| 营山县| 益阳市| 河津市| 若羌县| 什邡市| 喀什市| 两当县| 福安市| 图们市| 内丘县| 四子王旗| 尉犁县| 驻马店市| 郧西县| 苏尼特右旗| 怀来县| 靖西县| 屯门区| 会同县| 广宗县| 颍上县| 天台县| 延庆县| 宁津县| 榆林市| 昌邑市| 志丹县| 裕民县| 务川| 临汾市| 和龙市| 兴化市| 安远县| 鄂托克前旗| 武冈市|