- 深入理解Kafka:核心設計與實踐原理
- 朱忠華
- 6841字
- 2019-07-09 14:50:31
2.1 客戶端開發
一個正常的生產邏輯需要具備以下幾個步驟:
(1)配置生產者客戶端參數及創建相應的生產者實例。
(2)構建待發送的消息。
(3)發送消息。
(4)關閉生產者實例。
代碼清單 1-2 中已經簡單對生產者客戶端的編碼做了一個基本演示,本節對其修改以做具體的分析,如代碼清單2-1所示。
代碼清單2-1 生產者客戶端示例代碼

相比代碼清單 1-2 而言,這里僅僅是讓編碼的邏輯顯得更加“正統”一些,也更加方便下面內容的陳述。
這里有必要單獨說明的是構建的消息對象 ProducerRecord,它并不是單純意義上的消息,它包含了多個屬性,原本需要發送的與業務相關的消息體只是其中的一個 value 屬性,比如“Hello,Kafka!”只是ProducerRecord對象中的一個屬性。ProducerRecord類的定義如下(只截取成員變量):

其中topic和partition字段分別代表消息要發往的主題和分區號。headers字段是消息的頭部,Kafka 0.11.x版本才引入這個屬性,它大多用來設定一些與應用相關的信息,如無需要也可以不用設置。key是用來指定消息的鍵,它不僅是消息的附加信息,還可以用來計算分區號進而可以讓消息發往特定的分區。前面提及消息以主題為單位進行歸類,而這個key可以讓消息再進行二次歸類,同一個key的消息會被劃分到同一個分區中,詳情參見2.1.4節。有key的消息還可以支持日志壓縮的功能,詳情參見5.4節。value是指消息體,一般不為空,如果為空則表示特定的消息—墓碑消息,詳情參見5.4節。timestamp是指消息的時間戳,它有CreateTime和LogAppendTime兩種類型,前者表示消息創建的時間,后者表示消息追加到日志文件的時間,詳情參見5.2節。
接下來我們將按照生產邏輯的各個步驟來一一做相應分析。
2.1.1 必要的參數配置
在創建真正的生產者實例前需要配置相應的參數,比如需要連接的Kafka集群地址。參照代碼清單2-1中的initConfig()方法,在Kafka生產者客戶端KafkaProducer中有3個參數是必填的。
· bootstrap.servers:該參數用來指定生產者客戶端連接Kafka集群所需的broker地址清單,具體的內容格式為host1:port1,host2:port2,可以設置一個或多個地址,中間以逗號隔開,此參數的默認值為“”。注意這里并非需要所有的broker地址,因為生產者會從給定的broker里查找到其他broker的信息。不過建議至少要設置兩個以上的broker 地址信息,當其中任意一個宕機時,生產者仍然可以連接到 Kafka集群上。有關此參數的更多釋義可以參考6.5.2節。
· key.serializer 和 value.serializer:broker 端接收的消息必須以字節數組(byte[])的形式存在。代碼清單2-1中生產者使用的KafkaProducer<String,String>和ProducerRecord<String,String>中的泛型<String,String>對應的就是消息中key和value的類型,生產者客戶端使用這種方式可以讓代碼具有良好的可讀性,不過在發往broker之前需要將消息中對應的key和value做相應的序列化操作來轉換成字節數組。key.serializer和value.serializer這兩個參數分別用來指定key和value序列化操作的序列化器,這兩個參數無默認值。注意這里必須填寫序列化器的全限定名,如代碼清單2-1中的org.apache.kafka.common.serialization.StringSerializer,單單指定StringSerializer是錯誤的,更多有關序列化的內容可以參考2.1.3節。
注意到代碼清單2-1中的initConfig()方法里還設置了一個參數client.id,這個參數用來設定KafkaProducer對應的客戶端id,默認值為“”。如果客戶端不設置,則KafkaProducer會自動生成一個非空字符串,內容形式如“producer-1”“producer-2”,即字符串“producer-”與數字的拼接。
KafkaProducer中的參數眾多,遠非示例initConfig()方法中的那樣只有4個,開發人員可以根據業務應用的實際需求來修改這些參數的默認值,以達到靈活調配的目的。一般情況下,普通開發人員無法記住所有的參數名稱,只能有個大致的印象。在實際使用過程中,諸如“key.serializer”“max.request.size”“interceptor.classes”之類的字符串經常由于人為因素而書寫錯誤。為此,我們可以直接使用客戶端中的 org.apache.kafka.clients.producer.ProducerConfig類來做一定程度上的預防措施,每個參數在 ProducerConfig 類中都有對應的名稱,以代碼清單2-1中的initConfig()方法為例,引入ProducerConfig后的修改結果如下:

