生活随笔
收集整理的这篇文章主要介绍了
RabbitMQ保姆级教程
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
文章目录
- 前言
- 一、MQ是什么?
-
- 二、在Linux安装RabbitMQ
-
- 2.1 安装
- 2.2 RabbitMQ启动命令
- 2.3 开启RabbitMQ 后台管理界面
-
- 2.3 Docker启动RabbitMQ
- 2.4 常见消息模型
- 2.5 生产者(Producer) / 消费者(Consumer)
- 2.6 工作队列模式(Work Queues)
- 2.7 参数细节
- 2.8 实现能者多劳
-
- 2.8.1 Ack手动应答防止数据丢失和消息拒收后重新发送
- 2.8.2 预取值
- 2.9 Publish/Subscribe 发布/订阅
- 2.10 Routing(路由) - Direct
- 2.11 Routing(路由)- Topic
- 三、进阶篇 高级特性
-
- 3.1 死信队列
-
- 3.1.1 死信队列实战:消息TTL过期
- 3.1.2 死信队列实战:队列达到最大长度 设置正常队列最大长度
- 3.1.3 死信队列实战:消息被拒
- 3.2 基于SpringBoot实现延迟队列
- 3.3 发布确认 高级特性
-
- 3.3.1 可靠性投递confirm模式
- 3.3.2 可靠性投递return模式
- 3.4 优先级队列
- 3.5 消费端限流
前言
提示:RaabitMQ消息队列的学习。
一、MQ是什么?
- MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统
之间进行通信。 - RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包
裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是
一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,
存储和转发消息数据。
- 工作原理
1.1 AMQP
- AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用
层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵
循此协议,不收客户端和中间件产品和开发语言限制。2006年,AMQP 规范发布。类比HTTP。
二、在Linux安装RabbitMQ
2.1 安装
1. 我们把erlang环境与rabbitMQ 安装包解压到
Linux2. rpm
-ivh erlang安装包
3. yum install socat
-y 安装依赖
/ rpm
-ivh socat依赖包
--force
--nodeps
4. rpm
-ivh rabbitmq安装包
2.2 RabbitMQ启动命令
1. 开启服务
/sbin
/service rabbitmq
-server start
/ service rabbitmq
-server start
2. 停止服务 service rabbitmq
-server stop
3. 重启服务 service rabbitmq
-server restart
2.3 开启RabbitMQ 后台管理界面
1. rabbitmq
-plugins enable rabbitmq_management
1. 创建rabbitMQ账号rabbitmqctl add_user 用户名 密码
2. 设置用户角色rabbitmqctl set_user_tags 用户名 administrator #设置用户名为超级管理员
3. 设置用户权限rabbitmqctl set_permissions
-p
"/" admin
".*" ".*" ".*"4. 查看rabbitmq的用户和角色rabbitmqctl list_users
5. 登录rabbitMQ 界面:
Linux虚拟机ip
:15672 即可
2.3.1 登录rabbitMQ UI界面
记得开放
15672端口访问
Linux虚拟机ip
:15672 即可
输入账户密码后看到这个界面代表成功
2.3 Docker启动RabbitMQ
Docker安装
1. docker pull rabbitmq
:3-management
2. 开启rabbitMQdocker run \
-e RABBITMQ_DEFAULT_USER
=root \
-e RABBITMQ_DEFAULT_PASS
=123456 \
--name mq \
--hostname mq1 \
-p
15672:15672 \
-p
5672:5672 \
-d \rabbitmq
:3-management
2.4 常见消息模型
- channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
2.5 生产者(Producer) / 消费者(Consumer)
<dependencies><dependency><groupId>com.rabbitmq
</groupId><artifactId>amqp-client
</artifactId><version>5.7.3
</version></dependency><dependency><groupId>commons-io
</groupId><artifactId>commons-io
</artifactId><version>2.4
</version></dependency></dependencies>
1234567891011121314
public class Producer {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.queueDeclare(QUEUE_NAME
,false,false,false,null);String message
="hello rabbitMQ";channel
.basicPublish("",QUEUE_NAME
,null,message
.getBytes());System.out
.println("消息发送完毕");channel
.close();connection
.close();}
}
public class Consumer {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {String data
= new String(message
.getBody());System.out
.println(new String(message
.getBody()));};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消费消息被中断");};channel
.basicConsume(QUEUE_NAME
,true,deliverCallback
,cancelCallback
);}
}
2.6 工作队列模式(Work Queues)
- 模式说明
- Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消费,采用的是 轮询机制
- 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
- 工作模式:生产者
public class ProducerWorkQueue {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.queueDeclare(QUEUE_NAME
,false,false,false,null);for (int i
= 1; i
<= 10; i
++) {String message
=i
+"hello rabbitMQ";channel
.basicPublish("",QUEUE_NAME
,null,message
.getBytes());System.out
.println("消息发送完毕");}channel
.close();connection
.close();}
}
public class ConsumerWorkQueues1 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {String data
= new String(message
.getBody());System.out
.println(new String(message
.getBody()));};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消费消息被中断");};channel
.basicConsume(QUEUE_NAME
,true,deliverCallback
,cancelCallback
);}
}
public class ConsumerWorkQueues2 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {String data
= new String(message
.getBody());System.out
.println(new String(message
.getBody()));};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消费消息被中断");};channel
.basicConsume(QUEUE_NAME
,true,deliverCallback
,cancelCallback
);}
}
- 结果:各执行五次,也验证了 我们上面所说的 轮询机制
- 小结:
一个消息只能有一个接收者,但是可以有多个接收者
2.7 参数细节
- durable:是否进行持久化,当前队列如果进行持久化,我们重启rabbitMQ后当前队列依旧存在
channel
.queueDeclare(QUEUE_NAME
,(durable
)true/false,false,false,null);
- props :队列中的信息是否持久化,若消息持久化,我们重启rabbitMQ后当前队列依旧存在
channel
.basicPublish("",QUEUE_NAME
, MessageProperties.PERSISTENT_TEXT_PLAIN
,message
.getBytes());
- autoDelete:是否自动删除,最后一个消费者断开连接后,该队列是否自动删除
channel
.queueDeclare(QUEUE_NAME
,false,false,(autoDelete的参数位置
)false,null);
若开启了自动应答
,rabbitMQ消息队列分配给消费者
10个数据
,只要消费者拿到消息队列的数据时
,就会告诉消息队列
,数据处理完毕。若当我们处理到第
5个数据时
,消费者出现了宕机
,死掉了
,则会出现数据丢失
channel
.basicConsume(QUEUE_NAME
,(autoAck是否自动应答
)false,deliverCallback
,cancelCallback
);
2.8 实现能者多劳
-
业务场景:
当我们的两个消费者执行业务时,a消费者执行速度快,b消费者执行速度慢,我们想让执行速度快的多执行,应当如何实现呢?
- 开启不公平分发,能者多劳 channel.basicQos(1); 0:轮询机制 1:能者多劳
- 开启手动确认
-
消费者a
public class ConsumerWorkQueues1 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.basicQos(1);DeliverCallback deliverCallback
=(consumerTag
, message
)-> {String data
= new String(message
.getBody());System.out
.println(new String(message
.getBody()));channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消费消息被中断");};channel
.basicConsume(QUEUE_NAME
,false,deliverCallback
,cancelCallback
);}
}
public class ConsumerWorkQueues2 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.basicQos(1);DeliverCallback deliverCallback
=(consumerTag
, message
)-> {try {Thread.sleep(100);} catch (InterruptedException e
) {e
.printStackTrace();}System.out
.println(new String(message
.getBody()));channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消费消息被中断");};channel
.basicConsume(QUEUE_NAME
,false,deliverCallback
,cancelCallback
);}
2.8.1 Ack手动应答防止数据丢失和消息拒收后重新发送
- 应用场景:两个消费者每次都从队列中来获取消息,若消费者a正常执行,消费者b在执行过程中出现了宕机,挂掉了那么我们未被消费的消息会被重新放回到队列中,防止消息丢失。
生产者
public class ProducerWorkQueue {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.queueDeclare(QUEUE_NAME
,false,false,false,null);Scanner scanner
= new Scanner(System.in
);while (true){String msg
= scanner
.nextLine();channel
.basicPublish("",QUEUE_NAME
, null,msg
.getBytes());System.out
.println("消息发送完毕");}}
}
消费者a
public class ConsumerWorkQueues1 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {String data
= new String(message
.getBody());System.out
.println("消费者1===>"+new String(message
.getBody()));try {int i
=3/0;channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);}catch (Exception e
){System.out
.println("拒收消息发生了异常");channel
.basicNack(message
.getEnvelope().getDeliveryTag(),false,true);}};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消费消息被中断");};channel
.basicConsume(QUEUE_NAME
,false,deliverCallback
,cancelCallback
);}
}
消费者b
public class ConsumerWorkQueues2 {public static final String QUEUE_NAME
="hello";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {System.out
.println("睡10秒");try {Thread.sleep(1000*10);} catch (InterruptedException e
) {e
.printStackTrace();}System.out
.println(new String(message
.getBody()));channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消费消息被中断");};channel
.basicConsume(QUEUE_NAME
,false,deliverCallback
,cancelCallback
);}
}
- 当消费者b在消费消息时,我们让消费者b睡眠10秒模拟业务流程,在这10秒内我们手动关掉消费者b
发送 aa 消费者a接收
发送bb消费者b接收,在消费者b睡眠过程中我们停止消费者b,来看看手动应答的结果
此时我们查看消费者a,出现了本应该是消费者b消费的消息bb
2.8.2 预取值
channel
.basicQos(1); 0:轮询机制
1:能者多劳 若值
>1代表当前队列的预取值
,代表当前队列大概会拿到多少值
2.9 Publish/Subscribe 发布/订阅
- 也可以叫 广播模式,当我们的P消费者发送了消息,交给了X(交换机),所有绑定了这个X(交换机)的队列都可以接收到P消费者发送的消息
- 代码实现生产者
public class Provider {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("order", "fanout");channel
.basicPublish("order","",null,"fanout type message".getBytes());channel
.close();connection
.close();}
}
public class Consumer {public static void main(String[] args
) throws IOException, TimeoutException {ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("order","fanout");String queueName
= channel
.queueDeclare().getQueue();channel
.queueBind(queueName
,"order","");channel
.basicConsume(queueName
,true,(consumerTag
,message
)->{System.out
.println("消费者1===>"+new String(message
.getBody()));},consumerTag
-> System.out
.println("取消消费消息"));}
}
2.10 Routing(路由) - Direct
routing值订阅模型-Direct(直连)
public class Provider {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("logsExchange","direct");channel
.basicPublish("logsExchange","msgRouting",null,"routing logs direct info 发送了消息".getBytes());channel
.close();connection
.close();}
}
public class Consumer1 {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("logs","direct");String queueName
= channel
.queueDeclare().getQueue();channel
.queueBind(queueName
,"logsExchange","infoRouting");channel
.queueBind(queueName
,"logsExchange","msgRouting");channel
.basicConsume(queueName
,true,(consumerTag
, message
) -> System.out
.println(new String(message
.getBody())),consumerTag
-> System.out
.println(1));}
}
public class Consumer2 {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("logs","direct");String queueName
= channel
.queueDeclare().getQueue();channel
.queueBind(queueName
,"logs","error");channel
.queueBind(queueName
,"logs","msg");channel
.basicConsume(queueName
,true,(consumerTag
, message
) -> System.out
.println(new String(message
.getBody())),consumerTag
-> System.out
.println(1));}
}
2.11 Routing(路由)- Topic
- Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
- 只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符!
#通配符
* (star
) can substitute
for exactly one word :匹配一个词#
(hash
) can substitute
for zero or more words :匹配一个或多个词
public class Provider {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("order","topic");String routingKey
="user.order";channel
.basicPublish("order",routingKey
,null,("routing logs topic发送了消息"+routingKey
).getBytes());channel
.close();connection
.close();}
}
public class Consumer1 {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("order","topic");String queueName
= channel
.queueDeclare().getQueue();channel
.queueBind(queueName
,"order","user.*");channel
.basicConsume(queueName
,true,(consumerTag
, message
) -> System.out
.println(new String(message
.getBody())),consumerTag
-> System.out
.println(1));}
}
public class Consumer2 {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare("order","topic");String queueName
= channel
.queueDeclare().getQueue();channel
.queueBind(queueName
,"order","user.#");channel
.basicConsume(queueName
,true,(consumerTag
, message
) -> System.out
.println(new String(message
.getBody())),consumerTag
-> System.out
.println(1));}
}
三、进阶篇 高级特性
3.1 死信队列
死信,顾名思义就是无法被消费的信息,字面意思可以这样理解,一般来说,producer将消息投递到queue里,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,自然就有了死信队列
为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
- 生产者:给正产的消息队列发送消息,并且设置消息过期时间为10S,超过10S消息未被消费,则消息进入死信队列
public class TTLProvider {public static final String NORMAL_EXCHANGE
="normal_exchange";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("账户");factory
.setPassword("密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();AMQP.BasicProperties properties
=new AMQP.BasicProperties().builder().expiration("10000").build();for (int i
= 1; i
<= 10; i
++) {String msg
=""+i
;channel
.basicPublish(NORMAL_EXCHANGE
,"normal",properties
,msg
.getBytes());}System.out
.println("结束发送");}
}
public class TTLConsumer1 {public static final String NORMAL_EXCHANGE
="normal_exchange";public static final String DEAD_EXCHANGE
="dead_exchange";public static final String NORMAL_QUEUE
="normal_queue";public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("账户");factory
.setPassword("密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare(NORMAL_EXCHANGE
,"direct");channel
.exchangeDeclare(DEAD_EXCHANGE
,"direct");HashMap<String, Object> map
= new HashMap<>();map
.put("x-dead-letter-exchange",DEAD_EXCHANGE
);map
.put("x-dead-letter-routing-key","dead");channel
.queueDeclare(NORMAL_QUEUE
,false,false,false,map
);channel
.queueDeclare(DEAD_QUEUE
,false,false,false,null);channel
.queueBind(NORMAL_QUEUE
,NORMAL_EXCHANGE
,"normal");channel
.queueBind(DEAD_QUEUE
,DEAD_EXCHANGE
,"dead");DeliverCallback deliverCallback
=( consumerTag
, message
)->{System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(NORMAL_QUEUE
,true,deliverCallback
,cancelCallback
);}
}
public class TTLConsumer2 {public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("账户");factory
.setPassword("密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=( consumerTag
, message
)->{System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(DEAD_QUEUE
,true,deliverCallback
,cancelCallback
);}
}
- 结果:当设置了死信队列,和TTL过期时间,若超过了过期时间消息未被消费,则消息会转发到死信队列
死信队列产生三大原因 - 消息被拒接
- 消息TTL过期
- 队列达到最大长度
3.1.1 死信队列实战:消息TTL过期
@Configuration
public class RabbitMQConfiguration {public static final String X_EXCHANGE
="X";public static final String Y_DEAD_LETTER_EXCHANGE
="Y";public static final String QUEUE_A
="QA";public static final String QUEUE_B
="QB";public static final String DEAD_QUEUE_D
="QD";@Beanpublic DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE
);}@Beanpublic DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE
);}@Beanpublic Queue queueA(){Map<String,Object> arg
=new HashMap<>();arg
.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE
);arg
.put("x-dead-letter-routing-key","YD");arg
.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A
).withArguments(arg
).build();}@Beanpublic Queue queueB(){Map<String,Object> arg
=new HashMap<>();arg
.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE
);arg
.put("x-dead-letter-routing-key","YD");arg
.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B
).withArguments(arg
).build();}@Beanpublic Queue queueD(){return QueueBuilder.durable(DEAD_QUEUE_D
).build();}@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA
,@Qualifier("xExchange") DirectExchange xExchange
){return BindingBuilder.bind(queueA
).to(xExchange
).with("XA");}@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB
,@Qualifier("xExchange") DirectExchange xExchange
){return BindingBuilder.bind(queueB
).to(xExchange
).with("XB");}@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD
,@Qualifier("yExchange") DirectExchange yExchange
){return BindingBuilder.bind(queueD
).to(yExchange
).with("YD");}
}
@RestController
@RequestMapping("/ttl")
@Slf4j
public class TTLProvider {@Autowiredprivate RabbitTemplate rabbitTemplate
;@GetMapping("/{msg}")public void sendMsg(@PathVariable("msg") String msg
){log
.info("当前发送时间:{}发送了一条消息",new Date().toString());rabbitTemplate
.convertAndSend("X","XA","TTL消息延迟为10S,消息为===>"+msg
);rabbitTemplate
.convertAndSend("X","XB","TTL消息延迟为40S,消息为===>"+msg
);}
}
@Component
@Slf4j
public class DeadLetterConsumer {@RabbitListener(queues
= "QD")public void t1(Message message
, Channel channel
)throws Exception{log
.info("收到死信队列的消息{},时间为{}",new String(message
.getBody(),"UTF-8"),new Date().toString());}
}
3.1.2 死信队列实战:队列达到最大长度 设置正常队列最大长度
生产者
public class Producer {public static final String NORMAL_EXCHANGE
="normal_exchange";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();for (int i
= 1; i
<= 10; i
++) {String msg
=""+i
;channel
.basicPublish(NORMAL_EXCHANGE
,"normal",null,msg
.getBytes());}}
}
消费者a
//设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列
map.put(“x-max-length”,6);
public class Consumer01 {public static final String NORMAL_EXCHANGE
="normal_exchange";public static final String DEAD_EXCHANGE
="dead_exchange";public static final String NORMAL_QUEUE
="normal_queue";public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare(NORMAL_EXCHANGE
,"direct");channel
.exchangeDeclare(DEAD_EXCHANGE
,"direct");HashMap<String, Object> map
= new HashMap<>();map
.put("x-dead-letter-exchange",DEAD_EXCHANGE
);map
.put("x-dead-letter-routing-key","dead");map
.put("x-max-length",6);channel
.queueDeclare(NORMAL_QUEUE
,false,false,false,map
);channel
.queueDeclare(DEAD_QUEUE
,false,false,false,null);channel
.queueBind(NORMAL_QUEUE
,NORMAL_EXCHANGE
,"normal");channel
.queueBind(DEAD_QUEUE
,DEAD_EXCHANGE
,"dead");DeliverCallback deliverCallback
=( consumerTag
, message
)->{System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(NORMAL_QUEUE
,true,deliverCallback
,cancelCallback
);}
}
消费者b
public class Consumer02 {public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=( consumerTag
, message
)->{System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(DEAD_QUEUE
,true,deliverCallback
,cancelCallback
);}
}
3.1.3 死信队列实战:消息被拒
生产者
public class Producer {public static final String NORMAL_EXCHANGE
="normal_exchange";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();for (int i
= 1; i
<= 10; i
++) {String msg
="info"+i
;channel
.basicPublish(NORMAL_EXCHANGE
,"normal",null,msg
.getBytes());}}
}
消费者a
- 此消息被拒接,是否重新放回正常队列, false:不放回 则会放到死信队列
- 1.channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
- 2.并且开启手动应答
public class Consumer01 {public static final String NORMAL_EXCHANGE
="normal_exchange";public static final String DEAD_EXCHANGE
="dead_exchange";public static final String NORMAL_QUEUE
="normal_queue";public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("登录账户");factory
.setPassword("登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.exchangeDeclare(NORMAL_EXCHANGE
,"direct");channel
.exchangeDeclare(DEAD_EXCHANGE
,"direct");HashMap<String, Object> map
= new HashMap<>();map
.put("x-dead-letter-exchange",DEAD_EXCHANGE
);map
.put("x-dead-letter-routing-key","dead");channel
.queueDeclare(NORMAL_QUEUE
,false,false,false,map
);channel
.queueDeclare(DEAD_QUEUE
,false,false,false,null);channel
.queueBind(NORMAL_QUEUE
,NORMAL_EXCHANGE
,"normal");channel
.queueBind(DEAD_QUEUE
,DEAD_EXCHANGE
,"dead");DeliverCallback deliverCallback
=( consumerTag
, message
)->{String msg
=new String(message
.getBody());if("info5".equals(msg
)){System.out
.println("Consumer1接收消息===>"+msg
+"此消息被拒绝");channel
.basicReject(message
.getEnvelope().getDeliveryTag(),false);}else {System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);}};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(NORMAL_QUEUE
,false,deliverCallback
,cancelCallback
);}
}
消费者b
public class Consumer02 {public static final String DEAD_QUEUE
="dead_queue";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=( consumerTag
, message
)->{System.out
.println("Consumer1接收消息===>"+new String(message
.getBody(),"UTF-8"));};CancelCallback cancelCallback
=(consumerTag
)-> System.out
.println(consumerTag
);channel
.basicConsume(DEAD_QUEUE
,true,deliverCallback
,cancelCallback
);}
}
3.2 基于SpringBoot实现延迟队列
配置队列交换机
@Configuration
public class QueueConfig {@Bean("exchange")public DirectExchange exchange(){return new DirectExchange("msg");}@Bean("simpleQue")public Queue simpleQue(){HashMap<String, Object> map
= new HashMap<>();map
.put("x-dead-letter-exchange","dead");map
.put("x-dead-letter-routing-key","deadKey");map
.put("x-message-ttl",10000);return new Queue("simple",false,false,false,map
);}@Beanpublic Binding simpleQueueBandingExchange(@Qualifier("simpleQue") Queue simple
,@Qualifier("exchange") DirectExchange msg
)throws Exception{return BindingBuilder.bind(simple
).to(msg
).with("info");}@Bean("deadExchange")public DirectExchange exchange1(){return new DirectExchange("dead");}@Bean("deadQueue")public Queue deadQ(){return new Queue("deadQue",false,false,false,null);}@Beanpublic Binding deadKeyBindingDeadExchange(@Qualifier("deadQueue")Queue queue
,@Qualifier("deadExchange")DirectExchange dead
){return BindingBuilder.bind(queue
).to(dead
).with("deadKey");}
}
生产者
@RestController
public class Provider {@Autowiredprivate RabbitTemplate rabbitTemplate
;@RequestMapping("/ttl/{message}")public void t1(@PathVariable String message
){String queueName
="simple";Date date
= new Date();System.out
.println(date
);rabbitTemplate
.convertAndSend("msg","info",message
);}
}
消费者
@Component
public class Consumer {@RabbitListener(queues
= "deadQue")public void hello(Message msg
, Channel channel
)throws Exception{System.out
.println("接收到消息"+new String(msg
.getBody()));Date date1
= new Date();System.out
.println(date1
);}
}
3.3 发布确认 高级特性
3.3.1 可靠性投递confirm模式
- 场景:在生产环境中由于一些不明原因,导致rabbitmq重启,在rabbitmq重启期间的生产者消息投递失败,导致消息丢失,需要手动处理和恢复。-可靠性投递confirm模式
- 需要在application核心配置文件中设置发布确认类型
- spring-rabbitmq-publisher-confirm-type: correlated
- 类型1:none:禁用发布确认模式,是默认值
- 类型2:correlated:发布消息成功到交换机后出发回调方法
- 类型3:simple:和correlated效果一样,但是如果回调返回的是false,会关闭信道,接下来无法发送消息
配置类
@Component
public class confirmConfig
{public static final String CONFIRM_EXCHANGE_NAME
="confirm.exchange";public static final String CONFIRM_QUEUE
="confirm.queue";public static final String CONFIRM_ROUTING_KEY
="confirm";@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME
);}@Bean("confirmQueue")public Queue confirmQueue(){return new Queue(CONFIRM_QUEUE
);}@Beanpublic Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange
,@Qualifier("confirmQueue")Queue confirmQueue
){return BindingBuilder.bind(confirmQueue
).to(confirmExchange
).with(CONFIRM_ROUTING_KEY
);}
}
- 当生产者发送给交换机消息时,交换机的名字错了,或者交换机挂掉了,会导致消息的丢失,那么我们需要实现回调接口,当交换机收到消息后会给生产者发送回调消息
实现回调接口:实现 RabbitTemplate.ConfirmCallback接口的confirm方法并且将其注入到rabbit模板的内部类中
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate
;@PostConstruct public void init(){rabbitTemplate
.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData
, boolean b
, String s
) {String id
=correlationData
!=null?correlationData
.getId():"";if(b
){log
.info("交换机已经收到了ID为{}的消息",id
);}else {log
.info("交换机为收到了ID为{}的消息,原因是:{}",id
,s
);}}
}
生产者
@RestController
public class ConfirmProducer {@Autowiredprivate RabbitTemplate rabbitTemplate
;@GetMapping("/sendMsg/{msg}")public void t1(@PathVariable String msg
){CorrelationData correlationData
= new CorrelationData();correlationData
.setId("1");rabbitTemplate
.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME
,ConfirmConfig.CONFIRM_ROUTING_KEY
,"嘿嘿嘿".getBytes(),correlationData
);}
}
消费者
@Component
public class ConfirmConsumer {@RabbitListener(queues
= ConfirmConfig.CONFIRM_QUEUE
)public void consumer(Message message
){System.out
.println("高级特性确认发布消费者收到了消息===>"+new String(message
.getBody()));}
}
- 测试:当我们正常发送消息
- 测试:当我们把交换机名字换掉
3.3.2 可靠性投递return模式
- 场景:若交换机收到消息,队列没有收到消息,应该如何解决?
- 需要在application核心配置文件中设置是否回退消息,当消息路由不到消费者
- spring-rabbitmq-publisher-returns=true 开启回退消息
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate rabbitTemplate
;@PostConstruct public void init(){rabbitTemplate
.setConfirmCallback(this);rabbitTemplate
.setReturnCallback(this);}@Overridepublic void confirm(CorrelationData correlationData
, boolean b
, String s
) {String id
=correlationData
!=null?correlationData
.getId():"";if(b
){log
.info("交换机已经收到了ID为{}的消息",id
);}else {log
.info("交换机未收到了ID为{}的消息,原因是:{}",id
,s
);}}@Overridepublic void returnedMessage(Message message
, int i
, String s
, String s1
, String s2
) {log
.info("消息{},被交换机{}退回,原因是{},路由是{}",new String(message
.getBody()),s1
,s
,s2
);}}
3.4 优先级队列
- 优先级越高,消息先被消费者消费
- 官方设置最大优先级 0-255 超出优先级则报错 自己使用时数字不必设置很大,会浪费CPU效率
生产者
public class PriorityProducer {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();AMQP.BasicProperties build
= new AMQP.BasicProperties.Builder().priority(10).build();for (int i
= 1; i
<= 10; i
++) {String msg
="info"+i
;if(i
==5){channel
.basicPublish("","hi",build
,msg
.getBytes());}else {channel
.basicPublish("","hi",null,msg
.getBytes());}}}
}
消费者
public class PriorityConsumer {public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip地址");factory
.setUsername("RabbitMQ登录用户名");factory
.setPassword("RabbitMQ登录密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();HashMap<String, Object> map
= new HashMap<>();map
.put("x-max-priority",10);channel
.queueDeclare("hi",false,false,false,map
);channel
.basicConsume("hi",true,(consumerTag
,message
)->{System.out
.println("优先级队列接收消息顺序===>"+new String(message
.getBody()));},(consumerTag
) -> System.out
.println("取消回调"));}
}
- 测试结果:我们定义的是消息5优先级最高,其他消息为默认优先级
3.5 消费端限流
- 参数一:prefetchSize:预先载入的大小 0表示不限制大小
- 参数二:prefetchCount:预先载入的消息条数
- 参数三:global:false
- 注意:autoAck手动应答一定要为false
channel
.basicQos(0,1,false);
12
public class AckProvider {public static final String QUEUE_NAME
="hello_Ack";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("用户");factory
.setPassword("密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();channel
.queueDeclare(QUEUE_NAME
,false,false,false,null);Scanner scanner
= new Scanner(System.in
);while (true){String msg
= scanner
.nextLine();channel
.basicPublish("",QUEUE_NAME
, null,msg
.getBytes());System.out
.println("消息发送完毕");}}
}
public class AckConsumer2 {public static final String QUEUE_NAME
="hello_Ack";public static void main(String[] args
) throws Exception{ConnectionFactory factory
= new ConnectionFactory();factory
.setHost("ip");factory
.setUsername("用户");factory
.setPassword("密码");Connection connection
= factory
.newConnection();Channel channel
= connection
.createChannel();DeliverCallback deliverCallback
=(consumerTag
, message
)-> {System.out
.println(new String(message
.getBody()));try {Thread.sleep(1000*5);} catch (InterruptedException e
) {e
.printStackTrace();}channel
.basicAck(message
.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback
=consumerTag
->{System.out
.println("消费消息被中断");};channel
.basicQos(0,1,false);channel
.basicConsume(QUEUE_NAME
,false,deliverCallback
,cancelCallback
);}
}
总结
以上是生活随笔为你收集整理的RabbitMQ保姆级教程的全部内容,希望文章能够帮你解决所遇到的问题。
如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。