Skip to content

HBase Java API:与 HBase 交互

HBase 提供了丰富的 Java API,让我们来看看常见操作。


连接管理

1. ConnectionFactory

java
// 创建连接(推荐方式)
public class HBaseConnection {
    /**
     * Builds an HBase client connection from the default HBase configuration,
     * overridden with the ZooKeeper quorum and client port set below.
     *
     * @return the connection produced by {@code ConnectionFactory.createConnection}
     * @throws IOException if the connection cannot be created
     */
    public static Connection createConnection() throws IOException {
        Configuration hbaseConf = HBaseConfiguration.create();

        // ZooKeeper ensemble the client uses to locate the cluster
        hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181");

        return ConnectionFactory.createConnection(hbaseConf);
    }

    /**
     * Closes the given connection; does nothing when it is {@code null} or
     * already reports itself as closed.
     *
     * @param connection the connection to release, may be {@code null}
     * @throws IOException if closing the connection fails
     */
    public static void close(Connection connection) throws IOException {
        if (connection == null || connection.isClosed()) {
            return;
        }
        connection.close();
    }
}

2. Connection Pool

java
// 连接池示例(注意:HBase 的 Connection 本身线程安全且创建开销大,通常整个应用共享一个实例即可;仅在确有需要时才使用连接池)
/**
 * Thin wrapper that borrows and returns HBase {@code Connection} objects through an object pool.
 *
 * <p>NOTE(review): the HBase client documentation describes {@code Connection} as thread-safe and
 * meant to be shared, so confirm that pooling is actually required here.
 *
 * <p>NOTE(review): commons-pool2 declares {@code GenericKeyedObjectPool} with two type parameters
 * and HBase's {@code ConnectionFactory} is not a pooled-object factory; confirm that the types used
 * below are the intended (possibly project-local) ones.
 */
public class HBaseConnectionPool {
    private static final java.util.logging.Logger LOG =
        java.util.logging.Logger.getLogger(HBaseConnectionPool.class.getName());

    private final GenericKeyedObjectPool<Connection> pool;

    /**
     * Creates the underlying pool.
     *
     * @param config   HBase configuration handed to the connection factory
     * @param poolSize size argument passed straight to the pool constructor
     */
    public HBaseConnectionPool(Configuration config, int poolSize) {
        pool = new GenericKeyedObjectPool<>(
            new ConnectionFactory(config), poolSize);
    }

    /**
     * Borrows a connection from the pool.
     *
     * @return the borrowed connection
     * @throws Exception whatever the pool raises while borrowing
     */
    public Connection getConnection() throws Exception {
        return pool.borrowObject();
    }

    /**
     * Hands a connection back to the pool on a best-effort basis.
     *
     * <p>Failures are logged instead of being silently dropped, so pool misuse remains
     * diagnosable, but they are still not propagated to the caller.
     *
     * @param connection the connection to return; {@code null} is ignored
     */
    public void returnConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            pool.returnObject(connection);
        } catch (Exception e) {
            // Deliberately not rethrown: returning a connection must not fail the caller's work.
            LOG.log(java.util.logging.Level.WARNING,
                "Failed to return HBase connection to the pool", e);
        }
    }
}

表操作

1. 创建表

java
// 创建表
/**
 * Table-creation helpers built on the HBase {@code Admin} API.
 */
public class TableOperations {
    /**
     * Largest region count for which {@code i * 256 / numRegions} still yields pairwise
     * distinct split points; above it two neighbouring indices collapse to the same key.
     */
    private static final int MAX_PRESPLIT_REGIONS = 256;

    private final Admin admin;

    /**
     * @param admin the HBase admin handle used for all operations; must not be {@code null}
     */
    public TableOperations(Admin admin) {
        this.admin = java.util.Objects.requireNonNull(admin, "admin");
    }

