Skip to content

Redis 管道(Pipeline)与批量操作

你需要批量读取 10000 个用户的信息。如果一个个 get:

java
for (Long userId : userIds) {
    String key = "user:" + userId;
    User user = (User) redisTemplate.opsForValue().get(key);
    result.add(user);
}

你知道这会发生什么吗?

每个 get 命令都包含:

  • 客户端发送请求(网络往返)
  • Redis 执行命令
  • 服务端返回响应(网络往返)

10000 个命令 = 至少 10000 次网络往返。如果每次往返耗时 1ms,总耗时就是 10 秒。

有更聪明的办法吗?

Redis Pipeline 的原理

Pipeline(管道)是 Redis 提供的一种批量执行命令的机制。它将多个命令打包在一起,客户端一次性发送,Redis 一次性返回所有结果。

普通模式:
Client → GET key1 → Redis → Client → GET key2 → Redis → Client → ...
          ↑ 往返 1ms           ↑ 往返 1ms           ↑ 往返 1ms

Pipeline 模式:
Client → GET key1 GET key2 GET key3 ... → Redis → [结果1 结果2 结果3 ...] → Client
          ↑ 往返 1ms                                ↑ 往返 1ms

核心改变:从 N 次网络往返,变成 1 次网络往返。

Pipeline vs 普通命令

性能对比

java
// 普通模式:逐个执行
long start1 = System.currentTimeMillis();
for (Long userId : userIds) {
    String key = "user:" + userId;
    redisTemplate.opsForValue().get(key);
}
long time1 = System.currentTimeMillis() - start1;

// Pipeline 模式:批量执行
long start2 = System.currentTimeMillis();
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    for (Long userId : userIds) {
        String key = "user:" + userId;
        connection.stringCommands().get(key.getBytes());
    }
    return null;
});
long time2 = System.currentTimeMillis() - start2;

log.info("普通模式: {}ms, Pipeline模式: {}ms", time1, time2);

实测结果(10000 个 key):

模式耗时提升
普通模式10000ms+-
Pipeline100-200ms50-100 倍

注意事项

Pipeline 不是事务,它只是把多个命令打包发送。命令之间没有原子性保证

java
// Pipeline:非原子操作
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    // 如果中间某个命令失败,后续命令仍会执行
    connection.stringCommands().set("key1".getBytes(), "value1".getBytes());
    connection.stringCommands().set("key2".getBytes(), "value2".getBytes());
    connection.stringCommands().set("key3".getBytes(), "value3".getBytes());
    return null;
});

Pipeline 的正确使用方式

Spring Data Redis

java
// 方式一:使用 executePipelined
List<Object> results = redisTemplate.executePipelined(
    (RedisCallback<Object>) connection -> {
        for (Long userId : userIds) {
            String key = "user:" + userId;
            connection.stringCommands().get(key.getBytes());
        }
        return null;
    }
);

// 方式二:使用 RedisTemplate 的 opsForPipeline
try (SessionCallback<Object> session = redisTemplate.execute()) {
    List<Object> results = redisTemplate.getConnectionFactory()
        .getConnection()
        .pipelined();
    
    for (Long userId : userIds) {
        String key = "user:" + userId;
        ((Jedis) redisTemplate.getConnectionFactory()
            .getConnection())
            .get(key);
    }
}

Jedis

java
Jedis jedis = jedisPool.getResource();
try {
    Pipeline pipeline = jedis.pipelined();
    
    for (Long userId : userIds) {
        String key = "user:" + userId;
        pipeline.get(key);
    }
    
    // 执行并获取结果
    List<Object> results = pipeline.syncAndReturnAll();
} finally {
    jedis.close();
}

Redisson

java
RBatch batch = redissonClient.createBatch();
for (Long userId : userIds) {
    String key = "user:" + userId;
    batch.getMap(key).getAsync("name");
}
List<Object> results = batch.execute();

批量操作的场景

场景一:批量读取

java
// 批量获取用户信息
public Map<Long, User> batchGetUsers(List<Long> userIds) {
    if (userIds == null || userIds.isEmpty()) {
        return Collections.emptyMap();
    }
    
    List<Object> results = redisTemplate.executePipelined(
        (RedisCallback<Object>) connection -> {
            for (Long userId : userIds) {
                String key = "user:" + userId;
                connection.stringCommands().get(key.getBytes());
            }
            return null;
        }
    );
    
    Map<Long, User> userMap = new HashMap<>();
    for (int i = 0; i < userIds.size(); i++) {
        String json = (String) results.get(i);
        if (json != null) {
            userMap.put(userIds.get(i), JSON.parseObject(json, User.class));
        }
    }
    
    return userMap;
}

场景二:批量写入

java
// 批量保存用户信息
public void batchSaveUsers(Map<Long, User> userMap) {
    if (userMap == null || userMap.isEmpty()) {
        return;
    }
    
    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
        for (Map.Entry<Long, User> entry : userMap.entrySet()) {
            String key = "user:" + entry.getKey();
            String value = JSON.toJSONString(entry.getValue());
            connection.stringCommands().set(key.getBytes(), value.getBytes());
        }
        return null;
    });
}

