名称: airflow-dag-analyzer
描述: 分析、验证和优化 Apache Airflow DAG,以确保其可靠性、性能并遵循最佳实践。
版本: 1.0.0
类别: 编排
技能ID: SK-DEA-002
允许工具: Read, Grep, Glob, Bash, WebFetch
Airflow DAG 分析器
分析、验证和优化 Apache Airflow DAG,以提高可靠性和性能。
概述
此技能检查 Apache Airflow DAG 定义,以识别性能瓶颈、可靠性问题和最佳实践违规。它提供任务依赖关系优化、并行配置、错误处理和资源管理的建议。
能力
- DAG 结构分析与验证 - 解析和验证 DAG 结构
- 任务依赖关系优化 - 识别瓶颈并建议并行执行
- 并行性与并发性建议 - 优化池和插槽分配
- SLA 和超时配置 - 建议适当的超时和 SLA
- 重试和故障处理模式 - 验证重试逻辑和告警
- 传感器优化 - 智能传感器、可延迟操作符、重调度模式
- 资源池分配 - 优化池使用和工作节点分布
- DAG 调度优化 - 追赶、回填和调度间隔调优
- 跨 DAG 依赖检测 - 识别外部依赖和触发器
输入模式
{
"dagCode": {
"type": "string",
"description": "Python DAG 定义代码",
"required": true
},
"dagId": {
"type": "string",
"description": "DAG 标识符"
},
"executionHistory": {
"type": "object",
"description": "历史执行指标",
"properties": {
"runs": {
"type": "array",
"items": {
"dagRunId": "string",
"executionDate": "string",
"duration": "number",
"state": "string",
"taskDurations": "object"
}
}
}
},
"clusterConfig": {
"type": "object",
"properties": {
"workerCount": "number",
"executorType": "string",
"poolConfigs": "object",
"airflowVersion": "string"
}
},
"analysisScope": {
"type": "array",
"items": {
"type": "string",
"enum": ["structure", "performance", "reliability", "resources", "security"]
},
"default": ["structure", "performance", "reliability"]
}
}
输出模式
{
"validationResults": {
"errors": {
"type": "array",
"items": {
"code": "string",
"message": "string",
"line": "number",
"severity": "error"
}
},
"warnings": {
"type": "array",
"items": {
"code": "string",
"message": "string",
"line": "number",
"severity": "warning"
}
}
},
"optimizations": {
"type": "array",
"items": {
"category": "string",
"current": "string",
"recommended": "string",
"impact": "high|medium|low",
"effort": "string",
"codeChange": "string"
}
},
"recommendedConfig": {
"type": "object",
"properties": {
"poolSize": "number",
"maxActiveRuns": "number",
"concurrency": "number",
"defaultRetries": "number",
"executionTimeout": "string"
}
},
"dependencyGraph": {
"type": "object",
"properties": {
"nodes": "array",
"edges": "array",
"criticalPath": "array",
"parallelGroups": "array"
}
},
"metrics": {
"taskCount": "number",
"maxDepth": "number",
"parallelizationRatio": "number",
"estimatedDuration": "string"
},
"securityFindings": {
"type": "array",
"items": {
"severity": "high|medium|low",
"finding": "string",
"recommendation": "string"
}
}
}
使用示例
基础 DAG 分析
{
"dagCode": "from airflow import DAG
from airflow.operators.python import PythonOperator
...",
"dagId": "daily_etl_pipeline"
}
带执行历史
{
"dagCode": "...",
"dagId": "daily_etl_pipeline",
"executionHistory": {
"runs": [
{
"dagRunId": "manual__2024-01-15",
"duration": 3600,
"state": "success",
"taskDurations": {
"extract": 600,
"transform": 1800,
"load": 1200
}
}
]
}
}
完整分析(含集群配置)
{
"dagCode": "...",
"dagId": "complex_ml_pipeline",
"clusterConfig": {
"workerCount": 8,
"executorType": "KubernetesExecutor",
"poolConfigs": {
"default_pool": {"slots": 128},
"ml_pool": {"slots": 32}
},
"airflowVersion": "2.8.0"
},
"analysisScope": ["structure", "performance", "reliability", "resources", "security"]
}
验证规则
DAG 定义规则
| 规则 |
严重性 |
描述 |
| DAG-001 |
错误 |
缺少 DAG default_args |
| DAG-002 |
错误 |
无效的 schedule_interval |
| DAG-003 |
警告 |
长时间运行的 DAG 启用了 Catchup |
| DAG-004 |
警告 |
未配置失败时发送邮件 |
| DAG-005 |
信息 |
考虑使用 @dag 装饰器 |
任务定义规则
| 规则 |
严重性 |
描述 |
| TSK-001 |
错误 |
任务没有上游或下游 |
| TSK-002 |
警告 |
任务缺少重试配置 |
| TSK-003 |
警告 |
未设置执行超时 |
| TSK-004 |
警告 |
PythonOperator 未指定池 |
| TSK-005 |
信息 |
考虑使用 TaskGroup 组织相关任务 |
传感器规则
| 规则 |
严重性 |
描述 |
| SEN-001 |
警告 |
传感器处于 poke 模式(应使用 reschedule) |
| SEN-002 |
警告 |
传感器缺少超时设置 |
| SEN-003 |
信息 |
考虑使用可延迟操作符 |
| SEN-004 |
警告 |
外部传感器未设置 soft_fail |
安全规则
| 规则 |
严重性 |
描述 |
| SEC-001 |
错误 |
硬编码凭据 |
| SEC-002 |
警告 |
使用 Variable.get 时未设置默认值 |
| SEC-003 |
警告 |
连接 ID 未参数化 |
| SEC-004 |
信息 |
考虑使用 Secrets Backend |
优化模式
并行化
# 之前:顺序执行
task1 >> task2 >> task3 >> task4
# 之后:尽可能并行执行
task1 >> [task2, task3] >> task4
传感器优化
# 之前:Poke 模式(占用工作节点)
FileSensor(
task_id='wait_for_file',
filepath='/data/input.csv',
mode='poke' # 不佳
)
# 之后:Reschedule 模式(释放工作节点)
FileSensor(
task_id='wait_for_file',
filepath='/data/input.csv',
mode='reschedule', # 良好
poke_interval=300
)
# 最佳:可延迟操作符(Airflow 2.2+)
from airflow.sensors.filesystem import FileSensor
FileSensor(
task_id='wait_for_file',
filepath='/data/input.csv',
deferrable=True
)
TaskGroups
# 之前:扁平的任务结构
extract_orders >> transform_orders >> load_orders
extract_products >> transform_products >> load_products
# 之后:使用 TaskGroups 进行组织
with TaskGroup('orders') as orders_group:
extract >> transform >> load
with TaskGroup('products') as products_group:
extract >> transform >> load
动态任务映射(Airflow 2.3+)
# 之前:静态任务生成
for i in range(10):
PythonOperator(task_id=f'process_{i}', ...)
# 之后:动态任务映射
@task
def process_item(item):
return item * 2
process_item.expand(item=[1, 2, 3, 4, 5])
配置建议
Default Args 模板
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email': ['alerts@company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(minutes=30),
'execution_timeout': timedelta(hours=2),
'sla': timedelta(hours=1),
}
池配置
| 工作负载类型 |
建议池大小 |
| 高计算量 |
每工作节点 2-4 个 |
| I/O 密集型 |
每工作节点 8-16 个 |
| API 调用 |
基于速率限制 |
| 传感器 |
独立池,高插槽数 |
集成点
MCP 服务器集成
- yangkyeongmo/mcp-server-apache-airflow - Airflow REST API 集成
- Dagster MCP - 替代编排模式
- Prefect MCP - 现代编排对比
相关技能
- dbt 项目分析器 (SK-DEA-003) - dbt 操作符优化
- 数据血缘映射器 (SK-DEA-010) - 任务血缘提取
适用流程
- ETL/ELT 管道 (
etl-elt-pipeline.js)
- A/B 测试管道 (
ab-testing-pipeline.js)
- 管道迁移 (
pipeline-migration.js)
- 数据质量框架 (
data-quality-framework.js)
参考资料
版本历史
- 1.0.0 - 初始版本,支持 Airflow 2.x