    /**
     * Creates a table with the column families {@code info} and {@code data} unless a table of
     * that name already exists.
     *
     * <p>The existence check and the creation are two separate calls, so a concurrent creator can
     * still make {@code createTable} fail.
     *
     * @param tableName name of the table to create
     * @throws IOException if the existence check or the creation fails
     */
    public void createTable(String tableName) throws IOException {
        TableName tn = TableName.valueOf(tableName);

        if (admin.tableExists(tn)) {
            return;
        }

        // newBuilder(..) returns a builder; of(..) would return an already-built descriptor
        // on which the setters below are not available.
        TableDescriptor table = TableDescriptorBuilder
            .newBuilder(tn)
            .setColumnFamily(
                ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes("info"))
                    .setBloomFilterType(BloomType.ROW)
                    .setCompressionType(Compression.Algorithm.SNAPPY)
                    .setBlockCacheEnabled(true)
                    .setMaxVersions(3)
                    .build())
            .setColumnFamily(
                ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes("data"))
                    .setBloomFilterType(BloomType.ROWCOL)
                    .setCompressionType(Compression.Algorithm.SNAPPY)
                    .build())
            .build();

        admin.createTable(table);
    }

    /**
     * Creates a pre-split table with a single {@code info} column family.
     *
     * <p>Split points are the zero-padded three-digit decimal strings of
     * {@code i * 256 / numRegions} for {@code i = 1 .. numRegions - 1}; they only spread load
     * evenly when row keys begin with a prefix in that same format.
     *
     * <p>Unlike {@link #createTable(String)}, this method does not check whether the table
     * already exists.
     *
     * @param tableName  name of the table to create
     * @param numRegions number of initial regions, from 1 to 256 inclusive
     * @throws IllegalArgumentException if {@code numRegions} is outside 1..256
     * @throws IOException              if the creation fails
     */
    public void createTableWithSplit(String tableName, int numRegions)
            throws IOException {
        if (numRegions < 1 || numRegions > MAX_PRESPLIT_REGIONS) {
            throw new IllegalArgumentException(
                "numRegions must be between 1 and " + MAX_PRESPLIT_REGIONS
                    + ", got " + numRegions);
        }

        TableName tn = TableName.valueOf(tableName);
        TableDescriptor table = TableDescriptorBuilder
            .newBuilder(tn)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
            .build();

        byte[][] splitKeys = evenSplitKeys(numRegions);
        if (splitKeys.length == 0) {
            // A single region needs no split points.
            admin.createTable(table);
        } else {
            admin.createTable(table, splitKeys);
        }
    }

    /** Computes the {@code numRegions - 1} evenly spaced split points described above. */
    private static byte[][] evenSplitKeys(int numRegions) {
        byte[][] splitKeys = new byte[numRegions - 1][];
        for (int i = 1; i < numRegions; i++) {
            splitKeys[i - 1] = Bytes.toBytes(String.format("%03d", i * 256 / numRegions));
        }
        return splitKeys;
    }
}

2. 写入数据

java
// 写入数据
/**
 * Write-path helpers (put, batch put, increment, check-and-put) for one HBase table.
 */
public class WriteOperations {
    private final Table table;

    /**
     * @param table the table all writes are sent to; must not be {@code null}
     */
    public WriteOperations(Table table) {
        this.table = java.util.Objects.requireNonNull(table, "table");
    }

    /**
     * Writes a single cell.
     *
     * @param rowKey row key
     * @param cf     column family
     * @param cq     column qualifier
     * @param value  cell value
     * @throws IOException if the write fails
     */
    public void putData(String rowKey, String cf, String cq, String value)
            throws IOException {
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(
            Bytes.toBytes(cf),
            Bytes.toBytes(cq),
            Bytes.toBytes(value)
        );
        table.put(put);
    }

    /**
     * Sends all puts in one batch call. The per-action results array is not inspected.
     *
     * @param puts the puts to apply
     * @throws IOException if the batch fails, or (as {@code InterruptedIOException}) if the
     *                     calling thread is interrupted while waiting for it
     */
    public void batchPut(List<Put> puts) throws IOException {
        Object[] results = new Object[puts.size()];
        try {
            table.batch(puts, results);
        } catch (InterruptedException e) {
            // Table.batch declares InterruptedException; keep this method's IOException-only
            // signature while preserving the interrupt status and the original cause.
            Thread.currentThread().interrupt();
            java.io.InterruptedIOException interrupted = new java.io.InterruptedIOException(
                "Interrupted while writing a batch of " + puts.size() + " puts");
            interrupted.initCause(e);
            throw interrupted;
        }
    }

