RabbitMQ 使用记录
项目里有订单超时取消的需求,用定时任务轮询太暴力,就引入了 RabbitMQ 的延迟消息来做。记录安装和接入过程,重点是延迟队列(死信队列)这块的配置。
RabbitMQ 是一个轻量级且易于部署的开源消息队列,支持多种消息协议,可部署在分布式和联合配置中,满足高规模、高可用性需求。
核心概念:
- Exchange(交换器):
direct(单播)、fanout(广播)、topic(组播,# 匹配多单词,* 匹配一单词) - Queue(队列):消息暂存的地方,消费者从这里取消息
安装
Linux(Docker)
docker run -d --name rabbitmq \ --publish 5671:5671 \ --publish 5672:5672 \ --publish 4369:4369 \ --publish 25672:25672 \ --publish 15671:15671 \ --publish 15672:15672 \ rabbitmq:3.7.15-management
|
开启管理界面插件(如果用的不是 management 版本):
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
|
访问管理控制台:http://服务器IP:15672,默认账密 guest/guest
Spring Boot 接入
1. 添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
2. 配置连接
spring: rabbitmq: host: localhost port: 5672 virtual-host: /test username: guest password: guest publisher-confirms: true
|
3. 配置类
配置序列化方式、延迟队列(TTL 死信队列)和实际消费队列:
@Configuration @EnableRabbit public class RabbitMQConfig {
@Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
@Bean DirectExchange orderDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_NEWS.getExchange()) .durable(true).build(); }
@Bean DirectExchange orderTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_TTL_NEWS.getExchange()) .durable(true).build(); }
@Bean public Queue orderQueue() { return new Queue(QueueEnum.QUEUE_NEWS.getName()); }
@Bean public Queue orderTtlQueue() { return QueueBuilder .durable(QueueEnum.QUEUE_TTL_NEWS.getName()) .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_NEWS.getExchange()) .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_NEWS.getRouteKey()) .build(); }
@Bean Binding orderBinding(DirectExchange orderDirect, Queue orderQueue) { return BindingBuilder.bind(orderQueue).to(orderDirect) .with(QueueEnum.QUEUE_NEWS.getRouteKey()); }
@Bean Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue) { return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect) .with(QueueEnum.QUEUE_TTL_NEWS.getRouteKey()); } }
|
4. 发送延迟消息
@Component public class RabbitMqSender { @Autowired private AmqpTemplate amqpTemplate;
public void sendMessage(Long id, final long delayTimes) { amqpTemplate.convertAndSend( QueueEnum.QUEUE_TTL_NEWS.getExchange(), QueueEnum.QUEUE_TTL_NEWS.getRouteKey(), id, message -> { message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; } ); } }
|
5. 消费消息
@Component @RabbitListener(queues = "order.news") public class RabbitMqReceiver {
@RabbitHandler public void handle(Long orderId) { log.info("收到超时订单,开始处理取消:{}", orderId); } }
|
顺序消息
RabbitMQ 本身不保证全局消息顺序,但可以通过架构设计实现局部有序。
核心思路
同一业务的消息必须满足两个条件才能保证有序:
- 路由到同一个队列 — 用相同的 routing key,保证同一类消息进同一个队列
- 单消费者线程处理 — 设置
prefetch = 1,同一时刻只有一条消息在处理
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只取一条,处理完再取下一条 concurrency: 1 # 单线程消费(默认是 1,显式配置可读性更强) acknowledge-mode: manual # 手动 ACK,处理成功后再确认
|
发送时保证路由一致
@Component public class OrderEventSender { @Autowired private RabbitTemplate rabbitTemplate;
public void sendOrderEvent(Long orderId, String eventType) { OrderEvent event = new OrderEvent(orderId, eventType, LocalDateTime.now()); rabbitTemplate.convertAndSend("order.events", String.valueOf(orderId), event); } }
|
消费时手动 ACK 保证不丢序
@Component public class OrderEventReceiver {
@RabbitListener(queues = "order.events.queue") public void handle(OrderEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { try { log.info("处理订单事件:{} - {}", event.getOrderId(), event.getEventType()); orderStateMachine.process(event); channel.basicAck(tag, false); } catch (Exception e) { log.error("处理订单事件失败,NACK 重新入队:{}", event, e); channel.basicNack(tag, false, false); } } }
|
注意:顺序消息的代价是吞吐量下降,单线程 + prefetch=1 意味着每条消息必须处理完才能取下一条。如果对顺序要求不严格,不建议强制开启。
消息补偿机制
消息补偿解决的核心问题是:消费者处理失败后怎么办。RabbitMQ 提供了从自动重试到死信兜底的完整链路。
1. 手动 ACK + NACK
关闭自动 ACK,由业务代码控制确认,是补偿机制的基础:
spring: rabbitmq: listener: simple: acknowledge-mode: manual
|
channel.basicAck(deliveryTag, false);
channel.basicNack(deliveryTag, false, true);
channel.basicNack(deliveryTag, false, false);
|
2. Spring Retry 自动重试
用 Spring AMQP 的重试拦截器,自动按退避策略重试,耗尽后走 MessageRecoverer:
@Configuration public class RabbitRetryConfig {
@Bean public RetryOperationsInterceptor retryInterceptor() { return RetryInterceptorBuilder.stateless() .maxAttempts(3) .backOffOptions(1000, 2.0, 10000) .recoverer(new RejectAndDontRequeueRecoverer()) .build(); } }
@Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.AUTO); factory.setAdviceChain(retryInterceptor); return factory; }
|
3. 死信队列兜底 + 告警
重试耗尽后的消息进死信队列,监控告警并人工介入:
@Configuration public class DeadLetterConfig {
public static final String DEAD_LETTER_EXCHANGE = "dlx.exchange"; public static final String DEAD_LETTER_QUEUE = "dlx.queue";
@Bean DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE); }
@Bean Queue deadLetterQueue() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); }
@Bean Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()) .to(deadLetterExchange()).with("dlx.routing.key"); } }
@Component @RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE) public class DeadLetterConsumer {
@RabbitHandler public void handle(Message message) { String body = new String(message.getBody()); log.error("消息进入死信队列,需人工处理:{}", body); alarmService.send("MQ死信告警", body); deadMsgService.save(message); } }
|
4. 幂等性保障(消息 ID 去重)
重试机制会导致消息重复消费,消费者必须做幂等处理:
@Component @RabbitListener(queues = "order.news") public class IdempotentConsumer {
@Autowired private StringRedisTemplate redisTemplate;
@RabbitHandler public void handle(OrderMessage msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { String msgId = msg.getMsgId(); String redisKey = "mq:consumed:" + msgId;
Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(redisKey, "1", 24, TimeUnit.HOURS); if (Boolean.FALSE.equals(isFirst)) { log.warn("重复消息,已跳过:{}", msgId); channel.basicAck(tag, false); return; }
try { orderService.processPaySuccess(msg.getOrderId()); channel.basicAck(tag, false); } catch (Exception e) { redisTemplate.delete(redisKey); channel.basicNack(tag, false, false); } } }
|
补偿流程总结
消息发出 ↓ 消费者处理 ├── 成功 → basicAck,结束 └── 失败 → Spring Retry 自动重试(最多3次,指数退避) ├── 重试成功 → basicAck,结束 └── 重试耗尽 → basicNack(requeue=false) ↓ 进入死信队列(DLX) ↓ 死信消费者 → 记录DB + 发告警 + 等待人工补偿
|