注意到上面的代碼中key.serializer和value.serializer參數對應類的全限定名比較長,也比較容易寫錯,這里通過Java中的技巧來做進一步的改進,相關代碼如下:

如此代碼便簡潔了許多,同時進一步降低了人為出錯的可能性。在配置完參數之后,我們就可以使用它來創建一個生產者實例,示例如下:

KafkaProducer是線程安全的,可以在多個線程中共享單個KafkaProducer實例,也可以將KafkaProducer實例進行池化來供其他線程調用。
KafkaProducer 中有多個構造方法,比如在創建 KafkaProducer 實例時并沒有設定key.serializer 和 value.serializer 這兩個配置參數,那么就需要在構造方法中添加對應的序列化器,示例如下:

其內部原理和無序列化器的構造方法一樣,不過就實際應用而言,一般都選用 public KafkaProducer(Properties properties)這個構造方法來創建KafkaProducer實例。
2.1.2 消息的發送
在創建完生產者實例之后,接下來的工作就是構建消息,即創建ProducerRecord對象。通過代碼清單2-1中我們已經了解了ProducerRecord的屬性結構,其中topic屬性和value屬性是必填項,其余屬性是選填項,對應的ProducerRecord的構造方法也有多種,參考如下:

代碼清單 2-1 中使用的是最后一種構造方法,也是最簡單的一種,這種方式相當于將ProducerRecord中除topic和value外的屬性全部值設置為null。在實際的應用中,還會用到其他構造方法,比如要指定 key,或者添加 headers 等。有可能會遇到這些構造方法都不滿足需求的情況,需要自行添加更多的構造方法,比如下面的示例:

可以參閱11.1節的內容來了解此構造方法的具體應用。注意,針對不同的消息,需要構建不同的ProducerRecord對象,在實際應用中創建ProducerRecord對象是一個非常頻繁的動作。
創建生產者實例和構建消息之后,就可以開始發送消息了。發送消息主要有三種模式:發后即忘(fire-and-forget)、同步(sync)及異步(async)。
代碼清單2-1中的這種發送方式就是發后即忘,它只管往Kafka中發送消息而并不關心消息是否正確到達。在大多數情況下,這種發送方式沒有什么問題,不過在某些時候(比如發生不可重試異常時)會造成消息的丟失。這種發送方式的性能最高,可靠性也最差。
KafkaProducer 的 send()方法并非是 void 類型,而是 Future<RecordMetadata>類型,send()方法有2個重載方法,具體定義如下:

要實現同步的發送方式,可以利用返回的Future對象實現,示例如下:

實際上send()方法本身就是異步的,send()方法返回的Future對象可以使調用方稍后獲得發送的結果。示例中在執行send()方法之后直接鏈式調用了get()方法來阻塞等待Kafka的響應,直到消息發送成功,或者發生異常。如果發生異常,那么就需要捕獲異常并交由外層邏輯處理。
也可以在執行完send()方法之后不直接調用get()方法,比如下面的一種同步發送方式的實現:


這樣可以獲取一個RecordMetadata對象,在RecordMetadata對象里包含了消息的一些元數據信息,比如當前消息的主題、分區號、分區中的偏移量(offset)、時間戳等。如果在應用代碼中需要這些信息,則可以使用這個方式。如果不需要,則直接采用producer.send(record).get()的方式更省事。
Future 表示一個任務的生命周期,并提供了相應的方法來判斷任務是否已經完成或取消,以及獲取任務的結果和取消任務等。既然KafkaProducer.send()方法的返回值是一個Future類型的對象,那么完全可以用Java語言層面的技巧來豐富應用的實現,比如使用Future中的 get(long timeout,TimeUnit unit)方法實現可超時的阻塞。
KafkaProducer中一般會發生兩種類型的異常:可重試的異常和不可重試的異常。常見的可重試異常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如NetworkException 表示網絡異常,這個有可能是由于網絡瞬時故障而導致的異常,可以通過重試解決;又比如LeaderNotAvailableException表示分區的leader副本不可用,這個異常通常發生在leader副本下線而新的 leader 副本選舉完成之前,重試之后可以重新恢復。不可重試的異常,比如 1.4 節中提及的RecordTooLargeException異常,暗示了所發送的消息太大,KafkaProducer對此不會進行任何重試,直接拋出異常。
對于可重試的異常,如果配置了 retries 參數,那么只要在規定的重試次數內自行恢復了,就不會拋出異常。retries參數的默認值為0,配置方式參考如下:

