Skip to content

消息可靠投递:消息持久化 + 消费确认 + 补偿机制

用户付款成功了,但订单状态没更新。 用户没付款,但短信通知发出去了。

这两类问题,都是消息队列不可靠的典型症状。

消息可靠性是消息队列最核心的问题,也是最容易出错的环节。

消息可靠性的三个环节

一条消息从发送到消费,经历三个环节:

┌──────────────────────────────────────────────────────────────────┐
│                        消息可靠性三环节                             │
│                                                                   │
│   ┌─────────┐      ┌─────────┐      ┌─────────┐                  │
│   │ Producer │ ──> │  Broker │ ──> │ Consumer│                  │
│   │  生产者  │      │  中间件  │      │  消费者  │                  │
│   └─────────┘      └─────────┘      └─────────┘                  │
│        │                │                │                        │
│        ↓                ↓                ↓                        │
│   生产者确认         消息持久化         消费确认                     │
│   (Confirm)        (Persistence)        (Ack)                     │
│                                                                   │
└──────────────────────────────────────────────────────────────────┘

任何一环出问题,消息都可能丢失

环节可能丢失原因解决方案
生产者 → Broker网络抖动、Broker 挂了生产者确认(Confirm)
Broker 存储磁盘故障、刷盘失败消息持久化
Broker → ConsumerConsumer 挂了消费确认(Ack)

一、生产者确认(Confirm)

Kafka Producer 配置

java
// Kafka 生产者确认配置
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 关键配置:acks 确认级别
properties.put("acks", "all");  // 所有副本确认才返回成功

// 开启重试
properties.put("retries", 3);
properties.put("retry.backoff.ms", 100);

// 开启幂等性(防止重复发送)
properties.put("enable.idempotence", true);

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, order);
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // 发送失败,记录日志,重试发送
        log.error("发送消息失败: {}", exception.getMessage());
        retrySend(record);
    } else {
        // 发送成功
        log.info("消息发送成功: topic={}, partition={}, offset={}",
            metadata.topic(), metadata.partition(), metadata.offset());
    }
});

acks 配置详解

acks 值含义可靠性性能适用场景
0不等待确认最高允许少量丢失的日志
1Leader 确认一般场景
all / -1所有 ISR 确认金融级场景
java
// acks = all 的详细配置
properties.put("min.insync.replicas", 2);  // 最少 2 个副本确认
properties.put("acks", "all");

// 场景分析:
// 3 个副本,min.insync.replicas = 2
// 只要 2 个副本写入成功,就算成功
// 如果只有 1 个副本存活,写入会失败

RabbitMQ Publisher Confirm

java
// RabbitMQ 开启确认
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    
    // 设置确认回调
    template.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            // Broker 确认收到
            log.info("消息发送成功: {}", correlationData);
        } else {
            // Broker 没确认
            log.error("消息发送失败: {}, cause: {}", correlationData, cause);
            // 重试发送
            retrySend(correlationData);
        }
    });
    
    // 设置返回回调(消息无法路由)
    template.setReturnsCallback(returned -> {
        log.error("消息无法路由: exchange={}, routingKey={}, replyCode={}",
            returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode());
    });
    
    return template;
}

// 发送消息
public void sendOrder(Order order) {
    CorrelationData correlationData = new CorrelationData(order.getId());
    
    rabbitTemplate.convertAndSend(
        "orders.exchange",
        "orders.created",
        order,
        correlationData
    );
}

二、消息持久化

Kafka 消息持久化

java
// Kafka 持久化配置
properties.put("bootstrap.servers", "localhost:9092");

// 消息持久化关键配置
properties.put("acks", "all");           // 所有副本确认
properties.put("retries", 3);            // 重试次数

// 刷盘策略:建议使用 log.flush.interval.messages
// 依赖操作系统刷盘(更快,更安全)
// 不要设置太频繁的强制刷盘,会影响性能

// 消息本身设置持久化
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",
    order.getId(),
    JSON.toJSONString(order)
);
// 消息会持久化到磁盘

Kafka 持久化原理

消息写入 → Page Cache(内存) → 异步刷盘 → 磁盘

操作系统会定期将 Page Cache 刷到磁盘
Kafka 也可根据配置强制刷盘

