欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

RabbitMQ(五):Exchange交换器--topic

发布时间:2024/9/30 40 豆豆
生活随笔 收集整理的这篇文章主要介绍了 RabbitMQ(五):Exchange交换器--topic 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

内容翻译自:RabbitMQ Tutorials Java版


RabbitMQ(一):Hello World程序

RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发

RabbitMQ(三):Exchange交换器--fanout

RabbitMQ(四):Exchange交换器--direct

RabbitMQ(五):Exchange交换器--topic

RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统

RabbitMQ(七):常用方法说明 与 学习小结


Topics:

在上一篇博客中我们改进了我们的日志系统:使用direct路由器替代了fanout路由器,从而可以选择性地接收日志。

尽管使用direct路由器给我们的日志系统带来了改进,但仍然有一些限制:不能基于多种标准进行路由。

在我们的日志系统中,我们可能不仅需要根据日志的严重级别来接收日志,而且有时想基于日志来源进行路由。如果你知道syslog这个Unix工具,你可能了解这个概念,sysylog会基于日志严重级别(info/warn/crit...)和设备(auth/cron/kern...)进行日志分发。

如果我们可以监听来自corn的错误日志,同时也监听kern的所有日志,那么我们的日志系统就会更加灵活。

为了实现这个功能,我们需要了解一个复杂的路由器:topic路由器。


主题路由器(Topic Exchange):

发送到topic路由器的消息的路由键routing_key不能任意给定:它必须是一些单词的集合,中间用点号.分割。这些单词可以是任意的,但通常会体现出消息的特征。一些有效的路由键示例:stock.usd.nyse,nyse.vmw,quick.orange.rabbit。这些路由键可以包含很多单词,但路由键总长度不能超过255个字节。

绑定键binding key也必须是这种形式。topic路由器背后的逻辑与direct路由器类似:以特定路由键发送的消息将会发送到所有绑定键与之匹配的队列中。但绑定键有两种特殊的情况:

(1)*(星号)仅代表一个单词

(2)#(井号)代表任意个单词

下图可以很好地解释这两个符号的含义:

对于上图的例子,我们将会发送描述动物的消息。这些消息将会以由三个单词组成的路由键发送。路由键中的第一个单词描述了速度,第二个描述了颜色,第三个描述了物种:<speed>.<colour>.<species>。

我们创建了三个绑定,Q1的绑定键为*.orange.*,Q2的绑定键有两个,分别是*.*.rabbit和lazy.#。

上述绑定关系可以描述为:

(1)Q1关注所有颜色为orange的动物。

(2)Q2关注所有的rabbit,以及所有的lazy的动物。

如果一个消息的路由键是quick.orange.rabbit,那么Q1和Q2都可以接收到,路由键是lazy.orange.elephant的消息同样如此。但是,路由键是quick.orange.fox的消息只会到达Q1,路由键是lazy.brown.fox的消息只会到达Q2。注意,路由键为lazy.pink.rabbit的消息只会到达Q2一次,尽管它匹配了两个绑定键。路由键为quick.brown.fox的消息因为不和任意的绑定键匹配,所以将会被丢弃。

假如我们不按常理出牌:发送一个路由键只有一个单词或者四个单词的消息,像orange或者quick.orange.male.rabbit,这样的话,这些消息因为不和任意绑定键匹配,都将会丢弃。但是,lazy.orange.male.rabbit消息因为和lazy.#匹配,所以会到达Q2,尽管它包含四个单词。

Topic exchange::

Topic exchange非常强大,可以实现其他任意路由器的功能。

当一个队列以绑定键#绑定,它将会接收到所有的消息,而无视路由键(实际是绑定键#匹配了任意的路由键)。----这和fanout路由器一样了。

当*和#这两个特殊的字符不出现在绑定键中,Topic exchange就会和direct exchange类似了。


放在一块:

我们将会在我们的日志系统中使用主题路由器Topic exchange,并假设所有的日志消息以两个单词<facility>.<severity>为路由键。

代码和上个教程几乎一样。

生产者EmitLogTopic.java:

import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;public class EmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) {Connection connection = null;Channel channel = null;try {//建立连接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");connection = factory.newConnection();channel = connection.createChannel();//声明路由器和路由器类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//定义路由键和消息String routingKey = "";String message = "msg.....";//发布消息channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");} catch (Exception e) {e.printStackTrace();} finally {if (connection != null) {try {connection.close();} catch (Exception ignore) {}}}} }

消费者ReceiveLogsTopic.java:

import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogsTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {//建立连接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明路由器和路由器类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String queueName = channel.queueDeclare().getQueue();//String bingingKeys[] = {""};for (String bindingKey : bingingKeys) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//监听消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }

现在,可以动手实验了。
开头提到的:日志严重级别info/warn/crit...和设备auth/cron/kern...。

消费者:
将String bingingKeys[] = {""}改为String bingingKeys[] = {"#"},启动第一个消费者;
再改为String bingingKeys[] = {"kern.*"},启动第二个消费者;
再改为String bingingKeys[] = {"*.critical"},启动第三个消费者;
再改为String bingingKeys[] = {"kern.*", "*.critical"},启动第四个消费者。

生产者,发送多个消息,如:
路由键为kern.critical 的消息:A critical kernel error;
路由键为kern.info 的消息:A kernel info;
路由键为kern.warn 的消息:A kernel warning;
路由键为auth.critical 的消息:A critical auth error;
路由键为cron.warn 的消息:A cron waning;
路由键为cron.critical 的消息:A critical cron error;

试试最后的结果:第一个消费者将会接收到所有的消息,第二个消费者将会kern的所有严重级别的日志,第三个消费者将会接收到所有设备的critical消息,第四个消费者将会接收到kern设备的所有消息和所有critical消息。

下篇博客中,我们将会学习如何让消息往返,以此来作为一个远程过程调用(RPC)。

 


说明

①与原文略有出入,如有疑问,请参阅原文

②原文均是编译后通过javacp命令直接运行程序,我是在IDE中进行的,相应的操作做了修改。

总结

以上是生活随笔为你收集整理的RabbitMQ(五):Exchange交换器--topic的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。