示例中配置了10次重試。如果重試了10次之后還沒有恢復,那么仍會拋出異常,進而發送的外層邏輯就要處理這些異常了。
同步發送的方式可靠性高,要么消息被發送成功,要么發生異常。如果發生異常,則可以捕獲并進行相應的處理,而不會像“發后即忘”的方式直接造成消息的丟失。不過同步發送的方式的性能會差很多,需要阻塞等待一條消息發送完之后才能發送下一條。
我們再來了解一下異步發送的方式,一般是在send()方法里指定一個Callback的回調函數,Kafka在返回響應時調用該函數來實現異步的發送確認。有讀者或許會有疑問,send()方法的返回值類型就是Future,而Future本身就可以用作異步的邏輯處理。這樣做不是不行,只不過Future里的 get()方法在何時調用,以及怎么調用都是需要面對的問題,消息不停地發送,那么諸多消息對應的Future對象的處理難免會引起代碼處理邏輯的混亂。使用Callback的方式非常簡潔明了,Kafka有響應時就會回調,要么發送成功,要么拋出異常。異步發送方式的示例如下:

示例代碼中遇到異常時(exception!=null)只是做了簡單的打印操作,在實際應用中應該使用更加穩妥的方式來處理,比如可以將異常記錄以便日后分析,也可以做一定的處理來進行消息重發。onCompletion()方法的兩個參數是互斥的,消息發送成功時,metadata 不為 null 而exception為null;消息發送異常時,metadata為null而exception不為null。

對于同一個分區而言,如果消息record1于record2之前先發送(參考上面的示例代碼),那么KafkaProducer就可以保證對應的callback1在callback2之前調用,也就是說,回調函數的調用也可以保證分區有序。
通常,一個KafkaProducer不會只負責發送單條消息,更多的是發送多條消息,在發送完這些消息之后,需要調用KafkaProducer的close()方法來回收資源。下面的示例中發送了100條消息,之后就調用了close()方法來回收所占用的資源:


close()方法會阻塞等待之前所有的發送請求完成后再關閉 KafkaProducer。與此同時,KafkaProducer還提供了一個帶超時時間的close()方法,具體定義如下:

如果調用了帶超時時間timeout的close()方法,那么只會在等待timeout時間內來完成所有尚未完成的請求處理,然后強行退出。在實際應用中,一般使用的都是無參的close()方法。
2.1.3 序列化
生產者需要用序列化器(Serializer)把對象轉換成字節數組才能通過網絡發送給Kafka。而在對側,消費者需要用反序列化器(Deserializer)把從 Kafka 中收到的字節數組轉換成相應的對象。在代碼清單2-1中,為了方便,消息的key和value都使用了字符串,對應程序中的序列化器也使用了客戶端自帶的org.apache.kafka.common.serialization.StringSerializer,除了用于String類型的序列化器,還有ByteArray、ByteBuffer、Bytes、Double、Integer、Long這幾種類型,它們都實現了org.apache.kafka.common.serialization.Serializer接口,此接口有3個方法:

configure()方法用來配置當前類,serialize()方法用來執行序列化操作。而close()方法用來關閉當前的序列化器,一般情況下 close()是一個空方法,如果實現了此方法,則必須確保此方法的冪等性,因為這個方法很可能會被KafkaProducer調用多次。
生產者使用的序列化器和消費者使用的反序列化器是需要一一對應的,如果生產者使用了某種序列化器,比如StringSerializer,而消費者使用了另一種序列化器,比如IntegerSerializer,那么是無法解析出想要的數據的。本節討論的都是與生產者相關的,對于與消費者相關的反序列化器的內容請參見3.2.3節。
下面就以StringSerializer為例來看看Serializer接口中的3個方法的使用方法,StringSerializer類的具體實現如代碼清單2-2所示。
代碼清單2-2 StringSerializer的代碼實現