    /**
     * Atomically adds 1 to a counter cell.
     *
     * @param rowKey row key
     * @param cf     column family
     * @param cq     column qualifier
     * @return the cell value read back from the increment result
     * @throws IOException if the increment fails
     */
    public long increment(String rowKey, String cf, String cq)
            throws IOException {
        byte[] family = Bytes.toBytes(cf);
        byte[] qualifier = Bytes.toBytes(cq);

        Increment increment = new Increment(Bytes.toBytes(rowKey));
        increment.addColumn(family, qualifier, 1L);

        Result result = table.increment(increment);
        return Bytes.toLong(result.getValue(family, qualifier));
    }

    /**
     * Check-and-put: submits a put of {@code newValue} that is guarded by a comparison of the
     * same cell against {@code expectedValue}.
     *
     * <p>The boolean outcome reported by the client is discarded, so callers cannot tell whether
     * the put was applied.
     *
     * @param rowKey        row key
     * @param cf            column family
     * @param cq            column qualifier
     * @param expectedValue value the cell is compared against
     * @param newValue      value to write
     * @throws IOException if the operation fails
     */
    public void checkAndPut(String rowKey, String cf, String cq,
                            String expectedValue, String newValue)
            throws IOException {
        byte[] row = Bytes.toBytes(rowKey);
        byte[] family = Bytes.toBytes(cf);
        byte[] qualifier = Bytes.toBytes(cq);

        Put put = new Put(row);
        put.addColumn(family, qualifier, Bytes.toBytes(newValue));

        table.checkAndPut(
            row,
            family,
            qualifier,
            Bytes.toBytes(expectedValue),
            put
        );
    }
}

3. 读取数据

java
// 读取数据
/**
 * Read-path helpers (single get, multi-version get, range scan, filtered scan) for one HBase table.
 */
public class ReadOperations {
    private final Table table;

    /**
     * @param table the table all reads are issued against; must not be {@code null}
     */
    public ReadOperations(Table table) {
        this.table = java.util.Objects.requireNonNull(table, "table");
    }

    /**
     * Reads one cell as a string.
     *
     * @param rowKey row key
     * @param cf     column family
     * @param cq     column qualifier
     * @return the cell value, or {@code null} when the get returns an empty result
     * @throws IOException if the read fails
     */
    public String get(String rowKey, String cf, String cq)
            throws IOException {
        byte[] family = Bytes.toBytes(cf);
        byte[] qualifier = Bytes.toBytes(cq);

        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(family, qualifier);

        Result result = table.get(get);
        if (result.isEmpty()) {
            return null;
        }
        return Bytes.toString(result.getValue(family, qualifier));
    }

    /**
     * Reads up to {@code maxVersions} versions of one cell.
     *
     * @param rowKey      row key
     * @param cf          column family
     * @param cq          column qualifier
     * @param maxVersions maximum number of versions to request
     * @return the values of the returned cells, in the order the result lists them
     * @throws IOException if the read fails
     */
    public List<String> getVersions(String rowKey, String cf, String cq,
                                 int maxVersions) throws IOException {
        byte[] family = Bytes.toBytes(cf);
        byte[] qualifier = Bytes.toBytes(cq);

        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(family, qualifier);
        get.setMaxVersions(maxVersions);

        Result result = table.get(get);
        List<String> versions = new ArrayList<>();
        for (Cell cell : result.getColumnCells(family, qualifier)) {
            versions.add(Bytes.toString(CellUtil.cloneValue(cell)));
        }
        return versions;
    }

    /**
     * Scans the row range between {@code startRow} and {@code stopRow}.
     *
     * @param startRow start row passed to {@code withStartRow}
     * @param stopRow  stop row passed to {@code withStopRow}
     * @param limit    row limit passed to {@code setLimit}
     * @return all results produced by the scanner
     * @throws IOException if the scan fails
     */
    public List<Result> scan(String startRow, String stopRow,
                             int limit) throws IOException {
        Scan scan = new Scan();
        scan.withStartRow(Bytes.toBytes(startRow));
        scan.withStopRow(Bytes.toBytes(stopRow));
        scan.setLimit(limit);

        return collect(scan);
    }

