spring boot RbbitMQ
RabbitMQ 介绍
RabbitMQ 是一个由 Erlang 开发的 AMQP(Advanced Message Queuing Protocol)开源实现。很多人可能并不知道什么是 AMQP。AMQP 是一个提供统一服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息中间件设计。基于此协议的客户端与消息中间件可以传递消息,并不受客户端/中间件不同产品、不同开发语言等条件的限制。RabbitMQ 是由 RabbitMQ Technologies Ltd 开发并且提供商业支持的。该公司在 2010 年 4 月被SpringSource(VMWare 的一个部门)收购。在2013年5月被并入 Pivotal。其实 VMWare、Pivotal 和 EMC 本质上是一家的。不同的是,VMWare 是独立上市子公司,而 Pivotal 整合了 EMC 的某些资源,现在并没有上市。
RabbitMQ 支持多种客户端,如 Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 Ajax。用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都有不错的表现。并且,正如RabbitMQ官网(官网地址:http://www.rabbitmq.com/)介绍的,RabbitMQ 在全球范围内在小型初创公司和大型企业中进行了超过 35 000 次 RabbitMQ 生产部署,是最受欢迎的开源消息代理。RabbitMQ 很轻量级,易于在内部和云中部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的要求。
RabbitMQ有 四个重要概念,分别是:虚拟主机(vritual host),交换机(exchange),队列(queue),和绑定(binding)。
- Broker:虚拟主机,一个broker里可以开设多个 vhost,用作不同用户的权限分离。
- Virtual Host :为什么需要多个虚拟主机呢?很简单,RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止 A 组访问 B 组的交换机/队列/绑定,必须为 A 和 B 分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。
- Exchange:Exchange 负责将它路由到一个或多个队列中,这个过程会根据 Exchange 的类型、Exchange 和队列之间的 binding 以及消息的 routing key 进行路由。
- Binding:交换机需要和绑定队列
- Quene:队列
- Connection:连接,建立一个tcp连接,使用多路复用方式提升性能,节约资源
- Channel:信道,一个连接可以多路复用打开多个信道,获取队列中的消息
不同类型的 Exchange,包括以下内容。
- Default:这是代理创建的特殊 Exchange。它会将消息路由至名字与消息 routing key 相同的队列。所有的队列都会自动绑定至 Default Exchange。
- Direct:如果消息的 routing key 与队列的 binding key 相同,那么消息将会路由到该队列上。
- Topic:如果消息的 routing key 与队列 binding key(可能会包含通配符)匹配,那么消息将会路由到一个或多个这样的队列上。
- Fanout:不管 routing key 和 binding key 是什么,消息都将会路由到所有绑定队列上。
- Headers:与 Topic Exchange 类似,只不过要基于消息的头信息进行路由,而不是 routing key。
- Dead letter:捕获所有无法投递(也就是它们无法匹配所有已定义的 Exchange 和队列的 binding 关系)的消息。
RabbitMQ 特点
可靠性:RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认 消息集群:多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 高可用:队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等 管理界面:RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面
Spring Boot使用RabbitMQ
创建应用
创建一个命名为: spring-boot-rabbitmq-example
的 Spring boot应用。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
配置
在 application.yml
中配置
server:
port: 8080
spring:
application:
name: rabbitmq-example
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
配置消息转换器
@Configuration
public class RabbitConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
默认情况下,消息转换是通过SimpleMessageConverter来实现的,它能够将简单类型(如String)和Serializable对象转换成Message对象。但是,Spring为RabbitTemplate提供了多个消息转换器,包括下面内容。
- Jackson2JsonMessageConverter:使用Jackson 2 JSON实现对象和JSON的相互转换。
- MarshallingMessageConverter:使用Spring的Marshaller和Unmarshaller进行转换。
- SerializerMessageConverter:使用Spring的Serializer和Deserializer转换String和任意种类的原生对象。
- SimpleMessageConverter:转换String、字节数组和Serializable类型。
- ContentTypeDelegatingMessageConverter:基于contentType头信息,将转换功能委托给另外一个MessageConverter
- MessagingMessageConverter:将消息转换功能委托给另外一个MessageConverter,并将头信息的转换委托给AmqpHeaderConverter。
测试实体类
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Book implements Serializable {
private static final long serialVersionUID = -2647663884508887444L;
private int id;
private String name;
private String author;
}
简单消息接收器
@Component
@Slf4j
public class DirectReceiver {
/**
* 简单直连消息接收
* @param message
*/
@RabbitListener(queuesToDeclare=@Queue("direct.queue"))
@RabbitHandler
public void message(Message message) {
log.info("message result = {}",message);
}
}
启动服务
启动服务
运行测试类
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootRabbitmqExampleApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void contextLoads() {
Book book = new Book(1,"spring cloud","mtcarpenter");
amqpTemplate.convertAndSend("direct.queue",book);
}
}
运行结果
2020-05-25 15:00:09.444 INFO 1456 --- [ntContainer#0-1] c.m.r.example.direct.DirectReceiver : message result = (Body:'{"id":1,"name":"spring cloud","author":"mtcarpenter"}' MessageProperties [headers={__TypeId__=com.mtcarpenter.rabbitmq.example.Book}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=direct.queue, deliveryTag=1, consumerTag=amq.ctag-5LNFFWiSMM7C338m1SSRvg, consumerQueue=direct.queue])
在主应用控制类控制台会接收如下消息。
Topic 转发模式消息发送
Topic转发模式是通过设置主题的方式来进行消息发送和接收的,这里需要使用到Route-key,创建一个TopicConfig类配置主题和交换机
/**
* 消息接收者 - consumer
*
* @RabbitListener - 可以注解类和方法。
* 注解类,当表当前类的对象是一个rabbit listener。
* 监听逻辑明确,可以由更好的方法定义规范。
* 必须配合@RabbitHandler才能实现rabbit消息消费能力,一个类可以有多个方法,但是仅有一个方法注解@RabbitHandler。
* 注解方法,代表当前方法是一个rabbit listener处理逻辑。
* 方便开发,一个类中可以定义若干个listener逻辑。
* 方法定义规范可能不合理。如:一个方法的处理逻辑太多,造成方法的bad smell。
*
* @RabbitListener - 代表当前类型是一个rabbitmq的监听器。
* bindings:绑定队列
* @QueueBinding - @RabbitListener.bindings属性的类型。绑定一个队列。
* value:绑定队列, Queue类型。
* exchange:配置交换器, Exchange类型。
* key:路由键,字符串类型。
*
* @Queue - 队列。
* value:队列名称
* autoDelete:是否是一个临时队列。
* true :当所有的consumer关闭后,自动删除queue。
* false:当任意一个consumer启动并创建queue后,如果queue中有消息未消费,无论是否有consumer继续执行,都保存queue。
*
* @Exchange - 交换器
* value:为交换器起个名称
* type:指定具体的交换器类型
*/
@Component
@Slf4j
public class TopicReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "magazineOrder",durable = "true"),
exchange = @Exchange(value = "order",type = "topic"),
key = "magazine"
))
@RabbitHandler
public void magazineMessage(Message message) {
log.info("magazine result = {}",message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "historyOrder",durable = "true"),
exchange = @Exchange(value = "order",type = "topic"),
key = "history"
))
@RabbitHandler
public void historyMessage(Message message) {
log.info("history result = {}",message);
}
}
@RabbitListener
: 可以注解类和方法。- 注解类:当表当前类的对象是一个rabbit listener。监听逻辑明确,可以由更好的方法定义规范。必须配合@RabbitHandler才能实现rabbit消息消费能力,一个类可以有多个方法,但是仅有一个方法注解@RabbitHandler。
- 注解方法: 代表当前方法是一个rabbit listener处理逻辑。方便开发,一个类中可以定义若干个listener逻辑。方法定义规范可能不合理。如:一个方法的处理逻辑太多,造成方法的bad smell。
@RabbitListener
: 代表当前类型是一个rabbitmq的监听器。- bindings:绑定队列
- @QueueBinding - @RabbitListener.bindings属性的类型。绑定一个队列。
- value:绑定队列, Queue类型。
- exchange:配置交换器, Exchange类型。
- key:路由键,字符串类型。
- @Queue - 队列。
- value:队列名称
- autoDelete:是否是一个临时队列。
- true :当所有的consumer关闭后,自动删除queue。
- false:当任意一个consumer启动并创建queue后,如果queue中有消息未消费,无论是否有consumer继续执行,都保存queue。
- @Exchange - 交换器
- value:为交换器起个名称
- type:指定具体的交换器类型
测试类
@Test
public void topicReceiver(){
Book magazine = new Book(1,"杂志图书","mtcarpenter1");
amqpTemplate.convertAndSend("order","magazine",magazine);
Book history = new Book(2,"历史图书","mtcarpenter2");
amqpTemplate.convertAndSend("order","history",history);
}
运行结果
2020-05-26 09:14:03.488 INFO 12844 --- [ntContainer#1-1] c.m.r.example.topic.TopicReceiver : magazine result = (Body:'{"id":1,"name":"杂志图书","author":"mtcarpenter1"}' MessageProperties [headers={__TypeId__=com.mtcarpenter.rabbitmq.example.Book}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order, receivedRoutingKey=magazine, deliveryTag=1, consumerTag=amq.ctag-Y3pCplXesU51cHOjjv8egg, consumerQueue=magazineOrder])
2020-05-26 09:14:03.488 INFO 12844 --- [ntContainer#2-1] c.m.r.example.topic.TopicReceiver : history result = (Body:'{"id":2,"name":"历史图书","author":"mtcarpenter2"}' MessageProperties [headers={__TypeId__=com.mtcarpenter.rabbitmq.example.Book}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order, receivedRoutingKey=history, deliveryTag=1, consumerTag=amq.ctag-686MnOKPr86BcnDvzh6kUQ, consumerQueue=historyOrder])
消息确认和回退
配置类
application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 连接超时
connection-timeout: 1500
# 开启发送确认
publisher-confirms: true
# 开启发送失败退回
publisher-returns: true
template:
mandatory: true
# 开启ACK
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
max-concurrency: 10
concurrency: 5
监听器手工 ack
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "encyclopediaOrder",durable = "true"),
exchange = @Exchange(value = "order",type = "topic",
ignoreDeclarationExceptions = "true"),
key = "encyclopedia"
))
@RabbitHandler
public void onOrderMessage(@Payload Book book,
Channel channel,
@Headers Map<String, Object> headers) throws Exception {
log.info("book id = {}" + book.getId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
发送器
@Component
@Slf4j
public class TopicSender implements RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息方法调用: 构建Message消息
* @param book
* @throws Exception
*/
public void send(Book book) throws Exception {
//设置消息退回后的回调处理机制
this.rabbitTemplate.setReturnCallback(this);
// CorrelationData是一个当发送原始消息时,由客户机提供的对象。
// ack是一个boolean值,当ack(确认)的时候,值为true;当nack(不确认)的时候,值为false
// 经测试发现,只要 rabbitTemplate.convertAndSend() 能正确找到exchange,无论是否能将消息路由到正确的queue,ack值都为true
// 只有当rabbitTemplate.convertAndSend()无法找到exchange时,ack 值才为false
// cause是附加原因,比如说当nack的时候附加的原因
this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.error("消息发送失败 cause = {} ,correlationData={} ", cause, correlationData.toString());
} else {
log.info("消息发送成功 ");
}
});
this.rabbitTemplate.convertAndSend("order", "encyclopedia", book);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("return message={} , exchange = {} , routingKey = {},replyCode={}, replyText{}",
message, exchange, routingKey, replyCode, replyText);
}
}
测试类
@Test
public void topicSender() throws Exception {
topicSender.send(new Book(3,"百科图书","mtcarpenter"));
}
文章参考
- Spring Boot 2实战之旅
-
Spring实战(第5版)
- https://www.cnblogs.com/jing99/p/11679426.html
代码示例
本文示例代码访问下面查看仓库:
- Github: https://github.com/mtcarpenter/spring-cloud-learning
- Gitee: https://gitee.com/mtcarpenter/spring-cloud-learning
其中,本文示例代码名称:
spring-boot-rabbitmq-example
: spring boot rabbitMQ
微信扫描二维码,关注一个有故事的程序员
(转载本站文章请注明作者和出处 山间木匠-mtcarpenter)