書名: Building Data Streaming Applications with Apache Kafka | 作者名: Manish Kumar, Chanchal Singh | 本章字數: 134字 | 更新時間: 2022-07-12 10:38:18
Scala Kafka consumer
This is the Scala version of the previous program, and it works the same way as the previous snippet. Kafka allows you to write consumers in many languages, including Scala.
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.Logger
import java.util._
/**
 * Demo Kafka consumer: subscribes to a single topic, polls it in an endless loop,
 * logs and prints every record, and commits offsets after each poll.
 *
 * `List` and `Map` in this object are the `java.util` types brought in by the
 * file-level `import java.util._`, because the Kafka client API expects Java collections.
 */
object DemoConsumer {
  // DemoConsumer is a singleton object without a companion class, so
  // classOf[DemoConsumer] does not resolve; use the object's runtime class instead.
  private val log: Logger = Logger.getLogger(getClass)

  /**
   * Entry point. Builds the consumer configuration, subscribes to topic "test1" and
   * consumes records until an exception ends the loop. On the way out it attempts a
   * final synchronous commit and always closes the consumer.
   *
   * @param args command-line arguments (not used)
   */
  @throws[Exception]
  def main(args: Array[String]): Unit = {
    // Explicit converters (.asScala) instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    val topic: String = "test1"
    val topicList: List[String] = Collections.singletonList(topic)

    val consumerProperties: Properties = new Properties
    consumerProperties.setProperty("bootstrap.servers", "10.200.99.197:6667")
    consumerProperties.setProperty("group.id", "Demo_Group")
    consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    consumerProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // NOTE(review): auto-commit is enabled while offsets are also committed manually
    // below; confirm whether both are intended.
    consumerProperties.setProperty("enable.auto.commit", "true")
    consumerProperties.setProperty("auto.commit.interval.ms", "1000")
    consumerProperties.setProperty("session.timeout.ms", "30000")

    val demoKafkaConsumer: KafkaConsumer[String, String] =
      new KafkaConsumer[String, String](consumerProperties)
    demoKafkaConsumer.subscribe(topicList)
    log.info("Subscribed to topic " + topic)

    try {
      while (true) {
        val records: ConsumerRecords[String, String] = demoKafkaConsumer.poll(2)
        for (record <- records.asScala) {
          // Separators added so the three fields do not run together in the log line.
          log.info(s"offset = ${record.offset}, key = ${record.key}, value = ${record.value}")
          System.out.print(record.value)
        }
        //TODO : Do processing for data here
        demoKafkaConsumer.commitAsync(new OffsetCommitCallback() {
          override def onComplete(offsets: Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
            // A non-null exception signals that this commit did not succeed; report it
            // instead of dropping it silently.
            if (e != null) log.error("Asynchronous offset commit failed", e)
          }
        })
      }
    } catch {
      case ex: Exception =>
        // Previously swallowed without a trace; record why the poll loop stopped.
        log.error("Consumer loop for topic " + topic + " terminated by exception", ex)
    } finally {
      // Last attempt to persist offsets; close() runs even if that commit throws.
      try demoKafkaConsumer.commitSync()
      finally demoKafkaConsumer.close()
    }
  }
}
推薦閱讀
- Advanced Splunk
- Computer Vision for the Web
- C# 2012程序設計實踐教程 (清華電腦學堂)
- Developing Middleware in Java EE 8
- iOS開發實戰:從零基礎到App Store上架
- Hands-On JavaScript High Performance
- Jenkins Continuous Integration Cookbook(Second Edition)
- SQL 經典實例
- 虛擬現實建模與編程(SketchUp+OSG開發技術)
- Implementing Domain-Specific Languages with Xtext and Xtend
- C語言程序設計實驗指導與習題精解
- 微信公眾平臺開發最佳實踐
- JavaScript高級程序設計(第4版)
- Puppet Cookbook(Third Edition)
- Mobile Test Automation with Appium