Skip to content

XXL-Job 任务编排

任务 A 跑完后才能跑任务 B? 任务 C 需要等任务 A 和任务 B 都完成?

这种任务之间的依赖关系,叫做任务编排

今天来看看 XXL-Job 是如何实现任务编排的。

场景引入

┌─────────────────────────────────────────────────────────────┐
│                    典型业务场景                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   每日数据报表生成:                                          │
│                                                             │
│   ┌─────────┐                                              │
│   │ 同步订单 │ ──▶ [完成]                                   │
│   └────┬────┘                                              │
│        │                                                   │
│        ▼                                                   │
│   ┌─────────┐                                              │
│   │ 统计报表 │ ──▶ [需要等订单同步完成]                      │
│   └────┬────┘                                              │
│        │                                                   │
│        ▼                                                   │
│   ┌─────────┐                                              │
│   │ 生成PDF │ ──▶ [需要等统计完成]                          │
│   └────┬────┘                                              │
│        │                                                   │
│        ▼                                                   │
│   ┌─────────┐                                              │
│   │ 发送邮件 │ ──▶ [需要等PDF生成]                          │
│   └─────────┘                                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

子任务机制

XXL-Job 支持配置「子任务」,当主任务执行完成后,自动触发子任务。

配置子任务

在调度中心配置:

主任务:生成日报
├── 子任务1:同步订单数据
├── 子任务2:同步用户数据
└── 子任务3:同步商品数据

执行流程

┌─────────────────────────────────────────────────────────────┐
│                    子任务执行流程                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   1. 主任务触发                                             │
│      ┌─────────────────────────────────────────────────┐   │
│      │ 任务A 执行中...                                   │   │
│      └─────────────────────────────────────────────────┘   │
│                           │                                 │
│                           ▼                                 │
│   2. 主任务完成                                           │
│      ┌─────────────────────────────────────────────────┐   │
│      │ UPDATE xxl_job_log SET handle_code = 200       │   │
│      │ WHERE id = :logId                               │   │
│      └─────────────────────────────────────────────────┘   │
│                           │                                 │
│                           ▼                                 │
│   3. 查询子任务                                           │
│      ┌─────────────────────────────────────────────────┐   │
│      │ SELECT * FROM xxl_job_info                       │   │
│      │ WHERE parent_job_id = :jobId                     │   │
│      └─────────────────────────────────────────────────┘   │
│                           │                                 │
│                           ▼                                 │
│   4. 触发子任务                                           │
│      ┌─────────────────────────────────────────────────┐   │
│      │ 任务B.trigger()                                  │   │
│      │ 任务C.trigger()                                  │   │
│      │ 任务D.trigger()                                  │   │
│      └─────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

代码实现

java
public class ChildJobService {
    
    public void onJobSuccess(long jobId, long logId) {
        // 1. 查询子任务
        List<JobInfo> childJobs = jobInfoDao.findByParentJobId(jobId);
        
        for (JobInfo childJob : childJobs) {
            // 2. 为每个子任务创建新的执行记录
            XxlJobLog childLog = new XxlJobLog();
            childLog.setJobId(childJob.getId());
            childLog.setTriggerTime(new Date());
            childLog.setTriggerStatus(TriggerStatus.BE_TRIGGER_CODE.getCode());
            logDao.save(childLog);
            
            // 3. 触发子任务
            triggerJob(childJob, childLog.getId(), TriggerTypeEnum.PARENT_TRIGGER);
        }
    }
}

任务依赖

配置任务依赖

XXL-Job 支持配置任务依赖:

任务A:数据同步(无依赖)
任务B:数据清洗(依赖任务A)
任务C:数据统计(依赖任务B)
任务D:报表生成(依赖任务A和任务B)

依赖判断逻辑

java
public class JobDependencyChecker {
    
    public boolean canExecute(JobInfo job) {
        // 1. 获取任务的所有依赖
        List<Long> parentJobIds = job.getParentJobIds();
        
        if (parentJobIds == null || parentJobIds.isEmpty()) {
            return true;  // 没有依赖,可以执行
        }
        
        // 2. 检查每个依赖任务的状态
        for (Long parentJobId : parentJobIds) {
            JobInfo parentJob = jobInfoDao.findById(parentJobId);
            
            // 3. 检查最近一次执行结果
            XxlJobLog lastLog = jobLogDao.findLastJobLog(parentJobId);
            
            if (lastLog == null) {
                return false;  // 从未执行过
            }
            
            if (lastLog.getHandleCode() != ReturnT.SUCCESS_CODE) {
                return false;  // 上次执行失败
            }
        }
        
        return true;  // 所有依赖都成功,可以执行
    }
}

依赖检测时机

