消息的分割:
接下来,我们以一个最简单的例子来尝试一下 Spring Integration:
这段代码解释为:
SubscribableChannel messageChannel =new DirectChannel(); // 1
messageChannel.subscribe(msg-> { // 2
System.out.println("receive: " +msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msgfrom alibaba").build()); // 3
1. 构造一个可订阅的消息通道 messageChannel
;
2. 使用 MessageHandler
去消费这个消息通道里的消息;
3. 发送一条消息到这个消息通道,消息最终被消息通道里的 MessageHandler
所消费。
最后控制台打印出: receive: msg from alibaba
;
DirectChannel
内部有个 UnicastingDispatcher
类型的消息分发器,会分发到对应的消息通道 MessageChannel
中,从名字也可以看出来,UnicastingDispatcher
是个单播的分发器,只能选择一个消息通道。那么如何选择呢? 内部提供了 LoadBalancingStrategy
负载均衡策略,默认只有轮询的实现,可以进行扩展。
我们对上段代码做一点修改,使用多个 MessageHandler
去处理消息:
SubscribableChannel messageChannel = new DirectChannel();
messageChannel.subscribe(msg -> {
System.out.println("receive1: " + msg.getPayload());
-
SCS 与 Spring Boot Actuator 整合,提供了 /bindings, /channels
endpoint;
-
SCS 与 Spring Boot Externalized Configuration 整合,提供了 BindingProperties, BinderProperties
等外部化配置类;
-
SCS 增强了消息发送失败的和消费失败情况下的处理逻辑等功能。
Binder
是提供与外部消息中间件集成的组件,为构造 Binding
提供了 2 个方法,分别是 bindConsumer
和 bindProducer
,它们分别用于构造生产者和消费者。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部已经实现了 RocketMQ Binder。
