Skip to content

Latest commit

 

History

History
738 lines (604 loc) · 24.1 KB

File metadata and controls

738 lines (604 loc) · 24.1 KB

调度引擎技术方案

1. 概述

调度引擎是 DataJump 平台的核心组件,负责管理和执行所有数据处理任务。本方案采用 Master-Worker 分布式架构,支持 DAG 任务编排、多种调度策略、故障恢复等企业级特性。

2. 设计目标

目标 指标
任务规模 支持百万级任务调度
调度延迟 P99 < 1s
高可用 99.99% 可用性
弹性伸缩 支持动态扩缩容 Worker 节点

3. 系统架构

3.1 整体架构

┌─────────────────────────────────────────────────────────────────┐
│                        API Gateway                               │
└─────────────────────────────┬───────────────────────────────────┘
                              │
┌─────────────────────────────▼───────────────────────────────────┐
│                      Scheduler Service                           │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │   Master    │  │   Master    │  │        Master           │  │
│  │  (Active)   │◄─┤  (Standby)  │  │       (Standby)         │  │
│  └──────┬──────┘  └─────────────┘  └─────────────────────────┘  │
│         │              Zookeeper 选主                            │
└─────────┼───────────────────────────────────────────────────────┘
          │
    ┌─────▼─────┐
    │   Kafka   │  任务分发队列
    └─────┬─────┘
          │
┌─────────▼───────────────────────────────────────────────────────┐
│                       Worker Cluster                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │  Worker-1   │  │  Worker-2   │  │       Worker-N          │  │
│  │ ┌─────────┐ │  │ ┌─────────┐ │  │     ┌─────────┐         │  │
│  │ │Executor │ │  │ │Executor │ │  │     │Executor │         │  │
│  │ │  Pool   │ │  │ │  Pool   │ │  │     │  Pool   │         │  │
│  │ └─────────┘ │  │ └─────────┘ │  │     └─────────┘         │  │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

3.2 核心组件

Master 节点

  • DAG 解析器:解析任务依赖关系,构建执行计划
  • 调度器:基于 Quartz 实现定时触发,支持 Cron 表达式
  • 任务分发器:将就绪任务分发到 Worker 队列
  • 状态管理器:维护任务实例状态,处理回调

Worker 节点

  • 任务消费者:从 Kafka 消费任务
  • 执行器池:多线程执行任务,支持资源隔离
  • 心跳上报:定期向 Master 汇报状态
  • 日志采集:收集任务执行日志

4. 核心功能设计

4.1 DAG 任务编排

数据模型

// DAG 定义
public class DAG {
    private Long id;
    private String name;
    private String description;
    private List<Task> tasks;
    private List<Edge> edges;
    private ScheduleConfig scheduleConfig;
    private AlertConfig alertConfig;
}

// 任务节点
public class Task {
    private Long id;
    private String name;
    private TaskType type;  // SQL, SHELL, SPARK, FLINK, PYTHON
    private String content;
    private Map<String, Object> params;
    private RetryConfig retryConfig;
    private ResourceConfig resourceConfig;
}

// 依赖边
public class Edge {
    private Long sourceTaskId;
    private Long targetTaskId;
    private DependencyType type;  // STRONG, WEAK
}

DAG 解析算法

public class DAGParser {
    /**
     * 拓扑排序,检测循环依赖
     */
    public List<Task> topologicalSort(DAG dag) {
        Map<Long, Integer> inDegree = new HashMap<>();
        Map<Long, List<Long>> adjacency = new HashMap<>();

        // 初始化入度和邻接表
        for (Task task : dag.getTasks()) {
            inDegree.put(task.getId(), 0);
            adjacency.put(task.getId(), new ArrayList<>());
        }

        for (Edge edge : dag.getEdges()) {
            inDegree.merge(edge.getTargetTaskId(), 1, Integer::sum);
            adjacency.get(edge.getSourceTaskId()).add(edge.getTargetTaskId());
        }

        // BFS 拓扑排序
        Queue<Long> queue = new LinkedList<>();
        for (Map.Entry<Long, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                queue.offer(entry.getKey());
            }
        }

