Java Kafka producer example

We covered the producer configurations and APIs in the previous sections. Let's now write a simple Java producer that you can use as a starting point for your own Kafka producer.

Prerequisites

  • IDE: We recommend an IDE with Scala support, such as IntelliJ IDEA, NetBeans, or Eclipse. We have used JetBrains IntelliJ IDEA:
    https://www.jetbrains.com/idea/download/.
  • Build tool: Maven, Gradle, or others. We have used Maven to build our project.
  • pom.xml: Add the Kafka dependency to the pom file:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.0</version>
    </dependency>

Java:

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class DemoProducer {

    public static void main(final String[] args) {
        // Connection and serialization settings
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Acknowledgement, retry, and batching settings
        producerProps.put("acks", "all");
        producerProps.put("retries", 1);
        producerProps.put("batch.size", 20000);
        producerProps.put("linger.ms", 1);
        producerProps.put("buffer.memory", 24568545);
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProps);

        for (int i = 0; i < 2000; i++) {
            // Only topic and value are given, so the record is sent without a key
            ProducerRecord<String, String> data = new ProducerRecord<String, String>("test1", "Hello this is record " + i);
            // send() is asynchronous; the Future completes when the send is acknowledged or fails
            Future<RecordMetadata> recordMetadata = producer.send(data);
        }
        // close() blocks until all previously sent records have completed
        producer.close();
    }
}

Scala:

import java.util.Properties
import org.apache.kafka.clients.producer._

object DemoProducer {

  def main(args: Array[String]): Unit = {

    // Connection and serialization settings
    val producerProps = new Properties()
    producerProps.put("bootstrap.servers", "localhost:9092")
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producerProps.put("client.id", "SampleProducer")

    // Acknowledgement, retry, and batching settings
    producerProps.put("acks", "all")
    producerProps.put("retries", Integer.valueOf(1))
    producerProps.put("batch.size", Integer.valueOf(16384))
    producerProps.put("linger.ms", Integer.valueOf(1))
    producerProps.put("buffer.memory", Integer.valueOf(133554432))

    val producer = new KafkaProducer[String, String](producerProps)

    for (a <- 1 to 2000) {
      // Only topic and value are given, so the record is sent without a key
      val record: ProducerRecord[String, String] = new ProducerRecord("test1", "Hello this is record " + a)
      producer.send(record)
    }

    // close() blocks until all previously sent records have completed
    producer.close()
  }

}

The preceding example is a simple producer, shown in both Java and Scala, that sends string data without a key. The topic name is hardcoded; in a real application, it would typically be read from a configuration file or passed as a command-line argument. We have kept the example simple so that the focus stays on how the producer works. In upcoming chapters, we will look at examples that follow good coding practices.
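
As a rough illustration of reading the topic name from the command line or from configuration instead of hardcoding it, the following sketch resolves the topic in that order and falls back to test1. The TopicResolver class, the resolveTopic method, and the "topic" property key are hypothetical names introduced here for illustration; they are not part of the Kafka API.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): decides which topic to write to.
class TopicResolver {

    // Assumed property key for this sketch; Kafka itself does not define it.
    static final String TOPIC_KEY = "topic";
    // Same topic name that the example above hardcodes.
    static final String DEFAULT_TOPIC = "test1";

    // Order of precedence: first command-line argument, then the
    // "topic" entry in the configuration, then the default.
    static String resolveTopic(String[] args, Properties config) {
        if (args != null && args.length > 0 && !args[0].trim().isEmpty()) {
            return args[0].trim();
        }
        String configured = config.getProperty(TOPIC_KEY);
        if (configured != null && !configured.trim().isEmpty()) {
            return configured.trim();
        }
        return DEFAULT_TOPIC;
    }
}
```

In DemoProducer, the result of TopicResolver.resolveTopic(args, config) could then replace the "test1" literal passed to ProducerRecord, with config loaded from a properties file through Properties.load. Giving the command line priority over the file is only one possible choice; it lets a single run override the configured topic without editing the file.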
