调度引擎是 DataJump 平台的核心组件,负责管理和执行所有数据处理任务。本方案采用 Master-Worker 分布式架构,支持 DAG 任务编排、多种调度策略、故障恢复等企业级特性。
| 目标 | 指标 |
|---|---|
| 任务规模 | 支持百万级任务调度 |
| 调度延迟 | P99 < 1s |
| 高可用 | 99.99% 可用性 |
| 弹性伸缩 | 支持动态扩缩容 Worker 节点 |
┌─────────────────────────────────────────────────────────────────┐
│ API Gateway │
└─────────────────────────────┬───────────────────────────────────┘
│
┌─────────────────────────────▼───────────────────────────────────┐
│ Scheduler Service │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Master │ │ Master │ │ Master │ │
│ │ (Active) │◄─┤ (Standby) │ │ (Standby) │ │
│ └──────┬──────┘ └─────────────┘ └─────────────────────────┘ │
│ │ Zookeeper 选主 │
└─────────┼───────────────────────────────────────────────────────┘
│
┌─────▼─────┐
│ Kafka │ 任务分发队列
└─────┬─────┘
│
┌─────────▼───────────────────────────────────────────────────────┐
│ Worker Cluster │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Worker-1 │ │ Worker-2 │ │ Worker-N │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │Executor │ │ │ │Executor │ │ │ │Executor │ │ │
│ │ │ Pool │ │ │ │ Pool │ │ │ │ Pool │ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
- DAG 解析器:解析任务依赖关系,构建执行计划
- 调度器:基于 Quartz 实现定时触发,支持 Cron 表达式
- 任务分发器:将就绪任务分发到 Worker 队列
- 状态管理器:维护任务实例状态,处理回调
- 任务消费者:从 Kafka 消费任务
- 执行器池:多线程执行任务,支持资源隔离
- 心跳上报:定期向 Master 汇报状态
- 日志采集:收集任务执行日志
// DAG 定义
public class DAG {
private Long id;
private String name;
private String description;
private List<Task> tasks;
private List<Edge> edges;
private ScheduleConfig scheduleConfig;
private AlertConfig alertConfig;
}
// 任务节点
public class Task {
private Long id;
private String name;
private TaskType type; // SQL, SHELL, SPARK, FLINK, PYTHON
private String content;
private Map<String, Object> params;
private RetryConfig retryConfig;
private ResourceConfig resourceConfig;
}
// 依赖边
public class Edge {
private Long sourceTaskId;
private Long targetTaskId;
private DependencyType type; // STRONG, WEAK
}public class DAGParser {
/**
* 拓扑排序,检测循环依赖
*/
public List<Task> topologicalSort(DAG dag) {
Map<Long, Integer> inDegree = new HashMap<>();
Map<Long, List<Long>> adjacency = new HashMap<>();
// 初始化入度和邻接表
for (Task task : dag.getTasks()) {
inDegree.put(task.getId(), 0);
adjacency.put(task.getId(), new ArrayList<>());
}
for (Edge edge : dag.getEdges()) {
inDegree.merge(edge.getTargetTaskId(), 1, Integer::sum);
adjacency.get(edge.getSourceTaskId()).add(edge.getTargetTaskId());
}
// BFS 拓扑排序
Queue<Long> queue = new LinkedList<>();
for (Map.Entry<Long, Integer> entry : inDegree.entrySet()) {
if (entry.getValue() == 0) {
queue.offer(entry.getKey());
}
}
List<Task> sorted = new ArrayList<>();
while (!queue.isEmpty()) {
Long taskId = queue.poll();
sorted.add(findTask(dag, taskId));
for (Long nextId : adjacency.get(taskId)) {
inDegree.merge(nextId, -1, Integer::sum);
if (inDegree.get(nextId) == 0) {
queue.offer(nextId);
}
}
}
if (sorted.size() != dag.getTasks().size()) {
throw new CyclicDependencyException("检测到循环依赖");
}
return sorted;
}
}| 类型 | 说明 | 示例 |
|---|---|---|
| Cron | 基于 Cron 表达式的定时调度 | 0 0 2 * * ? 每天凌晨2点 |
| 事件触发 | 基于外部事件触发 | Kafka 消息、HTTP 回调 |
| 数据就绪 | 上游数据分区就绪后触发 | Hive 分区就绪 |
| 手动触发 | 用户手动执行 | 一次性任务 |
| 依赖触发 | 上游任务完成后触发 | DAG 内部依赖 |
public class ScheduleConfig {
private ScheduleType type;
private String cronExpression;
private LocalDateTime startTime;
private LocalDateTime endTime;
private DependencyConfig dependencyConfig;
private Integer timeout; // 超时时间(分钟)
private Integer priority; // 优先级 1-10
}
public class DependencyConfig {
private DependencyStrategy strategy; // ALL_SUCCESS, ONE_SUCCESS, NONE
private List<DependencyItem> dependencies;
private Integer waitTimeout; // 等待超时
} ┌──────────────┐
│ CREATED │
└──────┬───────┘
│ submit
▼
┌──────────────┐
┌─────────│ PENDING │─────────┐
│ └──────┬───────┘ │
│ timeout │ schedule │ kill
│ ▼ │
│ ┌──────────────┐ │
│ ┌────│ QUEUED │────┐ │
│ │ └──────┬───────┘ │ │
│ │ timeout │ dispatch │kill│
│ │ ▼ │ │
│ │ ┌──────────────┐ │ │
│ │ │ RUNNING │────┼────┤
│ │ └──────┬───────┘ │ │
│ │ │ │ │
│ │ ┌──────┴──────┐ │ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ TIMEOUT │ │ SUCCESS │ │ FAILED │
└──────────────┘ └──────────────┘ └──────┬───────┘
│ retry
▼
┌──────────────┐
│ PENDING │
└──────────────┘
| 状态 | 说明 |
|---|---|
| CREATED | 任务实例已创建 |
| PENDING | 等待依赖条件满足 |
| QUEUED | 已进入调度队列 |
| RUNNING | 正在执行 |
| SUCCESS | 执行成功 |
| FAILED | 执行失败 |
| TIMEOUT | 执行超时 |
| KILLED | 被手动终止 |
调度引擎采用可插拔的任务执行器架构,支持多种任务类型的统一执行。
public interface TaskExecutor {
/**
* 获取支持的任务类型
*/
String getType();
/**
* 执行任务
* @param task 任务定义
* @param context 执行上下文(包含 bizdate、task_id 等)
* @return 执行结果
*/
TaskExecutionResult execute(Task task, Map<String, String> context);
}public class TaskExecutionResult {
private boolean success;
private int exitCode;
private String output;
private String errorMessage;
private String fullLog;
private long executionTime; // 执行耗时(毫秒)
}| 执行器 | 任务类型 | 说明 |
|---|---|---|
| ShellTaskExecutor | SHELL | 执行 Shell 脚本,支持变量替换和环境变量注入 |
| PythonTaskExecutor | PYTHON | 执行 Python 脚本,自动选择 python3/python |
| SqlTaskExecutor | SQL | 执行 SQL 语句,支持 SELECT/INSERT/UPDATE/DELETE,通过 JDBC 执行 |
@Component
public class TaskExecutorFactory {
private final Map<String, TaskExecutor> executors = new HashMap<>();
@PostConstruct
public void init() {
register(new ShellTaskExecutor());
register(new PythonTaskExecutor());
register(new SqlTaskExecutor());
}
public TaskExecutor getExecutor(String taskType) {
return executors.get(taskType.toUpperCase());
}
}- 使用 ProcessBuilder 执行命令
- 支持环境变量注入(bizdate、task_id、task_name)
- 捕获 stdout 和 stderr
- 支持执行超时(默认 1 小时)
- 完整日志记录
- 自动清理临时文件
- 自动检测 Python 解释器(python3 优先)
- 创建临时脚本文件执行
- 支持环境变量传递
- UTF-8 编码支持
- 完整的输出捕获
- 支持执行超时(默认 1 小时)
- 通过 JDBC Template 执行 SQL 语句
- 支持 SELECT、INSERT、UPDATE、DELETE 操作
- 多语句执行(按分号分隔)
- 查询结果格式化输出(限制显示 100 行)
- 返回受影响行数(DML 操作)
任务执行日志存储在 t_task_instance.log_content 字段中,支持通过 API 查询。
========== Task Execution Start ==========
Task ID: 123
Task Name: data_sync
Task Type: SHELL
Start Time: 2024-01-22 10:30:00
==========================================
[执行输出内容...]
========== Task Execution End ==========
Exit Code: 0
Duration: 1234ms
=========================================
public class RetryConfig {
private Integer maxRetryTimes; // 最大重试次数
private Integer retryInterval; // 重试间隔(秒)
private RetryStrategy strategy; // FIXED, EXPONENTIAL
private List<Integer> retryOnExitCodes; // 特定退出码时重试
}
public class RetryExecutor {
public void executeWithRetry(TaskInstance instance, RetryConfig config) {
int attempt = 0;
while (attempt <= config.getMaxRetryTimes()) {
try {
execute(instance);
return;
} catch (TaskExecutionException e) {
attempt++;
if (attempt > config.getMaxRetryTimes()) {
throw e;
}
int delay = calculateDelay(config, attempt);
Thread.sleep(delay * 1000L);
log.info("任务 {} 第 {} 次重试", instance.getId(), attempt);
}
}
}
private int calculateDelay(RetryConfig config, int attempt) {
if (config.getStrategy() == RetryStrategy.EXPONENTIAL) {
return config.getRetryInterval() * (int) Math.pow(2, attempt - 1);
}
return config.getRetryInterval();
}
}调度引擎实现了基于 Spring @Scheduled 的轻量级 Cron 调度服务,定期扫描并触发到期的 DAG。
@Service
public class DagSchedulerService {
@Scheduled(fixedRate = 60000) // 每60秒扫描一次
public void scanAndTriggerDags() {
// 查询 schedule_enabled=1 且 next_fire_time <= now 的 DAG
List<Dag> dags = dagMapper.selectScheduledDags(now);
for (Dag dag : dags) {
triggerScheduledDag(dag);
}
}
}- 定时扫描:每 60 秒查询数据库,找出
schedule_enabled=1且next_fire_time <= 当前时间的 DAG - 触发执行:调用
DagExecutionEngine.triggerDag()创建 DAG 实例并执行 - 更新下次执行时间:根据 Cron 表达式计算下一次触发时间,更新到数据库
public class CronUtils {
// 基于 Spring CronExpression 实现
public static boolean isValid(String cronExpression);
public static LocalDateTime getNextFireTime(String cronExpression);
public static String getDescription(String cronExpression); // 可读描述
}支持 Spring Cron 6 段格式:秒 分 时 日 月 周
| 接口 | 方法 | 说明 |
|---|---|---|
/api/v1/dags/{dagId}/schedule |
GET | 获取调度配置 |
/api/v1/dags/{dagId}/schedule |
PUT | 更新调度配置 |
/api/v1/dags/{dagId}/schedule/enable |
POST | 启用调度 |
/api/v1/dags/{dagId}/schedule/disable |
POST | 禁用调度 |
/api/v1/dags/schedule/validate-cron |
POST | 验证 Cron 表达式 |
ALTER TABLE t_dag
ADD COLUMN cron_expression VARCHAR(100) COMMENT 'Cron 调度表达式',
ADD COLUMN schedule_enabled TINYINT DEFAULT 0 COMMENT '是否启用调度 0-否 1-是',
ADD COLUMN next_fire_time DATETIME COMMENT '下次执行时间';
-- 调度扫描优化索引
ALTER TABLE t_dag ADD INDEX idx_schedule (schedule_enabled, next_fire_time);┌─────────────────────────────────────────────┐
│ Zookeeper Cluster │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ ZK1 │ │ ZK2 │ │ ZK3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────┬───────────────────────┘
│ 选主
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Master1 │ │ Master2 │ │ Master3 │
│ (Active)│ │(Standby)│ │(Standby)│
└─────────┘ └─────────┘ └─────────┘
- 使用 Zookeeper 进行 Master 选主
- Active Master 负责调度,Standby 热备
- Master 故障时自动切换,恢复时间 < 30s
- 调度状态持久化到 MySQL,支持故障恢复
public class WorkerAutoScaler {
private final KubernetesClient k8sClient;
@Scheduled(fixedRate = 60000)
public void autoScale() {
// 获取当前队列积压
int queueSize = getQueueSize();
int currentWorkers = getCurrentWorkerCount();
int runningTasks = getRunningTaskCount();
// 计算目标 Worker 数量
int targetWorkers = calculateTargetWorkers(queueSize, runningTasks);
// 限制在 min-max 范围内
targetWorkers = Math.max(MIN_WORKERS, Math.min(MAX_WORKERS, targetWorkers));
if (targetWorkers != currentWorkers) {
scaleWorkers(targetWorkers);
log.info("Worker 扩缩容: {} -> {}", currentWorkers, targetWorkers);
}
}
}-- DAG 定义表
CREATE TABLE t_dag (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
description TEXT,
project_id BIGINT NOT NULL,
status TINYINT DEFAULT 1 COMMENT '1-上线 0-下线',
schedule_config JSON,
alert_config JSON,
cron_expression VARCHAR(100) COMMENT 'Cron 调度表达式',
schedule_enabled TINYINT DEFAULT 0 COMMENT '是否启用调度 0-否 1-是',
next_fire_time DATETIME COMMENT '下次执行时间',
created_by BIGINT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_project (project_id),
INDEX idx_status (status),
INDEX idx_schedule (schedule_enabled, next_fire_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 任务定义表
CREATE TABLE t_task (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
dag_id BIGINT NOT NULL,
name VARCHAR(255) NOT NULL,
type VARCHAR(50) NOT NULL COMMENT 'SQL/SHELL/SPARK/FLINK/PYTHON',
content LONGTEXT,
params JSON,
retry_config JSON,
resource_config JSON,
position_x INT,
position_y INT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_dag (dag_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 任务依赖表
CREATE TABLE t_task_dependency (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
dag_id BIGINT NOT NULL,
source_task_id BIGINT NOT NULL,
target_task_id BIGINT NOT NULL,
dependency_type VARCHAR(20) DEFAULT 'STRONG',
INDEX idx_dag (dag_id),
INDEX idx_source (source_task_id),
INDEX idx_target (target_task_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- DAG 实例表
CREATE TABLE t_dag_instance (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
dag_id BIGINT NOT NULL,
schedule_time DATETIME NOT NULL COMMENT '调度时间',
status TINYINT NOT NULL COMMENT '状态',
start_time DATETIME,
end_time DATETIME,
trigger_type VARCHAR(20) COMMENT 'MANUAL/SCHEDULED/EVENT',
run_params JSON,
INDEX idx_dag_schedule (dag_id, schedule_time),
INDEX idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 任务实例表
CREATE TABLE t_task_instance (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
task_id BIGINT NOT NULL,
dag_instance_id BIGINT NOT NULL,
status TINYINT NOT NULL,
start_time DATETIME,
end_time DATETIME,
worker_id VARCHAR(100),
retry_count INT DEFAULT 0,
log_path VARCHAR(500),
log_content LONGTEXT COMMENT '任务执行日志内容',
exit_code INT,
error_message VARCHAR(2000) COMMENT '错误信息',
INDEX idx_dag_instance (dag_instance_id),
INDEX idx_task (task_id),
INDEX idx_status_time (status, start_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;# 获取 DAG 列表(按项目)
GET /api/v1/dags?projectId={projectId}
# 创建 DAG
POST /api/v1/dags
Request:
name: string
description: string
projectId: number
# 获取 DAG 详情
GET /api/v1/dags/{dagId}
# 更新 DAG
PUT /api/v1/dags/{dagId}
# 删除 DAG
DELETE /api/v1/dags/{dagId}
# 获取 DAG 下的任务列表
GET /api/v1/dags/{dagId}/tasks
# 上线 DAG
POST /api/v1/dags/{dagId}/online
# 下线 DAG
POST /api/v1/dags/{dagId}/offline# 保存 DAG 画布(任务节点和依赖边)
POST /api/v1/dags/{dagId}/canvas
Request:
tasks: TaskNode[]
edges: Edge[]
# 加载 DAG 画布
GET /api/v1/dags/{dagId}/canvas# 校验 DAG(循环依赖检测)
POST /api/v1/dags/{dagId}/validate
Request:
edges: Edge[] # 画布中的边列表
- source: string # 源节点 ID
- target: string # 目标节点 ID
Response:
valid: boolean # 是否通过校验
message: string # 校验结果描述使用 DFS 深度优先搜索检测图中是否存在环。空边列表或无 edges 字段时直接返回校验通过。
# 手动触发 DAG
POST /api/v1/dags/{dagId}/trigger
Request:
params: object # 可选运行参数
# 获取 DAG 执行历史
GET /api/v1/dags/{dagId}/instances
Query:
limit: number # 默认 20
# 获取 DAG 实例详情
GET /api/v1/dags/instances/{instanceId}
# 获取 DAG 实例状态
GET /api/v1/dags/instances/{instanceId}/status
# 获取任务实例详情
GET /api/v1/dags/task-instances/{taskInstanceId}
# 获取任务实例日志
GET /api/v1/dags/task-instances/{taskInstanceId}/log# 获取 DAG 调度配置
GET /api/v1/dags/{dagId}/schedule
Response:
dagId: number
cronExpression: string
cronDescription: string
enabled: boolean
nextFireTime: datetime
# 更新调度配置
PUT /api/v1/dags/{dagId}/schedule
Request:
cronExpression: string
enabled: boolean
# 启用调度
POST /api/v1/dags/{dagId}/schedule/enable
Request:
cronExpression: string
# 禁用调度
POST /api/v1/dags/{dagId}/schedule/disable
# 验证 Cron 表达式
POST /api/v1/dags/schedule/validate-cron
Request:
cronExpression: string
Response:
valid: boolean
description: string
nextFireTime: datetime- 分片调度:按 DAG ID 分片,多 Master 并行调度
- 批量操作:批量更新任务状态,减少数据库压力
- 缓存:DAG 定义缓存到 Redis,减少数据库查询
- 异步处理:任务分发、状态更新异步化
- 任务预热:提前拉取即将执行的任务
- 资源池化:执行器线程池复用
- 本地缓存:Worker 本地缓存任务脚本
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| scheduler.dag.running | 运行中 DAG 数量 | - |
| scheduler.task.queued | 队列中任务数量 | > 10000 |
| scheduler.task.running | 运行中任务数量 | - |
| scheduler.dispatch.latency | 调度延迟 | P99 > 1s |
| scheduler.task.success_rate | 任务成功率 | < 95% |
| worker.cpu.usage | Worker CPU 使用率 | > 80% |
| worker.memory.usage | Worker 内存使用率 | > 80% |