首先是configure()方法,這個方法是在創建KafkaProducer實例的時候調用的,主要用來確定編碼類型,不過一般客戶端對于 key.serializer.encoding、value.serializer.encoding和serializer.encoding這幾個參數都不會配置,在KafkaProducer的參數集合(ProducerConfig)里也沒有這幾個參數(它們可以看作用戶自定義的參數),所以一般情況下encoding的值就為默認的“UTF-8”。serialize()方法非常直觀,就是將String類型轉為byte[]類型。
如果 Kafka 客戶端提供的幾種序列化器都無法滿足應用需求,則可以選擇使用如 Avro、JSON、Thrift、ProtoBuf和Protostuff等通用的序列化工具來實現,或者使用自定義類型的序列化器來實現。下面就以一個簡單的例子來介紹自定義類型的使用方法。
假設我們要發送的消息都是Company對象,這個Company的定義很簡單,只有名稱name和地址address,示例代碼參考如下(為了構建方便,示例中使用了lombok[2]工具):

下面我們再來看一下Company對應的序列化器CompanySerializer,示例代碼如代碼清單2-3所示。
代碼清單2-3 自定義的序列化器CompanySerializer


上面的這段代碼的邏輯很簡單,configure()和close()方法也都為空。與此對應的反序列化器CompanyDeserializer的詳細實現參見3.2.3節。
如何使用自定義的序列化器CompanySerializer呢?只需將KafkaProducer的value.serializer參數設置為CompanySerializer類的全限定名即可。假如我們要發送一個Company對象到Kafka,關鍵代碼如代碼清單2-4所示。
代碼清單2-4 自定義序列化器使用示例

注意,示例中消息的 key 對應的序列化器還是 StringSerializer,這個并沒有改動。其實key.serializer和value.serializer并沒有太大的區別,讀者可以自行修改key對應的序列化器,看看會不會有不一樣的效果。
2.1.4 分區器
消息在通過send()方法發往broker的過程中,有可能需要經過攔截器(Interceptor)、序列化器(Serializer)和分區器(Partitioner)的一系列作用之后才能被真正地發往 broker。攔截器(下一章會詳細介紹)一般不是必需的,而序列化器是必需的。消息經過序列化之后就需要確定它發往的分區,如果消息ProducerRecord中指定了partition字段,那么就不需要分區器的作用,因為partition代表的就是所要發往的分區號。
如果消息ProducerRecord中沒有指定partition字段,那么就需要依賴分區器,根據key這個字段來計算partition的值。分區器的作用就是為消息分配分區。
Kafka中提供的默認分區器是org.apache.kafka.clients.producer.internals.DefaultPartitioner,它實現了org.apache.kafka.clients.producer.Partitioner接口,這個接口中定義了2個方法,具體如下所示。

其中partition()方法用來計算分區號,返回值為int類型。partition()方法中的參數分別表示主題、鍵、序列化后的鍵、值、序列化后的值,以及集群的元數據信息,通過這些信息可以實現功能豐富的分區器。close()方法在關閉分區器的時候用來回收一些資源。
Partitioner 接口還有一個父接口 org.apache.kafka.common.Configurable,這個接口中只有一個方法:

Configurable接口中的configure()方法主要用來獲取配置信息及初始化數據。
在默認分區器 DefaultPartitioner 的實現中,close()是空方法,而在 partition()方法中定義了主要的分區分配邏輯。如果 key 不為 null,那么默認的分區器會對 key 進行哈希(采用MurmurHash2算法,具備高運算性能及低碰撞率),最終根據得到的哈希值來計算分區號,擁有相同key的消息會被寫入同一個分區。如果key為null,那么消息將會以輪詢的方式發往主題內的各個可用分區。
注意:如果 key 不為 null,那么計算得到的分區號會是所有分區中的任意一個;如果 key為null,那么計算得到的分區號僅為可用分區中的任意一個,注意兩者之間的差別。
在不改變主題分區數量的情況下,key與分區之間的映射可以保持不變。不過,一旦主題中增加了分區,那么就難以保證key與分區之間的映射關系了。
除了使用 Kafka 提供的默認分區器進行分區分配,還可以使用自定義的分區器,只需同DefaultPartitioner一樣實現Partitioner接口即可。默認的分區器在key為null時不會選擇非可用的分區,我們可以通過自定義的分區器DemoPartitioner來打破這一限制,具體的實現可以參考下面的示例代碼,如代碼清單2-5所示。
代碼清單2-5 自定義分區器實現


實現自定義的DemoPartitioner類之后,需要通過配置參數partitioner.class來顯式指定這個分區器。示例如下:

