rocketmq发送第一条消息(三)
生活随笔
收集整理的这篇文章主要介绍了
rocketmq发送第一条消息(三)
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
直接上代码
导包,pom.xml
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.6.1</version></dependency>生产者
public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("testGrp");// 设置nameserver地址 nameserver具备路由功能(发现服务,有点注册中心的意思),让其分配合理的broker来进行消息发送producer.setNamesrvAddr("192.168.52.11:9876");try {producer.start();} catch (MQClientException e) {e.printStackTrace();}// topic 消息将要发送的地址,消息目的地// body 消息中具体的数据Message message1 = new Message("monkeyTopic", "第1条消息".getBytes());Message message2 = new Message("monkeyTopic", "第2条消息".getBytes());Message message3 = new Message("monkeyTopic", "第3条消息".getBytes());List<Message> messageList = new ArrayList<>();messageList.add(message1);messageList.add(message2);messageList.add(message3);// 同步消息发送,等待broker返回值 sendResult是返回回来的 // SendResult sendResult = producer.send(message); // 单条消息发送SendResult sendResult = producer.send(messageList); // 多条消息同时发送,总大小官方建议小于1MSystem.out.println("sendResult: " + sendResult);producer.shutdown();}生产者启动效果图:
消费者
public static void main(String[] args) throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumer");consumer.setNamesrvAddr("192.168.52.11:9876");// 每个消费者需要关注一个topic,也就说消费消息只能消息固定topic的消息,不能随便消费// topic 表示消息地址// 过滤器 * 表示不过滤consumer.subscribe("monkeyTopic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list){System.out.println(new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 广播消息,所有的监听同一个topic的都能收到consumer.setMessageModel(MessageModel.BROADCASTING);// 集群消息,消息只能消费一次 // consumer.setMessageModel(MessageModel.CLUSTERING);consumer.start();System.out.println("consumer start .....");}消费者消费启动效果图:
总结
以上是生活随笔为你收集整理的rocketmq发送第一条消息(三)的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: rocketmq之控制台rocketmq
- 下一篇: rocketmq发送顺序消息(四)