官术网_书友最值得收藏!

Scala Kafka consumer

This is the Scala version of the previous program and will work the same as the previous snippet. Kafka allows you to write consumer in many languages including Scala.

import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.Logger
import java.util._


object DemoConsumer {
private val log: Logger = Logger.getLogger(classOf[DemoConsumer])

@throws[Exception]
def main(args: Array[String]) {
val topic: String = "test1"
val topicList: List[String] = new ArrayList[String]
topicList.add(topic)
val consumerProperties: Properties = new Properties
consumerProperties.put("bootstrap.servers", "10.200.99.197:6667")
consumerProperties.put("group.id", "Demo_Group")
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") consumerProperties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("enable.auto.commit", "true")
consumerProperties.put("auto.commit.interval.ms", "1000")
consumerProperties.put("session.timeout.ms", "30000")
val demoKafkaConsumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerProperties)
demoKafkaConsumer.subscribe(topicList)
log.info("Subscribed to topic " + topic)
val i: Int = 0
try
while (true) {
val records: ConsumerRecords[String, String] = demoKafkaConsumer.poll(2)
import scala.collection.JavaConversions._
for (record <- records) {
log.info("offset = " + record.offset + "key =" + record.key + "value =" + record.value)
System.out.print(record.value)
}
//TODO : Do processing for data here
demoKafkaConsumer.commitAsync(new OffsetCommitCallback() {
def onComplete(map: Map[TopicPartition, OffsetAndMetadata], e: Exception) {
}
})
}

catch {
case ex: Exception => {
//TODO : Log Exception Here
}
} finally try
demoKafkaConsumer.commitSync()
finally demoKafkaConsumer.close()
}
}

主站蜘蛛池模板: 县级市| 将乐县| 五大连池市| 平舆县| 永济市| 长泰县| 大同县| 安仁县| 开鲁县| 磐安县| 沐川县| 景东| 安丘市| 康定县| 沈阳市| 广州市| 安乡县| 沾益县| 铜川市| 泽普县| 荃湾区| 辽阳市| 白沙| 女性| 黔东| 维西| 芒康县| 清镇市| 乌鲁木齐县| 宜宾县| 当雄县| 普定县| 乌拉特前旗| 峨眉山市| 蓬莱市| 洞头县| 荃湾区| 通江县| 黄浦区| 山阳县| 宁陕县|