加入收藏 | 设为首页 | 会员中心 | 我要投稿 鹰潭站长网 (https://www.0701zz.cn/)- 图像处理、低代码、云通信、数据工具、物联设备!
当前位置: 首页 > 站长资讯 > 外闻 > 正文

Spring Cloud Stream 体系

发布时间:2020-11-05 14:27:53 所属栏目:外闻 来源:互联网
导读:Spring Cloud Stream(后面以 SCS 代替 Spring Cloud Stream)本身内容很多,而且它还有很多外部的依赖,想要熟悉 SCS,必须要先了解 Spring Messaging 和 Spring Integration 这两个项目,接下来,文章将从围绕以下三点进行展开: 什么是 Spring Messaging;

Spring Cloud Stream (后面以 SCS 代替 Spring Cloud Stream) 本身内容很多,而且它还有很多外部的依赖,想要熟悉 SCS,必须要先了解 Spring Messaging 和 Spring Integration 这两个项目,接下来,文章将从围绕以下三点进行展开:

 

  • 什么是 Spring Messaging;

  • 什么是 Spring Integration;

  • 什么是 SCS 体系及其原理;

    Spring Integration


    Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。

    它提出了不少新的概念,包括消息路由 MessageRoute、消息分发 MessageDispatcher、消息过滤 Filter、消息转换 Transformer、消息聚合 Aggregator、消息分割 Splitter 等等。同时还提供了 MessageChannel 和MessageHandler 的实现,分别包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel 和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。

    这里为大家介绍几种消息的处理方式:
    • 消息的分割:

      接下来,我们以一个最简单的例子来尝试一下 Spring Integration:

      这段代码解释为:

       

      SubscribableChannel messageChannel =new DirectChannel(); // 1

      messageChannel.subscribe(msg-> { // 2
       System.out.println("receive: " +msg.getPayload());
      });

      messageChannel.send(MessageBuilder.withPayload("msgfrom alibaba").build()); // 3

       

      1. 构造一个可订阅的消息通道 messageChannel

      2. 使用 MessageHandler 去消费这个消息通道里的消息;

      3. 发送一条消息到这个消息通道,消息最终被消息通道里的 MessageHandler 所消费。

      最后控制台打印出: receive: msg from alibaba

      DirectChannel 内部有个 UnicastingDispatcher 类型的消息分发器,会分发到对应的消息通道 MessageChannel 中,从名字也可以看出来,UnicastingDispatcher 是个单播的分发器,只能选择一个消息通道。那么如何选择呢? 内部提供了 LoadBalancingStrategy 负载均衡策略,默认只有轮询的实现,可以进行扩展。

      我们对上段代码做一点修改,使用多个 MessageHandler 去处理消息:

      
      				

      SubscribableChannel messageChannel = new DirectChannel();

      messageChannel.subscribe(msg -> {
           System.out.println("receive1: " + msg.getPayload());

      • SCS 在 Spring Integration 的基础上进行了封装,提出了 Binder, Binding, @EnableBinding, @StreamListener 等概念;

      • SCS 与 Spring Boot Actuator 整合,提供了 /bindings, /channels endpoint;

      • SCS 与 Spring Boot Externalized Configuration 整合,提供了 BindingProperties, BinderProperties 等外部化配置类;

      • SCS 增强了消息发送失败的和消费失败情况下的处理逻辑等功能。

      • SCS 是 Spring Integration 的加强,同时与 Spring Boot 体系进行了融合,也是 Spring Cloud Bus 的基础。它屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。

      Binder 是提供与外部消息中间件集成的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部已经实现了 RocketMQ Binder

(编辑:鹰潭站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读