可靠性 vs 性能:
- 同步刷盘:慢,但最可靠
- 异步刷盘:快,可能丢失少量消息

RabbitMQ 消息持久化

java
// RabbitMQ 持久化三要素

// 1. 交换机持久化
@Bean
public DirectExchange ordersExchange() {
    // durable = true 表示持久化
    return new DirectExchange("orders.exchange", true, false);
}

// 2. 队列持久化
@Bean
public Queue ordersQueue() {
    // durable = true 表示持久化
    return new Queue("orders.queue", true);
}

// 3. 消息持久化
public void sendOrder(Order order) {
    rabbitTemplate.convertAndSend("orders.exchange", "orders.created", order, message -> {
        // DeliveryMode.PERSISTENT 表示消息持久化
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
    });
}

持久化 vs 性能权衡

配置可靠性性能建议
内存 + 异步刷盘默认配置
同步刷盘金融级场景
不持久化最高测试环境

三、消费确认( Ack)

Kafka 消费者 offset 提交

java
// Kafka 消费确认方式

// 方式一:自动提交(默认,可能丢消息)
properties.put("enable.auto.commit", true);
properties.put("auto.commit.interval.ms", 5000);  // 每 5 秒提交一次

// 方式二:手动提交(推荐,不丢消息)
properties.put("enable.auto.commit", false);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        try {
            processOrder(record.value());
            
            // 手动提交 offset
            // 在业务处理成功后提交
            consumer.commitSync();  // 同步提交
            // 或
            consumer.commitAsync(); // 异步提交
            
        } catch (Exception e) {
            // 业务处理失败,不提交 offset
            // 下次重新消费这条消息
            log.error("处理消息失败", e);
        }
    }
}

RabbitMQ 消费确认

java
// RabbitMQ 手动确认
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    
    // 手动确认模式
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
    // prefetch 控制预取数量
    factory.setPrefetchCount(10);
    
    return factory;
}

// 消费代码
@RabbitListener(queues = "orders.queue")
public void handleOrder(Order order, Channel channel, 
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    try {
        // 1. 业务处理
        processOrder(order);
        
        // 2. 确认消息
        // multiple = false:只确认当前这条
        // multiple = true:确认所有小于等于 tag 的消息
        channel.basicAck(tag, false);
        
    } catch (Exception e) {
        log.error("处理消息失败", e);
        
        // 3. 处理失败,根据情况选择:
        
        // 方案一:重新入队(重新消费)
        // channel.basicNack(tag, false, true);
        
        // 方案二:拒绝消息(不重试,进入死信队列)
        channel.basicNack(tag, false, false);
        
        // 方案三:拒绝消息,稍后重试
        // channel.basicNack(tag, false, true);
        // 或
        // Thread.sleep(1000);
        // channel.basicAck(tag, false);
    }
}

消费幂等性设计

即使有了确认机制,网络抖动仍可能导致消息被重复消费。消费端必须实现幂等性

java
// 方案一:唯一键去重
@Service
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    public void processOrder(Order order) {
        // 检查订单是否已处理
        if (orderRepository.existsByOrderId(order.getOrderId())) {
            log.info("订单已处理,跳过: {}", order.getOrderId());
            return;
        }
        
        // 业务处理
        // ...
        orderRepository.save(order);
    }
}

// 方案二:Redis 分布式锁
@Service
public class OrderService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    public void processOrder(Order order) {
        String lockKey = "order:lock:" + order.getOrderId();
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
        
        if (!locked) {
            throw new RuntimeException("订单正在处理中");
        }
        
        try {
            // 业务处理
            processOrderInternal(order);
        } finally {
            redisTemplate.delete(lockKey);
        }
    }
}

// 方案三:数据库唯一索引
// CREATE UNIQUE INDEX idx_order_id ON orders(order_id);
// 插入重复订单会抛异常

四、补偿机制

定时任务对账

