欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

分布式消息通信ActiveMQ原理-消费消息策略-笔记

发布时间:2023/12/15 51 豆豆
生活随笔 收集整理的这篇文章主要介绍了 分布式消息通信ActiveMQ原理-消费消息策略-笔记 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

2019独角兽企业重金招聘Python工程师标准>>>

消息消费流程图

消费端消费消息的原理

  • 我们通过上一节课的讲解,知道有两种方法可以接收消息,
    • 一种是使用同步阻塞的MessageConsumer#receive方法。
    • 另一种是使用消息监听器MessageListener。
  • 这里需要注意的是,在同一个session下,这两者不能同时工作,
    • 也就是说不能针对不同消息采用不同的接收方式。
    • 否则会抛出异常。
  • 至于为什么这么做,最大的原因还是在事务性会话中,两种消费模式的事务不好管控

ActiveMQMessageConsumer.receive

  • 消费端同步接收消息的源码入口
public Message receive() throws JMSException {checkClosed();checkMessageListener(); //检查receive和MessageListener是否同时配置在当前的会话中sendPullCommand(0); //如果PrefetchSizeSize为0并且unconsumerMessage为空,则发起pull命令MessageDispatch md = dequeue(-1); //从unconsumerMessage出队列获取消息if (md == null) {return null;}beforeMessageIsConsumed(md);afterMessageIsConsumed(md, false); //发送ack给到brokerreturn createActiveMQMessage(md);//获取消息并返回}

sendPullCommand

  • 发送pull命令从broker上获取消息,前提是prefetchSize=0并且unconsumedMessages为空。
  • unconsumedMessage表示未消费的消息,这里面预读取的消息大小为prefetchSize的值
protected void sendPullCommand(long timeout) throws JMSException {clearDeliveredList();if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {MessagePull messagePull = new MessagePull();messagePull.configure(info);messagePull.setTimeout(timeout);session.asyncSendPacket(messagePull); //向服务端异步发送messagePull指令}}

clearDeliveredList

  • 在上面的sendPullCommand方法中,会先调用clearDeliveredList方法,
    • 主要用来清理已经分发的消息链表deliveredMessages
      • deliveredMessages,存储分发给消费者但还为应答的消息链表
      • Ø 如果session是事务的,则会遍历deliveredMessage中的消息放入到previouslyDeliveredMessage中来做重发
      • Ø 如果session是非事务的,根据ACK的模式来选择不同的应答操作
private void clearDeliveredList() {if (clearDeliveredList) {synchronized (deliveredMessages) {if (clearDeliveredList) {if (!deliveredMessages.isEmpty()) {if (session.isTransacted()) {if (previouslyDeliveredMessages == null) {previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId,Boolean>(session.getTransactionContext().getTransactionId());}for (MessageDispatch delivered : deliveredMessages) {previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);}LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",getConsumerId(), previouslyDeliveredMessages.transactionId,deliveredMessages.size());} else {if (session.isClientAcknowledge()) {LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());// allow redeliveryif (!this.info.isBrowser()) {for (MessageDispatch md : deliveredMessages) {this.session.connection.rollbackDuplicate(this,md.getMessage());}}}LOG.debug("{} clearing delivered list ({}) on transport interrupt",getConsumerId(), deliveredMessages.size());deliveredMessages.clear();pendingAck = null;}}clearDeliveredList = false;}}}}

dequeue

  • 从unconsumedMessage中取出一个消息,
  • 在创建一个消费者时,就会未这个消费者创建一个为消费的消息通道,这个通道分为两种,
    • 一种是简单优先级队列分发通道SimplePriorityMessageDispatchChannel ;
    • 另一种是先进先出的分发通道FifoMessageDispatchChannel.
  • 至于为什么要存在这样一个消息分发通道,大家可以想象一下,
    • 如果消费者每次去消费完一个消息以后再去broker拿一个消息,效率是比较低的。
    • 所以通过这样的设计可以允许session能够一次性将多条消息分发给一个消费者。
    • 默认情况下对于queue来说,prefetchSize的值是1000

beforeMessageIsConsumed

  • 这里面主要是做消息消费之前的一些准备工作,
  • 如果ACK类型不是DUPS_OK_ACKNOWLEDGE或者队列模式(简单来说就是除了Topic和DupAck这两种情况),
    • 所有的消息先放到deliveredMessages链表的开头。
  • 并且如果当前是事务类型的会话,
    • 则判断transactedIndividualAck,如果为true,表示单条消息直接返回ack。
    • 否则,调用ackLater,批量应答,
      • client端在消费消息后暂且不发送ACK,而是把它缓存下来(pendingACK),
        • 等到这些消息的条数达到一定阀值时,只需要通过一个ACK指令把它们全部确认;
        • 这比对每条消息都逐个确认,在性能上要提高很多
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {md.setDeliverySequenceId(session.getNextDeliveryId());lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();if (!isAutoAcknowledgeBatch()) {synchronized(deliveredMessages) {deliveredMessages.addFirst(md);}if (session.getTransacted()) {if (transactedIndividualAck) {immediateIndividualTransactedAck(md);} else {ackLater(md, MessageAck.DELIVERED_ACK_TYPE);}}}}

afterMessageIsConsumed

  • 这个方法的主要作用是执行应答操作,这里面做以下几个操作
    • Ø 如果消息过期,则返回消息过期的ack
    • Ø 如果是事务类型的会话,则不做任何处理
    • Ø 如果是AUTOACK或者(DUPS_OK_ACK且是队列),并且是优化ack操作,则走批量确认ack
    • Ø 如果是DUPS_OK_ACK,则走ackLater逻辑
    • Ø 如果是CLIENT_ACK,则执行ackLater
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throwsJMSException {if (unconsumedMessages.isClosed()) {return;}if (messageExpired) {acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);stats.getExpiredMessageCount().increment();} else {stats.onMessage();if (session.getTransacted()) {// Do nothing.} else if (isAutoAcknowledgeEach()) {if (deliveryingAcknowledgements.compareAndSet(false, true)) {synchronized (deliveredMessages) {if (!deliveredMessages.isEmpty()) {if (optimizeAcknowledge) {ackCounter++;// AMQ-3956 evaluate both expired and normal msgs as// otherwise consumer may get stalledif (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65)|| (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp +optimizeAcknowledgeTimeOut))) {MessageAck ack =makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);if (ack != null) {deliveredMessages.clear();ackCounter = 0;session.sendAck(ack);optimizeAckTimestamp = System.currentTimeMillis();}// AMQ-3956 - as further optimization send// ack for expired msgs when there are any.// This resets the deliveredCounter to 0 so that// we won't sent standard acks with every msg just// because the deliveredCounter just below// 0.5 * prefetch as used in ackLater()if (pendingAck != null && deliveredCounter > 0) {session.sendAck(pendingAck);pendingAck = null;deliveredCounter = 0;}}} else {MessageAck ack =makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);if (ack != null) {deliveredMessages.clear();session.sendAck(ack);}}}}deliveryingAcknowledgements.set(false);}} else if (isAutoAcknowledgeBatch()) {ackLater(md, MessageAck.STANDARD_ACK_TYPE);} else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {boolean messageUnackedByConsumer = false;synchronized (deliveredMessages) {messageUnackedByConsumer = deliveredMessages.contains(md);}if (messageUnackedByConsumer) {ackLater(md, MessageAck.DELIVERED_ACK_TYPE);}} else {throw new IllegalStateException("Invalid session state.");}}}

 

转载于:https://my.oschina.net/u/3847203/blog/2989560

总结

以上是生活随笔为你收集整理的分布式消息通信ActiveMQ原理-消费消息策略-笔记的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。