Skip to content

ElasticJob-Lite 注册中心原理

ZooKeeper 在 ElasticJob 中扮演什么角色?

它不是简单的「存储配置」,而是整个分布式协调的核心——选举主节点、分配分片、检测故障

理解 ZooKeeper 的工作原理,是掌握 ElasticJob 的关键。

ZooKeeper 在 ElasticJob 中的作用

┌─────────────────────────────────────────────────────────────┐
│                ZooKeeper 在 ElasticJob 中的角色               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                      ZooKeeper                       │   │
│   │                                                     │   │
│   │   ┌─────────────────────────────────────────────┐  │   │
│   │   │ 1. 服务注册                                  │  │   │
│   │   │    节点启动时注册到 ZooKeeper                 │  │   │
│   │   └─────────────────────────────────────────────┘  │   │
│   │                                                     │   │
│   │   ┌─────────────────────────────────────────────┐  │   │
│   │   │ 2. 主节点选举                                │  │   │
│   │   │    选举产生 Leader 负责调度                   │  │   │
│   │   └─────────────────────────────────────────────┘  │   │
│   │                                                     │   │
│   │   ┌─────────────────────────────────────────────┐  │   │
│   │   │ 3. 分片分配                                  │  │   │
│   │   │    动态分配分片到在线节点                     │  │   │
│   │   └─────────────────────────────────────────────┘  │   │
│   │                                                     │   │
│   │   ┌─────────────────────────────────────────────┐  │   │
│   │   │ 4. 故障检测                                  │  │   │
│   │   │    节点宕机时自动重新分配                     │  │   │
│   │   └─────────────────────────────────────────────┘  │   │
│   │                                                     │   │
│   │   ┌─────────────────────────────────────────────┐  │   │
│   │   │ 5. 分布式锁                                  │  │   │
│   │   │    保证同一分片只有一个节点执行                │  │   │
│   │   └─────────────────────────────────────────────┘  │   │
│   │                                                     │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

节点结构

ElasticJob 在 ZooKeeper 中创建的节点结构:

/elasticjob
├── config                           # 作业配置
│   └── {jobName}                   # 作业名称
│       ├── shardingTotalCount      # 分片总数
│       ├── cron                    # cron 表达式
│       ├── shardingItemParameters  # 分片参数
│       └── jobParameter            # 作业参数

├── instances                        # 运行实例
│   └── {jobName}
│       └── {instanceId}            # 实例 ID(IP + 进程 ID)
│           └── sharding             # 该实例负责的分片
│               ├── 0                # 分片 0
│               └── 1                # 分片 1

├── sharding                        # 分片状态
│   └── {jobName}
│       └── {shardingItem}          # 分片号
│           └── instance             # 当前拥有该分片的实例

├── leader                          # 主节点选举
│   └── election
│       └── instance                # 当前主节点

├── server                          # 服务实例信息
│   └── {ip:port}
│       ├── jobs                    # 该节点运行的作业列表
│       └── status                  # 节点状态

└── locks                           # 分布式锁
    └── {jobName}
        └── {shardingItem}         # 分片锁
            └── lock                # 锁节点

服务注册机制

当一个应用节点启动时,会向 ZooKeeper 注册:

java
// 节点启动时注册
public class JobRegistry {
    
    public void register(String jobName, JobInstance instance) {
        // 1. 创建临时节点 /elasticjob/instances/{jobName}/{instanceId}
        String instancePath = "/elasticjob/instances/" + jobName 
            + "/" + instance.getInstanceId();
        
        registryCenter.persistEphemeral(instancePath, 
            JSON.toJSONString(instance));
        
        // 2. 创建服务节点 /elasticjob/server/{ip:port}
        String serverPath = "/elasticjob/server/" + instance.getIpPort();
        
        // 持久化节点,记录该节点运行了哪些作业
        registryCenter.persist(serverPath, jobName);
        
        // 3. 设置 Watch 监听节点变化
        watchJobInstancesChange(jobName);
    }
}

注册信息示例

