数据血缘关系映射器 data-lineage-mapper

数据血缘关系映射器是一款用于数据治理的专业工具,能够从SQL、dbt、Airflow、Spark等多种数据源中自动提取、分析和可视化数据血缘关系。它支持列级血缘追踪、上下游影响分析,并能生成兼容DataHub、OpenLineage等主流数据目录的标准化血缘图谱,帮助企业实现数据资产的透明化管理、变更影响评估和数据合规性审计。

数据治理 0 次安装 0 次浏览 更新于 2/23/2026

name: data-lineage-mapper description: 从SQL、dbt、Airflow、Spark等多种数据源中提取并映射数据血缘关系,生成用于影响分析的全面血缘关系图。 version: 1.0.0 category: 数据治理 skill-id: SK-DEA-010 allowed-tools: Read, Grep, Glob, Bash, WebFetch

数据血缘关系映射器

从多种数据源中提取并映射数据血缘关系,以提供全面的数据流可见性。

概述

本技能解析并提取来自SQL查询、dbt项目、Airflow DAG和Spark作业的数据血缘信息。它生成从源头到目的地的全面数据血缘关系图,支持影响分析和数据治理。

能力

  • 用于血缘提取的SQL解析 - 解析SELECT、INSERT、MERGE语句
  • dbt血缘集成 - 从manifest.json中提取血缘
  • Airflow任务血缘映射 - 跨DAG任务映射数据流
  • Spark作业血缘提取 - 解析Spark SQL和DataFrame操作
  • 跨系统血缘连接 - 连接不同工具间的血缘
  • 列级血缘追踪 - 追踪单个列的转换过程
  • 影响分析 - 下游/上游影响评估
  • 血缘图生成 - 可视化和机器可读的血缘关系
  • 与数据目录集成 - 导出至DataHub、Amundsen、Alation

输入模式

{
  "sources": {
    "type": "array",
    "required": true,
    "items": {
      "type": {
        "type": "string",
        "enum": ["sql", "dbt", "airflow", "spark", "file"]
      },
      "content": {
        "type": "string|object",
        "description": "SQL字符串、文件路径或清单对象"
      },
      "metadata": {
        "type": "object",
        "properties": {
          "database": "string",
          "schema": "string",
          "catalog": "string"
        }
      }
    }
  },
  "existingLineage": {
    "type": "object",
    "description": "要合并的现有血缘图"
  },
  "targetCatalog": {
    "type": "string",
    "enum": ["datahub", "amundsen", "alation", "openlineage", "json"],
    "default": "json",
    "description": "血缘导出的目标格式"
  },
  "options": {
    "type": "object",
    "properties": {
      "columnLevel": {
        "type": "boolean",
        "default": true,
        "description": "提取列级血缘"
      },
      "resolveViews": {
        "type": "boolean",
        "default": false,
        "description": "将视图解析为基础表"
      },
      "includeTemporary": {
        "type": "boolean",
        "default": false,
        "description": "在血缘中包含临时表/CTE表"
      }
    }
  }
}

输出模式

{
  "lineageGraph": {
    "type": "object",
    "properties": {
      "nodes": {
        "type": "array",
        "items": {
          "id": "string",
          "type": "table|view|file|external",
          "name": "string",
          "database": "string",
          "schema": "string",
          "columns": "array"
        }
      },
      "edges": {
        "type": "array",
        "items": {
          "source": "string",
          "target": "string",
          "transformationType": "string",
          "sql": "string"
        }
      }
    }
  },
  "columnLineage": {
    "type": "array",
    "items": {
      "targetColumn": {
        "table": "string",
        "column": "string"
      },
      "sourceColumns": {
        "type": "array",
        "items": {
          "table": "string",
          "column": "string",
          "transformation": "string"
        }
      },
      "transformationLogic": "string"
    }
  },
  "impactAnalysis": {
    "type": "object",
    "properties": {
      "upstream": {
        "type": "array",
        "description": "所有上游依赖项"
      },
      "downstream": {
        "type": "array",
        "description": "所有下游依赖项"
      },
      "criticalPath": {
        "type": "array",
        "description": "最重要的血缘路径"
      }
    }
  },
  "catalogIntegration": {
    "type": "object",
    "description": "目标目录的导出格式",
    "properties": {
      "format": "string",
      "payload": "object|string"
    }
  },
  "statistics": {
    "tablesCount": "number",
    "columnsCount": "number",
    "edgesCount": "number",
    "maxDepth": "number"
  }
}