java
// 定时检查消息处理情况
@Service
public class MessageCompensationService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private MessageLogRepository messageLogRepository;
    
    // 每分钟执行
    @Scheduled(fixedRate = 60000)
    public void checkPendingMessages() {
        // 1. 查找状态为"发送中"的消息,超过 5 分钟没变
        List<MessageLog> pendingMessages = messageLogRepository
            .findPendingMessages(LocalDateTime.now().minusMinutes(5));
        
        for (MessageLog msg : pendingMessages) {
            // 2. 检查业务表是否有对应记录
            boolean exists = orderRepository.existsByMessageId(msg.getMessageId());
            
            if (exists) {
                // 业务已处理,更新消息状态
                msg.setStatus("COMPLETED");
                messageLogRepository.save(msg);
            } else {
                // 业务未处理,尝试重新发送
                resendMessage(msg);
            }
        }
    }
}

// 消息发送记录表
@Entity
public class MessageLog {
    @Id
    private String messageId;
    private String status;  // PENDING, SENT, COMPLETED, FAILED
    private int retryCount;
    private LocalDateTime createTime;
    private LocalDateTime updateTime;
    private LocalDateTime nextRetryTime;
}

死信队列兜底

java
// 配置死信队列处理无法消费的消息
@Configuration
public class DeadLetterConfig {
    
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dlx.exchange");
    }
    
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dlq.orders");
    }
    
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
            .to(deadLetterExchange())
            .with("orders.dead");
    }
}

// 原队列配置死信
@Bean
public Queue ordersQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "dlx.exchange");
    args.put("x-dead-letter-routing-key", "orders.dead");
    // 消息 TTL:7 天后自动删除
    args.put("x-message-ttl", 604800000);
    return new Queue("orders.queue", true, false, false, args);
}

// 死信队列消费者
@RabbitListener(queues = "dlq.orders")
public void handleDeadLetter(Order order, Channel channel, 
                             @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    log.error("死信消息: {}", order);
    
    // 人工处理或告警
    alertService.send("订单消息进入死信队列: " + order.getOrderId());
    
    // 确认消息(避免无限重试)
    channel.basicAck(tag, false);
}

五、完整可靠性方案

┌─────────────────────────────────────────────────────────────────┐
│                     消息可靠性完整方案                            │
│                                                                  │
│  Producer                                                         │
│    │                                                             │
│    ├─ 发送前:记录消息到数据库(状态=PENDING)                     │
│    │                                                             │
│    ├─ 发送时:Confirm 回调确认收到                                │
│    │   └─ 超时未确认:重试发送                                    │
│    │                                                             │
│    └─ 发送成功:更新消息状态=SENT                                 │
│                                                                  │
│  Broker                                                          │
│    │                                                             │
│    ├─ 持久化存储(磁盘)                                          │
│    │                                                             │
│    └─ 副本同步(多副本)                                          │
│                                                                  │
│  Consumer                                                        │
│    │                                                             │
│    ├─ 消费消息                                                    │
│    │                                                             │
│    ├─ 业务处理                                                    │
│    │   └─ 业务成功:更新消息状态=COMPLETED                        │
│    │   └─ 业务失败:记录失败原因                                  │
│    │                                                             │
│    └─ 手动 Ack(业务处理成功后)                                  │
│                                                                  │
│  补偿机制                                                         │
│    │                                                             │
│    ├─ 定时任务:检查 PENDING 消息,重试                           │
│    │                                                             │
│    ├─ 死信队列:无法消费的消息兜底                                │
│    │                                                             │
│    └─ 人工干预:告警 + 人工处理                                   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

总结

消息可靠投递三板斧:

环节方案配置
生产确认Confirm 机制acks=all, retries=3
持久化磁盘持久化durable=true, deliveryMode=PERSISTENT
消费确认手动 AckacknowledgeMode=MANUAL
补偿定时对账 + 死信队列兜底保障

记住:没有任何方案能 100% 保证消息不丢,只能做到极高可靠。需要在可靠性、性能、成本之间做权衡。


留给你的问题

假设你的支付系统用消息队列通知下游商户:

  1. 支付成功后需要通知 10 个下游商户,每个商户调用 API。如果某个商户 API 挂了,消息怎么处理?
  2. 用户重复点击支付按钮,可能发送两条支付消息。如何保证只处理一次?
  3. 支付消息发出去了,但下游商户 API 响应慢,导致超时。支付系统怎么知道下游到底处理了没有?
  4. 如果 RabbitMQ 整个集群挂了,你的消息还在内存里没持久化到磁盘。你怎么尽可能减少损失?

思考这些问题,能帮助你设计更可靠的消息系统。

基于 VitePress 构建