json
// /elasticjob/instances/orderSyncJob/192.168.1.100@-@12345
{
    "instanceId": "192.168.1.100@-@12345",
    "ip": "192.168.1.100",
    "port": 12345,
    "sharding": [0, 2],  // 该实例负责分片 0 和 2
    "startTime": "2024-01-15T10:00:00"
}

主节点选举

ElasticJob 使用 ZooKeeper 的临时有序节点实现主节点选举:

java
// 主节点选举核心逻辑
public class LeaderService {
    
    private final String jobName;
    private final CoordinatorRegistryCenter registryCenter;
    
    // 选举主节点
    public void leaderElection() {
        // 1. 创建临时有序节点 /elasticjob/leader/election/instance
        // ZooKeeper 会自动生成一个递增的序号
        String instancePath = "/elasticjob/leader/election/instance";
        
        String leaderNode = registryCenter.persistEphemeralSequential(
            instancePath, 
            JSON.toJSONString(currentInstance)
        );
        
        // 2. 获取所有节点,按序号排序
        List<String> instances = registryCenter.getChildrenKeys(
            "/elasticjob/leader/election/instance"
        );
        
        // 3. 序号最小的节点成为主节点
        Collections.sort(instances);
        String smallestInstance = instances.get(0);
        
        // 4. 如果自己是主节点,设置主节点信息
        if (smallestInstance.equals(leaderNode)) {
            setLeaderHost(currentInstance);
        }
    }
    
    // 监听主节点变化
    public void addLeaderElectionListener(Runnable listener) {
        String leaderPath = "/elasticjob/leader/election/instance";
        
        // Watch 机制:当节点列表变化时,重新触发选举
        registryCenter.watch(leaderPath, () -> {
            // 主节点可能发生了变化
            leaderElection();
            listener.run();
        });
    }
}

选举过程图解

┌─────────────────────────────────────────────────────────────┐
│                    主节点选举过程                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   初始状态:                                                │
│   ┌─────────────────────────────────────────────────────┐   │
│   │ /leader/election/instance                           │   │
│   │ ├── 0000000001  (Server1)                          │   │
│   │ ├── 0000000002  (Server2)                          │   │
│   │ └── 0000000003  (Server3)                          │   │
│   │                                                      │   │
│   │ 主节点:Server1 (序号最小)                            │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
│   Server1 宕机:                                            │
│   ┌─────────────────────────────────────────────────────┐   │
│   │ /leader/election/instance                           │   │
│   │ ├── 0000000002  (Server2)  ──▶ 成为新的主节点      │   │
│   │ └── 0000000003  (Server3)                          │   │
│   │                                                      │   │
│   │ ZooKeeper 自动删除 Server1 的临时节点                 │   │
│   │ Server2 和 Server3 收到 Watch 通知                   │   │
│   │ 重新选举,Server2 成为主节点                          │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

分片分配机制

主节点负责将分片分配给各个在线节点:

java
// 分片分配核心逻辑
public class ShardingService {
    
    public void sharding() {
        // 1. 获取作业配置
        int shardingTotalCount = jobConfig.getShardingTotalCount();
        
        // 2. 获取所有在线实例
        List<JobInstance> onlineInstances = getOnlineInstances();
        
        // 3. 计算分配方案(平均分配)
        Map<JobInstance, List<Integer>> allocation = 
            calculateAllocation(shardingTotalCount, onlineInstances);
        
        // 4. 更新分片状态
        for (Map.Entry<JobInstance, List<Integer>> entry : allocation.entrySet()) {
            JobInstance instance = entry.getKey();
            List<Integer> shards = entry.getValue();
            
            // 写入 /elasticjob/sharding/{jobName}/{shardId}/instance
            for (Integer shardId : shards) {
                String path = "/elasticjob/sharding/" + jobName 
                    + "/" + shardId + "/instance";
                registryCenter.persist(path, instance.getInstanceId());
            }
            
            // 更新实例的分片信息
            updateInstanceSharding(instance, shards);
        }
    }
    
