Skip to content

Kafka 消息发送模式:同步、异步、异步回调

你真的会用 producer.send() 吗?

同一个方法,三种用法,性能和可靠性天差地别。

一、发送模式总览

┌─────────────────────────────────────────────────────────────────┐
│                    Kafka 三种发送模式                             │
│                                                                  │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐        │
│  │   同步发送   │   │   异步发送   │   │  异步回调    │        │
│  │  send().get()│   │   send()     │   │  send(cb)    │        │
│  └──────────────┘   └──────────────┘   └──────────────┘        │
│        │                  │                  │                    │
│        ↓                  ↓                  ↓                    │
│   等待返回            立即返回            立即返回                  │
│   阻塞等待           返回 Future        回调通知                   │
│                                                                  │
│   性能:低              中                高                       │
│   可靠性:高            中                高(配合重试)            │
│   适用:关键消息        普通消息          高吞吐场景                │
└─────────────────────────────────────────────────────────────────┘

二、同步发送

2.1 基础用法

java
// 同步发送:等待消息发送成功
public class SyncProducer {
    
    public void sendSync() {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "order-topic",
            "order-123",
            "order data"
        );
        
        try {
            // send() 返回 Future,调用 get() 阻塞等待结果
            RecordMetadata metadata = producer.send(record).get();
            
            System.out.println("发送成功!");
            System.out.println("Topic: " + metadata.topic());
            System.out.println("Partition: " + metadata.partition());
            System.out.println("Offset: " + metadata.offset());
            
        } catch (ExecutionException e) {
            // 发送失败
            System.out.println("发送失败: " + e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

2.2 同步发送的时序

┌─────────────────────────────────────────────────────────────────┐
│                    同步发送时序                                  │
│                                                                  │
│  Producer Thread                                                │
│  │                                                              │
│  │  1. send(record).get()                                       │
│  │ ───────────────────────────────────────────────────────────── │
│  │                                                              │
│  │  2. 等待...(阻塞)                                          │
│  │     │                                                        │
│  │     ├── 选择分区                                             │
│  │     ├── 序列化                                               │
│  │     ├── 追加到缓冲区                                         │
│  │     ├── Sender 线程发送                                       │
│  │     └── 等待 ACK                                             │
│  │                                                              │
│  │  3. 返回结果                                                 │
│  │ ←────────────────────────────────────────────────────────────│
│  │                                                              │
│  │  4. 继续下一条                                               │
│  │                                                              │
│  时间:每条消息约 5-20ms(取决于网络和 Broker)                   │
└─────────────────────────────────────────────────────────────────┘

2.3 同步发送的适用场景

java
// 场景 1:订单创建
// 订单创建后必须确保消息发送成功
public class OrderCreation {
    
    public void createOrder(Order order) {
        // 1. 创建订单
        orderRepository.save(order);
        
        // 2. 发送订单消息(必须成功)
        try {
            producer.send(new ProducerRecord<>("order-topic", 
                order.getId(), order)).get();
        } catch (Exception e) {
            // 发送失败,订单创建也要回滚
            throw new RuntimeException("订单创建失败", e);
        }
        
        // 3. 返回成功
        return order;
    }
}

// 场景 2:支付通知
// 支付完成后必须通知下游
public class PaymentNotification {
    
    public void notifyPayment(Payment payment) {
        // 发送失败不能算支付成功
        RecordMetadata metadata = producer.send(
            new ProducerRecord<>("payment-topic", 
                payment.getId(), payment)
        ).get();
        
        log.info("支付通知已发送: {}", metadata.offset());
    }
}

三、异步发送

3.1 基础用法

java
// 异步发送:不等待发送结果
public class AsyncProducer {
    
    public void sendAsync() {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "log-topic",
            "log-data"
        );
        
        // send() 立即返回 Future,不阻塞
        Future<RecordMetadata> future = producer.send(record);
        
        // 可以继续处理其他业务
        doOtherWork();
        
        // 如果需要结果,可以稍后获取
        try {
            RecordMetadata metadata = future.get(5, TimeUnit.SECONDS);
            log.info("发送成功: offset={}", metadata.offset());
        } catch (Exception e) {
            log.error("发送失败", e);
        }
    }
}

3.2 异步发送的时序

┌─────────────────────────────────────────────────────────────────┐
│                    异步发送时序                                  │
│                                                                  │
│  Producer Thread                    Sender Thread                │
│  │                                 │                            │
│  │  1. send(record)                │                            │
│  │  2. 立即返回 Future             │                            │
│  │ ──────────────────────────────────                           │
│  │                                                              │
│  │  3. 继续下一条                   │ 4. 后台发送                 │
│  │  4. 继续下一条                   │ 5. 处理 ACK                │
│  │  5. 继续下一条                   │                            │
│  │                                 │                            │
│  │  6. future.get()(如果需要)     │                            │
│  │ ←─────────────────────────────────                           │
│  │                                                              │
│  时间:几乎为 0ms                                               │
└─────────────────────────────────────────────────────────────────┘

3.3 异步发送的问题

java
// 异步发送的问题:不知道发送结果
public class AsyncProducerProblem {
    
    public void problem() {
        for (int i = 0; i < 1000; i++) {
            // 发送时不等待结果
            producer.send(new ProducerRecord<>("topic", "msg" + i));
        }
        
        // 问题:循环结束后,不知道哪些消息发送成功了
        // 可能消息还在缓冲区,还没发送出去
    }
    
    // 解决:循环结束后等待缓冲区清空
    public void flush() {
        // 发送所有消息
        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<>("topic", "msg" + i));
        }
        
        // 等待所有消息发送完成
        producer.flush();
        
        // 此时所有消息要么发送成功,要么还在重试
    }
}

