六、SpringCloud消息驱动

CY 2019年03月20日 66次浏览

消息中间件的类型有特别多,如果一个个去学习,成本非常大,这样就需要一个类似于Hibernate操作数据库一样的工具来操作消息中间件了。

Spring Cloud Stream

这个工具就是用来无差别的操作消息中间件的工具,支持列表见官网

先了解如何去使用,至于原理方面必须要先了解各种消息中间件的基本工作流程才能深入。

消息生产者

引入依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

配置文件:

spring:
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                prot: 5672
                username: guest
                password: guest
      bindings:
        output:
          destination: rabbitExchange
          content-type: application/json
          binder: defaultRabbit

上面配置定义了一个binder叫做defaultRabbit,然后将其注入点bindings

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

@EnableBinding(Source.class)
@Slf4j
public class MessageProvider {

    /**
     * 这里的变量名必须写成output,因为Spring容器中有三个MessageChannel
     * output,nullChannel,errorChannel
     */
    @Resource
    private MessageChannel output;

    public String send() {
        String uuid = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload("消费者:" + uuid).build());
        log.info("消费者发送消息成功 ==> " + uuid);
        return uuid;
    }
}

使用@EnableBinding注解来声明该类Source,注意导包

注入一个MessageChannel类用来输出消息

发送的内容是一个org.springframework.messaging.Message对象

写一个Rest接口来调用send方法

@RestController
public class MessageController {

    @Resource
    private MessageProvider messageProvider;

    @RequestMapping("/send")
    public String send() {
        return messageProvider.send();
    }
}

消息消费者

引入一样的包:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

配置中,只需要把output改为input即可

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                prot: 5672
                username: guest
                password: guest
      bindings:
        input:
          destination: rabbitExchange
          content-type: application/json
          binder: defaultRabbit

然后写一个接收消息的类:

@EnableBinding(Sink.class)
@Slf4j
public class MessageController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void receive(Message<String> message) {
      log.info(serverPort + "收到消息:" + message.getPayload());
    }
}

与生产者不同的是@EnableBinding注解中的类使用的是Sink,然后在方法上面使用了@StreamListener注解

分组消费

之前的消费者会遇到两个问题

首先,消费者比较多的时候发现消费者会重复消费,也就是多个消费者消费的内容都是一样的,而不是所有消费者要抢消息。

其次,消费者如果全部没有运行的时候,就会造成发送的消息不见了。

为了解决上面的两个问题,就可以多一步配置,来做到分组消费:

spring:
  cloud:
    stream:
      bindings:
        input:
          group: rabbitConsumer

多个消费者是同一个组的时候就会均匀的收到消息了。