        List<Task> sorted = new ArrayList<>();
        while (!queue.isEmpty()) {
            Long taskId = queue.poll();
            sorted.add(findTask(dag, taskId));

            for (Long nextId : adjacency.get(taskId)) {
                inDegree.merge(nextId, -1, Integer::sum);
                if (inDegree.get(nextId) == 0) {
                    queue.offer(nextId);
                }
            }
        }

        if (sorted.size() != dag.getTasks().size()) {
            throw new CyclicDependencyException("检测到循环依赖");
        }

        return sorted;
    }
}

4.2 调度策略

支持的调度类型

类型 说明 示例
Cron 基于 Cron 表达式的定时调度 0 0 2 * * ? 每天凌晨2点
事件触发 基于外部事件触发 Kafka 消息、HTTP 回调
数据就绪 上游数据分区就绪后触发 Hive 分区就绪
手动触发 用户手动执行 一次性任务
依赖触发 上游任务完成后触发 DAG 内部依赖

调度配置

public class ScheduleConfig {
    private ScheduleType type;
    private String cronExpression;
    private LocalDateTime startTime;
    private LocalDateTime endTime;
    private DependencyConfig dependencyConfig;
    private Integer timeout;  // 超时时间(分钟)
    private Integer priority;  // 优先级 1-10
}

public class DependencyConfig {
    private DependencyStrategy strategy;  // ALL_SUCCESS, ONE_SUCCESS, NONE
    private List<DependencyItem> dependencies;
    private Integer waitTimeout;  // 等待超时
}

4.3 任务状态机

                    ┌──────────────┐
                    │   CREATED    │
                    └──────┬───────┘
                           │ submit
                           ▼
                    ┌──────────────┐
          ┌─────────│   PENDING    │─────────┐
          │         └──────┬───────┘         │
          │ timeout        │ schedule        │ kill
          │                ▼                 │
          │         ┌──────────────┐         │
          │    ┌────│   QUEUED     │────┐    │
          │    │    └──────┬───────┘    │    │
          │    │ timeout   │ dispatch   │kill│
          │    │           ▼            │    │
          │    │    ┌──────────────┐    │    │
          │    │    │   RUNNING    │────┼────┤
          │    │    └──────┬───────┘    │    │
          │    │           │            │    │
          │    │    ┌──────┴──────┐     │    │
          │    │    │             │     │    │
          │    ▼    ▼             ▼     ▼    ▼
       ┌──────────────┐    ┌──────────────┐  ┌──────────────┐
       │   TIMEOUT    │    │   SUCCESS    │  │   FAILED     │
       └──────────────┘    └──────────────┘  └──────┬───────┘
                                                    │ retry
                                                    ▼
                                            ┌──────────────┐
                                            │   PENDING    │
                                            └──────────────┘

状态定义

状态 说明
CREATED 任务实例已创建
PENDING 等待依赖条件满足
QUEUED 已进入调度队列
RUNNING 正在执行
SUCCESS 执行成功
FAILED 执行失败
TIMEOUT 执行超时
KILLED 被手动终止

4.4 任务执行器框架

调度引擎采用可插拔的任务执行器架构,支持多种任务类型的统一执行。

执行器接口

public interface TaskExecutor {
    /**
     * 获取支持的任务类型
     */
    String getType();

    /**
     * 执行任务
     * @param task 任务定义
     * @param context 执行上下文(包含 bizdate、task_id 等)
     * @return 执行结果
     */
    TaskExecutionResult execute(Task task, Map<String, String> context);
}

执行结果

public class TaskExecutionResult {
    private boolean success;
    private int exitCode;
    private String output;
    private String errorMessage;
    private String fullLog;
    private long executionTime;  // 执行耗时(毫秒)
}

内置执行器

执行器 任务类型 说明
ShellTaskExecutor SHELL 执行 Shell 脚本,支持变量替换和环境变量注入
PythonTaskExecutor PYTHON 执行 Python 脚本,自动选择 python3/python
SqlTaskExecutor SQL 执行 SQL 语句,支持 SELECT/INSERT/UPDATE/DELETE,通过 JDBC 执行

执行器工厂

@Component
public class TaskExecutorFactory {
    private final Map<String, TaskExecutor> executors = new HashMap<>();

    @PostConstruct
    public void init() {
        register(new ShellTaskExecutor());
        register(new PythonTaskExecutor());
        register(new SqlTaskExecutor());
    }

    public TaskExecutor getExecutor(String taskType) {
        return executors.get(taskType.toUpperCase());
    }
}

