Skip to content

gRPC 流式调用:为什么普通 RPC 不够用?

你可能用过 gRPC 的普通 RPC 调用:

java
// 普通 RPC:一次请求,一次响应
Order order = orderService.getOrderById(1001L);

这很简单,就像打电话——你问一句,对方答一句。

但如果你的业务是这样的场景呢?

  • 文件上传:几百 MB 的文件,怎么传?
  • 实时监控:服务端需要不断推送数据给客户端
  • 聊天消息:双向持续通信

普通 RPC 解决不了这些问题,你需要流式调用。


三种流式 RPC

gRPC 支持四种 RPC 类型:

类型说明比喻
普通 RPC一发一收打电话
服务端流 RPC一发多收发邮件
客户端流 RPC多发一收批量上传
双向流 RPC多发多收微信聊天

服务端流 RPC:Server Streaming

场景

客户端发送一个请求,服务端返回一系列消息。

典型场景:

  • 获取历史订单(分页返回)
  • 实时日志推送
  • AI 模型推理的逐步输出

定义接口

protobuf
syntax = "proto3";

package order;

option java_package = "com.example.order";

service OrderService {
    // 普通 RPC
    rpc GetOrder(GetOrderRequest) returns (Order);
    
    // 服务端流 RPC:搜索订单,返回流式结果
    rpc SearchOrders(SearchRequest) returns (stream Order);
    
    // 服务端流 RPC:获取订单变更通知
    rpc WatchOrderStatus(WatchRequest) returns (stream OrderStatusChange);
}

message SearchRequest {
    string keyword = 1;
    int32 page_size = 10;
}

message WatchRequest {
    int64 order_id = 1;
}

message OrderStatusChange {
    int64 order_id = 1;
    OrderStatus old_status = 2;
    OrderStatus new_status = 3;
    int64 change_time = 4;
}

enum OrderStatus {
    UNKNOWN = 0;
    PENDING = 1;
    PAID = 2;
    SHIPPED = 3;
    COMPLETED = 4;
    CANCELLED = 5;
}

服务端实现

java
public class OrderServiceImpl extends OrderServiceGrpc.OrderServiceImplBase {
    
    private final OrderRepository orderRepository;
    
    @Override
    public void searchOrders(SearchRequest request,
                           StreamObserver<Order> responseObserver) {
        // 模拟分页查询
        String keyword = request.getKeyword();
        int pageSize = request.getPageSize();
        
        // 第一次查询获取总数
        int total = orderRepository.countByKeyword(keyword);
        int page = 0;
        
        while (page * pageSize < total) {
            // 查询每一页
            List<Order> orders = orderRepository
                .findByKeyword(keyword, page++, pageSize);
            
            // 流式返回每一条
            for (Order order : orders) {
                // 每次调用 onNext,发送一条消息给客户端
                responseObserver.onNext(order);
                
                // 模拟限流:每秒最多返回 100 条
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    responseObserver.onError(e);
                    return;
                }
            }
        }
        
        // 发送完成信号
        responseObserver.onCompleted();
    }
}

客户端调用

java
public class OrderClient {
    
    private final ManagedChannel channel;
    private final OrderServiceGrpc.OrderServiceBlockingStub blockingStub;
    private final OrderServiceGrpc.OrderServiceStub asyncStub;
    
    public void searchOrdersDemo() {
        SearchRequest request = SearchRequest.newBuilder()
            .setKeyword("手机")
            .setPageSize(100)
            .build();
        
        // 使用阻塞调用获取流
        Iterator<Order> iterator = blockingStub.searchOrders(request);
        
        int count = 0;
        System.out.println("开始接收订单...");
        
        while (iterator.hasNext()) {
            Order order = iterator.next();
            count++;
            System.out.printf("收到订单 #%d: %s%n", 
                order.getOrderId(), order.getCustomerName());
        }
        
        System.out.println("共收到 " + count + " 个订单");
    }
}

客户端流 RPC:Client Streaming

场景

客户端发送一系列消息,服务端返回一个响应。

典型场景:

  • 批量上传订单
  • 大文件分片上传
  • 批量导入数据

定义接口

protobuf
service OrderService {
    // 客户端流 RPC:批量创建订单
    rpc BatchCreateOrders(stream CreateOrderRequest) returns (BatchCreateResponse);
    
    // 客户端流 RPC:批量取消订单
    rpc BatchCancelOrders(stream CancelOrderRequest) returns (BatchCancelResponse);
}

message CreateOrderRequest {
    string customer_name = 1;
    int32 total_amount = 2;
    repeated string sku_codes = 3;
}

message BatchCreateResponse {
    int32 success_count = 1;
    int32 fail_count = 2;
    repeated int64 created_order_ids = 3;
    repeated string errors = 4;
}

服务端实现