场景三:批量删除

java
// 批量删除 key
public void batchDelete(List<String> keys) {
    if (keys == null || keys.isEmpty()) {
        return;
    }
    
    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
        for (String key : keys) {
            connection.keyCommands().del(key.getBytes());
        }
        return null;
    });
}

场景四:批量设置过期时间

java
// 批量设置过期时间
public void batchExpire(Map<String, Long> keyExpireMap) {
    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
        for (Map.Entry<String, Long> entry : keyExpireMap.entrySet()) {
            String key = entry.getKey();
            long expireSeconds = entry.getValue();
            connection.keyCommands().expire(key.getBytes(), expireSeconds);
        }
        return null;
    });
}

Pipeline 的注意事项

1. 不要一次打包太多命令

Pipeline 虽然高效,但如果一次发送太多命令:

  • 占用大量客户端内存
  • Redis 处理时间变长
  • 网络传输时间变长
  • 如果 Redis 重启,部分命令可能丢失

建议:每批 1000-5000 个命令,分批执行。

java
public void batchGetInChunks(List<Long> userIds) {
    List<Object> allResults = new ArrayList<>();
    
    // 分批处理,每批 1000 个
    List<List<Long>> chunks = Lists.partition(userIds, 1000);
    for (List<Long> chunk : chunks) {
        List<Object> results = redisTemplate.executePipelined(
            (RedisCallback<Object>) connection -> {
                for (Long userId : chunk) {
                    String key = "user:" + userId;
                    connection.stringCommands().get(key.getBytes());
                }
                return null;
            }
        );
        allResults.addAll(results);
    }
}

2. Pipeline 不支持事务

如果你需要原子性操作(要么都执行,要么都不执行),应该使用 Redis 事务:

java
// Redis 事务
List<Object> results = redisTemplate.execute(new SessionCallback<List<Object>>() {
    @Override
    @SuppressWarnings("unchecked")
    public List<Object> execute(RedisOperations operations) throws DataAccessException {
        operations.multi();  // 开启事务
        for (Long userId : userIds) {
            String key = "user:" + userId;
            operations.opsForValue().get(key);
        }
        return operations.exec();  // 执行事务
    }
});

但事务模式下,命令不会立即执行,而是等 exec() 才一起执行,所以性能不如 Pipeline。

3. Pipeline 的返回顺序

Pipeline 返回的结果顺序与发送顺序一致。如果某个 key 不存在,对应位置返回 null

批量操作的最佳实践

读取策略

java
public Map<Long, User> getUsers(List<Long> userIds) {
    if (userIds == null || userIds.isEmpty()) {
        return Collections.emptyMap();
    }
    
    // 1. 先从 Redis Pipeline 批量获取
    Map<Long, User> cached = batchGetUsers(userIds);
    
    // 2. 找出未命中的 key
    List<Long> missIds = userIds.stream()
        .filter(id -> !cached.containsKey(id))
        .collect(Collectors.toList());
    
    // 3. 未命中的从数据库加载
    if (!missIds.isEmpty()) {
        Map<Long, User> fromDb = loadUsersFromDb(missIds);
        cached.putAll(fromDb);
        
        // 4. 回填 Redis(异步,不阻塞)
        CompletableFuture.runAsync(() -> batchSaveUsers(fromDb));
    }
    
    return cached;
}

写入策略

java
public void saveUsers(Map<Long, User> userMap) {
    if (userMap == null || userMap.isEmpty()) {
        return;
    }
    
    // 批量写入 Redis
    batchSaveUsers(userMap);
    
    // 如果需要,同时写入数据库
    saveUsersToDb(userMap);
}

总结

Pipeline 是 Redis 批量操作的利器:

  1. 核心优势:将 N 次网络往返减少到 1 次,性能提升 50-100 倍
  2. 适用场景:批量读取、批量写入、批量删除
  3. 注意事项:不要一次打包太多命令,Pipeline 不是事务
  4. 最佳实践:分批处理 + 结合本地缓存

留给你的问题

假设你需要实现一个用户积分批量查询接口:

  • 输入:1000 个用户 ID
  • 输出:每个用户的积分
  • 要求:响应时间 < 100ms

请思考:

  1. 如果 Redis 有 1000 个用户积分 key,使用 Pipeline 批量获取,大约需要多长时间?(假设 Redis 命令执行 0.1ms,网络往返 1ms)
  2. 如果要求响应时间 < 100ms,在网络延迟较高的情况下(如 10ms),Pipeline 还够用吗?有什么替代方案?
  3. 如果用户积分需要实时准确(不允许本地缓存),还有什么优化手段?

这道题的关键在于理解网络延迟对系统性能的影响,以及如何通过架构优化来弥补。

基于 VitePress 构建