Shell 执行器特性

  • 使用 ProcessBuilder 执行命令
  • 支持环境变量注入(bizdate、task_id、task_name)
  • 捕获 stdout 和 stderr
  • 支持执行超时(默认 1 小时)
  • 完整日志记录
  • 自动清理临时文件

Python 执行器特性

  • 自动检测 Python 解释器(python3 优先)
  • 创建临时脚本文件执行
  • 支持环境变量传递
  • UTF-8 编码支持
  • 完整的输出捕获
  • 支持执行超时(默认 1 小时)

SQL 执行器特性

  • 通过 JDBC Template 执行 SQL 语句
  • 支持 SELECT、INSERT、UPDATE、DELETE 操作
  • 多语句执行(按分号分隔)
  • 查询结果格式化输出(限制显示 100 行)
  • 返回受影响行数(DML 操作)

4.5 任务日志管理

任务执行日志存储在 t_task_instance.log_content 字段中,支持通过 API 查询。

日志内容格式

========== Task Execution Start ==========
Task ID: 123
Task Name: data_sync
Task Type: SHELL
Start Time: 2024-01-22 10:30:00
==========================================

[执行输出内容...]

========== Task Execution End ==========
Exit Code: 0
Duration: 1234ms
=========================================

4.6 重试机制

public class RetryConfig {
    private Integer maxRetryTimes;      // 最大重试次数
    private Integer retryInterval;       // 重试间隔(秒)
    private RetryStrategy strategy;      // FIXED, EXPONENTIAL
    private List<Integer> retryOnExitCodes;  // 特定退出码时重试
}

public class RetryExecutor {
    public void executeWithRetry(TaskInstance instance, RetryConfig config) {
        int attempt = 0;
        while (attempt <= config.getMaxRetryTimes()) {
            try {
                execute(instance);
                return;
            } catch (TaskExecutionException e) {
                attempt++;
                if (attempt > config.getMaxRetryTimes()) {
                    throw e;
                }

                int delay = calculateDelay(config, attempt);
                Thread.sleep(delay * 1000L);

                log.info("任务 {} 第 {} 次重试", instance.getId(), attempt);
            }
        }
    }

    private int calculateDelay(RetryConfig config, int attempt) {
        if (config.getStrategy() == RetryStrategy.EXPONENTIAL) {
            return config.getRetryInterval() * (int) Math.pow(2, attempt - 1);
        }
        return config.getRetryInterval();
    }
}

4.7 Cron 调度服务

调度引擎实现了基于 Spring @Scheduled 的轻量级 Cron 调度服务,定期扫描并触发到期的 DAG。

调度服务架构

@Service
public class DagSchedulerService {

    @Scheduled(fixedRate = 60000) // 每60秒扫描一次
    public void scanAndTriggerDags() {
        // 查询 schedule_enabled=1 且 next_fire_time <= now 的 DAG
        List<Dag> dags = dagMapper.selectScheduledDags(now);
        for (Dag dag : dags) {
            triggerScheduledDag(dag);
        }
    }
}

调度流程

  1. 定时扫描:每 60 秒查询数据库,找出 schedule_enabled=1next_fire_time <= 当前时间 的 DAG
  2. 触发执行:调用 DagExecutionEngine.triggerDag() 创建 DAG 实例并执行
  3. 更新下次执行时间:根据 Cron 表达式计算下一次触发时间,更新到数据库

Cron 工具类

public class CronUtils {
    // 基于 Spring CronExpression 实现
    public static boolean isValid(String cronExpression);
    public static LocalDateTime getNextFireTime(String cronExpression);
    public static String getDescription(String cronExpression);  // 可读描述
}

支持 Spring Cron 6 段格式:秒 分 时 日 月 周

调度管理 API

接口 方法 说明
/api/v1/dags/{dagId}/schedule GET 获取调度配置
/api/v1/dags/{dagId}/schedule PUT 更新调度配置
/api/v1/dags/{dagId}/schedule/enable POST 启用调度
/api/v1/dags/{dagId}/schedule/disable POST 禁用调度
/api/v1/dags/schedule/validate-cron POST 验证 Cron 表达式

数据库字段

