转换技能Skill transform

转换技能用于执行SQL或Python数据转换任务,支持从源表读取数据并写入目标表,具备依赖管理、多种写入策略和递归执行功能。适用于数据工程、ETL开发、数据仓库构建和数据处理流程,关键词包括SQL转换、Python脚本、数据管道、ETL工具、数据集成、数据科学、数据库管理。

ETL开发 0 次安装 0 次浏览 更新于 3/15/2026

名称: 转换 描述: 运行SQL或Python转换任务

转换技能

运行定义的转换任务。任务是SQL或Python脚本,从源表读取数据并将结果写入目标表。任务可以具有依赖关系,支持多种写入策略,并可以递归执行其所有上游依赖关系。

用法

starlake transform [选项]

选项

  • --name <值>: 任务名称,格式为域名.任务(除非使用--tags,否则必需)
  • --compile: 仅返回最终编译的SQL查询而不执行
  • --sync-apply: 更新YAML属性以匹配SQL查询列
  • --sync-preview: 预览将匹配SQL查询的YAML属性更改
  • --query <值>: 运行此SQL查询而非任务文件中定义的查询
  • --dry-run: 仅干运行——编译和验证而不执行(支持BigQuery)
  • --tags <值>: 运行所有匹配这些标签的任务
  • --format: 美化打印最终SQL查询并更新.sql文件
  • --interactive <值>: 运行查询并显示结果而不下沉。格式:csvjsontablejson-array
  • --reload: 执行前从磁盘重新加载YAML文件(用于服务器模式)
  • --truncate: 在插入前强制截断目标表
  • --pageSize <值>: 每页记录数(用于交互模式)
  • --pageNumber <值>: 要显示的页码(用于交互模式)
  • --recursive: 递归执行此任务的所有上游依赖关系
  • --test: 在测试模式下运行而不提交更改
  • --accessToken <值>: 访问令牌用于认证(例如GCP)
  • --options k1=v1,k2=v2: SQL模板的替换参数
  • --scheduledDate <值>: 作业的调度日期,格式:yyyy-MM-dd'T'HH:mm:ss.SSSZ
  • --reportFormat <值>: 报告输出格式:consolejsonhtml

配置上下文

转换任务定义在metadata/transform/目录中。

转换域配置(metadata/transform/{域名}/_config.sl.yml

设置域中所有任务的默认属性:

# metadata/transform/kpi/_config.sl.yml
版本: 1
转换:
  默认:
    写入策略:
      类型: 覆盖

SQL转换文件(metadata/transform/{域名}/{任务}.sql

包含SQL查询。使用{{域名}}.表语法引用源表:

-- metadata/transform/kpi/收入汇总.sql
SELECT
    o.订单ID,
    o.时间戳 AS 订单日期,
    SUM(ol.数量 * ol.销售价格) AS 总收入
FROM
    starbake.订单 o
    JOIN starbake.订单行 ol ON o.订单ID = ol.订单ID
GROUP BY
    o.订单ID, o.时间戳

带有依赖关系的任务

任务可以引用其他任务的输出来形成有向无环图(DAG):

-- metadata/transform/kpi/订单汇总.sql
SELECT
    ps.订单ID,
    ps.订单日期,
    rs.总收入,
    ps.利润,
    ps.总销售单位
FROM
    kpi.产品汇总 ps
    JOIN kpi.收入汇总 rs ON ps.订单ID = rs.订单ID

任务YAML配置(metadata/transform/{域名}/{任务}.sl.yml

可选的YAML文件,用于配置写入策略、汇、期望等:

# metadata/transform/analytics/每日销售.sl.yml
版本: 1
任务:
  域: "analytics"
  表: "daily_sales"
  写入策略:
    类型: "按分区覆盖"
  汇:
    分区:
      - "报告日期"
    聚类:
      - "区域"
  connectionRef: "bigquery"
  期望:
    - 期望: "is_row_count_to_be_between(1, 1000000) => result(0) == 1"
      failOnError: true
  dagRef: "daily_analytics_dag"

Python转换(metadata/transform/{域名}/{任务}.py

Python转换必须创建一个名为SL_THIS的临时视图:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count

spark = SparkSession.builder.getOrCreate()
df = spark.sql("SELECT * FROM sales.orders")
result = df.groupBy("customer_id").agg(
    count("*").alias("order_count"),
    sum("total_amount").alias("total_spent")
)
result.createOrReplaceTempView("SL_THIS")

示例

运行单个任务

starlake transform --name kpi.收入汇总

仅编译SQL(调试)

查看最终编译的SQL而不执行:

starlake transform --name kpi.订单汇总 --compile

运行带有递归依赖关系

执行任务及其所有上游依赖关系:

starlake transform --name kpi.订单汇总 --recursive

交互式查询(预览结果)

运行并将结果显示为表格而不写入目标:

starlake transform --name kpi.收入汇总 --interactive table

带分页的交互式查询

starlake transform --name kpi.收入汇总 --interactive json --pageSize 50 --pageNumber 1

干运行(BigQuery)

starlake transform --name kpi.订单汇总 --dry-run

运行所有带标签的任务

starlake transform --tags 每日

使用自定义选项运行

向SQL模板传递替换变量:

starlake transform --name kpi.收入汇总 --options 开始日期=2024-01-01,结束日期=2024-03-31

从SQL同步YAML属性

自动更新任务YAML属性以匹配SQL查询输出列:

starlake transform --name kpi.收入汇总 --sync-apply

美化打印和格式化SQL

starlake transform --name kpi.收入汇总 --format

测试转换

starlake transform --name kpi.收入汇总 --test

相关技能

  • load - 在转换前加载原始数据
  • lineage - 可视化任务依赖图
  • col-lineage - 任务的列级血缘
  • dag-generate - 生成编排DAG
  • test - 运行转换的集成测试
  • config - 配置参考(写入策略、连接)