Spring Cloud Stream

Wu Jun 2019-12-25 15:59:03
Categories: > Tags:

Spring Cloud Stream整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。目前为止只支持RabbitMQ和Kafka的自动化配置。

快速入门

以RabbitMQ为例

  1. 引入依赖
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>

RabbitMQ

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>     
    </dependency>

Kafka

	<dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-starter-stream-kafka</artifactId>
	</dependency>
  1. 配置属性 配置目的地,类比于Kafka的Topic和RabbitMQ的队列的概念。配置格式 spring.cloud.stream.bindings.<channelName>.<key>=value
spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.output.destination=test
  1. 监听消息
    • @EnableBinding:指定一个或多个定义了@Input或@Output注解的接口,以绑定消息通道(Channel)。
      • 单个接口:@EnableBinding(Sink.class)
      • 多个接口:@EnableBinding(value = {Sink.class, Source.class})
      • Sink:Stream中默认实现的接口,通过@Input绑定了一个名为input的通道。
      • Source:默认实现的接口,绑定了output通道。
    • @StreamListener:注册方法为消息中间件的事件监听器,注解中属性值对应了监听的消息通道名。
@EnableBinding(Sink.class)
public class SinkReceiver {

    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        ……
    }
    
}
  1. 发送消息
    通过@Output(SinkSender.OUTPUT)定义了一个输出通道
public interface SinkSender {

    String OUTPUT = "input";

    @Output(SinkSender.OUTPUT)
    MessageChannel output();

}
    
@EnableBinding(SinkSender.class)
public class SinkApplicationTests {

    @Autowired
    private SinkSender sinkSender;

    public void sinkSenderTester() {
        sinkSender.output().send(MessageBuilder.withPayload("produce a message").build());
    }
}

应用模型

绑定器

绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
Stream为RabbitMQ和Kafka提供了默认的Binder实现,其他消息中间件可使用TestSupportBinder测试。
依赖的starter提供了默认的自动化配置,也可以修改配置文件

spring.cloud.stream.bindings.input.destination=raw-sensor-data

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=springcloud
spring.rabbitmq.password=123456

核心概念

发布-订阅

Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的Topic主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。
Topic对应了RabbitMQ的Exchange、Kafka的Topic。

消费组

微服务应用一般都会部署多个实例。在希望消息对某个具体微服务,只被消费一次时,Stream中提供了消费组的概念。
通过spring.cloud.stream.bindings.input.group属性指定一个组名,每个发送到消费组的数据,仅由消费组中的一个消费者处理。没有指定消费组的时候,各自都属于一个独立的匿名消费组。

消息分区

消息分区是将消息增加一个固有的特征ID。当生产者将消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理。
消费者配置参数

spring.cloud.stream.bindings.input.group=Service-A

spring.cloud.stream.bindings.input.destination=greetings
//开启消费者分区功能
spring.cloud.stream.bindings.input.consumer.partitioned=true
//指定了当前消费者的总实例数量
spring.cloud.stream.instanceCount=2
//设置当前实例的索引号
spring.cloud.stream.instanceIndex=0

生产者配置参数

spring.cloud.stream.bindings.output.destination=greetings
//指定了分区键的表达式规则
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload
//指定了消息分区的数量
spring.cloud.stream.bindings.output.producer.partitionCount=2

聚合

Spring Cloud Stream 支持聚合多个应用的功能。这个功能可以直接连接多个应用的输入,输出通道,避免通过代理(指Kafka,RabbitMQ这些Middleware)交换消息时带来的额外耗费。

//Source
@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication {
	
	@InboundChannelAdapter(value = Source.OUTPUT)
	public String timerMessageSource() {
		return new SimpleDateFormat().format(new Date());
	}
}
//Processor
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication {
	
	@Transformer(inputChannel=Sink.INPUT,outputChannel=Source.OUTPUT)
	public String loggerSink(String payload) {
		return payload.toUpperCase();
	}
}
//Sink
@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication {

	@StreamListener(Sink.INPUT)
	public void loggerSink(Object payload) {
		System.out.println("Received: " + payload);
	}
}

上面是三个应用,下面是将三个应用连接起来的代码。

@SpringBootApplication
public class App {

	public static void main(String[] args) {
		new AggregateApplicationBuilder().from(SourceApplication.class).args("--fixedDelay=5000")
				.via(ProcessorApplication.class).to(SinkApplication.class).args("--debug=true").run(args);
	}
}