📤 1. 开启生产者确认机制,确保生产者的消息能到达队列
通过ConfirmCallback和ReturnCallback回调函数监控消息发送状态,确保消息不在发送阶段丢失。
Spring Boot 配置 - 开启生产者确认
# 开启生产者确认机制,确保消息到达交换机
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消息返回机制,确保消息能路由到队列
spring.rabbitmq.publisher-returns=true
# 设置为true,消息无法路由时会返回给生产者
spring.rabbitmq.template.mandatory=true
Java 实现 - 生产者确认回调
@Component
public class PublisherCallback implements
RabbitTemplate.ConfirmCallback,
RabbitTemplate.ReturnsCallback {
/**
* 确认回调:监控消息是否到达交换机
* @param data 消息的唯一标识
* @param ack true表示消息到达交换机,false表示失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData data,
boolean ack, String cause) {
if (ack) {
// 消息成功到达交换机
log.info("✅ 消息发送成功: {}", data.getId());
} else {
// 消息未到达交换机,需要重新发送
log.error("❌ 消息发送失败: {}, 原因: {}",
data.getId(), cause);
// 这里可以实现重新发送逻辑或记录到错误队列
resendMessage(data);
}
}
/**
* 返回回调:监控消息是否路由到队列
* 当消息无法路由到任何队列时触发
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.warn("⚠️ 消息路由失败,无法找到匹配的队列: {}",
returned.getMessage().toString());
// 这里可以实现消息重新路由或记录到死信队列
}
/**
* 重新发送消息的逻辑
*/
private void resendMessage(CorrelationData data) {
// 实现重新发送逻辑
log.info("🔄 准备重新发送消息: {}", data.getId());
}
}
💾 2. 开启持久化功能,确保消息未消费前在队列中不会丢失
包括队列持久化、交换机持久化和消息持久化,三者缺一不可,确保服务重启后消息不丢失。
队列和交换机持久化配置
@Configuration
public class RabbitConfig {
/**
* 队列持久化配置
* durable(true) 确保队列在服务重启后不会丢失
*/
@Bean
public Queue myQueue() {
return QueueBuilder.durable("myQueue") // 开启队列持久化
.withArgument("x-message-ttl", 60000) // 消息TTL 60秒
.build();
}
/**
* 交换机持久化配置
* durable(true) 确保交换机在服务重启后不会丢失
*/
@Bean
public DirectExchange myExchange() {
return ExchangeBuilder
.directExchange("myExchange")
.durable(true) // 开启交换机持久化
.build();
}
/**
* 绑定关系配置
* 将队列绑定到交换机,指定路由键
*/
@Bean
public Binding binding() {
return BindingBuilder
.bind(myQueue()) // 绑定队列
.to(myExchange()) // 到交换机
.with("routing.key"); // 使用路由键
}
}
消息持久化发送
@Service
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送持久化消息
* 只有消息、队列、交换机都持久化,消息才不会丢失
*/
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(
"myExchange", // 持久化的交换机
"routing.key", // 路由键
message, // 消息内容
msg -> {
// 设置消息为持久化模式(关键步骤)
msg.getMessageProperties()
.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 设置消息内容类型
msg.getMessageProperties()
.setContentType("application/json");
// 设置消息ID,用于追踪
msg.getMessageProperties()
.setMessageId(UUID.randomUUID().toString());
return msg;
}
);
log.info("💾 已发送持久化消息: {}", message);
}
}
✅ 3. 开启消费者确认机制为auto,由Spring确认消息成功后完成ack
只有消息处理成功后才会发送确认信号,确保消息不在消费阶段丢失。
消费者确认配置
# 消费者确认模式:auto模式,Spring自动管理ACK
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# 预取数量:每次从队列取1条消息,确保消息不丢失
spring.rabbitmq.listener.simple.prefetch=1
# 并发消费者数量:同时处理消息的消费者数量
spring.rabbitmq.listener.simple.concurrency=2
# 最大并发数:高峰期最多的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=5
消费者实现 - 确保消息不丢失
@Component
public class MessageConsumer {
/**
* 消费者监听器
* acknowledge-mode=auto 模式下,Spring会自动管理ACK
* 只有方法正常执行完成,才会发送ACK确认
*/
@RabbitListener(queues = "myQueue")
public void handleMessage(
@Payload String message,
@Header Map headers,
Channel channel) {
try {
log.info("📨 接收到消息: {}", message);
// 执行业务逻辑处理
processBusinessLogic(message);
log.info("✅ 消息处理成功");
// 方法正常结束,Spring自动发送ACK确认
// 消息会从队列中删除,不会丢失
} catch (BusinessException e) {
// 业务异常:不重新入队,避免无限重试
log.error("❌ 业务处理失败: {}", e.getMessage());
throw new AmqpRejectAndDontRequeueException(
"业务处理失败,不重新入队", e);
// Spring会发送NACK,消息不会重新入队
} catch (Exception e) {
// 系统异常:重新入队,等待重试
log.error("💥 系统异常: {}", e.getMessage());
throw e;
// Spring会发送NACK,消息重新入队等待重试
}
}
/**
* 业务逻辑处理
* 这里是实际的业务代码
*/
private void processBusinessLogic(String message) {
// 模拟业务处理
if (message.contains("error")) {
throw new BusinessException("模拟业务异常");
}
// 实际的业务逻辑
// 例如:保存到数据库、调用其他服务等
log.info("🔄 正在处理业务逻辑...");
// 模拟处理时间
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
🔄 4. 开启消费者失败重试机制,多次重试失败后将消息投递到异常交换机,交由人工处理
通过重试机制处理临时性故障,重试失败后进入死信队列,确保消息最终不会丢失。
重试机制配置
# 开启重试机制,处理临时性故障
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重试次数:失败后最多重试3次
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 初始重试间隔:第一次重试等待1秒
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# 重试间隔倍数:每次重试间隔翻倍
spring.rabbitmq.listener.simple.retry.multiplier=2
# 最大重试间隔:重试间隔最长不超过10秒
spring.rabbitmq.listener.simple.retry.max-interval=10000
死信队列配置 - 确保消息不丢失
@Configuration
public class ErrorHandlerConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 自定义错误处理器
* 当消息重试达到最大次数后,将消息发送到死信队列
*/
@Bean
public RabbitListenerErrorHandler customErrorHandler() {
return (amqpMessage, message, exception) -> {
log.error("💥 消息处理失败,达到最大重试次数: {}",
new String(amqpMessage.getBody()));
// 发送到死信队列,确保消息不丢失
sendToDeadLetterQueue(amqpMessage);
// 发送通知给运维人员
notifyAdministrator(amqpMessage, exception);
return null;
};
}
/**
* 死信队列配置
* 用于存储重试失败的消息
*/
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dead.letter.queue") // 死信队列也要持久化
.build();
}
/**
* 死信交换机配置
*/
@Bean
public DirectExchange deadLetterExchange() {
return ExchangeBuilder
.directExchange("dead.letter.exchange")
.durable(true) // 死信交换机也要持久化
.build();
}
/**
* 死信队列绑定
*/
@Bean
public Binding deadLetterBinding() {
return BindingBuilder
.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dead.letter.key");
}
/**
* 发送消息到死信队列
* 确保重试失败的消息不会丢失
*/
private void sendToDeadLetterQueue(Message message) {
try {
rabbitTemplate.send("dead.letter.exchange",
"dead.letter.key", message);
log.info("📮 消息已发送到死信队列");
} catch (Exception e) {
log.error("❌ 发送到死信队列失败: {}", e.getMessage());
// 这里可以考虑写入数据库或文件,确保消息不丢失
}
}
/**
* 通知管理员处理异常消息
*/
private void notifyAdministrator(Message message, Exception exception) {
// 发送邮件、短信或钉钉通知
log.warn("🚨 已通知管理员处理异常消息: {}", exception.getMessage());
// 实际项目中可以集成邮件服务、短信服务等
}
}
🔄 消息不丢失的完整流程
📤
生产者
Producer
🔄
交换机
Exchange
📋
队列
Queue
📥
消费者
Consumer
①生产者确认机制
ConfirmCallback + ReturnCallback
②持久化机制
durable=true + deliveryMode=2
③消费者确认机制
acknowledge-mode=auto
④失败重试机制
retry + 死信队列
⚠️
死信队列
Dead Letter Queue
👨💻
人工处理
Manual Intervention
💾
持久化存储
Persistent Storage
①生产者确认机制
确保消息从生产者可靠投递到交换机和队列。通过ConfirmCallback确认消息到达交换机,ReturnCallback确保消息能路由到队列。
②持久化机制
确保消息、队列和交换机都持久化到磁盘,即使RabbitMQ服务重启,消息也不会丢失。三者缺一不可。
③消费者确认机制
消费者处理成功后才发送确认(ack),确保消息被正确处理后才从队列中删除,避免消息丢失。
④失败重试机制
消息处理失败后自动重试,达到最大重试次数后,消息进入死信队列,触发告警,等待人工处理。
🎮 四大机制演示
💡 点击上方按钮查看四大机制如何确保消息不丢失
🎯 每个演示都会展示真实的消息处理流程
🏆 最终保障
四大机制环环相扣,从消息发送到最终消费,每个环节都有可靠性保障。
即使出现服务重启、网络故障、业务异常等情况,消息也不会丢失。