RabbitMQ高级特性及Spring集成

消息可靠性投递

持久化:Exchange持久化Queue持久化Message持久化生产方确认Confirm消费方确认AckBroker高可用

生成端确认

使用RabbitMQ时,作为消息发送方希望杜绝任何消息丢失投递失败场景。RabbitMQ提供了Confirm确认模式Return退回模式两种方式用来控制消息的投递可靠性。

RabbitMQ整个消息投递的路径为:Producer到Rabbitmq Broker到Exchange到Queue到Consumer;消息从ProducerExchange则会返回一个ConfirmCallback消息从ExchangeQueue投递失败则会返回一个ReturnCallback。可利用这两个Callback控制消息的可靠性投递;

设置ConnectionFactorypublisher-confirms="true"开启确认模式。使用RabbitTemplatesetConfirmCallback设置回调函数。当消息发送到Exchange后回调confirm方法。在方法中判断ack,若为true则发送成功,若为false则发送失败需要处理。

设置ConnectionFactorypublisher-returns="true"开启退回模式。使用RabbitTemplatesetReturnCallback设置退回函数,当消息从Exchange路由到Queue失败后,若设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给Producer并执行回调函数returnedMessage

消费端确认

ack指Acknowledge确认,表示消费端收到消息后的确认方式,有三种确认方式:

  • 自动确认acknowledge="none"
  • 手动确认acknowledge="manual"
  • 根据异常情况确认acknowledge="auto"

自动确认是指当消息一旦被Consumer接收到,则自动确认收到,并将相应message从RabbitMQ消息缓存中移除。但在实际业务处理中,很可能消息接收到,业务处理出现异常,则该消息会丢失。若设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck()手动签收,若出现异常则调用channel.basicNack()方法,让其自动重新发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--消息可靠性投递(生产端)-->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"/>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"/>
</rabbit:bindings>
</rabbit:direct-exchange>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testConfirm() { //测试Confirm模式
//定义回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了...." + correlationData.getId());
// ack为true表示消息已经到达交换机
if (ack) { // 接收成功
System.out.println("接收成功消息" + cause);
} else { // 接收失败
System.out.println("接收失败消息" + cause);
// 做一些处理,让消息再次发送。
}
}
});
// 进行消息发送
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message Confirm...");
}
}
@Test
public void testReturn() { // 测试return模式
// 设置交换机处理失败消息的模式,为true时消息到达不了队列时,会将消息重新返回给生产者
rabbitTemplate.setMandatory(true);
// 定义回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println("message:" + message);
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
}
});
// 进行消息发送
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message return...");
}
}
1
2
3
4
5
<!--定义监听器容器 acknowledge="manual":手动签收 prefetch="1":每次抓取多少条消息 -->
<!--定义监听器容器 acknowledge="manual" prefetch="1" -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener></rabbit:listener>
</rabbit:listener-container>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//1、获取消息的id
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//2、获取消息
System.out.println("message:" + new String(message.getBody()));
//3、进行业务处理
System.out.println("=====进行业务处理====");
//模拟出现异常
int i = 5/0;
//4、进行消息签收
channel.basicAck(deliveryTag, false);
System.out.println("收到了消息:" + deliveryTag);
} catch (Exception e) {
//拒绝签收,第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
channel.basicNack(deliveryTag, false, true);
}
}
}

消费端限流

<rabbit:listener-container>中配置prefetch属性设置消费端一次拉取多少消息,消费端的确认模式一定为手动确认acknowledge="manual"

TTL

当消息达到存活时间后,还未被消费会被自动清除,RabbitMQ可对消息设置过期时间,也可对整个队列设置过期时间

设置队列过期时间使用参数x-message-ttl单位ms毫秒,会对整个队列消息统一过期。设置消息过期时间使用参数expiration单位ms毫秒,当该消息在队列头部时,会单独判断这一消息是否过期。若两者都进行了设置以时间短的为准

1
2
3
4
5
6
7
8
9
10
11
12
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!--设置queue的参数-->
<rabbit:queue-arguments>
<!--x-message-ttl指队列的过期时间-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl">
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"/>
</rabbit:bindings>
</rabbit:topic-exchange>

死信队列

死信队列DXLDead Letter Exchange死信交换机,当消息成为Dead Message后,可被重新发送到另一个交换机,这个交换机就是DLX。消息成为死信的三种情况:

  • 队列消息长度到达限制
  • 消费者拒接消费消息basicNack/basicReject不把消息重新放入原目标队列requeue=false
  • 原队列存在消息过期设置,消息到达超时时间未被消费

