官术网_书友最值得收藏!

Rebalance listeners

We discussed earlier that in case of addition or removal of consumer to the consumer group, Kafka triggers the rebalancer and consumer loses the ownership of the current partition. This may lead to duplicate processing when the partition is reassigned to consumer. There are some other operations such as database connection operation, file operation, or caching operations that may be part of consumer; you may want to deal with this before ownership of the partition is lost.

Kafka provides you with an API to handle such scenarios. It provides the ConsumerRebalanceListener interface that contains the onPartitionsRevoked() and onPartitionsAssigned() methods. We can implement these two methods and pass an object while subscribing to the topic using the subscribe method discussed earlier:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;

public class DemoRebalancer implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//TODO: Things to Do before your partition got revoked
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//TODO : Things to do when new partition get assigned
}
}
主站蜘蛛池模板: 保亭| 新巴尔虎左旗| 韩城市| 股票| 崇阳县| 抚宁县| 多伦县| 阳西县| 延川县| 博湖县| 乡宁县| 玉林市| 渭南市| 安仁县| 始兴县| 延安市| 金坛市| 屏东县| 汶上县| 都昌县| 墨脱县| 铅山县| 辛集市| 麦盖提县| 衡东县| 南澳县| 泸定县| 鄱阳县| 张掖市| 西城区| 杂多县| 花莲县| 四川省| 龙川县| 含山县| 禄劝| 宜章县| 宁城县| 石楼县| 汶川县| 仪陇县|