RabbitMQ 使用记录

RabbitMQ 使用记录

项目里有订单超时取消的需求,用定时任务轮询太暴力,就引入了 RabbitMQ 的延迟消息来做。记录安装和接入过程,重点是延迟队列(死信队列)这块的配置。

RabbitMQ 是一个轻量级且易于部署的开源消息队列,支持多种消息协议,可部署在分布式和联合配置中,满足高规模、高可用性需求。

核心概念:

  • Exchange(交换器)direct(单播)、fanout(广播)、topic(组播,# 匹配多单词,* 匹配一单词)
  • Queue(队列):消息暂存的地方,消费者从这里取消息

安装

Linux(Docker)

docker run -d --name rabbitmq \
--publish 5671:5671 \
--publish 5672:5672 \
--publish 4369:4369 \
--publish 25672:25672 \
--publish 15671:15671 \
--publish 15672:15672 \
rabbitmq:3.7.15-management

开启管理界面插件(如果用的不是 management 版本):

docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management

访问管理控制台:http://服务器IP:15672,默认账密 guest/guest


Spring Boot 接入

1. 添加依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置连接

spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /test
username: guest
password: guest
publisher-confirms: true # 异步消息需要回调时必须设置为 true

3. 配置类

配置序列化方式、延迟队列(TTL 死信队列)和实际消费队列:

@Configuration
@EnableRabbit
public class RabbitMQConfig {

/** 消息序列化:使用 Jackson JSON 格式(默认是 JDK 序列化,可读性差) */
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

/** 实际消费队列绑定的交换机 */
@Bean
DirectExchange orderDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_NEWS.getExchange())
.durable(true).build();
}

/** 延迟队列绑定的交换机 */
@Bean
DirectExchange orderTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_NEWS.getExchange())
.durable(true).build();
}

/** 实际消费队列 */
@Bean
public Queue orderQueue() {
return new Queue(QueueEnum.QUEUE_NEWS.getName());
}

/** 延迟队列(死信队列),消息过期后转发到实际消费队列 */
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QueueEnum.QUEUE_TTL_NEWS.getName())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_NEWS.getExchange())
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_NEWS.getRouteKey())
.build();
}

@Bean
Binding orderBinding(DirectExchange orderDirect, Queue orderQueue) {
return BindingBuilder.bind(orderQueue).to(orderDirect)
.with(QueueEnum.QUEUE_NEWS.getRouteKey());
}

@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue) {
return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect)
.with(QueueEnum.QUEUE_TTL_NEWS.getRouteKey());
}
}

4. 发送延迟消息

@Component
public class RabbitMqSender {
@Autowired
private AmqpTemplate amqpTemplate;

/**
* 发送延迟消息
* @param id 业务 ID
* @param delayTimes 延迟时间(毫秒)
*/
public void sendMessage(Long id, final long delayTimes) {
amqpTemplate.convertAndSend(
QueueEnum.QUEUE_TTL_NEWS.getExchange(),
QueueEnum.QUEUE_TTL_NEWS.getRouteKey(),
id,
message -> {
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
}
);
}
}

5. 消费消息

@Component
@RabbitListener(queues = "order.news")
public class RabbitMqReceiver {

@RabbitHandler
public void handle(Long orderId) {
// 到这里说明延迟时间到了,处理超时取消逻辑
log.info("收到超时订单,开始处理取消:{}", orderId);
}
}

顺序消息

RabbitMQ 本身不保证全局消息顺序,但可以通过架构设计实现局部有序

核心思路

同一业务的消息必须满足两个条件才能保证有序:

  1. 路由到同一个队列 — 用相同的 routing key,保证同一类消息进同一个队列
  2. 单消费者线程处理 — 设置 prefetch = 1,同一时刻只有一条消息在处理
// application.yml 配置:限制每个消费者每次只预取 1 条消息
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只取一条,处理完再取下一条
concurrency: 1 # 单线程消费(默认是 1,显式配置可读性更强)
acknowledge-mode: manual # 手动 ACK,处理成功后再确认

发送时保证路由一致

@Component
public class OrderEventSender {
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 订单状态流转事件(创建 → 支付 → 发货)必须按序
* 用 orderId 作为 routing key,保证同一订单的消息进同一个队列
*/
public void sendOrderEvent(Long orderId, String eventType) {
OrderEvent event = new OrderEvent(orderId, eventType, LocalDateTime.now());
// routing key 固定为 orderId,保证同一订单走同一队列
rabbitTemplate.convertAndSend("order.events", String.valueOf(orderId), event);
}
}

