
A command-line message consumer

The last step is to read the generated messages. Kafka also provides a command that consumes messages from the command line. Remember that all of these command-line tasks can also be done programmatically. As with the producer, where each line of input is treated as one message, the consumer prints each message on its own line.

This section requires the previous steps to have been completed. The Kafka brokers must be up and running, with a topic created on them. Some messages must also have been produced with the console producer, so that there is something to consume from the console.

Run the following command:

> <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --from-beginning

The output should be as follows:

Fool me once shame on you
Fool me twice shame on me

The parameters are the topic's name and the address of the broker (the bootstrap server). The --from-beginning parameter indicates that messages should be consumed from the beginning of the log instead of only the messages that arrive after the consumer starts. To test this, generate many more messages and run the command again without this parameter.
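The difference between the two modes can be sketched with a small shell helper. This is a minimal sketch, not part of Kafka: the consumer_cmd function and the CONFLUENT_PATH variable (standing in for <confluent-path>, with an assumed default of /opt/confluent) are hypothetical names introduced for illustration. The helper only prints the command, so the two variants can be compared before running either one.

```shell
#!/usr/bin/env bash
# Assumption: CONFLUENT_PATH stands in for the <confluent-path> placeholder
# used in the text; /opt/confluent is only an illustrative default.
CONFLUENT_PATH="${CONFLUENT_PATH:-/opt/confluent}"

# Hypothetical helper: prints (does not run) the console consumer command.
# Pass "beginning" as the second argument to replay the whole log;
# omit it to receive only messages produced after the consumer starts.
consumer_cmd() {
  local topic="$1" mode="${2:-latest}"
  local cmd="$CONFLUENT_PATH/bin/kafka-console-consumer --topic $topic --bootstrap-server localhost:9093"
  if [ "$mode" = "beginning" ]; then
    cmd="$cmd --from-beginning"
  fi
  printf '%s\n' "$cmd"
}

consumer_cmd amazingTopic beginning   # variant that replays the log
consumer_cmd amazingTopic             # variant that waits for new messages
```

Running the first printed command should show the two messages produced earlier, while the second should stay silent until the console producer sends something new.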

This command has more useful parameters; some important ones are as follows:

  • --fetch-size: This is the amount of data to be fetched in a single request. The size in bytes follows as the argument. The default value is 1024 x 1024.
  • --socket-buffer-size: This is the size of the TCP receive buffer. The size in bytes follows as the argument. The default value is 2 x 1024 x 1024.
  • --formatter: This is the name of the class to use for formatting messages for display. The default value is NewlineMessageFormatter.
  • --autocommit.interval.ms: This is the interval at which the current offset is saved. The time in milliseconds follows as the argument. The default value is 10,000.
  • --max-messages: This is the maximum number of messages to consume before exiting. If not set, the consumption is continuous. The number of messages follows as the argument.
  • --skip-message-on-error: If there is an error while processing a message, the system should skip it instead of halting.
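Because --max-messages makes the consumer exit after a fixed number of messages, and each message is printed on its own line, the output can be fed into an ordinary shell pipeline. The following is a minimal sketch under those assumptions; count_matching is a hypothetical helper, not a Kafka tool, and the real consumer invocation is left as a comment so the snippet can be tried without a running broker.

```shell
#!/usr/bin/env bash
# Hypothetical helper: reads consumer output on stdin (one message per line)
# and prints how many messages contain the given fixed string.
count_matching() {
  local word="$1"
  # grep -c exits with status 1 when the count is 0; "|| true" keeps the
  # helper from reporting that as a failure.
  grep -c -F -- "$word" || true
}

# With a running broker, the pipeline would look like this:
#   <confluent-path>/bin/kafka-console-consumer --topic amazingTopic \
#     --bootstrap-server localhost:9093 --from-beginning --max-messages 2 \
#     | count_matching twice

# Stand-in input: the two messages shown in the sample output above.
printf 'Fool me once shame on you\nFool me twice shame on me\n' | count_matching twice   # prints 1
```

Note that if the topic holds fewer messages than the --max-messages value, the consumer keeps waiting, so the pipeline does not finish until enough messages arrive.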

The most commonly used forms of this command are as follows:

  • To consume just one message, use the following:
      > <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --max-messages 1

  • To consume one message from an offset, use the following:
      > <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --max-messages 1 --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter'

  • To consume messages from a specific consumer group, use the following:
      > <confluent-path>/bin/kafka-console-consumer --topic amazingTopic --bootstrap-server localhost:9093 --new-consumer --consumer-property group.id=my-group