這個自定義分區器的實現比較簡單,讀者也可以根據自身業務的需求來靈活實現分配分區的計算方式,比如一般大型電商都有多個倉庫,可以將倉庫的名稱或ID作為key來靈活地記錄商品信息。
2.1.5 生產者攔截器
攔截器(Interceptor)是早在Kafka 0.10.0.0中就已經引入的一個功能,Kafka一共有兩種攔截器:生產者攔截器和消費者攔截器。本節主要講述生產者攔截器的相關內容,有關消費者攔截器的具體細節請參考3.2.9節。
生產者攔截器既可以用來在消息發送前做一些準備工作,比如按照某個規則過濾不符合要求的消息、修改消息的內容等,也可以用來在發送回調邏輯前做一些定制化的需求,比如統計類工作。
生產者攔截器的使用也很方便,主要是自定義實現 org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3個方法:

KafkaProducer在將消息序列化和計算分區之前會調用生產者攔截器的onSend()方法來對消息進行相應的定制化操作。一般來說最好不要修改消息 ProducerRecord 的 topic、key 和partition 等信息,如果要修改,則需確保對其有準確的判斷,否則會與預想的效果出現偏差。比如修改key不僅會影響分區的計算,同樣會影響broker端日志壓縮(Log Compaction)的功能。
KafkaProducer 會在消息被應答(Acknowledgement)之前或消息發送失敗時調用生產者攔截器的 onAcknowledgement()方法,優先于用戶設定的 Callback 之前執行。這個方法運行在Producer 的 I/O 線程中,所以這個方法中實現的代碼邏輯越簡單越好,否則會影響消息的發送速度。
close()方法主要用于在關閉攔截器時執行一些資源的清理工作。在這 3 個方法中拋出的異常都會被捕獲并記錄到日志中,但并不會再向上傳遞。
ProducerInterceptor 接口與 2.1.4 節中的 Partitioner 接口一樣,它也有一個同樣的父接口Configurable,具體的內容可以參見Partitioner接口的相關介紹。
下面通過一個示例來演示生產者攔截器的具體用法,ProducerInterceptorPrefix 中通過onSend()方法來為每條消息添加一個前綴“prefix1-”,并且通過onAcknowledgement()方法來計算發送消息的成功率。ProducerInterceptorPrefix類的具體實現如代碼清單2-6所示。
代碼清單2-6 生產者攔截器示例


實現自定義的 ProducerInterceptorPrefix 之后,需要在 KafkaProducer 的配置參數interceptor.classes中指定這個攔截器,此參數的默認值為“”。示例如下:

然后使用指定了ProducerInterceptorPrefix的生產者連續發送10條內容為“kafka”的消息,在發送完之后客戶端打印出如下信息:

如果消費這 10 條消息,會發現消費了的消息都變成了“prefix1-kafka”,而不是原來的“kafka”。
KafkaProducer中不僅可以指定一個攔截器,還可以指定多個攔截器以形成攔截鏈。攔截鏈會按照 interceptor.classes 參數配置的攔截器的順序來一一執行(配置的時候,各個攔截器之間使用逗號隔開)。下面我們再添加一個自定義攔截器ProducerInterceptorPrefixPlus,它只實現了Interceptor接口中的onSend()方法,主要用來為每條消息添加另一個前綴“prefix2-”,具體實現如下:


接著修改生產者的interceptor.classes配置,具體實現如下:

此時生產者再連續發送10條內容為“kafka”的消息,那么最終消費者消費到的是10條內容為“prefix2-prefix1-kafka”的消息。如果將interceptor.classes配置中的兩個攔截器的位置互換:

那么最終消費者消費到的消息為“prefix1-prefix2-kafka”。
如果攔截鏈中的某個攔截器的執行需要依賴于前一個攔截器的輸出,那么就有可能產生“副作用”。設想一下,如果前一個攔截器由于異常而執行失敗,那么這個攔截器也就跟著無法繼續執行。在攔截鏈中,如果某個攔截器執行失敗,那么下一個攔截器會接著從上一個執行成功的攔截器繼續執行。
- Advanced Machine Learning with Python
- Interactive Data Visualization with Python
- Flash CS6中文版應用教程(第三版)
- RSpec Essentials
- Procedural Content Generation for C++ Game Development
- 好好學Java:從零基礎到項目實戰
- Visual Studio 2015高級編程(第6版)
- Processing創意編程指南
- HTML+CSS+JavaScript編程入門指南(全2冊)
- Unity 2018 Augmented Reality Projects
- Ext JS 4 Plugin and Extension Development
- Python面試通關寶典
- 算法超簡單:趣味游戲帶你輕松入門與實踐
- Python數據預處理技術與實踐
- AngularJS UI Development