    // 平均分配算法
    private Map<JobInstance, List<Integer>> calculateAllocation(
            int totalShards, List<JobInstance> instances) {
        
        Map<JobInstance, List<Integer>> result = new HashMap<>();
        
        // 每个实例应得的分片数
        int avg = totalShards / instances.size();
        int remainder = totalShards % instances.size();
        
        int currentShard = 0;
        for (int i = 0; i < instances.size(); i++) {
            JobInstance instance = instances.get(i);
            int count = avg + (i < remainder ? 1 : 0);
            
            List<Integer> shards = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                shards.add(currentShard++);
            }
            
            result.put(instance, shards);
        }
        
        return result;
    }
}

分片分配示例

场景:4个分片,3台服务器

初始分配:
┌─────────────────────────────────────────────────────────────┐
│ /elasticjob/sharding/orderSyncJob/                         │
│ ├── 0/instance = "192.168.1.100@-@12345"                  │
│ ├── 1/instance = "192.168.1.101@-@12346"                  │
│ ├── 2/instance = "192.168.1.100@-@12345"                  │
│ └── 3/instance = "192.168.1.102@-@12347"                  │
│                                                             │
│ Server1 (192.168.1.100) → 分片 [0, 2]                     │
│ Server2 (192.168.1.101) → 分片 [1]                        │
│ Server3 (192.168.1.102) → 分片 [3]                        │
└─────────────────────────────────────────────────────────────┘

Server3 宕机后重新分配:
┌─────────────────────────────────────────────────────────────┐
│ Server1 (192.168.1.100) → 分片 [0, 2, 3]                  │
│ Server2 (192.168.1.101) → 分片 [1]                        │
│ Server3 已离线                                             │
└─────────────────────────────────────────────────────────────┘

节点监听机制

ZooKeeper 的 Watch 机制是实时感知节点变化的关键:

java
// 监听节点变化
public class Instance listeningService {
    
    // 监听实例变化
    public void watchInstances(String jobName) {
        String instancesPath = "/elasticjob/instances/" + jobName;
        
        // 监听子节点变化(新增或删除)
        registryCenter.watch(instancesPath, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    // 实例列表发生变化
                    onInstancesChanged();
                }
            }
        });
    }
    
    // 实例变化处理
    private void onInstancesChanged() {
        // 1. 重新获取在线实例
        List<String> onlineInstances = registryCenter
            .getChildrenKeys("/elasticjob/instances/" + jobName);
        
        // 2. 如果是主节点,重新分配分片
        if (isLeader()) {
            shardingService.sharding();
        }
        
        // 3. 触发故障转移
        failoverService.processFailover();
    }
    
    // 监听分片变化
    public void watchSharding(String jobName, int shardingItem) {
        String shardingPath = "/elasticjob/sharding/" + jobName 
            + "/" + shardingItem + "/instance";
        
        registryCenter.watch(shardingPath, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 分片所属节点发生变化
                onShardingInstanceChanged(shardingItem);
            }
        });
    }
}

Watch 机制图解

┌─────────────────────────────────────────────────────────────┐
│                    Watch 机制工作流程                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   Server1 启动                                              │
│        │                                                    │
│        ▼                                                    │
│   ┌─────────────────────────────────────────────────────┐   │
│   │ ZooKeeper 创建临时节点:                              │   │
│   │ /elasticjob/instances/orderSyncJob/192.168.1.100    │   │
│   │                                                      │   │
│   │ 同时设置 Watch:                                       │   │
│   │ 「当这个节点的子节点列表变化时,通知我」               │   │
│   └─────────────────────────────────────────────────────┘   │
│        │                                                    │
│        ▼                                                    │
│   Server2、Server3 启动                                     │
│        │                                                    │
│        ▼                                                    │
│   ┌─────────────────────────────────────────────────────┐   │
│   │ ZooKeeper 检测到子节点列表变化                         │   │
│   │ 通知所有 Watch 该路径的客户端                          │   │
│   │                                                      │   │
│   │ Server1 收到通知:「有新的实例加入了」                  │   │
│   │ Server1 作为主节点,执行重新分片                        │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

分布式锁