java
@Override
public StreamObserver<CreateOrderRequest> batchCreateOrders(
        StreamObserver<BatchCreateResponse> responseObserver) {
    
    // 返回 StreamObserver 用于接收客户端消息
    return new StreamObserver<CreateOrderRequest>() {
        
        private List<Long> successIds = new ArrayList<>();
        private List<String> errors = new ArrayList<>();
        
        // 每收到一条客户端消息调用一次
        @Override
        public void onNext(CreateOrderRequest request) {
            try {
                // 校验参数
                if (request.getTotalAmount() <= 0) {
                    throw new IllegalArgumentException("金额必须大于 0");
                }
                
                // 创建订单
                Order order = orderService.createOrder(
                    request.getCustomerName(),
                    request.getTotalAmount(),
                    request.getSkuCodesList()
                );
                
                successIds.add(order.getId());
                
            } catch (Exception e) {
                errors.add("订单 " + request.getCustomerName() 
                    + " 创建失败: " + e.getMessage());
            }
        }
        
        // 客户端发送完毕,客户端会调用 onCompleted
        @Override
        public void onCompleted() {
            // 所有消息都接收完了,返回汇总结果
            BatchCreateResponse response = BatchCreateResponse.newBuilder()
                .setSuccessCount(successIds.size())
                .setFailCount(errors.size())
                .addAllCreatedOrderIds(successIds)
                .addAllErrors(errors)
                .build();
            
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
        
        @Override
        public void onError(Throwable t) {
            // 客户端出错,记录日志
            log.error("批量创建订单失败", t);
        }
    };
}

客户端调用

java
public void batchCreateOrdersDemo() {
    // 使用异步 stub
    StreamObserver<CreateOrderRequest> requestObserver = 
        asyncStub.batchCreateOrders(new StreamObserver<BatchCreateResponse>() {
            
            private BatchCreateResponse finalResponse;
            
            @Override
            public void onNext(BatchCreateResponse response) {
                // 服务端返回最终结果
                this.finalResponse = response;
            }
            
            @Override
            public void onCompleted() {
                // 收到响应
                System.out.println("批量创建完成:");
                System.out.println("  成功: " + finalResponse.getSuccessCount());
                System.out.println("  失败: " + finalResponse.getFailCount());
            }
            
            @Override
            public void onError(Throwable t) {
                System.err.println("调用失败: " + t.getMessage());
            }
        });
    
    // 模拟批量发送请求
    for (int i = 0; i < 100; i++) {
        CreateOrderRequest request = CreateOrderRequest.newBuilder()
            .setCustomerName("客户 #" + i)
            .setTotalAmount(100 + i)
            .addSkuCodes("SKU00" + (i % 10))
            .build();
        
        // 发送请求
        requestObserver.onNext(request);
        
        // 模拟限流
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            requestObserver.onError(e);
            return;
        }
    }
    
    // 发送完毕,必须调用 onCompleted
    requestObserver.onCompleted();
}

双向流 RPC:Bidirectional Streaming

场景

客户端和服务端都可以随时发送消息,互不阻塞。

典型场景:

  • 实时聊天
  • 在线协作编辑
  • 游戏同步

定义接口

protobuf
service ChatService {
    // 双向流 RPC:聊天室
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
    string session_id = 1;
    string user_id = 2;
    string content = 3;
    int64 timestamp = 4;
}

服务端实现

java
@Override
public StreamObserver<ChatMessage> chat(
        StreamObserver<ChatMessage> responseObserver) {
    
    return new StreamObserver<ChatMessage>() {
        
        private final Map<String, Set<StreamObserver<ChatMessage>>> 
            sessions = new ConcurrentHashMap<>();
        
        @Override
        public void onNext(ChatMessage message) {
            String sessionId = message.getSessionId();
            
            // 获取该会话的所有在线用户
            Set<StreamObserver<ChatMessage>> subscribers = 
                sessions.computeIfAbsent(sessionId, 
                    k -> ConcurrentHashMap.newKeySet());
            
            // 广播消息给所有订阅者(包括发送者)
            ChatMessage broadcastMsg = ChatMessage.newBuilder(message)
                .setTimestamp(System.currentTimeMillis())
                .build();
            
            for (StreamObserver<ChatMessage> subscriber : subscribers) {
                subscriber.onNext(broadcastMsg);
            }
        }
        
        @Override
        public void onCompleted() {
            // 客户端离开会话
            sessions.remove(this);
        }
        
        @Override
        public void onError(Throwable t) {
            log.error("聊天出错", t);
        }
    };
}

客户端实现

java
public void chatDemo() {
    // 发起双向流调用
    StreamObserver<ChatMessage> chatStream = asyncStub.chat(
        new StreamObserver<ChatMessage>() {
            
            @Override
            public void onNext(ChatMessage message) {
                // 收到服务端消息
                System.out.printf("[%s] %s: %s%n",
                    formatTime(message.getTimestamp()),
                    message.getUserId(),
                    message.getContent());
            }
            
            @Override
            public void onCompleted() {
                System.out.println("会话结束");
            }
            
            @Override
            public void onError(Throwable t) {
                System.err.println("连接失败: " + t.getMessage());
            }
        }
    );
    
    // 模拟发送消息
    String[] messages = {
        "你好",
        "在吗?",
        "想聊聊 gRPC 流式调用",
        "双向流真强大!"
    };
    
    for (String content : messages) {
        ChatMessage message = ChatMessage.newBuilder()
            .setSessionId("session-001")
            .setUserId("user-zhang")
            .setContent(content)
            .setTimestamp(System.currentTimeMillis())
            .build();
        
        chatStream.onNext(message);
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            chatStream.onError(e);
            return;
        }
    }
    
    // 结束会话
    chatStream.onCompleted();
}

