欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

SpringCloud Stream消息驱动

发布时间:2023/12/10 60 豆豆
生活随笔 收集整理的这篇文章主要介绍了 SpringCloud Stream消息驱动 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

为啥有这个技术???

1. 这个stream是操作消息队列的,简化,学习消息队列的成本降低。 2. 可操作rabbitMQ兔子message queue,kafaka,可理解为jdbc可操作oracle, mysql.. 3. spring家的技术学就完了。。

stream

    • 对消息驱动需要了解的概念
    • 消息驱动生产者,消费者
    • 再建一个服务,消息消费者,问题

对消息驱动需要了解的概念

(1) 网站 文档
https://spring.io/projects/spring-cloud-stream#overview

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/


(2) 介绍

什么是SpringCloudStream 官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。 应用程序通过inputs或者outputs与Spring Cloud Stream中binder对象交互。 通过我们配置来binding(绑定), 而Spring Cloud Stream的binder对象负责与消息中间件交互。 所以,我们只需要搞清楚如何与Spring Cloud Stream 交互就可以访便使用消息驱动的方式。 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。 Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现, 引用了发布-订阅、消费组、分区的三个核心概念。 目前仅支持RabbitMQ、Kafka.

(3) 标准mq

  • Message

    • 生产者/消费者之间靠消息媒介传递信息内容
  • 消息通道MessageChannel

    • 消息必须走特定的通道
  • 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器订阅

    • 消息通道里的消息如何被消费呢,谁负责收发处理

(4) 为什么用Cloud Stream

1 绑定stream凭什么可以统一底层差异

  • 在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,
    于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性
    通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
    通过向应用程序暴露统- -的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。

2 binder架构

INPUT对应于消费者 OUTPUT对应于生产者

(5) Stream中的消息通信方式遵循了发布-订阅模式

Topic主题进行广播 在RabbitMQ就是Exchange 交换机 在kafka中就是Topic

(6) 常用api,注解

消息驱动生产者,消费者

  • 生产者
    pom
  • <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
    ymal server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8802.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址

    发送消息接口,实现

    public interface IMessageProvider {public String send(); }/// 实现 import com.atguigu.springcloud.service.IMessageProvider; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.messaging.MessageChannel; import org.springframework.integration.support.MessageBuilder; import javax.annotation.Resource; import org.springframework.cloud.stream.messaging.Source;import javax.annotation.Resource; import java.util.UUID;@EnableBinding(Source.class) //定义消息的推送管道 public class MessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output; // 消息发送管道@Overridepublic String send(){String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("*****serial: "+serial);return null;} }
  • 消费者
    yaml, pom同时,yaml要改端口。
    定义controller接收消息
  • import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component;@Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message) {System.out.println("消费者1号,接受:"+message.getPayload()+"\t port:"+serverPort);}}

    再建一个服务,消息消费者,问题

    1. 消息重复。发送者发送消息,两个消费者都会接收到消息,如果是支付多个模块, 收到一条消息,多个模块会收到坏账,需要分组,只对一个支付模块发消息. 2. 消息持久化。当关掉消费者,消息丢失。

    a. 新增group配置,自定义group

    group: damn

    b. 持久化,服务挂了,保证消息不丢失

    页数配置分组解决

    创作挑战赛新人创作奖励来咯,坚持创作打卡瓜分现金大奖

    总结

    以上是生活随笔为你收集整理的SpringCloud Stream消息驱动的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。