欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

【图解 Knative】剖析 Eventing Broker-Trigger 实现原理

发布时间:2023/12/20 49 豆豆
生活随笔 收集整理的这篇文章主要介绍了 【图解 Knative】剖析 Eventing Broker-Trigger 实现原理 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

1. 背景介绍

我们都知道,Knative 有两个主要的子项目:Serving 和 Eventing。其中 关于 Serviing 可以查看之前的一篇公众号文章 【超详细】深入探究 Knative 扩缩容的奥秘。Eventing 将系统中的服务以事件驱动的方式松耦合的绑定在一起:即事件发送者不关注谁来消费事件,事件消费者也不关注事件是由谁产生的。Eventing 中有多种事件组合的方式,比如:

  • 最简单的 Source -> Service 直接绑定

  • 通过 channel 与 subscriptions 订阅的方式

  • 并行事件流处理 的方式 Parallel,以及串行事件流处理的方式 Sequence

  • 以及支持事件过滤的 Broker & Trigger 也是本文将要重点介绍的模式

✅注:对于其他几种事件组合的方式,感兴趣的可以在官网查阅 https://knative.dev/docs/eventing/

2. 工作原理介绍

上图展示的是 Broker & Trigger 模型的中 Event 传递的基本方式,其中事件传递的格式是标准的 CLoudEvent 的格式。简要介绍下图中的流程:

  • 事件由事件源 Source 产生, Knative 支持多种事件源,如 GitHub、Heartbeats、k8s、ContainerSource等,更多 Source 可在官网查阅 https://knative.dev/docs/eventing/sources/,也可以自定义 Source ,参考之前发的一篇公众号为 Serverless 平台插上腾飞的翅膀--自定义 Faas 事件驱动事件源。

  • 图中 事件源产生 type 为 foo 的事件,发送到 Broker,其中 有三个 Trigger 绑定了 Broker,两个 Trigger 的 filter 是 type:foo ,也就是会关注 type=foo 的事件,然后发送给对应的消费者。

  • Service1和 Service3 只是单纯的消费,并不回复事件

  • Service2 收到事件后回复 type=bar 的事件,事件重新传递到 Broker上,此时只有一个 Trigger 过滤了 type:bar 事件被传送到 Service3 消费。

  • 其中消费者是否回复事件是可选的。

  • 于图中的实例 yaml,可参考附录章节

    3. 底层实现原理

    上面介绍了 Broker & Trigger 的工作原理,现在从底层实现的角度进一步讲解。

    Knative Eventing 事件传递过程中依赖消息通道 Channel,为了便于持久化,生产环境使用可持久化的消息通过( 本文以 NatsStreaming 为例) 来做事件的消息通道,如果是开发调试,则直接用 InMemoryChannel 即可(只会保存在内存中,不会持久化)

    先上图

    • 图中包含控制平面与数据平面,图中箭头:控制平面为实线,数据平面为虚线

    • 实线的方框为 Knative 组件,虚线的方框为 k8s CR 资源

    下面分别从数据平面和控制平面分别讲解

    3.1 数据平面

    EventSource------> Broker------>Trigger------->SubScriber

    1. EventSource------> Broker

    Broker 可以手动生成,或者通过给 namespace 打 label eventing.knative.dev/injection:true ,让sugar-controller 会自动生成对应的 Broker 实例

    EventSource 一般通过 SinkURI 环境变量将 Broker 的 地址传入 。比如,Broker 的地址为 http://broker-ingress.knative-eventing.svc.cluster.local/default/default,该地址为broker-ingress 的地址,而 broker-ingress通过 请求 URL的 path 可以得到 Broker 的信息,比如/default/default表示 default namespace 下名为 default 的 Broker

    # kubectl get broker NAME      URL                                                                        AGE   READY   REASON default   http://broker-ingress.knative-eventing.svc.cluster.local/default/default   46h   True

    2.Broker------>Trigger

    broker-ingress ****得到 Broker 的信息(name namespace)之后,可以得到 Broker 的 status 信息,如下得到 channelAddress 的地址(http://default-kne-trigger-kn-channel.default.svc.cluster.local)

    apiVersion: eventing.knative.dev/v1 kind: Broker metadata:name: defaultnamespace: default status:address:url: http://broker-ingress.knative-eventing.svc.cluster.local/default/defaultannotations:knative.dev/channelAPIVersion: messaging.knative.dev/v1beta1knative.dev/channelAddress: http://default-kne-trigger-kn-channel.default.svc.cluster.localknative.dev/channelKind: NatssChannelknative.dev/channelName: default-kne-trigger

    channelAddress 的地址是个svc 的地址,通过 externalName 指向 natss-ch-dispatcher

    # kubectl get svc default-kne-trigger-kn-channel NAME                             TYPE           CLUSTER-IP   EXTERNAL-IP                                              PORT(S)   AGE default-kne-trigger-kn-channel   ExternalName   <none>       natss-ch-dispatcher.knative-eventing.svc.cluster.local   <none>    46h

    natss-ch-dispatcher 负责将 消息(主题为channel.Name + "." + channel.Namespace) 发布到 Natss-Streaming 组件

    3. Trigger------->SubScriber

    natss-ch-dispatcher ****不仅负责发布,还负责订阅消息,natss-ch-dispatcher ****watch natssChannel (见下面的 NatssChannel), 获取 natssChannel的 subscriber的地址 subscriberUri,通过 subscriberUri发送消息给 broker-filter ,跟 broker-ingress 一样,subscriberUri 的地址是broker-filter 的 地址,通过请求 path 区分哪个 trigger ,请求 path :/triggers/<trigger namespace>/<trigger name>/<trigger UID>

    apiVersion: messaging.knative.dev/v1beta1 kind: NatssChannel metadata:name: default-kne-triggernamespace: default spec:subscribers:- generation: 1replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/defaultsubscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger2/c0f3f1bb-9a25-4fb2-b803-fc5cd74e57dauid: 9b4c991a-8912-4333-aa5a-caf053e5ee9c- generation: 1replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/defaultsubscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger3/28e7ba73-d514-4aea-a0de-7946fc21e7ccuid: 56d00260-e69f-4d7a-bd05-c81479774a95- generation: 1replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/defaultsubscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger1/3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3uid: 963fd671-097f-489a-a857-c1803ad3fb19 status:address:url: http://default-kne-trigger-kn-channel.default.svc.cluster.local

    broker-filter 获取到 Trigger 的信息后,通过 根据 Trigger 的filter 将消息过滤,再决定是否将消息发给 对应的 subscriber, subscriber可以从 Trigger status 的 subcriberUri 获取到,见下图,对于subscriber Reply 的消息,broker-filter 发送到 replyUri 地址上,http://broker-ingress.knative-eventing.svc.cluster.local/default/default ,也就是发送给 Broker (实际是 broker-ingress)。

    apiVersion: eventing.knative.dev/v1 kind: Trigger metadata:name: trigger1namespace: default spec:broker: defaultfilter:attributes:type: dev.knative.sources.pingsubscriber:ref:apiVersion: serving.knative.dev/v1kind: Servicename: service1namespace: default status:subscriberUri: http://service1.default.svc.cluster.local

    至此数据面的通信流就完成了,接下来看控制面的数据流。

    3.2 控制平面

    控制平面主要看上图中的实线部分,此处再贴一下图

    接下来按流程讲解

    • 1.1 mt-broker-controller watch Broker 的创建

    • 1.2 mt-broker-controller 根据 Broker 的配置,创建对应的 Channel,此处为 NatssChannel

    • 1.3 Natss-ch-controller watch NatssChannel 的创建,更新 Natss-Streaming 的服务端状态到NatssChannel 的 status

    • 1.4 Natss-ch-controller创建 svc,externalname指向 natss-ch-dispatcher

    • 1.5 mt-broker-controllerwatch NatssChannel 的status

    • 1.6 mt-broker-controller 更新 Broker 的status,其中包含: Broker 的address(broker-ingress 的地址),channel 的 address,供 broker-ingress 使用

    • 2.1 mt-broker-controllerwatch Trigger

    • 2.2 mt-broker-controller根据 Trigger(含 subscriber 的信息)创建subscription,其中包含 subsciber(broker-filter) 的地址和 Broker 的信息

    apiVersion: messaging.knative.dev/v1 kind: Subscription metadata:name: default-trigger1-3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3namespace: default spec:channel:apiVersion: messaging.knative.dev/v1beta1kind: NatssChannelname: default-kne-triggerreply:ref:apiVersion: eventing.knative.dev/v1kind: Brokername: defaultnamespace: defaultsubscriber:uri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger1/3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3
    • 2.3 eventing-controller watch subscription ,解析 subcriber和 replyUri的地址(broker-ingress 的地址)

    • 2.4 eventing-controller根据subcription 更新 Natss-Channel :subsriberUrl 和 replyUrl

    spec:subscribers:- generation: 1replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/defaultsubscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger2/c0f3f1bb-9a25-4fb2-b803-fc5cd74e57dauid: 9b4c991a-8912-4333-aa5a-caf053e5ee9c- generation: 1replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/defaultsubscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger3/28e7ba73-d514-4aea-a0de-7946fc21e7ccuid: 56d00260-e69f-4d7a-bd05-c81479774a95- generation: 1replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/defaultsubscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger1/3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3uid: 963fd671-097f-489a-a857-c1803ad3fb19 status:address:url: http://default-kne-trigger-kn-channel.default.svc.cluster.local

    2.5 mt-broker-controller 根据 subscription 和 broker 的状态 更新 Trigger 的状态

    控制面的逻辑就完了,和数据面的逻辑结合的地方在于:

    broker-ingress 和 broker-filter :

  • broker-ingress watch Broker ,从 Broker 的 status 中获取 channel 的地址(natss-ch-dispatch 的 svc 的地址)

  • broker-filter watch Trigger ,根据 trigger 中subscriber 地址,将event filter 后决定是否发送到 对应的 target

  • 至此,整个流程都讲完了,可以按照附录中的资源,创建一下,看看其中的资源。

    附录

  • broker.yaml

  • apiVersion: eventing.knative.dev/v1 kind: Broker metadata:name: defaultnamespace: default
  • trigger1.yaml

  • apiVersion: eventing.knative.dev/v1 kind: Trigger metadata:name: trigger1 spec:filter:attributes:type: foosubscriber:ref:apiVersion: serving.knative.dev/v1kind: Servicename: service1
  • trigger2.yaml

  • apiVersion: eventing.knative.dev/v1 kind: Trigger metadata:name: trigger2 spec:filter:attributes:type: foosubscriber:ref:apiVersion: serving.knative.dev/v1kind: Servicename: service2
  • trigger3.yaml

  • apiVersion: eventing.knative.dev/v1 kind: Trigger metadata:name: trigger3 spec:filter:attributes:type: barsubscriber:ref:apiVersion: serving.knative.dev/v1kind: Servicename: service3
  • service1.yaml

  • apiVersion: serving.knative.dev/v1 kind: Service metadata:name: service1 spec:template:spec:containers:- image: docker.io/zhaojizhuang66/event-display:v1

    5.service3.yaml (service1和service3 一样都是打印接收到的 event)

    apiVersion: serving.knative.dev/v1 kind: Service metadata:name: service1 spec:template:spec:containers:- image: docker.io/zhaojizhuang66/event-display:v1
  • service2.yaml

  • apiVersion: serving.knative.dev/v1 kind: Service metadata:name: service2 spec:template:spec:containers:- image: docker.io/zhaojizhuang66/event-display-with-reply:v1

    关注公众号: Knative,了解更多 Serverless 、Knative,云原生相关资讯

    关注公众号,回复 "进群",即可进群与众多云原生 Serverless 技术大佬探讨技术,探讨人生。

    总结

    以上是生活随笔为你收集整理的【图解 Knative】剖析 Eventing Broker-Trigger 实现原理的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。