    /**
     * Scans a row range, keeping only cells whose value matches a regular expression.
     *
     * @param startRow   start row passed to {@code withStartRow}
     * @param stopRow    stop row passed to {@code withStopRow}
     * @param cf         column family to restrict the scan to; {@code null} scans all families
     * @param valueRegex regular expression applied to cell values
     * @return all results produced by the scanner
     * @throws IOException if the scan fails
     */
    public List<Result> scanWithFilter(String startRow, String stopRow,
                                      String cf, String valueRegex)
            throws IOException {
        Scan scan = new Scan();
        scan.withStartRow(Bytes.toBytes(startRow));
        scan.withStopRow(Bytes.toBytes(stopRow));

        // The family argument used to be accepted but ignored; honour it so the value
        // filter is only evaluated against the requested family.
        if (cf != null) {
            scan.addFamily(Bytes.toBytes(cf));
        }

        ValueFilter filter = new ValueFilter(
            CompareOperator.EQUAL,
            new RegexStringComparator(valueRegex)
        );
        scan.setFilter(filter);

        return collect(scan);
    }

    /** Runs the scan and drains the scanner into a list, closing the scanner afterwards. */
    private List<Result> collect(Scan scan) throws IOException {
        List<Result> results = new ArrayList<>();
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                results.add(result);
            }
        }
        return results;
    }
}

4. 删除数据

java
// 删除数据
/**
 * Delete helpers for one HBase table: single column, all versions of a column, a whole row,
 * or one specific version.
 */
public class DeleteOperations {
    private final Table table;

    /**
     * @param table the table all deletes are sent to; must not be {@code null}
     */
    public DeleteOperations(Table table) {
        this.table = java.util.Objects.requireNonNull(table, "table");
    }

    /**
     * Deletes a column via {@code Delete.addColumn(family, qualifier)}, which per the HBase
     * client API targets only the most recent version of the cell.
     *
     * @param rowKey row key
     * @param cf     column family
     * @param cq     column qualifier
     * @throws IOException if the delete fails
     */
    public void deleteColumn(String rowKey, String cf, String cq)
            throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(
            Bytes.toBytes(cf),
            Bytes.toBytes(cq)
        );
        table.delete(delete);
    }

    /**
     * Deletes every version of a column via {@code Delete.addColumns(family, qualifier)}.
     *
     * @param rowKey row key
     * @param cf     column family
     * @param cq     column qualifier
     * @throws IOException if the delete fails
     */
    public void deleteAllColumns(String rowKey, String cf, String cq)
            throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumns(
            Bytes.toBytes(cf),
            Bytes.toBytes(cq)
        );
        table.delete(delete);
    }

    /**
     * Deletes an entire row by issuing a {@code Delete} with no columns specified.
     *
     * @param rowKey row key
     * @throws IOException if the delete fails
     */
    public void deleteRow(String rowKey) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        table.delete(delete);
    }

    /**
     * Deletes the version of a column that carries the given timestamp.
     *
     * @param rowKey    row key
     * @param cf        column family
     * @param cq        column qualifier
     * @param timestamp timestamp identifying the version to delete
     * @throws IOException if the delete fails
     */
    public void deleteVersion(String rowKey, String cf, String cq,
                            long timestamp) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(
            Bytes.toBytes(cf),
            Bytes.toBytes(cq),
            timestamp
        );
        table.delete(delete);
    }
}

完整示例

java
// 用户表 CRUD 示例
/**
 * CRUD example for a user table with two column families: {@code info} (name, email, created)
 * and {@code extra} (phone, bio).
 *
 * <p>A {@code null} user field is represented by the absence of its column: it is skipped on
 * write and read back as {@code null}.
 */
public class UserTable {
    private static final byte[] CF_INFO = Bytes.toBytes("info");
    private static final byte[] CF_EXTRA = Bytes.toBytes("extra");

    private final Table table;

    /**
     * @param table the user table; must not be {@code null}
     */
    public UserTable(Table table) {
        this.table = java.util.Objects.requireNonNull(table, "table");
    }

