官术网_书友最值得收藏!

Java Kafka consumer

The following program is a simple Java consumer which consumes data from topic test. Please make sure data is already available in the mentioned topic otherwise no record will be consumed.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;

import java.util.*;

public class DemoConsumer {
private static final Logger log = Logger.getLogger(DemoConsumer.class);

public static void main(String[] args) throws Exception {

String topic = "test1";
List<String> topicList = new ArrayList<>();
topicList.add(topic);
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", "localhost:9092");
consumerProperties.put("group.id", "Demo_Group");
consumerProperties.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

consumerProperties.put("enable.auto.commit", "true");
consumerProperties.put("auto.commit.interval.ms", "1000");
consumerProperties.put("session.timeout.ms", "30000");

KafkaConsumer<String, String> demoKafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);

demoKafkaConsumer.subscribe(topicList);
log.info("Subscribed to topic " + topic);
int i = 0;
try {
while (true) {
ConsumerRecords<String, String> records = demoKafkaConsumer.poll(500);
for (ConsumerRecord<String, String> record : records)
log.info("offset = " + record.offset() + "key =" + record.key() + "value =" + record.value());

//TODO : Do processing for data here
demoKafkaConsumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {

}
});

}
} catch (Exception ex) {
//TODO : Log Exception Here
} finally {
try {
demoKafkaConsumer.commitSync();

} finally {
demoKafkaConsumer.close();
}
}
}
}
主站蜘蛛池模板: 扎囊县| 吐鲁番市| 改则县| 达孜县| 七台河市| 科技| 岳阳市| 蒙城县| 武功县| 玉门市| 永丰县| 河津市| 灌阳县| 庆阳市| 西乡县| 乌拉特中旗| 且末县| 海安县| 鄢陵县| 威信县| 萨嘎县| 巧家县| 恩平市| 甘谷县| 巴林左旗| 文水县| 鄂尔多斯市| 东丰县| 白水县| 河源市| 新郑市| 彭泽县| 西藏| 韩城市| 中江县| 苍梧县| 屏南县| 清水河县| 吉林市| 永登县| 沾益县|