流式调用的底层原理

HTTP/2 的多路复用

流式调用依赖于 HTTP/2 的多路复用特性:

HTTP/1.1:每次请求需要独立的 TCP 连接(或串行使用)
┌────────────────────────────────────────────────────┐
│  Request 1 → Response 1 → Request 2 → Response 2  │
└────────────────────────────────────────────────────┘

HTTP/2:多个 Stream 共用一个 TCP 连接
┌────────────────────────────────────────────────────┐
│ Stream 1:  HEADERS ──────────────────────────────→  │
│ Stream 1:  DATA ──────────────────────────────→   │
│ Stream 1:  DATA ──────────────────────────────→   │
│ Stream 2:  HEADERS ────────────────→              │
│ Stream 2:  DATA ────────────────→                 │
│ Stream 1:  DATA ──────────────────────────────→   │
└────────────────────────────────────────────────────┘

每个 Stream 都有自己的 Stream ID,互不干扰,可以并发传输。

数据帧结构

┌──────────────────────────────────────────────────────┐
│                  HTTP/2 Frame                       │
├────────────┬────────────┬───────────┬───────────────┤
│ Length(3B)│ Type(1B)   │ Flags(1B) │ Stream ID(4B) │
├────────────┴────────────┴───────────┴───────────────┤
│                    Payload                           │
└──────────────────────────────────────────────────────┘
  • DATA 帧:传输业务数据(如 Protobuf 序列化的消息)
  • HEADERS 帧:传输元数据(如方法名、metadata)
  • WINDOW_UPDATE:流量控制

流式调用的注意事项

1. 背压(Backpressure)

当发送方速度过快,接收方处理不过来时,需要背压机制:

java
// 服务端实现时,注意处理背压
@Override
public void searchOrders(SearchRequest request,
                        StreamObserver<Order> responseObserver) {
    
    // 使用 Channel 实现背压
    Channel<Order> orderChannel = orderRepository.streamOrders(request);
    
    // 控制发送速度,不要超过客户端消费速度
    while (orderChannel.isOpen()) {
        Order order = orderChannel.poll(100, TimeUnit.MILLISECONDS);
        if (order != null) {
            responseObserver.onNext(order);
        }
    }
    
    responseObserver.onCompleted();
}

2. 超时控制

java
// 设置流式调用的超时
ManagedChannel channel = ManagedChannelBuilder
    .forAddress("localhost", 8080)
    .build();

// 普通超时
try {
    Order order = blockingStub
        .withDeadlineAfter(5, TimeUnit.SECONDS)
        .getOrder(request);
} catch (StatusRuntimeException e) {
    if (e.getStatus() == Status.DEADLINE_EXCEEDED) {
        System.err.println("调用超时");
    }
}

// 流式调用超时(每条消息都要检查)
Iterator<Order> iterator = blockingStub
    .withDeadlineAfter(30, TimeUnit.SECONDS)
    .searchOrders(request);

3. 错误处理

java
// 服务端主动报错
@Override
public void getOrder(GetOrderRequest request,
                    StreamObserver<Order> responseObserver) {
    try {
        Order order = orderRepository.findById(request.getOrderId());
        if (order == null) {
            // 返回 NOT_FOUND 错误
            responseObserver.onError(Status.NOT_FOUND
                .withDescription("订单不存在: " + request.getOrderId())
                .asRuntimeException());
            return;
        }
        responseObserver.onNext(order);
        responseObserver.onCompleted();
    } catch (Exception e) {
        // 返回 INTERNAL 错误
        responseObserver.onError(Status.INTERNAL
            .withCause(e)
            .asRuntimeException());
    }
}

总结

流类型方法使用场景
普通 RPCrpc Method(Request) returns (Response)简单请求响应
服务端流rpc Method(Request) returns (stream Response)分页数据、实时推送
客户端流rpc Method(stream Request) returns (Response)批量上传、大文件传输
双向流rpc Method(stream Request) returns (stream Response)聊天、协作、游戏

流式 RPC 是 gRPC 区别于传统 REST 的核心优势之一,合理使用可以大幅提升系统性能和实时性。


留给你的问题

假设你正在设计一个实时协作文档编辑系统,需要考虑:

  • 多个用户同时编辑同一份文档
  • 每次编辑操作需要实时同步给所有在线用户
  • 网络不稳定时需要处理重连和冲突

在这种场景下,普通 RPC 能否满足需求?如果用双向流 RPC,你需要考虑哪些问题?

这个问题,可以结合 RPC 超时控制与重试机制 来思考如何保证流式调用的可靠性。

基于 VitePress 构建