文章
问答
冒泡
spring cloud stream实现mq消息的传递

前言
在项目中,经常会使用到mq实现异步或者服务之间的通信。spring cloud stream 提供了多个mq的集成,不需要去单独的去调用rabbit或者kakfa的client发送消息。随着项目的变化,或者技术的演进,我们可能会遇到切换消息队列的情况。spring原来的amqp是各个mq实现了接口,然后通过自己的client去发送消息。spring cloud 提供了spring cloud stream组件,对各种消息中间件进行了一个聚合。也就是说,同样的接口,我们可以同时发送rabbitmq或者kafka的消息。

整体架构如下


环境

  •     spring  boot :2.5.3
  •     spring cloud: 2020.0.4


添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>


这里我们以rabbitmq为例,设置exchange为message-center,queue为test

spring.rabbitmq.addresses=xxxx
spring.rabbitmq.port=xxxx
spring.rabbitmq.username=xxxx
spring.rabbitmq.password=xxxx

spring.cloud.stream.bindings.message-center-in-0.group=test
spring.cloud.stream.bindings.message-center-in-0.destination=message-center
spring.cloud.stream.bindings.message-center-out-0.group=test
spring.cloud.stream.bindings.message-center-out-0.destination=message-center
spring.cloud.function.definition=message-center


另外还可以通过rabbit的其他配置,配置更多的个性化配置;例如

spring.cloud.stream.rabbit.bindings.message-center-out-0.producer.binding-routing-key=stream-routing-key
spring.cloud.stream.rabbit.bindings.message-center-out-0.producer.routing-key-expression='stream-routing-key'


这里的message-center-in-0 可以分解为 message-center:functionName,  in:input or output ,0:index
input - <functionName> + -in- + <index>
output - <functionName> + -out- + <index>

spring cloud stream 是基于@FunctionalInterface的,官方文档可以看到有
Consumer 消费者
Supplier 生产者
Function 既是生产者又是消费者
一般情况下 我们使用Consumer比较多,这里我们以一个Consumer为例

定义一个Consumer接收消息

@Configuration
public class StreamConfiguration {

    @Bean //如果出现消费者找不到的情况,可以考虑指定Bean的value值
    public Consumer<String> messageCenter(){
        return s->{
            System.out.println(s);
        };
    }
}


发送方

@RestController
@RequestMapping
public class DemoController {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("send")
    public void send(){
        streamBridge.send("message-center-out-0","12345");
    }

}


在rabbitmq的管理界面可以看到exchange下有了message-center,queue下有了message-center.test。并且消费者接收到消息。

添加ack机制

spring.cloud.stream.bindings.message-center-in-0.group=test
spring.cloud.stream.bindings.message-center-in-0.destination=message-center
spring.cloud.stream.bindings.message-center-in-0.consumer.max-attempts=3 //最大重试次数
spring.cloud.stream.bindings.message-center-out-0.group=test
spring.cloud.stream.bindings.message-center-out-0.destination=message-center
spring.cloud.stream.rabbit.bindings.message-center-in-0.consumer.acknowledge-mode=MANUAL
spring.cloud.stream.rabbit.bindings.message-center-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.message-center-in-0.consumer.republish-to-dlq=true
spring.cloud.function.definition=message-center

 

@Bean
public Consumer<Message<String>> messageCenter(){
    return s->{
      Channel channel =  s.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
      Long deliverTag =  s.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
        System.out.println(s.getPayload());
        try {
            channel.basicAck(deliverTag,false);
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException("ss");
        }
    };
}

关于作者

落雁沙
非典型码农
获得点赞
文章被阅读