- RocketMQ實戰與原理解析
- 楊開元
- 446字
- 2019-01-04 22:41:01
2.3 發送/接收消息示例
可以用自己熟悉的開發工具創建一個Java項目,加入RocketMQ Client包的依賴,用代碼清單2-1的內容發送消息,這個示例代碼是以Sync方式發送消息的。
代碼清單2-1 Producer示例程序
public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a Producer group name. DefaultMQProducer Producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.100.131:9876");
//Launch the instance. Producer.start(); for (int i = 0; i < 100; i++) { //Create a Message instance, specifying Topic, tag and Message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send Message to deliver Message to one of brokers. SendResult sendResult = Producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the Producer instance is not longer in use. Producer.shutdown(); } }
主要流程是:創建一個DefaultMQProducer對象,設置好GroupName和NameServer地址后啟動,然后把待發送的消息拼裝成Message對象,使用Producer來發送。接下來看看如何接收消息,也就是使用DefaultMQPushConsumer類實現的消費者程序,如代碼清單2-2所示。
代碼清單2-2 Consumer示例程序
/* * Instantiate with specified Consumer group name. */ DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer("please rename to unique group name"); /* * Specify name server addresses. Consumer.setNamesrvAddr("192.168.249.47:9876"); /* * Specify where to start in case the specified Consumer group is a brand new one. */ Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //Consumer.setMessageModel(MessageModel.BROADCASTING); /* * Subscribe one more more Topics to consume. */ Consumer.subscribe("TopicTest”, "*"); /* * Register callback to execute on arrival of Messages fetched from brokers. */ Consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the Consumer instance. */ Consumer.start();
Consumer或Producer都必須設置GroupName、NameServer地址以及端口號。然后指明要操作的Topic名稱,最后進入發送和接收邏輯。
推薦閱讀
- Learning Python Web Penetration Testing
- 極簡算法史:從數學到機器的故事
- GeoServer Cookbook
- Mastering ServiceStack
- 小程序實戰視頻課:微信小程序開發全案精講
- x86匯編語言:從實模式到保護模式(第2版)
- JavaScript前端開發與實例教程(微課視頻版)
- Full-Stack Vue.js 2 and Laravel 5
- Terraform:多云、混合云環境下實現基礎設施即代碼(第2版)
- 微信小程序全棧開發技術與實戰(微課版)
- OpenGL Data Visualization Cookbook
- HoloLens與混合現實開發
- Web程序設計:ASP.NET(第2版)
- 你必須知道的.NET(第2版)
- 趣味掌控板編程