四、异步回调

4.1 基础用法

java
// 异步回调:发送完成后自动通知
public class CallbackProducer {
    
    public void sendWithCallback() {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "order-topic",
            "order-123",
            "order data"
        );
        
        // 发送时传入回调
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e == null) {
                    // 发送成功
                    System.out.println("发送成功!");
                    System.out.println("Topic: " + metadata.topic());
                    System.out.println("Partition: " + metadata.partition());
                    System.out.println("Offset: " + metadata.offset());
                } else {
                    // 发送失败
                    System.err.println("发送失败: " + e.getMessage());
                    // 这里可以重试或记录
                }
            }
        });
        
        // 继续处理其他业务
        doOtherWork();
    }
}

4.2 Lambda 简化写法

java
// 使用 Lambda 简化回调
public class LambdaCallback {
    
    public void sendWithLambda() {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "order-topic",
            order.getId(),
            JSON.toJSONString(order)
        );
        
        // Lambda 回调
        producer.send(record, (metadata, e) -> {
            if (e != null) {
                log.error("发送订单消息失败: orderId={}, error={}", 
                    order.getId(), e.getMessage());
                // 失败处理:重试、告警、记录
            } else {
                log.info("发送订单消息成功: orderId={}, offset={}", 
                    order.getId(), metadata.offset());
            }
        });
    }
}

4.3 回调的时序

┌─────────────────────────────────────────────────────────────────┐
│                    异步回调时序                                  │
│                                                                  │
│  Producer Thread                    Sender Thread                │
│  │                                 │                            │
│  │  1. send(record, callback)      │                            │
│  │  2. 立即返回                     │                            │
│  │ ──────────────────────────────────                           │
│  │                                                              │
│  │  3. 继续处理业务                   │ 4. 后台发送              │
│  │                                   │ 5. 收到 ACK              │
│  │                                   │ 6. 执行回调               │
│  │                                   │ ──────────────────────────│
│  │                                                              │
│  │                                   │ 回调在 Sender 线程执行    │
│  │                                   │ 注意线程安全问题!         │
└─────────────────────────────────────────────────────────────────┘

4.4 回调的线程安全

java
// 回调的线程安全问题
public class CallbackThreadSafety {
    
    public void threadSafety() {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        // 回调在 Sender 线程执行,不在 Producer 主线程
        producer.send(record, (metadata, e) -> {
            // 这里的代码可能和主线程并发执行
            // 不要访问主线程的非线程安全对象
            
            // 错误写法:
            // sharedMap.put(key, value);  // 如果 sharedMap 不是线程安全的
            
            // 正确写法:
            // 使用线程安全的方式处理
            log.info("offset={}", metadata.offset());
        });
    }
    