ElasticJob 使用 ZooKeeper 实现分布式锁,保证同一分片只有一个节点执行:

java
// 分片分布式锁
public class ShardingLockService {
    
    private final String jobName;
    private final int shardingItem;
    private final CoordinatorRegistryCenter registryCenter;
    
    // 尝试获取分片锁
    public boolean tryLock(long timeout, TimeUnit unit) {
        String lockPath = "/elasticjob/locks/" + jobName 
            + "/" + shardingItem + "/lock";
        
        // 创建临时顺序节点
        String lockNode = registryCenter.persistEphemeralSequential(
            lockPath + "/", 
            currentInstance.getInstanceId()
        );
        
        // 获取锁列表
        List<String> locks = registryCenter.getChildrenKeys(lockPath);
        Collections.sort(locks);
        
        // 如果自己是最小的节点,获得锁
        if (lockNode.endsWith(locks.get(0))) {
            return true;
        }
        
        // 否则等待比自己序号小的节点释放锁
        String predecessor = findPredecessor(lockNode, locks);
        return waitForLock(predecessor, timeout, unit);
    }
    
    // 释放锁
    public void unlock() {
        String lockPath = "/elasticjob/locks/" + jobName 
            + "/" + shardingItem + "/lock";
        registryCenter.remove(lockPath);
    }
}

锁竞争过程

┌─────────────────────────────────────────────────────────────┐
│                    分布式锁竞争过程                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   场景:分片 0 正在等待执行                                  │
│                                                             │
│   Server1 尝试获取锁                                        │
│   ┌─────────────────────────────────────────────────────┐   │
│   │ /elasticjob/locks/orderSyncJob/0/lock/              │   │
│   │ ├── 0000000001  (Server1)  ← 最小,获得锁 ✓         │   │
│   │ └── 0000000002  (Server2)                          │   │
│   │                                                      │   │
│   │ Server1 开始执行分片 0                                │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
│   Server1 执行完成,释放锁                                  │
│   ┌─────────────────────────────────────────────────────┐   │
│   │ /elasticjob/locks/orderSyncJob/0/lock/              │   │
│   │ └── 0000000002  (Server2)  ← 现在最小,获得锁 ✓     │   │
│   │                                                      │   │
│   │ Server2 开始执行分片 0                                │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

故障检测与恢复

ZooKeeper 的临时节点特性使得故障检测变得简单:

java
// 故障检测服务
public class FailoverService {
    
    public void processFailover() {
        // 1. 找出宕机的实例
        List<String> allInstances = registryCenter.getChildrenKeys(
            "/elasticjob/instances/" + jobName
        );
        
        List<String> crashedInstances = findCrashedInstances(allInstances);
        
        // 2. 处理每个宕机实例的分片
        for (String crashedInstance : crashedInstances) {
            List<Integer> shards = getInstanceShards(crashedInstance);
            
            for (Integer shard : shards) {
                // 3. 尝试抢锁
                if (tryAcquireShardingLock(shard)) {
                    // 4. 执行故障转移
                    executeFailoverJob(shard);
                }
            }
        }
    }
    
    // 检查实例是否存活
    private boolean isInstanceAlive(String instanceId) {
        // 如果实例的临时节点存在,说明实例还活着
        String instancePath = "/elasticjob/instances/" + jobName + "/" + instanceId;
        return registryCenter.isExisted(instancePath);
    }
}

总结

ZooKeeper 机制ElasticJob 应用
临时节点服务注册、故障检测
Watch节点变化监听、分片重新分配
临时顺序节点主节点选举、分布式锁
持久化节点作业配置、节点信息服务

ZooKeeper 是 ElasticJob 去中心化架构的基石——没有 ZooKeeper,就无法实现分布式协调。

思考题

ZooKeeper 本身也是分布式系统,它会不会也出现故障?

如果 ZooKeeper 集群发生脑裂(Split-Brain),ElasticJob 会发生什么?

这个问题涉及到分布式系统的一致性和可用性权衡,也是 CAP 理论的核心体现。

基于 VitePress 构建