ALTER TABLE t_dag
    ADD COLUMN cron_expression VARCHAR(100) COMMENT 'Cron 调度表达式',
    ADD COLUMN schedule_enabled TINYINT DEFAULT 0 COMMENT '是否启用调度 0-否 1-是',
    ADD COLUMN next_fire_time DATETIME COMMENT '下次执行时间';

-- 调度扫描优化索引
ALTER TABLE t_dag ADD INDEX idx_schedule (schedule_enabled, next_fire_time);

4.8 高可用设计

Master 高可用

┌─────────────────────────────────────────────┐
│               Zookeeper Cluster             │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐     │
│  │   ZK1   │  │   ZK2   │  │   ZK3   │     │
│  └─────────┘  └─────────┘  └─────────┘     │
└─────────────────────┬───────────────────────┘
                      │ 选主
        ┌─────────────┼─────────────┐
        │             │             │
        ▼             ▼             ▼
   ┌─────────┐   ┌─────────┐   ┌─────────┐
   │ Master1 │   │ Master2 │   │ Master3 │
   │ (Active)│   │(Standby)│   │(Standby)│
   └─────────┘   └─────────┘   └─────────┘
  • 使用 Zookeeper 进行 Master 选主
  • Active Master 负责调度,Standby 热备
  • Master 故障时自动切换,恢复时间 < 30s
  • 调度状态持久化到 MySQL,支持故障恢复

Worker 弹性伸缩

public class WorkerAutoScaler {
    private final KubernetesClient k8sClient;

    @Scheduled(fixedRate = 60000)
    public void autoScale() {
        // 获取当前队列积压
        int queueSize = getQueueSize();
        int currentWorkers = getCurrentWorkerCount();
        int runningTasks = getRunningTaskCount();

        // 计算目标 Worker 数量
        int targetWorkers = calculateTargetWorkers(queueSize, runningTasks);

        // 限制在 min-max 范围内
        targetWorkers = Math.max(MIN_WORKERS, Math.min(MAX_WORKERS, targetWorkers));

        if (targetWorkers != currentWorkers) {
            scaleWorkers(targetWorkers);
            log.info("Worker 扩缩容: {} -> {}", currentWorkers, targetWorkers);
        }
    }
}

5. 数据库设计

5.1 核心表结构