消费时手动 ACK 保证不丢序

@Component
public class OrderEventReceiver {

@RabbitListener(queues = "order.events.queue")
public void handle(OrderEvent event, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
log.info("处理订单事件:{} - {}", event.getOrderId(), event.getEventType());
orderStateMachine.process(event);
// 处理成功,ACK 确认
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("处理订单事件失败,NACK 重新入队:{}", event, e);
// 处理失败,NACK 且不重新入队(避免无限循环),等待人工介入
channel.basicNack(tag, false, false);
}
}
}

注意:顺序消息的代价是吞吐量下降,单线程 + prefetch=1 意味着每条消息必须处理完才能取下一条。如果对顺序要求不严格,不建议强制开启。


消息补偿机制

消息补偿解决的核心问题是:消费者处理失败后怎么办。RabbitMQ 提供了从自动重试到死信兜底的完整链路。

1. 手动 ACK + NACK

关闭自动 ACK,由业务代码控制确认,是补偿机制的基础:

spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动 ACK
// 处理成功
channel.basicAck(deliveryTag, false);

// 处理失败,requeue=true 重新入队(会立即重试,小心死循环)
channel.basicNack(deliveryTag, false, true);

// 处理失败,requeue=false 拒绝且不重新入队(触发死信路由)
channel.basicNack(deliveryTag, false, false);

2. Spring Retry 自动重试

用 Spring AMQP 的重试拦截器,自动按退避策略重试,耗尽后走 MessageRecoverer:

@Configuration
public class RabbitRetryConfig {

@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(3) // 最多重试 3 次
.backOffOptions(1000, 2.0, 10000) // 初始1s,指数退避,最大10s
.recoverer(new RejectAndDontRequeueRecoverer()) // 耗尽后拒绝不重入队,走死信
.build();
}
}

// 把重试拦截器配置到 Listener 容器上
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setAdviceChain(retryInterceptor);
return factory;
}

3. 死信队列兜底 + 告警

重试耗尽后的消息进死信队列,监控告警并人工介入:

@Configuration
public class DeadLetterConfig {

public static final String DEAD_LETTER_EXCHANGE = "dlx.exchange";
public static final String DEAD_LETTER_QUEUE = "dlx.queue";

@Bean
DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}

@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}

@Bean
Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange()).with("dlx.routing.key");
}
}

// 死信消费者:记录日志 + 发告警
@Component
@RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE)
public class DeadLetterConsumer {

@RabbitHandler
public void handle(Message message) {
String body = new String(message.getBody());
log.error("消息进入死信队列,需人工处理:{}", body);
// 发钉钉 / 企微告警
alarmService.send("MQ死信告警", body);
// 持久化到数据库,方便后续补偿
deadMsgService.save(message);
}
}

4. 幂等性保障(消息 ID 去重)

重试机制会导致消息重复消费,消费者必须做幂等处理:

@Component
@RabbitListener(queues = "order.news")
public class IdempotentConsumer {

@Autowired
private StringRedisTemplate redisTemplate;

@RabbitHandler
public void handle(OrderMessage msg, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
String msgId = msg.getMsgId(); // 生产者在消息体中放唯一 ID
String redisKey = "mq:consumed:" + msgId;

// 用 Redis SETNX 做幂等判断
Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(redisKey, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(isFirst)) {
log.warn("重复消息,已跳过:{}", msgId);
channel.basicAck(tag, false); // 重复消息也要 ACK,避免无限重投
return;
}

try {
orderService.processPaySuccess(msg.getOrderId());
channel.basicAck(tag, false);
} catch (Exception e) {
// 处理失败,删除 Redis key,允许下次重试
redisTemplate.delete(redisKey);
channel.basicNack(tag, false, false);
}
}
}

补偿流程总结

消息发出

消费者处理
├── 成功 → basicAck,结束
└── 失败 → Spring Retry 自动重试(最多3次,指数退避)
├── 重试成功 → basicAck,结束
└── 重试耗尽 → basicNack(requeue=false)

进入死信队列(DLX)

死信消费者 → 记录DB + 发告警 + 等待人工补偿