RabbitMQ 交换器、持久化
一、 交换器
RabbitMQ交换器(Exchange)分为四种
- direct
默认的交换器类型,消息的RoutingKey与队列的bindingKey匹配,消息就投递到相应的队列
- fanout
一种发布/订阅模式的交换器,发布一条消息时,fanout把消息广播附加到fanout交换器的队列上
接收类(订阅):
import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//一旦创建exchange,RabbitMQ不允许对其改变,否则报错String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");//绑定是交换器与队列之间的关系,可以理解为,队列对此交换器的消息感兴趣System.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }发布类:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;public class ReceiveLog {private static final String EXCHANGE_NAME = "log";public static void main(String[] argv)throws java.io.IOException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "hi";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }
- topic
topic类似于fanout交换器,但更加具体化,用routingKey进行规则匹配,更灵活的匹配出用户想要接收的消息
routingKey形如:com.company.module.demo,具体匹配规则:
"*"与"#"可以匹配任意字符,区别是"*"只能匹配由"."分割的一段字符,而"#"可以匹配所有字符
发布一条"com.abc.test.push"的消息,能匹配的routingKey:
com.abc.test.* #.test.push #不能匹配的:
com.abc.* *.test.push *发布类:
声明队列时,需要注意队列的属性,虽然队列的声明由消费者或生产者完成都可以,但如果由消费者声明,由于生产者生产消息时,可能队列还没有声明,会造成消息丢失,所以推荐由生产者声明队列
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;public class RabbitMqSendTest {private static String queue = "test_queue";private static String exchange = "TestExchange";private static String routingKey = "abc.test";public static void main(String[] args) {ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();factory.setHost("172.16.67.60");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");Connection mqConnection = null;try {mqConnection = factory.newConnection();Channel mqChannel = mqConnection.createChannel();if (null != mqChannel && mqChannel.isOpen()) {mqChannel.exchangeDeclare(exchange, "topic"); // String queueName = mqChannel.queueDeclare().getQueue(); // mqChannel.queueBind(queueName, exchange, routingKey); //声明队列名称与属性 //durable持久队列,mq重启队列可恢复 exclusive独占队列,仅限于声明它的连接使用操作 //autoDelete 自动删除 arguments 其他属性mqChannel.queueDeclare(queue, false, false, false, null);mqChannel.queueBind(queue, exchange, routingKey);//*******************************************mqChannel.basicPublish(exchange, routingKey, null,("hello").getBytes());}} catch (Exception e) {e.printStackTrace();}finally {try {mqConnection.close();} catch (IOException e) {e.printStackTrace();}}} }接收类
import com.rabbitmq.client.*; import java.io.IOException;public class ReceiveTopic {private static String queue = "consume_queue";private static String exchange = "TestExchange";private static String routingKey = "*.test";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("172.16.67.60");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// channel声明Exchange,名称与类型channel.exchangeDeclare(exchange, "topic"); // String queuename = channel.queueDeclare().getQueue();channel.queueDeclare(queue, false, false, false, null);channel.queueBind(queue, exchange, "*.test"); //消费者指定消息队列,并选择特定的RoutingKeySystem.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer client = new DefaultConsumer(channel) {public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String msgString = new String(body, "UTF-8");System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + msgString + "'");}};channel.basicConsume(queue, true,client);System.out.println();} }二、持久化
RabbitMQ默认情况下重启消息服务器时,会丢失消息,为了尽量保证消息在服务器宕机时不丢失,就需要把消息持久化,但是也只是尽量不丢失,由于涉及磁盘写入,当消息量巨大时,mq性能也会被严重拉低。
转载于:https://www.cnblogs.com/castielangel/p/9952069.html
总结
以上是生活随笔为你收集整理的RabbitMQ 交换器、持久化的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: mysql5.7.23版本环境配置
- 下一篇: @Column