使用示例

SQL查询血缘

{
  "sources": [
    {
      "type": "sql",
      "content": "INSERT INTO analytics.fct_orders SELECT o.order_id, c.customer_name FROM staging.orders o JOIN staging.customers c ON o.customer_id = c.id",
      "metadata": {
        "database": "warehouse",
        "schema": "analytics"
      }
    }
  ],
  "options": {
    "columnLevel": true
  }
}

dbt项目血缘

{
  "sources": [
    {
      "type": "dbt",
      "content": "./target/manifest.json"
    }
  ],
  "targetCatalog": "datahub",
  "options": {
    "resolveViews": true
  }
}

多源血缘

{
  "sources": [
    {
      "type": "dbt",
      "content": "./analytics/target/manifest.json"
    },
    {
      "type": "airflow",
      "content": "./dags/etl_pipeline.py"
    },
    {
      "type": "sql",
      "content": "SELECT * FROM external_db.customers"
    }
  ],
  "targetCatalog": "openlineage"
}

表变更影响分析

{
  "sources": [
    {
      "type": "dbt",
      "content": "./target/manifest.json"
    }
  ],
  "options": {
    "columnLevel": true,
    "impactAnalysisTarget": "raw.customers"
  }
}

血缘提取方法

SQL解析

语句类型 提取信息
SELECT 源表、列映射
INSERT INTO…SELECT 目标表、源表
CREATE TABLE AS 新表、源血缘
MERGE 目标、源、更新/插入列
UPDATE…FROM 目标表、源连接表

dbt清单

manifest.json中提取:

  • 通过ref()source()的模型依赖关系
  • 来自catalog.json的列级血缘
  • 测试依赖关系
  • 文档链接

Airflow DAG

从以下内容映射血缘:

  • XCom数据传递
  • 操作符源/目标参数
  • 代表数据流的任务依赖关系
  • 外部任务传感器

Spark作业

从以下内容解析血缘:

  • Spark SQL查询
  • DataFrame操作(连接、选择、分组)
  • 读/写操作
  • 目录表引用

列级血缘

转换类型

类型 示例 血缘
直接 SELECT customer_id 1:1映射
重命名 customer_id AS cust_id 重命名映射
表达式 CONCAT(first, last) AS name 多列 → 单列
聚合 SUM(amount) AS total 多 → 单(带聚合)
条件 CASE WHEN... 条件映射

示例输出

{
  "columnLineage": [
    {
      "targetColumn": {
        "table": "fct_orders",
        "column": "customer_name"
      },
      "sourceColumns": [
        {
          "table": "stg_customers",
          "column": "first_name",
          "transformation": "CONCAT"
        },
        {
          "table": "stg_customers",
          "column": "last_name",
          "transformation": "CONCAT"
        }
      ],
      "transformationLogic": "CONCAT(first_name, ' ', last_name)"
    }
  ]
}

目录导出格式

DataHub

{
  "format": "datahub",
  "payload": {
    "entities": [...],
    "relationships": [...]
  }
}

OpenLineage

{
  "format": "openlineage",
  "payload": {
    "eventType": "COMPLETE",
    "run": {...},
    "job": {...},
    "inputs": [...],
    "outputs": [...]
  }
}

Amundsen

{
  "format": "amundsen",
  "payload": {
    "tables": [...],
    "columns": [...],
    "lineage": [...]
  }
}

集成点

MCP服务器集成

  • dbt MCP - 直接清单访问
  • 数据库MCP - 模式和视图解析
  • MindsDB - 跨平台血缘

相关技能

  • dbt项目分析器 (SK-DEA-003) - dbt血缘分析
  • 数据目录增强器 (SK-DEA-017) - 目录元数据增强

适用流程

  • 数据血缘映射 (data-lineage.js)
  • 数据目录 (data-catalog.js)
  • dbt项目设置 (dbt-project-setup.js)

参考资料

版本历史

  • 1.0.0 - 支持多源血缘提取的初始版本