┌─────────────────────────────────────────────────────────────┐
│                    依赖检测时机                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   检测点1:触发时检测                                        │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ if (!canExecute(job)) {                             │  │
│   │     log.warn("依赖任务未成功,跳过执行");             │  │
│   │     return;                                         │  │
│   │ }                                                    │  │
│   │ executor.run();                                      │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   检测点2:执行前检测                                        │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ public void execute() {                              │  │
│   │     if (!canExecute(job)) {                          │  │
│   │         throw new RuntimeException("依赖任务未成功");   │  │
│   │     }                                                 │  │
│   │     // 执行业务逻辑                                   │  │
│   │ }                                                    │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

执行顺序控制

简单顺序(子任务)

A(主任务)
  └─▶ B
  └─▶ C
  └─▶ D

执行顺序:A → B, C, D(并发)

链式顺序

A → B → C → D

执行顺序:A → B → C → D(串行)

实现方式:

java
public class ChainJobExecutor {
    
    public void executeChain(List<Long> jobIds) {
        for (Long jobId : jobIds) {
            // 执行当前任务
            executeJob(jobId);
            
            // 检查执行结果
            JobLog lastLog = jobLogDao.findLastJobLog(jobId);
            if (lastLog.getHandleCode() != ReturnT.SUCCESS_CODE) {
                throw new RuntimeException("任务执行失败:" + jobId);
            }
        }
    }
}

DAG 顺序(有向无环图)

     ┌─────────┐
     │    A    │
     └────┬────┘

    ┌─────┴─────┐
    ▼           ▼
┌───────┐   ┌───────┐
│   B   │   │   C   │
└───┬───┘   └───┬───┘
    │           │
    └─────┬─────┘


      ┌───────┐
      │   D   │
      └───────┘

执行顺序:
1. A
2. B, C(等 A 完成)
3. D(等 B, C 都完成)

XXL-Job 通过子任务 + 依赖检测实现 DAG 顺序:

任务A:无依赖
任务B:依赖任务A
任务C:依赖任务A
任务D:依赖任务B 和任务C

任务编排的最佳实践

实践一:避免深层依赖

❌ 不推荐:深层依赖
A → B → C → D → E → F → G

问题:
· 一个任务失败会导致整条链失败
· 调试困难
· 性能差(串行执行)

✅ 推荐:扁平依赖
A ──┬──▶ D
B ──┘
C ──┬──▶ E
    └──▶ F

特点:
· 失败影响范围小
· 可以并行执行
· 易于调试

实践二:设置合理的超时时间

java
public class ComplexJob implements XxlJobSimpleJob {
    
    @Override
    public void execute() throws Exception {
        long startTime = System.currentTimeMillis();
        long maxTime = 30 * 60 * 1000;  // 30 分钟
        
        // 任务1:数据同步
        syncData();
        checkTimeout(startTime, maxTime);
        
        // 任务2:数据清洗
        cleanData();
        checkTimeout(startTime, maxTime);
        
        // 任务3:生成报表
        generateReport();
    }
    
    private void checkTimeout(long startTime, long maxTime) {
        if (System.currentTimeMillis() - startTime > maxTime) {
            throw new RuntimeException("任务执行超时");
        }
    }
}

实践三:完善的错误处理

java
public class ResilientJob implements XxlJobSimpleJob {
    
    @Override
    public void execute() throws Exception {
        try {
            // 任务1:同步数据
            syncOrders();
        } catch (Exception e) {
            XxlJobHelper.log("订单同步失败:" + e.getMessage());
            // 记录失败,但继续执行后续任务
        }
        
        try {
            // 任务2:统计汇总(依赖任务1,但容忍部分失败)
            statistics();
        } catch (Exception e) {
            XxlJobHelper.log("统计失败:" + e.getMessage());
            // 记录失败,但继续执行
        }
        
        // 任务3:发送通知(无论如何都要执行)
        try {
            sendNotification();
        } catch (Exception e) {
            XxlJobHelper.log("通知发送失败:" + e.getMessage());
            // 这个失败应该告警
            XxlJobHelper.handleFail("通知发送失败");
        }
    }
}

任务编排 vs 工作流引擎

维度XXL-Job 任务编排专业工作流引擎
适用场景简单的任务依赖复杂业务流程
配置方式UI 配置BPMN 文件
可视化简单树状完整流程图
状态管理基础完善
补偿机制支持
人工审批支持
学习成本

总结

功能说明适用场景
子任务主任务完成后触发子任务并行执行多个相关任务
任务依赖检查依赖任务状态后再执行顺序执行,有前置条件
阻塞策略控制并发行为不可并发的任务
超时控制防止任务无限执行长耗时任务

思考题

如果任务 A 有 3 个子任务 B、C、D,其中 C 执行失败了,XXL-Job 会怎么处理?

是继续执行 D,还是整个流程都标记为失败?

基于 VitePress 构建