    /**
     * Row-key design: first 8 characters of the hash of the user id, then {@code "_"}, then the
     * user id itself.
     *
     * @param userId the user id
     * @return the row key for that user
     */
    public String designRowKey(String userId) {
        // NOTE(review): MD5 is not defined in this class and its output format determines the
        // persisted row keys, so it is left untouched; confirm where it comes from.
        String hash = MD5(userId).substring(0, 8);
        return hash + "_" + userId;
    }

    /**
     * Saves a user as a single put covering both column families. Fields that are {@code null}
     * are not written.
     *
     * @param user the user to store
     * @throws IOException if the write fails
     */
    public void saveUser(User user) throws IOException {
        Put put = new Put(Bytes.toBytes(designRowKey(user.getId())));

        // Basic-information column family
        addIfPresent(put, CF_INFO, "name", user.getName());
        addIfPresent(put, CF_INFO, "email", user.getEmail());
        Object createdAt = user.getCreatedAt();
        addIfPresent(put, CF_INFO, "created",
            createdAt == null ? null : createdAt.toString());

        // Extended-information column family
        addIfPresent(put, CF_EXTRA, "phone", user.getPhone());
        addIfPresent(put, CF_EXTRA, "bio", user.getBio());

        table.put(put);
    }

    /**
     * Loads one user.
     *
     * <p>NOTE(review): the {@code created} column written by {@link #saveUser(User)} is not
     * restored here because the setter's parameter type is not visible; confirm whether it
     * should be.
     *
     * @param userId the user id
     * @return the user, or {@code null} when no row exists for that id
     * @throws IOException if the read fails
     */
    public User getUser(String userId) throws IOException {
        Result result = table.get(newUserGet(userId));
        return parseUser(userId, result);
    }

    /**
     * Loads several users with one batched get, requesting the same column families as
     * {@link #getUser(String)}.
     *
     * @param userIds the ids to load
     * @return one entry per requested id, in the same order; an entry is {@code null} when no
     *         row exists for that id
     * @throws IOException if the read fails
     */
    public List<User> getUsers(List<String> userIds) throws IOException {
        List<Get> gets = userIds.stream()
            .map(this::newUserGet)
            .collect(Collectors.toList());

        Result[] results = table.get(gets);

        return IntStream.range(0, userIds.size())
            .mapToObj(i -> parseUser(userIds.get(i), results[i]))
            .collect(Collectors.toList());
    }

    /**
     * Deletes the whole row of a user.
     *
     * @param userId the user id
     * @throws IOException if the delete fails
     */
    public void deleteUser(String userId) throws IOException {
        String rowKey = designRowKey(userId);
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        table.delete(delete);
    }

    /** Builds the get for one user, requesting both column families. */
    private Get newUserGet(String userId) {
        Get get = new Get(Bytes.toBytes(designRowKey(userId)));
        get.addFamily(CF_INFO);
        get.addFamily(CF_EXTRA);
        return get;
    }

    /** Converts a row into a {@code User}; returns {@code null} for an empty result. */
    private User parseUser(String userId, Result result) {
        if (result.isEmpty()) {
            return null;
        }
        User user = new User();
        user.setId(userId);
        user.setName(getString(result, "info", "name"));
        user.setEmail(getString(result, "info", "email"));
        user.setPhone(getString(result, "extra", "phone"));
        user.setBio(getString(result, "extra", "bio"));
        return user;
    }

    /** Reads one cell of the result as a string via {@code Bytes.toString}. */
    private static String getString(Result result, String cf, String cq) {
        return Bytes.toString(result.getValue(Bytes.toBytes(cf), Bytes.toBytes(cq)));
    }

    /** Adds the column to the put unless the value is {@code null}. */
    private static void addIfPresent(Put put, byte[] family, String qualifier, String value) {
        if (value != null) {
            put.addColumn(family, Bytes.toBytes(qualifier), Bytes.toBytes(value));
        }
    }
}

面试追问方向

  • HBase 的 Put 和 Delete 是如何保证原子性的?
  • 如何设计一个高效的用户 Feed 系统?

下一节,我们来了解 Phoenix SQL 层。

基于 VitePress 构建