-- DAG 定义表
CREATE TABLE t_dag (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    project_id BIGINT NOT NULL,
    status TINYINT DEFAULT 1 COMMENT '1-上线 0-下线',
    schedule_config JSON,
    alert_config JSON,
    cron_expression VARCHAR(100) COMMENT 'Cron 调度表达式',
    schedule_enabled TINYINT DEFAULT 0 COMMENT '是否启用调度 0-否 1-是',
    next_fire_time DATETIME COMMENT '下次执行时间',
    created_by BIGINT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_project (project_id),
    INDEX idx_status (status),
    INDEX idx_schedule (schedule_enabled, next_fire_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 任务定义表
CREATE TABLE t_task (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    dag_id BIGINT NOT NULL,
    name VARCHAR(255) NOT NULL,
    type VARCHAR(50) NOT NULL COMMENT 'SQL/SHELL/SPARK/FLINK/PYTHON',
    content LONGTEXT,
    params JSON,
    retry_config JSON,
    resource_config JSON,
    position_x INT,
    position_y INT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_dag (dag_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 任务依赖表
CREATE TABLE t_task_dependency (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    dag_id BIGINT NOT NULL,
    source_task_id BIGINT NOT NULL,
    target_task_id BIGINT NOT NULL,
    dependency_type VARCHAR(20) DEFAULT 'STRONG',
    INDEX idx_dag (dag_id),
    INDEX idx_source (source_task_id),
    INDEX idx_target (target_task_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- DAG 实例表
CREATE TABLE t_dag_instance (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    dag_id BIGINT NOT NULL,
    schedule_time DATETIME NOT NULL COMMENT '调度时间',
    status TINYINT NOT NULL COMMENT '状态',
    start_time DATETIME,
    end_time DATETIME,
    trigger_type VARCHAR(20) COMMENT 'MANUAL/SCHEDULED/EVENT',
    run_params JSON,
    INDEX idx_dag_schedule (dag_id, schedule_time),
    INDEX idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 任务实例表
CREATE TABLE t_task_instance (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    task_id BIGINT NOT NULL,
    dag_instance_id BIGINT NOT NULL,
    status TINYINT NOT NULL,
    start_time DATETIME,
    end_time DATETIME,
    worker_id VARCHAR(100),
    retry_count INT DEFAULT 0,
    log_path VARCHAR(500),
    log_content LONGTEXT COMMENT '任务执行日志内容',
    exit_code INT,
    error_message VARCHAR(2000) COMMENT '错误信息',
    INDEX idx_dag_instance (dag_instance_id),
    INDEX idx_task (task_id),
    INDEX idx_status_time (status, start_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

6. API 设计

6.1 DAG 管理

# 获取 DAG 列表(按项目)
GET /api/v1/dags?projectId={projectId}

# 创建 DAG
POST /api/v1/dags
Request:
  name: string
  description: string
  projectId: number

# 获取 DAG 详情
GET /api/v1/dags/{dagId}

# 更新 DAG
PUT /api/v1/dags/{dagId}

# 删除 DAG
DELETE /api/v1/dags/{dagId}

# 获取 DAG 下的任务列表
GET /api/v1/dags/{dagId}/tasks

# 上线 DAG
POST /api/v1/dags/{dagId}/online

# 下线 DAG
POST /api/v1/dags/{dagId}/offline

6.2 画布管理

# 保存 DAG 画布(任务节点和依赖边)
POST /api/v1/dags/{dagId}/canvas
Request:
  tasks: TaskNode[]
  edges: Edge[]

# 加载 DAG 画布
GET /api/v1/dags/{dagId}/canvas

6.3 DAG 校验

# 校验 DAG(循环依赖检测)
POST /api/v1/dags/{dagId}/validate
Request:
  edges: Edge[]  # 画布中的边列表
    - source: string  # 源节点 ID
    - target: string  # 目标节点 ID
Response:
  valid: boolean    # 是否通过校验
  message: string   # 校验结果描述

使用 DFS 深度优先搜索检测图中是否存在环。空边列表或无 edges 字段时直接返回校验通过。

6.4 任务执行

# 手动触发 DAG
POST /api/v1/dags/{dagId}/trigger
Request:
  params: object  # 可选运行参数

# 获取 DAG 执行历史
GET /api/v1/dags/{dagId}/instances
Query:
  limit: number  # 默认 20

# 获取 DAG 实例详情
GET /api/v1/dags/instances/{instanceId}

# 获取 DAG 实例状态
GET /api/v1/dags/instances/{instanceId}/status

# 获取任务实例详情
GET /api/v1/dags/task-instances/{taskInstanceId}

# 获取任务实例日志
GET /api/v1/dags/task-instances/{taskInstanceId}/log

6.5 调度管理

# 获取 DAG 调度配置
GET /api/v1/dags/{dagId}/schedule
Response:
  dagId: number
  cronExpression: string
  cronDescription: string
  enabled: boolean
  nextFireTime: datetime

# 更新调度配置
PUT /api/v1/dags/{dagId}/schedule
Request:
  cronExpression: string
  enabled: boolean

# 启用调度
POST /api/v1/dags/{dagId}/schedule/enable
Request:
  cronExpression: string

# 禁用调度
POST /api/v1/dags/{dagId}/schedule/disable

# 验证 Cron 表达式
POST /api/v1/dags/schedule/validate-cron
Request:
  cronExpression: string
Response:
  valid: boolean
  description: string
  nextFireTime: datetime

7. 性能优化

7.1 调度性能优化

  • 分片调度:按 DAG ID 分片,多 Master 并行调度
  • 批量操作:批量更新任务状态,减少数据库压力
  • 缓存:DAG 定义缓存到 Redis,减少数据库查询
  • 异步处理:任务分发、状态更新异步化

7.2 执行性能优化

  • 任务预热:提前拉取即将执行的任务
  • 资源池化:执行器线程池复用
  • 本地缓存:Worker 本地缓存任务脚本

8. 监控指标

指标 说明 告警阈值
scheduler.dag.running 运行中 DAG 数量 -
scheduler.task.queued 队列中任务数量 > 10000
scheduler.task.running 运行中任务数量 -
scheduler.dispatch.latency 调度延迟 P99 > 1s
scheduler.task.success_rate 任务成功率 < 95%
worker.cpu.usage Worker CPU 使用率 > 80%
worker.memory.usage Worker 内存使用率 > 80%