消息中间件的类型有特别多,如果一个个去学习,成本非常大,这样就需要一个类似于Hibernate
操作数据库一样的工具来操作消息中间件了。
Spring Cloud Stream
这个工具就是用来无差别的操作消息中间件的工具,支持列表见官网。
先了解如何去使用,至于原理方面必须要先了解各种消息中间件的基本工作流程才能深入。
消息生产者
引入依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
配置文件:
spring:
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
prot: 5672
username: guest
password: guest
bindings:
output:
destination: rabbitExchange
content-type: application/json
binder: defaultRabbit
上面配置定义了一个binder
叫做defaultRabbit
,然后将其注入点bindings
中
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
@EnableBinding(Source.class)
@Slf4j
public class MessageProvider {
/**
* 这里的变量名必须写成output,因为Spring容器中有三个MessageChannel
* output,nullChannel,errorChannel
*/
@Resource
private MessageChannel output;
public String send() {
String uuid = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload("消费者:" + uuid).build());
log.info("消费者发送消息成功 ==> " + uuid);
return uuid;
}
}
使用@EnableBinding
注解来声明该类Source
,注意导包
注入一个MessageChannel
类用来输出消息
发送的内容是一个org.springframework.messaging.Message
对象
写一个Rest
接口来调用send
方法
@RestController
public class MessageController {
@Resource
private MessageProvider messageProvider;
@RequestMapping("/send")
public String send() {
return messageProvider.send();
}
}
消息消费者
引入一样的包:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
配置中,只需要把output
改为input
即可
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
prot: 5672
username: guest
password: guest
bindings:
input:
destination: rabbitExchange
content-type: application/json
binder: defaultRabbit
然后写一个接收消息的类:
@EnableBinding(Sink.class)
@Slf4j
public class MessageController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void receive(Message<String> message) {
log.info(serverPort + "收到消息:" + message.getPayload());
}
}
与生产者不同的是@EnableBinding
注解中的类使用的是Sink
,然后在方法上面使用了@StreamListener
注解
分组消费
之前的消费者会遇到两个问题
首先,消费者比较多的时候发现消费者会重复消费,也就是多个消费者消费的内容都是一样的,而不是所有消费者要抢消息。
其次,消费者如果全部没有运行的时候,就会造成发送的消息不见了。
为了解决上面的两个问题,就可以多一步配置,来做到分组消费:
spring:
cloud:
stream:
bindings:
input:
group: rabbitConsumer
多个消费者是同一个组的时候就会均匀的收到消息了。