死信交换机死信队列和普通的没有区别,当消息成为死信后,若该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。可通过给队列设置x-dead-letter-exchangex-dead-letter-routing-key参数来绑定死信交换机;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<!--
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
3. 正常队列绑定死信交换机
设置两个参数:
* x-dead-letter-exchange:死信交换机名称
* x-dead-letter-routing-key:发送给死信交换机的routingkey
-->
<!-- 1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx) -->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!--3. 正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!--3.1 x-dead-letter-exchange:死信交换机名称-->
<entry key="x-dead-letter-exchange" value="exchange_dlx"/>
<!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.hehe"/>
<!--4.1 设置队列的过期时间 ttl-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
<!--4.2 设置队列的长度限制 max-length-->
<entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx) -->
<rabbit:queue name="queue_dlx" id="queue_dlx"/>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>
1
2
3
4
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--定义监听器,监听正常队列-->
<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener></rabbit:listener>
</rabbit:listener-container>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class DlxListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
//int i = 3/0;//出现错误
//3. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("出现异常,拒绝接受");
//4.拒绝签收,不重回队列 requeue=false
channel.basicNack(deliveryTag, true, false);
}
}
}

延迟队列

延迟队列即消息进入队列后不会立即被消费,只有到达指定时间后才会被消费,在RabbitMQ中并未提供延迟队列功能。但是可使用TTL+死信队列组合实现延迟队列的效果;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<!--
延迟队列:
1. 定义正常交换机(order_exchange)和队列(order_queue)
2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
3. 绑定,设置正常队列过期时间为30分钟
-->
<!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
<rabbit:queue id="order_queue" name="order_queue">
<!--3. 绑定,设置正常队列过期时间为30分钟-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel"/>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"/>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>
1
2
3
4
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--延迟队列效果实现:一定要监听的是死信队列!!!-->
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
</rabbit:listener-container>

消息幂等性保障

通过版本号实现乐观锁的方式优化;

消息积压

消费者宕机积压、消费者消费能力不足积压、生产者者流量太大等都可能导致消息积压;

可通过上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来记录数据库再慢慢处理;

Spring集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/>
<!-- 定义管理交换机、队列 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机,默认交换机类型为direct,名字为:"",路由键为队列的名称 -->
<!-- id:bean的名称,name:queue的名称,auto-declare:自动创建,durable:是否持久化,auto-delete:自动删除。最后一个消费者和该队列断开连接后,自动删除队列 -->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true" durable="false"/>
<!-- 广播;所有队列都能收到消息 -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

<!--定义广播类型交换机;并绑定上述两个队列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"/>
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- 路由;所有队列都能收到消息 -->
<rabbit:queue id="spring_direct_queue" name="spring_direct_queue" auto-declare="true"/>
<rabbit:direct-exchange id="spring_direct_exchange" name="spring_direct_exchange">
<rabbit:bindings>
<rabbit:binding queue="spring_direct_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 通配符;*匹配一个单词,#匹配多个单词 -->
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<!-- 声明 topic 类型的交换机 -->
<rabbit:topic-exchange name="spring_topic_exchange" id="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="eleven.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="eleven.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="itcast.*" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer-basic.xml")
public class ProducerBasicTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testHelloWorld() {
rabbitTemplate.convertAndSend("spring_queue", "hello world spring...");
}
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "spring fanout...");
}
@Test
public void testDirect() {
rabbitTemplate.convertAndSend("spring_direct_exchange", "info", "spring_direct...");
}
@Test
public void testTopic() {
rabbitTemplate.convertAndSend("spring_topic_exchange", "eleven.hehe.haha", "spring topic...");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/>

<bean id="springQueueListener" class="com.eleven.icode.rabbitmq.SpringQueueListener"/>
<bean id="fanoutListener" class="com.eleven.icode.rabbitmq.SpringQueueListener"/>
<bean id="fanoutListener2" class="com.eleven.icode.rabbitmq.SpringQueueListener"/>
<bean id="topicListenerStar" class="com.eleven.icode.rabbitmq.SpringQueueListener"/>
<bean id="topicListenerWell" class="com.eleven.icode.rabbitmq.SpringQueueListener"/>
<bean id="topicListenerWell2" class="com.eleven.icode.rabbitmq.SpringQueueListener"/>

<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<rabbit:listener ref="fanoutListener" queue-names="spring_fanout_queue_1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
<rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
<rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
<rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
</rabbit:listener-container>
</beans>
1
2
3
4
5
6
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println(new String(message.getBody()));
}
}

SpringBoot集成

1
2
3
4
5
6
7
spring:
rabbitmq:
host: localhost #主机ip
port: 5672 #端口
username: eleven
password: eleven
virtual-host: eleven
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Configuration
public class TopicConfig {
@Bean
public Queue topicQ1() { // 声明队列
return new Queue("topic_sb_mq_q1");
}
@Bean
public Queue topicQ2() { // 声明队列
return new Queue("topic_sb_mq_q2");
}
@Bean
public TopicExchange setTopicExchange() { // 声明exchange
return new TopicExchange("topicExchange");
}
@Bean
public Binding bindTopicHebei1() { // 声明binding,需要声明一个roytingKey
return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("changsha.*");
}
@Bean
public Binding bindTopicHebei2() {
return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.beijing");
}
}

@RabbitListener(queues = "topic_sb_mq_q2")
public void topicReceiveq2(String message) {
System.out.println("Topic模式 topic_sb_mq_q2 received message : " + message);
}