name: data-lineage-mapper
description: Extracts and maps data lineage from SQL, dbt, Airflow, Spark, and other sources, producing a comprehensive lineage graph for impact analysis.
version: 1.0.0
category: Data Governance
skill-id: SK-DEA-010
allowed-tools: Read, Grep, Glob, Bash, WebFetch
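Data Lineage Mapper
Extracts and maps data lineage from multiple sources to provide end-to-end visibility into data flows.
Overview
This skill parses SQL queries, dbt projects, Airflow DAGs, and Spark jobs and extracts their data lineage. It produces a source-to-destination lineage graph that supports impact analysis and data governance.
Capabilities
- SQL parsing for lineage extraction - parses SELECT, INSERT, and MERGE statements
- dbt lineage integration - extracts lineage from manifest.json
- Airflow task lineage mapping - maps data flow across DAG tasks
- Spark job lineage extraction - parses Spark SQL and DataFrame operations
- Cross-system lineage stitching - connects lineage across different tools
- Column-level lineage tracking - traces how individual columns are transformed
- Impact analysis - assesses downstream and upstream impact
- Lineage graph generation - visual and machine-readable lineage
- Data catalog integration - exports to DataHub, Amundsen, and Alation
Input Schema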
{
"sources": {
"type": "array",
"required": true,
"items": {
"type": {
"type": "string",
"enum": ["sql", "dbt", "airflow", "spark", "file"]
},
"content": {
"type": "string|object",
"description": "SQL string, file path, or manifest object"
},
"metadata": {
"type": "object",
"properties": {
"database": "string",
"schema": "string",
"catalog": "string"
}
}
}
},
"existingLineage": {
"type": "object",
"description": "Existing lineage graph to merge with"
},
"targetCatalog": {
"type": "string",
"enum": ["datahub", "amundsen", "alation", "openlineage", "json"],
"default": "json",
"description": "Target format for the lineage export"
},
"options": {
"type": "object",
"properties": {
"columnLevel": {
"type": "boolean",
"default": true,
"description": "Extract column-level lineage"
},
"resolveViews": {
"type": "boolean",
"default": false,
"description": "Resolve views to their underlying base tables"
},
"includeTemporary": {
"type": "boolean",
"default": false,
"description": "Include temporary tables and CTEs in the lineage"
}
}
}
}
Output Schema
{
"lineageGraph": {
"type": "object",
"properties": {
"nodes": {
"type": "array",
"items": {
"id": "string",
"type": "table|view|file|external",
"name": "string",
"database": "string",
"schema": "string",
"columns": "array"
}
},
"edges": {
"type": "array",
"items": {
"source": "string",
"target": "string",
"transformationType": "string",
"sql": "string"
}
}
}
},
"columnLineage": {
"type": "array",
"items": {
"targetColumn": {
"table": "string",
"column": "string"
},
"sourceColumns": {
"type": "array",
"items": {
"table": "string",
"column": "string",
"transformation": "string"
}
},
"transformationLogic": "string"
}
},
"impactAnalysis": {
"type": "object",
"properties": {
"upstream": {
"type": "array",
"description": "All upstream dependencies"
},
"downstream": {
"type": "array",
"description": "All downstream dependencies"
},
"criticalPath": {
"type": "array",
"description": "The most critical lineage path"
}
}
},
"catalogIntegration": {
"type": "object",
"description": "Export format for the target catalog",
"properties": {
"format": "string",
"payload": "object|string"
}
},
"statistics": {
"tablesCount": "number",
"columnsCount": "number",
"edgesCount": "number",
"maxDepth": "number"
}
}
Usage Examples
SQL Query Lineage
{
"sources": [
{
"type": "sql",
"content": "INSERT INTO analytics.fct_orders SELECT o.order_id, c.customer_name FROM staging.orders o JOIN staging.customers c ON o.customer_id = c.id",
"metadata": {
"database": "warehouse",
"schema": "analytics"
}
}
],
"options": {
"columnLevel": true
}
}
dbt Project Lineage
{
"sources": [
{
"type": "dbt",
"content": "./target/manifest.json"
}
],
"targetCatalog": "datahub",
"options": {
"resolveViews": true
}
}
Multi-Source Lineage
{
"sources": [
{
"type": "dbt",
"content": "./analytics/target/manifest.json"
},
{
"type": "airflow",
"content": "./dags/etl_pipeline.py"
},
{
"type": "sql",
"content": "SELECT * FROM external_db.customers"
}
],
"targetCatalog": "openlineage"
}
Impact Analysis for a Table Change
{
"sources": [
{
"type": "dbt",
"content": "./target/manifest.json"
}
],
"options": {
"columnLevel": true,
"impactAnalysisTarget": "raw.customers"
}
}
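Impact analysis over a lineage graph amounts to a graph traversal. The following is a minimal sketch, assuming edges shaped like the `edges` items in the output schema (`source` and `target` node ids). The node names and the helper functions `build_adjacency` and `reachable` are hypothetical and are not part of the skill itself.

```python
from collections import defaultdict, deque


def build_adjacency(edges):
    """Index edges by source (for downstream lookups) and by target (for upstream lookups)."""
    downstream, upstream = defaultdict(set), defaultdict(set)
    for edge in edges:
        downstream[edge["source"]].add(edge["target"])
        upstream[edge["target"]].add(edge["source"])
    return downstream, upstream


def reachable(start, adjacency):
    """Breadth-first traversal: every node reachable from start, excluding start itself."""
    seen, queue = set(), deque([start])
    while queue:
        node = queue.popleft()
        for neighbor in adjacency.get(node, ()):
            if neighbor not in seen and neighbor != start:
                seen.add(neighbor)
                queue.append(neighbor)
    return sorted(seen)


# Hypothetical lineage edges for illustration only.
edges = [
    {"source": "raw.customers", "target": "staging.stg_customers"},
    {"source": "raw.orders", "target": "staging.stg_orders"},
    {"source": "staging.stg_customers", "target": "analytics.fct_orders"},
    {"source": "staging.stg_orders", "target": "analytics.fct_orders"},
]

downstream, upstream = build_adjacency(edges)
impact = {
    "downstream": reachable("raw.customers", downstream),
    "upstream": reachable("analytics.fct_orders", upstream),
}
```

Here `impact["downstream"]` lists everything that could be affected by a change to `raw.customers`, and `impact["upstream"]` lists everything `analytics.fct_orders` depends on.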
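Lineage Extraction Methods
SQL Parsing
| Statement type | Extracted information |
|---|---|
| SELECT | Source tables, column mappings |
| INSERT INTO…SELECT | Target table, source tables |
| CREATE TABLE AS | New table, source lineage |
| MERGE | Target, source, updated/inserted columns |
| UPDATE…FROM | Target table, joined source tables |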
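To illustrate table-level extraction for the statement types above, here is a deliberately simplified sketch that uses regular expressions from the Python standard library. It is not how the skill is necessarily implemented: a real extractor would use a proper SQL parser, and this version will misread CTE names, `CREATE TABLE IF NOT EXISTS`, and constructs such as `EXTRACT(YEAR FROM ...)`. The function name `extract_table_lineage` is hypothetical.

```python
import re

# Keywords that introduce the table being written to.
TARGET_RE = re.compile(
    r"\b(?:INSERT\s+INTO|CREATE\s+TABLE|MERGE\s+INTO|UPDATE)\s+([\w.]+)",
    re.IGNORECASE,
)
# Keywords that introduce a table being read from.
SOURCE_RE = re.compile(r"\b(?:FROM|JOIN|USING)\s+([\w.]+)", re.IGNORECASE)


def extract_table_lineage(sql):
    """Return the written table (if any) and the distinct tables read, in order of appearance."""
    target_match = TARGET_RE.search(sql)
    target = target_match.group(1) if target_match else None
    sources = []
    for name in SOURCE_RE.findall(sql):
        if name != target and name not in sources:
            sources.append(name)
    return {"target": target, "sources": sources}


sql = (
    "INSERT INTO analytics.fct_orders "
    "SELECT o.order_id, c.customer_name "
    "FROM staging.orders o JOIN staging.customers c ON o.customer_id = c.id"
)
lineage = extract_table_lineage(sql)
```

For the INSERT statement from the usage example, `lineage` holds `analytics.fct_orders` as the target and the two staging tables as sources.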
dbt Manifest
Extracted from manifest.json:
- Model dependencies via ref() and source()
- Column-level lineage from catalog.json
- Test dependencies
- Documentation links
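As a rough sketch of the model-dependency step: dbt's manifest.json keeps a `nodes` mapping keyed by unique id, and each node lists its parents under `depends_on.nodes`. The code below walks that structure on a small inline sample. The sample ids, the `edges_from_manifest` helper, and the `"dbt_model"` transformation label are assumptions made for illustration; in practice the dictionary would come from `json.load` on `target/manifest.json`.

```python
def edges_from_manifest(manifest):
    """Turn each model's depends_on.nodes entries into lineage edges (parent -> model)."""
    edges = []
    for unique_id, node in manifest.get("nodes", {}).items():
        # Skip tests, seeds, snapshots, etc.; this sketch only maps models.
        if node.get("resource_type") != "model":
            continue
        for parent_id in node.get("depends_on", {}).get("nodes", []):
            edges.append(
                {
                    "source": parent_id,
                    "target": unique_id,
                    "transformationType": "dbt_model",  # hypothetical label
                }
            )
    return edges


# Trimmed, hypothetical manifest fragment.
manifest = {
    "nodes": {
        "model.shop.stg_customers": {
            "resource_type": "model",
            "depends_on": {"nodes": ["source.shop.raw.customers"]},
        },
        "model.shop.fct_orders": {
            "resource_type": "model",
            "depends_on": {"nodes": ["model.shop.stg_customers"]},
        },
        "test.shop.not_null_fct_orders_order_id": {
            "resource_type": "test",
            "depends_on": {"nodes": ["model.shop.fct_orders"]},
        },
    }
}

dbt_edges = edges_from_manifest(manifest)
```

Test nodes are skipped here; handling the "test dependencies" bullet would mean keeping them and tagging the edge differently.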
Airflow DAGs
Lineage is mapped from:
- XCom data passing
- Operator source/target parameters
- Task dependencies that represent data flow
- External task sensors
Spark Jobs
Lineage is parsed from:
- Spark SQL queries
- DataFrame operations (joins, selects, group-bys)
- Read/write operations
- Catalog table references
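Column-Level Lineage
Transformation Types
| Type | Example | Lineage |
|---|---|---|
| Direct | SELECT customer_id | 1:1 mapping |
| Rename | customer_id AS cust_id | Rename mapping |
| Expression | CONCAT(first, last) AS name | Multiple columns → one column |
| Aggregation | SUM(amount) AS total | Many → one (with aggregation) |
| Conditional | CASE WHEN... | Conditional mapping |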
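The transformation types above could be assigned with a small classifier over each select-list expression. This is a heuristic sketch using regular expressions, not a description of the skill's actual logic. The function name `classify_expression`, the lowercase type labels, and the fixed list of aggregate functions are assumptions.

```python
import re

# Matches a trailing "AS alias"; deliberately does not match "CAST(x AS INT)".
ALIAS_RE = re.compile(r"^(?P<body>.+?)\s+AS\s+(?P<alias>\w+)$", re.IGNORECASE | re.DOTALL)
AGGREGATE_RE = re.compile(r"^(SUM|COUNT|AVG|MIN|MAX)\s*\(", re.IGNORECASE)


def classify_expression(expr):
    """Classify one select-list expression (without the SELECT keyword)."""
    text = expr.strip()
    alias_match = ALIAS_RE.match(text)
    body = alias_match.group("body").strip() if alias_match else text
    if body.upper().startswith("CASE"):
        return "conditional"
    if AGGREGATE_RE.match(body):
        return "aggregation"
    if re.fullmatch(r"[\w.]+", body):
        # A bare column reference: renamed if aliased, direct otherwise.
        return "rename" if alias_match else "direct"
    return "expression"


examples = [
    "customer_id",
    "customer_id AS cust_id",
    "CONCAT(first, last) AS name",
    "SUM(amount) AS total",
    "CASE WHEN amount > 0 THEN 'paid' ELSE 'open' END AS status",
]
types = [classify_expression(e) for e in examples]
```

The five examples mirror the rows of the table, in the same order.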
Example Output
{
"columnLineage": [
{
"targetColumn": {
"table": "fct_orders",
"column": "customer_name"
},
"sourceColumns": [
{
"table": "stg_customers",
"column": "first_name",
"transformation": "CONCAT"
},
{
"table": "stg_customers",
"column": "last_name",
"transformation": "CONCAT"
}
],
"transformationLogic": "CONCAT(first_name, ' ', last_name)"
}
]
}
Catalog Export Formats
DataHub
{
"format": "datahub",
"payload": {
"entities": [...],
"relationships": [...]
}
}
OpenLineage
{
"format": "openlineage",
"payload": {
"eventType": "COMPLETE",
"run": {...},
"job": {...},
"inputs": [...],
"outputs": [...]
}
}
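For reference, the elided OpenLineage fields could be populated along these lines. This is a sketch under stated assumptions: the OpenLineage run event also expects an `eventTime`, a `producer` URI, and a `schemaURL`. The producer URL below is a placeholder, `schemaURL` is left out because its exact version string is not given in this document, and the `to_openlineage_event` helper and `"warehouse"` namespace are hypothetical.

```python
import uuid
from datetime import datetime, timezone


def to_openlineage_event(job_name, inputs, outputs, namespace="warehouse"):
    """Build a COMPLETE run event dict from table names (schemaURL intentionally omitted)."""

    def dataset(name):
        return {"namespace": namespace, "name": name}

    return {
        "eventType": "COMPLETE",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [dataset(name) for name in inputs],
        "outputs": [dataset(name) for name in outputs],
        # Placeholder URI; a real producer would identify the emitting tool.
        "producer": "https://example.com/data-lineage-mapper",
    }


event = to_openlineage_event(
    job_name="load_fct_orders",
    inputs=["staging.orders", "staging.customers"],
    outputs=["analytics.fct_orders"],
)
```

The resulting dictionary would sit under `payload` in the export shown above.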
Amundsen
{
"format": "amundsen",
"payload": {
"tables": [...],
"columns": [...],
"lineage": [...]
}
}
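Integration Points
MCP Server Integration
- dbt MCP - direct manifest access
- Database MCP - schema and view resolution
- MindsDB - cross-platform lineage
Related Skills
- dbt Project Analyzer (SK-DEA-003) - dbt lineage analysis
- Data Catalog Enricher (SK-DEA-017) - catalog metadata enrichment
Applicable Processes
- Data Lineage Mapping (data-lineage.js)
- Data Catalog (data-catalog.js)
- dbt Project Setup (dbt-project-setup.js)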
Version History
- 1.0.0 - Initial release with multi-source lineage extraction