    // 如果需要访问主线程状态,使用线程安全的方式
    public void safeCallback() {
        ConcurrentHashMap<String, Order> pendingOrders = new ConcurrentHashMap<>();
        
        producer.send(record, (metadata, e) -> {
            if (e == null) {
                // 从线程安全的 Map 中移除
                pendingOrders.remove(record.key());
                log.info("发送成功: {}", metadata.offset());
            } else {
                // 失败处理
                pendingOrders.get(record.key()); // 安全访问
            }
        });
    }
}

五、发送模式对比

模式代码阻塞性能可靠性适用场景
同步send().get()完全阻塞关键消息
异步Future.get()按需阻塞需要结果的普通消息
回调send(Callback)不阻塞高(配合重试)高吞吐场景

六、实战:发送 100 万条消息

6.1 低效写法

java
// 低效写法:逐条同步发送
public class BadProducer {
    
    public void sendBad() {
        long start = System.currentTimeMillis();
        
        for (int i = 0; i < 1000000; i++) {
            try {
                producer.send(new ProducerRecord<>("topic", 
                    "key-" + i, "value-" + i)).get();
            } catch (Exception e) {
                log.error("发送失败", e);
            }
        }
        
        long cost = System.currentTimeMillis() - start;
        log.info("发送 100 万条消息,耗时: {} ms", cost);
        // 结果:可能需要几十分钟
    }
}

6.2 异步批量发送

java
// 高效写法:异步 + 批量
public class GoodProducer {
    
    public void sendGood() {
        long start = System.currentTimeMillis();
        
        // 使用 CountDownLatch 等待完成
        CountDownLatch latch = new CountDownLatch(1000000);
        
        AtomicLong successCount = new AtomicLong(0);
        AtomicLong failCount = new AtomicLong(0);
        
        for (int i = 0; i < 1000000; i++) {
            final int index = i;
            producer.send(new ProducerRecord<>("topic", 
                "key-" + i, "value-" + i), (metadata, e) -> {
                
                if (e == null) {
                    successCount.incrementAndGet();
                } else {
                    failCount.incrementAndGet();
                }
                latch.countDown();
            });
        }
        
        // 等待所有消息发送完成
        latch.await(10, TimeUnit.MINUTES);
        
        long cost = System.currentTimeMillis() - start;
        log.info("发送 100 万条消息,耗时: {} ms", cost);
        log.info("成功: {}, 失败: {}", successCount.get(), failCount.get());
        // 结果:可能只需要几分钟
    }
}

6.3 最高效写法:批处理 + 压缩

java
// 最高效写法:充分利用批处理和压缩
public class BestProducer {
    
    public void sendBest() {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        long start = System.currentTimeMillis();
        long count = 0;
        
        // 充分利用 batch.size 和 linger.ms
        // 消息会先在缓冲区积累,达到阈值后批量发送
        
        for (int i = 0; i < 1000000; i++) {
            producer.send(new ProducerRecord<>("topic", 
                "key-" + i, "value-" + i));
            count++;
            
            // 每 10 万条打印一次进度
            if (count % 100000 == 0) {
                producer.flush();
                log.info("已发送: {}", count);
            }
        }
        
        // 确保所有消息发送完成
        producer.flush();
        
        long cost = System.currentTimeMillis() - start;
        log.info("发送 {} 万条消息,耗时: {} ms", count / 10000, cost);
    }
}

七、发送配置优化

7.1 批处理配置

java
// 批处理配置
public class BatchConfig {
    
    // 关键参数
    
    // batch.size:单个批次最大字节数
    // 太大:延迟增加
    // 太小:批次效果不明显
    // 推荐:16KB - 128KB
    props.put("batch.size", 32768);  // 32KB
    
    // linger.ms:等待凑够批次的时间
    // 为 0:无等待,有消息就发送
    // > 0:最多等待这么久,凑够批次
    // 推荐:5-20ms(根据延迟敏感度调整)
    props.put("linger.ms", 10);
    
    // buffer.memory:缓冲区总大小
    // 太小:发送过快时阻塞
    // 推荐:至少 batch.size × 并发数 × 2
    props.put("buffer.memory", 67108864);  // 64MB
}

7.2 压缩配置

java
// 压缩配置
public class CompressionConfig {
    
