前言
在项目中,经常会使用到mq实现异步或者服务之间的通信。spring cloud stream 提供了多个mq的集成,不需要去单独的去调用rabbit或者kakfa的client发送消息。随着项目的变化,或者技术的演进,我们可能会遇到切换消息队列的情况。spring原来的amqp是各个mq实现了接口,然后通过自己的client去发送消息。spring cloud 提供了spring cloud stream组件,对各种消息中间件进行了一个聚合。也就是说,同样的接口,我们可以同时发送rabbitmq或者kafka的消息。
整体架构如下
环境
- spring boot :2.5.3
- spring cloud: 2020.0.4
添加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
这里我们以rabbitmq为例,设置exchange为message-center,queue为test
spring.rabbitmq.addresses=xxxx
spring.rabbitmq.port=xxxx
spring.rabbitmq.username=xxxx
spring.rabbitmq.password=xxxx
spring.cloud.stream.bindings.message-center-in-0.group=test
spring.cloud.stream.bindings.message-center-in-0.destination=message-center
spring.cloud.stream.bindings.message-center-out-0.group=test
spring.cloud.stream.bindings.message-center-out-0.destination=message-center
spring.cloud.function.definition=message-center
另外还可以通过rabbit的其他配置,配置更多的个性化配置;例如
spring.cloud.stream.rabbit.bindings.message-center-out-0.producer.binding-routing-key=stream-routing-key
spring.cloud.stream.rabbit.bindings.message-center-out-0.producer.routing-key-expression='stream-routing-key'
这里的message-center-in-0 可以分解为 message-center:functionName, in:input or output ,0:index
input - <functionName> + -in- + <index>
output - <functionName> + -out- + <index>
spring cloud stream 是基于@FunctionalInterface的,官方文档可以看到有
Consumer 消费者
Supplier 生产者
Function 既是生产者又是消费者
一般情况下 我们使用Consumer比较多,这里我们以一个Consumer为例
定义一个Consumer接收消息
@Configuration
public class StreamConfiguration {
@Bean //如果出现消费者找不到的情况,可以考虑指定Bean的value值
public Consumer<String> messageCenter(){
return s->{
System.out.println(s);
};
}
}
发送方
@RestController
@RequestMapping
public class DemoController {
@Autowired
private StreamBridge streamBridge;
@GetMapping("send")
public void send(){
streamBridge.send("message-center-out-0","12345");
}
}
在rabbitmq的管理界面可以看到exchange下有了message-center,queue下有了message-center.test。并且消费者接收到消息。
添加ack机制
spring.cloud.stream.bindings.message-center-in-0.group=test
spring.cloud.stream.bindings.message-center-in-0.destination=message-center
spring.cloud.stream.bindings.message-center-in-0.consumer.max-attempts=3 //最大重试次数
spring.cloud.stream.bindings.message-center-out-0.group=test
spring.cloud.stream.bindings.message-center-out-0.destination=message-center
spring.cloud.stream.rabbit.bindings.message-center-in-0.consumer.acknowledge-mode=MANUAL
spring.cloud.stream.rabbit.bindings.message-center-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.message-center-in-0.consumer.republish-to-dlq=true
spring.cloud.function.definition=message-center
@Bean
public Consumer<Message<String>> messageCenter(){
return s->{
Channel channel = s.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
Long deliverTag = s.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
System.out.println(s.getPayload());
try {
channel.basicAck(deliverTag,false);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("ss");
}
};
}