Skip to content

HBase 写入流程:WAL + MemStore + Flush

理解 HBase 的写入流程,是理解它高吞吐量的关键。


写入流程总览

┌─────────────────────────────────────────────────────────────┐
│                    HBase 写入流程                              │
│                                                             │
│  Client                                                     │
│     │                                                        │
│     ├─→ 1. 写 WAL (Write-Ahead Log)                         │
│     │     ↓                                                   │
│     ├─→ 2. 写 MemStore (内存)                                │
│     │     ↓                                                   │
│     └─→ 3. 返回成功                                          │
│                                                             │
│  后台:MemStore → Flush → HFile                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

1. WAL 写入

WAL(Write-Ahead Log)保证数据不丢失:

java
// WAL 写入(伪代码)
public class WALWrite {
    private final WAL wal;

    public void write(Put put) throws IOException {
        // 1. 构造 WAL Entry
        WALKey key = new WALKey(
            put.getRow(),
            put.getTableName(),
            put.getFamilyMap().keySet()
        );
        WALEdit edit = new WALEdit();
        for (Map.Entry<byte[], List<KeyValue>> entry : put.getFamilyMap().entrySet()) {
            for (KeyValue kv : entry.getValue()) {
                edit.add(kv);
            }
        }

        // 2. 写入 WAL(先追加,不刷盘)
        long sequenceId = wal.append(regionInfo, key, edit, true);

        // 3. 强制刷盘(可选,根据配置)
        // wal.sync();
    }
}

WAL 持久化策略

java
// WAL 持久化级别
public enum Durability {
    USE_DEFAULT,      // 使用表级默认配置
    SKIP_WAL,        // 不写 WAL(最快,但可能丢数据)
    ASYNC_WAL,       // 异步刷盘
    SYNC_WAL,        // 同步刷盘(默认)
    FSYNC_WAL        // 强制 fsync
}

// 设置表的持久化级别
TableDescriptor table = TableDescriptorBuilder
    .newBuilder(tableName)
    .setDurability(Durability.SYNC_WAL)
    .build();

2. MemStore 写入

MemStore 是内存中的数据结构,按 RowKey 排序:

java
// MemStore 写入(伪代码)
public class MemStoreWrite {
    private final ConcurrentSkipListMap<byte[], Cell> memStore;

    public void write(Put put) {
        // 1. 获取写入序列号
        long sequenceId = wal.getLastSequenceId();

        // 2. 写入 MemStore(内存中的跳表)
        for (Map.Entry<byte[], List<KeyValue>> entry : put.getFamilyMap().entrySet()) {
            for (KeyValue kv : entry.getValue()) {
                // MemStore 内部按 RowKey + CF + CQ + TS 排序
                memStore.put(kv.getKey(), kv);
            }
        }
    }
}

MemStore 的特点

  • 有序:内部使用 ConcurrentSkipListMap,按 RowKey 排序
  • 写优化:顺序写入内存,比写磁盘快几个数量级
  • 批量聚合:同一 Key 的多次写入在内存中合并

3. MemStore Flush

当 MemStore 达到阈值,触发 Flush 到磁盘:

Flush 触发条件:
┌─────────────────────────────────────────────────────────────┐
│                                                             │
│  条件1: MemStore 大小达到阈值(默认 128MB × Region 数)      │
│  条件2: WAL 大小超过阈值                                     │
│  条件3: 定时 Flush(默认 1 小时)                           │
│  条件4: 手动 Flush                                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘
java
// Flush 配置
public class FlushConfig {
    // Region 级 Flush 阈值
    public static final long MEMSTORE_SIZE = 128 * 1024 * 1024L;  // 128MB

    // 全局 Flush 阈值(防止一个 Region 刷盘影响其他)
    // globalMemStoreSize = MEMSTORE_SIZE × Region 数 × 0.95
}

Flush 流程

MemStore Flush 流程:
┌─────────────────────────────────────────────────────────────┐
│                                                             │
│  1. 创建 Snapshot(快照,保存当前 MemStore 状态)           │
│                                                             │
│  2. 清空 MemStore(接受新写入)                             │
│                                                             │
│  3. 将 Snapshot 写成 HFile(后台线程)                      │
│                                                             │
│  4. 删除旧 HFile 中的废弃数据                               │
│                                                             │
│  5. 更新 Meta 信息                                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘

完整写入示例

java
public class HBaseWriteFlow {
    private final Connection connection;

    public void writeData(String rowKey, String cf, String cq, String value)
            throws IOException {
        Table table = connection.getTable(TableName.valueOf("t_user"));

        // 1. 构造 Put
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(
            Bytes.toBytes(cf),
            Bytes.toBytes(cq),
            Bytes.toBytes(value)
        );

        // 2. 写入(内部自动处理 WAL + MemStore)
        table.put(put);

        // 3. 或者批量写入(性能更好)
        List<Put> puts = new ArrayList<>();
        puts.add(put);
        table.put(puts);

        // 4. 关闭
        table.close();
    }

    // 异步写入(高吞吐场景)
    public void asyncWrite(List<Put> puts) throws IOException {
        Table table = connection.getTable(TableName.valueOf("t_user"));

        // 使用 AsyncRequestFutureBuilder 实现高吞吐
        AsyncTable<?> asyncTable = AsyncConnectionImpl.getAsyncTable();
        // ... 异步写入逻辑

        table.close();
    }
}

写入性能优化

1. 批量写入

java
// 批量写入 vs 单条写入
public class BatchWrite {
    // 错误:逐条写入
    public void badBatchWrite(Table table, List<Put> puts) {
        for (Put put : puts) {
            table.put(put);  // 每条一次 RPC
        }
    }

    // 正确:批量写入
    public void goodBatchWrite(Table table, List<Put> puts) {
        table.put(puts);  // 一次 RPC
    }
}

2. 关闭 WAL(高吞吐场景)

java
// 高吞吐场景可以关闭 WAL(但可能丢数据)
Put put = new Put(Bytes.toBytes(rowKey));
put.setDurability(Durability.SKIP_WAL);  // 不写 WAL
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cq), Bytes.toBytes(value));
table.put(put);

3. 预分区

java
// 创建表时预分区,避免热点
byte[][] splitKeys = new byte[9][];
for (int i = 1; i < 10; i++) {
    splitKeys[i-1] = Bytes.toBytes("user_" + i + "0000000");
}

admin.createTable(
    TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamilies(ColumnFamilyDescriptorBuilder.of("info"))
        .build(),
    splitKeys  // 指定分区点
);

面试追问方向

  • HBase 的 MemStore 为什么需要排序?
  • Flush 和 Compaction 有什么区别?

下一节,我们来了解 HBase 的读取流程。

基于 VitePress 构建