    // compression.type
    // none:不压缩
    // gzip:压缩率高,CPU 开销大
    // snappy:压缩率中等,CPU 开销小
    // lz4:压缩率高,速度快(推荐)
    
    props.put("compression.type", "lz4");
    
    // 压缩位置:Producer 端压缩
    // 优点:减少网络传输
    // 缺点:Broker 存储压缩消息
}

7.3 推荐配置

java
// 推荐的高吞吐配置
public class RecommendedConfig {
    
    Properties props = new Properties();
    
    // 连接
    props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
    
    // 序列化
    props.put("key.serializer", "StringSerializer");
    props.put("value.serializer", "StringSerializer");
    
    // 批处理
    props.put("batch.size", 65536);       // 64KB
    props.put("linger.ms", 20);           // 等待 20ms
    props.put("buffer.memory", 134217728); // 128MB
    
    // 压缩
    props.put("compression.type", "lz4");
    
    // 重试
    props.put("retries", 3);
    props.put("retry.backoff.ms", 100);
    
    // 可靠性(可选)
    // props.put("acks", "all");
    // props.put("enable.idempotence", true);
}

八、常见问题

8.1 消息丢失

java
// 问题:异步发送后立即关闭 Producer
public class MessageLossProblem {
    
    public void problem() {
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("topic", "msg" + i));
        }
        
        // 问题:可能还有消息在缓冲区
        producer.close();  // 缓冲区中的消息可能丢失
    }
    
    // 解决:确保消息发送完成
    public void solution() {
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("topic", "msg" + i));
        }
        
        // flush() 等待所有消息发送完成
        producer.flush();
        
        // 然后再关闭
        producer.close();
    }
}

8.2 顺序问题

java
// 问题:异步发送无法保证顺序
public class OrderingProblem {
    
    public void problem() {
        // 发送 3 条消息,希望按顺序处理
        producer.send(new ProducerRecord<>("topic", "1"));  // 发送
        producer.send(new ProducerRecord<>("topic", "2"));  // 发送
        producer.send(new ProducerRecord<>("topic", "3"));  // 发送
        
        // 由于是异步的,可能的到达顺序是:
        // 3, 1, 2(乱序)
    }
    
    // 解决:使用同步发送或带 Key
    public void solution() {
        // 方案 1:同步发送
        for (int i = 0; i < 3; i++) {
            producer.send(new ProducerRecord<>("topic", 
                String.valueOf(i), "msg" + i)).get();
        }
        
        // 方案 2:使用相同 Key(发到同一分区,保证有序)
        for (int i = 0; i < 3; i++) {
            producer.send(new ProducerRecord<>("topic", 
                "same-key", "msg" + i));  // 同一分区,有序
        }
    }
}

8.3 性能调优

java
// 性能调优思路
public class PerformanceTuning {
    
    // 吞吐量低?
    // 1. 增加 batch.size
    // 2. 增加 linger.ms
    // 3. 增加 buffer.memory
    // 4. 开启压缩
    // 5. 增加网络线程数
    
    // 延迟高?
    // 1. 减少 batch.size
    // 2. 减少 linger.ms
    // 3. 减少 retries
    // 4. 检查网络
    
    // 内存溢出?
    // 1. 减少 buffer.memory
    // 2. 减少 batch.size
    // 3. 增加发送频率
}

总结

发送模式选择指南:

场景推荐模式原因
订单/支付消息同步必须确保成功
日志/监控数据异步回调高吞吐,允许少量丢失
大批量导入批处理 + flush追求极致吞吐
消息不重复关键同步 + 幂等性确保成功 + 防重复

选择正确的发送模式,是保证系统可靠性和性能的关键。


留给你的问题

  1. 异步发送的丢失风险:异步发送时,producer.send() 成功了但回调还没执行时应用挂了,消息会丢失吗?缓冲区里的消息呢?

  2. 回调异常处理:回调里抛出异常会怎样?Kafka 会重试吗?

  3. 批处理的边界:设置 batch.size=64KB, linger.ms=20ms,如果 1ms 内就凑够了 64KB,消息会等 20ms 再发送吗?

  4. 多线程安全KafkaProducer 线程安全吗?多个线程共用一个 Producer 和各自用一个 Producer,哪个更好?

思考这些问题,能帮你更好地使用 Producer。

基于 VitePress 构建